安裝環境
- Java環境, 略 (Flume依賴Java)
- Flume下載, 略
- Scala環境, 略 (Kafka依賴Scala)
- Kafak下載, 略
- Hadoop下載, 略 (不需要啟動, 寫OSS依賴)
配置Hadoop
下載JindoSDK(連接OSS依賴), 下載地址Github
解壓后配置環境變量
export JINDOSDK_HOME=/usr/lib/jindosdk-x.x.x
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${JINDOSDK_HOME}/lib/*
修改Hadoop配置, core-site.xml
<property><name>fs.oss.credentials.provider</name><value>com.aliyun.jindodata.oss.auth.SimpleCredentialsProvider</value></property><property><name>fs.oss.accessKeyId</name><value>xxxx</value></property><property><name>fs.oss.accessKeySecret</name><value>xxxx</value></property><property><name>fs.oss.endpoint</name><value>xxxxx</value></property><property><name>fs.AbstractFileSystem.oss.impl</name><value>com.aliyun.jindodata.oss.JindoOSS</value></property><property><name>fs.oss.impl</name><value>com.aliyun.jindodata.oss.JindoOssFileSystem</value></property>
配置可參考非EMR集群接入OSS-HDFS服務快速入門
配置Flume
此部分全文最關鍵, 請仔細看
- 基礎配置部分, Flume配置
a1.sources = source1
a1.sinks = k1
a1.channels = c1a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.channels = c1
a1.sources.source1.kafka.bootstrap.servers = xxx
a1.sources.source1.kafka.topics = test
a1.sources.source1.kafka.consumer.group.id = flume-sink-group # 消費者組, 云組件需要先在管理后臺創建
a1.sources.source1.kafka.consumer.auto.offset.reset = earliest # 從頭消費Kafka里數據a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = oss://xxx/test/%Y%m%d # 自動按天分文件夾
a1.sinks.k1.hdfs.fileType=DataStreama1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
可參考使用Flume同步EMR Kafka集群的數據至OSS-HDFS服務
2. 進階配置, 根據自己情況按需配置
a1.sinks.k1.hdfs.rollInterval = 600 # 5分鐘切換一個新文件
a1.sinks.k1.hdfs.rollSize = 134217728 # 或者文件大小達到128M則切換新文件
a1.sinks.k1.hdfs.rollCount = 0 # 寫入多少條數據切換新文件, 0為不限制
我這里是為了防止sink的文件過于零碎, 但因為使用的memory channel, 緩存時間過長容易丟數據
3. Flume JVM參數
默認啟動時-Xmx20m, 過于小了, 加大堆內存可以直接放開flume-env.sh
內JAVA_OPTS
的注釋
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
- Channel問題
如果對數據一致性要求較高, 可以把memory channel改用file channel, 請自行研究
XX啟動!
幾條測試命令
bin/zookeeper-server-start.sh config/zookeeper.properties # 啟動zookeeper
bin/kafka-server-start.sh config/server.properties # 啟動kafak服務bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 # 啟動flumebin/kafka-console-producer.sh --topic flume-test --bootstrap-server localhost:9092 # 啟動一個生產者寫測試數據