1、HDFS Sink
此sink將事件寫入 Hadoop 分布式文件系統 (HDFS) 中。它目前支持創建文本和序列文件。它支持兩種文件類型的壓縮。可以根據經過的時間或數據大小或事件數定期滾動文件(關閉當前文件并創建一個新文件)。它還按事件起源的時間戳或計算機等屬性對數據進行存儲/分區。HDFS 目錄路徑可能包含格式轉義序列,這些轉義序列將由 HDFS 接收器替換,以生成用于存儲事件的目錄/文件名。使用此 sink 需要安裝 hadoop,以便 Flume 可以使用 Hadoop jar 與 HDFS 集群進行通信。請注意,需要支持 sync() 調用的 Hadoop 版本。
注意 對于所有與時間相關的轉義序列,事件的標頭中必須存在鍵為“timestamp”的標頭(除非 hdfs.useLocalTimeStamp 設置為 true)。自動添加此功能的一種方法是使用 TimestampInterceptor。
Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?hdfs |
hdfs.path | – | HDFS directory path (eg hdfs://namenode/flume/webdata/) |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
上述配置會將時間戳四舍五入到最后 10 分鐘。例如,如果事件的時間戳為 2012 年 6 月 12 日上午 11:54:34,則 hdfs 路徑將變為 /flume/events/2012-06-12/1150/00。
2、Hive Sink
此接收器將包含分隔文本或 JSON 數據的事件直接流式傳輸到 Hive 表或分區中。事件是使用 Hive 事務寫入的。一旦將一組事件提交到 Hive,Hive 查詢就會立即看到這些事件。Flume 將流式傳輸到的分區可以預先創建,也可以選擇性地在缺少分區時創建它們。傳入事件數據中的字段將映射到 Hive 表中的相應列。
Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?hive |
hive.metastore | – | Hive metastore URI (eg thrift://a.b.com:9083 ) |
hive.database | – | Hive database name |
hive.table | – | Hive table name |
hive.partition | – | Comma separate list of partition values identifying the partition to write to. May contain escape sequences. E.g: If the table is partitioned by (continent: string, country :string, time : string) then ‘Asia,India,2014-02-26-01-21’ will indicate continent=Asia,country=India,time=2014-02-26-01-21 |
serializer | Serializer is responsible for parsing out field from the event and mapping them to columns in the hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON |
Example Hive table :
create table weblogs ( id int , msg string )
??? partitioned by (continent string, country string, time string)
??? clustered by (id) into 5 buckets
??? stored as orc;
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%Y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg
上述配置會將時間戳四舍五入到最后 10 分鐘。例如,如果事件的時間戳標頭設置為 2012 年 6 月 12 日上午 11:54:34,并且“country”標頭設置為“india”,則該事件的計算結果將達到分區 (continent='asia',country='india',time='2012-06-12-11-50')。序列化程序配置為接受包含三個字段的制表符分隔輸入,并跳過第二個字段。
3、Logger Sink
在 INFO 級別記錄事件。通常可用于測試/調試目的。
Property Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?logger |
maxBytesToLog | 16 | Maximum number of bytes of the Event body to log |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
4、Avro Sink
發送到此sink的 Flume 事件將轉換為 Avro 事件,并發送到配置的主機名/端口對。事件以配置的批處理大小的批處理方式從配置的通道中獲取。
Property Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?avro. |
hostname | – | The hostname or IP address to bind to. |
port | – | The port # to listen on. |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
5、Thrift Sink
發送到此接收器的 Flume 事件將轉換為 Thrift 事件,并發送到配置的主機名/端口對。事件從配置的通道中按配置的批處理大小批量獲取
通過啟用 kerberos 身份驗證,可以將 Thrift sink 配置為在安全模式下啟動。要與以安全模式啟動的 Thrift 源進行通信,Thrift 接收器也應在安全模式下運行。client-principal 和 client-keytab 是 Thrift 接收器用于向 kerberos KDC 進行身份驗證的屬性。server-principal 表示此接收器配置為在安全模式下連接到的 Thrift 源的主體。
Property Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?thrift. |
hostname | – | The hostname or IP address to bind to. |
port | – | The port # to listen on. |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
6、File Roll Sink
將事件存儲在本地文件系統上
Property Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?file_roll. |
sink.directory | – | The directory where files will be stored |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
7、IRC Sink
IRC 接收器從附加通道獲取消息,并將這些消息中繼到配置的 IRC 目標。
Property Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?irc |
hostname | – | The hostname or IP address to connect to |
port | 6667 | The port number of remote host to connect |
nick | – | Nick name |
user | – | User name |
password | – | User password |
chan | – | channel |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume
8、HBaseSinks?.
此接收器將數據寫入 HBase。Hbase 配置是從類路徑中遇到的第一個hbase-site.xml中選取的。實現 HbaseEventSerializer 的類(由配置指定)用于將事件轉換為 HBase 全量和/或增量。然后將這些全量和增量寫入 HBase。此接收器提供與 HBase 相同的一致性保證,HBase 目前是逐行原子性。如果 Hbase 無法寫入某些事件,接收器將回滾該事務中的所有事件。
HBaseSink 支持寫入數據以保護 HBase。若要寫入安全 HBase,代理正在運行的用戶必須對接收器配置為寫入的表具有寫入權限。可以在配置中指定用于對 KDC 進行身份驗證的主體和密鑰表。Flume 代理類路徑中的hbase-site.xml必須將身份驗證設置為 kerberos(有關如何執行此操作的詳細信息,請參閱 HBase 文檔)
為方便起見,Flume 提供了兩個序列化器。SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) 將事件正文按原樣寫入 HBase,并選擇性地在 Hbase 中遞增列。RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) 根據給定的正則表達式中斷事件正文,并將每個部分寫入不同的列中。類型為 FQCN:org.apache.flume.sink.hbase.HBaseSink。
Property Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?hbase |
table | – | The name of the table in Hbase to write to. |
columnFamily | – | The column family in Hbase to write to. |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
9、HBase2Sink
HBase2Sink 相當于 HBase 版本 2 的 HBaseSink。提供的功能和配置參數與 HBaseSink 相同(sink 類型中的 hbase2 標記和包/類名稱除外)。類型為FQCN: org.apache.flume.sink.hbase2.HBase2Sink
Property Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?hbase2 |
table | – | The name of the table in HBase to write to. |
columnFamily | – | The column family in HBase to write to. |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.channel = c1
10、AsyncHBaseSink
此接收器使用異步模型將數據寫入 HBase。實現 AsyncHbaseEventSerializer 的類(由配置指定)用于將事件轉換為 HBase 全量和/或增量。然后將這些看全量和增量寫入 HBase。此接收器使用 Asynchbase API 寫入 HBase。此接收器提供與 HBase 相同的一致性保證,HBase 目前是逐行原子性。如果 Hbase 無法寫入某些事件,接收器將回滾該事務中的所有事件。AsyncHBaseSink 只能與 HBase 1.x 一起使用。AsyncHBaseSink 使用的異步客戶端庫不適用于 HBase 2。類型為 FQCN:org.apache.flume.sink.hbase.AsyncHBaseSink。
Property Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?asynchbase |
table | – | The name of the table in Hbase to write to. |
columnFamily | – | The column family in Hbase to write to. |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1
11、MorphlineSolrSink
該sink從 Flume 事件中提取數據,對其進行轉換,并將其近乎實時地加載到 Apache Solr 服務器中,而 Apache Solr 服務器又向最終用戶或搜索應用程序提供查詢。
此接收器非常適合將原始數據流式傳輸到 HDFS(通過 HdfsSink)并同時提取、轉換和加載相同數據到 Solr(通過 MorphlineSolrSink)的用例。具體而言,此sink可以處理來自不同數據源的任意異構原始數據,并將其轉換為對搜索應用程序有用的數據模型。
ETL 功能可使用 morphline 配置文件進行自定義,該文件定義了一系列轉換命令,這些命令將事件記錄從一個命令傳遞到另一個命令。
Morphlines 可以看作是 Unix 管道的演變,其中數據模型被推廣為處理通用記錄流,包括任意二進制有效負載。morphline 命令有點像 Flume Interceptor。Morphlines 可以嵌入到 Hadoop 組件(如 Flume)中
提供用于解析和轉換一組標準數據格式(如日志文件、Avro、CSV、文本、HTML、XML、PDF、Word、Excel 等)的命令,并且可以添加用于其他數據格式的其他自定義命令和解析器作為 morphline 插件。可以對任何類型的數據格式進行索引,并且可以生成任何類型的Solr模式的任何Solr文檔,并且可以注冊和執行任何自定義ETL邏輯。
Morphlines 操作連續的記錄流。數據模型可以描述如下:記錄是一組命名字段,其中每個字段都有一個或多個值的有序列表。值可以是任何 Java 對象。也就是說,記錄本質上是一個哈希表,其中每個哈希表條目都包含一個 String 鍵和一個 Java 對象列表作為值。(該實現使用 Guava 的 ArrayListMultimap,即 ListMultimap)。請注意,一個字段可以有多個值,并且任何兩個記錄都不需要使用通用字段名稱。
此sink將 Flume 事件的主體填充到 morphline 記錄的 _attachment_body 字段中,并將 Flume 事件的標頭復制到同名的記錄字段中。然后,命令可以對此數據執行操作。
支持路由到 SolrCloud 集群以提高可擴展性。索引負載可以分布在大量 MorphlineSolrSink 上,以提高可伸縮性。索引負載可以在多個MorphlineSolrSinks之間復制,以實現高可用性,例如使用Flume功能,如負載平衡Sink Processor。MorphlineInterceptor 還可以幫助實現到多個 Solr 集合的動態路由(例如,用于多租戶)。
您的環境所需的 morphline 和 solr jar 必須放在 Apache Flume 安裝的 lib 目錄中。
The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
Property Name | Default | Description |
channel | – | |
type | – | The component type name, needs to be?org.apache.flume.sink.solr.morphline.MorphlineSolrSink |
morphlineFile | – | The relative or absolute path on the local file system to the morphline configuration file. Example:?/etc/flume-ng/conf/morphline.conf |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000
12、Kafka Sink
可以將數據發布到 Kafka topic。其中一個目標是將 Flume 與 Kafka 集成,以便基于拉取的處理系統可以處理來自各種 Flume 源的數據。
目前支持 Kafka 服務器版本 0.10.1.0 或更高版本。測試完成到 2.0.1,這是發布時最高的可用版本。
Property Name | Default | Description |
type | – | Must be set to?org.apache.flume.sink.kafka.KafkaSink |
kafka.bootstrap.servers | – | List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port |
kafka Sink 使用 FlumeEvent 標頭中的主題和鍵屬性將事件發送到 Kafka。如果標頭中存在主題,則事件將發送到該特定主題,覆蓋為 Sink 配置的主題。如果標頭中存在 key,則 Kafka 將使用該 key 在主題分區之間對數據進行分區。具有相同鍵的事件將發送到同一分區。如果鍵為 null,則事件將發送到隨機分區。
下面給出了 Kafka 接收器的示例配置。以前綴 kafka.producer 開頭的屬性,即 Kafka 生產者。創建 Kafka 生產者時傳遞的屬性不限于此示例中給出的屬性。此外,還可以在此處包含您的自定義屬性,并通過作為方法參數傳入的 Flume Context 對象在預處理器中訪問它們。
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
Flume 和 Kafka 之間的通信通道支持安全認證和數據加密。對于安全身份驗證,可以從 Kafka 版本 0.9.0 開始使用 SASL/GSSAPI (Kerberos V5) 或 SSL(即使參數名為 SSL,實際協議是 TLS 實現)。
截至目前,數據加密僅由 SSL/TLS 提供。
將 kafka.producer.security.protocol 設置為以下任一值意味著:
SASL_PLAINTEXT - Kerberos 或純文本身份驗證,無需數據加密
SASL_SSL - 具有數據加密功能的 Kerberos 或明文身份驗證
SSL - 基于 TLS 的加密,具有可選身份驗證。
警告 啟用 SSL 時性能會下降,其程度取決于 CPU 類型和 JVM 實現。參考:Kafka 安全概述和用于跟蹤此問題的 Jira:KAFKA-2561