文章目錄
- 前言
- Kafka連接器基礎案例演示
- 前置說明和環境準備步驟
- Kafka連接器基本配置
- 關聯數據源
- 映射轉換
- 案例效果演示
- 基于Kafka連接器同步數據到MySQL
- 案例說明
- 前置準備
- Kafka連接器消費位點調整
- 映射轉換與數據投遞
- MysqlSlink持久化收集器數據
- 最終效果演示
- 小結
- 參考
前言
本文將基于內置kafka連接器演示如何使用kafka內置流收集器的api完成Kafka數據的采集,同時我們也會給出一個收集Kafka數據流數據保存到MySQL的示例,希望對你有幫助。
Kafka連接器基礎案例演示
前置說明和環境準備步驟
本案例將基于Kafka投遞的單詞(用逗號分隔),通過flink完成抽取,切割為獨立單詞,并完成詞頻統計,例如我們輸入hello,world,最終控制臺就會輸出hello,1
和world,1
。
在正式演示之前,筆者介紹一些flink
的使用版本:
<flink.version>1.16.0</flink.version>
對應還有下面這些依賴分別用于:
- 使用
Kafka
連接器 - 使用hutool的jdbc連接器
- MySQL驅動包
<!-- CSV Format for Kafka (因為你的配置中用了 'format' = 'csv') --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- JDBC Connector (用于你的 spend_report 表寫入 MySQL) --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version> <!-- 推薦使用 8.0.x 版本 --></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency>
完成這些后我們將Kafka等相關環境準備好就可以著手編碼工作了。
Kafka連接器基本配置
首先我們基于StreamExecutionEnvironment
初始化環境構建配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
然后我們就可以基于內置的KafkaSource
的建造者模式完成如Kafka連接器的構建:
setBootstrapServers
設置Kafka
地址為broker
字符串配置的ip和端口號setTopics
設置消費的主題為input-topic
setGroupId
當前kafka消費者組為my-group
setStartingOffsets
設置為從最早偏移量開始消費setValueOnlyDeserializer
設置收到Kafka
數據時直接反序列化為字符串
對應的代碼如下所示:
//基于建造者模式完成Kafka連接器的配置KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers)//設置Kafka server端地址.setTopics("input-topic") //指定消費的Topic為input-topic.setGroupId("my-group")//設置消費組ID為my-group.setStartingOffsets(OffsetsInitializer.earliest())//設置從Kafka的最開始位置開始消費.setValueOnlyDeserializer(new SimpleStringSchema())// 設置數據直接反序列化為字符串.build();
這里需要補充一下關于Kafka
消費位點的設置,flink已經內置了如下幾種消費位點的設置,對應的代碼配置示例如下,讀者可參閱并進行配置:
KafkaSource.builder()