需求1:利用flume監控某目錄中新生成的文件,將監控到的變更數據發送給kafka,kafka將收到的數據打印到控制臺:
在flume/conf下添加.conf文件,
vi flume-kafka.conf
# 定義 Agent 組件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# 配置 Source(監控目錄)
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/flume-kafka/
a1.sources.r1.inputCharset=utf-8
# 配置 Sink(寫入 Kafka)
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
#指定寫入數據到哪一個topic
a1.sinks.k1.kafka.topic=testTopic
#指定寫入數據到哪一個集群
a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
#指定寫入批次
a1.sinks.k1.kafka.flumeBatchSize=20
#指定acks機制
a1.sinks.k1.kafka.producer.acks=1
# 配置 Channel(內存緩沖)
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
# 最大存儲 1000 個 Event
a1.channels.c1.transactionCapacity=100
# 每次事務處理 100 個 Event
?
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
?
在指定目錄之下創建文件夾:
kafka中創建topic:
kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 3
啟動flume:
flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console
啟動kafka消費者,驗證數據寫入成功
kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node02:9029,node03:9092 --from-beginning
新增測試數據:
echo "hello flume,hello kafka" >> /root/flume-kafka/1.txt
flume:
Kafka消費者
?
需求2:Kafka生產者生成的數據利用Flume進行采集,將采集到的數據打印到Flume的控制臺上。
vi?kafka-flume.conf
# 定義 Agent 組件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#?將 Flume Source 設置為 Kafka 消費者,從指定 Kafka 主題拉取數據。
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
#指定zookeeper集群地址
a1.sources.r1.zookeepers=node01:2181,node02:2181,node03:2181
#指定kafka集群地址
a1.sources.r1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
#指定生成消息的topic
a1.sources.r1.kafka.topics=testTopic
# 將 Flume 傳輸的數據內容直接打印到日志中,
a1.sinks.k1.type=logger
# 配置 Channel(內存緩沖)
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transcationCapacity=100
?
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
啟動Kafka生產者
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic testTopic
啟動Flume
flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console
在生產者中寫入數據
Flume中采集到數據?
?
?
?
?