舉個例子
查詢 source 表,同時執行計算
# 通過 Table API 創建一張表:
source_table = table_env.from_path("datagen")
# 或者通過 SQL 查詢語句創建一張表:
source_table = table_env.sql_query("SELECT * FROM datagen")
result_table = source_table.select(source_table.id + 1, source_table.data)
Table API 查詢
Table 對象有許多方法,可以用于進行關系操作。
這些方法返回新的 Table 對象,表示對輸入 Table 應用關系操作之后的結果。
這些關系操作可以由多個方法調用組成,例如 table.group_by(...).select(...)。
Table API 文檔描述了流和批處理上所有支持的 Table API 操作。
以下示例展示了一個簡單的 Table API 聚合查詢:
from pyflink.table import Environmentsettings, TableEnvironment
# 通過 batch table environment 來執行查詢
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],['name', 'country', 'revenue'])
# 計算所有來自法國客戶的收入
revenue = orders \.select(orders.name, orders.country, orders.revenue) \.where(orders.country == 'FRANCE') \.group_by(orders.name) \.select(orders.name, orders.revenue.sum.alias('rev_sum'))
revenue.to_pandas()
Table API 也支持行操作的 API, 這些行操作包括 Map Operation, FlatMap Operation, Aggregate Operation 和 FlatAggregate Operation.
以下示例展示了一個簡單的 Table API 基于行操作的查詢
from pyflink.table import Environmentsettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd# 通過 batch table environment 來執行查詢
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)], ['name', 'country', 'revenue'])
map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),result_type=DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),DataTypes.FIELD("revenue", DataTypes.BIGINT())]),func_type="pandas")
orders.map(map_function).alias('name', 'revenue').to_pandas()
SQL 查詢
Flink 的 SQL 基于 Apache Calcite,它實現了標準的 SQL。SQL 查詢語句使用字符串來表達。SQL 支持Flink 對流和批處理。
下面示例展示了一個簡單的 SQL 聚合查詢:
from pyflink.table import Environmentsettings, TableEnvironment# 通過 stream table environment 來執行查詢env_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='8','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='11')""")table_env.execute_sql("""CREATE TABLE print_sink (id BIGINT,data_sum TINYINT) WITH ('connector' = 'print')""")table_env.execute_sql("""INSERT INTO print_sinkSELECT id, sum(data) as data_sum FROM(SELECT id / 2 as id, data FROM random_source)WHERE id > 1GROUP BY id""").wait()
Table API 和 SQL 的混合使用
Table API 中的 Table 對象和 SQL 中的 Table 可以自由地相互轉換。
下面例子展示了如何在 SQL 中使用 Table 對象:
create_temporary_view(view_path, table)? 將一個 `Table` 對象注冊為一張臨時表,類似于 SQL 的臨時表。
# 創建一張 sink 表來接收結果數據
table_env.execute_sql("""CREATE TABLE table_sink (id BIGINT,data VARCHAR) WITH ('connector' = 'print')
""")
# 將 Table API 表轉換成 SQL 中的視圖
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)
# 將 Table API 表的數據寫入結果表
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()
下面例子展示了如何在 Table API 中使用 SQL 表:
sql_query(query)?? 執行一條 SQL 查詢,并將查詢的結果作為一個 `Table` 對象。
# 創建一張 SQL source 表
table_env.execute_sql("""CREATE TABLE sql_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='4','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='7')
""")# 將 SQL 表轉換成 Table API 表
table = table_env.from_path("sql_source")
# 或者通過 SQL 查詢語句創建表
table = table_env.sql_query("SELECT * FROM sql_source")
# 將表中的數據寫出
table.to_pandas()
優化
數據傾斜
當數據發生傾斜(某一部分數據量特別大),雖然沒有GC(Gabage Collection,垃圾回收),但是task執行時間嚴重不一致。
- 需要重新設計key,以更小粒度的key使得task大小合理化。
- 修改并行度。
- 調用rebalance操作,使數據分區均勻。
緩沖區超時設置
由于task在執行過程中存在數據通過網絡進行交換,數據在不同服務器之間傳遞的緩沖區超時時間可以通過setBufferTimeout進行設置。
當設置“setBufferTimeout(-1)”,會等待緩沖區滿之后才會刷新,使其達到最大吞吐量;當設置“setBufferTimeout(0)”時,可以最小化延遲,數據一旦接收到就會刷新;當設置“setBufferTimeout”大于0時,緩沖區會在該時間之后超時,然后進行緩沖區的刷新。
示例可以參考如下:
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);