Flink 系列文章
1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接
13、Flink 的table api與sql的基本概念、通用api介紹及入門示例
14、Flink 的table api與sql之數據類型: 內置數據類型以及它們的屬性
15、Flink 的table api與sql之流式概念-詳解的介紹了動態表、時間屬性配置(如何處理更新結果)、時態表、流上的join、流上的確定性以及查詢配置
16、Flink 的table api與sql之連接外部系統: 讀寫外部系統的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統: 讀寫外部系統的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統: 讀寫外部系統的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統: 讀寫外部系統的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統: 讀寫外部系統的連接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和語法
19、Flink 的Table API 和 SQL 中的內置函數及示例(1)
19、Flink 的Table API 和 SQL 中的自定義函數及示例(2)
19、Flink 的Table API 和 SQL 中的自定義函數及示例(3)
19、Flink 的Table API 和 SQL 中的自定義函數及示例(4)
20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務到集群上
21、Flink 的table API與DataStream API 集成(1)- 介紹及入門示例、集成說明
21、Flink 的table API與DataStream API 集成(2)- 批處理模式和inser-only流處理
21、Flink 的table API與DataStream API 集成(3)- changelog流處理、管道示例、類型轉換和老版本轉換示例
21、Flink 的table API與DataStream API 集成(完整版)
22、Flink 的table api與sql之創建表的DDL
24、Flink 的table api與sql之Catalogs(介紹、類型、java api和sql實現ddl、java api和sql操作catalog)-1
24、Flink 的table api與sql之Catalogs(java api操作數據庫、表)-2
24、Flink 的table api與sql之Catalogs(java api操作視圖)-3
24、Flink 的table api與sql之Catalogs(java api操作分區與函數)-4
25、Flink 的table api與sql之函數(自定義函數示例)
26、Flink 的SQL之概覽與入門示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介紹及詳細示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介紹及詳細示例(2)
27、Flink 的SQL之SELECT (窗口函數)介紹及詳細示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介紹及詳細示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分組聚合、Over Aggregation Over聚合 和 Window Join 窗口關聯)介紹及詳細示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介紹及詳細示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式檢測)介紹及詳細示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 語句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等)
31、Flink的SQL Gateway介紹及示例
32、Flink table api和SQL 之用戶自定義 Sources & Sinks實現及詳細示例
33、Flink 的Table API 和 SQL 中的時區
35、Flink 的 Formats 之CSV 和 JSON Format
36、Flink 的 Formats 之Parquet 和 Orc Format
41、Flink之Hive 方言介紹及詳細示例
40、Flink 的Apache Kafka connector(kafka source的介紹及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介紹及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 說明及使用示例) 完整版
42、Flink 的table api與sql之Hive Catalog
43、Flink之Hive 讀寫及詳細驗證示例
44、Flink之module模塊介紹及使用示例和Flink SQL使用hive內置函數及自定義函數詳細示例–網上有些說法好像是錯誤的
文章目錄
- Flink 系列文章
- 一、Apache Kafka 連接器
- 1、maven依賴
- 2、kafka source
- 1)、使用示例
- 1、Flink 1.13版本實現
- 2、Flink 1.17版本實現
- 3、說明
- 2)、Topic / Partition 訂閱
- 1、Topic 列表
- 2、正則表達式匹配
- 3、Partition 列表
- 3)、消息解析
- 4)、起始消費位點
- 5)、有界 / 無界模式
- 6)、其他屬性
- 7)、動態分區檢查
- 8)、事件時間和水印
- 9)、空閑
- 10)、消費位點提交
- 11)、監控
- 1、范圍
- 1)、用戶范圍
- 2)、系統范圍System Scope
- 3)、所有變量列表
- 4)、用戶變量
- 2、指標范圍
- 3、Kafka Consumer 指標
- 12)、安全
- 13)、實現細節
- 1、數據源分片(Source Split)
- 2、分片枚舉器(Split Enumerator)
- 3、源讀取器(Source Reader)
- 3、kafka sourcefunction
- 4、kafka sink
- 1)、使用示例
- 1、Flink 1.13版本實現
- 2、Flink 1.17版本實現
- 3、說明
- 2)、序列化器
- 3)、容錯
- 4)、監控
- 5、kafka producer
- 6、kafka 連接器指標
- 7、啟用 Kerberos 身份驗證
- 8、升級到最近的連接器版本
- 9、問題排查
- 1)、數據丟失
- 2)、UnknownTopicOrPartitionException
- 3)、ProducerFencedException
本文系統介紹了kafka連接器的source、sink、身份驗證、版本升級、問題排查的幾個主要 方面,關于常用的功能均以可運行的示例進行展示并提供完整的驗證步驟。
本專題為了便于閱讀以及整體查閱分為三個部分:
40、Flink 的Apache Kafka connector(kafka source的介紹及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介紹及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 說明及使用示例) 完整版
本文依賴kafka集群能正常使用。
本文分為9個部分,即maven依賴、source、sourcefunction、sink、producer、連接器指標、身份認證、版本升級及問題排查。
本文的示例是在Flink 1.17版本中運行。
一、Apache Kafka 連接器
Flink 提供了 Apache Kafka 連接器使用精確一次(Exactly-once)的語義在 Kafka topic 中讀取和寫入數據。
1、maven依賴
Apache Flink 集成了通用的 Kafka 連接器,它會盡力與 Kafka client 的最新版本保持同步。 該連接器使用的 Kafka client 版本可能會在 Flink 版本之間發生變化。 當前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。 有關 Kafka 兼容性的更多細節,請參考 Kafka 官方文檔。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version>
</dependency>
如果使用 Kafka source,flink-connector-base 也需要包含在依賴中:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.17.1</version>
</dependency>
Flink 目前的流連接器還不是二進制發行版的一部分。 在集群中運行需要增加kafka的jar包,然后重啟集群。比如/usr/local/bigdata/flink-1.13.5/lib/flink-sql-connector-kafka_2.11-1.13.5.jar。
2、kafka source
1)、使用示例
Kafka Source 提供了構建類來創建 KafkaSource 的實例。
以下代碼片段展示了如何構建 KafkaSource 來消費 “alan_kafkasource” 最早位點的數據, 使用消費組 “flink_kafka”,并且將 Kafka 消息體反序列化為字符串:
1、Flink 1.13版本實現
- maven依賴
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.13.6</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- flink連接器 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><!-- 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies>
- 實現代碼
import java.util.Properties;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
......
public static void test1() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 source// 準備kafka連接參數Properties props = new Properties();// 集群地址props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");// 消費者組idprops.setProperty("group.id", "flink_kafka");// latest有offset記錄從記錄位置開始消費,沒有記錄從最新的/最后的消息開始消費// earliest有offset記錄從記錄位置開始消費,沒有記錄從最早的/最開始的消息開始消費props.setProperty("auto.offset.reset", "latest");// 會開啟一個后臺線程每隔5s檢測一下Kafka的分區情況,實現動態分區檢測props.setProperty("flink.partition-discovery.interval-millis", "5000");// 自動提交(提交到默認主題,后續學習了Checkpoint后隨著Checkpoint存儲在Checkpoint和默認主題中)props.setProperty("enable.auto.commit", "true");// 自動提交的時間間隔props.setProperty("auto.commit.interval.ms", "2000");// 使用連接參數創建FlinkKafkaConsumer/kafkaSourceFlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("alan_kafkasource", new SimpleStringSchema(), props);// 使用kafkaSourceDataStream<String> kafkaDS = env.addSource(kafkaSource);// 3、 transformation// 4、 sinkkafkaDS.print();// 5、/ executeenv.execute();}
- 驗證
1、創建kafka主題alan_kafkasource,kafka命令發送數據
[alanchan@server2 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic alan_kafkasource --partitions 1 --replication-factor 1[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_kafkasource
>alan,18
>alanchan,19
>alanchan,20
2、啟動應用程序,并觀察控制臺輸出
2、Flink 1.17版本實現
- maven依賴
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- flink連接器 --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency></dependencies>
- 實現代碼
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;......public static void test2() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_kafkasource").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformation// 4、 sinkkafkaDS.print();// 5、executeenv.execute();}
- 驗證
1、創建kafka主題alan_kafkasource,kafka命令發送數據
[alanchan@server2 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic alan_kafkasource --partitions 1 --replication-factor 1[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_kafkasource
>alan,18
>alanchan,19
>alanchan,20
2、啟動應用程序,并觀察控制臺輸出
3、說明
以下屬性在構建 KafkaSource 時是必須指定的:
- Bootstrap server,通過 setBootstrapServers(String) 方法配置
- 消費者組 ID,通過 setGroupId(String) 配置
- 要訂閱的 Topic / Partition,請參閱 Topic / Partition 一節
- 用于解析 Kafka 消息的反序列化器(Deserializer),請參閱消息解析一節
2)、Topic / Partition 訂閱
Kafka Source 提供了 3 種 Topic / Partition 的訂閱方式:
1、Topic 列表
訂閱 Topic 列表中所有 Partition 的消息
// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_kafkasource1","alan_kafkasource2").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();
2、正則表達式匹配
訂閱與正則表達式所匹配的 Topic 下的所有 Partition
// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_kafkasource*").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();
3、Partition 列表
訂閱指定的 Partition
- 實現代碼
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.TopicPartition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;........public static void test3() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceHashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(new TopicPartition("topic_alan", 0), // Partition 0 of topic "topic_alan"new TopicPartition("topic_alanchan", 3))); // Partition 5 of topic "topic_alanchan"KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setPartitions(partitionSet)//.setTopics("alan_kafkasource").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformation// 4、 sinkkafkaDS.print();// 5、executeenv.execute();}
- 驗證
1、創建kafka主題,topic_alan和topic_alanchan,其中topic_alanchan有四個分區,topic_alan只有一個分區
topic_alan主題信息
topic_alanchan主題信息
2、啟動程序
3、通過命令向topic_alan和topic_alanchan主題中發送數據
topic_alan主題發送的數據
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic topic_alan
>alan,18
>alan,19
>alan,20
>
topic_alanchan主題發送的數據
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic topic_alanchan
>alanchan,28
>alan,29
>alanchan,30
>alanchan,31
>alanchan,32
>alanchan,33
>alanchan,34
>alanchan,35
>
4、程序控制臺輸出
3)、消息解析
代碼中需要提供一個反序列化器(Deserializer)來對 Kafka 的消息進行解析。 反序列化器通過 setDeserializer(KafkaRecordDeserializationSchema) 來指定,其中 KafkaRecordDeserializationSchema 定義了如何解析 Kafka 的 ConsumerRecord。
如果只需要 Kafka 消息中的消息體(value)部分的數據,可以使用 KafkaSource 構建類中的 setValueOnlyDeserializer(DeserializationSchema) 方法,其中 DeserializationSchema 定義了如何解析 Kafka 消息體中的二進制數據。
也可使用 Kafka 提供的解析器 來解析 Kafka 消息體。例如使用 StringDeserializer 來將 Kafka 消息體解析成字符串:
- 示例代碼
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;......public static void test4() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_kafkasource").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
// .setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformation// 4、 sinkkafkaDS.print();// 5、executeenv.execute();}
- 驗證結果
kafka命令發送數據
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_kafkasource
>alan,18
>alanchan,19
>alanchan,20
>
程序運行結果
4)、起始消費位點
Kafka source 能夠通過位點初始化器(OffsetsInitializer)來指定從不同的偏移量開始消費 。內置的位點初始化器包括:
KafkaSource.builder()// 從消費組提交的位點開始消費,不指定位點重置策略.setStartingOffsets(OffsetsInitializer.committedOffsets())// 從消費組提交的位點開始消費,如果提交位點不存在,使用最早位點.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))// 從時間戳大于等于指定時間戳(毫秒)的數據開始消費.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))// 從最早位點開始消費.setStartingOffsets(OffsetsInitializer.earliest())// 從最末尾位點開始消費.setStartingOffsets(OffsetsInitializer.latest());
- 示例代碼
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;......public static void test5() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("topic_alanchan").setGroupId("flink_kafka")
// .setStartingOffsets(OffsetsInitializer.earliest()).setStartingOffsets(OffsetsInitializer.latest()).setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
// .setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformation// 4、 sinkkafkaDS.print();// 5、executeenv.execute();}
- 驗證
1、kafka命令行輸入數據
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic topic_alanchan
>alanchan,37
>alanchan,38
>alanchan,39
>alanchan,40
>alanchan,41
>alanchan,42
>alanchan,43
>alanchan,44
>alanchan,45
>alanchan,46
>alanchan,47
>alanchan,48
>alanchan,49
>alanchan,50
>alanchan,51
>
2、運行程序,控制臺輸出結果
如果內置的初始化器不能滿足需求,也可以實現自定義的位點初始化器(OffsetsInitializer)。
如果未指定位點初始化器,將默認使用 OffsetsInitializer.earliest()。
5)、有界 / 無界模式
Kafka Source 支持流式和批式兩種運行模式。默認情況下,KafkaSource 設置為以流模式運行,因此作業永遠不會停止,直到 Flink 作業失敗或被取消。 可以使用 setBounded(OffsetsInitializer) 指定停止偏移量使 Kafka Source 以批處理模式運行。當所有分區都達到其停止偏移量時,Kafka Source 會退出運行。
流模式下運行通過使用 setUnbounded(OffsetsInitializer) 也可以指定停止消費位點,當所有分區達到其指定的停止偏移量時,Kafka Source 會退出運行。
- 示例代碼
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;public static void test6() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("topic_alan").setGroupId("flink_kafka")
// .setStartingOffsets(OffsetsInitializer.earliest()).setStartingOffsets(OffsetsInitializer.latest()).setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
// .setValueOnlyDeserializer(new SimpleStringSchema()).setUnbounded(OffsetsInitializer.timestamp(1700546218367L)).build();DataStreamSource<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformation// 4、 sinkkafkaDS.print();// 5、executeenv.execute();}
6)、其他屬性
除了上述屬性之外,您還可以使用 setProperties(Properties) 和 setProperty(String, String) 為 Kafka Source 和 Kafka Consumer 設置任意屬性。KafkaSource 有以下配置項:
- client.id.prefix,指定用于 Kafka Consumer 的客戶端 ID 前綴
- partition.discovery.interval.ms,定義 Kafka Source 檢查新分區的時間間隔。 請參閱下面的動態分區檢查一節
- register.consumer.metrics 指定是否在 Flink 中注冊 Kafka Consumer 的指標
- commit.offsets.on.checkpoint 指定是否在進行 checkpoint 時將消費位點提交至 Kafka broker
Kafka consumer 的配置可以參考 Apache Kafka 文檔。
請注意,即使指定了以下配置項,構建器也會將其覆蓋:
-
key.deserializer 始終設置為 ByteArrayDeserializer
-
value.deserializer 始終設置為 ByteArrayDeserializer
-
auto.offset.reset.strategy 被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆蓋
-
partition.discovery.interval.ms 會在批模式下被覆蓋為 -1
-
示例代碼
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;......public static void test7() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 source// 準備kafka連接參數Properties props = new Properties();// 集群地址props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");// 消費者組idprops.setProperty("group.id", "flink_kafka");// latest有offset記錄從記錄位置開始消費,沒有記錄從最新的/最后的消息開始消費// earliest有offset記錄從記錄位置開始消費,沒有記錄從最早的/最開始的消息開始消費props.setProperty("auto.offset.reset", "latest");// 會開啟一個后臺線程每隔5s檢測一下Kafka的分區情況,實現動態分區檢測props.setProperty("flink.partition-discovery.interval-millis", "5000");// 自動提交(提交到默認主題,后續學習了Checkpoint后隨著Checkpoint存儲在Checkpoint和默認主題中)props.setProperty("enable.auto.commit", "true");// 自動提交的時間間隔props.setProperty("auto.commit.interval.ms", "2000");KafkaSource<String> source = KafkaSource.<String>builder()
// .setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("topic_alan")
// .setGroupId("flink_kafka")
// .setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema())
// .setUnbounded(OffsetsInitializer.timestamp(1700546218367L)).setProperties(props).build();DataStreamSource<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformation// 4、 sinkkafkaDS.print();// 5、executeenv.execute();}
7)、動態分區檢查
為了在不重啟 Flink 作業的情況下處理 Topic 擴容或新建 Topic 等場景,可以將 Kafka Source 配置為在提供的 Topic / Partition 訂閱模式下定期檢查新分區。要啟用動態分區檢查,請將 partition.discovery.interval.ms 設置為非負值:
// 會開啟一個后臺線程每隔5s檢測一下Kafka的分區情況,實現動態分區檢測
props.setProperty("flink.partition-discovery.interval-millis", "5000");
KafkaSource<String> source = KafkaSource.<String>builder().setProperties(props).build();// 或通過方法屬性設置
KafkaSource.builder().setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒檢查一次新分區
分區檢查功能默認不開啟。需要顯式地設置分區檢查間隔才能啟用此功能。
8)、事件時間和水印
默認情況下,Kafka Source 使用 Kafka 消息中的時間戳作為事件時間。您可以定義自己的水印策略(Watermark Strategy) 以從消息中提取事件時間,并向下游發送水印:
import java.time.Duration;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;......public static void test1() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("topic_alan").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();// 設置Watermaker = 當前最大的事件時間 - 最大允許的延遲時間或亂序時間DataStream<String> kafkaSource = env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "Kafka Source");// 3、 transformation// 4、 sinkkafkaSource.print();// 5、executeenv.execute();}
關于watermark內容可參考文章:7、Flink四大基石之Time和WaterMaker詳解與詳細示例(watermaker基本使用、kafka作為數據源的watermaker使用示例以及超出最大允許延遲數據的接收實現)
9)、空閑
如果并行度高于分區數,Kafka Source 不會自動進入空閑狀態。您將需要降低并行度或向水印策略添加空閑超時。如果在這段時間內沒有記錄在流的分區中流動,則該分區被視為“空閑”并且不會阻止下游操作符中水印的進度。
如果數據源中的某一個分區/分片在一段時間內未發送事件數據,則意味著 WatermarkGenerator 也不會獲得任何新數據去生成 watermark。我們稱這類數據源為空閑輸入或空閑源。在這種情況下,當某些其他分區仍然發送事件數據的時候就會出現問題。由于下游算子 watermark 的計算方式是取所有不同的上游并行數據源 watermark 的最小值,則其 watermark 將不會發生變化。
為了解決這個問題,你可以使用 WatermarkStrategy 來檢測空閑輸入并將其標記為空閑狀態。WatermarkStrategy 為此提供了一個工具接口:
WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
- 示例代碼
import java.time.Duration;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;/*** @author alanchan**/
public class TestKafkaSourceWithWatermarkDemo {public static void test1() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 source KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("topic_alan").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();// 設置Watermaker = 當前最大的事件時間 - 最大允許的延遲時間或亂序時間// default WatermarkStrategy<T> withIdleness(Duration idleTimeout) // static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) DataStream< String> kafkaDS = env.fromSource(source,(WatermarkStrategy)WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60)).withIdleness(Duration.ofMinutes(60)), "Kafka Source");// DataStreamSource<String> kafkaDS = env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "Kafka Source");// 3、 transformation// 4、 sinkkafkaDS.print();// 5、executeenv.execute();}public static void main(String[] args) throws Exception {test1();}}
10)、消費位點提交
Kafka source 在 checkpoint 完成時提交當前的消費位點 ,以保證 Flink 的 checkpoint 狀態和 Kafka broker 上的提交位點一致。如果未開啟 checkpoint,Kafka source 依賴于 Kafka consumer 內部的位點定時自動提交邏輯,自動提交功能由 enable.auto.commit 和 auto.commit.interval.ms 兩個 Kafka consumer 配置項進行配置。
Kafka source 不依賴于 broker 上提交的位點來恢復失敗的作業。提交位點只是為了上報 Kafka consumer 和消費組的消費進度,以在 broker 端進行監控。
11)、監控
Kafka source 會在不同的范圍 (Scope)中匯報下列指標。
1、范圍
每個metric 度量都被分配了一個標識符和一組key-value對,在這些key-value對下將報告度量。
標識符基于3個組件:注冊度量時的用戶定義名稱、可選的用戶定義范圍和系統提供的范圍。例如,如果A.B是系統作用域,C.D是用戶作用域,E是名稱,那么度量的標識符將是A.B.C.D.E。
您可以通過在conf/flink-conf.yaml中設置metrics.scope.delimiter鍵來配置用于標識符的分隔符(默認值:.)。
1)、用戶范圍
您可以通過調用MetricGroup#addGroup(String name)、MetricGroup#addGroup(int name) 或MetricGroup#addGroup(String key, String value)來定義用戶作用域。這些方法影響MetricGroup#getMetricIdentifier和MetricGroup#getScopeComponents返回的內容。
counter = getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter");counter = getRuntimeContext().getMetricGroup().addGroup("MyMetricsKey", "MyMetricsValue").counter("myCounter");
2)、系統范圍System Scope
系統范圍包含有關度量的上下文信息,例如它在哪個任務中注冊,或者該任務屬于哪個作業。
應該包括哪些上下文信息可以通過在conf/flink-conf.yaml中設置以下鍵來配置。這些鍵中的每一個都需要一個格式字符串,該字符串可能包含常量(例如“taskmanager”)和變量(例如“<task_id>”),這些常量和變量將在運行時被替換。
- metrics.scope.jm
Default: .jobmanager
應用于job manager范圍內的所有指標 - metrics.scope.jm-job
Default: .jobmanager.<job_name>
應用于 job manager and job范圍內的所有度量 - metrics.scope.tm
Default: .taskmanager.<tm_id>
應用于task manager范圍內的所有度量 - metrics.scope.tm-job
Default: .taskmanager.<tm_id>.<job_name>
應用于范圍為task manager and job的所有度量 - metrics.scope.task
Default: .taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
應用于task范圍內的所有度量 - metrics.scope.operator
Default: .taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
應用于作用域為operator的所有度量
變量的數量或順序沒有限制。變量區分大小寫。
操作員度量的默認作用域將產生類似于 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的標識符
如果還希望包含任務名稱但省略task manager信息,則可以指定以下格式:
metrics.scope.operator: .<job_name>.<task_name>.<operator_name>.<subtask_index>
這可以創建標識符localhost localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.
對于此格式字符串,如果同一作業同時運行多次,可能會發生標識符沖突,從而導致度量數據不一致。因此,建議使用通過包括id(例如<job_id>)或通過為作業和運算符分配唯一名稱來提供一定程度的唯一性的格式字符串。
3)、所有變量列表
- JobManager:
- TaskManager: , <tm_id>
- Job: <job_id>, <job_name>
- Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
- Operator: <operator_id>,<operator_name>, <subtask_index>
對于Batch API, <operator_id> = <task_id>.
4)、用戶變量
您可以通過調用MetricGroup#addGroup(String key, String value)來定義用戶變量。此方法會影響MetricGroup#getMetricIdentifier、MetricGroup#getScopeComponents和MetricGroup#getAllVariables()返回的內容。
用戶變量不能用于范圍格式。
2、指標范圍
該指標反映了最后一條數據的瞬時值。之所以提供瞬時值是因為統計延遲直方圖會消耗更多資源,瞬時值通常足以很好地反映延遲。
3、Kafka Consumer 指標
Kafka consumer 的所有指標都注冊在指標組 KafkaSourceReader.KafkaConsumer 下。
例如 Kafka consumer 的指標 records-consumed-total 將在該 Flink 指標中匯報: <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total。
您可以使用配置項 register.consumer.metrics 配置是否注冊 Kafka consumer 的指標 。默認此選項設置為 true。
關于 Kafka consumer 的指標,您可以參考 Apache Kafka 文檔 了解更多詳細信息。
12)、安全
要啟用加密和認證相關的安全配置,只需將安全配置作為其他屬性配置在 Kafka source 上即可。
下面代碼未經過驗證,由于缺乏環境,代碼來源于官網示例。
下面的代碼片段展示了如何配置 Kafka source 以使用 PLAIN 作為 SASL 機制并提供 JAAS 配置:
KafkaSource.builder().setProperty("security.protocol", "SASL_PLAINTEXT").setProperty("sasl.mechanism", "PLAIN").setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
使用 SASL_SSL 作為安全協議并使用 SCRAM-SHA-256 作為 SASL 機制:
KafkaSource.builder().setProperty("security.protocol", "SASL_SSL")// SSL 配置// 配置服務端提供的 truststore (CA 證書) 的路徑.setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks").setProperty("ssl.truststore.password", "test1234")// 如果要求客戶端認證,則需要配置 keystore (私鑰) 的路徑.setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks").setProperty("ssl.keystore.password", "test1234")// SASL 配置// 將 SASL 機制配置為 as SCRAM-SHA-256.setProperty("sasl.mechanism", "SCRAM-SHA-256")// 配置 JAAS.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
如果在作業 JAR 中 Kafka 客戶端依賴的類路徑被重置了(relocate class),登錄模塊(login module)的類路徑可能會不同,因此請根據登錄模塊在 JAR 中實際的類路徑來改寫以上配置。
關于安全配置的詳細描述,請參閱 Apache Kafka 文檔中的"安全"一節。
13)、實現細節
在新 Source API 的抽象中,Kafka source 由以下幾個部分組成:
1、數據源分片(Source Split)
Kafka source 的數據源分片(source split)表示 Kafka topic 中的一個 partition。Kafka 的數據源分片包括:
- 該分片表示的 topic 和 partition
- 該 partition 的起始位點
- 該 partition 的停止位點,當 source 運行在批模式時適用
Kafka source 分片的狀態同時存儲該 partition 的當前消費位點,該分片狀態將會在 Kafka 源讀取器(source reader)進行快照(snapshot) 時將當前消費位點保存為起始消費位點以將分片狀態轉換成不可變更的分片。
可查看 KafkaPartitionSplit 和 KafkaPartitionSplitState 類來了解細節。
2、分片枚舉器(Split Enumerator)
Kafka source 的分片枚舉器負責檢查在當前的 topic / partition 訂閱模式下的新分片(partition),并將分片輪流均勻地分配給源讀取器(source reader)。 注意 Kafka source 的分片枚舉器會將分片主動推送給源讀取器,因此它無需處理來自源讀取器的分片請求。
3、源讀取器(Source Reader)
Kafka source 的源讀取器擴展了 SourceReaderBase,并使用單線程復用(single thread multiplex)的線程模型,使用一個由分片讀取器 (split reader)驅動的 KafkaConsumer 來處理多個分片(partition)。消息會在從 Kafka 拉取下來后在分片讀取器中立刻被解析。分片的狀態 即當前的消息消費進度會在 KafkaRecordEmitter 中更新,同時會在數據發送至下游時指定事件時間。
3、kafka sourcefunction
FlinkKafkaConsumer 已被棄用并將在 Flink 1.17 中移除,請改用 KafkaSource。
1.13版本的實現參考本文開頭的示例。
4、kafka sink
KafkaSink 可將數據流寫入一個或多個 Kafka topic。
1)、使用示例
Kafka sink 提供了構建類來創建 KafkaSink 的實例。
以下代碼片段展示了如何將字符串數據按照至少一次(at lease once)的語義保證寫入 Kafka topic:
1、Flink 1.13版本實現
- 實現代碼
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestKafkaSinkDemo {public static void test1() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、source-主題:alan_source// 準備kafka連接參數Properties propSource = new Properties();propSource.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");// 集群地址propSource.setProperty("group.id", "flink_kafka");propSource.setProperty("auto.offset.reset", "latest");propSource.setProperty("flink.partition-discovery.interval-millis", "5000");propSource.setProperty("enable.auto.commit", "true");// 自動提交的時間間隔propSource.setProperty("auto.commit.interval.ms", "2000");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("alan_source", new SimpleStringSchema(), propSource);// 使用kafkaSourceDataStream<String> kafkaDS = env.addSource(kafkaSource);// 3、transformation-統計單詞個數SingleOutputStreamOperator<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {private Random ran = new Random();@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {System.out.println("輸出:" + value.f0 + "->" + value.f1);return value.f0 + "->" + value.f1;}});// 4、sink-主題alan_sinkProperties propSink = new Properties();propSink.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");propSink.setProperty("transaction.timeout.ms", "5000");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("alan_sink", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), propSink,FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-toleranceresult.addSink(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {test1();}}
- 驗證
1、創建kafka 主題 alan_source 和 alan_sink
2、驅動程序,觀察運行控制臺
3、通過命令往alan_source 寫入數據,同時消費 alan_sink 主題的數據
## kafka生產數據
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_source
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>## kafka消費數據
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_sink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
hello->2
alan->3
alach->2
hello->3
123->1
4、應用程序控制臺輸出
2、Flink 1.17版本實現
- 代碼實現
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestKafkaSinkDemo {public static void test2() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_nsource").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformationDataStream<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {System.out.println("輸出:" + value.f0 + "->" + value.f1);return value.f0 + "->" + value.f1;}});// 4、 sinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("alan_nsink").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();result.sinkTo(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {
// test1();test2();}}
- 驗證
1、創建kafka 主題 alan_nsource 和 alan_nsink
2、驅動程序,觀察運行控制臺
3、通過命令往alan_nsource 寫入數據,同時消費 alan_nsink 主題的數據
## kafka生產數據
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>## kafka消費數據
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
alach->2
alan->3
hello->2
hello->3
123->1
4、應用程序控制臺輸出
3、說明
以下屬性在構建 KafkaSink 時是必須指定的:
- Bootstrap servers, setBootstrapServers(String)
- 消息序列化器(Serializer), setRecordSerializer(KafkaRecordSerializationSchema)
- 如果使用DeliveryGuarantee.EXACTLY_ONCE 的語義保證,則需要使用 setTransactionalIdPrefix(String)
2)、序列化器
構建時需要提供 KafkaRecordSerializationSchema 來將輸入數據轉換為 Kafka 的 ProducerRecord。Flink 提供了 schema 構建器 以提供一些通用的組件,例如消息鍵(key)/消息體(value)序列化、topic 選擇、消息分區,同樣也可以通過實現對應的接口來進行更豐富的控制。
KafkaRecordSerializationSchema.builder().setTopicSelector(new TopicSelector() {@Overridepublic String apply(Object t) {//設置選擇的 topic return "alan_nsink";}}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build()
- 示例代碼
import java.util.Properties;
import java.util.Random;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TopicSelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestKafkaSinkDemo {public static void test3() throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_nsource").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformationDataStream<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t -> t.f0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {System.out.println("輸出:" + value.f0 + "->" + value.f1);return value.f0 + "->" + value.f1;}});// 4、 sinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopicSelector(new TopicSelector() {@Overridepublic String apply(Object t) {//設置選擇的 topic return "alan_nsink";}}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();result.sinkTo(kafkaSink);// 5、executeenv.execute();}public static void main(String[] args) throws Exception {test3();}}
- 驗證
1、創建kafka 主題 alan_nsource 和 alan_nsink
2、驅動程序,觀察運行控制臺
3、通過命令往alan_nsource 寫入數據,同時消費 alan_nsink 主題的數據
## kafka生產數據
[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_nsource
>alan,alach,alanchan,hello
>alan_chan,hi,flink
>alan,flink,good
>alan,alach,alanchan,hello
>hello,123
>## kafka消費數據
[alanchan@server2 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_nsink --from-beginning
alanchan->1
hello->1
alan->1
alach->1
flink->1
alan_chan->1
hi->1
alan->2
flink->2
good->1
alanchan->2
alach->2
alan->3
hello->2
hello->3
123->1
4、應用程序控制臺輸出
其中消息體(value)序列化方法和 topic 的選擇方法是必須指定的,此外也可以通過 setKafkaKeySerializer(Serializer) 或 setKafkaValueSerializer(Serializer) 來使用 Kafka 提供而非 Flink 提供的序列化器。
3)、容錯
KafkaSink 總共支持三種不同的語義保證(DeliveryGuarantee)。對于 DeliveryGuarantee.AT_LEAST_ONCE 和 DeliveryGuarantee.EXACTLY_ONCE,Flink checkpoint 必須啟用。
默認情況下 KafkaSink 使用 DeliveryGuarantee.NONE。
以下是對不同語義保證的解釋:
- DeliveryGuarantee.NONE 不提供任何保證:消息有可能會因 Kafka broker 的原因發生丟失或因 Flink 的故障發生重復。
- DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 時會等待 Kafka 緩沖區中的數據全部被 Kafka producer 確認。消息不會因 Kafka broker 端發生的事件而丟失,但可能會在 Flink 重啟時重復,因為 Flink 會重新處理舊數據。
- DeliveryGuarantee.EXACTLY_ONCE: 該模式下,Kafka sink 會將所有數據通過在 checkpoint 時提交的事務寫入。因此,如果 consumer 只讀取已提交的數據(參見 Kafka consumer 配置 isolation.level),在 Flink 發生重啟時不會發生數據重復。然而這會使數據在 checkpoint 完成時才會可見,因此請按需調整 checkpoint 的間隔。請確認事務 ID 的前綴(transactionIdPrefix)對不同的應用是唯一的,以保證不同作業的事務 不會互相影響!此外,強烈建議將 Kafka 的事務超時時間調整至遠大于 checkpoint 最大間隔 + 最大重啟時間,否則 Kafka 對未提交事務的過期處理會導致數據丟失。
關于容錯,請參考文章:9、Flink四大基石之Checkpoint容錯機制詳解及示例(checkpoint配置、重啟策略、手動恢復checkpoint和savepoint)
4)、監控
Kafka sink 會在不同的范圍(Scope)中匯報下列指標。
5、kafka producer
FlinkKafkaProducer 已被棄用并將在 Flink 1.15 中移除,請改用 KafkaSink。
關于Flink 1.13版本的實現,請參考上文中的示例。
6、kafka 連接器指標
Flink 的 Kafka 連接器通過 Flink 的指標系統提供一些指標來幫助分析 connector 的行為。 各個版本的 Kafka producer 和 consumer 會通過 Flink 的指標系統匯報 Kafka 內部的指標。 該 Kafka 文檔列出了所有匯報的指標。
同樣也可通過將 Kafka source 在該章節描述的 register.consumer.metrics,或 Kafka sink 的 register.producer.metrics 配置設置為 false 來關閉 Kafka 指標的注冊。
7、啟用 Kerberos 身份驗證
Flink 通過 Kafka 連接器提供了一流的支持,可以對 Kerberos 配置的 Kafka 安裝進行身份驗證。只需在 flink-conf.yaml 中配置 Flink。
像這樣為 Kafka 啟用 Kerberos 身份驗證:
1、通過設置以下內容配置 Kerberos 票據
- security.kerberos.login.use-ticket-cache:默認情況下,這個值是 true,Flink 將嘗試在 kinit 管理的票據緩存中使用 Kerberos 票據。注意!在 YARN 上部署的 Flink jobs 中使用 Kafka 連接器時,使用票據緩存的 Kerberos 授權將不起作用。
- security.kerberos.login.keytab 和 security.kerberos.login.principal:要使用 Kerberos keytabs,需為這兩個屬性設置值。
2、將 KafkaClient 追加到 security.kerberos.login.contexts:這告訴 Flink 將配置的 Kerberos 票據提供給 Kafka 登錄上下文以用于 Kafka 身份驗證。
一旦啟用了基于 Kerberos 的 Flink 安全性后,只需在提供的屬性配置中包含以下兩個設置(通過傳遞給內部 Kafka 客戶端),即可使用 Flink Kafka Consumer 或 Producer 向 Kafk a進行身份驗證:
- 將 security.protocol 設置為 SASL_PLAINTEXT(默認為 NONE):用于與 Kafka broker 進行通信的協議。使用獨立 Flink 部署時,也可以使用 SASL_SSL;請在此處查看如何為 SSL 配置 Kafka 客戶端。
- 將 sasl.kerberos.service.name 設置為 kafka(默認為 kafka):此值應與用于 Kafka broker 配置的 sasl.kerberos.service.name 相匹配。客戶端和服務器配置之間的服務名稱不匹配將導致身份驗證失敗。
有關 Kerberos 安全性 Flink 配置的更多信息,請參見這里。你也可以在這里進一步了解 Flink 如何在內部設置基于 kerberos 的安全性。
該部分由于沒有環境,未做驗證,內容來至于官網。將來擬計劃以專欄的形式介紹該部分內容。
8、升級到最近的連接器版本
通用的升級步驟概述見 升級 Jobs 和 Flink 版本指南。對于 Kafka,你還需要遵循這些步驟:
- 不要同時升級 Flink 和 Kafka 連接器
- 確保你對 Consumer 設置了 group.id
- 在 Consumer 上設置 setCommitOffsetsOnCheckpoints(true),以便讀 offset 提交到 Kafka。務必在停止和恢復 savepoint 前執行此操作。你可能需要在舊的連接器版本上進行停止/重啟循環來啟用此設置。
- 在 Consumer 上設置 setStartFromGroupOffsets(true),以便我們從 Kafka 獲取讀 offset。這只會在 Flink 狀態中沒有讀 offset 時生效,這也是為什么下一步非要重要的原因。
- 修改 source/sink 分配到的 uid。這會確保新的 source/sink 不會從舊的 sink/source 算子中讀取狀態。
- 使用 --allow-non-restored-state 參數啟動新 job,因為我們在 savepoint 中仍然有先前連接器版本的狀態。
9、問題排查
如果在使用 Flink 時對 Kafka 有問題,Flink 只封裝 KafkaConsumer 或 KafkaProducer,你的問題可能獨立于 Flink,有時可以通過升級 Kafka broker 程序、重新配置 Kafka broker 程序或在 Flink 中重新配置 KafkaConsumer 或 KafkaProducer 來解決。
一句話,大概是kafka的問題或配置的kafka的問題,和flink關系不大。
下面列出了一些常見問題的示例。
1)、數據丟失
根據你的 Kafka 配置,即使在 Kafka 確認寫入后,你仍然可能會遇到數據丟失。特別要記住在 Kafka 的配置中設置以下屬性:
- acks
- log.flush.interval.messages
- log.flush.interval.ms
- log.flush.*
上述選項的默認值是很容易導致數據丟失的。請參考 Kafka 文檔以獲得更多的解釋。
2)、UnknownTopicOrPartitionException
導致此錯誤的一個可能原因是正在進行新的 leader 選舉,例如在重新啟動 Kafka broker 之后或期間。這是一個可重試的異常,因此 Flink job 應該能夠重啟并恢復正常運行。也可以通過更改 producer 設置中的 retries 屬性來規避。但是,這可能會導致重新排序消息,反過來可以通過將 max.in.flight.requests.per.connection 設置為 1 來避免不需要的消息。
3)、ProducerFencedException
這個錯誤是由于 FlinkKafkaProducer 所生成的 transactional.id 與其他應用所使用的的產生了沖突。多數情況下,由于 FlinkKafkaProducer 產生的 ID 都是以 taskName + “-” + operatorUid 為前綴的,這些產生沖突的應用也是使用了相同 Job Graph 的 Flink Job。 我們可以使用 setTransactionalIdPrefix() 方法來覆蓋默認的行為,為每個不同的 Job 分配不同的 transactional.id 前綴來解決這個問題。
以上,本文系統介紹了kafka連接器的source、sink、身份驗證、版本升級、問題排查的幾個主要 方面,關于常用的功能均以可運行的示例進行展示并提供完整的驗證步驟。
本專題為了便于閱讀以及整體查閱分為三個部分:
40、Flink 的Apache Kafka connector(kafka source的介紹及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介紹及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 說明及使用示例) 完整版