1. Data Lake
2. Installing PyFlink and Paimon Locally
- Python 3.11 must be installed (PyFlink 1.20 supports Python 3.8–3.11, not newer)
- Install via pip:
python -m pip install apache-flink==1.20.1
- These two JARs must be added manually
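The JARs go into the lib directory inside the installed pyflink package, which is where Flink picks up extra dependencies such as the Paimon connector. A small sketch for locating that directory (the helper name is mine, not part of PyFlink):

```python
import importlib.util
import os

def pyflink_lib_dir():
    """Return the lib directory inside the installed pyflink package,
    or None if pyflink is not installed."""
    spec = importlib.util.find_spec("pyflink")
    if spec is None or spec.origin is None:
        return None
    # spec.origin is pyflink/__init__.py; the JARs live in pyflink/lib
    return os.path.join(os.path.dirname(spec.origin), "lib")

print(pyflink_lib_dir())
```

Copy the downloaded JARs into the printed directory, then restart the Python process so Flink sees them.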
Test code:
import argparse
import logging
import sys
import time

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor,
                           Schema, DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(message)s")

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")

my_source_ddl = """
    create table source (
        word STRING
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format("D:/PyCharmWorkspace/PaimonLakeP02/src/basic/words.csv")
print(t_env.execute_sql(my_source_ddl))

print(t_env.execute_sql("""
    -- if you're trying out Paimon in a distributed environment,
    -- the warehouse path should be set to a shared file system, such as HDFS or OSS
    CREATE CATALOG paimon_catalog WITH (
        'type' = 'paimon',
        'warehouse' = 'D:/PyCharmWorkspace/PaimonLakeP02/src/basic/paimon'
    );
"""))
print(t_env.execute_sql("USE CATALOG paimon_catalog;"))
print(t_env.execute_sql("""
    -- create a word count table
    CREATE TABLE IF NOT EXISTS word_count (
        word STRING PRIMARY KEY NOT ENFORCED,
        cnt BIGINT
    );
"""))

# r = t_env.sql_query("select word from source").execute()
# r.print()

stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql("""
    insert into word_count
    select word, count(1) as `count`
    from default_catalog.default_database.source
    group by word
""")
stmt_set.execute().wait()

# print sink
t_env.sql_query("select 'another print', * from word_count").execute().print()
print("===========end==============")
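The INSERT statement above groups the source rows by word and counts each group. As a sanity check on what the job should produce, the same aggregation in plain Python (with hypothetical row values, since the original words.csv is not shown) is just:

```python
from collections import Counter

# Hypothetical contents of words.csv, one word per row.
rows = ["hello", "world", "hello"]

# Equivalent of: select word, count(1) from source group by word
word_count = dict(Counter(rows))
print(word_count)  # {'hello': 2, 'world': 1}
```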
Successful startup:
Paimon's local data files:
References
Installation guide: Quick Start | Apache Paimon
Download packages: Downloads | Apache Flink
Appendix: Problems Encountered
1. Flink 2.0 + Paimon
// There is no compatible Paimon library for Flink 2.0 yet; the job fails with a Sink mismatch exception.
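Until a Paimon release targets Flink 2.0, pinning apache-flink to 1.20.x avoids the mismatch. A small sanity check along those lines (the helper and the version policy are this document's, not part of any library):

```python
from importlib.metadata import PackageNotFoundError, version

def flink_version_ok(expected_prefix="1.20"):
    """Return True if the installed apache-flink matches the pinned minor
    version, False if it differs, and None if it is not installed."""
    try:
        installed = version("apache-flink")
    except PackageNotFoundError:
        return None
    return installed.startswith(expected_prefix)

print(flink_version_ok())
```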