flume avro java 發送數據_flume將數據發送到kafka、hdfs、hive、http、netcat等模式的使用總結...

1、source為http模式,sink為logger模式,將數據在控制臺打印出來。

conf配置文件如下:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = http #該設置表示接收通過http方式發送過來的數據

a1.sources.r1.bind = hadoop-master #運行flume的主機或IP地址都可以

a1.sources.r1.port = 9000#端口

#a1.sources.r1.fileHeader = true

# Describe the sink

a1.sinks.k1.type = logger#該設置表示將數據在控制臺打印出來

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

啟動flume命令為:

bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console。

顯示如下的信息表示啟動flume成功。

895 (lifecycleSupervisor-1-3) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started

打開另外一個終端,通過http post的方式發送數據:

curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:9000。

hadoop-master就是flume配置文件綁定的主機名,9000就是綁定的端口。

然后在運行flume的窗口就是看到如下的內容:

2018-06-12 08:24:04,472 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{timestampe=1234567, host=master} body: 62 61 64 6F 75 20 66 6C 75 6D 65 badou flume }

2、source為netcat(udp、tcp模式),sink為logger模式,將數據打印在控制臺

conf配置文件如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = netcat

a1.sources.r1.bind = hadoop-master#綁定的主機名或IP地址

a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transcationCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

啟動flume

bin/flume-ng agent -c conf -f conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console。

然后在另外一個終端,使用telnet發送數據:

命令為:telnet hadoop-maser 44444

[root@hadoop-master ~]# telnet hadoop-master 44444

Trying 192.168.194.6...

Connected to hadoop-master.

Escape character is '^]'.

顯示上面的信息表示連接flume成功,然后輸入:

12213213213

OK

12321313

OK

在flume就會收到相應的信息:

2018-06-12 08:38:51,129 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 32 31 33 32 31 33 32 31 33 0D 12213213213. }

2018-06-12 08:38:51,130 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 33 32 31 33 31 33 0D 12321313. }

3、source為netcat/http模式,sink為hdfs模式,將數據存儲在hdfs中。

conf配置文件如下,文件名為hdfs.conf:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = hadoop-master

a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type =regex_filter

a1.sources.r1.interceptors.i1.regex =^[0-9]*$

a1.sources.r1.interceptors.i1.excludeEvents =true

# Describe the sink

#a1.sinks.k1.type = logger

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs:/flume/events #文件在hdfs文件系統中存放的位置

a1.sinks.k1.hdfs.filePrefix = events- #文件的前綴

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

a1.sinks.k1.hdfs.fileType = DataStream #制定文件的存放格式,這個設置是以text的格式存放從flume傳輸過來的數據。

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

在hdfs文件系統中創建文件存放的路徑:

hadoop fs -mkdir /flume/event1。

啟動flume:

bin/flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console

通過telnet模式向flume中發送文件:

telnet hadoop-master 44444

然后輸入:

aaaaaaaa

bbbbbbb

ccccccccc

dddddddddd

通過如下的命令hadoop fs -ls /flume/events/查看hdfs中的文件,可以看到hdfs中有/flume/events有如下文件:

-rw-r--r-- 3 root supergroup 16 2018-06-05 06:02 /flume/events/events-.1528203709070

-rw-r--r-- 3 root supergroup 5 2018-06-05 06:02 /flume/events/events-.1528203755556

-rw-r--r-- 3 root supergroup 11 2018-06-05 06:03 /flume/events/events-.1528203755557

-rw-r--r-- 3 root supergroup 26 2018-06-13 07:28 /flume/events/events-.1528900112215

-rw-r--r-- 3 root supergroup 209 2018-06-13 07:29 /flume/events/events-.1528900112216

-rw-r--r-- 3 root supergroup 72 2018-06-13 07:29 /flume/events/events-.1528900112217

通過hadoop fs -cat /flume/events/events-.1528900112216查看文件events-.1528900112216的內容:

aaaaaaaaaaaaaaaaa

bbbbbbbbbbbbbbbb

ccccccccccccccccccc

dddddddddddddddd

eeeeeeeeeeeeeeeeeee

fffffffffffffffffffffff

gggggggggggggggggg

hhhhhhhhhhhhhhhhhhhhhhh

iiiiiiiiiiiiiiiiiii

jjjjjjjjjjjjjjjjjjj

http模式就是把hdfs.conf文件中的netcat改為http,然后傳輸文件從telnet改為:

curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:44444。

在hadoop文件中就會看到上面命令傳輸的內容:badou flume。

4、source為netcat/http模式,sink為hive模式,將數據存儲在hive中,并分區存儲。

conf配置如下,文件名為hive.conf:

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = hadoop-master

a1.sources.r1.port = 44444

# Describe the sink

#a1.sinks.k1.type = logger

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore=thrift://hadoop-master:9083

a1.sinks.k1.hive.database=default#hive數據庫名

a1.sinks.k1.hive.table=flume_user1

a1.sinks.k1.serializer=DELIMITED

a1.sinks.k1.hive.partition=3#如果以netcat模式,只能靜態設置分區的值,因為netcat模式傳輸數據,無法傳輸某個字段的值,只能按照順序來。這里設置age的分區值為3。

#a1.sinks.k1.hive.partition=%{age}#如果以http或json等模式,只能動態設置分區的值,因為http模式可以動態傳輸age的值。

a1.sinks.k1.serializer.delimiter=" "

a1.sinks.k1.serializer.serderSeparator=' '

a1.sinks.k1.serializer.fieldnames=user_id,user_name

a1.sinks.k1.hive.txnsPerBatchAsk = 10

a1.sinks.k1.hive.batchSize = 1500

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

在hive中創建表:

create table flume_user(

user_id int

,user_name string

)

partitioned by(age int)

clustered by (user_id) into 2 buckets

stored as orc

在hive-site.xml中添加如下內容:

javax.jdo.option.ConnectionPassword

hive

password to use against metastore database

hive.support.concurrency

true

hive.exec.dynamic.partition.mode

nonstrict

hive.txn.manager

org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

hive.compactor.initiator.on

true

hive.compactor.worker.threads

1

將hive根目錄下的/hcatalog/share/hcatalog文件夾中的如下三個文件夾添加到flume的lib目錄下。

運行flume:

bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console。

重新打開一個窗口,

啟動metastroe服務:

hive --service metastore &

重新打開一個客戶端,通過telnet連接到flume

telnet hadoop-master 44444

然后輸入:

1 1

3 3

就會在hive中看到如下兩行數據:

flume_user1.user_id flume_user1.user_name flume_user1.age

1 1 3

3 3 3

age是在hive.conf中設置的值3。

現在將flume的source換成http模式,然后hive分區通過參數模式動態的傳輸分區值。

將hive.conf中的

a1.sources.r1.type = netcat改成a1.sources.r1.type = http

a1.sinks.k1.hive.partition=3改成a1.sinks.k1.hive.partition=%{age}。

然后啟動flume:

bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console。

在重新打開的窗口中通過http的模式傳輸數據到flume

curl -X POST -d '[{"headers":{"age":"109"},"body":"11 ligongong"}]' hadoop-master:44444。

在hive中可以看到如下的數據:

flume_user1.user_id flume_user1.user_name flume_user1.age

11 ligongong 109

由此可以看出通過http模式傳輸數據到hive中時,分區字段的信息是在header中傳輸,而其他字段的信息是放在bady中傳輸,并且不同列之間以hive.conf文件定義好的分隔符分隔。

5、使用avro模式,將數據在控制臺打印出來。

不同的agent之間傳輸數據只能通過avro模式。

這里我們需要兩臺服務器來演示avro的使用,兩臺服務器分別是hadoop-master和hadoop-slave2

hadoop-master中運行agent2,然后指定agent2的sink為avro,并且將數據發送的主機名設置為hadoop-slave2。hadoop-master中flume的conf文件設置如下,名字為push.conf:

#Name the components on this agent

a2.sources= r1

a2.sinks= k1

a2.channels= c1

#Describe/configure the source

a2.sources.r1.type= netcat

a2.sources.r1.bind= hadoop-master

a2.sources.r1.port = 44444

a2.sources.r1.channels= c1

#Use a channel which buffers events in memory

a2.channels.c1.type= memory

a2.channels.c1.keep-alive= 10

a2.channels.c1.capacity= 100000

a2.channels.c1.transactionCapacity= 100000

#Describe/configure the source

a2.sinks.k1.type= avro#制定sink為avro

a2.sinks.k1.channel= c1

a2.sinks.k1.hostname= hadoop-slave2#指定sink要發送數據到的目的服務器名

a2.sinks.k1.port= 44444#目的服務器的端口

hadoop-slave2中運行的是agent1,agent1的source為avro。flume配置內容如下,文件名為pull.conf

#Name the components on this agent

a1.sources= r1

a1.sinks= k1

a1.channels= c1

#Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels= c1

a1.sources.r1.bind= hadoop-slave2

a1.sources.r1.port= 44444

#Describe the sink

a1.sinks.k1.type= logger

a1.sinks.k1.channel = c1

#Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.keep-alive= 10

a1.channels.c1.capacity= 100000

a1.channels.c1.transactionCapacity= 100000。

現在hadoop-slave2中啟動flume,然后在hadoop-master中啟動flume,順序一定要對,否則會報如下的錯誤:org.apache.flume.FlumeException: java.net.SocketException: Unresolved address

在hadoop-slave2中啟動flume:

bin/flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console

在hadoop-master中啟動flume:

bin/flume-ng agent -c conf -f conf/push.conf -n a2 -Dflume.root.logger=INFO,console

重新打開一個窗口,通過telnet連接到hadoop-master

telnet hadoop-master 44444

然后發送11111aaaa

在hadoop-slave2的控制臺中就會顯示之前發送的,11111aaaa,如下所示:

2018-06-14 06:43:00,686 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 31 31 31 31 61 61 61 61 0D 11111aaaa. }

6、通過flume將數據通傳輸到kafka,然后通過kafka將數據存儲在hdfs和hive中。

在分別在hadoop-master、hadoop-slave1、hadoop-slave2上啟動zookeeper。

命令為:

然后啟動kafka,進入kafka的安裝目錄,執行命令:

./bin/kafka-server-start.sh config/server.properties &

在kafka中創建topic:

bin/kafka-topics.sh --create --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --replication-factor 1 --partitions 2 --topic flume_kafka

查看kafka中的topic:

bin/kafka-topics.sh --list --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181

啟動kafka的消費者:

./kafka-console-consumer.sh --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --topic flume_kafka

配置flume中conf文件,設置source類型為exec,sink為org.apache.flume.sink.kafka.KafkaSink,設置kafka的topic為上面創建的flume_kafka,具體配置如下:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

#設置sources的類型為exec,就是執行命令的意思

a1.sources.r1.type = exec

#設置sources要執行的命令

a1.sources.r1.command = tail -f /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt

# 設置kafka接收器

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# 設置kafka的broker地址和端口號

a1.sinks.k1.brokerList=hadoop-master:9092

# 設置Kafka的topic

a1.sinks.k1.topic=flume_kafka

# 設置序列化的方式

a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# use a channel which buffers events in memory

a1.channels.c1.type=memory

a1.channels.c1.capacity = 100000

a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

啟動flume:

只要/home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt中有數據時flume就會加載kafka中,然后被上面啟動的kafka消費者消費掉。

我們查看發現/home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt文件中有如下的數據:

131,dry pasta

131,dry pasta

132,beauty

133,muscles joints pain relief

133,muscles joints pain relief

133,muscles joints pain relief

133,muscles joints pain relief

134,specialty wines champagnes

134,specialty wines champagnes

134,specialty wines champagnes

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/532030.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/532030.shtml
英文地址,請注明出處:http://en.pswp.cn/news/532030.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

python三角函數擬合_使用python進行數據擬合最小化函數

這是我對這個問題的理解。首先,我通過以下代碼生成一些數據import numpy as npfrom scipy.integrate import quadfrom random import randomdef boxmuller(x0,sigma):u1random()u2random()llnp.sqrt(-2*np.log(u1))z0ll*np.cos(2*np.pi*u2)z1ll*np.cos(2*np.pi*u2)r…

java url 本地文件是否存在_我的應用程序知道URL中是否存在文件會一直停止[重復]...

這個問題在這里已有答案:我試圖寫一個應用程序,如果在給定的URL中有一個文件,將字符串放在textview中,這是代碼和崩潰信息,可能是什么錯誤?public class MainActivity extends AppCompatActivity {String u…

python枚舉類的意義_用于ORM目的的python枚舉類

編輯問題我正在嘗試創建一個類工廠,它可以生成具有以下屬性的枚舉類:>從列表中初始化類允許值(即,它)自動生成!).> Class創建自己的一個實例對于每個允許的值.>類不允許創建任何其他實例一旦上述步驟已完成(任何嘗試這樣做會導致異常).>類實…

java 生成校驗驗證碼_java生成驗證碼并進行驗證

一實現思路使用BufferedImage用于在內存中存儲生成的驗證碼圖片使用Graphics來進行驗證碼圖片的繪制,并將繪制在圖片上的驗證碼存放到session中用于后續驗證最后通過ImageIO將生成的圖片進行輸出通過頁面提交的驗證碼和存放在session中的驗證碼對比來進行校驗二、生…

yy自動語音接待機器人_智能語音機器人落地產品有哪些?

據相關研究報告表明,在眾多人工智能落地產品或者應用場景中,智能語音機器人無論從產品的成熟度還是應用的廣泛度來說,都是人工智能行業最熱門和最有前景的產品。智能語音機器人并不只是一款產品,它是所有智能語音系列產品的統稱&a…

java資源文件獲取屬性_Java讀寫資源文件類Properties

Java中讀寫資源文件最重要的類是Properties1) 資源文件要求如下:1、properties文件是一個文本文件2、properties文件的語法有兩種,一種是注釋,一種屬性配置。注 釋:前面加上#號屬性配置:以“鍵值”的方式書寫一個屬性的配置信息…

java被放棄了_為什么學Java那么容易放棄?

學習Java確實很容易就放棄,但是也很容易就學好,因為大多數人都是抱著試一試的心態,然后當后面就堅持不下去但是回過頭來想一想,打游戲上分容易嗎,一樣是磕磕碰碰的,有時候十幾連跪都不會放棄你上分的心情。…

python 隱馬爾科夫_機器學習算法之——隱馬爾可夫(Hidden Markov ModelsHMM)原理及Python實現...

前言上星期寫了Kaggle競賽的詳細介紹及入門指導,但對于真正想要玩這個競賽的伙伴,機器學習中的相關算法是必不可少的,即使是你不想獲得名次和獎牌。那么,從本周開始,我將介紹在Kaggle比賽中的最基本的也是運用最廣的機…

java編程50_java經典50編程題(1-10)

1.有一對兔子從出生后第三個月起,每個月都生一對小兔子,小兔子長到三個月后每個月又生一對兔子,假設兔子不死亡,問每個月兔子的總數為多少?分析過程圖片發自簡書App示例代碼圖片發自簡書App運行結果圖片發自簡書App反思…

python替代hadoop_Python連接Hadoop數據中遇到的各種坑(匯總)

最近準備使用PythonHadoopPandas進行一些深度的分析與機器學習相關工作。(當然隨著學習過程的進展,現在準備使用PythonSparkHadoop這樣一套體系來搭建后續的工作環境),當然這是后話。但是這項工作首要條件就是將Python與Hadoop進行打通,本來認…

java 自動化測試_java寫一個自動化測試

你模仿購物車試一下,同樣是買東西,加上勝負平的賠率,輸出改下應該就可以了package com.homework.lhh;import java.util.ArrayList;import java.util.Comparator;import java.util.Scanner;public class Ex04 {public static void main(String…

超大規模集成電路_納米級超大規模集成電路芯片低功耗物理設計分析(二)

文 | 大順簡要介紹了功耗的組成,在此基礎上從工藝、電路、門、系統四個層面探討了納米級超大規模集成電路的低功耗物理設計方法。關鍵詞:納米級;超大規模集成電路;電路芯片;電路設計02納米級超大規模集成電路芯片低功耗…

java中的printnb_javaI/O系統筆記

1、File類File類的名字有一定的誤導性;我們可能認為它指代的是文件,實際上卻并非如此。它既能代表一個特定文件的名稱,又能代表一個目錄下的一組文件的名稱。1.1、目錄列表器如果需要查看目錄列表,可以通過file.list(FilenameFilt…

outlook反應慢的原因_保險管怎么區分慢熔和快熔?

保險絲快熔與慢熔的區別所有雙帽;對于這樣的產品特性和安全性熔絲; gG的”,即,與接觸帽組合接觸;即,所述雙(內/外蓋)的蓋。和一般的小型或地下加工廠,以便執行切割角,降低生產成本,這將選擇單個帽鉚接“單&…

java成員內部類_Java中的內部類(二)成員內部類

Java中的成員內部類(實例內部類):相當于類中的一個成員變量,下面通過一個例子來觀察成員內部類的特點public classOuter {//定義一個實例變量和一個靜態變量private inta;private static intb;//定義一個靜態方法和一個非靜態方法public static voidsay(…

word 通配符_學會Word通配符,可以幫助我們批量處理好多事情

長文檔需要批量修改或刪除某些內容的時候,我們可以利用Word中的通配符來搞定這一切,當然,前提是你必須會使用它。通配符的功能非常強大,能夠隨意組合替換或刪除我們定義的規則內容,下面易老師就分享一些關于查找替換通…

java存儲鍵值結構_java-鍵值存儲為主數據庫

我將要開始一個項目,該項目的讀寫操作非常頻繁且頻繁.因此,環顧四周,我發現內存數據庫正是為此目的而創建的.經過更多調查后,我進入了redis.Redis看起來很酷(雖然剛開始閱讀,但是對此有很多了解).但是我主要只看過關系數據庫,并且以元組和關系的方式來考慮數據(我認為我可以隨著…

python 輸入文件名查找_python 查找文件名包含指定字符串的方法

編寫一個程序,能在當前目錄以及當前目錄的所有子目錄下查找文件名包含指定字符串的文件,并打印出絕對路徑。import osclass searchfile(object):def __init__(self,path.):self._pathpathself.abspathos.path.abspath(self._path) # 默認當前目錄def fin…

java 運行 出現選擇_Eclipse?運行出現java.lang.NoClassDefFoundError的解決方法

上篇博文也提到了這個問題,但沒有深入的講解。這次特意做了整理,詳細解釋其原因。先看錯誤java.lang.NoClassDefFoundError,顯然是java虛擬機找不到指定的類,多數情況下是外部jar中的類。Eclipse的自動化,集成化&#…

設置熄屏_剛買的手機微信收不到信息提醒耽誤事情,手機到手一定要這樣設置...

手機使用過程中經常會遇到第三方軟件接收不到信息提醒的狀況,常常因此耽誤了很多重要的事情,造成損失。特別是剛換新手機或者手機剛升級系統時發生的最多。一般都覺得是手機問題,其實只是手機的系統設置出現了問題,只要跟我按照以…