Paimon 支持 Flink 1.17, 1.16, 1.15 和 1.14,當前 Paimon 提供了兩類 Jar 包,一類支持數據讀寫,另一類支持其它操作(compaction)
Version Type Jar
Flink 1.18 Bundled Jar paimon-flink-1.18-0.7.0-incubating.jar
Flink 1.17 Bundled Jar paimon-flink-1.17-0.7.0-incubating.jar
Flink 1.16 Bundled Jar paimon-flink-1.16-0.7.0-incubating.jar
Flink 1.15 Bundled Jar paimon-flink-1.15-0.7.0-incubating.jar
Flink 1.14 Bundled Jar paimon-flink-1.14-0.7.0-incubating.jar
Flink Action Action Jar paimon-flink-action-0.7.0-incubating.jar
1.環境準備
下載 Flink 后解壓
tar -xzf flink-*.tgz
拷貝 Paimon bundled jar 包到 Flink 的 lib 目錄下
cp paimon-flink-*.jar <FLINK_HOME>/lib/
拷貝 Hadoop Bundled Jar 包到 Flink 的 lib 目錄下
cp flink-shaded-hadoop-2-uber-*.jar <FLINK_HOME>/lib/
為同時運行多個Flink作業,修改/conf/flink-conf.yaml
中的集群配置
taskmanager.numberOfTaskSlots: 2
本地啟動 Flink 集群
<FLINK_HOME>/bin/start-cluster.sh
驗證 Web UI 查看集群是否已啟動并運行
localhost:8081
啟動Flink SQL客戶端來執行SQL腳本
<FLINK_HOME>/bin/sql-client.sh
2.創建 Paimon Catalog 和 Table
創建 Catalog 和 Table
-- if you're trying out Paimon in a distributed environment,
-- the warehouse path should be set to a shared file system, such as HDFS or OSS
CREATE CATALOG my_catalog WITH ('type'='paimon','warehouse'='file:/tmp/paimon'
);USE CATALOG my_catalog;-- create a word count table
CREATE TABLE word_count (word STRING PRIMARY KEY NOT ENFORCED,cnt BIGINT
);
3.使用Flink通用的Catalog創建Table
使用Flink通用的Catalog,需要使用Hive metastore,然后可以使用Paimon、Hive和Flink通用表(Kafka和其他表)中的所有表。
在此模式下,應該使用"connector"選項來創建tables。
Paimon將在hive-site.xml
中使用hive.metastore.warehouse.dir
,需要使用帶有scheme的path,例如,hdfs://...
.否則,Paimon將使用本地路徑。
CREATE CATALOG my_catalog WITH ('type'='paimon-generic','hive-conf-dir'='...','hadoop-conf-dir'='...'
);USE CATALOG my_catalog;-- create a word count table
CREATE TABLE word_count (word STRING PRIMARY KEY NOT ENFORCED,cnt BIGINT
) WITH ('connector'='paimon'
);
4.創建 Source 數據表
-- create a word data generator table
CREATE TEMPORARY TABLE word_table (word STRING
) WITH ('connector' = 'datagen','fields.word.length' = '1'
);-- paimon requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';-- write streaming data to dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
5.數據查詢案例
OLAP查詢
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';-- olap query the table
SELECT * FROM word_count;
流式查詢
-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';-- track the changes of table and calculate the count interval statistics
SELECT `interval`, COUNT(*) AS interval_cnt FROM(SELECT cnt / 10000 AS `interval` FROM word_count) GROUP BY `interval`;
6.退出Flink SQL客戶端,停止Flink集群
退出Flink SQL客戶端
-- uncomment the following line if you want to drop the dynamic table and clear the files
-- DROP TABLE word_count;-- exit sql-client
EXIT;
停止Flink集群
./bin/stop-cluster.sh
7.觸發 Savepoint 和 recover
由于Paimon有自己的snapshot管理,可能與Flink的checkpoint管理相沖突,在從savepoint恢復時會導致異常(不會導致存儲損壞)。
建議使用以下方法開啟savepoint:
使用Stop with savepoint。
使用 Tag with savepoint,并在從savepoint恢復之前rollback-to-tag。
8.使用Action Jar
Flink本地集群啟動后,使用以下命令執行 action jar
<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \<action><args>
compact 一張 table
<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \compact \--path <TABLE_PATH>
9.支持的Flink數據類型
支持所有Flink數據類型,除了
MULTISET
不受支持。MAP
不支持作為主鍵。
10.使用Flink Managed Memory
Paimon 任務可以創建 memory pools,基于Flink executor管理的 executor memory , 像Flink任務管理的 managed memory。
通過 executor 管理的多個任務的 writer buffers 可以提升 sinks 的穩定性和性能。
使用 Flink managed memory 的配置如下:
Option | Default | Description |
---|---|---|
sink.use-managed-memory-allocator | false | If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator, which means each task allocates and manages its own memory pool (heap memory), if there are too many tasks in one Executor, it may cause performance issues and even OOM. |
sink.managed.writer-buffer-memory | 256M | Weight of writer buffer in managed memory, Flink will compute the memory size, for writer according to the weight, the actual memory used depends on the running environment. Now the memory size defined in this property are equals to the exact memory allocated to write buffer in runtime. |
在SQL中為Flink Managed Memory設置內存權重,然后Flink sink operator將獲得memory pool大小,并為Paimon writer創建allocator。
INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='256M') */
SELECT * FROM ....;
11.Setting dynamic options
與Paimon表交互時,可以在不更改catalog options的情況下調整table options。
Paimon將獲取job-level的dynamic options,并在current session中生效,dynamic options的格式是:
paimon.${catalogName}.${dbName}.${tableName}.${config_key}catalogName/dbName/tableName 可以是 *
例如:
-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;-- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;
12.Procedures
Flink 1.18及以上版本支持Call Statements,通過編寫SQL來操作Paimon表的數據和元數據。
注意:當 call 一個 procedure 時,必須按順序傳遞參數,如果不想傳遞某些參數,必須使用 ‘’ 作為占位符。
例如,用并行度為4的任務壓縮表default.t
,但不想指定分區和排序策略,調用語句應該是
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4')
指定分區:使用字符串來表示partition filter,“,“表示"AND”,”;"表示“OR”。
例如,指定兩個分區date=01或date=02,需要寫’date=01;date=02’;如果指定一個帶有date=01和day=01的分區,需要寫’date=01,day=01’。
table options 語法:使用字符串來表示table options,格式是’key1=value1,key2=value2…'。
Procedure Name | Usage | Explaination | Example |
---|---|---|---|
compact | CALL [catalog.]sys.compact(‘identifier’) CALL [catalog.]sys.compact(‘identifier’, ‘partitions’) CALL [catalog.]sys.compact(‘identifier’, ‘partitions’, ‘order_strategy’, ‘order_columns’, ‘table_options’) | TO compact a table. Arguments:identifier: the target table identifier. Cannot be empty.partitions: partition filter.order_strategy: ‘order’ or ‘zorder’ or ‘none’. Left empty for ‘none’.order_columns: the columns need to be sort. Left empty if ‘order_strategy’ is ‘none’.table_options: additional dynamic options of the table. | CALL sys.compact(‘default.T’, ‘p=0’, ‘zorder’, ‘a,b’, ‘sink.parallelism=4’) |
compact_database | CALL [catalog.]sys.compact_database() CALL [catalog.]sys.compact_database(‘includingDatabases’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’, ‘excludingTables’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’, ‘excludingTables’, ‘tableOptions’) | To compact databases. Arguments:includingDatabases: to specify databases. You can use regular expression.mode: compact mode. “divided”: start a sink for each table, detecting the new table requires restarting the job; “combined” (default): start a single combined sink for all tables, the new table will be automatically detected.includingTables: to specify tables. You can use regular expression.excludingTables: to specify tables that are not compacted. You can use regular expression.tableOptions: additional dynamic options of the table. | CALL sys.compact_database(‘db1|db2’, ‘combined’, ‘table_.*’, ‘ignore’, ‘sink.parallelism=4’) |
create_tag | CALL [catalog.]sys.create_tag(‘identifier’, ‘tagName’, snapshotId) | To create a tag based on given snapshot. Arguments:identifier: the target table identifier. Cannot be empty.tagName: name of the new tag.snapshotId (Long): id of the snapshot which the new tag is based on. | CALL sys.create_tag(‘default.T’, ‘my_tag’, 10) |
delete_tag | CALL [catalog.]sys.delete_tag(‘identifier’, ‘tagName’) | To delete a tag. Arguments:identifier: the target table identifier. Cannot be empty.tagName: name of the tag to be deleted. | CALL sys.delete_tag(‘default.T’, ‘my_tag’) |
merge_into | – when matched then upsert CALL [catalog.]sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedUpsertCondition’,‘matchedUpsertSetting’) – when matched then upsert; when not matched then insert CALL [catalog.]sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedUpsertCondition’,‘matchedUpsertSetting’, ‘notMatchedInsertCondition’,‘notMatchedInsertValues’) – when matched then delete CALL [catalog].sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedDeleteCondition’) – when matched then upsert + delete; – when not matched then insert CALL [catalog].sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedUpsertCondition’,‘matchedUpsertSetting’, ‘notMatchedInsertCondition’,‘notMatchedInsertValues’, ‘matchedDeleteCondition’) | To perform “MERGE INTO” syntax. See merge_into action for details of arguments. | – for matched order rows, – increase the price, – and if there is no match, – insert the order from – the source table CALL sys.merge_into(‘default.T’, ‘’, ‘’, ‘default.S’, ‘T.id=S.order_id’, ‘’, ‘price=T.price+20’, ‘’, ‘*’) |
remove_orphan_files | CALL [catalog.]sys.remove_orphan_files(‘identifier’) CALL [catalog.]sys.remove_orphan_files(‘identifier’, ‘olderThan’) | To remove the orphan data files and metadata files. Arguments:identifier: the target table identifier. Cannot be empty.olderThan: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default. This argument can modify the interval. | CALL remove_orphan_files(‘default.T’, ‘2023-10-31 12:00:00’) |
reset_consumer | – reset the new next snapshot id in the consumer CALL [catalog.]sys.reset_consumer(‘identifier’, ‘consumerId’, nextSnapshotId) – delete consumer CALL [catalog.]sys.reset_consumer(‘identifier’, ‘consumerId’) | To reset or delete consumer. Arguments:identifier: the target table identifier. Cannot be empty.consumerId: consumer to be reset or deleted.nextSnapshotId (Long): the new next snapshot id of the consumer. | CALL sys.reset_consumer(‘default.T’, ‘myid’, 10) |
rollback_to | – rollback to a snapshot CALL sys.rollback_to(‘identifier’, snapshotId) – rollback to a tag CALL sys.rollback_to(‘identifier’, ‘tagName’) | To rollback to a specific version of target table. Argument:identifier: the target table identifier. Cannot be empty.snapshotId (Long): id of the snapshot that will roll back to.tagName: name of the tag that will roll back to. | CALL sys.rollback_to(‘default.T’, 10) |
expire_snapshots | – expires snapshot CALL sys.expire_snapshots(‘identifier’, retainMax) | To expire snapshots. Argument:identifier: the target table identifier. Cannot be empty.retainMax: the maximum number of completed snapshots to retain. | CALL sys.expire_snapshots(‘default.T’, 2) |