flume
1. What is Flume
Flume:
** Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive amounts of log data.
** Flume runs only in a Linux environment.
** flume.apache.org (Documentation -> Flume User Guide)
Flume architecture:
Source: collects data. The Source is where the data flow is produced; it passes the events it produces on to the Channel.
Channel: the data transfer channel connecting the source and the sink; it buffers events between them.
Sink: pulls data from the Channel and writes it to a destination, which can be the next agent's Source or a store such as HDFS or HBase.
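The skeleton below is a minimal sketch of how these three components are declared and wired in an agent configuration (the agent name a1 and component names r1/c1/k1 are arbitrary placeholders); every example configuration later in these notes follows this same pattern:
  # declare the components of agent a1
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1
  # ...type-specific properties for the source, channel, and sink go here...
  # bind the source and sink to the channel (note the plural 'channels' on the source side)
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1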
2. Installing Flume
---- Installing Flume -----------------------------
1. Unpack the tarball (installing it into the cdh directory is recommended).
2. Rename and edit flume-env.sh:
   $ mv flume-env.sh.template flume-env.sh
   export JAVA_HOME=/opt/modules/jdk1.7.0_67
3. Use the flume-ng command:
   $ bin/flume-ng agent --conf <configuration directory> --name <agent name> --conf-file <specific configuration file>
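For example, assuming a configuration file conf/example.conf that defines an agent named a1 (hypothetical names), the agent could be started like this; -Dflume.root.logger sends the log output to the console:
   $ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example.conf -Dflume.root.logger=INFO,console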
3. Examples:
Requirement: use Flume to monitor a port and output the data written to that port as logger events.
1. Copy the template:
   $ cp -a flume-conf.properties.template flume-telnet.conf
2. Edit flume-telnet.conf:

# Name the components on this agent
# a1 is the agent (broker) instance name and can be anything; an agent has three parts
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# netcat is a tool for debugging and probing networks; it is available on both Windows and Linux (RedHat) and must be installed separately
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
# see the Flume Sinks -> Logger Sink section of the documentation
# writes events to the log
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# memory channel
a1.channels.c1.type = memory
# maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
# maximum number of events per transaction
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# note the 's' on 'channels'
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

*** Using a configuration file:
 a) name the components
 b) configure the source, sink, and channel
 c) bind them together
---------------------
Testing:
*** Install telnet
 $ su -
 # yum -y install telnet
*** Start Flume; '-D' sets the log level and output target
 $ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/flume-telnet.conf -Dflume.root.logger=INFO,console   // send the log output to the console
*** Open another terminal
 $ netstat -an | grep 44444     -- check whether a process (Flume) is listening on port 44444
 $ telnet localhost 44444       -- connect to port 44444 on the local host; telnet acts as the client for this port
 Then type any strings...
PS:
 a) To exit telnet: press 'Ctrl+]', then type quit.
 b) If flume-ng cannot be stopped, open a new terminal, find the pid with jps (or netstat -antp | grep 44444), and use kill -9.
Requirement: extract newly generated log file content in real time --> append it to the corresponding file on HDFS.
This example uses Flume to monitor a file and extract newly appended content to another place, such as HDFS.
The file monitored here is the Apache access log /var/log/httpd/access_log.

---- Installing the Apache server -------
 $ su -
 # yum -y install httpd
 # service httpd start
 # service httpd status
 ** Edit the home page; /var/www/html is the Apache web server's document root
 # vi /var/www/html/index.html
   type any content...
 ** Open a browser and visit http://192.168.2.200
 ** Grant permissions
 # chmod 755 /var/log/httpd/
 ** Watch the log change in real time; refreshing the page triggers new log entries
 # su - tom
 $ tail -f /var/log/httpd/access_log     -- '-F' and '-f' have the same effect here
----------------------------
 $ cp -a flume-telnet.conf flume-apache.conf

a2.sources = r2
a2.channels = c2
a2.sinks = k2

# define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /var/log/httpd/access_log
# '-c' tells the shell to run the string as a command; it is required
a2.sources.r2.shell = /bin/bash -c

# define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# define sinks
# enable multi-level directories; here two levels, "yyyyMMdd/HH", so one folder is created per hour
a2.sinks.k2.type = hdfs
# the directory is created automatically
a2.sinks.k2.hdfs.path = hdfs://192.168.2.200:8020/flume/%Y%m%d/%H
# file prefix
a2.sinks.k2.hdfs.filePrefix = accesslog
# enable time-based directory rounding
a2.sinks.k2.hdfs.round = true
# round value: 1, unit: hour
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
# use the local timestamp, e.g. for naming files
a2.sinks.k2.hdfs.useLocalTimeStamp = true

# maximum number of events written to HDFS per flush
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.writeFormat = Text

# avoid producing too many small files (the default settings would generate lots of small files)
# roll a new file every 600 seconds
a2.sinks.k2.hdfs.rollInterval = 600
# roll a new file when it reaches 128000000 bytes
# in practice, with a 128 MB block size this is usually set to 127 MB (127*1024*1024)
a2.sinks.k2.hdfs.rollSize = 128000000
# do not roll files based on the number of events
a2.sinks.k2.hdfs.rollCount = 0
# must be set to 1; otherwise block replication triggers new files and the three settings above lose effect
a2.sinks.k2.hdfs.minBlockReplicas = 1

# bind the sources and sinks to the channels
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

Testing:
 a) Start CDH Hadoop
    $ sbin/start-dfs.sh ; sbin/start-yarn.sh ; sbin/mr-jobhistory-daemon.sh start historyserver
 b) Start Apache
    # service httpd start
 c) Start Flume
    $ bin/flume-ng agent --conf conf/ --name a2 --conf-file conf/flume-apache.conf
 d) Refresh http://192.168.2.200
    Watch the web log:  $ tail -f /var/log/httpd/access_log
    Watch HDFS:         $ bin/hdfs dfs -tail -f /flume/20170519/10/accesslog.1495161507253.tmp
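As an aside, the configured rollSize of 128000000 bytes is not quite the 127 MB mentioned in the comment; if the intent is to stay just under a 128 MB block, the value would be (a sketch, same property as above):
 # 127 MB = 127 * 1024 * 1024 = 133169152 bytes
 a2.sinks.k2.hdfs.rollSize = 133169152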
Requirement: use Flume to monitor a directory (/home/tom/log) and extract the rolled files in it to HDFS in real time.
 $ mkdir /home/tom/log
 $ cd log
 $ cp /var/log/httpd/access_log access_log.1
 $ cp /var/log/httpd/access_log access_log.2
Extract the files access_log.1 and access_log.2.

 $ mkdir /opt/cdh-5.3.6/apache-flume-1.5.0-cdh5.3.6-bin/checkpoint
 $ mkdir /opt/cdh-5.3.6/apache-flume-1.5.0-cdh5.3.6-bin/checkdata
 $ cp -a flume-apache.conf flume-dir.conf

a3.sources = r3
a3.channels = c3
a3.sinks = k3

# define sources
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /home/tom/log
# use a regular expression to specify files to ignore
# '.' matches any character except '\r\n'; '*' means zero or more occurrences
a3.sources.r3.ignorePattern = ^.*\_log$

# define channels
# a file channel stages data in temporary files (data is buffered in a temporary file and then flushed together); slower, but the data is safer
# a memory channel would also work here
a3.channels.c3.type = file
# where checkpoint files are stored; the checkpoint holds metadata such as which files have been extracted and which have not
a3.channels.c3.checkpointDir = /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkpoint
# where the temporary data files are stored
a3.channels.c3.dataDirs = /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkdata

# define sinks
# enable multi-level directories; here two levels, "yyyyMMdd/HH", one folder per hour
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://192.168.122.128:8020/flume2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = accesslog
# enable time-based directory rounding
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
# use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true

a3.sinks.k3.hdfs.batchSize = 1000
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text

# avoid producing too many small files
# roll a new file every 600 seconds
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 128000000
# do not roll files based on the number of events
a3.sinks.k3.hdfs.rollCount = 0
# set to 1; otherwise block replication triggers new files and the three settings above lose effect
a3.sinks.k3.hdfs.minBlockReplicas = 1

# bind the sources and sinks to the channels
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

Testing:
 $ bin/flume-ng agent --conf conf/ --name a3 --conf-file conf/flume-dir.conf
 Check the result at http://192.168.2.200:50070
 ** In log/, files with the .COMPLETED suffix have been fully extracted
 $ ls
 access_log.1.COMPLETED  access_log.2.COMPLETED
 Generate another log file and it will be extracted immediately:
 $ cp access_log.1.COMPLETED access_log.3
 $ ls
 access_log.1.COMPLETED  access_log.2.COMPLETED  access_log.3.COMPLETED
Start three agents on the same server:

agent1: monitors /var/log/httpd/access_log in real time
** flume-apache.conf

# configure agent1
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

# define sources
agent1.sources.r1.type = exec
# note: the user running the flume command must have read permission on /var/log/httpd/access_log
agent1.sources.r1.command = tail -F /var/log/httpd/access_log
agent1.sources.r1.shell = /bin/bash -c

# define channels
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# define sinks
# avro is a serialization framework
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = 192.168.2.200
agent1.sinks.k1.port = 4545

# bind the sources and sinks to the channels
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1

Testing:
 Start Apache.
 Start agent1:
 $ bin/flume-ng agent --conf conf/ --name agent1 --conf-file conf/flume-apache.conf
 $ tail -F /var/log/httpd/access_log
 Refresh the web page and watch the changes.
------------------
agent2: monitors /opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log in real time
 $ mkdir logs
 $ vi conf/hive-log4j.properties
   hive.log.dir=/opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs
** flume-hive.conf

# configure agent2
agent2.sources = r2
agent2.channels = c2
agent2.sinks = k2

# define sources
agent2.sources.r2.type = exec
agent2.sources.r2.command = tail -F /opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log
agent2.sources.r2.shell = /bin/bash -c

# define channels
agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100

# define sinks
agent2.sinks.k2.type = avro
agent2.sinks.k2.hostname = 192.168.2.200
agent2.sinks.k2.port = 4545

# bind the sources and sinks to the channels
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2

Testing:
 Start agent2:
 $ bin/flume-ng agent --conf conf/ --name agent2 --conf-file conf/flume-hive.conf
 $ tail -F /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
 Enter hive, run a few statements, and watch the log change:
 hive> show databases;
 ...
-------------------
agent3: collects the data sent over from agent1 and agent2 in real time
** flume-collector.conf

# configure agent3
agent3.sources = r3
agent3.channels = c3
agent3.sinks = k3

# define sources
agent3.sources.r3.type = avro
agent3.sources.r3.bind = 192.168.2.200
agent3.sources.r3.port = 4545

# define channels
agent3.channels.c3.type = memory
agent3.channels.c3.capacity = 1000
agent3.channels.c3.transactionCapacity = 100

# define sinks
# enable multi-level directories; here two levels, "yyyyMMdd/HH", one folder per hour
agent3.sinks.k3.type = hdfs
agent3.sinks.k3.hdfs.path = hdfs://192.168.2.200:8020/flume3/%Y%m%d/%H
agent3.sinks.k3.hdfs.filePrefix = accesslog

# enable hourly directory rounding
agent3.sinks.k3.hdfs.round = true
agent3.sinks.k3.hdfs.roundValue = 1
agent3.sinks.k3.hdfs.roundUnit = hour
agent3.sinks.k3.hdfs.useLocalTimeStamp = true

agent3.sinks.k3.hdfs.batchSize = 1000
agent3.sinks.k3.hdfs.fileType = DataStream
agent3.sinks.k3.hdfs.writeFormat = Text

# avoid producing too many small files
# roll a new file every 600 seconds
agent3.sinks.k3.hdfs.rollInterval = 600
agent3.sinks.k3.hdfs.rollSize = 128000000
# do not roll files based on the number of events
agent3.sinks.k3.hdfs.rollCount = 0
# set to 1; otherwise block replication triggers new files and the three settings above lose effect
agent3.sinks.k3.hdfs.minBlockReplicas = 1

# bind the sources and sinks to the channels
agent3.sources.r3.channels = c3
agent3.sinks.k3.channel = c3

Testing:
 Start agent3:
 $ bin/flume-ng agent --conf conf/ --name agent3 --conf-file conf/flume-collector.conf
 In CDH Hadoop, watch the log change; note: adjust the path to your own output (watching the .tmp file shows the effect more clearly)
 $ bin/hdfs dfs -tail -f /flume3/20161220/11/accesslog.1482203839459
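One workable start-up order for the three agents (a sketch based on the commands above; the avro sink does retry connections, but starting the collector first avoids connection-refused warnings): start agent3 so its avro source is listening on port 4545, then start agent1 and agent2, whose avro sinks point at it:
 $ bin/flume-ng agent --conf conf/ --name agent3 --conf-file conf/flume-collector.conf
 $ bin/flume-ng agent --conf conf/ --name agent1 --conf-file conf/flume-apache.conf
 $ bin/flume-ng agent --conf conf/ --name agent2 --conf-file conf/flume-hive.conf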