一、Flume Channels
channel是在代理上暫存事件的存儲庫。Source 添加事件,Sink 將其刪除。
1、Memory Channel
事件存儲在具有可配置最大大小的內存中隊列中。它非常適合需要更高吞吐量的流,但在agent發生故障時會丟失暫存數據
Property Name | Default | Description |
type | – | The component type name, needs to be?memory |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
2、JDBC Channel
事件存儲在由數據庫支持的持久性存儲中。JDBC 通道目前支持嵌入式 Derby。這是一個持久的通道,非常適合可恢復性很重要的流。
Property Name | Default | Description |
type | – | The component type name, needs to be?jdbc |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = jdbc
3、Kafka Channel
這些事件存儲在 Kafka 集群中(必須單獨安裝)。Kafka 提供高可用性和復制,因此,如果agent或 Kafka 崩潰,這些事件會立即提供給其他sink
Kafka 通道可用于多種方案:
1. 使用 Flume source和sink - 它為事件提供了一個可靠且高度可用的通道
2. 使用 Flume source和interceptor,但沒有sink - 它允許將 Flume 事件寫入 Kafka topic,供其他應用程序使用
3. 使用 Flume sink,但沒有source - 這是一種低延遲、高容錯的方式,可以將事件從 Kafka 發送到 Flume sink,例如 HDFS、HBase 或 Solr
目前支持 Kafka 服務器版本 0.10.1.0 或更高版本。測試完成到 2.0.1,這是發布時最高的可用版本。
配置參數的組織方式如下
1. 與通道相關的配置值一般應用于通道配置級別,例如:a1.channel.k1.type =
2. 與 Kafka 或通道運行方式相關的配置值以“kafka”為前綴(這與 CommonClient 配置無關),例如:a1.channels.k1.kafka.topic 和 a1.channels.k1.kafka.bootstrap.servers。這與 hdfs 接收器的運行方式沒有什么不同
3. 特定于生產者/消費者的屬性以 kafka.producer 或 kafka.consumer 為前綴
4. 在可能的情況下,使用 Kafka 參數名稱,例如:bootstrap.servers 和 acks
Property Name | Default | Description |
type | – | The component type name, needs to be?org.apache.flume.channel.kafka.KafkaChannel |
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port |
Example for agent named a1:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
4、File Channel
?默認情況下,文件通道使用用戶主目錄內的檢查點和數據目錄的路徑。因此,如果代理中有多個文件通道實例處于活動狀態,則只有一個實例能夠鎖定目錄并導致另一個通道初始化失敗。因此,有必要提供所有已配置通道的顯式路徑,最好是在不同的磁盤上。此外,由于文件通道將在每次提交后同步到磁盤,因此可能需要將其與將事件批處理在一起的sink/source耦合,以便在多個磁盤不可用于檢查點和數據目錄的情況下提供良好的性能。
Property Name Default | Description | ? |
type | – | The component type name, needs to be?file. |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
二、Flume Channel Selectors?
通道選擇器,如果未指定類型,則默認為“復制”。
1、Replicating Channel Selector (default)
Property Name | Default | Description |
selector.type | replicating | The component type name, needs to be?replicating |
selector.optional | – | Set of channels to be marked as?optional |
Example for agent named a1 and it’s source called r1:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
在上面的配置中,c3 是可選通道。寫入 c3 失敗不會對事務產生影響。而 c1 和 c2 未標記為可選(optional),寫入c1 和 c2失敗將導致事務失敗。
2、Load Balancing Channel Selector
負載平衡通道選擇器提供了在多個通道上對流量進行負載均衡的能力。這 有效地允許在多個線程上處理傳入數據。它維護一個索引列表,該列表必須在其上分配負載。實現支持使用round_robin或隨機選擇機制分配負載。選擇機制的選擇默認為round_robin類型,但可以通過配置進行覆蓋。
Property Name | Default | Description |
selector.type | replicating | The component type name, needs to be?load_balancing |
selector.policy | round_robin | Selection mechanism. Must be either?round_robin?or?random. |
Example for agent named a1 and it’s source called r1:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = load_balancing
a1.sources.r1.selector.policy = round_robin
3、Multiplexing Channel Selector
多路復用通道選擇器
Property Name | Default | Description |
selector.type | replicating | The component type name, needs to be?multiplexing |
selector.header | flume.selector.header | |
selector.default | – | |
selector.mapping.* | – |
Example for agent named a1 and it’s source called r1:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
#header的值對應自定義Interceptor中header的key
a1.sources.r1.selector.header = state
# CZ、US對應自定義Interceptor中header的value
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
三、Flume Interceptors
Flume 能夠在數據傳輸中修改/刪除事件。這是在攔截器Interceptors的幫助下完成的。攔截器是實現 org.apache.flume.interceptor.Interceptor 接口的類。攔截器可以根據攔截器開發人員選擇的任何條件修改甚至刪除事件。Flume 支持攔截器的鏈接。這是通過在配置中指定攔截器生成器類名列表來實現的。攔截器在源配置中被指定為空格分隔列表。配置中的攔截器的順序即是調用攔截器的順序。一個攔截器返回的事件列表將傳遞給鏈中的下一個攔截器。攔截器可以修改或刪除事件。如果攔截器需要刪除事件,則它不會在返回的列表中返回該事件。如果要刪除所有事件,則僅返回一個空列表。攔截器是需要自編的組件,下面是一個示例
1、自定義interceptors
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 先調用i1再調用i2
a1.sources.r1.interceptors = i1 i2
#這里自編的攔截器名為HostInterceptor
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1
請注意,攔截器生成器將傳遞給 type config 參數。攔截器本身就是 可配置,并且可以傳遞配置值,就像傳遞給任何其他可配置組件一樣。 在上面的示例中,事件首先傳遞給 HostInterceptor,然后由 HostInterceptor 返回事件 然后傳遞給 TimestampInterceptor。可以指定完全限定的類名 (FQCN) 或別名時間戳。如果您有多個收集器寫入同一 HDFS 路徑,則還可以使用 HostInterceptor。
2、Timestamp Interceptor
此攔截器將處理事件的時間(以毫秒)插入到事件標頭中。這個攔截器 插入一個具有鍵?timestamp?(或由?header?屬性指定) 的標頭,其值為相關時間戳。 如果配置中已存在現有時間戳,則此偵聽器可以保留該時間戳。
Property Name | Default | Description |
type | – | The component type name, has to be?timestamp?or the FQCN |
headerName | timestamp | The name of the header in which to place the generated timestamp. |
preserveExisting | false | If the timestamp already exists, should it be preserved - true or false |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =? c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
3、Host Interceptor
此偵聽器插入運行此代理的主機的主機名或 IP 地址。它插入一個標題 使用密鑰主機或已配置的密鑰,其值為主機的主機名或 IP 地址(基于配置)
Property Name | Default | Description |
type | – | The component type name, has to be?host |
preserveExisting | false | If the host header already exists, should it be preserved - true or false |
useIP | true | Use the IP Address if true, else use hostname. |
hostHeader | host | The header key to be used. |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host