kafka + flink +mysql 案例

假設你有兩個Kafka主題:user_activities_topicproduct_views_topic,并且你希望將user_activities_topic中的數據寫入到user_activities表,而將product_views_topic中的數據寫入到product_views表。

maven

<dependencies><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.14.0</version></dependency><!-- MySQL JDBC Driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version></dependency>
</dependencies>

Flink Job 示例代碼

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;import java.util.Properties;public class MultipleKafkaToFlinkToMysql {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置Kafka消費者屬性Properties kafkaProperties = new Properties();kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");kafkaProperties.setProperty("group.id", "test");// 創建第一個Kafka消費者 (User Activities)FlinkKafkaConsumer<String> userActivitiesConsumer = new FlinkKafkaConsumer<>("user_activities_topic",new SimpleStringSchema(),kafkaProperties);// 創建第二個Kafka消費者 (Product Views)FlinkKafkaConsumer<String> productViewsConsumer = new FlinkKafkaConsumer<>("product_views_topic",new SimpleStringSchema(),kafkaProperties);// 從Kafka獲取用戶活動數據流env.addSource(userActivitiesConsumer).map(value -> {String[] parts = value.split(",");return new UserActivity(parts[0], parts[1]);}).addSink(JdbcSink.sink("INSERT INTO user_activities (user_id, activity) VALUES (?, ?)",(statement, userActivity) -> {statement.setString(1, userActivity.userId);statement.setString(2, userActivity.activity);},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_username").withPassword("your_password").build()));// 從Kafka獲取產品瀏覽數據流env.addSource(productViewsConsumer).map(value -> {String[] parts = value.split(",");return new ProductView(parts[0], Integer.parseInt(parts[1]));}).addSink(JdbcSink.sink("INSERT INTO product_views (user_id, product_id) VALUES (?, ?)",(statement, productView) -> {statement.setString(1, productView.userId);statement.setInt(2, productView.productId);},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_username").withPassword("your_password").build()));env.execute("Multiple Kafka to Multiple MySQL Tables with Flink");}// 用戶活動類public static class UserActivity {public String userId;public String activity;public UserActivity(String userId, String activity) {this.userId = userId;this.activity = activity;}}// 產品瀏覽類public static class ProductView {public String userId;public int productId;public ProductView(String userId, int productId) {this.userId = userId;this.productId = productId;}}
}

當處理多個消費者和表時,直接為每個消費者編寫獨立的代碼會導致代碼冗長且難以維護。為了提高代碼的可維護性和擴展性,可以采用一些設計模式和抽象方法來簡化代碼結構。以下是一些改進策略:

### 1. 使用工廠模式和配置文件

通過使用工廠模式和配置文件,可以將不同Kafka主題和MySQL表的映射關系抽象出來,從而減少重復代碼。

### 2. 示例代碼重構

下面是一個示例,展示了如何通過配置文件和工廠模式來管理多個Kafka消費者和相應的MySQL輸出。

#### 2.1 配置文件 (`application.yaml`)

首先,定義一個配置文件來描述每個消費者的配置信息,包括Kafka主題、目標MySQL表名以及字段映射等。

consumers:- name: user_activities_consumerkafka_topic: user_activities_topicmysql_table: user_activitiesfields:- { index: 0, column: user_id }- { index: 1, column: activity }- name: product_views_consumerkafka_topic: product_views_topicmysql_table: product_viewsfields:- { index: 0, column: user_id }- { index: 1, column: product_id }

#### 2.2 工廠類 (`ConsumerFactory.java`)

創建一個工廠類,根據配置文件中的信息動態生成消費者并設置其數據處理邏輯。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import java.util.Properties;
import java.util.List;
import java.util.Map;public class ConsumerFactory {public static void createAndRegisterConsumers(StreamExecutionEnvironment env, List<Map<String, Object>> consumers) {Properties kafkaProperties = new Properties();kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");kafkaProperties.setProperty("group.id", "test");for (Map<String, Object> consumerConfig : consumers) {String kafkaTopic = (String) consumerConfig.get("kafka_topic");String mysqlTable = (String) consumerConfig.get("mysql_table");List<Map<String, Object>> fields = (List<Map<String, Object>>) consumerConfig.get("fields");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(kafkaTopic,new SimpleStringSchema(),kafkaProperties);env.addSource(kafkaConsumer).map(value -> parseMessage(value, fields)).addSink(JdbcSink.sink(generateInsertSQL(mysqlTable, fields),(statement, record) -> populateStatement(statement, record, fields),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_username").withPassword("your_password").build()));}}private static Map<String, Object> parseMessage(String value, List<Map<String, Object>> fields) {String[] parts = value.split(",");return fields.stream().collect(Collectors.toMap(field -> (String) field.get("column"),field -> parts[(Integer) field.get("index")]));}private static String generateInsertSQL(String table, List<Map<String, Object>> fields) {StringBuilder columns = new StringBuilder();StringBuilder placeholders = new StringBuilder();for (int i = 0; i < fields.size(); i++) {if (i > 0) {columns.append(", ");placeholders.append(", ");}columns.append(fields.get(i).get("column"));placeholders.append("?");}return "INSERT INTO " + table + " (" + columns.toString() + ") VALUES (" + placeholders.toString() + ")";}private static void populateStatement(java.sql.PreparedStatement statement, Map<String, Object> record, List<Map<String, Object>> fields) throws Exception {for (int i = 0; i < fields.size(); i++) {String column = (String) fields.get(i).get("column");Object value = record.get(column);if (value instanceof Integer) {statement.setInt(i + 1, (Integer) value);} else if (value instanceof String) {statement.setString(i + 1, (String) value);}// 其他類型可以根據需要添加}}
}

#### 2.3 主程序 (`Main.java`)

在主程序中加載配置文件,并調用工廠類來注冊所有消費者。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.yaml.snakeyaml.Yaml;import java.io.InputStream;
import java.util.List;
import java.util.Map;public class Main {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Yaml yaml = new Yaml();InputStream inputStream = Main.class.getClassLoader().getResourceAsStream("application.yaml");Map<String, Object> config = yaml.load(inputStream);List<Map<String, Object>> consumers = (List<Map<String, Object>>) config.get("consumers");ConsumerFactory.createAndRegisterConsumers(env, consumers);env.execute("Multiple Kafka to Multiple MySQL Tables with Flink");}
}

### 關鍵點解釋

1. **配置文件**:通過配置文件定義每個消費者的信息,使得添加新的消費者變得簡單,只需修改配置文件即可。
? ?
2. **工廠模式**:使用工廠類 `ConsumerFactory` 根據配置動態創建消費者,并為其設置數據處理邏輯和輸出目標。

3. **通用的數據處理邏輯**:`parseMessage` 方法根據配置文件中的字段映射解析消息,`generateInsertSQL` 和 `populateStatement` 方法則用于生成插入SQL語句和填充PreparedStatement。

4. **擴展性**:這種設計方式非常靈活,易于擴展。如果需要增加新的消費者或修改現有消費者的配置,只需更新配置文件而無需更改代碼邏輯。

這種方法不僅減少了代碼量,還提高了代碼的可維護性和擴展性,使得系統更容易管理和維護。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/72893.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/72893.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/72893.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

遠程登錄客戶端軟件 CTerm 發布了 v4.0.0

有時候我們需要遠程登錄到 Linux/Unix 服務器&#xff0c;這方面使用最廣泛的客戶端軟件是 PuTTY&#xff0c;不過它是全英文的&#xff0c;而且是單窗口的&#xff0c;有時候顯得不那么方便。 CTerm (Clever Terminal) 是一個 Windows 平臺下支持 Telnet 和 SSH 協議進行遠程…

從李佳琦團隊看新型用工:靈活就業如何重構組織架構?

2022年“雙11”期間&#xff0c;李佳琦直播間累計銷售額突破115億元&#xff08;來源&#xff1a;新腕數據《2022雙11直播電商戰報》&#xff09;&#xff0c;其背后團隊規模約400人&#xff0c;但全職員工僅占35%&#xff0c;其余65%為外包選品團隊、兼職客服、第三方MCN機構人…

微軟程序的打包格式MSIX

MSIX 微軟推出的MSIX格式是其為統一Windows應用程序打包和部署而設計的新一代安裝包格式&#xff0c;具有以下核心特點和進展&#xff1a; 1. 推出背景與時間線 MSIX最初于2018年在微軟Build大會上宣布&#xff0c;并在同年7月發布預覽版打包工具&#xff0c;10月正式版上線…

AFL++安裝

學習fuzzing也幾天了&#xff0c;今天記錄AFL的安裝及使用 一、實驗環境 虛擬機&#xff1a;ubuntu20.04 當然也可以uname -a去看自己的版本號 二、AFL安裝 1.先更新一下工具 sudo apt update2.安裝AFL必要的一些依賴&#xff0c;例如編譯工具&#xff08;如 build-essen…

【STM32】ADC功能-單通道多通道(學習筆記)

本章結合上一節內容復習更好理解【江協科技STM32】ADC數模轉換器-學習筆記-CSDN博客 一、ADC單通道 接線圖 ADC初始化 ①RCC開啟時鐘&#xff0c;包括ADC和GPIO的時鐘&#xff0c;另外ADCCLK的分頻器也要配置 ②配置GPIO,&#xff0c;把需要用的GPIO配置成模擬輸入模式&am…

基于YOLO11深度學習的運動品牌LOGO檢測與識別系統【python源碼+Pyqt5界面+數據集+訓練代碼】

《------往期經典推薦------》 一、AI應用軟件開發實戰專欄【鏈接】 項目名稱項目名稱1.【人臉識別與管理系統開發】2.【車牌識別與自動收費管理系統開發】3.【手勢識別系統開發】4.【人臉面部活體檢測系統開發】5.【圖片風格快速遷移軟件開發】6.【人臉表表情識別系統】7.【…

當前主流的大模型訓練與推理框架的全面匯總

以下是當前主流的大模型訓練與推理框架的全面匯總 以下是更新后包含 SGLang 的大模型訓練與推理框架列表&#xff0c;并對分類和示例進行了優化&#xff1a; 一、通用深度學習推理框架 TensorRT-LLM 特點&#xff1a;NVIDIA推出的針對Transformer類模型的優化框架&#xff0c;支…

Linux學習(八)(服務管理(檢查服務狀態,開始/停止服務,檢查服務日志,創建新服務))

服務管理 Linux 中的服務管理是指控制 Linux 在啟動和關閉計算機的過程中啟動和停止的服務&#xff08;或“守護程序”&#xff09;的系統。這些服務執行各種功能&#xff0c;并提供未附加到用戶界面的進程。 Linux 系統&#xff0c;尤其是系統管理員&#xff0c;通常需要管理…

ElasticSearch 分詞器介紹及測試:Standard(標準分詞器)、English(英文分詞器)、Chinese(中文分詞器)、IK(IK 分詞器)

ElasticSearch 分詞器介紹及測試&#xff1a;Standard&#xff08;標準分詞器&#xff09;、English&#xff08;英文分詞器&#xff09;、Chinese&#xff08;中文分詞器&#xff09;、IK&#xff08;IK 分詞器&#xff09; ElasticSearch 分詞器介紹及測試1. Standard Analyz…

【計算機網絡】確認家庭網絡是千兆/百兆帶寬并排查問題

要確認你的帶寬是千兆&#xff08;1000Mbps&#xff09;還是百兆&#xff08;100Mbps&#xff09;&#xff0c;可以通過以下方法逐步排查&#xff1a; 一、檢查物理設備 1. 查看路由器和光貓的網口 千兆網口&#xff1a;路由器或光貓的網口旁通常會標注 “10/100/1000M” 或 …

[數據分享第七彈]全球洪水相關數據集

洪水是一種常見的自然災害&#xff0c;在全球范圍內造成了極為嚴重的威脅。近年來&#xff0c;針對洪水事件的檢測分析&#xff0c;以及對于洪水災害和災后恢復能力的研究日漸增多&#xff0c;也產生了眾多洪水數據集。今天&#xff0c;我們一起來收集整理一下相關數據集。&…

深入探討AI-Ops架構 第一講 - 運維的進化歷程以及未來發展趨勢

首先&#xff0c;讓我們一起回顧運維的進化之路&#xff0c;然后再深入探討AI-Ops架構的細節。 運維的進化歷程 1. AI 大范圍普及前的運維狀態 (傳統運維) 在AI技術尚未廣泛滲透到運維領域之前&#xff0c;我們稱之為傳統運維&#xff0c;其主要特點是&#xff1a; 人工驅動…

Hive-數據傾斜優化

數據傾斜的原因 1&#xff09;key分布不均勻&#xff0c;本質上就是業務數據有可能會存在傾斜 2&#xff09;某些SQL語句本身就有數據傾斜 關鍵詞 情形 后果 Join A、其中一個表較小&#xff0c;但是key集中; B、兩張表都是大表&#xff0c;key不均 分發到…

番外篇 - Docker的使用

一、Docker的介紹 Docker 是一個開源的應用容器引擎&#xff0c;基于 Go 語言 并遵從Apache2.0協議開源。 Docker 可以讓開發者打包他們的應用以及依賴包到一個輕量級、可移植的容器中&#xff0c;然后發布到任何流行的 Linux 機器上&#xff0c;也可以實現虛擬化。 容器是完…

深度學習與普通神經網絡有何區別?

深度學習與普通神經網絡的主要區別體現在以下幾個方面&#xff1a; 一、結構復雜度 普通神經網絡&#xff1a;通常指淺層結構&#xff0c;層數較少&#xff0c;一般為2-3層&#xff0c;包括輸入層、一個或多個隱藏層、輸出層。深度學習&#xff1a;強調通過5層以上的深度架構…

RuleOS:區塊鏈開發的“新引擎”,點燃Web3創新之火

RuleOS&#xff1a;區塊鏈開發的“新引擎”&#xff0c;點燃Web3創新之火 在區塊鏈技術的浪潮中&#xff0c;RuleOS宛如一臺強勁的“新引擎”&#xff0c;為個人和企業開發去中心化應用&#xff08;DApp&#xff09;注入了前所未有的動力。它以獨特的設計理念和強大的功能特性&…

c# MimeEntity修改郵件附件名稱

在C#中&#xff0c;當你使用如MimeKit庫來處理電子郵件時&#xff0c;你可以通過修改MimeEntity的ContentDisposition屬性來更改郵件附件的名稱。以下是如何做到這一點的步驟&#xff1a; 1. 添加MimeKit引用 首先&#xff0c;確保你的項目中已經添加了MimeKit庫。如果你使用…

Windows編譯環境搭建(MSYS2\MinGW\cmake)

我的音視頻/流媒體開源項目(github) 一、基礎環境搭建 1.1 MSYS2\MinGW 參考&#xff1a;1. 基于MSYS2的Mingw-w64 GCC搭建Windows下C開發環境_msys2使用mingw64編譯 在Widndows系統上&#xff0c;使用gcc工具鏈&#xff08;g&#xff09;進行C程序開發&#xff1f;可以的&a…

TikTok美國戰略升級:聚焦美食旅行,本地化服務如何重塑市場格局

平臺深耕本土內容生態&#xff0c;餐飲旅游創作者迎流量紅利&#xff0c;算法推薦機制激發地域經濟新活力 過去一年&#xff0c;TikTok在美國市場的動作頻頻引發行業關注。從早期以娛樂、舞蹈為主的全球化內容&#xff0c;到如今將資源向美食、旅行兩大垂類傾斜&#xff0c;這…

Unity Dots環境配置

文章目錄 前言環境配置1.新建Unity 工程2.安裝Entities包2.安裝EntitiesGraphics包3.安裝URP渲染管線 Dots窗口 前言 DOTS&#xff08;Data-Oriented Technology Stack&#xff09;是Unity推出的一種用于開發高性能游戲和應用的數據導向技術棧&#xff0c;包含三大核心組件&am…