flume channel和interceptor簡介及官方用例

一、Flume Channels

channel是在代理上暫存事件的存儲庫。Source 添加事件,Sink 將其刪除。

1、Memory Channel

事件存儲在具有可配置最大大小的內存中隊列中。它非常適合需要更高吞吐量的流,但在agent發生故障時會丟失暫存數據

Property Name

Default

Description

type

The component type name, needs to be?memory

Example for agent named a1:

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 10000

a1.channels.c1.byteCapacityBufferPercentage = 20

a1.channels.c1.byteCapacity = 800000

2、JDBC Channel

事件存儲在由數據庫支持的持久性存儲中。JDBC 通道目前支持嵌入式 Derby。這是一個持久的通道,非常適合可恢復性很重要的流。

Property Name

Default

Description

type

The component type name, needs to be?jdbc

Example for agent named a1:

a1.channels = c1

a1.channels.c1.type = jdbc

3、Kafka Channel

這些事件存儲在 Kafka 集群中(必須單獨安裝)。Kafka 提供高可用性和復制,因此,如果agent或 Kafka 崩潰,這些事件會立即提供給其他sink

Kafka 通道可用于多種方案:

1. 使用 Flume source和sink - 它為事件提供了一個可靠且高度可用的通道

2. 使用 Flume source和interceptor,但沒有sink - 它允許將 Flume 事件寫入 Kafka topic,供其他應用程序使用

3. 使用 Flume sink,但沒有source - 這是一種低延遲、高容錯的方式,可以將事件從 Kafka 發送到 Flume sink,例如 HDFS、HBase 或 Solr

目前支持 Kafka 服務器版本 0.10.1.0 或更高版本。測試完成到 2.0.1,這是發布時最高的可用版本。

配置參數的組織方式如下

1. 與通道相關的配置值一般應用于通道配置級別,例如:a1.channel.k1.type =

2. 與 Kafka 或通道運行方式相關的配置值以“kafka”為前綴(這與 CommonClient 配置無關),例如:a1.channels.k1.kafka.topic 和 a1.channels.k1.kafka.bootstrap.servers。這與 hdfs 接收器的運行方式沒有什么不同

3. 特定于生產者/消費者的屬性以 kafka.producer 或 kafka.consumer 為前綴

4. 在可能的情況下,使用 Kafka 參數名稱,例如:bootstrap.servers 和 acks

Property Name

Default

Description

type

The component type name, needs to be?org.apache.flume.channel.kafka.KafkaChannel

kafka.bootstrap.servers

List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port

Example for agent named a1:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092

a1.channels.channel1.kafka.topic = channel1

a1.channels.channel1.kafka.consumer.group.id = flume-consumer

4、File Channel

?默認情況下,文件通道使用用戶主目錄內的檢查點和數據目錄的路徑。因此,如果代理中有多個文件通道實例處于活動狀態,則只有一個實例能夠鎖定目錄并導致另一個通道初始化失敗。因此,有必要提供所有已配置通道的顯式路徑,最好是在不同的磁盤上。此外,由于文件通道將在每次提交后同步到磁盤,因此可能需要將其與將事件批處理在一起的sink/source耦合,以便在多個磁盤不可用于檢查點和數據目錄的情況下提供良好的性能。

Property Name Default

Description

?

type

The component type name, needs to be?file.

Example for agent named a1:

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /mnt/flume/checkpoint

a1.channels.c1.dataDirs = /mnt/flume/data

二、Flume Channel Selectors?

通道選擇器,如果未指定類型,則默認為“復制”。

1、Replicating Channel Selector (default)

Property Name

Default

Description

selector.type

replicating

The component type name, needs to be?replicating

selector.optional

Set of channels to be marked as?optional

Example for agent named a1 and it’s source called r1:

a1.sources = r1

a1.channels = c1 c2 c3

a1.sources.r1.selector.type = replicating

a1.sources.r1.channels = c1 c2 c3

a1.sources.r1.selector.optional = c3

在上面的配置中,c3 是可選通道。寫入 c3 失敗不會對事務產生影響。而 c1 和 c2 未標記為可選(optional),寫入c1 和 c2失敗將導致事務失敗。

2、Load Balancing Channel Selector

負載平衡通道選擇器提供了在多個通道上對流量進行負載均衡的能力。這 有效地允許在多個線程上處理傳入數據。它維護一個索引列表,該列表必須在其上分配負載。實現支持使用round_robin或隨機選擇機制分配負載。選擇機制的選擇默認為round_robin類型,但可以通過配置進行覆蓋。

Property Name

Default

Description

selector.type

replicating

The component type name, needs to be?load_balancing

selector.policy

round_robin

Selection mechanism. Must be either?round_robin?or?random.

Example for agent named a1 and it’s source called r1:

a1.sources = r1

a1.channels = c1 c2 c3 c4

a1.sources.r1.channels = c1 c2 c3 c4

a1.sources.r1.selector.type = load_balancing

a1.sources.r1.selector.policy = round_robin

3、Multiplexing Channel Selector

多路復用通道選擇器

Property Name

Default

Description

selector.type

replicating

The component type name, needs to be?multiplexing

selector.header

flume.selector.header

selector.default

selector.mapping.*

Example for agent named a1 and it’s source called r1:

a1.sources = r1

a1.channels = c1 c2 c3 c4

a1.sources.r1.selector.type = multiplexing

#header的值對應自定義Interceptor中header的key

a1.sources.r1.selector.header = state

# CZ、US對應自定義Interceptor中header的value

a1.sources.r1.selector.mapping.CZ = c1

a1.sources.r1.selector.mapping.US = c2 c3

a1.sources.r1.selector.default = c4

三、Flume Interceptors

Flume 能夠在數據傳輸中修改/刪除事件。這是在攔截器Interceptors的幫助下完成的。攔截器是實現 org.apache.flume.interceptor.Interceptor 接口的類。攔截器可以根據攔截器開發人員選擇的任何條件修改甚至刪除事件。Flume 支持攔截器的鏈接。這是通過在配置中指定攔截器生成器類名列表來實現的。攔截器在源配置中被指定為空格分隔列表。配置中的攔截器的順序即是調用攔截器的順序。一個攔截器返回的事件列表將傳遞給鏈中的下一個攔截器。攔截器可以修改或刪除事件。如果攔截器需要刪除事件,則它不會在返回的列表中返回該事件。如果要刪除所有事件,則僅返回一個空列表。攔截器是需要自編的組件,下面是一個示例

1、自定義interceptors

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# 先調用i1再調用i2

a1.sources.r1.interceptors = i1 i2

#這里自編的攔截器名為HostInterceptor

a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder

a1.sources.r1.interceptors.i1.preserveExisting = false

a1.sources.r1.interceptors.i1.hostHeader = hostname

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d

a1.sinks.k1.channel = c1

請注意,攔截器生成器將傳遞給 type config 參數。攔截器本身就是 可配置,并且可以傳遞配置值,就像傳遞給任何其他可配置組件一樣。 在上面的示例中,事件首先傳遞給 HostInterceptor,然后由 HostInterceptor 返回事件 然后傳遞給 TimestampInterceptor。可以指定完全限定的類名 (FQCN) 或別名時間戳。如果您有多個收集器寫入同一 HDFS 路徑,則還可以使用 HostInterceptor。

2、Timestamp Interceptor

此攔截器將處理事件的時間(以毫秒)插入到事件標頭中。這個攔截器 插入一個具有鍵?timestamp?(或由?header?屬性指定) 的標頭,其值為相關時間戳。 如果配置中已存在現有時間戳,則此偵聽器可以保留該時間戳。

Property Name

Default

Description

type

The component type name, has to be?timestamp?or the FQCN

headerName

timestamp

The name of the header in which to place the generated timestamp.

preserveExisting

false

If the timestamp already exists, should it be preserved - true or false

Example for agent named a1:

a1.sources = r1

a1.channels = c1

a1.sources.r1.channels =? c1

a1.sources.r1.type = seq

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

3、Host Interceptor

此偵聽器插入運行此代理的主機的主機名或 IP 地址。它插入一個標題 使用密鑰主機或已配置的密鑰,其值為主機的主機名或 IP 地址(基于配置)

Property Name

Default

Description

type

The component type name, has to be?host

preserveExisting

false

If the host header already exists, should it be preserved - true or false

useIP

true

Use the IP Address if true, else use hostname.

hostHeader

host

The header key to be used.

Example for agent named a1:

a1.sources = r1

a1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = host

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/14218.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/14218.shtml
英文地址,請注明出處:http://en.pswp.cn/web/14218.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

k近鄰和kd樹

K近鄰 選取k值的時候可以采用交叉驗證的方法 一般采用歐氏距離 kd樹 采用樹這個特殊的數據結構來實現k近鄰算法 先假設是二維的情況 下面講解kd樹的完整構造過程 找這個中位數是按照每棵子樹來創建的 前提是已經有了一棵kd樹,然后來一個實例點

java組合設計模式Composite Pattern

組合設計模式(Composite Pattern)是一種結構型設計模式,它允許你將對象組合成樹形結構來表示“部分-整體”的層次結構。組合模式使得客戶端對單個對象和組合對象的使用具有一致性。 // Component - 圖形接口 interface Graphic {void draw()…

Python UDP編程簡單實例

TCP是建立可靠的連接,并且通信雙方都可以以流的形式發送數據。 相對于TCP,UDP則是面向無連接的協議,不需要建立連接,只需要知道對方IP地址和端口號,就可以直接發送數據包。但是只管發送不保證到達。 雖然UDP傳輸數據…

Docker快速部署Seata的TC服務以及微服務引入Seata教程

目錄 一、使用docker部署Seata的TC服務 1、拉取TC服務鏡像 2、創建并運行容器 ?3、修改配置文件 4、在Nacos中添加TC服務的配置 5、重啟TC服務 二、微服務集成Seata 1、引入依賴 2、修改配置文件 Seata是阿里的一個開源的分布式事務解決方案,能夠為分布…

STM32學習和實踐筆記(31):輸入捕獲實驗

1.輸入捕獲介紹 在定時器中斷實驗章節中我們介紹了通用定時器具有多種功能,輸入捕獲就是其中一種。STM32F1除了基本定時器TIM6和TIM7,其他定時器都具有輸入捕獲功能。輸入捕獲可以對輸入的信號的上升沿,下降沿或者雙邊沿進行捕獲,…

【博客主頁】博客主旨 精華

前言 與博客園不同, 最近CSDN在進行資本化的轉型.其一部分的VIP代碼和小冊我也有相關消費, 個人認為是一部分做的比較成過, 另一部分又不是特別成功. 其CSDN博客已經失去其原本技術交流的意義, 變成一種免費的知識引流和收費交流. 這其實與我們的開源社區背道而馳, 但是又吸引…

世界電信日 | 紫光展銳以科技創新支撐數字經濟可持續發展

專注科技創新,打造全球數字經濟技術基石 紫光展銳堅持科技創新,為數字經濟蓬勃發展提供基石力量。 面對5G-A技術的巨大潛力,紫光展銳與眾多生態伙伴緊密合作,積極推動5G-A的商用進程。紫光展銳提出的兩項R18 eRedCap演進方案已被3GPP標準采…

為什么要實現設備之間的互通?

設備之間的互通是電信設備的普遍性要求,特別是在接入網領域中,不同廠商的局端設備與用戶端(終端)設備之間的互通顯得尤其重要。 一、互通能為產業鏈的各個環節帶寬積極影響。 (1)對用戶而言,互…

安裝新版的Ubuntu WSL以使能BBR擁塞控制算法

【多次嘗試成功的方案】通過> wsl - -list -online列出可以安裝的版本,用命令> wsl --install -d Ubuntu-24.04 安裝。 【未成功的方案】通過掛在ubuntu24.04.iso到E盤后,用命令> wsl --import Ubuntu24.04 C:\WSL\Ubuntu24.04\ E:\ --versio…

Redis系統架構中各個處理模塊是干什么的?no.19

Redis 系統架構 通過前面的學習,相信你已經掌握了 Redis 的原理、數據類型及訪問協議等內容。本課時,我將進一步分析 Redis 的系統架構,重點講解 Redis 系統架構的事件處理機制、數據管理、功能擴展、系統擴展等內容。 事件處理機制 Redis…

API-BigInteger、BigDecimal

BigInteger: demo1: package BigInteger;import java.math.BigInteger; import java.util.Random;public class demo1 {public static void main(String[] args) {//獲取一個隨機最大整數BigInteger bd1 new BigInteger(5, new Random());System.out.println(bd1…

SSMP整合案例第一步 制作分析模塊創建與開發業務實體類

制作分析 我們要實現一個模塊的增刪改查 實際開發中mybatisplus用的不多,他只能對沒有外鍵的單表進行簡單的查詢 但在這個案例中我們還是選擇mybatisplus開發 模塊創建 我們把所有服務器都放在一起 就不用前后端分離 我們嘗試用后端開發進行全棧開發 新建項目添…

macos brew安裝多版本protobuf,切換指定版本protobuf 為默認版本方法

protobuf 不同的版本語法相差很大, 而在不同的項目中可能使用的protobuf版本也不同,所以我們的電腦就可能需要安裝多個版本的protobuf, 下面介紹macos下如何通過brew安裝多版本和設置想要的默認版本的方法 安裝,則可以先執行 bre…

Thinkphp3.2.3網站后臺不能訪問如何修復

我是使用Thinkphp3.2.3新搭建的PHP網站,但是網站前臺可以訪問,后臺訪問出現如圖錯誤: 由于我使用的Hostease的Linux虛擬主機產品默認帶普通用戶權限的cPanel面板,對于上述出現的問題不清楚如何處理,因此聯系Hostease的…

(3)醫療圖像處理:MRI磁共振成像-快速采集--(楊正漢)

目錄 一、磁共振快速采集技術基礎 1.K空間的基本特點 2.快速成像的理由: 3.快速成像的硬件要求: 二、磁共振快速采集技術 1.采集更少的相位編碼線 2.平行采集技術PAT 3.其他與快速采集有關的技術 1)部分回波技術 2)頻率…

java實現一個動態監控系統,監控接口請求超時的趨勢

目錄 整體思路案例實現1. 數據收集2. 數據聚合3. 趨勢分析4. 異常檢測5. 異常處理定時任務 整體思路 理想情況下,你可以實現一個簡單的動態監控算法來檢測渠道請求的響應時間趨勢,并在發現頻繁超時的情況下進行處理。以下是一個可能的算法框架&#xff…

Oracle表關聯更新幾種方法

1、測試表及數據準備 create table T_update01(ID int ,infoname varchar2(32),sys_guid varchar2(36)); create table T_update02(ID int ,infoname varchar2(32),sys_guid varchar2(36));insert into T_update01 select 1,N1_updateName,sys_guid() from dual union select …

java如何獲取IP和IP的歸屬地?

在Java中,獲取IP地址通常指的是獲取本地機器的IP地址或者通過某種方式(如HTTP請求)獲取的遠程IP地址。代碼案例如下: 而要獲取IP的歸屬地(地理位置信息),則通常需要使用第三方IP地址查詢服務,我…

c++ 排序算法merge使用要求

在C中&#xff0c;std::merge是一個算法&#xff0c;它用于合并兩個已排序的范圍&#xff08;例如數組或容器中的一部分&#xff09;到一個新的范圍中。這個函數在<algorithm>頭文件中定義。 輸入范圍必須已排序 std::merge要求輸入的兩個范圍都必須是已排序的&#xf…

23種設計模式順口溜

口訣&#xff1a; 原型 抽風 &#xff0c;單獨 建造 工廠 &#xff08;寓意&#xff1a;&#xff08;這里代指本來很簡單的東西&#xff0c;卻要干工廠這里復雜的業務&#xff09; 抽風&#xff1a;抽象工廠單獨&#xff1a;單例橋代理組合享元適配器&#xff0c;&#xff0…