Flink中Kafka連接器的基本應用

文章目錄

    • 前言
    • Kafka連接器基礎案例演示
      • 前置說明和環境準備步驟
      • Kafka連接器基本配置
      • 關聯數據源
      • 映射轉換
      • 案例效果演示
    • 基于Kafka連接器同步數據到MySQL
      • 案例說明
      • 前置準備
      • Kafka連接器消費位點調整
      • 映射轉換與數據投遞
      • MysqlSlink持久化收集器數據
      • 最終效果演示
    • 小結
    • 參考

前言

本文將基于內置kafka連接器演示如何使用kafka內置流收集器的api完成Kafka數據的采集,同時我們也會給出一個收集Kafka數據流數據保存到MySQL的示例,希望對你有幫助。

Kafka連接器基礎案例演示

前置說明和環境準備步驟

本案例將基于Kafka投遞的單詞(用逗號分隔),通過flink完成抽取,切割為獨立單詞,并完成詞頻統計,例如我們輸入hello,world,最終控制臺就會輸出hello,1world,1

在正式演示之前,筆者介紹一些flink的使用版本:

<flink.version>1.16.0</flink.version>

對應還有下面這些依賴分別用于:

  1. 使用Kafka連接器
  2. 使用hutool的jdbc連接器
  3. MySQL驅動包
 <!-- CSV Format for Kafka (因為你的配置中用了 'format' = 'csv') --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- JDBC Connector (用于你的 spend_report 表寫入 MySQL) --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version> <!-- 推薦使用 8.0.x 版本 --></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency>

完成這些后我們將Kafka等相關環境準備好就可以著手編碼工作了。

Kafka連接器基本配置

首先我們基于StreamExecutionEnvironment 初始化環境構建配置:

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

然后我們就可以基于內置的KafkaSource的建造者模式完成如Kafka連接器的構建:

  1. setBootstrapServers設置Kafka地址為broker字符串配置的ip和端口號
  2. setTopics設置消費的主題為input-topic
  3. setGroupId當前kafka消費者組為my-group
  4. setStartingOffsets設置為從最早偏移量開始消費
  5. setValueOnlyDeserializer設置收到Kafka數據時直接反序列化為字符串

對應的代碼如下所示:

	//基于建造者模式完成Kafka連接器的配置KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers)//設置Kafka server端地址.setTopics("input-topic") //指定消費的Topic為input-topic.setGroupId("my-group")//設置消費組ID為my-group.setStartingOffsets(OffsetsInitializer.earliest())//設置從Kafka的最開始位置開始消費.setValueOnlyDeserializer(new SimpleStringSchema())// 設置數據直接反序列化為字符串.build();

這里需要補充一下關于Kafka消費位點的設置,flink已經內置了如下幾種消費位點的設置,對應的代碼配置示例如下,讀者可參閱并進行配置:

KafkaSource.builder()

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/82457.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/82457.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/82457.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Leetcode 刷題記錄 11 —— 二叉樹第二彈

本系列為筆者的 Leetcode 刷題記錄&#xff0c;順序為 Hot 100 題官方順序&#xff0c;根據標簽命名&#xff0c;記錄筆者總結的做題思路&#xff0c;附部分代碼解釋和疑問解答&#xff0c;01~07為C語言&#xff0c;08及以后為Java語言。 01 二叉樹的層序遍歷 /*** Definition…

【R語言科研繪圖】

R語言在繪制SCI期刊圖像時具有顯著優勢&#xff0c;以下從功能、靈活性和學術適配性三個方面分析其適用性&#xff1a; 數據可視化庫豐富 R語言擁有ggplot2、lattice、ggpubr等專業繪圖包&#xff0c;支持生成符合SCI期刊要求的高分辨率圖像&#xff08;如TIFF/PDF格式&#…

【Node.js】Web開發框架

個人主頁&#xff1a;Guiat 歸屬專欄&#xff1a;node.js 文章目錄 1. Node.js Web框架概述1.1 Web框架的作用1.2 Node.js主要Web框架生態1.3 框架選擇考慮因素 2. Express.js2.1 Express.js概述2.2 基本用法2.2.1 安裝Express2.2.2 創建基本服務器 2.3 路由2.4 中間件2.5 請求…

PDF 轉 JPG 圖片小工具:CodeBuddy 助力解決轉換痛點

本文所使用的 CodeBuddy 免費下載鏈接&#xff1a;騰訊云代碼助手 CodeBuddy - AI 時代的智能編程伙伴 前言 在數字化辦公與內容創作的浪潮中&#xff0c;將 PDF 文件轉換為 JPG 圖片格式的需求日益頻繁。無論是學術文獻中的圖表提取&#xff0c;還是宣傳資料的視覺化呈現&am…

Linux 文件系統層次結構

Linux 的文件系統遵循 Filesystem Hierarchy Standard (FHS) 標準&#xff0c;其目錄結構是層次化的&#xff0c;每個目錄都有明確的用途。以下是 Linux 中部分目錄的作用解析&#xff1a; 1. 根目錄 / 作用&#xff1a;根目錄是整個文件系統的頂層目錄&#xff0c;所有其他目…

密碼學標準(Cryptography Standards)介紹

密碼學標準(Cryptography Standards)是為確保信息安全傳輸、存儲和處理而制定的一系列技術規范和協議,廣泛應用于通信、金融、互聯網等領域。以下從分類、主流標準、應用場景和發展趨勢四個方面進行詳細介紹: 一、密碼學標準的分類 密碼學標準可根據技術原理和應用場景分…

ubuntu 22.04安裝和使用docker介紹

docker安裝和使用 準備環境常見的docker操作linux系統常用的配置卸載docker 準備環境 本機環境&#xff1a; Linux yz-MS-7E06 6.8.0-59-generic #61~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Tue Apr 15 17:03:15 UTC 2 x86_64 x86_64 x86_64 GNU/Linux安裝依賴軟件&#xff1a;…

obsidian 中的查找和替換插件,支持正則

最近用著 obsidian 時&#xff0c;發現想要在當前文檔中 查找和替換 內容時&#xff0c;沒有自動查找和替換的功能&#xff0c;去插件市場查找也沒有發現好用的插件&#xff0c;那就自己寫一個吧。 全程用的 AI 來寫的&#xff0c;當然&#xff0c;我對 JS/CSS/TypeScript 等沒…

針對vue項目的webpack優化攻略

一、開發階段優化 1. 熱更新加速&#xff08;HMR&#xff09; // vue.config.js module.exports {devServer: {hot: true, // 開啟熱更新injectClient: true, // 自動注入HMR客戶端watchOptions: {ignored: /node_modules/, // 忽略node_modules變化aggregateTimeout: 300…

BTC官網關注巨鯨12億美元平倉,XBIT去中心化交易平臺表現穩定

在全球加密貨幣市場波動加劇的背景下&#xff0c;2025年5月25日傳出重磅消息。據今日最新國際報道&#xff0c;知名巨鯨James Wynn完全平倉價值12億美元的BTC多頭倉位&#xff0c;整體盈利約845萬美元&#xff0c;此舉引發市場廣泛關注。與此同時&#xff0c;收益型穩定幣市場迎…

在WPF中添加動畫背景

在WPF中添加動畫背景 在WPF中創建動畫背景可以大大增強應用程序的視覺效果。以下是幾種實現動畫背景的方法&#xff1a; 方法1&#xff1a;使用動畫ImageBrush&#xff08;圖片輪播&#xff09; <Window x:Class"AnimatedBackground.MainWindow"xmlns"htt…

單點擊登錄sso實現

一、單點登錄&#xff08;SSO&#xff09;是什么&#xff1f; 核心定義 單點登錄&#xff08;Single Sign-On&#xff0c;SSO&#xff09;是一種身份認證解決方案&#xff0c;允許用戶通過一次登錄訪問多個相互信任的應用系統。其核心邏輯是統一認證中心與分布式會話管理&…

JavaWebsocket-demo

Websocket客戶端 pom依賴 <dependency><groupId>org.java-websocket</groupId><artifactId>Java-WebSocket</artifactId><version>1.4.0</version></dependency>客戶端代碼片段 Component Slf4j public class PositionAlarmL…

Java Collection(集合) 接口

Date: 2025-05-21 20:21:32 author: lijianzhan Java 集合框架提供了一組接口和類&#xff0c;以實現各種數據結構和算法。 以下是關于 Java 集合的核心內容說明&#xff1a; /*** Java Collection Framework 說明&#xff1a;** 在 Java 中&#xff0c;集合&#xff08;Collec…

讓MySQL更快:EXPLAIN語句詳盡解析

前言 在數據庫性能調優中&#xff0c;SQL 查詢的執行效率是影響系統整體性能的關鍵因素之一。MySQL 提供了強大的工具——EXPLAIN 語句&#xff0c;幫助開發者和數據庫管理員深入分析查詢的執行計劃&#xff0c;從而發現潛在的性能瓶頸并進行針對性優化。 EXPLAIN 語句能夠模…

Java基礎 Day20

一、HashSet 集合類 1、簡介 HashSet 集合底層采取哈希表存儲數據 底層是HashMap 不能使存取有序 JDK8之前的哈希表是數組和鏈表&#xff0c;頭插法 JDK8之后的哈希表是數組、鏈表和紅黑樹&#xff0c;尾插法 2、存儲元素 &#xff08;1&#xff09;如果要保證元素的唯…

2505C++,32位轉64位

原文 假設有個想要將一個32位值傳遞給一個帶64位值的函數的函數.你不關心高32位的內容,因為該值是傳遞給回調函數的直通值,回調函數會把它截斷為32位值. 因此,你都擔心編譯器一般生成的將32位值擴展到64位值的那條指令的性能影響. 我懷疑這條指令不是程序中的性能瓶頸. 我想出…

光伏電站及時巡檢:守護清潔能源的“生命線”

在“雙碳”目標驅動下&#xff0c;光伏電站作為清潔能源的主力軍&#xff0c;正以年均20%以上的裝機增速重塑全球能源格局。然而&#xff0c;這些遍布荒漠、屋頂的“光伏矩陣”并非一勞永逸的能源提款機&#xff0c;其穩定運行高度依賴精細化的巡檢維護。山東棗莊觸電事故、衢州…

C++初階-list的使用2

目錄 1.std::list::splice的使用 2.std::list::remove和std::list::remove_if的使用 2.1remove_if函數的簡單介紹 基本用法 函數原型 使用函數對象作為謂詞 使用普通函數作為謂詞 注意事項 復雜對象示例 2.2remove與remove_if的簡單使用 3.std::list::unique的使用 …

OpenHarmony平臺驅動使用(一),ADC

OpenHarmony平臺驅動使用&#xff08;一&#xff09; ADC 概述 功能簡介 ADC&#xff08;Analog to Digital Converter&#xff09;&#xff0c;即模擬-數字轉換器&#xff0c;可將模擬信號轉換成對應的數字信號&#xff0c;便于存儲與計算等操作。除電源線和地線之外&#…