1.概述
Flink 提供了 Kafka 連接器使用精確一次(Exactly-once)的語義在 Kafka topic 中讀取和寫入數據。
目前還沒有 Flink 1.19 可用的連接器。
2.Kafka Source
a)使用方法
Kafka Source 提供了構建類來創建 KafkaSource
的實例。以下代碼片段展示了如何構建 KafkaSource
來消費 “input-topic” 最早位點的數據,使用消費組 “my-group”,并且將 Kafka 消息體反序列化為字符串:
KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics("input-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
以下屬性在構建 KafkaSource 時是必須指定的:
- Bootstrap server,通過 setBootstrapServers(String) 方法配置
- 消費者組 ID,通過 setGroupId(String) 配置
- 要訂閱的 Topic / Partition
- 用于解析 Kafka 消息的反序列化器(Deserializer)
b)Topic / Partition 訂閱
Kafka Source 提供了 3 種 Topic / Partition 的訂閱方式。
Topic 列表,訂閱 Topic 列表中所有 Partition 的消息
KafkaSource.builder().setTopics("topic-a", "topic-b");
正則表達式匹配,訂閱與正則表達式所匹配的 Topic 下的所有 Partition
KafkaSource.builder().setTopicPattern("topic.*");
Partition 列表,訂閱指定的 Partition
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(new TopicPartition("topic-a", 0), // Partition 0 of topic "topic-a"new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet);
c)消息解析
代碼中需要提供一個反序列化器(Deserializer)來對 Kafka 的消息進行解析,反序列化器通過 setDeserializer(KafkaRecordDeserializationSchema) 來指定,其中 KafkaRecordDeserializationSchema 定義了如何解析 Kafka 的 ConsumerRecord。
如果只需要 Kafka 消息中的消息體(value)部分的數據,可以使用 KafkaSource 構建類中的 setValueOnlyDeserializer(DeserializationSchema) 方法,其中 DeserializationSchema 定義了如何解析 Kafka 消息體中的二進制數據。
也可使用 Kafka 提供的解析器 來解析 Kafka 消息體,例如使用 StringDeserializer 來將 Kafka 消息體解析成字符串:
import org.apache.kafka.common.serialization.StringDeserializer;KafkaSource.<String>builder().setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
d)起始消費位點
Kafka source 能夠通過偏移量初始化器(OffsetsInitializer
)來指定從不同的偏移量開始消費,內置的偏移量初始化器包括:
KafkaSource.builder()// 從消費組提交的位點開始消費,不指定位點重置策略.setStartingOffsets(OffsetsInitializer.committedOffsets())// 從消費組提交的位點開始消費,如果提交位點不存在,使用最早位點.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))// 從時間戳大于等于指定時間戳(毫秒)的數據開始消費.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))// 從最早位點開始消費.setStartingOffsets(OffsetsInitializer.earliest())// 從最末尾位點開始消費.setStartingOffsets(OffsetsInitializer.latest());
如果內置的初始化器不能滿足需求,也可以實現自定義的位點初始化器(OffsetsInitializer
)。
如果未指定位點初始化器,將默認使用 OffsetsInitializer.earliest()
。
e)有界 / 無界模式
Kafka Source 支持流式和批式兩種運行模式。默認情況下,KafkaSource 設置為以流模式運行,因此作業永遠不會停止,直到 Flink 作業失敗或被取消。
可以使用 setBounded(OffsetsInitializer)
指定停止偏移量使 Kafka Source 以批處理模式運行,當所有分區都達到其停止偏移量時,Kafka Source 會退出運行。
流模式下運行通過使用 setUnbounded(OffsetsInitializer)
也可以指定停止消費位點,當所有分區達到其指定的停止偏移量時,Kafka Source 會退出運行。
f)其他屬性
還可以使用 setProperties(Properties) 和 setProperty(String, String) 為 Kafka Source 和 Kafka Consumer 設置任意屬性。
KafkaSource 有以下配置項:
- client.id.prefix,指定用于 Kafka Consumer 的客戶端 ID 前綴
- partition.discovery.interval.ms,定義 Kafka Source 檢查新分區的時間間隔
- register.consumer.metrics 指定是否在 Flink 中注冊 Kafka Consumer 的指標
- commit.offsets.on.checkpoint 指定是否在進行 checkpoint 時將消費位點提交至 Kafka broker
請注意,即使指定了以下配置項,構建器也會將其覆蓋:
- key.deserializer 始終設置為 ByteArrayDeserializer
- value.deserializer 始終設置為 ByteArrayDeserializer
- auto.offset.reset.strategy 被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆蓋
- partition.discovery.interval.ms 會在批模式下被覆蓋為 -1
g)動態分區檢查
為了在不重啟 Flink 作業的情況下處理 Topic 擴容或新建 Topic 等場景,可以將 Kafka Source 配置為在提供的 Topic / Partition 訂閱模式下定期檢查新分區。
要啟用動態分區檢查,請將 partition.discovery.interval.ms
設置為非負值:
KafkaSource.builder().setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒檢查一次新分區
分區檢查功能默認不開啟,需要顯式地設置分區檢查間隔才能啟用此功能。
h)事件時間和水印
默認情況下,Kafka Source 使用 Kafka 消息中的時間戳作為事件時間,可以定義自己的水印策略(Watermark Strategy) 以從消息中提取事件時間,并向下游發送水印。
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");
i)空閑
如果并行度高于分區數,Kafka Source 不會自動進入空閑狀態,將需要降低并行度或向水印策略添加空閑超時。
如果在這段時間內沒有記錄在流的分區中流動,則該分區被視為“空閑”并且不會阻止下游操作符中水印的進度,定義 WatermarkStrategy#withIdleness
。
j)消費位點提交
Kafka source 在 checkpoint 完成時提交當前的消費位點 ,以保證 Flink 的 checkpoint 狀態和 Kafka broker 上的提交位點一致。
如果未開啟 checkpoint,Kafka source 依賴于 Kafka consumer 內部的位點定時自動提交邏輯,自動提交功能由 enable.auto.commit 和 auto.commit.interval.ms 兩個 Kafka consumer 配置項進行配置。
注意:Kafka source 不依賴于 broker 上提交的位點來恢復失敗的作業,提交位點只是為了上報 Kafka consumer 和消費組的消費進度,以在 broker 端進行監控。
k)監控
范圍 | 指標 | 用戶變量 | 描述 | 類型 |
---|---|---|---|---|
算子 | currentEmitEventTimeLag | n/a | 數據的事件時間與數據離開 Source 時的間隔1:currentEmitEventTimeLag = EmitTime - EventTime | Gauge |
watermarkLag | n/a | 水印時間滯后于當前時間的時長:watermarkLag = CurrentTime - Watermark | Gauge | |
sourceIdleTime | n/a | Source 閑置時長:sourceIdleTime = CurrentTime - LastRecordProcessTime | Gauge | |
pendingRecords | n/a | 尚未被 Source 拉取的數據數量,即 Kafka partition 當前消費位點之后的數據數量。 | Gauge | |
KafkaSourceReader.commitsSucceeded | n/a | 位點成功提交至 Kafka 的總次數,在開啟了位點提交功能時適用。 | Counter | |
KafkaSourceReader.commitsFailed | n/a | 位點未能成功提交至 Kafka 的總次數,在開啟了位點提交功能時適用。注意位點提交僅是為了向 Kafka 上報消費進度,因此提交失敗并不影響 Flink checkpoint 中存儲的位點信息的完整性。 | Counter | |
KafkaSourceReader.committedOffsets | topic, partition | 每個 partition 最近一次成功提交至 Kafka 的位點。各個 partition 的指標可以通過指定 topic 名稱和 partition ID 獲取。 | Gauge | |
KafkaSourceReader.currentOffsets | topic, partition | 每個 partition 當前讀取的位點。各個 partition 的指標可以通過指定 topic 名稱和 partition ID 獲取。 | Gauge |
1 該指標反映了最后一條數據的瞬時值。之所以提供瞬時值是因為統計延遲直方圖會消耗更多資源,瞬時值通常足以很好地反映延遲。
Kafka Consumer 指標
Kafka consumer 的所有指標都注冊在指標組 KafkaSourceReader.KafkaConsumer
下,例如 Kafka consumer 的指標 records-consumed-total
將在該 Flink 指標中匯報: .operator.KafkaSourceReader.KafkaConsumer.records-consumed-total
。
可以使用配置項 register.consumer.metrics
配置是否注冊 Kafka consumer 的指標,默認此選項設置為 true。
l)安全
要啟用加密和認證相關的安全配置,只需將安全配置作為其他屬性配置在 Kafka source 上即可。下面的代碼片段展示了如何配置 Kafka source 以使用 PLAIN 作為 SASL 機制并提供 JAAS 配置:
KafkaSource.builder().setProperty("security.protocol", "SASL_PLAINTEXT").setProperty("sasl.mechanism", "PLAIN").setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
更復雜的例子,使用 SASL_SSL 作為安全協議并使用 SCRAM-SHA-256 作為 SASL 機制:
KafkaSource.builder().setProperty("security.protocol", "SASL_SSL")// SSL 配置// 配置服務端提供的 truststore (CA 證書) 的路徑.setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks").setProperty("ssl.truststore.password", "test1234")// 如果要求客戶端認證,則需要配置 keystore (私鑰) 的路徑.setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks").setProperty("ssl.keystore.password", "test1234")// SASL 配置// 將 SASL 機制配置為 SCRAM-SHA-256.setProperty("sasl.mechanism", "SCRAM-SHA-256")// 配置 JAAS.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
如果在作業 JAR 中 Kafka 客戶端依賴的類路徑被重置了(relocate class),登錄模塊(login module)的類路徑可能會不同,因此請根據登錄模塊在 JAR 中實際的類路徑來改寫以上配置。
m)實現細節
在新 Source API 的抽象中,Kafka source 由以下幾個部分組成:
數據源分片(Source Split)
Kafka source 的數據源分片(source split)表示 Kafka topic 中的一個 partition,Kafka 的數據源分片包括:
- 該分片表示的 topic 和 partition
- 該 partition 的起始位點
- 該 partition 的停止位點,當 source 運行在批模式時適用
Kafka source 分片的狀態同時存儲該 partition 的當前消費位點,該分片狀態將會在 Kafka 源讀取器(source reader)進行快照(snapshot) 時將當前消費位點保存為起始消費位點以將分片狀態轉換成不可變更的分片。
分片枚舉器(Split Enumerator)
Kafka source 的分片枚舉器負責檢查在當前的 topic / partition 訂閱模式下的新分片(partition),并將分片輪流均勻地分配給源讀取器(source reader)。
Kafka source 的分片枚舉器會將分片主動推送給源讀取器,因此它無需處理來自源讀取器的分片請求。
源讀取器(Source Reader)
Kafka source 的源讀取器擴展了 SourceReaderBase
,并使用單線程復用(single thread multiplex)的線程模型,使用一個由分片讀取器 (split reader)驅動的 KafkaConsumer
來處理多個分片(partition)。
消息會在從 Kafka 拉取下來后在分片讀取器中立刻被解析,分片的狀態即當前的消息消費進度會在 KafkaRecordEmitter
中更新,同時會在數據發送至下游時指定事件時間。
n)Kafka SourceFunction
FlinkKafkaConsumer
已被棄用并將在 Flink 1.17 中移除,請改用 KafkaSource
。
3.Kafka Sink
a)使用方法
KafkaSink
可將數據流寫入一個或多個 Kafka topic。
Kafka sink 提供了構建類來創建 KafkaSink
的實例。以下代碼片段展示了如何將字符串數據按照至少一次(at lease once)的語義保證寫入 Kafka topic。
DataStream<String> stream = ...;KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(brokers).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("topic-name").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();stream.sinkTo(sink);
以下屬性在構建 KafkaSink 時是必須指定的:
- Bootstrap servers,
setBootstrapServers(String)
- 消息序列化器(Serializer),
setRecordSerializer(KafkaRecordSerializationSchema)
- 如果使用
DeliveryGuarantee.EXACTLY_ONCE
的語義保證,則需要使用setTransactionalIdPrefix(String)
b)序列化器
構建時需要提供 KafkaRecordSerializationSchema 來將輸入數據轉換為 Kafka 的 ProducerRecord。
Flink 提供了 schema 構建器以提供一些通用的組件,例如消息鍵(key)/消息體(value)序列化、topic 選擇、消息分區,同樣也可以通過實現對應的接口來進行更豐富的控制。
KafkaRecordSerializationSchema.builder().setTopicSelector((element) -> {<your-topic-selection-logic>}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build();
其中消息體(value)序列化方法和 topic 的選擇方法是必須指定的,此外也可以通過 setKafkaKeySerializer(Serializer)
或 setKafkaValueSerializer(Serializer)
來使用 Kafka 提供而非 Flink 提供的序列化器。
c)容錯
KafkaSink
總共支持三種不同的語義保證(DeliveryGuarantee
),對于 DeliveryGuarantee.AT_LEAST_ONCE
和 DeliveryGuarantee.EXACTLY_ONCE
,Flink checkpoint 必須啟用;默認情況下 KafkaSink
使用 DeliveryGuarantee.NONE
。 以下是對不同語義保證的解釋:
DeliveryGuarantee.NONE
不提供任何保證:消息有可能會因 Kafka broker 的原因發生丟失或因 Flink 的故障發生重復。DeliveryGuarantee.AT_LEAST_ONCE
: sink 在 checkpoint 時會等待 Kafka 緩沖區中的數據全部被 Kafka producer 確認。消息不會因 Kafka broker 端發生的事件而丟失,但可能會在 Flink 重啟時重復,因為 Flink 會重新處理舊數據。DeliveryGuarantee.EXACTLY_ONCE
: 該模式下,Kafka sink 會將所有數據通過在 checkpoint 時提交的事務寫入。因此,如果 consumer 只讀取已提交的數據(參見 Kafka consumer 配置isolation.level
),在 Flink 發生重啟時不會發生數據重復。然而這會使數據在 checkpoint 完成時才會可見,因此請按需調整 checkpoint 的間隔。請確認事務 ID 的前綴(transactionIdPrefix)對不同的應用是唯一的,以保證不同作業的事務不會互相影響!此外,強烈建議將 Kafka 的事務超時時間調整至遠大于 checkpoint 最大間隔 + 最大重啟時間,否則 Kafka 對未提交事務的過期處理會導致數據丟失。
d)監控
范圍 | 指標 | 用戶變量 | 描述 | 類型 |
---|---|---|---|---|
算子 | currentSendTime | n/a | 發送最近一條數據的耗時。該指標反映最后一條數據的瞬時值。 | Gauge |
e)Kafka Producer
FlinkKafkaProducer
已被棄用并將在 Flink 1.15 中移除,請改用 KafkaSink
。
4.Kafka 連接器指標
Flink 的 Kafka 連接器通過 Flink 的指標系統提供一些指標來幫助分析 connector 的行為,各個版本的 Kafka producer 和 consumer 會通過 Flink 的指標系統匯報 Kafka 內部的指標。
也可通過將 Kafka source 在 register.consumer.metrics
,或 Kafka sink 的 register.producer.metrics
配置設置為 false 來關閉 Kafka 指標的注冊。
5.啟用 Kerberos 身份驗證
只需在 flink-conf.yaml
中配置為 Kafka 啟用 Kerberos 身份驗證
通過設置以下內容配置 Kerberos credentials
security.kerberos.login.use-ticket-cache
:默認情況下,這個值是true
,Flink 將嘗試在kinit
管理的緩存中使用 Kerberos credentials,在 YARN 上部署的 Flink jobs 中使用 Kafka 連接器時,使用緩存的 Kerberos 授權將不起作用。security.kerberos.login.keytab
和security.kerberos.login.principal
:要使用 Kerberos keytabs,需為這兩個屬性設置值。
將 KafkaClient
追加到 security.kerberos.login.contexts
:這告訴 Flink 將配置的 Kerberos credentials 提供給 Kafka 登錄上下文以用于 Kafka 身份驗證。
一旦啟用了基于 Kerberos 的 Flink 安全性后,只需在提供的屬性配置中包含以下兩個設置(通過傳遞給內部 Kafka 客戶端),即可使用 Flink Kafka Consumer 或 Producer 向 Kafka 進行身份驗證:
- 將
security.protocol
設置為SASL_PLAINTEXT
(默認為NONE
):用于與 Kafka broker 進行通信的協議。使用獨立 Flink 部署時,也可以使用SASL_SSL
; - 將
sasl.kerberos.service.name
設置為kafka
(默認為kafka
):此值應與用于 Kafka broker 配置的sasl.kerberos.service.name
相匹配。客戶端和服務器配置之間的服務名稱不匹配將導致身份驗證失敗。
6.升級到最近的連接器版本
- 不要同時升級 Flink 和 Kafka 連接器
- 確保 Consumer 設置了
group.id
- 在 Consumer 上設置
setCommitOffsetsOnCheckpoints(true)
,以便將讀的 offset 提交到 Kafka。務必在停止和恢復 savepoint 前執行此操作,可能需要在舊的連接器版本上進行停止/重啟循環來啟用此設置。 - 在 Consumer 上設置
setStartFromGroupOffsets(true)
,以便從 Kafka 獲取讀的 offset。這只會在 Flink 狀態中沒有讀的 offset 時生效。 - 修改 source/sink 分配到的
uid
。這會確保新的 source/sink 不會從舊的 sink/source 算子中讀取狀態。 - 使用
--allow-non-restored-state
參數啟動新 job,因為在 savepoint 中仍然有先前連接器版本的狀態。
7.問題排查
數據丟失
根據 Kafka 配置,即使在 Kafka 確認寫入后,仍然可能會遇到數據丟失,需要在 Kafka 的配置中設置以下屬性:
acks
log.flush.interval.messages
log.flush.interval.ms
log.flush.*
上述選項的默認值是很容易導致數據丟失的。
UnknownTopicOrPartitionException
導致此錯誤的一個可能原因是正在進行新的 leader 選舉,例如在重新啟動 Kafka broker 之后或期間。這是一個可重試的異常,因此 Flink job 應該能夠重啟并恢復正常運行。
也可以通過更改 producer 設置中的 retries
屬性來規避。但是,這可能會導致重新排序消息,反過來可以通過將 max.in.flight.requests.per.connection
設置為 1 來避免不需要的消息。
ProducerFencedException
這個錯誤是由于 FlinkKafkaProducer
所生成的 transactional.id
與其他應用所使用的的產生了沖突。默認FlinkKafkaProducer
產生的 ID 都是以 taskName + "-" + operatorUid
為前綴的,產生沖突的應用是使用了相同 Job Graph 的 Flink Job。
可以使用 setTransactionalIdPrefix()
方法來覆蓋默認的行為,為每個不同的 Job 分配不同的 transactional.id
前綴來解決。