一、Kafka整合flume
cd /opt/software/flume/conf/
vi flume-kafka.conf
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=spooldirt
a1.sources.r1.spoolDir=/root/flume-kafka
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=testTopice
a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize=20
a1.sinks.k1.kafka.producer.acks=1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
cd /root/
mkdir flume-kafka
ll
drwxr-xr-x?? 2 root root 4096 11月? 8 22:02 agent3
-rw-------.? 1 root root? 955 9月?? 6 16:41 anaconda-ks.cfg
-rw-r--r--?? 1 root root??? 1 10月 25 18:30 exec-logger.conf
drwxr-xr-x?? 2 root root?? 27 11月 15 18:00 flume-hive
drwxr-xr-x?? 2 root root??? 6 12月? 3 03:59 flume-kafka
-rw-r--r--?? 1 root root?? 63 11月? 8 23:01 flume-position.json
drwxr-xr-x? 22 root root 4096 12月? 3 03:59 kafkadata
drwxr-xr-x?? 3 root root?? 21 10月 11 18:32 opt
drwxr-xr-x?? 3 root root 4096 11月? 8 18:17 testDir
drwxr-xr-x?? 2 root root?? 38 11月? 8 19:01 testdir2
-rw-r--r--?? 1 root root? 108 11月 15 17:09 test.log
drwxr-xr-x?? 2 root root 4096 11月? 8 18:49 testSink
kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 2
flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1
kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node:9092,node03:9092 --from-beginning
cd /root/flume-kafka/
echo "hello" >>test3.txt
echo "hello flume" >>test2.txt
cd /opt/software/flume/conf/
vi kafka-flume.conf
kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node:9092,node03:9092 --from-beginning
Hello
?hello kafka
hello flume
flume-ng agent -c conf/ -f conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console
二、kafka架構深入
分區策略:輪詢(RoundRobin)、按 Key 哈希(Hash)、自定義分區。
數據可靠性:通過 ACK 機制(0、1、-1)和 ISR(同步副本集合)保證,acks=-1時需等待 Leader 和 Follower 全部落盤。
事務與冪等性:0.11 版本引入冪等性(enable.idompotence=true),結合 At Least Once 實現 Exactly Once 語義。
三、Spark-Streaming核心編程
1.DStream 轉換
DStream 是 Spark-Streaming 處理實時數據的基本單位,可以理解為 “實時數據流”。
轉換操作就是對這個數據流進行加工處理,比如過濾、拆分、統計等,就像工廠流水線對原材料進行加工一樣。
操作分為兩類:
無狀態轉換:只處理當前批次的數據,不關心歷史數據(比如統計當前 3 秒內的單詞數)。
有狀態轉換:會記住歷史數據(比如統計從程序啟動到現在的總單詞數),文檔里沒詳細講,重點在無狀態部分。
2.無狀態轉換的常見操作
無狀態轉換就像 “即處理即丟棄”,每次只處理當前批次的數據,不保留之前的結果。
常見函數舉例
3.Transform轉換
Transform是一個 “萬能轉換” 函數,可以對每個批次的 RDD(DStream 內部由多個 RDD 組成)執行任意自定義操作,甚至可以使用 Spark 原生的 RDD 函數(即使 DStream 沒有直接提供)
4.Join轉換
join用于合并兩個數據流中相同鍵的數據,就像拼拼圖一樣,只有鍵匹配的部分才能拼在一起。
適用于合并兩個來源的單詞數據
最后運行結果