1、環境
JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.20
2、執行python-flink腳本
從kafka的demo獲取消息,并將其中的a字段存入kafka的test_kafka_topic內,并打印sum(b)的值
from pyflink.table import TableEnvironment, EnvironmentSettingsdef log_processing():# 創建流處理環境env_settings = EnvironmentSettings.in_streaming_mode()t_env = TableEnvironment.create(env_settings)# 設置 Kafka 連接器 JAR 文件的路徑# 確保 JAR 文件確實存在于指定路徑,并且與 Flink 版本兼容t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///home/data/flink/flink-1.20.0/lib/flink-sql-connector-kafka-3.3.0-1.20.jar")# 定義源表 DDLsource_ddl = """CREATE TABLE source_table(a VARCHAR,b INT -- 如果 b 字段不重要,可以考慮從源表中移除它) WITH ('connector' = 'kafka','topic' = 'demo','properties.bootstrap.servers' = '192.168.15.130:9092','properties.group.id' = 'test_3','scan.startup.mode' = 'latest-offset','format' = 'json')"""# 定義目標表 DDLsink_ddl = """CREATE TABLE sink_table(a VARCHAR) WITH ('connector' = 'kafka','topic' = 'test_kafka_topic','properties.bootstrap.servers' = '192.168.15.130:9092','format' = 'json')"""# 執行 DDL 語句創建表t_env.execute_sql(source_ddl)#table = t_env.from_path("sql_source")#table.execute().print()table_result = t_env.execute_sql("select sum(b) sb from source_table")table_result.print()t_env.execute_sql(sink_ddl)# 執行 SQL 查詢并將結果插入到目標表# 注意:wait() 方法會阻塞,直到插入操作完成(在流處理中通常是無限的)t_env.sql_query("SELECT a FROM source_table") \.execute_insert("sink_table").wait() # 考慮是否真的需要 wait()if __name__ == '__main__':log_processing()
python3 KafkaSource.py
3、啟動kafka生產者
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.130:9092 --topic demo
輸入模擬數據進行測試
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
可以看到sum(b)值已輸出
4、啟動kafka消費者
查看往test_kafka_topic插入的a字段數據已被消費
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.130:9092 --from-beginning --topic test_kafka_topic