Streaming ETL Configuration Guide: Real-Time Data Synchronization from MySQL to Elasticsearch
Scenario Overview
Suppose you run an e-commerce platform and need to synchronize order, user, and product data from a MySQL database into Elasticsearch in real time, to power live search, analytics, and dashboards. Traditional batch ETL cannot meet the latency requirement, so we will build a streaming ETL pipeline with Flink CDC.
Prerequisites
- MySQL database (data source)
- Elasticsearch (target system)
- Flink runtime (processing engine)
- Java development environment
Step 1: Environment Preparation
1.1 Prepare the MySQL Environment
-- Create the database
CREATE DATABASE IF NOT EXISTS shop;
USE shop;

-- Create the users table
CREATE TABLE users (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create the products table
CREATE TABLE products (
    id INT PRIMARY KEY,
    name VARCHAR(200),
    price DECIMAL(10,2),
    stock INT,
    category VARCHAR(100)
);

-- Create the orders table
CREATE TABLE orders (
    id INT PRIMARY KEY,
    user_id INT,
    order_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status VARCHAR(20),
    total_amount DECIMAL(10,2),
    FOREIGN KEY (user_id) REFERENCES users(id)
);

-- Create the order items table
CREATE TABLE order_items (
    id INT PRIMARY KEY,
    order_id INT,
    product_id INT,
    quantity INT,
    price DECIMAL(10,2),
    FOREIGN KEY (order_id) REFERENCES orders(id),
    FOREIGN KEY (product_id) REFERENCES products(id)
);

-- Insert some test data
INSERT INTO users VALUES (1, 'Zhang San', 'zhangsan@example.com', '2023-01-01 10:00:00');
INSERT INTO products VALUES (101, 'iPhone 14', 5999.00, 100, 'Electronics');
INSERT INTO orders VALUES (1001, 1, '2023-01-05 14:30:00', 'completed', 5999.00);
INSERT INTO order_items VALUES (10001, 1001, 101, 1, 5999.00);
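For reference, the document that the pipeline will build in Elasticsearch for each order is essentially the result of joining these four tables. The query below is not part of the pipeline; it only illustrates the denormalized shape the ETL job is expected to produce:

-- Illustration only: the denormalized view the streaming job will maintain in Elasticsearch
SELECT o.id AS order_id, u.name AS user_name, u.email AS user_email,
       o.order_time, o.status, o.total_amount,
       oi.product_id, p.name AS product_name, oi.quantity, oi.price, p.category
FROM orders o
JOIN users u        ON o.user_id = u.id
JOIN order_items oi ON oi.order_id = o.id
JOIN products p     ON oi.product_id = p.id;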
Make sure binlog is enabled on MySQL by editing the MySQL configuration file:
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
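Before wiring up Flink CDC, it is worth confirming from a MySQL client that the settings took effect; the account used by the connector also needs SELECT, REPLICATION SLAVE, and REPLICATION CLIENT privileges:

SHOW VARIABLES LIKE 'log_bin';        -- expect: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect: ROW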
1.2 Prepare the Elasticsearch Environment
Create the index mapping:
PUT /shop_orders
{"mappings": {"properties": {"order_id": { "type": "keyword" },"user_id": { "type": "keyword" },"user_name": { "type": "keyword" },"user_email": { "type": "keyword" },"order_time": { "type": "date" },"status": { "type": "keyword" },"total_amount": { "type": "double" },"items": {"type": "nested","properties": {"product_id": { "type": "keyword" },"product_name": { "type": "text" },"quantity": { "type": "integer" },"price": { "type": "double" },"category": { "type": "keyword" }}}}}
}
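You can confirm the index and mapping were created as expected:

curl -X GET "localhost:9200/shop_orders/_mapping?pretty"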
Step 2: Create the Flink Streaming ETL Project
2.1 Create a Maven Project
Configure the pom.xml file:
<dependencies>
    <!-- Flink core dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.0</version>
    </dependency>
    <!-- Needed to run the job from the IDE or a local environment -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.0</version>
    </dependency>
    <!-- Flink CDC connector (the 2.4.x line is built against Flink 1.17) -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- Elasticsearch connector (released separately from Flink since 1.16;
         3.0.1-1.17 is the build that matches Flink 1.17) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7</artifactId>
        <version>3.0.1-1.17</version>
    </dependency>
    <!-- JSON handling -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.17.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>
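To submit the job to a cluster in Step 3 you will need a fat jar that bundles the connector dependencies. One common way is the maven-shade-plugin; the sketch below assumes the main class lives in the default package, as in this guide (adjust mainClass if you place it in a package):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!-- Record the entry point in the jar manifest -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>MySQLToElasticsearchETL</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>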
2.2 Implement the ETL Main Program
Create the MySQLToElasticsearchETL.java file:
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class MySQLToElasticsearchETL {

    public static void main(String[] args) throws Exception {
        // 1. Set up the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 1 for development; tune as needed in production
        env.enableCheckpointing(60000); // checkpoint every 60 seconds

        // Make Debezium emit DECIMAL columns as plain JSON numbers
        // so they can be read with getAsDouble() below
        Properties debeziumProps = new Properties();
        debeziumProps.setProperty("decimal.handling.mode", "double");

        // 2. Configure the MySQL CDC sources
        MySqlSource<String> userSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("shop")
                .tableList("shop.users")
                .username("root")
                .password("yourpassword")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .debeziumProperties(debeziumProps)
                .startupOptions(StartupOptions.initial())
                .build();

        MySqlSource<String> productSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("shop")
                .tableList("shop.products")
                .username("root")
                .password("yourpassword")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .debeziumProperties(debeziumProps)
                .startupOptions(StartupOptions.initial())
                .build();

        MySqlSource<String> orderSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("shop")
                .tableList("shop.orders")
                .username("root")
                .password("yourpassword")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .debeziumProperties(debeziumProps)
                .startupOptions(StartupOptions.initial())
                .build();

        MySqlSource<String> orderItemSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("shop")
                .tableList("shop.order_items")
                .username("root")
                .password("yourpassword")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .debeziumProperties(debeziumProps)
                .startupOptions(StartupOptions.initial())
                .build();

        // 3. Create the data streams
        DataStream<String> userStream = env.fromSource(
                userSource, WatermarkStrategy.noWatermarks(), "User CDC Source");
        DataStream<String> productStream = env.fromSource(
                productSource, WatermarkStrategy.noWatermarks(), "Product CDC Source");
        DataStream<String> orderStream = env.fromSource(
                orderSource, WatermarkStrategy.noWatermarks(), "Order CDC Source");
        DataStream<String> orderItemStream = env.fromSource(
                orderItemSource, WatermarkStrategy.noWatermarks(), "OrderItem CDC Source");
        // 4. Data transformation and joins
        // NOTE: the in-memory HashMap caches below are a deliberate simplification
        // that only behaves as expected with parallelism 1 in a single JVM;
        // production jobs should keep this lookup data in Flink managed state.

        // User cache
        Map<Integer, Map<String, Object>> userCache = new HashMap<>();
        userStream.map(json -> {
            JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();
            JsonObject after = jsonObject.getAsJsonObject("after");
            if (after != null) {
                int userId = after.get("id").getAsInt();
                Map<String, Object> userInfo = new HashMap<>();
                userInfo.put("name", after.get("name").getAsString());
                userInfo.put("email", after.get("email").getAsString());
                userCache.put(userId, userInfo);
            }
            return json;
        });

        // Product cache
        Map<Integer, Map<String, Object>> productCache = new HashMap<>();
        productStream.map(json -> {
            JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();
            JsonObject after = jsonObject.getAsJsonObject("after");
            if (after != null) {
                int productId = after.get("id").getAsInt();
                Map<String, Object> productInfo = new HashMap<>();
                productInfo.put("name", after.get("name").getAsString());
                productInfo.put("price", after.get("price").getAsDouble());
                productInfo.put("category", after.get("category").getAsString());
                productCache.put(productId, productInfo);
            }
            return json;
        });

        // Order item cache, keyed by order id
        Map<Integer, List<Map<String, Object>>> orderItemsCache = new HashMap<>();
        orderItemStream.map(json -> {
            JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();
            JsonObject after = jsonObject.getAsJsonObject("after");
            if (after != null) {
                int orderId = after.get("order_id").getAsInt();
                int productId = after.get("product_id").getAsInt();
                Map<String, Object> itemInfo = new HashMap<>();
                itemInfo.put("product_id", productId);
                itemInfo.put("quantity", after.get("quantity").getAsInt());
                itemInfo.put("price", after.get("price").getAsDouble());
                // Attach product information
                if (productCache.containsKey(productId)) {
                    itemInfo.put("product_name", productCache.get(productId).get("name"));
                    itemInfo.put("category", productCache.get(productId).get("category"));
                }
                if (!orderItemsCache.containsKey(orderId)) {
                    orderItemsCache.put(orderId, new ArrayList<>());
                }
                orderItemsCache.get(orderId).add(itemInfo);
            }
            return json;
        });

        // Process orders and join them with users and order items
        SingleOutputStreamOperator<Map<String, Object>> enrichedOrderStream = orderStream
                .map(new MapFunction<String, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> map(String json) throws Exception {
                        JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();
                        JsonObject after = jsonObject.getAsJsonObject("after");
                        String op = jsonObject.get("op").getAsString();
                        Map<String, Object> orderInfo = new HashMap<>();
                        // Process snapshot reads, inserts, and updates only
                        if ("r".equals(op) || "c".equals(op) || "u".equals(op)) {
                            int orderId = after.get("id").getAsInt();
                            int userId = after.get("user_id").getAsInt();
                            orderInfo.put("order_id", orderId);
                            orderInfo.put("user_id", userId);
                            orderInfo.put("order_time", after.get("order_time").getAsString());
                            orderInfo.put("status", after.get("status").getAsString());
                            orderInfo.put("total_amount", after.get("total_amount").getAsDouble());
                            // Join user information
                            if (userCache.containsKey(userId)) {
                                orderInfo.put("user_name", userCache.get(userId).get("name"));
                                orderInfo.put("user_email", userCache.get(userId).get("email"));
                            }
                            // Join order items
                            if (orderItemsCache.containsKey(orderId)) {
                                orderInfo.put("items", orderItemsCache.get(orderId));
                            }
                        }
                        return orderInfo;
                    }
                });
        // 5. Configure the Elasticsearch sink
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200, "http"));

        ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                (element, ctx, indexer) -> {
                    // Only index enriched orders; empty maps (e.g. from deletes) are skipped
                    if (element.containsKey("order_id")) {
                        IndexRequest request = Requests.indexRequest()
                                .index("shop_orders")
                                .id(element.get("order_id").toString())
                                .source(element);
                        indexer.add(request);
                    }
                });

        // Bulk-write configuration
        esSinkBuilder.setBulkFlushMaxActions(1); // flush every record; raise this in production
        esSinkBuilder.setBulkFlushInterval(1000); // flush at least once per second

        // 6. Write to Elasticsearch
        enrichedOrderStream.addSink(esSinkBuilder.build());

        // 7. Execute the job
        env.execute("MySQL to Elasticsearch ETL Job");
    }
}
Step 3: Deploy and Run
3.1 Build the Package
Package with Maven:
mvn clean package
3.2 Submit to the Flink Cluster
flink run -c MySQLToElasticsearchETL target/your-jar-file.jar
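If the submission succeeds, the job should show up as RUNNING in the job list (or in the Flink Web UI, which listens on port 8081 by default):

flink list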
3.3 Verify the Data Synchronization
Query the data in Elasticsearch:
curl -X GET "localhost:9200/shop_orders/_search?pretty"
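Given the test rows inserted in step 1.1, the hits should contain one document for order 1001 that looks roughly like the following (the exact timestamp format depends on how Debezium serializes the TIMESTAMP column, and Elasticsearch metadata fields are omitted here):

{
  "order_id": 1001,
  "user_id": 1,
  "user_name": "Zhang San",
  "user_email": "zhangsan@example.com",
  "order_time": "2023-01-05 14:30:00",
  "status": "completed",
  "total_amount": 5999.0,
  "items": [
    {
      "product_id": 101,
      "product_name": "iPhone 14",
      "category": "Electronics",
      "quantity": 1,
      "price": 5999.0
    }
  ]
}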
Key Points and Caveats
- Data consistency:
  - Enable Flink checkpointing to get exactly-once semantics
  - Choose a checkpoint interval that balances consistency and performance
- State management:
  - The example above keeps the user and product caches in plain memory; production jobs should use Flink's state API
  - Mind the state size and define a cleanup strategy so state does not grow without bound
- Table join strategy:
  - The example above uses a simplified way of joining the tables
  - In production, consider Flink SQL or async I/O instead (see the Flink SQL sketch after this list)
- Performance tuning:
  - Adjust the parallelism to match the workload
  - Pick suitable bulk sizes and flush intervals
  - Monitor backpressure
- Error handling:
  - Add error handling for malformed records
  - Implement retries to cope with transient network failures
  - Consider a dead-letter queue (DLQ) for messages that cannot be processed
- Monitoring and alerting:
  - Hook the Flink job up to Prometheus and Grafana
  - Alert on key metrics such as latency and failure counts
- Extensibility:
  - Plan for how schema changes in the source tables will be handled
  - Leave extension points for adding new data sources or new target systems later
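To illustrate the Flink SQL approach mentioned above, here is a minimal sketch that joins only orders and users and upserts the result into the shop_orders index. It assumes the flink-sql-connector-mysql-cdc and flink-sql-connector-elasticsearch7 jars are on the Flink SQL client classpath; table and column names follow the schema from step 1.1, while the reduced field list and table names ending in _src/_sink are illustrative only:

-- Source table over the MySQL orders table (CDC changelog)
CREATE TABLE orders_src (
    id INT,
    user_id INT,
    order_time TIMESTAMP(3),
    status STRING,
    total_amount DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'yourpassword',
    'database-name' = 'shop',
    'table-name' = 'orders'
);

-- Source table over the MySQL users table
CREATE TABLE users_src (
    id INT,
    name STRING,
    email STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'yourpassword',
    'database-name' = 'shop',
    'table-name' = 'users'
);

-- Upsert sink into the shop_orders index
CREATE TABLE shop_orders_sink (
    order_id INT,
    user_name STRING,
    user_email STRING,
    status STRING,
    total_amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'shop_orders'
);

-- The join produces a changelog that the sink applies as upserts/deletes
INSERT INTO shop_orders_sink
SELECT o.id, u.name, u.email, o.status, o.total_amount
FROM orders_src AS o
LEFT JOIN users_src AS u ON o.user_id = u.id;

Note that a regular streaming join keeps both sides in state, so for large tables you would still want state TTL or a temporal join.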
Extensions
On top of this baseline architecture, you can go on to implement:
- Incremental update optimization: sync only the changed fields to reduce network traffic
- Historical backfill: re-synchronize data from a specific point in time
- Data transformation: add more complex business calculation logic
- Data filtering: drop records the business does not need (a minimal sketch follows this list)
- Multi-target writes: write to Elasticsearch and other systems such as Kafka at the same time
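As an example of the data filtering point, a filter operator can be added right before the addSink call in step 2.2. A minimal sketch, where the "completed" status value matches the test data from step 1.1:

// Keep only completed orders; everything else is dropped before the Elasticsearch sink
SingleOutputStreamOperator<Map<String, Object>> filteredOrderStream =
        enrichedOrderStream.filter(order -> "completed".equals(order.get("status")));

// Write the filtered stream instead of the full enriched stream
filteredOrderStream.addSink(esSinkBuilder.build());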
This end-to-end walkthrough shows how to use Flink CDC to build a streaming ETL system that synchronizes data from MySQL to Elasticsearch in real time while handling the relationships between tables.