流式ETL配置指南:從MySQL到Elasticsearch的實時數據同步

流式ETL配置指南:從MySQL到Elasticsearch的實時數據同步

場景介紹

假設您運營一個電商平臺,需要將MySQL數據庫中的訂單、用戶和產品信息實時同步到Elasticsearch,以支持實時搜索、分析和儀表盤展示。傳統的批處理ETL無法滿足實時性要求,因此我們將使用Flink CDC構建流式ETL管道。

前提條件

  1. MySQL數據庫 (作為數據源)
  2. Elasticsearch (作為目標系統)
  3. Flink環境 (處理引擎)
  4. Java開發環境

步驟一:環境準備

1.1 準備MySQL環境

-- 創建數據庫
CREATE DATABASE IF NOT EXISTS shop;
USE shop;-- 創建用戶表
CREATE TABLE users (id INT PRIMARY KEY,name VARCHAR(100),email VARCHAR(100),create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- 創建產品表
CREATE TABLE products (id INT PRIMARY KEY,name VARCHAR(200),price DECIMAL(10,2),stock INT,category VARCHAR(100)
);-- 創建訂單表
CREATE TABLE orders (id INT PRIMARY KEY,user_id INT,order_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,status VARCHAR(20),total_amount DECIMAL(10,2),FOREIGN KEY (user_id) REFERENCES users(id)
);-- 創建訂單詳情表
CREATE TABLE order_items (id INT PRIMARY KEY,order_id INT,product_id INT,quantity INT,price DECIMAL(10,2),FOREIGN KEY (order_id) REFERENCES orders(id),FOREIGN KEY (product_id) REFERENCES products(id)
);-- 插入一些測試數據
INSERT INTO users VALUES (1, '張三', 'zhangsan@example.com', '2023-01-01 10:00:00');
INSERT INTO products VALUES (101, 'iPhone 14', 5999.00, 100, '電子產品');
INSERT INTO orders VALUES (1001, 1, '2023-01-05 14:30:00', '已完成', 5999.00);
INSERT INTO order_items VALUES (10001, 1001, 101, 1, 5999.00);

確保MySQL已開啟binlog,編輯MySQL配置文件:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL

1.2 準備Elasticsearch環境

創建索引映射:

PUT /shop_orders
{"mappings": {"properties": {"order_id": { "type": "keyword" },"user_id": { "type": "keyword" },"user_name": { "type": "keyword" },"user_email": { "type": "keyword" },"order_time": { "type": "date" },"status": { "type": "keyword" },"total_amount": { "type": "double" },"items": {"type": "nested","properties": {"product_id": { "type": "keyword" },"product_name": { "type": "text" },"quantity": { "type": "integer" },"price": { "type": "double" },"category": { "type": "keyword" }}}}}
}

步驟二:創建Flink流式ETL項目

2.1 創建Maven項目

pom.xml文件配置:

<dependencies><!-- Flink核心依賴 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version></dependency><!-- Flink CDC連接器 --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Elasticsearch連接器 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>1.17.0</version></dependency><!-- JSON處理 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.17.0</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.9.0</version></dependency>
</dependencies>

2.2 實現ETL主程序

創建MySQLToElasticsearchETL.java文件:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class MySQLToElasticsearchETL {public static void main(String[] args) throws Exception {// 1. 設置Flink執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);  // 開發環境設置為1,生產環境根據需要調整env.enableCheckpointing(60000);  // 每60秒做一次檢查點// 2. 配置MySQL CDC源MySqlSource<String> userSource = MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("shop").tableList("shop.users").username("root").password("yourpassword").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();MySqlSource<String> productSource = MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("shop").tableList("shop.products").username("root").password("yourpassword").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();MySqlSource<String> orderSource = MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("shop").tableList("shop.orders").username("root").password("yourpassword").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();MySqlSource<String> orderItemSource = MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("shop").tableList("shop.order_items").username("root").password("yourpassword").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();// 3. 創建數據流DataStream<String> userStream = env.fromSource(userSource,WatermarkStrategy.noWatermarks(),"User CDC Source");DataStream<String> productStream = env.fromSource(productSource,WatermarkStrategy.noWatermarks(),"Product CDC Source");DataStream<String> orderStream = env.fromSource(orderSource,WatermarkStrategy.noWatermarks(),"Order CDC Source");DataStream<String> orderItemStream = env.fromSource(orderItemSource,WatermarkStrategy.noWatermarks(),"OrderItem CDC Source");// 4. 數據轉換與關聯// 用戶緩存Map<Integer, Map<String, Object>> userCache = new HashMap<>();userStream.map(json -> {JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();JsonObject after = jsonObject.getAsJsonObject("after");if (after != null) {int userId = after.get("id").getAsInt();Map<String, Object> userInfo = new HashMap<>();userInfo.put("name", after.get("name").getAsString());userInfo.put("email", after.get("email").getAsString());userCache.put(userId, userInfo);}return json;});// 產品緩存Map<Integer, Map<String, Object>> productCache = new HashMap<>();productStream.map(json -> {JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();JsonObject after = jsonObject.getAsJsonObject("after");if (after != null) {int productId = after.get("id").getAsInt();Map<String, Object> productInfo = new HashMap<>();productInfo.put("name", after.get("name").getAsString());productInfo.put("price", after.get("price").getAsDouble());productInfo.put("category", after.get("category").getAsString());productCache.put(productId, productInfo);}return json;});// 訂單與訂單項關聯Map<Integer, List<Map<String, Object>>> orderItemsCache = new HashMap<>();orderItemStream.map(json -> {JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();JsonObject after = jsonObject.getAsJsonObject("after");if (after != null) {int orderId = after.get("order_id").getAsInt();int productId = after.get("product_id").getAsInt();Map<String, Object> itemInfo = new HashMap<>();itemInfo.put("product_id", productId);itemInfo.put("quantity", after.get("quantity").getAsInt());itemInfo.put("price", after.get("price").getAsDouble());// 添加產品信息if (productCache.containsKey(productId)) {itemInfo.put("product_name", productCache.get(productId).get("name"));itemInfo.put("category", productCache.get(productId).get("category"));}if (!orderItemsCache.containsKey(orderId)) {orderItemsCache.put(orderId, new ArrayList<>());}orderItemsCache.get(orderId).add(itemInfo);}return json;});// 處理訂單并關聯用戶和訂單項SingleOutputStreamOperator<Map<String, Object>> enrichedOrderStream = orderStream.map(new MapFunction<String, Map<String, Object>>() {@Overridepublic Map<String, Object> map(String json) throws Exception {JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();JsonObject after = jsonObject.getAsJsonObject("after");String op = jsonObject.get("op").getAsString();Map<String, Object> orderInfo = new HashMap<>();// 只處理插入和更新事件if ("c".equals(op) || "u".equals(op)) {int orderId = after.get("id").getAsInt();int userId = after.get("user_id").getAsInt();orderInfo.put("order_id", orderId);orderInfo.put("user_id", userId);orderInfo.put("order_time", after.get("order_time").getAsString());orderInfo.put("status", after.get("status").getAsString());orderInfo.put("total_amount", after.get("total_amount").getAsDouble());// 關聯用戶信息if (userCache.containsKey(userId)) {orderInfo.put("user_name", userCache.get(userId).get("name"));orderInfo.put("user_email", userCache.get(userId).get("email"));}// 關聯訂單項if (orderItemsCache.containsKey(orderId)) {orderInfo.put("items", orderItemsCache.get(orderId));}}return orderInfo;}});// 5. 配置Elasticsearch接收器List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("localhost", 9200, "http"));ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,(request, context, element) -> {if (element.containsKey("order_id")) {request.index("shop_orders").id(element.get("order_id").toString()).source(element);}});// 配置批量寫入esSinkBuilder.setBulkFlushMaxActions(1);  // 每條記錄立即寫入,生產環境可以調大esSinkBuilder.setBulkFlushInterval(1000);  // 每秒刷新一次// 6. 寫入ElasticsearchenrichedOrderStream.addSink(esSinkBuilder.build());// 7. 執行作業env.execute("MySQL to Elasticsearch ETL Job");}
}

步驟三:部署和運行

3.1 編譯打包

使用Maven打包:

mvn clean package

3.2 提交到Flink集群

flink run -c MySQLToElasticsearchETL target/your-jar-file.jar

3.3 驗證數據同步

在Elasticsearch中查詢數據:

curl -X GET "localhost:9200/shop_orders/_search?pretty"

關鍵點和注意事項

  1. 數據一致性

    • 確保開啟Flink的檢查點機制,實現exactly-once語義
    • 合理設置檢查點間隔,平衡一致性和性能
  2. 狀態管理

    • 在上述例子中,我們在內存中維護了用戶和產品的緩存,生產環境應使用Flink的狀態API
    • 考慮狀態大小和清理策略,避免狀態無限增長
  3. 表關聯策略

    • 上述示例使用了簡化的表關聯方式
    • 生產環境可以考慮使用Flink SQL或異步I/O進行優化
  4. 性能優化

    • 調整并行度以匹配業務需求
    • 設置合適的批處理大小和間隔
    • 監控反壓(backpressure)情況
  5. 錯誤處理

    • 添加錯誤處理邏輯,處理數據格式異常
    • 實現重試機制,應對臨時網絡故障
    • 考慮死信隊列(DLQ)來處理無法處理的消息
  6. 監控和告警

    • 接入Prometheus和Grafana監控Flink作業
    • 設置關鍵指標告警,如延遲、失敗次數等
  7. 擴展性考慮

    • 設計時考慮表結構變更的處理方式
    • 為未來增加新數據源或新目標系統預留擴展點

擴展功能

基于這個基礎架構,您可以進一步實現:

  1. 增量更新優化:只同步變更字段,減少網絡傳輸
  2. 歷史數據回溯:支持從特定時間點重新同步數據
  3. 數據轉換:增加復雜的業務計算邏輯
  4. 數據過濾:根據業務規則過濾不需要的數據
  5. 多目標寫入:同時將數據寫入Elasticsearch和其他系統如Kafka

這個完整的方案展示了如何使用Flink CDC構建一個端到端的流式ETL系統,實現從MySQL到Elasticsearch的實時數據同步,同時處理表之間的關聯關系。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/73339.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/73339.shtml
英文地址,請注明出處:http://en.pswp.cn/web/73339.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Docker-Volume數據卷詳講

Docker數據卷-Volume 一&#xff1a;Volume是什么&#xff0c;用來做什么的 當刪除docker容器時&#xff0c;容器內部的文件就會跟隨容器所銷毀&#xff0c;在生產環境中我們需要將數據持久化保存&#xff0c;就催生了將容器內部的數據保存在宿主機的需求&#xff0c;volume …

單片機和微控制器知識匯總——《器件手冊--單片機、數字信號處理器和可編程邏輯器件》

目錄 四、單片機和微控制器 4.1 單片機(MCU/MPU/SOC) 一、定義 二、主要特點 三、工作原理 四、主要類型 五、應用領域 六、選型與設計注意事項 七、發展趨勢 4.2 數字信號處理器(DSP/DSC) ?編輯?編輯 一、定義 二、工作原理 三、結構特點 四、應用領域 五、選型與設計注…

macOS 安裝 Miniconda

macOS 安裝 Miniconda 1. Quickstart install instructions2. 執行3. shell 上初始化 conda4. 關閉 終端登錄用戶名前的 base參考 1. Quickstart install instructions mkdir -p ~/miniconda3 curl https://repo.anaconda.com/miniconda/Miniconda3-latest-MacOSX-arm64.sh -o…

高數下---8.1平面與直線

目錄 平面的確定 直線的確定 若要求某一直線或平面就根據要素來求。 例題 平面中的特殊情況 平面中的解題思路 直線的解題思路 平面的確定 兩要素 一 一點 二 傾斜角 即法向量 點法式 可化為一般式 Ax By Cz D 0; (A,B,C) 即法向量&#xff1b; 改變D 即…

CMS遷移中SEO優化整合步驟詳解

內容概要 在CMS遷移過程中&#xff0c;系統化的規劃與執行是保障SEO排名穩定性的核心。首先需明確遷移流程的關鍵階段&#xff0c;包括數據備份、URL適配、元數據同步及安全配置等環節。其中&#xff0c;數據備份不僅需覆蓋原始數據庫與靜態資源&#xff0c;還需驗證備份文件的…

存儲過程、存儲函數與觸發器詳解(MySQL 案例)

存儲過程、存儲函數與觸發器詳解&#xff08;MySQL 案例&#xff09; 一、存儲過程&#xff08;Stored Procedure&#xff09; 定義 存儲過程是預先編譯好并存儲在數據庫中的一段 SQL 代碼集合&#xff0c;可以接收參數、執行邏輯操作&#xff08;如條件判斷、循環&#xff09;…

Python:進程間的通信,進程的操作隊列

進程間的隊列&#xff1a; 隊列的基本操作&#xff1a; 入隊&#xff1a;將數據放到隊列尾部 出隊&#xff1a;從隊列的頭部取出一個元素 maxsize&#xff1a;隊列中能存放數據個數的上限(整數)&#xff0c;一旦達到上限插入會導致阻塞&#xff0c;直到隊列中的數據被消費掉 …

【C++初階】--- 類與對象(中)

1.類的默認成員函數 默認成員函數就是??沒有顯式實現&#xff0c;編譯器會?動?成的成員函數稱為默認成員函數。?個類&#xff0c;我們不寫的情況下編譯器會默認?成以下6個默認成員函數&#xff0c;我們主要需要掌握前4個&#xff0c;后兩個了解以下即可&#xff0c;默認…

python處理音頻相關的庫

1 音頻信號采集與播放 pyaudio import sys import pyaudio import wave import timeCHUNK 1024 FORMAT pyaudio.paInt16 CHANNELS 1#僅支持單聲道 RATE 16000 RECORD_SECONDS 3#更改錄音時長#錄音函數&#xff0c;生成wav文件 def record(file_name):try:os.close(file_…

[M模擬] lc2711. 對角線上不同值的數量差(對角線遍歷+前后綴分解)

文章目錄 1. 題目來源2. 題目解析 1. 題目來源 鏈接&#xff1a;2711. 對角線上不同值的數量差 前置題&#xff1a; [M模擬] lc3446. 按對角線進行矩陣排序(對角線遍歷公式推導模板題) 矩形的對角線遍歷的基礎題。 題單&#xff1a; 待補充 2. 題目解析 2025年03月25日…

設計一個基于機器學習的光伏發電功率預測模型,以Python和Scikit - learn庫為例

下面為你設計一個基于機器學習的光伏發電功率預測模型&#xff0c;以Python和Scikit - learn庫為例。此模型借助歷史氣象數據和光伏發電功率數據來預測未來的光伏發電功率。 模型設計思路 數據收集&#xff1a;收集歷史氣象數據&#xff08;像溫度、光照強度、濕度等&#xf…

洛谷 P1351 [NOIP 2014 提高組] 聯合權值(樹)

題目描述 無向連通圖 G 有 n 個點&#xff0c;n?1 條邊。點從 1 到 n 依次編號,編號為 i 的點的權值為 Wi?&#xff0c;每條邊的長度均為 1。圖上兩點 (u,v) 的距離定義為 u 點到 v 點的最短距離。對于圖 G 上的點對 (u,v)&#xff0c;若它們的距離為 2&#xff0c;則它們之間…

YoloV8訓練和平精英人物檢測模型

概述 和平精英人物檢測&#xff0c;可以識別游戲中所有人物角色&#xff0c;并通過繪制框將人物選中&#xff0c;訓練的模型僅僅具有識別功能&#xff0c;可以識別游戲中的視頻、圖片等文件&#xff0c;搭配Autox.js可以推理&#xff0c;實現實時繪制&#xff0c;但是對手機性…

智能汽車圖像及視頻處理方案,支持視頻實時拍攝特效能力

在智能汽車日新月異的今天&#xff0c;美攝科技作為智能汽車圖像及視頻處理領域的先行者&#xff0c;憑借其卓越的技術實力和前瞻性的設計理念&#xff0c;為全球智能汽車制造商帶來了一場視覺盛宴的革新。美攝科技推出智能汽車圖像及視頻處理方案&#xff0c;一個集高效性、智…

架構設計之自定義延遲雙刪緩存注解(下)

架構設計之自定義延遲雙刪緩存注解(下) 小薛博客官方架構設計之自定義延遲雙刪緩存注解(下)地址 為了保證Cache和ClearAndReloadCache的靈活性&#xff0c;特意加入EL表達式解析 1、Cache package com.xx.cache;import java.lang.annotation.*; import java.util.concurren…

rosbag|ROS中.bag數據包轉換為matlab中.mat數據類型

代碼見代碼 msg_dict中設置自定義消息類型 test_config中設置需要記錄的具體的值 test_config中topic_name以及message_type照搬plotjuggler打開時的參數 最后生成.mat文件在matlab中進行使用

基于動態 FOF(基金中的基金)策略的基金交易推薦系統的設計與實現思路

下面為你呈現一個基于動態 FOF&#xff08;基金中的基金&#xff09;策略的基金交易推薦系統的設計與實現思路&#xff0c;同時給出一個簡單的 Python 示例代碼。 系統設計 1. 需求分析 收集各類基金的歷史數據&#xff0c;涵蓋凈值、收益率、風險指標等。依據動態 FOF 策略…

搭建主從DNS、nfs、nginx

任務需求&#xff1a; 客戶端通過訪問 www.nihao.com 后&#xff0c;能夠通過 dns 域名解析&#xff0c;訪問到 nginx 服務中由 nfs 共享的首頁文件&#xff0c;內容為&#xff1a;Very good, you have successfully set up the system. 各個主機能夠實現時間同步&#xff0c;…

JS 對象轉數組,數組轉對象

數據格式 objMap : {apiP: 8000, sder: true, host: "1.111", wPort: "1335" }要求&#xff1a;將 objMap 轉化為 數組 const equipArray Object.keys(objMap ).map(key > {return {name: key,value: objMap [key]}打印結果 數組轉為對象 let equipAr…

vue - [Vue warn]: Duplicate keys detected: ‘0‘. This may cause an update error.

問題描述&#xff1a; vue項目中&#xff0c;對表單數組賦值時&#xff0c;控制臺拋出警告&#xff1a; 問題代碼&#xff1a; 問題分析&#xff1a; 1、Vue 要求每個虛擬 DOM 節點必須有唯一的 key。該警告信息通常出現在使用v-for循環的場景中&#xff0c;多個同級節點使用…