這是一個使用 Flink SQL 從 Kafka 中消費數據并寫入 MySQL 的示例。在這個示例中,我們將假設有一個 Kafka 主題 “input_topic”,它產生格式為 (user_id: int, item_id: int, behavior: string, timestamp: long) 的數據,我們需要把這些數據寫入名為"output_table"的 MySQL 表中。
首先,我們需要創建表達 Kafka 和 MySQL 的源和目標表條目:
CREATE TABLE kafka_source (user_id INT,item_id INT,behavior STRING,timestamp AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')),WATERMARK FOR timestamp as timestamp - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'input_topic','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'localhost:9092','format' = 'json'
);CREATE TABLE mysql_sink (user_id INT,item_id INT,behavior STRING,timestamp TIMESTAMP(3)
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/test','table-name' = 'output_table','username' = 'test','password' = '123456'
);
進行消費 Kafka 和寫入 MySQL 的操作:
INSERT INTO mysql_sink SELECT * FROM kafka_source;
首先,運行上述 SQL 創建對應 Kafka 和 MySQL 的表。其次,運行 INSERT INTO 語句進行從 Kafka 消費數據并將結果寫入 MySQL 表的操作。這是一個基本的操作,你可以根據自己的需求進行相應的修改。
注意,需要根據實際的 Kafka 和 MySQL 的配置,如地址、用戶名和密碼,來修改上述 SQL 語句中的對應部分。
另外,Flink SQL 對 SQL 語句的語法進行了些許改變以適應流處理的特性,如在 kafka_source 表中的 WATERMARK 和 timestamp 的定義。
這個例子中,你需要確保已經引入了 flink-connector-kafka_2.11、flink-connector-jdbc_2.11 和 mysql-connector-java 等相關的 jar 包依賴。