1. TableEnvironment
創建 TableEnvironment
from pyflink.table import Environmentsettings, TableEnvironment# create a streaming TableEnvironmentenv_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)# or create a batch TableEnvironmentenv_settings = Environmentsettings.in_batch_mode()table_env = TableEnvironment.create(env_settings)
TableEnvironment 是 Table API 和 SQL 集成的核心概念。
TableEnvironment 可以用來:
- ·創建 Table
- ·將 Table 注冊成臨時表
- ·執行 SQL 查詢
- ·注冊用戶自定義的 (標量,表值,或者聚合) 函數
- ·配置作業
- ·管理 Python 依賴
- ·提交作業執行
創建 source 表
table_env.execute_sql("""CREATE TABLE datagen (id INT,data STRING) WITH ('connector' = 'datagen','fields.id.kind' = 'sequence','fields.id.start' = '1','fields.id.end' = '10')""")
創建 sink 表
table_env.execute_sql("""CREATE TABLE print (id INT,data STRING) WITH ('connector' = 'print')""")
2. Table
Table 是 Python Table API 的核心組件。Table 是 Table API 作業中間結果的邏輯表示。
一個 Table 實例總是與一個特定的 TableEnvironment 相綁定。
不支持在同一個查詢中合并來自不同 TableEnvironments 的表,例如 join 或者 union 它們。
通過列表類型的對象創建
你可以使用一個列表對象創建一張表:
from pyflink.table import Environmentsettings, TableEnvironment# 創建 批 TableEnvironmentenv_settings = Environmentsettings.in_batch_mode()table_env = TableEnvironment.create(env_settings)table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])table.to_pandas()==>print(table.to_pandas())table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])print(table.to_pandas())
通過 DDL 創建
你可以通過 DDL 創建一張表,execute_sql(stmt) 執行指定的語句并返回執行結果。
執行語句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。
注意,對于 "INSERT INTO" 語句,這是一個異步操作,通常在向遠程集群提交作業時才需要使用。
但是,如果在本地集群或者 IDE 中執行作業時,你需要等待作業執行完成。
from pyflink.table import Environmentsettings, TableEnvironment# 創建流 TableEnvironmentenv_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='3','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='6')""")table = table_env.from_path("random_source")table.to_pandas()
通過 Catalog 創建
Catalog
Catalog提供了元數據信息,例如數據庫、表、分區、視圖以及數據庫或其他外部系統中存儲的函數和信息。
數據處理最關鍵的方面之一是管理元數據。
元數據可以是臨時的,例如臨時表、或者通過 TableEnvironment 注冊的 UDF。
元數據也可以是持久化的,例如 Hive Metastore 中的元數據。
Catalog 提供了一個統一的API,用于管理元數據,并使其可以從 Table API 和 SQL 查詢語句中來訪問。
Catalog類型
GenericInMemoryCatalog | 基于內存實現的 Catalog,所有元數據只在 session 的生命周期內可用。 |
JdbcCatalog | JdbcCatalog使得用戶可以將 Flink 通過 JDBC 協議連接到關系數據庫。 PostgresCatalog 是當前實現的唯一一種 JDBC Catalog。 |
HiveCatalog | HiveCatalog 有兩個用途:作為原生 Flink 元數據的持久化存儲,以及作為讀寫現有 Hive 元數據的接口。 |
警告 Hive Metastore 以小寫形式存儲所有元數據對象名稱,GenericInMemoryCatalog 區分大小寫。
用戶自定義 Catalog
Catalog 是可擴展的,用戶可以通過實現 Catalog 接口來開發自定義 Catalog。
想要在 SQL CLI 中使用自定義 Catalog,用戶除了需要實現自定義的 Catalog 之外,還需要為這個 Catalog 實現對應的 CatalogFactory 接口。
CatalogFactory 定義了一組屬性,用于 SQL CLI 啟動時配置 Catalog。
這組屬性集將傳遞給發現服務,在該服務中,服務會嘗試將屬性關聯到 CatalogFactory 并初始化相應的 Catalog 實例。
創建 Flink 表并將其注冊到 Catalog
使用 SQL DDL
用戶可以使用 DDL 通過 Table API 或者 SQL Client 在 Catalog 中創建表。
from pyflink.table.catalog import HiveCatalog# Create a HiveCatalogcatalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")# Register the catalogt_env.register_catalog("myhive", catalog)# Create a catalog databaset_env.execute_sql("CREATE DATABASE mydb WITH (...)")# Create a catalog tablet_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")# should return the tables in current catalog and database.t_env.list_tables()
通過 SQL DDL 創建的表和視圖, 例如 “create table …” 和 “create view …",都存儲在 catalog 中。
你可以通過 SQL 直接訪問 catalog 中的表。
使用 Java/Scala
用戶可以用編程的方式使用Java 或者 Scala 來創建 Catalog 表。
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable
from pyflink.table.descriptors import Kafka
settings = Environmentsettings.in_batch_mode()
t_env = TableEnvironment.create(settings)
# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
# Register the catalog
t_env.register_catalog("myhive", catalog)
# Create a catalog database
database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", database)
# Create a catalog table
schema = Schema.new_builder() \.column("name", DataTypes.STRING()) \.column("age", DataTypes.INT()) \.build()??
catalog_table = t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka").schema(schema)// ….build())
# tables should contain "mytable"
tables = catalog.list_tables("mydb")
TableEnvironment 維護了一個使用標識符創建的表的 catalogs 映射。
Catalog 中的表既可以是臨時的,并與單個 Flink 會話生命周期相關聯,也可以是永久的,跨多個 Flink 會話可見。
如果你要用 Table API 來使用 catalog 中的表,可以使用 “from_path” 方法來創建 Table API 對象:
from_path(path)?? 通過指定路徑下已注冊的表來創建一個表,例如通過 create_temporary_view 注冊表。