一、Flink與其他組件的協同
Flink 是一個分布式、高性能、始終可用、準確一次(Exactly-Once)語義的流處理引擎,廣泛應用于大數據實時處理場景中。它與 Hadoop 生態系統中的組件可以深度集成,形成完整的大數據處理鏈路。下面我們從 Flink 的 核心架構 出發,結合與 Hadoop 組件協同方式,詳細剖析 Flink 的作用。
1. Flink 核心架構詳解
1)架構組件圖概覽
+-------------------------+
| Client |
+-------------------------+|v
+-------------------------+
| JobManager (JM) | <-- Master 負責調度
+-------------------------+|v
+-------------------------+
| TaskManagers (TM) | <-- Worker 執行算子任務
+-------------------------+|v
+-------------------------+
| Slot | <-- 執行資源單位
+-------------------------+
2)核心組件職責
組件 | 描述 |
---|---|
Client | 提交作業到 Flink 集群,觸發作業執行。 |
JobManager (JM) | 管理作業生命周期,負責調度任務、故障恢復、協調檢查點(Checkpoint)等。 |
TaskManager (TM) | 具體執行作業的物理任務(算子),負責數據交換、狀態管理等。 |
Slot | TaskManager 內部的資源單位,用于任務部署。每個 TaskManager 有多個 Slot。 |
3)狀態管理與容錯
-
Checkpoint/Savepoint:可恢復一致性狀態(Exactly Once)
-
State Backend:保存狀態(如 RocksDB、FsStateBackend)
-
Recovery:通過重放 Checkpoint 恢復任務
2. Flink 與 Hadoop 各組件的協同關系
Flink 雖然是獨立系統,但能與 Hadoop 生態的多個關鍵組件協同工作,構建完整的大數據平臺。
1)與 HDFS(Hadoop Distributed File System)
協同方式 | 描述 |
---|---|
輸入源 | Flink 可直接讀取 HDFS 中的批量數據(如 ORC、Parquet、Text 等格式) |
狀態后端 | Flink Checkpoint/Savepoint 可存儲到 HDFS 上,保證高可用與容災 |
輸出目標 | Flink 作業可以將計算結果輸出到 HDFS,作為后續離線處理的數據 |
fs.defaultFS: hdfs://namenode:8020
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints/
2)與 Hive
協同方式 | 描述 |
---|---|
讀取表數據 | Flink 可通過 Hive Catalog 與 Hive 元數據打通,直接讀取 Hive 表 |
寫入表 | Flink SQL 可將流式數據寫入 Hive(使用 INSERT INTO) |
統一元數據 | Flink + Hive Catalog 支持表結構共享,便于湖倉一體實踐 |
CREATE CATALOG my_hive WITH ('type' = 'hive','hive-conf-dir' = '/etc/hive/conf'
);
3)與 Kafka(實時采集)
協同方式 | 描述 |
---|---|
實時數據源 | Flink 通過 Kafka Source 接收實時數據流(如日志、訂單等) |
下游結果寫入 | Flink 可將流式計算結果寫入 Kafka(供下游消費) |
Exactly Once 語義 | Flink + Kafka + Checkpoint 可實現端到端的精確一次語義 |
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);
4)與 HBase(實時查詢)
協同方式 | 描述 |
---|---|
維表關聯 | Flink 可使用 HBase 作為維表進行流批 Join,實時補充維度數據 |
實時寫入 | 計算結果可實時寫入 HBase,支持下游查詢系統使用(如用戶畫像等) |
tableEnv.executeSql("CREATE TABLE hbase_dim (...) WITH ('connector' = 'hbase-2.2', ...)");
5)與 YARN
協同方式 | 描述 |
---|---|
資源調度 | Flink 可部署在 YARN 上,利用 Hadoop 的資源調度管理能力 |
Session / Per-Job 模式 | 支持多租戶資源隔離或每個作業獨立資源隔離部署 |
flink run -m yarn-cluster -ynm my-flink-job myjob.jar
6)與 Zookeeper
協同方式 | 描述 |
---|---|
高可用 JobManager | 使用 Zookeeper 實現 JobManager 的 leader election |
Checkpoint HA 元數據存儲 | 配合 HDFS 存儲 Checkpoint 元數據路徑 |
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs://namenode:8020/flink/ha/
3. Flink 的作用總結
模塊 | Flink 的角色 |
---|---|
實時數據處理 | 核心組件,進行低延遲、高吞吐流處理計算 |
數據清洗與 ETL | 提供強大 SQL / DataStream API 進行多源數據處理與聚合 |
實時指標計算 | 支持實時 KPI、UV/PV、訂單流等分析 |
數據湖構建 | 可作為流式數據入湖的計算引擎(結合 Hudi/Iceberg) |
實時監控預警 | 搭配 Kafka + Prometheus,構建告警與監控系統 |
實時數倉建設 | 聯合 Kafka + Hive + HDFS + HBase 構建流批一體數倉體系 |
4. Flink 架構在 Hadoop 平臺的實際部署圖
+-------------+| Flume/Nginx|+------+------+|Kafka集群|+-------------------+--------------------+| |+---v---+ +----v----+| Flink |--> 清洗 → 維表 Join → 計算 | Spark |+---+---+ +----+----+| |
+-------v---------+ +--------v--------+
| HBase/Redis | | HDFS / Hive |
+-----------------+ +-----------------+
二、Flink DataStream API的使用
現在以 Flink DataStream API 為核心,深入剖析一個真實生產場景的 從 Kafka 到 Kafka 的流式處理全流程,包括:
-
項目結構與依賴
-
數據模型與清洗
-
水位線與亂序處理
-
異步維表查詢(HBase/MySQL/Redis)
-
窗口聚合邏輯
-
數據下發(Kafka Sink)
-
容錯機制與 Checkpoint 配置
1. 項目結構與依賴
1)Maven 依賴(pom.xml
)
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.0.1-1.17</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-2.2</artifactId><version>1.17.1</version></dependency>
</dependencies>
2. 數據模型定義
1)訂單數據結構(OrderEvent)
public class OrderEvent {public String orderId;public String userId;public String productId;public double price;public int quantity;public long orderTime; // epoch millis
}
2) 商品維度(ProductInfo)
public class ProductInfo {public String productId;public String categoryId;public String productName;
}
3)聚合結果結構(OrderStat)
public class OrderStat {public String categoryId;public long windowStart;public long windowEnd;public double totalAmount;
}
3. Kafka Source + JSON 反序列化
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("kafka:9092").setTopics("order_events").setGroupId("flink-consumer").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<OrderEvent> orderStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource").map(json -> new ObjectMapper().readValue(json, OrderEvent.class)).returns(OrderEvent.class);
4. 水位線處理(亂序數據支持)
WatermarkStrategy<OrderEvent> watermarkStrategy = WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, ts) -> event.orderTime);DataStream<OrderEvent> orderStreamWithWM = orderStream.assignTimestampsAndWatermarks(watermarkStrategy);
5. 異步維表關聯(以 HBase 為例)
使用 AsyncFunction
實現異步查詢(支持 Redis/HBase/MySQL)
示例實現:AsyncProductEnrichmentFunction
public class AsyncProductEnrichmentFunction extends RichAsyncFunction<OrderEvent, Tuple2<OrderEvent, ProductInfo>> {private transient HBaseClient hBaseClient;@Overridepublic void open(Configuration parameters) throws Exception {hBaseClient = new HBaseClient("hbase.zookeeper.quorum");}@Overridepublic void asyncInvoke(OrderEvent input, ResultFuture<Tuple2<OrderEvent, ProductInfo>> resultFuture) {CompletableFuture.supplyAsync(() -> hBaseClient.queryProductInfo(input.productId)).thenAccept(productInfo -> resultFuture.complete(Collections.singletonList(Tuple2.of(input, productInfo))));}@Overridepublic void close() throws Exception {hBaseClient.close();}
}
應用異步函數
DataStream<Tuple2<OrderEvent, ProductInfo>> enrichedStream = AsyncDataStream.unorderedWait(orderStreamWithWM,new AsyncProductEnrichmentFunction(),5, TimeUnit.SECONDS, 100
);
6. 按類目 ID 滾動窗口聚合
DataStream<OrderStat> resultStream = enrichedStream.map(tuple -> new Tuple3<>(tuple.f1.categoryId, tuple.f0.orderTime, tuple.f0.price * tuple.f0.quantity)).returns(Types.TUPLE(Types.STRING, Types.LONG, Types.DOUBLE)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Long, Double>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((t, ts) -> t.f1)).keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(1))).aggregate(new AggregateFunction<Tuple3<String, Long, Double>, Double, OrderStat>() {private long windowStart, windowEnd;private String categoryId;public Double createAccumulator() { return 0.0; }public Double add(Tuple3<String, Long, Double> value, Double acc) {categoryId = value.f0;return acc + value.f2;}public OrderStat getResult(Double acc) {return new OrderStat(categoryId, windowStart, windowEnd, acc);}public Double merge(Double acc1, Double acc2) {return acc1 + acc2;}}, new ProcessWindowFunction<OrderStat, OrderStat, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<OrderStat> elements, Collector<OrderStat> out) {OrderStat stat = elements.iterator().next();stat.windowStart = context.window().getStart();stat.windowEnd = context.window().getEnd();out.collect(stat);}});
7. 寫入 Kafka Sink
KafkaSink<OrderStat> kafkaSink = KafkaSink.<OrderStat>builder().setBootstrapServers("kafka:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("order_stats").setValueSerializationSchema(stat -> {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsBytes(stat);}).build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).build();resultStream.sinkTo(kafkaSink);
8.? 容錯與 HA 配置(關鍵)
1)Checkpoint 配置
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode/flink/checkpoints"));
2)高可用配置(flink-conf.yaml)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181
state.checkpoints.dir: hdfs://namenode/flink/checkpoints
state.savepoints.dir: hdfs://namenode/flink/savepoints
9. 運行命令(on YARN)
flink run -m yarn-cluster -c com.company.OrderRealtimeJob your-job.jar
10. 監控與排障建議
工具 | 功能 |
---|---|
Flink Web UI | 監控 Task、Checkpoint、Watermark |
Prometheus | 指標采集 |
Grafana | 可視化 |
AlertManager | 告警配置 |
Savepoint | 容錯恢復點 |
三、FlinkCDC實時采集數據入湖
解析Flink CDC(Change Data Capture)在大數據體系中的使用方法,并結合 Kafka、Hudi、Iceberg、Hive、HDFS 等大數據組件,提供一套 可落地、可執行、可擴展的完整集成方案。
1. Flink CDC 簡介
Flink CDC 是 Apache Flink + Debezium 的組合,用于實時采集 MySQL/PostgreSQL 等數據庫的變更數據(INSERT/UPDATE/DELETE),并以 流式方式傳遞到下游系統(Kafka、Hudi、Iceberg、HBase 等)。
2. 典型架構場景:Flink CDC + Hudi + Hive 實時數據湖方案
+-------------+ +---------------------+| MySQL/Postgres | || Source DB +--------> | Flink CDC Connector |+-------------+ | |+----------+----------+|| Row-level ChangeLogv+----------+----------+| Flink Job || (數據清洗/處理) |+----------+----------+|v+----------+----------+| Hudi Sink (Flink) |+----------+----------+|v+-------------+-------------+| Hive / Presto / Trino || 實時查詢(支持 ACID) |+---------------------------+
3. 方案目標
-
實時采集 MySQL 數據(基于 Binlog)
-
支持變更(Insert/Update/Delete)語義
-
數據存入 Hudi 表(支持 MOR/COW 格式)
-
Hive/Presto 端可直接查詢
4. 組件版本建議
組件 | 版本建議 |
---|---|
Flink | 1.17.x 或 1.18.x |
Flink CDC | 2.4.1 |
Debezium | 內嵌于 Flink CDC |
Hudi | 0.13.1+ |
Hive | 2.3.x / 3.1.x |
Hadoop/HDFS | 3.x |
5. 部署準備
1)安裝 Kafka(可選)
用于做 CDC 中轉(可選,支持 Flink 直接接 Hudi)
2)安裝 Hive Metastore + Hadoop HDFS
用于管理 Hudi 表元數據和 HDFS 存儲
3)準備 MySQL 源數據庫
配置 binlog,設置 binlog_format = ROW
,并開啟 server_id
、binlog_row_image = full
6. 關鍵配置代碼與步驟
1)添加 Maven 依賴
<dependencies><!-- Flink CDC --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.4.1</version></dependency><!-- Hudi Sink --><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink-bundle_2.12</artifactId><version>0.13.1</version></dependency>
</dependencies>
2)Flink SQL 示例(CDC → Hudi)
-- 1. 源表:MySQL CDC 表
CREATE TABLE ods_orders (id STRING,user_id STRING,amount DOUBLE,ts TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = 'mysql-host','port' = '3306','username' = 'flink','password' = 'flink123','database-name' = 'srm','table-name' = 'orders','scan.startup.mode' = 'initial'
);-- 2. 目標表:Hudi 表(MOR 模式)
CREATE TABLE dwd_orders (id STRING PRIMARY KEY NOT ENFORCED,user_id STRING,amount DOUBLE,ts TIMESTAMP(3)
) PARTITIONED BY (`user_id`)
WITH ('connector' = 'hudi','path' = 'hdfs://namenode/data/hudi/dwd_orders','table.type' = 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field' = 'id','write.tasks' = '4','compaction.async.enabled' = 'true','hive_sync.enabled' = 'true','hive_sync.mode' = 'hms','hive_sync.metastore.uris' = 'thrift://hive-metastore:9083','hive_sync.db' = 'ods','hive_sync.table' = 'dwd_orders'
);-- 3. 實時寫入
INSERT INTO dwd_orders
SELECT * FROM ods_orders;
7. 關鍵功能說明
功能 | 配置字段 | 說明 |
---|---|---|
主鍵變更支持 | PRIMARY KEY ... NOT ENFORCED | 支持 upsert |
增量采集模式 | scan.startup.mode = initial | 首次全量 + 后續增量 |
實時 compaction | compaction.async.enabled = true | MOR 表性能保障 |
Hive 數據同步 | hive_sync.enabled = true | Hudi 自動注冊 Hive 元數據 |
8. 整合優化建議
1)多表 CDC 同步統一處理
使用 Flink CDC 的 schema-name.table-name
通配符:
'database-name' = 'srm',
'table-name' = '.*',
配合 Flink SQL Catalog + Dynamic Table Factory,可實現一拖 N 的多表處理邏輯。
2)增加清洗邏輯(如空值過濾、轉換)
SELECTid,user_id,amount * 1.13 AS amount_tax,ts
FROM ods_orders
WHERE amount IS NOT NULL;
3)寫入 Kafka(替代 Hudi) → 用于事件總線或下游消費
CREATE TABLE kafka_sink (id STRING,user_id STRING,amount DOUBLE,ts TIMESTAMP(3)
) WITH ('connector' = 'kafka','topic' = 'ods.orders','properties.bootstrap.servers' = 'kafka:9092','format' = 'json','scan.startup.mode' = 'latest-offset'
);
9. Flink CDC 整合場景匯總
場景 | 描述 | 推薦組件 |
---|---|---|
實時數據入湖 | MySQL → Hudi | Flink CDC + Hudi |
數據倉庫加速 | Oracle → Iceberg | Flink CDC + Iceberg |
數據中臺構建 | MySQL → Kafka → 多下游 | Flink CDC + Kafka |
數據回流校驗 | Kafka → Flink → MySQL | Flink SQL + JDBC Sink |
DWD建模 | ODS → DWD/DWM → ADS | Flink SQL + 維表 JOIN |
10. 可視化監控
工具 | 功能 |
---|---|
Flink UI | Checkpoint、Watermark、吞吐 |
Prometheus | 指標采集 |
Grafana | 監控儀表盤 |
Hive | SQL 查詢驗證 |
四、自定義 Flink CDC Job 的完整實現
自定義 Flink CDC Job 的完整實現,采用 Java DataStream API 編寫,支持:
-
多表接入(MySQL 為例)
-
自定義清洗、轉換邏輯
-
支持寫入 Kafka、Hudi、Iceberg 等下游系統
-
可部署為標準 Flink 應用(
flink run
執行)
1. 自定義 Flink CDC Job 場景說明
目標:
-
從 MySQL 采集訂單表
srm.orders
-
做清洗(如金額換算、字段過濾)
-
輸出到 Hudi 表(或 Kafka/Console)
2. 依賴配置(Maven)
<dependencies><!-- Flink CDC --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.4.1</version></dependency><!-- Flink 通用 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.2</version></dependency><!-- 可選:Sink 依賴,如 Kafka、Hudi、Iceberg -->
</dependencies>
3. 完整代碼示例:CustomCdcJob.java
public class CustomCdcJob {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 配置 CDC 源:MySQLMySqlSource<Order> mysqlSource = MySqlSource.<Order>builder().hostname("mysql-host").port(3306).databaseList("srm").tableList("srm.orders").username("flink").password("flink123").deserializer(new OrderDeserializationSchema()) // 自定義反序列化.build();// 3. 接入 SourceDataStreamSource<Order> orderStream = env.fromSource(mysqlSource,WatermarkStrategy.noWatermarks(),"MySQL CDC Source");// 4. 數據清洗/轉換SingleOutputStreamOperator<Order> cleaned = orderStream.filter(order -> order.amount > 0).map(order -> {order.amount = order.amount * 1.13; // 加稅return order;});// 5. Sink:控制臺 / Kafka / Hudicleaned.print();env.execute("Custom Flink CDC Job");}
}
4. 自定義反序列化器:OrderDeserializationSchema
public class OrderDeserializationSchema implements DebeziumDeserializationSchema<Order> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<Order> collector) {Struct value = (Struct) sourceRecord.value();if (value == null) return;Struct after = value.getStruct("after");if (after != null) {Order order = new Order();order.id = after.getString("id");order.userId = after.getString("user_id");order.amount = after.getFloat64("amount");order.ts = Instant.ofEpochMilli(after.getInt64("ts")).atZone(ZoneId.of("UTC")).toLocalDateTime();collector.collect(order);}}@Overridepublic TypeInformation<Order> getProducedType() {return TypeInformation.of(Order.class);}
}
5. 定義 POJO 類:Order.java
public class Order implements Serializable {public String id;public String userId;public Double amount;public LocalDateTime ts;@Overridepublic String toString() {return String.format("[Order] id=%s, user=%s, amt=%.2f, ts=%s",id, userId, amount, ts.toString());}
}
6. Sink 可選方案
1)控制臺輸出(開發調試)
cleaned.print();
2)Kafka Sink(事件總線)
KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("kafka:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("srm.orders.cdc").setValueSerializationSchema(new SimpleStringSchema()).build()).build();cleaned.map(order -> JSON.toJSONString(order)).sinkTo(kafkaSink);
3)寫入 Hudi 表(通過 Flink Hudi Sink)
cleaned.addSink(HudiSinkUtil.getSink());
自定義 Hudi Sink 工具類可基于 HoodieSink
封裝。
七、打包部署方式
1)使用 maven-shade-plugin
打 fat-jar:
mvn clean package -DskipTests
輸出:custom-cdc-job-1.0-SNAPSHOT.jar
2)提交到 Flink 集群
flink run -m yarn-cluster -c com.example.CustomCdcJob custom-cdc-job.jar
8. 擴展功能(可選)
功能 | 實現方式 |
---|---|
多表同步 | .tableList("srm.orders,srm.invoice") |
動態 schema 推導 | 使用 JsonDebeziumDeserializationSchema |
維表 join | Flink SQL / Broadcast Join |
自定義狀態存儲 | Flink KeyedState |
exactly-once 寫入 Kafka/Hudi | 使用 checkpoint 支持 |