-- 創建源表,使用 RAW 格式接收原始 JSON 數據
CREATE TABLE source_kafka (
id STRING,
`data` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'source_kafka-topic',
'properties.bootstrap.servers' = 'master01:9092',
'properties.group.id' = 'flink-kafka-group',
'scan.startup.mode' = 'latest-offset',
'key.format' = 'csv',
'key.fields' = 'id',
'value.format' = 'raw',
'value.fields-include' = 'EXCEPT_KEY'
);-- 創建目標表,同樣使用 RAW 格式保持數據原樣
CREATE TABLE sink_kafka (
id STRING,
`data` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'sink-kafka-topic',
'properties.bootstrap.servers' = 'master02:9092',
'key.format' = 'csv',
'key.fields' = 'id',
'value.format' = 'raw',
'value.fields-include' = 'EXCEPT_KEY'
);-- 執行復制操作
INSERT INTO sink_kafka
SELECT * FROM source_kafka;
- value.fields-include
定義消息體(Value)格式如何處理消息鍵(Key)字段的策略。 默認情況下,表結構中 ‘ALL’ 即所有的字段都會包含在消息體格式中,即消息鍵字段在消息鍵和消息體格式中都會出現。
- 參考文檔:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/kafka/#%E6%B6%88%E6%81%AF%E9%94%AEkey%E4%B8%8E%E6%B6%88%E6%81%AF%E4%BD%93value%E7%9A%84%E6%A0%BC%E5%BC%8F
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/overview/