配置(Maintenance)
系統表
表特定系統表
表特定系統表包含關于每個表的元數據和信息,例如創建的快照以及正在使用的選項。用戶可以通過批量查詢來訪問系統表。
目前,Flink、Spark、Trino 和 StarRocks 支持查詢系統表。
在某些情況下,表名需要用反引號括起來,以避免語法解析沖突,例如三元訪問模式:
SELECT?*?FROM?my_catalog.my_db.`my_table$snapshots`;
快照表
你可以通過快照表查詢表的快照歷史信息,包括快照中出現的記錄數。
SELECT?*?FROM?my_table$snapshots;
/*
+--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+-------------------------------?+--------------------------------+---------------------+---------------------+-------------------------+----------------+
|??snapshot_id?|??schema_id?|?????commit_user?|?commit_identifier?|??commit_kind?|?????????????commit_time?|?????????????base_manifest_list?|????????????delta_manifest_list?|????????changelog_manifest_list?|??total_record_count?|??delta_record_count?|??changelog_record_count?|??????watermark?|
+--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+-------------------------------?+--------------------------------+---------------------+---------------------+-------------------------+----------------+
|????????????2?|??????????0?|?7ca4cd28-98e...?|?????????????????2?|???????APPEND?|?2022-10-26?11:44:15.600?|?manifest-list-31323d5f-76e6...?|?manifest-list-31323d5f-76e6...?|?manifest-list-31323d5f-76e6...?|???????????????????2?|???????????????????2?|???????????????????????0?|??1666755855600?|
|????????????1?|??????????0?|?870062aa-3e9...?|?????????????????1?|???????APPEND?|?2022-10-26?11:44:15.148?|?manifest-list-31593d5f-76e6...?|?manifest-list-31593d5f-76e6...?|?manifest-list-31593d5f-76e6...?|???????????????????1?|???????????????????1?|???????????????????????0?|??1666755855148?|
+--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+-------------------------------?+--------------------------------+---------------------+---------------------+-------------------------+----------------+
2?rows?in?set
*/
通過查詢快照表,你可以了解該表的提交和過期信息,并對數據進行時間回溯。
模式表
你可以通過模式表查詢表的歷史模式。
SELECT?*?FROM?my_table$schemas;
/*
+-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
|?schema_id?|?????????????????????????fields?|?partition_keys?|?primary_keys?|?options?|?comment?|???????update_time???????|
+-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
|?????????0?|?[{"id":0,"name":"word","typ...?|?????????????[]?|?????["word"]?|??????{}?|?????????|?2022-10-28?11:44:20.600?|
|?????????1?|?[{"id":0,"name":"word","typ...?|?????????????[]?|?????["word"]?|??????{}?|?????????|?2022-10-27?11:44:15.600?|
|?????????2?|?[{"id":0,"name":"word","typ...?|?????????????[]?|?????["word"]?|??????{}?|?????????|?2022-10-26?11:44:10.600?|
+-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
3?rows?in?set
*/
你可以將快照表和模式表連接起來,以獲取給定快照的字段。
SELECT?s.snapshot_id,?t.schema_id,?t.fields?FROM?my_table$snapshots?s?JOIN?my_table$schemas?t?ON?s.schema_id?=?t.schema_id?where?s.snapshot_id?=?100;
選項表
你可以通過選項表查詢從DDL指定的表的選項信息。未顯示的選項將采用默認值。你可以參考配置。
SELECT?*?FROM?my_table$options;/*
+------------------------+--------------------+
|?????????key????????????|????????value???????|
+------------------------+--------------------+
|?snapshot.time-retained?|?????????5?h????????|
+------------------------+--------------------+
1?rows?in?set
*/
審計日志表
如果你需要審計表的變更日志,可以使用?audit_log
?系統表。通過?audit_log
?表,在獲取表的增量數據時,你可以得到?rowkind
?列。你可以使用該列進行過濾等操作以完成審計。
rowkind
?有四個值: -+I
:插入操作。 --U
:更新操作,包含更新前行的先前內容。 -+U
:更新操作,包含更新行的新內容。 --D
:刪除操作。
SELECT?*?FROM?my_table$audit_log;
/*
+------------------+-----------------+-----------------+
|?????rowkind??????|?????column_0????|?????column_1????|
+------------------+-----------------+-----------------+
|????????+I????????|??????...????????|??????...????????|
+------------------+-----------------+-----------------+
|????????-U????????|??????...????????|??????...????????|
+------------------+-----------------+-----------------+
|????????+U????????|??????...????????|??????...????????|
+------------------+-----------------+-----------------+
3?rows?in?set
*/
讀優化表
如果你需要極高的讀取性能,并且能夠接受讀取略微陳舊的數據,可以使用?ro
(讀優化)系統表。讀優化系統表通過僅掃描無需合并的文件來提高讀取性能。
對于主鍵表,ro
?系統表僅掃描最頂層的文件。也就是說,ro
?系統表僅生成最新完全合并的結果。
不同的桶可能在不同時間進行完全合并,因此不同鍵的值可能來自不同的快照。
對于追加表,由于所有文件無需合并即可讀取,ro
?系統表的行為類似于普通追加表。
SELECT?*?FROM?my_table$ro;
文件表
你可以查詢具有特定快照的表的文件。
--?查詢最新快照的文件
SELECT?*?FROM?my_table$files;
/*
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
|?partition?|?bucket?|??????????????????????file_path?|?file_format?|?schema_id?|?level?|?record_count?|?file_size_in_bytes?|?min_key?|?max_key?|??????null_value_counts?|?????????min_value_stats?|?????????max_value_stats?|?min_sequence_number?|?max_sequence_number?|?????????creation_time?|
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
|???????[3]?|??????0?|?data-8f64af95-29cc-4342-adc...?|?????????orc?|?????????0?|?????0?|????????????1?|????????????????593?|?????[c]?|?????[c]?|?{cnt?=?0,?val?=?0,?word?=?0}?|?{cnt?=?3,?val?=?33,?word?=?c}?|?{cnt?=?3,?val?=?33,?word?=?c}?|???????1691551246234?|???????1691551246637?|2023-02-24T16:06:21.166?|
|???????[2]?|??????0?|?data-8b369068-0d37-4011-aa5...?|?????????orc?|?????????0?|?????0?|????????????1?|????????????????593?|?????[b]?|?????[b]?|?{cnt?=?0,?val?=?0,?word?=?0}?|?{cnt?=?2,?val?=?22,?word?=?b}?|?{cnt?=?2,?val?=?22,?word?=?b}?|???????1691551246233?|???????1691551246732?|2023-02-24T16:06:21.166?|
|???????[2]?|??????0?|?data-83aa7973-060b-40b6-8c8...?|?????????orc?|?????????0?|?????0?|????????????1?|????????????????605?|?????[d]?|?????[d]?|?{cnt?=?0,?val?=?0,?word?=?0}?|?{cnt?=?2,?val?=?32,?word?=?d}?|?{cnt?=?2,?val?=?32,?word?=?d}?|???????1691551246267?|???????1691551246798?|2023-02-24T16:06:21.166?|
|???????[5]?|??????0?|?data-3d304f4a-bcea-44dc-a13...?|?????????orc?|?????????0?|?????0?|????????????1?|????????????????593?|?????[c]?|?????[c]?|?{cnt?=?0,?val?=?0,?word?=?0}?|?{cnt?=?5,?val?=?51,?word?=?c}?|?{cnt?=?5,?val?=?51,?word?=?c}?|???????1691551246788?|???????1691551246152?|2023-02-24T16:06:21.166?|
|???????[1]?|??????0?|?data-10abb5bc-0170-43ae-b6a...?|?????????orc?|?????????0?|?????0?|????????????1?|????????????????595?|?????[a]?|?????[a]?|?{cnt?=?0,?val?=?0,?word?=?0}?|?{cnt?=?1,?val?=?11,?word?=?a}?|?{cnt?=?1,?val?=?11,?word?=?a}?|???????1691551246722?|???????1691551246273?|2023-02-24T16:06:21.166?|
|???????[4]?|??????0?|?data-2c9b7095-65b7-4013-a7a...?|?????????orc?|?????????0?|?????0?|????????????1?|????????????????593?|?????[a]?|?????[a]?|?{cnt?=?0,?val?=?0,?word?=?0}?|?{cnt?=?4,?val?=?12,?word?=?a}?|?{cnt?=?4,?val?=?12,?word?=?a}?|???????1691551246321?|???????1691551246109?|2023-02-24T16:06:21.166?|
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
6?rows?in?set
*/--?你也可以查詢特定快照的文件
SELECT?*?FROM?my_table$files?/*+?OPTIONS('scan.snapshot-id'='1')?*/;
/*
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
|?partition?|?bucket?|??????????????????????file_path?|?file_format?|?schema_id?|?level?|?record_count?|?file_size_in_bytes?|?min_key?|?max_key?|??????null_value_counts?|?????????min_value_stats?|?????????max_value_stats?|?min_sequence_number?|?max_sequence_number?|?????????creation_time?|
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
|???????[3]?|??????0?|?data-8f64af95-29cc-4342-adc...?|?????????orc?|?????????0?|?????0?|????????????1?|????????????????593?|?????[c]?|?????[c]?|?{cnt?=?0,?val?=?0,?word?=?0}?|?{cnt?=?3,?val?=?33,?word?=?c}?|?{cnt?=?3,?val?=?33,?word?=?c}?|???????1691551246234?|???????1691551246637?|2023-02-24T16:06:21.166?|
|???????[2]?|??????0?|?data-8b369068-0d37-4011-aa5...?|?????????orc?|?????????0?|?????0?|????????????1?|????????????????593?|?????[b]?|?????[b]?|?{cnt?=?0,?val?=?0,?word?=?0}?|?{cnt?=?2,?val?=?22,?word?=?b}?|?{cnt?=?2,?val?=?22,?word?=?b}?|???????1691551246233?|???????1691551246732?|2023-02-24T16:06:21.166?|
|???????[1]?|??????0?|?data-10abb5bc-0170-43ae-b6a...?|?????????orc?|?????????0?|?????0?|????????????1?|????????????????595?|?????[a]?|?????[a]?|?{cnt?=?0,?val?=?0,?word?=?0}?|?{cnt?=?1,?val?=?11,?word?=?a}?|?{cnt?=?1,?val?=?11,?word?=?a}?|???????1691551246267?|???????1691551246798?|2023-02-24T16:06:21.166?|
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
3?rows?in?set
*/
標簽表
你可以通過標簽表查詢表的標簽歷史信息,包括標簽基于哪些快照以及這些快照的一些歷史信息。你還可以獲取所有標簽名稱,并通過名稱進行時間回溯到特定標簽的數據。
SELECT?*?FROM?my_table$tags;
/*
+----------+-------------+-----------+-------------------------+--------------+--------------+
|?tag_name?|?snapshot_id?|?schema_id?|?????????????commit_time?|?record_count?|???branches???|
+----------+-------------+-----------+-------------------------+--------------+--------------+
|?????tag1?|???????????1?|?????????0?|?2023-06-28?14:55:29.344?|????????????3?|??????[]??????|
|?????tag3?|???????????3?|?????????0?|?2023-06-28?14:58:24.691?|????????????7?|??[branch-1]??|
+----------+-------------+-----------+-------------------------+--------------+--------------+
2?rows?in?set
*/
分支表
你可以查詢表的分支。
SELECT?*?FROM?my_table$branches;
/*
+----------------------+---------------------------+--------------------------+-------------------------+
|??????????branch_name?|??????????created_from_tag?|????created_from_snapshot?|?????????????create_time?|
+----------------------+---------------------------+--------------------------+-------------------------+
|??????????????branch1?|????????????????????tag1???|????????????????????????2?|?2024-07-18?20:31:39.084?|
|??????????????branch2?|????????????????????tag2???|????????????????????????5?|?2024-07-18?21:11:14.373?|
+----------------------+---------------------------+--------------------------+-------------------------+
2?rows?in?set
*/
消費者表
你可以查詢所有包含下一個快照的消費者。
SELECT
*?FROM?my_table$consumers;
/*
+-------------+------------------+
|?consumer_id?|?next_snapshot_id?|
+-------------+------------------+
|?????????id1?|????????????????1?|
|?????????id2?|????????????????3?|
+-------------+------------------+
2?rows?in?set
*/
清單表
你可以查詢當前表最新快照或指定快照中包含的所有清單文件。
--?查詢最新快照的清單
SELECT?*?FROM?my_table$manifests;
/*
+--------------------------------+-------------+------------------+-------------------+---------------+
|??????????????????????file_name?|???file_size?|??num_added_files?|?num_deleted_files?|?????schema_id?|
+--------------------------------+-------------+------------------+-------------------+---------------+
|?manifest-f4dcab43-ef6b-4713...?|????????12365|???????????????40?|?????????????????0?|?????????????0?|
|?manifest-f4dcab43-ef6b-4713...?|????????1648?|????????????????1?|?????????????????0?|?????????????0?|
+--------------------------------+-------------+------------------+-------------------+---------------+
2?rows?in?set
*/--?你也可以查詢指定快照的清單
SELECT?*?FROM?my_table$manifests?/*+?OPTIONS('scan.snapshot-id'='1')?*/;
/*
+--------------------------------+-------------+------------------+-------------------+---------------+
|??????????????????????file_name?|???file_size?|??num_added_files?|?num_deleted_files?|?????schema_id?|
+--------------------------------+-------------+------------------+-------------------+---------------+
|?manifest-f4dcab43-ef6b-4713...?|????????12365|???????????????40?|?????????????????0?|?????????????0?|
+--------------------------------+-------------+------------------+-------------------+---------------+
1?rows?in?set
*/
聚合字段表
你可以通過聚合字段表查詢表的歷史聚合信息。
SELECT?*?FROM?my_table$aggregation_fields;
/*
+------------+-----------------+--------------+--------------------------------+---------+
|?field_name?|??????field_type?|????function??|???????????????function_options?|?comment?|
+------------+-----------------+--------------+--------------------------------+---------+
|?product_id?|?BIGINT?NOT?NULL?|???????????[]?|?????????????????????????????[]?|?????????|
|??????price?|?????????????INT?|?[true,count]?|?[fields.price.ignore-retrac...?|?????????|
|??????sales?|??????????BIGINT?|????????[sum]?|?[fields.sales.aggregate-fun...?|?????????|
+------------+-----------------+--------------+--------------------------------+---------+
3?rows?in?set
*/
分區表
你可以查詢表的分區文件。
SELECT?*?FROM?my_table$partitions;
/*
+---------------+----------------+--------------------+--------------------+------------------------+
|??partition????|???record_count?|??file_size_in_bytes|??????????file_count|????????last_update_time|
+---------------+----------------+--------------------+--------------------+------------------------+
|??[1]??????????|???????????1????|?????????????645????|????????????????1???|?2024-06-24?10:25:57.400|
+---------------+----------------+--------------------+--------------------+------------------------+
*/
全局系統表
全局系統表包含Paimon中所有表的統計信息。為了方便查詢,我們創建了一個名為?sys
?的參考系統數據庫。我們可以在Flink中通過SQL顯示所有全局系統表:
USE?sys;
SHOW?TABLES;
所有選項表
此表與選項表類似,但它顯示所有數據庫中所有表的選項。
SELECT?*?FROM?sys.all_table_options;
/*
+---------------+--------------------------------+--------------------------------+------------------+
|?database_name?|?????????????????????table_name?|????????????????????????????key?|????????????value?|
+---------------+--------------------------------+--------------------------------+------------------+
|?????????my_db?|????????????????????Orders_orc??|?????????????????????????bucket?|???????????????-1?|
|?????????my_db?|????????????????????????Orders2?|?????????????????????????bucket?|???????????????-1?|
|?????????my_db?|????????????????????????Orders2?|???????????????sink.parallelism?|????????????????7?|
|?????????my_db2|??????????????????????OrdersSum?|?????????????????????????bucket?|????????????????1?|
+---------------+--------------------------------+--------------------------------+------------------+
7?rows?in?set
*/
目錄選項表
你可以通過目錄選項表查詢目錄的選項信息。未顯示的選項將采用默認值。你可以參考配置。
SELECT?*?FROM?sys.catalog_options;
/*
+-----------+---------------------------+
|???????key?|?????????????????????value?|
+-----------+---------------------------+
|?warehouse?|?hdfs:///path/to/warehouse?|
+-----------+---------------------------+
1?rows?in?set
*/
拓展:
系統表在數據管理與分析中的應用:這些系統表為數據管理人員和分析師提供了豐富的信息。例如,通過快照表和模式表,數據分析師可以了解數據的歷史變更,從而更好地理解數據的演變過程,有助于調試數據處理流程或進行數據質量追溯。選項表則讓管理員能夠清晰知曉表的配置選項,方便進行性能調優或故障排查。審計日志表對于合規性要求較高的場景非常關鍵,它詳細記錄了數據的變更操作,滿足審計和監管需求。
讀優化表與大數據查詢性能優化:讀優化表(
ro
?系統表)針對大數據場景下的讀取性能進行了優化。在數據量龐大且對數據實時性要求不是極高的情況下,通過僅掃描無需合并的文件,大大減少了查詢時的數據掃描范圍,從而顯著提升讀取速度。對于主鍵表,這種優化方式確保了查詢能夠快速獲取最新完全合并后的結果,避免了掃描大量冗余或過時的數據文件。而對于追加表,ro
?系統表的行為與普通追加表一致,保持了數據讀取的一致性和穩定性。全局系統表與多表管理:全局系統表為管理多個表提供了統一的視角。
sys
?數據庫中的這些表,如所有選項表和目錄選項表,使得管理員可以在一個地方查看和管理所有表及目錄的相關信息。這在大規模數據倉庫環境中尤為重要,方便進行整體的配置管理、性能監控以及資源優化。例如,通過查看所有選項表,管理員可以快速發現不同表在配置上的差異,及時調整以確保整個數據存儲和處理系統的一致性和高效性。
寫入性能
Paimon的寫入性能與檢查點密切相關,因此如果你需要更高的寫入吞吐量,可以采取以下措施:
-
Flink配置(在'flink-conf.yaml'中或通過SQL的SET語句):增加檢查點間隔('execution.checkpointing.interval'),將最大并發檢查點增加到3('execution.checkpointing.max-concurrent-checkpoints'),或者直接使用批處理模式。
-
增加 write-buffer-size 寫入緩沖區大小。
-
啟用write-buffer-spillable可溢出的寫入緩沖區。
-
如果你正在使用固定桶模式,可以重新調整桶的數量。
選項'changelog-producer' = 'lookup' 或 'full-compaction',以及選項'full-compaction.delta-commits' 對寫入性能有很大影響。如果處于快照/全量同步階段,你可以取消設置這些選項,然后在增量階段再次啟用它們。
如果你發現作業的輸入在出現背壓的情況下呈現鋸齒狀模式,這可能是工作節點負載不均衡。你可以考慮開啟異步合并,觀察吞吐量是否提高。
并行度
建議sink的并行度小于或等于桶的數量,最好相等。你可以使用sink.parallelism表屬性來控制sink的并行度。
選項 | 是否必需 | 默認值 | 類型 | 描述 |
---|---|---|---|---|
sink.parallelism | 否 | (無) | 整數 | 定義sink算子的并行度。默認情況下,并行度由框架根據上游鏈式算子的并行度來確定。 |
本地合并
如果你的作業受到主鍵數據傾斜的影響(例如,你想統計網站中每個頁面的瀏覽量,而某些特定頁面非常受用戶歡迎),你可以設置'local-merge-buffer-size',以便在按桶進行洗牌并寫入sink之前,對輸入記錄進行緩沖和合并。當相同主鍵在快照之間頻繁更新時,這特別有用。
緩沖區滿時將被刷新。當你面臨數據傾斜但不知道從何處開始調整緩沖區大小時,我們建議從64MB開始嘗試。
(目前,本地合并不適用于CDC攝入)
文件格式
如果你想實現極致的合并性能,可以考慮使用行存儲文件格式AVRO。
-
優點:可以實現高寫入吞吐量和合并性能。
-
缺點:分析查詢會很慢,行存儲最大的問題是它沒有查詢投影。例如,如果表有100列,但只查詢幾列,行存儲的IO開銷就不容忽視。此外,合并效率會降低,存儲成本會增加。這是一種權衡。
通過以下選項啟用行存儲: -file.format = avro -metadata.stats-mode = none
收集行存儲的統計信息成本有點高,所以我也建議關閉統計信息。
如果你不想將所有文件都修改為Avro格式,至少可以考慮將前幾層的文件修改為Avro格式。你可以使用'file.format.per.level' = '0:avro,1:avro'來指定前兩層的文件為Avro格式。
文件壓縮
默認情況下,Paimon使用zstd合并算法且級別為1,你可以修改合并算法相關設置:
-
'file.compression.zstd-level':默認的zstd級別是1。為了獲得更高的合并率,可以將其配置為9,但讀寫速度會顯著下降。
穩定性
如果桶的數量或資源過少,完全合并可能會導致檢查點超時,Flink的默認檢查點超時時間是10分鐘。
如果你期望即使在這種情況下也能保持穩定性,可以提高檢查點超時時間,例如: -execution.checkpointing.timeout = 60 min
寫入初始化
在寫入初始化階段,桶的寫入器需要讀取所有歷史文件。如果這里出現瓶頸(例如,同時寫入大量分區),你可以使用write-manifest-cache來緩存讀取的清單數據,以加速初始化過程。
寫入內存
Paimon寫入器主要在以下三個地方占用內存:
-
寫入器的內存緩沖區:由單個任務的所有寫入器共享并可被搶占。此內存值可通過write-buffer-size表屬性進行調整。
-
合并多個排序運行進行合并時消耗的內存:可以通過num-sorted-run.compaction-trigger選項來調整要合并的排序運行數量,從而調整該部分內存占用。
-
如果行數據非常大:在進行合并時,一次讀取過多行數據會消耗大量內存。減少read.batch-size選項的值可以緩解這種情況的影響。
-
寫入列式ORC文件時消耗的內存:減少orc.write.batch-size選項的值可以降低ORC格式的內存消耗。
-
如果在寫入任務中文件自動合并:某些大列的字典在合并過程中會顯著消耗內存。
-
要在Parquet格式中禁用所有字段的字典編碼,設置'parquet.enable.dictionary' = 'false'。
-
要在ORC格式中禁用所有字段的字典編碼,設置orc.dictionary.key.threshold = '0'。此外,設置orc.column.encoding.direct = 'field1,field2' 可以禁用特定列的字典編碼。
-
如果你的Flink作業不依賴狀態,請避免使用托管內存,你可以通過以下Flink參數進行控制:
-
taskmanager.memory.managed.size = 1m
或者你可以將Flink托管內存用于寫入緩沖區以避免OOM,設置表屬性:
-
sink.use-managed-memory-allocator = true
提交內存
如果寫入表的數據量特別大,提交節點可能會使用大量內存,如果內存過小則可能發生OOM(內存溢出)。在這種情況下,你需要增加提交器的堆內存,但你可能不想統一增加Flink的TaskManager的內存,因為這可能會導致內存浪費。
你可以使用Flink的細粒度資源管理來僅增加提交器的堆內存:
-
配置Flink參數cluster.fine-grained-resource-management.enabled: true。(Flink 1.18之后默認為true)
-
配置Paimon表選項:sink.committer-memory,例如300MB,具體取決于你的TaskManager。(也支持sink.committer-cpu選項)
拓展:
檢查點與寫入性能關系的深入理解:檢查點在Flink中是為了保證作業的容錯性,但它會對寫入性能產生影響。增加檢查點間隔意味著兩次檢查點之間有更多時間進行寫入操作,從而提高寫入吞吐量。而最大并發檢查點數量的調整,可以避免檢查點操作過于頻繁而阻塞寫入。例如,在一個實時數據寫入任務中,默認的檢查點設置可能導致頻繁的寫入中斷以進行檢查點操作,通過適當增加這兩個參數,可以減少中斷次數,提升整體寫入效率。
數據傾斜與本地合并優化原理:數據傾斜是大數據處理中常見的問題,在Paimon中通過本地合并機制來緩解。當主鍵數據傾斜時,大量相同主鍵的數據會集中在某些桶中,導致寫入性能下降。本地合并緩沖區(local-merge-buffer)會先將輸入記錄緩沖起來并進行合并,減少了相同主鍵數據在桶中的重復寫入,從而提高寫入性能。例如,在電商銷售數據統計中,某些熱門商品的銷售記錄可能會導致數據傾斜,本地合并緩沖區可以先對這些熱門商品的銷售記錄進行合并,再寫入相應的桶,避免了重復數據的頻繁寫入。
文件格式選擇對性能的綜合影響:選擇不同的文件格式(如AVRO)涉及到寫入性能、查詢性能和存儲成本之間的權衡。AVRO的行存儲方式雖然在寫入和合并時表現出色,但由于缺乏查詢投影,在查詢時會讀取更多不必要的數據,增加IO開銷,導致查詢性能下降。在實際應用中,需要根據業務場景來決定。如果是寫入密集型業務,對查詢實時性要求不高,AVRO可能是一個不錯的選擇;但如果是查詢頻繁且對響應時間要求嚴格的業務,則需要謹慎考慮。例如,在日志數據的收集和存儲場景中,由于日志數據主要用于后續分析,對寫入性能要求較高,AVRO格式可以快速將日志數據寫入存儲,而在查詢時雖然速度較慢,但可以通過批量分析的方式來彌補。
內存管理對寫入穩定性的關鍵作用:Paimon寫入過程中的內存管理非常關鍵。從寫入緩沖區到合并過程中的內存使用,再到文件格式相關的內存消耗,每個環節都可能影響寫入的穩定性。合理調整這些內存參數,可以避免內存溢出問題,保證作業的穩定運行。例如,對于寫入緩沖區,根據數據量大小調整write-buffer-size可以確保緩沖區既能有效緩存數據,又不會占用過多內存。而在處理大列數據時,禁用字典編碼等操作,可以減少合并過程中的內存消耗,避免OOM情況的發生,確保數據寫入的連續性和穩定性。
專用合并(Dedicated Compaction)
Paimon的快照管理支持多個寫入器同時寫入。
對于類似S3的對象存儲,其“重命名”操作不具備原子語義。我們需要配置Hive元存儲,并為目錄啟用“lock.enabled”選項。
默認情況下,Paimon支持并發寫入不同分區。一種推薦的模式是,流處理作業將記錄寫入Paimon的最新分區;同時,批處理作業(覆蓋寫入)將記錄寫入歷史分區。
到目前為止,這些操作都運行良好,但是如果你需要多個寫入器將記錄寫入同一分區,情況就會稍微復雜一些。例如,你不想使用UNION ALL,而是有多個流處理作業將記錄寫入一個“部分更新”表。請參考下面的“專用合并作業”。
專用合并作業
默認情況下,Paimon寫入器在寫入記錄時會根據需要執行合并操作。這在大多數用例中已經足夠。
合并會將一些數據文件標記為“已刪除”(并非真正刪除,更多信息請參閱過期快照相關內容)。如果多個寫入器標記同一個文件,在提交更改時就會發生沖突。Paimon會自動解決沖突,但這可能會導致作業重啟。
為避免這些問題,用戶也可以選擇在寫入器中跳過合并操作,而是運行一個專門用于合并的作業。由于合并僅由專用作業執行,寫入器可以持續寫入記錄而無需暫停,并且不會發生沖突。
要在寫入器中跳過合并操作,將以下表屬性設置為true。
選項 | 是否必需 | 默認值 | 類型 | 描述 |
---|---|---|---|---|
write - only | 否 | false | 布爾值 | 如果設置為true,將跳過合并和快照過期操作。此選項與專用合并作業配合使用。 |
要運行專用合并作業,請遵循以下說明。
Flink SQL
運行以下SQL語句:
--?壓縮表
CALL?sys.compact(`table`?=]]>?'default.T');--?帶選項壓縮表
CALL?sys.compact(`table`?=]]>?'default.T',?`options`?=]]>'sink.parallelism=4');--?壓縮表分區
CALL?sys.compact(`table`?=]]>?'default.T',?`partitions`?=]]>?'p=0');--?帶篩選條件壓縮表分區
CALL?sys.compact(`table`?=]]>?'default.T',?`where`?=]]>?'dt>10?and?h<20');
同樣,默認情況下是同步合并,這可能會導致檢查點超時。你可以配置table_conf
以使用異步合并。
數據庫合并作業
你可以運行以下命令為多個數據庫提交合并作業。
Flink SQL
運行以下SQL語句:
CALL?sys.compact_database('includingDatabases')CALL?sys.compact_database('includingDatabases','mode')CALL?sys.compact_database('includingDatabases','mode',?'includingTables')CALL?sys.compact_database('includingDatabases','mode',?'includingTables',?'excludingTables')CALL?sys.compact_database('includingDatabases','mode',?'includingTables',?'excludingTables',?'tableOptions')--?示例
CALL?sys.compact_database('db1|db2',?'combined',?'table_.*',?'ignore','sink.parallelism=4')
排序合并
如果你的表配置為動態桶主鍵表或追加表,你可以觸發按指定列排序的合并操作以加速查詢。
Flink SQL
運行以下SQL語句:
--?排序壓縮表
CALL?sys.compact(`table`?=]]>?'default.T',?order_strategy?=]]>?'zorder',?order_by?=]]>?'a,b')
歷史分區合并
你可以運行以下命令為一段時間內未接收任何新數據的分區提交合并作業。這些分區中的小文件將進行完全合并。
此功能目前僅在批處理模式下可用。
針對單個表
這是針對單個表的操作。
Flink SQL
運行以下SQL語句:
--?歷史分區壓縮表
CALL?sys.compact(`table`?=]]>?'default.T',?'partition_idle_time'?=]]>?'1?d')
針對多個數據庫
這是針對不同數據庫中的多個表的操作。
Flink SQL
運行以下SQL語句:
--?歷史分區壓縮表
CALL?sys.compact_database('includingDatabases','mode',?'includingTables',?'excludingTables',?'tableOptions',?'partition_idle_time')--?示例:壓縮數據庫中表的歷史分區
CALL?sys.compact_database('test_db',?'combined',?'',?'',?'',?'1?d')
拓展:
專用合并作業在復雜寫入場景中的意義:在大數據處理場景中,多個寫入器同時寫入同一分區可能會引發數據一致性和性能問題。Paimon通過引入專用合并作業,將合并操作從常規寫入流程中分離出來,有效避免了因多個寫入器同時標記文件刪除而產生的沖突,保證了寫入過程的連續性,減少作業重啟次數。這對于需要高可靠性和持續寫入性能的應用場景,如實時數據采集和處理系統,尤為重要。
不同合并方式的適用場景:
數據庫合并作業:適用于需要對多個數據庫中的表進行批量合并的場景。例如,在數據倉庫環境中,可能有多個數據庫存儲不同主題的數據,通過數據庫合并作業可以一次性對這些數據庫中的相關表進行合并,提高整體存儲效率和查詢性能。
排序合并:對于動態桶主鍵表或追加表,排序合并能夠根據指定列進行排序后合并,從而在查詢時利用排序優勢加速數據檢索。比如在時間序列數據存儲中,按照時間列進行排序合并后,查詢特定時間范圍的數據時可以更快定位到相關文件,提升查詢速度。
歷史分區合并:主要用于處理長時間未更新的分區,通過對這些分區中的小文件進行完全合并,可以釋放存儲資源,提高存儲利用率,同時也能提升對這些歷史數據的查詢性能。例如,在日志數據存儲中,隨著時間推移,舊的日志分區數據不再更新,對這些分區進行歷史分區合并,可以優化存儲結構,方便后續的數據分析和審計。
并發寫入與合并的協調機制:Paimon在支持并發寫入的同時,通過合理的合并機制保證數據的一致性和性能。在多個寫入器同時工作時,默認的合并方式可能會引發沖突,但通過設置
write - only
屬性并結合專用合并作業,可以實現寫入與合并的解耦,確保并發寫入的高效和穩定。此外,在不同的合并場景中,如同步與異步合并的選擇,也需要根據實際的業務需求和系統資源情況進行權衡。例如,在對實時性要求較高的場景中,異步合并可以避免合并操作對寫入過程的阻塞,保證數據的快速寫入;而在對數據一致性要求極高且系統資源充足的情況下,同步合并可能更合適,因為它能即時完成合并操作,確保數據狀態的及時更新。
管理快照(Manage Snapshots)
本節將介紹與快照相關的管理操作和行為。
過期快照
Paimon寫入器每次提交會生成一個或兩個快照。每個快照可能會添加一些新的數據文件,或者將一些舊數據文件標記為已刪除。然而,被標記的數據文件并不會真正被刪除,因為Paimon還支持回溯到更早的快照。只有當快照過期時,這些文件才會被刪除。
目前,在提交新更改時,Paimon寫入器會自動執行過期操作。通過使舊快照過期,可以刪除不再使用的舊數據文件和元數據文件,從而釋放磁盤空間。
快照過期由以下表屬性控制:
選項 | 是否必需 | 默認值 | 類型 | 描述 |
---|---|---|---|---|
snapshot.time-retained | 否 | 1小時 | 時長 | 已完成快照保留的最長時間。 |
snapshot.num-retained.min | 否 | 10 | 整數 | 已完成快照保留的最小數量。應大于或等于1。 |
snapshot.num-retained.max | 否 | Integer.MAX_VALUE(Java中整型最大值) | 整數 | 已完成快照保留的最大數量。應大于或等于最小數量。 |
snapshot.expire.execution-mode | 否 | sync(同步) | 枚舉值 | 指定過期操作的執行模式。 |
snapshot.expire.limit | 否 | 10 | 整數 | 一次允許過期的最大快照數量。 |
當快照數量小于snapshot.num-retained.min
時,不會有快照過期(即使滿足snapshot.time-retained
條件)。此后,snapshot.num-retained.max
和snapshot.time-retained
將用于控制快照過期,直到剩余的快照滿足條件。
以下示例展示更多細節(snapshot.num-retained.min
為2,snapshot.time-retained
為1小時,snapshot.num-retained.max
為5):
使用元組(snapshotId,對應時間)描述快照項
新快照 | 過期檢查后的所有快照 | 解釋 |
---|---|---|
(snapshots-1, 2023-07-06 10:00) | (snapshots-1, 2023-07-06 10:00) | 無快照過期 |
(snapshots-2, 2023-07-06 10:20) | (snapshots-1, 2023-07-06 10:00) (snapshots-2, 2023-07-06 10:20) | 無快照過期 |
(snapshots-3, 2023-07-06 10:40) | (snapshots-1, 2023-07-06 10:00) (snapshots-2, 2023-07-06 10:20) (snapshots-3, 2023-07-06 10:40) | 無快照過期 |
(snapshots-4, 2023-07-06 11:00) | (snapshots-1, 2023-07-06 10:00) (snapshots-2, 2023-07-06 10:20) (snapshots-3, 2023-07-06 10:40) (snapshots-4, 2023-07-06 11:00) | 無快照過期 |
(snapshots-5, 2023-07-06 11:20) | (snapshots-2, 2023-07-06 10:20) (snapshots-3, 2023-07-06 10:40) (snapshots-4, 2023-07-06 11:00) (snapshots-5, 2023-07-06 11:20) | snapshot-1 過期,因為不滿足 |
(snapshots-6, 2023-07-06 11:30) | (snapshots-3, 2023-07-06 10:40) (snapshots-4, 2023-07-06 11:00) (snapshots-5, 2023-07-06 11:20) (snapshots-6, 2023-07-06 11:30) | snapshot-2 過期,因為不滿足 |
(snapshots-7, 2023-07-06 11:35) | (snapshots-3, 2023-07-06 10:40) (snapshots-4, 2023-07-06 11:00) (snapshots-5, 2023-07-06 11:20) (snapshots-6, 2023-07-06 11:30) (snapshots-7, 2023-07-06 11:35) | 無快照過期 |
(snapshots-8, 2023-07-06 11:36) | (snapshots-4, 2023-07-06 11:00) (snapshots-5, 2023-07-06 11:20) (snapshots-6, 2023-07-06 11:30) (snapshots-7, 2023-07-06 11:35) (snapshots-8, 2023-07-06 11:36) | snapshot-3 過期,因為不滿足 |
請注意,保留時間過短或保留數量過小可能會導致: -批處理查詢找不到文件。例如,表比較大,批處理查詢需要10分鐘讀取數據,但10分鐘前的快照過期了,此時批處理查詢將讀取到已刪除的快照。 -基于表文件的流讀取作業重啟失敗。當作業重啟時,它記錄的快照可能已過期。(在快照過期保留時間較短的情況下,你可以使用消費者ID來保護流讀取。)
默認情況下,Paimon會同步刪除過期快照。當需要刪除的文件過多時,可能無法快速刪除,從而對上游操作造成壓力。為避免這種情況,用戶可以通過將snapshot.expire.execution-mode
設置為async
(異步)來使用異步過期模式。
回滾到快照
將表回滾到特定的快照ID。
Flink
運行以下命令:
/bin/flink?run?\/path/to/paimon-flink-action-0.9.0.jar?\rollback_to?\--warehouse??\--database??\?--table??\--version??\[--catalog_conf??[--catalog_conf??...]]
刪除孤立文件
Paimon文件僅在快照過期時才會被物理刪除。然而,在刪除文件時可能會發生一些意外錯誤,導致存在未被Paimon快照使用的文件(即所謂的“孤立文件”)。你可以提交一個remove_orphan_files
作業來清理它們:
Spark SQL/Flink SQL
CALL?sys.remove_orphan_files(table?=]]>?"my_db.my_table",?[older_than?=]]>?"2023-10-31?12:00:00"])CALL?sys.remove_orphan_files(table?=]]>?"my_db.*",?[older_than?=]]>?"2023-10-31?12:00:00"])
拓展:
快照過期機制的優化考量:快照過期機制是平衡存儲資源利用和數據可訪問性的關鍵。保留時間和保留數量的設置需要根據具體業務場景進行精細調整。例如,在實時數據分析場景中,如果數據變化頻繁且對歷史數據的依賴較低,可以適當縮短保留時間和減少保留數量,以更快地釋放磁盤空間。但對于需要長期保留歷史數據用于審計或趨勢分析的場景,就需要設置較長的保留時間和較多的保留數量。此外,異步過期模式在處理大量過期文件時,能夠有效避免對上游操作的壓力,提高系統整體的穩定性和性能。
回滾到快照的應用場景:回滾到特定快照在數據處理過程中非常有用。例如,當發現最近的數據處理操作導致數據錯誤時,可以通過回滾到之前正確的快照來恢復數據狀態,避免重新處理大量數據的成本。在進行數據更新或遷移操作時,也可以先創建快照,若操作出現問題,能夠迅速回滾到操作前的狀態。這一功能為數據管理提供了一種可靠的“撤銷”機制,增強了系統的容錯能力。
刪除孤立文件的重要性:孤立文件的存在不僅占用寶貴的存儲資源,還可能導致數據管理的混亂。定期清理孤立文件有助于維護存儲系統的整潔和高效。通過
remove_orphan_files
作業,可以根據特定條件(如文件的創建時間)來刪除不再被使用的文件。在大規模數據存儲環境中,這種清理操作對于優化存儲結構、提高文件系統性能至關重要。 例如,在一個長期運行的數據倉庫中,頻繁的寫入和刪除操作可能會產生大量孤立文件,定期執行該作業可以有效避免存儲碎片化問題,確保數據存儲和訪問的高效性。
重新調整桶數量(Rescale Bucket)
由于桶的總數會極大地影響性能,Paimon允許用戶通過ALTER TABLE
命令調整桶的數量,并通過INSERT OVERWRITE
在不重新創建表/分區的情況下重新組織數據布局。在執行覆蓋作業時,框架會自動掃描舊桶數量的數據,并根據當前桶數量對記錄進行哈希處理。
重新調整覆蓋(Rescale Overwrite)
--?重新調整桶的總數
ALTER?TABLE?table_identifier?SET?('bucket'?=?'...');--?重新組織表/分區的數據布局
INSERT?OVERWRITE?table_identifier?[PARTITION?(part_spec)]
SELECT?...?
FROM?table_identifier
[WHERE?part_spec];
請注意:
-
ALTER TABLE
僅修改表的元數據,不會重新組織或重新格式化現有數據。重新組織現有數據必須通過INSERT OVERWRITE
來實現。 -
重新調整桶數量不會影響正在運行的讀取和寫入作業。
-
一旦桶數量發生變化,任何新調度的
INSERT INTO
作業,如果寫入未重新組織的現有表/分區,將拋出TableException
異常,提示信息類似:“嘗試以新的桶數量...寫入表/分區...,但之前的桶數量是...。請切換到批處理模式,并首先執行INSERT OVERWRITE
以重新調整當前數據布局。” -
對于分區表,不同分區可以有不同的桶數量。例如:
ALTER?TABLE?my_table?SET?('bucket'?=?'4');
INSERT?OVERWRITE?my_table?PARTITION?(dt?=?'2022-01-01')
SELECT?*?FROM?...;ALTER?TABLE?my_table?SET?('bucket'?=?'8');
INSERT?OVERWRITE?my_table?PARTITION?(dt?=?'2022-01-02')
SELECT?*?FROM?...;
-
在覆蓋期間,確保沒有其他作業寫入相同的表/分區。
注意:對于啟用了日志系統(如Kafka)的表,請同時重新調整主題的分區以保持一致性。
用例(Use Case)
重新調整桶數量有助于應對吞吐量的突然激增。假設有一個每日流ETL任務用于同步交易數據。表的DDL和管道如下所示:
--?表DDL
CREATE?TABLE?verified_orders?(trade_order_id?BIGINT,item_id?BIGINT,item_price?DOUBLE,dt?STRING,PRIMARY?KEY?(dt,?trade_order_id,?item_id)?NOT?ENFORCED?
)?PARTITIONED?BY?(dt)
WITH?('bucket'?=?'16'
);--?類似于從Kafka表
CREATE?temporary?TABLE?raw_orders(trade_order_id?BIGINT,item_id?BIGINT,item_price?BIGINT,gmt_create?STRING,order_status?STRING
)?WITH?('connector'?=?'kafka','topic'?=?'...','properties.bootstrap.servers'?=?'...','format'?=?'csv'...
);--?以桶數量為16進行流式插入
INSERT?INTO?verified_orders
SELECT?trade_order_id,item_id,item_price,DATE_FORMAT(gmt_create,?'yyyy-MM-dd')?AS?dt
FROM?raw_orders
WHERE?order_status?='verified';
在過去幾周,該管道運行良好。然而,最近數據量快速增長,作業延遲持續增加。為提高數據的時效性,用戶可以:
1. 使用保存點暫停流作業(請參閱“暫停狀態和優雅停止作業并創建最終保存點”):
$?./bin/flink?stop?\--savepointPath?/tmp/flink-savepoints?\$JOB_ID
2. 增加桶的數量:
--?擴展
ALTER?TABLE?verified_orders?SET?('bucket'?=?'32');
3. 切換到批處理模式并覆蓋流作業正在寫入的當前分區:
SET?'execution.runtime-mode'?=?'batch';
--?假設今天是2022-06-22
--?情況1:沒有延遲事件更新歷史分區,因此覆蓋今天的分區就足夠了
INSERT?OVERWRITE?verified_orders?PARTITION?(dt?=?'2022-06-22')
SELECT?trade_order_id,item_id,item_price
FROM?verified_orders
WHERE?dt?=?'2022-06-22';--?情況2:有延遲事件更新歷史分區,但范圍不超過3天
INSERT?OVERWRITE?verified_orders
SELECT?trade_order_id,item_id,item_price,dt
FROM?verified_orders
WHERE?dt?IN?('2022-06-20',?'2022-06-21',?'2022-06-22');
4. 覆蓋作業完成后,切換回流式模式。現在,可以隨著桶數量的增加提高并行度,并從保存點恢復流作業(請參閱“從保存點啟動SQL作業”):
SET?'execution.runtime-mode'?='streaming';
SET?'execution.savepoint.path'?=?;INSERT?INTO?verified_orders
SELECT?trade_order_id,item_id,item_price,DATE_FORMAT(gmt_create,?'yyyy-MM-dd')?AS?dt
FROM?raw_orders
WHERE?order_status?='verified';
拓展:
桶數量對性能的影響原理:在大數據處理中,桶(bucket)的數量影響數據的分布和并行處理能力。更多的桶意味著數據可以更均勻地分布在存儲和計算資源上,從而提高并行處理的效率。例如,在一個大規模的分布式數據存儲系統中,每個桶可以分配到不同的存儲節點和計算資源,當桶數量過少時,可能會導致某些桶的數據量過大,形成數據熱點,影響整體性能。而通過合理增加桶的數量,可以將數據更均衡地分配,減少數據熱點,提升系統的吞吐量和響應速度。
重新調整桶數量操作流程的關鍵要點:重新調整桶數量的過程中,
ALTER TABLE
命令只是更新表的元數據,告知系統新的桶數量設置。真正對數據進行重新組織以適應新桶數量的操作是通過INSERT OVERWRITE
完成的。這是因為INSERT OVERWRITE
會重新掃描數據并根據新的桶數量進行哈希分布,確保數據在新的桶布局下得到正確存儲。在操作過程中,要特別注意避免在覆蓋數據時其他作業對相同表或分區的寫入操作,以免造成數據沖突和不一致。另外,對于依賴日志系統(如Kafka)的表,同步調整日志主題的分區與表的桶數量一致,是保證數據一致性的關鍵步驟。例如,如果Kafka主題分區數量與表的桶數量不匹配,可能會導致數據寫入或讀取的混亂。用例中的性能優化思路:在實際應用中,數據量的動態變化是常見的情況。如上述用例中,隨著數據量的快速增長,原有的桶數量無法滿足處理需求,導致作業延遲增加。通過暫停流作業、增加桶數量、在批處理模式下覆蓋數據,再恢復流作業的流程,實現了系統性能的優化。這種方法在不影響現有數據的情況下,對數據布局進行了調整,提高了系統處理能力。在覆蓋數據時,根據是否有延遲事件更新歷史分區來決定覆蓋的范圍,體現了操作的靈活性和對實際業務場景的適應性。例如,如果有延遲事件更新歷史分區,且范圍較大,就需要覆蓋更多歷史分區的數據,以確保數據的完整性和準確性。
管理標簽
Paimon的快照提供了一種便捷的歷史數據查詢方式。但在大多數場景下,一個作業會生成大量快照,并且表會根據配置使舊快照過期。快照過期會刪除舊數據文件,過期快照的歷史數據將無法再查詢。
為解決此問題,你可以基于某個快照創建標簽。標簽會維護該快照的清單和數據文件。一種典型用法是每天創建標簽,這樣就能保留每天的歷史數據用于批量讀取。
自動創建
Paimon支持在寫入作業中自動創建標簽。
步驟1:選擇創建模式?
你可以通過表選項tag.automatic-creation
設置創建模式。支持的值有:
-
process-time
:基于機器時間創建標簽。 -
watermark
:基于Sink輸入的水印創建標簽。 -
batch
:在批處理場景中,當前任務完成后生成一個標簽。
如果你選擇Watermark
,可能需要指定水印的時區。如果水印不在協調世界時(UTC)時區,請配置sink.watermark-time-zone
。
步驟2:選擇創建周期?
即使用何種頻率生成標簽。對于tag.creation-period
,你可以選擇daily
(每天)、hourly
(每小時)和two-hours
(每兩小時)。
如果你需要等待延遲數據,可以配置延遲時間:tag.creation-delay
。
步驟3:標簽的自動刪除
你可以配置tag.num-retained-max
或tag.default-time-retained
來自動刪除標簽。
例如,配置表每天0點10分創建一個標簽,最大保留時間為3個月:
--?Flink?SQL
CREATE?TABLE?t?(k?INT?PRIMARY?KEY?NOT?ENFORCED,f0?INT,...
)?WITH?('tag.automatic-creation'?=?'process-time','tag.creation-period'?=?'daily','tag.creation-delay'?=?'10?m','tag.num-retained-max'?=?'90'
);INSERT?INTO?t?SELECT...;--?Spark?SQL--?讀取最新快照
SELECT?*?FROM?t;--?讀取標簽快照
SELECT?*?FROM?t?VERSION?AS?OF?'2023-07-26';--?讀取標簽間的增量數據
SELECT?*?FROM?paimon_incremental_query('t',?'2023-07-25',?'2023-07-26');
更多關于Spark的查詢,請參閱“查詢表”。
創建標簽
你可以使用給定的名稱和快照ID創建標簽。
Flink:
運行以下命令:
/bin/flink?run?\/path/to/paimon-flink-action-0.9.0.jar?\create_tag?\--warehouse??\--database??\?--table??\--tag_name??\[--snapshot?]?\[--time_retained?]?\[--catalog_conf??[--catalog_conf??...]]
如果未設置snapshot
,snapshot_id
默認為最新的快照ID。
刪除標簽
你可以按名稱刪除標簽。
Flink:
運行以下命令:
/bin/flink?run?\/path/to/paimon-flink-action-0.9.0.jar?\delete_tag?\--warehouse??\--database??\?--table??\--tag_name??\[--catalog_conf??[--catalog_conf??...]]
回滾到標簽
將表回滾到特定標簽。所有快照ID大于該標簽的快照和標簽都將被刪除(數據也會被刪除)。
Flink:
運行以下命令:
/bin/flink?run?\/path/to/paimon-flink-action-0.9.0.jar?\rollback_to?\--warehouse??\--database??\?--table??\--version??\[--catalog_conf??[--catalog_conf??...]]
拓展:
標簽管理在數據歷史版本維護中的意義:在大數據處理流程中,數據不斷更新,快照雖然提供了歷史數據查詢的途徑,但由于過期機制會刪除舊數據文件,導致長期歷史數據難以保留。通過標簽管理,特別是自動創建標簽功能,可以按特定周期(如每天)保留關鍵時間點的數據狀態,這對于需要長期保存歷史數據用于分析、審計或數據回溯的場景至關重要。例如,在電商數據分析中,每天創建標簽可以保留每天的交易數據狀態,方便后續分析每日業務趨勢、對比不同日期的銷售情況等。
自動創建標簽模式的應用場景差異:不同的自動創建標簽模式適用于不同的業務場景。
process-time
模式基于機器時間創建標簽,簡單直接,適用于對時間精度要求不高,主要按固定時間間隔(如每天、每小時)保留數據的場景。watermark
模式基于Sink輸入的水印創建標簽,更適合處理流數據且需要根據數據處理進度來標記數據的場景。例如,在實時流數據處理中,水印可以反映數據的完整性,基于水印創建標簽能確保在數據完整性達到一定程度時標記數據,適用于對數據準確性和實時性要求較高的場景。batch
模式則適用于批處理作業,在任務完成后創建標簽,適合定期批量處理數據并保留處理結果的場景,如每月的財務報表數據處理。標簽操作與數據一致性:在進行標簽的創建、刪除和回滾操作時,需要注意數據一致性。例如,刪除標簽時,如果有其他作業正在依賴該標簽的數據,可能會導致數據不一致或作業失敗。回滾到標簽操作不僅會刪除后續的快照和標簽,還會刪除相關數據,這在生產環境中需要謹慎操作,確保不會誤刪重要數據。在實際應用中,應結合業務需求和數據依賴關系,制定合理的標簽管理策略,以保證數據的一致性和可用性。例如,在刪除標簽前,可以先檢查是否有作業正在使用該標簽數據;在回滾操作前,進行數據備份或通知相關業務方,避免對業務造成影響。
Paimon指標(Metrics)
Paimon構建了一個指標系統,用于衡量讀寫行為,例如在上次規劃中掃描了多少清單文件、上次提交操作花費了多長時間、上次合并操作刪除了多少文件。
在Paimon的指標系統中,指標以表為粒度進行更新和報告。
Paimon指標系統提供三種類型的指標:儀表盤指標(Gauge)、計數器指標(Counter)、直方圖指標(Histogram)。
-
儀表盤指標(Gauge):在某個時間點提供任意類型的值。
-
計數器指標(Counter):通過遞增和遞減來統計數值。
-
直方圖指標(Histogram):衡量一組值的統計分布,包括最小值、最大值、平均值、標準差和百分位數。
Paimon支持內置指標來衡量提交、掃描、寫入和合并操作,這些指標可以橋接到任何支持的計算引擎,如Flink、Spark等。
指標列表
以下是Paimon內置指標列表。它們分為掃描指標、提交指標、寫入指標、寫入緩沖區指標和合并指標幾類。
掃描指標
指標名稱 | 類型 | 描述 |
---|---|---|
lastScanDuration | 儀表盤指標(Gauge) | 完成上次掃描所花費的時間。 |
scanDuration | 直方圖指標(Histogram) | 最近幾次掃描所花費時間的分布情況。 |
lastScannedManifests | 儀表盤指標(Gauge) | 上次掃描中掃描的清單文件數量。 |
lastSkippedByPartitionAndStats | 儀表盤指標(Gauge) | 上次掃描中通過分區過濾和值/鍵統計信息跳過的表文件數量。 |
lastSkippedByBucketAndLevelFilter | 儀表盤指標(Gauge) | 上次掃描中通過桶、桶鍵和層級過濾跳過的表文件數量。 |
lastSkippedByWholeBucketFilesFilter | 儀表盤指標(Gauge) | 上次掃描中通過桶層級值過濾(僅主鍵表)跳過的表文件數量。 |
lastScanSkippedTableFiles | 儀表盤指標(Gauge) | 上次掃描中總共跳過的表文件數量。 |
lastScanResultedTableFiles | 儀表盤指標(Gauge) | 上次掃描中得到的表文件數量。 |
提交指標
指標名稱 | 類型 | 描述 |
---|---|---|
lastCommitDuration | 儀表盤指標(Gauge) | 完成上次提交所花費的時間。 |
commitDuration | 直方圖指標(Histogram) | 最近幾次提交所花費時間的分布情況。 |
lastCommitAttempts | 儀表盤指標(Gauge) | 上次提交嘗試的次數。 |
lastTableFilesAdded | 儀表盤指標(Gauge) | 上次提交中添加的表文件數量,包括新創建的數據文件和之后合并的文件。 |
lastTableFilesDeleted | 儀表盤指標(Gauge) | 上次提交中刪除的表文件數量,這些文件來自之前的合并。 |
lastTableFilesAppended | 儀表盤指標(Gauge) | 上次提交中追加的表文件數量,即新創建的數據文件。 |
lastTableFilesCommitCompacted | 儀表盤指標(Gauge) | 上次提交中合并的表文件數量,包括之前和之后合并的文件。 |
lastChangelogFilesAppended | 儀表盤指標(Gauge) | 上次提交中追加的變更日志文件數量。 |
lastChangelogFileCommitCompacted | 儀表盤指標(Gauge) | 上次提交中合并的變更日志文件數量。 |
lastGeneratedSnapshots | 儀表盤指標(Gauge) | 上次提交中生成的快照文件數量,可能是1個或2個快照。 |
lastDeltaRecordsAppended | 儀表盤指標(Gauge) | 上次提交中以APPEND提交類型的增量記錄數。 |
lastChangelogRecordsAppended | 儀表盤指標(Gauge) | 上次提交中以APPEND提交類型的變更日志記錄數。 |
lastDeltaRecordsCommitCompacted | 儀表盤指標(Gauge) | 上次提交中以COMPACT提交類型的增量記錄數。 |
lastChangelogRecordsCommitCompacted | 儀表盤指標(Gauge) | 上次提交中以COMPACT提交類型的變更日志記錄數。 |
lastPartitionsWritten | 儀表盤指標(Gauge) | 上次提交中寫入的分區數量。 |
lastBucketsWritten | 儀表盤指標(Gauge) | 上次提交中寫入的桶數量。 |
寫入緩沖區指標
指標名稱 | 類型 | 描述 |
---|---|---|
numWriters | 儀表盤指標(Gauge) | 此并行度中的寫入器數量。 |
bufferPreemptCount | 儀表盤指標(Gauge) | 被搶占的內存總數。 |
usedWriteBufferSizeByte | 儀表盤指標(Gauge) | 當前使用的寫入緩沖區大小(以字節為單位)。 |
totalWriteBufferSizeByte | 儀表盤指標(Gauge) | 配置的總寫入緩沖區大小(以字節為單位)。 |
合并指標
指標名稱 | 類型 | 描述 |
---|---|---|
maxLevel0FileCount | 儀表盤指標(Gauge) | 此寫入器當前處理的0級文件的最大數量。如果異步合并不能及時完成,該值會變大。 |
avgLevel0FileCount | 儀表盤指標(Gauge) | 此寫入器當前處理的0級文件的平均數量。如果異步合并不能及時完成,該值會變大。 |
compactionThreadBusy | 儀表盤指標(Gauge) | 此并行度中合并線程的最大繁忙程度。目前,每個并行度只有一個合并線程,因此繁忙程度值范圍從0(空閑)到100(一直運行合并)。 |
avgCompactionTime | 儀表盤指標(Gauge) | 合并線程的平均運行時間,基于記錄的合并時間數據以毫秒為單位計算。該值表示合并操作的平均持續時間。值越高表明平均合并時間越長,可能意味著需要進行性能優化。 |
橋接至Flink
Paimon已實現將指標橋接到Flink的指標系統,這些指標可由Flink報告,并且指標組的生命周期由Flink管理。
使用Flink訪問Paimon時,請結合<scope>.<infix>.<metric_name>
以獲取完整的指標標識符,metric_name
可從指標列表中獲取。
例如,在名為insert_word_count
的Flink作業中,表word_count
的lastPartitionsWritten
指標的標識符為:
localhost.taskmanager.localhost:60340-775a20.insert_word_count.Global Committer : word_count.0.paimon.table.word_count.commit.lastPartitionsWritten
。
從Flink Web界面中,進入提交器操作符的指標,顯示為:
0.Global_Committer___word_count.paimon.table.word_count.commit.lastPartitionsWritten
。
請參考系統范圍以了解Flink范圍。掃描指標僅在Flink版本 >= 1.18時支持。
范圍 | 中綴 |
---|---|
掃描指標 |
|
提交指標 |
|
寫入指標 |
|
寫入緩沖區指標 |
|
合并指標 |
|
Flink源指標 |
|
Flink sink指標 |
|
Flink連接器標準指標
當使用Flink進行讀寫時,Paimon實現了一些關鍵的標準Flink連接器指標來衡量源延遲和sink輸出,詳見FLIP-33:標準化連接器指標。此處列出已實現的Flink源/sink指標。
源指標(Flink)
指標名稱 | 級別 | 類型 | 描述 |
---|---|---|---|
currentEmitEventTimeLag | Flink源操作符 | 儀表盤指標(Gauge) | 從源發送記錄與文件創建之間的時間差。 |
currentFetchEventTimeLag | Flink源操作符 | 儀表盤指標(Gauge) | 讀取數據文件與文件創建之間的時間差。 |
請注意,如果在流查詢中指定了consumer-id
,源指標的級別應變為讀取器操作符,該操作符在監控操作符之后。
sink指標(Flink)
指標名稱 | 級別 | 類型 | 描述 |
---|---|---|---|
numBytesOut | 表 | 計數器指標(Counter) | 輸出的總字節數。 |
numBytesOutPerSecond | 表 | 計量器(Meter) | 每秒輸出的字節數。 |
numRecordsOut | 表 | 計數器指標(Counter) | 輸出的總記錄數。 |
numRecordsOutPerSecond | 表 | 計量器(Meter) | 每秒輸出的記錄數。 |
拓展:
指標系統對Paimon性能優化的作用:Paimon的指標系統為性能優化提供了詳細的數據支持。通過掃描指標,可以了解掃描操作的效率,例如
lastScanDuration
能直觀反映每次掃描的耗時,scanDuration
的直方圖分布有助于發現掃描時間的波動情況,從而判斷是否存在性能瓶頸。提交指標能幫助定位提交操作中的問題,如lastCommitDuration
可查看提交操作的時長,lastCommitAttempts
能發現提交是否頻繁重試,進而分析原因進行優化。寫入緩沖區指標和合并指標也分別從不同角度反映了寫入和合并過程中的資源使用和性能狀況,有助于調整相關參數以提升整體性能。橋接至Flink指標系統的優勢:將Paimon指標橋接到Flink指標系統,使得在Flink環境中使用Paimon時,用戶可以在統一的Flink指標體系下全面監控Paimon的讀寫等操作。Flink強大的指標管理和展示功能(如Flink Web界面)能更直觀地呈現Paimon的運行狀態,方便用戶及時發現性能問題。同時,這種集成也便于與Flink自身的作業指標相結合,進行綜合分析,例如將Paimon的寫入指標與Flink Sink指標關聯,更深入地理解數據從Flink作業流入Paimon的整個過程中的性能表現。
Flink連接器標準指標在數據流轉監控中的意義:Flink連接器標準指標為監控數據在Flink與Paimon之間的流轉提供了關鍵信息。源指標中的
currentEmitEventTimeLag
和currentFetchEventTimeLag
可以幫助用戶了解數據從源端讀取和發送的延遲情況,及時發現數據讀取或發送過程中的異常。Sink指標中的numBytesOut
、numBytesOutPerSecond
、numRecordsOut
和numRecordsOutPerSecond
則直觀地反映了數據寫入Paimon的量和速率,對于評估寫入性能、預測資源需求以及發現潛在的寫入瓶頸非常重要。這些指標的結合使用,能全方位監控數據在Flink與Paimon之間的流轉過程,保障數據處理流程的穩定和高效運行。
權限管理(Manage Privileges)
Paimon在目錄層面提供了一個權限系統。權限決定了哪些用戶可以對哪些對象執行哪些操作,這樣你就能夠以細粒度的方式管理表的訪問權限。
目前,Paimon采用基于身份的訪問控制(IBAC)權限模型。也就是說,權限直接分配給用戶。
這個權限系統僅防止不被期望的用戶通過目錄訪問表。它不會阻止通過臨時表(通過在文件系統上指定表路徑)進行的訪問,也無法防止用戶直接在文件系統上修改數據文件。如果你需要更嚴格的保護,應使用具有訪問管理功能的文件系統。
基本概念
我們現在介紹權限系統的基本概念。
對象
對象是可以被授予訪問權限的實體。除非通過授權允許,否則訪問將被拒絕。
目前,Paimon的權限系統中有三種類型的對象:目錄(CATALOG)、數據庫(DATABASE)和表(TABLE)。對象具有邏輯層次結構,這與它們所代表的概念相關。例如:
-
如果用戶被授予目錄的某項權限,那么他也將對該目錄中的所有數據庫和所有表擁有此權限。
-
如果用戶被授予數據庫的某項權限,那么他也將對該數據庫中的所有表擁有此權限。
-
如果用戶的目錄權限被撤銷,那么他在該目錄中的所有數據庫和所有表上的此權限也將喪失。
-
如果用戶的數據庫權限被撤銷,那么他在該數據庫中的所有表上的此權限也將喪失。
權限
權限是對對象定義的訪問級別。可以使用多種權限來控制授予對象的訪問粒度。權限是特定于對象的。不同的對象可能有不同的權限。
目前,我們支持以下權限:
權限 | 描述 | 可授予的對象 |
---|---|---|
SELECT | 查詢表中的數據。 | 表、數據庫、目錄 |
INSERT | 在表中插入、更新或刪除數據。在表中創建或刪除標簽和分支。 | 表、數據庫、目錄 |
ALTER_TABLE | 修改表的元數據,包括表名、列名、表選項等。 | 表、數據庫、目錄 |
DROP_TABLE | 刪除表。 | 表、數據庫、目錄 |
CREATE_TABLE | 在數據庫中創建表。 | 數據庫、目錄 |
DROP_DATABASE | 刪除數據庫。 | 數據庫、目錄 |
CREATE_DATABASE | 在目錄中創建數據庫。 | 目錄 |
ADMIN | 在目錄中創建或刪除特權用戶,授予或撤銷用戶權限。 | 目錄 |
用戶
用戶是可以被授予權限的實體。用戶通過密碼進行身份驗證。
當權限系統啟用時,會自動創建兩個特殊用戶:
-
根用戶,在啟用權限系統時通過提供的根密碼進行標識。此用戶始終在目錄中擁有所有權限。
-
匿名用戶。如果在創建目錄時未提供用戶名和密碼,這將是默認用戶。
啟用權限
Paimon目前僅支持基于文件的權限系統。只有metastore
?= 'filesystem'(默認值)或metastore
?= 'hive'的目錄支持這種權限系統。
要在文件系統/Hive目錄上啟用權限系統,請執行以下步驟: Flink 1.18+: 運行以下Flink SQL:
--?使用你要啟用權限系統的目錄
USE?CATALOG?`my-catalog`;--?通過提供根密碼初始化權限系統
--?將'root-password'更改為你想要的密碼
CALL?sys.init_file_based_privilege('root-password');
權限系統啟用后,請重新創建目錄并以根用戶身份進行身份驗證,以創建其他用戶并授予他們權限。
權限系統不會影響現有的目錄。也就是說,這些目錄仍然可以自由訪問和修改表。如果你想在這些目錄中使用權限系統,請刪除并使用所需的倉庫路徑重新創建所有目錄。
訪問受權限控制的目錄
要訪問受權限控制的目錄并以用戶身份進行身份驗證,你需要在創建目錄時定義用戶和密碼目錄選項。例如,以下SQL創建一個目錄,同時嘗試以密碼為mypassword
的根用戶身份進行身份驗證:
--?Flink
CREATE?CATALOG?`my-catalog`?WITH?('type'?=?'paimon',--...'user'?=?'root','password'?='mypassword'
);
創建用戶
你必須以具有ADMIN權限的用戶(例如根用戶)身份進行身份驗證才能執行此操作。
要在權限系統中創建用戶,請執行以下步驟: Flink 1.18+: 運行以下Flink SQL:
--?使用你要創建用戶的目錄
--?你必須在該目錄中以具有ADMIN權限的用戶身份進行身份驗證
USE?CATALOG?`my-catalog`;--?創建一個通過指定密碼進行身份驗證的用戶
--?將'user'和'password'更改為你想要的用戶名和密碼
CALL?sys.create_privileged_user('user',?'password');
刪除用戶
你必須以具有ADMIN權限的用戶(例如根用戶)身份進行身份驗證才能執行此操作。
要在權限系統中刪除用戶,請執行以下步驟: Flink 1.18+: 運行以下Flink SQL:
--?使用你要刪除用戶的目錄
--?你必須在該目錄中以具有ADMIN權限的用戶身份進行身份驗證
USE?CATALOG?`my-catalog`;--?將'user'更改為你要刪除的用戶名
CALL?sys.drop_privileged_user('user');
授予用戶權限
你必須以具有ADMIN權限的用戶(例如根用戶)身份進行身份驗證才能執行此操作。
要在權限系統中授予用戶權限,請執行以下步驟: Flink 1.18+: 運行以下Flink SQL:
--?使用你要授予權限的目錄
--?你必須在該目錄中以具有ADMIN權限的用戶身份進行身份驗證
USE?CATALOG?`my-catalog`;--?你可以將'user'更改為你想要的用戶名,并將'SELECT'更改為你想要的其他權限
--?授予'user'對整個目錄的'SELECT'權限
CALL?sys.grant_privilege_to_user('user',?'SELECT');
--?授予'user'對數據庫my_db的'SELECT'權限
CALL?sys.grant_privilege_to_user('user',?'SELECT','my_db');
--?授予'user'對表my_db.my_tbl的'SELECT'權限
CALL?sys.grant_privilege_to_user('user',?'SELECT','my_db','my_tbl');
撤銷用戶權限
你必須以具有ADMIN權限的用戶(例如根用戶)身份進行身份驗證才能執行此操作。
要在權限系統中撤銷用戶的權限,請執行以下步驟: Flink 1.18+: 運行以下Flink SQL:
--?使用你要撤銷權限的目錄
--?你必須在該目錄中以具有ADMIN權限的用戶身份進行身份驗證
USE?CATALOG?`my-catalog`;--?你可以將'user'更改為你想要的用戶名,并將'SELECT'更改為你想要的其他權限
--?撤銷'user'對整個目錄的'SELECT'權限
CALL?sys.revoke_privilege_from_user('user',?'SELECT');
--?撤銷'user'對數據庫my_db的'SELECT'權限
CALL?sys.revoke_privilege_from_user('user',?'SELECT','my_db');
--?撤銷'user'對表my_db.my_tbl的'SELECT'權限
CALL?sys.revoke_privilege_from_user('user',?'SELECT','my_db','my_tbl');
拓展:
權限系統在數據安全管理中的重要性:在大數據環境中,數據的安全性至關重要。Paimon的權限系統通過細粒度的權限控制,確保只有授權用戶能夠對特定對象執行相應操作,有效保護了數據的隱私和完整性。例如,在企業數據倉庫中,不同部門的用戶可能只需要對特定的表或數據庫進行查詢操作,通過授予SELECT權限,可以滿足他們的數據需求,同時防止未經授權的修改。而對于管理員等具有更高權限的用戶,可以授予ADMIN權限,以便進行用戶管理和系統配置等操作。這種分層的權限管理模式,有助于構建一個安全可靠的數據訪問環境。
基于身份的訪問控制(IBAC)模型的特點:IBAC模型直接將權限分配給用戶,使得權限管理相對簡單直接。每個用戶的權限明確,易于跟蹤和管理。與其他訪問控制模型(如基于角色的訪問控制RBAC)相比,IBAC更適用于用戶數量相對較少且權限分配較為個性化的場景。在Paimon中,由于數據對象(目錄、數據庫、表)的層次結構清晰,IBAC模型能夠很好地與之結合,實現權限的級聯授予和撤銷,保證了權限管理的一致性和高效性。例如,當一個用戶在目錄級別被授予某種權限時,其在該目錄下的所有數據庫和表上自動擁有相同權限,減少了重復的權限設置工作。
權限系統與其他安全機制的配合:雖然Paimon的權限系統能夠在目錄層面控制用戶對表的訪問,但它存在一定的局限性,如無法阻止通過臨時表或直接在文件系統上對數據文件的訪問。因此,在實際應用中,需要與其他安全機制配合使用。例如,結合具有訪問管理功能的文件系統(如HDFS的權限管理),可以進一步增強數據的安全性。同時,還可以與認證系統(如Kerberos)集成,提供更強大的用戶身份驗證功能,確保只有合法用戶能夠訪問系統。這種多層次的安全防護體系,能夠全方位保護大數據環境中的數據安全。
管理分支(Manage Branches)
在流數據處理中,修正數據比較困難,因為這可能會影響現有數據,而且用戶會看到流處理的臨時結果,這并非預期情況。
我們假設現有工作流程正在處理的分支是“main”分支。通過創建自定義數據分支,有助于在現有表上對新作業進行實驗測試和數據驗證,無需停止現有的讀/寫工作流程,也無需從主分支復制數據。
通過合并或替換分支操作,用戶可以完成數據的修正。
創建分支
Paimon支持從特定標簽或快照創建分支,或者創建一個空分支,這意味著創建的分支的初始狀態類似于一個空表。
Flink
運行以下SQL:
--?從標簽'tag1'創建名為'branch1'的分支
CALL?sys.create_branch('default.T',?'branch1',?'tag1');--?創建名為'branch1'的空分支
CALL?sys.create_branch('default.T',?'branch1');
刪除分支
你可以按名稱刪除分支。
Flink
運行以下SQL:
CALL?sys.delete_branch('default.T',?'branch1');
基于分支的讀/寫操作
你可以按如下方式基于分支進行讀或寫操作。
Flink
--?從'branch1'分支讀取數據
SELECT?*?FROM?`t$branch_branch1`;
SELECT?*?FROM?`t$branch_branch1`?/*+?OPTIONS('consumer-id'?='myid')?*/;--?向'branch1'分支寫入數據
INSERT?INTO?`t$branch_branch1`?SELECT...
快速推進
將自定義分支快速推進到主分支,會刪除主分支中在該分支初始標簽之后創建的所有快照、標簽和模式。并將分支中的快照、標簽和模式復制到主分支。
Flink
CALL?sys.fast_forward('default.T',?'branch1');
從回退分支進行批量讀取
你可以設置表選項scan.fallback-branch
,這樣當批處理作業從當前分支讀取數據時,如果某個分區不存在,讀取器將嘗試從回退分支讀取該分區。對于流讀取作業,目前不支持此功能,流讀取作業只會從當前分支生成結果。
這個功能的用例是什么呢?假設你創建了一個按日期分區的Paimon表。你有一個長期運行的流作業,將記錄插入到Paimon中,以便可以及時查詢當天的數據。你還有一個批處理作業,每晚運行,將前一天修正后的記錄插入到Paimon中,以保證數據的準確性。
當你從這個Paimon表查詢數據時,你希望首先從批處理作業的結果中讀取數據。但是如果某個分區(例如,當天的分區)在其結果中不存在,那么你希望從流作業的結果中讀取數據。在這種情況下,你可以為流作業創建一個分支,并將scan.fallback-branch
設置為這個流分支。
讓我們看一個示例。
Flink
--?創建Paimon表
CREATE?TABLE?T?(dt?STRING?NOT?NULL,name?STRING?NOT?NULL,amount?BIGINT
)?PARTITIONED?BY?(dt);--?為流作業創建一個分支
CALL?sys.create_branch('default.T',?'test');--?為分支設置主鍵和桶數量
ALTER?TABLE?`T$branch_test`?SET?('primary-key'?=?'dt,name','bucket'?=?'2','changelog-producer'?=?'lookup'
);--?設置回退分支
ALTER?TABLE?T?SET?('scan.fallback-branch'?=?'test'
);--?向流分支寫入記錄
INSERT?INTO?`T$branch_test`?VALUES?('20240725',?'apple',?4),?('20240725',?'peach',?10),?('20240726',?'cherry',?3),?('20240726',?'pear',?6);--?向默認分支寫入記錄
INSERT?INTO?T?VALUES?('20240725',?'apple',?5),?('20240725',?'banana',?7);SELECT?*?FROM?T;
/*
+------------------+------------------+--------+
|???????????????dt?|?????????????name?|?amount?|
+------------------+------------------+--------+
|?????????20240725?|????????????apple?|??????5?|
|?????????20240725?|???????????banana?|??????7?|
|?????????20240726?|???????????cherry?|??????3?|
|?????????20240726?|?????????????pear?|??????6?|
+------------------+------------------+--------+
*/--?重置回退分支
ALTER?TABLE?T?RESET?('scan.fallback-branch');--?現在它只從默認分支讀取數據
SELECT?*?FROM?T;
/*
+------------------+------------------+--------+
|???????????????dt?|?????????????name?|?amount?|
+------------------+------------------+--------+
|?????????20240725?|????????????apple?|??????5?|
|?????????20240725?|???????????banana?|??????7?|
+------------------+------------------+--------+
*/
拓展:
分支管理在流數據處理中的優勢:在流數據處理場景下,數據持續流動且處理過程實時性強,傳統的數據修正方式可能會干擾正在運行的流程或影響現有數據的完整性。Paimon的分支管理功能為解決這些問題提供了有效的途徑。通過創建分支,數據工程師可以在不影響主工作流程的前提下,對新的處理邏輯或數據修正操作進行試驗。例如,在實時數據分析平臺中,為了驗證新的算法對數據處理結果的影響,可以在分支上進行測試,確保無誤后再通過合并操作應用到主分支,從而保證了主工作流程的穩定性和數據的一致性。
快速推進操作的應用場景:快速推進操作在數據版本管理和更新方面具有重要意義。當在分支上完成了數據的驗證、修正或新功能的開發后,通過快速推進將分支的狀態同步到主分支,能夠高效地將這些變更集成到主數據流程中。例如,在一個基于Paimon構建的數據倉庫中,數據分析師在分支上對歷史數據進行了重新計算和修正,完成后使用快速推進操作,可以將修正后的數據快速應用到主分支,使得后續的查詢和分析能夠基于準確的數據進行,同時避免了手動逐個復制和更新相關數據對象(如快照、標簽和模式)的繁瑣過程,提高了數據管理的效率。
回退分支在數據查詢中的應用:設置回退分支為數據查詢提供了更大的靈活性和數據完整性保障。在實際業務中,不同的數據處理作業(如批處理和流處理)可能在不同的時間點生成數據,并且數據的準確性和時效性要求也不同。通過設置回退分支,查詢操作可以根據數據的可用性和需求,優先從批處理作業結果中獲取更準確的數據,如果特定分區數據缺失,則從流處理作業結果中獲取最新數據。這種機制在數據查詢場景中非常實用,例如在電商銷售數據統計中,批處理作業每天晚上對前一天的銷售數據進行修正和匯總,流處理作業實時處理當天的銷售數據。通過回退分支設置,查詢可以在保證數據準確性的同時,獲取到最新的銷售數據,為業務決策提供更全面的支持。
配置(Configuration)
核心選項(CoreOptions)
Paimon的核心配置選項。
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
async-file-write | true | 布爾值 | 寫入文件時是否啟用異步IO寫入。 |
auto-create | false | 布爾值 | 讀寫表時是否自動創建底層存儲。 |
branch | "main" | 字符串 | 指定分支名稱。 |
bucket | -1 | 整數 | 文件存儲的桶數量。它要么等于 -1(動態桶模式),要么必須大于0(固定桶模式)。 |
bucket-key | (none) | 字符串 | 指定Paimon的數據分布策略。數據根據bucket-key的哈希值分配到各個桶中。如果指定多個字段,用逗號分隔。如果未指定,則使用主鍵;如果沒有主鍵,則使用整行數據。 |
cache-page-size | 64 kb | 內存大小 | 緩存的內存頁大小。 |
changelog-producer | none | 枚舉值 | 是否雙寫到變更日志文件。此變更日志文件記錄數據更改的詳細信息,在流讀取時可直接讀取。這適用于有主鍵的表。可能的值: "none":無變更日志文件。 "input":刷新內存表時雙寫到變更日志文件,變更日志來自輸入。 "full-compaction":每次全量合并時生成變更日志文件。 "lookup":在提交數據寫入之前通過“lookup”生成變更日志文件。 |
changelog-producer.row-deduplicate | false | 布爾值 | 對于相同記錄是否生成 -U、+U變更日志。此配置僅在changelog-producer為lookup或full-compaction時有效。 |
changelog.num-retained.max | (none) | 整數 | 保留的已完成變更日志的最大數量。應大于或等于最小數量。 |
changelog.num-retained.min | (none) | 整數 | 保留的已完成變更日志的最小數量。應大于或等于1。 |
changelog.time-retained | (none) | 持續時間 | 保留已完成變更日志的最長時間。 |
commit.callback.#.param | (none) | 字符串 | 類#構造函數的參數字符串。回調類應自行解析參數。 |
commit.callbacks | (none) | 字符串 | 提交成功后要調用的提交回調類列表。類名用逗號連接(例如:com.test.CallbackA,com.sample.CallbackB)。 |
commit.force-compact | false | 布爾值 | 提交前是否強制進行合并。 |
commit.force-create-snapshot | false | 布爾值 | 提交時是否強制創建快照。 |
commit.user-prefix | (none) | 字符串 | 指定提交用戶前綴。 |
compaction.max-size-amplification-percent | 200 | 整數 | 大小放大定義為在變更日志模式表的合并樹中存儲單個字節數據所需的額外存儲量(以百分比表示)。 |
compaction.max.file-num | (none) | 整數 | 對于文件集[f_0,...,f_N],即使sum(size(f_i)) < targetFileSize,追加表觸發合并的最大文件數。此值可避免過多小文件堆積。追加表的默認值為'50'。分桶追加表的默認值為'5'。 |
compaction.min.file-num | 5 | 整數 | 對于文件集[f_0,...,f_N],滿足sum(size(f_i)) >= targetFileSize以觸發追加表合并的最小文件數。此值可避免幾乎滿的文件被合并,因為這并不劃算。 |
compaction.optimization-interval | (none) | 持續時間 | 表示執行優化合并的頻率,此配置用于確保讀優化系統表的查詢及時性。 |
compaction.size-ratio | 1 | 整數 | 變更日志模式表中比較排序運行大小的百分比靈活性。如果候選排序運行(s)的大小比下一個排序運行的大小小1%,則將下一個排序運行包含到該候選集中。 |
consumer-id | (none) | 字符串 | 用于記錄存儲中消費偏移量的消費者ID。 |
consumer.expiration-time | (none) | 持續時間 | 消費者文件的過期間隔。如果自上次修改后消費者文件的生存時間超過此值,則該文件將過期。 |
consumer.ignore-progress | false | 布爾值 | 新啟動的作業是否忽略消費者進度。 |
consumer.mode | exactly-once | 枚舉值 | 指定表的消費者一致性模式。可能的值: "exactly-once":讀取器以快照粒度消費數據,并嚴格確保消費者中記錄的快照ID是所有讀取器都準確消費的快照ID + 1。 "at-least-once":每個讀取器以不同速率消費快照,所有讀取器中消費進度最慢的快照將記錄在消費者中。 |
continuous.discovery-interval | 10 s | 持續時間 | 持續讀取的發現間隔。 |
cross-partition-upsert.bootstrap-parallelism | 10 | 整數 | 跨分區更新中單個任務引導的并行度。 |
cross-partition-upsert.index-ttl | (none) | 持續時間 | 跨分區更新(主鍵不包含所有分區字段)在RocksDB索引中的TTL,這可以避免維護過多索引導致性能越來越差,但請注意這也可能導致數據重復。 |
delete-file.thread-num | (none) | 整數 | 并發刪除文件的最大數量。默認值為Java虛擬機可用的處理器數量。 |
delete.force-produce-changelog | false | 布爾值 | 在刪除SQL中強制生成變更日志,或者你可以使用'streaming-read-overwrite'從覆蓋提交中讀取變更日志。 |
deletion-vector.index-file.target-size | 2 mb | 內存大小 | 刪除向量索引文件的目標大小。 |
deletion-vectors.enabled | false | 布爾值 | 是否啟用刪除向量模式。在此模式下,寫入數據時會生成包含刪除向量的索引文件,標記要刪除的數據。在讀取操作中,通過應用這些索引文件,可以避免合并。 |
dynamic-bucket.assigner-parallelism | (none) | 整數 | 動態桶模式下分配器操作符的并行度,它也與初始化桶的數量相關,過小會導致分配器處理速度不足。 |
dynamic-bucket.initial-buckets | (none) | 整數 | 動態桶模式下分配器操作符中一個分區的初始桶數。 |
dynamic-bucket.target-row-num | 2000000 | 長整數 | 如果桶數為 -1,對于主鍵表,即動態桶模式,此選項控制一個桶的目標行數。 |
dynamic-partition-overwrite | true | 布爾值 | 覆蓋有動態分區列的分區表時是否僅覆蓋動態分區。僅在表有分區鍵時有效。 |
end-input.check-partition-expire | false | 布爾值 | 批處理模式或有界流情況下可選的endInput檢查分區過期。 |
fields.default-aggregate-function | (none) | 字符串 | 部分更新和聚合合并函數中所有字段的默認聚合函數。 |
file-index.in-manifest-threshold | 500 bytes | 內存大小 | 在清單中存儲文件索引字節的閾值。 |
file-index.read.enabled | true | 布爾值 | 是否啟用讀取文件索引。 |
file-reader-async-threshold | 10 mb | 內存大小 | 異步讀取文件的閾值。 |
file.block-size | (none) | 內存大小 | 格式的文件塊大小,orc條帶的默認值為64MB,parquet行組的默認值為128MB。 |
file.compression | "zstd" | 字符串 | 默認文件合并方式。為了更快的讀寫速度,建議使用zstd。 |
file.compression.per.level | Map | 定義不同層級的不同合并策略,你可以這樣添加配置:'file.compression.per.level' = '0:lz4,1:zstd'。 | |
file.compression.zstd-level | 1 | 整數 | 默認文件合并zstd級別。如需更高合并率,可配置為9,但讀寫速度會顯著下降。 |
file.format | "parquet" | 字符串 | 指定數據文件的消息格式,目前支持orc、parquet和avro。 |
file.format.per.level | Map | 定義不同層級的不同文件格式,你可以這樣添加配置:'file.format.per.level' = '0:avro,3:parquet',如果未提供某個層級的文件格式,則使用 | |
force-lookup | false | 布爾值 | 是否強制在合并時使用lookup。 |
full-compaction.delta-commits | (none) | 整數 | 增量提交后將不斷觸發全量合并。 |
ignore-delete | false | 布爾值 | 是否忽略刪除記錄。 |
incremental-between | (none) | 字符串 | 讀取起始快照(不包含)和結束快照之間的增量更改,例如,'5,10'表示快照5和快照10之間的更改。 |
incremental-between-scan-mode | auto | 枚舉值 | 讀取起始快照(不包含)和結束快照之間的增量更改時的掃描類型。可能的值: "auto":對于生成變更日志文件的表,掃描變更日志文件。否則,掃描新更改的文件。 "delta":掃描快照之間新更改的文件。 "changelog":掃描快照之間的變更日志文件。 |
incremental-between-timestamp | (none) | 字符串 | 讀取起始時間戳(不包含)和結束時間戳之間的增量更改,例如,'t1,t2'表示時間戳t1和時間戳t2之間的更改。 |
local-merge-buffer-size | (none) | 內存大小 | 本地合并將在輸入記錄按桶進行洗牌并寫入接收器之前對其進行緩沖和合并。緩沖區滿時將被刷新。主要用于解決主鍵上的數據傾斜問題。嘗試此功能時,建議從64mb開始。 |
local-sort.max-num-file-handles | 128 | 整數 | 外部合并排序的最大扇入數。它限制了文件句柄的數量。如果太小,可能會導致中間合并。但如果太大,會導致同時打開太多文件,消耗內存并導致隨機讀取。 |
lookup-wait | true | 布爾值 | 需要lookup時,提交是否等待通過lookup進行合并。 |
lookup.cache-file-retention | 1 h | 持續時間 | lookup緩存文件的保留時間。文件過期后,如果需要訪問,將從DFS重新讀取以在本地磁盤上構建索引。 |
lookup.cache-max-disk-size | infinite | 內存大小 | lookup緩存的最大磁盤大小,你可以使用此選項限制本地磁盤的使用。 |
lookup.cache-max-memory-size | 256 mb | 內存大小 | lookup緩存的最大內存大小。 |
lookup.cache-spill-compression | "zstd" | 字符串 | lookup緩存的溢出合并方式,目前支持zstd、none、lz4和lzo。 |
lookup.cache.bloom.filter.enabled | true | 布爾值 | 是否為lookup緩存啟用布隆過濾器。 |
lookup.cache.bloom.filter.fpp | 0.05 | 雙精度浮點數 | 定義lookup緩存布隆過濾器的默認誤判率。 |
lookup.hash-load-factor | 0.75 | 單精度浮點數 | lookup的索引負載因子。 |
lookup.local-file-type | hash | 枚舉值 | lookup的本地文件類型。可能的值: "sort":構建用于lookup的排序文件。 "hash":構建用于lookup的哈希文件。 |
manifest.compression | "zstd" | 字符串 | 清單文件的默認文件合并方式。 |
manifest.format | "avro" | 字符串 | 指定清單文件的消息格式。 |
manifest.full-compaction-threshold-size | 16 mb | 內存大小 | 觸發清單全量合并的大小閾值。 |
manifest.merge-min-count | 30 | 整數 | 為避免頻繁的清單合并,此參數指定合并的最小ManifestFileMeta數量。 |
manifest.target-file-size | 8 mb | 內存大小 | 建議的清單文件大小。 |
merge-engine | deduplicate | 枚舉值 | 為有主鍵的表指定合并引擎。可能的值: "deduplicate":去重并保留最后一行。 "partial-update":部分更新非空字段。 "aggregation":聚合具有相同主鍵的字段。 "first-row":去重并保留第一行。 |
metadata.iceberg-compatible | false | 布爾值 | 設置為true時,提交快照后生成Iceberg元數據,以便Iceberg讀取器可以讀取Paimon的原始文件。 |
metadata.stats-mode | "truncate(16)" | 字符串 | 元數據統計信息收集模式。可用值為none、counts、truncate(16)、full。 "none":表示禁用元數據統計信息收集。 "counts":表示僅收集空值計數。 "full":表示收集空值計數、最小值/最大值。 "truncate(16)":表示收集空值計數、截斷長度為16的最小值/最大值。字段級統計模式可通過fields.{field_name}.stats-mode指定。 |
metastore.partitioned-table | false | 布爾值 | 是否在元存儲中將此表創建為分區表。例如,如果你想在Hive中列出Paimon表的所有分區,你需要在Hive元存儲中將此表創建為分區表。此配置選項不影響默認的文件系統元存儲。 |
metastore.tag-to-partition | (none) | 字符串 | 是否在元存儲中將此表創建為分區表,用于映射非分區表標簽。這允許Hive引擎以分區表視圖查看此表,并使用分區字段讀取特定分區(特定標簽)。 |
metastore.tag-to-partition.preview | none | 枚舉值 | 是否在元存儲中預覽生成快照的標簽。這允許Hive引擎在創建之前查詢特定標簽。可能的值: "none":無自動創建的標簽。 "process-time":基于機器時間,處理時間經過周期時間加上延遲后創建標簽。 "watermark":基于輸入的水印,水印經過周期時間加上延遲后創建標簽。 "batch":在批處理場景中,任務完成后生成與當前快照對應的標簽。 |
num-levels | (none) | 整數 | 總層級數,例如,有3個層級,包括0、1、2層級。 |
num-sorted-run.compaction-trigger | 5 | 整數 | 觸發合并的排序運行數。包括0級文件(一個文件一個排序運行)和高級運行(一個層級一個排序運行)。 |
num-sorted-run.stop-trigger | (none) | 整數 | 觸發停止寫入的排序運行數,默認值為'num-sorted-run.compaction-trigger' + 3。 |
page-size | 64 kb | 內存大小 | 內存頁大小。 |
parquet.enable.dictionary | (none) | 整數 | 關閉parquet中所有字段的字典編碼。 |
partial-update.remove-record-on-delete | false | 布爾值 | 在部分更新引擎中收到 -D記錄時是否刪除整行。 |
partition | (none) | 字符串 | 通過表選項定義分區,不能同時在DDL和表選項中定義分區。 |
partition.default-name | "DEFAULT_PARTITION" | 字符串 | 動態分區列值為空/空字符串時的默認分區名稱。 |
partition.expiration-check-interval | 1 h | 持續時間 | 分區過期檢查間隔。 |
partition.expiration-strategy | values-time | 枚舉值 | 確定如何提取分區時間并將其與當前時間進行比較的策略。可能的值: "values-time":此策略將從分區值中提取的時間與當前時間進行比較。 "update-time":此策略將分區的最后更新時間與當前時間進行比較。 |
partition.expiration-time | (none) | 持續時間 | 分區的過期間隔。如果分區的生存時間超過此值,則分區將過期。分區時間從分區值中提取。 |
partition.mark-done-action | "success-file" | 字符串 | 標記分區完成的操作是通知下游應用程序該分區已完成寫入,分區已準備好讀取。1.'success-file':在目錄中添加'_success'文件。2.'done-partition':在元存儲中添加'xxx.done'分區。3.'mark-event':在元存儲中標記分區事件。可以同時配置多個:'done-partition,success-file,mark-event'。 |
partition.timestamp-formatter | (none) | 字符串 | 用于從字符串格式化時間戳的格式化器。它可以與'partition.timestamp-pattern'一起使用,使用指定值創建格式化器。默認格式化器為'yyyy-MM-dd HH:mm:ss'和'yyyy-MM-dd'。支持多個分區字段,如'𝑦𝑒𝑎𝑟?year?month-$day $hour:00:00'。時間戳格式化器與Java的DateTimeFormatter兼容。 |
partition.timestamp-pattern | (none) | 字符串 | 你可以指定一個模式從分區中獲取時間戳。格式化器模式由'partition.timestamp-formatter |
primary-key | (none) | 字符串 | 通過表選項定義主鍵,不能同時在DDL和表選項中定義主鍵。 |
read.batch-size | 1024 | 整數 | orc和parquet的讀取批大小。 |
record-level.expire-time | (none) | 持續時間 | 主鍵表的記錄級過期時間,過期在合并時發生,不能保證及時過期記錄。你還必須指定'record-level.time-field'。 |
record-level.time-field | (none) | 字符串 | 記錄級過期的時間字段。 |
record-level.time-field-type | seconds-int | 枚舉值 | 記錄級過期的時間字段類型,可以是seconds-int或millis-long。可能的值: "seconds-int":以秒為單位的時間戳應為INT類型。 "millis-long":以毫秒為單位的時間戳應為BIGINT類型。 |
rowkind.field | (none) | 字符串 | 為主鍵表生成行類型的字段,行類型決定哪些數據是 '+I'、'-U'、'+U' 或 '-D'。 |
scan.bounded.watermark | (none) | 長整數 | 有界流模式的結束條件“水印”。當遇到更大水印的快照時,流讀取將結束。 |
scan.fallback-branch | (none) | 字符串 | 當批處理作業從表中查詢時,如果當前分支中不存在某個分區,讀取器將嘗試從這個回退分支獲取該分區。 |
scan.file-creation-time-millis | (none) | 長整數 | 配置此時間后,僅讀取在此時間之后創建的數據文件。它獨立于快照,但這是一種不精確的過濾(取決于是否發生合并)。 |
scan.manifest.parallelism | (none) | 整數 | 掃描清單文件的并行度,默認值為CPU處理器數量。注意:增大此參數會在掃描清單文件時增加內存使用。當掃描時遇到內存不足異常時,我們可以考慮減小它。 |
scan.max-splits-per-task | 10 | 整數 | 掃描時一個任務應緩存的最大分割大小。如果枚舉器中緩存的分割大小大于任務數量乘以此值,掃描器將暫停掃描。 |
scan.mode | default | 枚舉值 | 指定源的掃描行為。可能的值: "default":根據其他表屬性確定實際啟動模式。如果設置了 "scan.timestamp-millis",實際啟動模式將是 "from-timestamp";如果設置了 "scan.snapshot-id" 或 "scan.tag-name",實際啟動模式將是 "from-snapshot"。否則,實際啟動模式將是 "latest-full"。 "latest-full":對于流源,首次啟動時生成表上的最新快照,并繼續讀取最新更改。對于批源,僅生成最新快照但不讀取新更改。 "full":已棄用。與 "latest-full" 相同。 "latest":對于流源,持續讀取最新更改,開始時不生成快照。對于批源,行為與 "latest-full" 啟動模式相同。 "compacted-full":對于流源,首次啟動時在表上的最新合并后生成快照,并繼續讀取最新更改。對于批源,僅在最新合并后生成快照但不讀取新更改。啟用計劃全量合并時,選擇全量合并的快照。 "from-timestamp":對于流源,從 "scan.timestamp-millis" 指定的時間戳開始持續讀取更改,開始時不生成快照。對于批源,在 "scan.timestamp-millis" 指定的時間戳生成快照但不讀取新更改。 "from-file-creation-time":對于流和批源,生成快照并按創建時間過濾數據文件。對于流源,首次啟動時,并繼續讀取最新更改。 "from-snapshot":對于流源,從 "scan.snapshot-id" 指定的快照開始持續讀取更改,開始時不生成快照。對于批源,生成 "scan.snapshot-id" 或 "scan.tag-name" 指定的快照但不讀取新更改。 "from-snapshot-full":對于流源,首次啟動時從表上 "scan.snapshot-id" 指定的快照生成,并持續讀取更改。對于批源,生成 "scan.snapshot-id" 指定的快照但不讀取新更改。 "incremental":讀取起始和結束快照或時間戳之間的增量更改。 |
scan.plan-sort-partition | false | 布爾值 | 是否按分區字段對計劃文件進行排序,這允許你按分區順序讀取,即使你的分區寫入無序。建議對“追加僅”表的流讀取使用此選項。默認情況下,流讀取將首先讀取完整快照。為了避免分區的無序讀取,你可以打開此選項。 |
scan.snapshot-id | (none) | 長整數 | “from-snapshot” 或 “from-snapshot-full” 掃描模式下可選的快照ID。 |
scan.tag-name | (none) | 字符串 | “from-snapshot” 掃描模式下可選的標簽名稱。 |
scan.timestamp | (none) | 字符串 | “from-timestamp” 掃描模式下可選的時間戳,它將自動轉換為 Unix 毫秒時間戳,使用本地時區。 |
scan.timestamp-millis | (none) | 長整數 | “from-timestamp” 掃描模式下可選的時間戳。如果沒有早于此時間的快照,將選擇最早的快照。 |
scan.watermark | (none) | 長整數 | “from-snapshot” 掃描模式下可選的水印。如果沒有晚于此水印的快照,將拋出異常。 |
sequence.field | (none) | 字符串 | 為主鍵表生成序列號的字段,序列號決定哪些數據是最新的。 |
sink.watermark-time-zone | "UTC" | 字符串 | 將長整型水印值解析為TIMESTAMP值的時區。默認值為'UTC',這意味著水印定義在TIMESTAMP列上或未定義。如果水印定義在TIMESTAMP_LTZ列上,水印的時區是用戶配置的時區,該值應為用戶配置的本地時區。選項值可以是完整名稱,如'America/Los_Angeles',也可以是自定義時區ID,如'GMT-08:00'。 |
snapshot.clean-empty-directories | false | 布爾值 | 過期快照時是否嘗試清理空目錄,如果啟用,請注意: hdfs:可能會在NameNode中打印異常。 oss/s3:可能會導致性能問題。 |
snapshot.expire.execution-mode | sync | 枚舉值 | 指定過期的執行模式。可能的值: "sync":同步執行過期。如果文件太多,可能需要很長時間并阻塞流處理。 "async":異步執行過期。如果快照的生成大于刪除,將有文件積壓。 |
snapshot.expire.limit | 10 | 整數 | 一次允許過期的最大快照數量。 |
snapshot.num-retained.max | infinite | 整數 | 保留的已完成快照的最大數量。應大于或等于最小數量。 |
snapshot.num-retained.min | 10 | 整數 | 保留的已完成快照的最小數量。應大于或等于1。 |
snapshot.time-retained | 1 h | 持續時間 | 保留已完成快照的最長時間。 |
snapshot.watermark-idle-timeout | (none) | 持續時間 | 在水印處理中,如果源在指定的超時持續時間內保持空閑,則會觸發快照推進并有助于標簽創建。 |
sort-compaction.local-sample.magnification | 1000 | 整數 | 排序合并的本地樣本放大倍數。本地樣本的大小為sink并行度 * 放大倍數。 |
sort-compaction.range-strategy | QUANTITY | 枚舉值 | 排序合并的范圍策略,默認值為quantity。如果分配給排序任務的數據大小不均勻,這可能會導致性能瓶頸,可將此配置設置為size。可能的值: "SIZE" "QUANTITY" |
sort-engine | loser-tree | 枚舉值 | 為有主鍵的表指定排序引擎。可能的值: "min-heap":使用最小堆進行多路排序。 "loser-tree":使用失敗者樹進行多路排序。與堆排序相比,失敗者樹的比較次數更少,效率更高。 |
sort-spill-buffer-size | 64 mb | 內存大小 | 溢出排序中溢出記錄到磁盤的數據量。 |
sort-spill-threshold | (none) | 整數 | 如果排序讀取器的最大數量超過此值,將嘗試溢出。這可以防止太多讀取器消耗過多內存并導致OOM。 |
source.split.open-file-cost | 4 mb | 內存大小 | 源文件的打開文件成本。它用于避免源分割讀取太多文件,這可能會非常慢。 |
source.split.target-size | 128 mb | 內存大小 | 掃描桶時源分割的目標大小。 |
spill-compression | "zstd" | 字符串 | 溢出合并方式,目前支持zstd、lzo和zstd。 |
spill-compression.zstd-level | 1 | 整數 | 默認溢出合并zstd級別。如需更高合并率,可配置為9,但讀寫速度會顯著下降。 |
streaming-read-mode | (none) | 枚舉值 | 流讀取模式,指定從表文件或日志中讀取數據。可能的值: "log":從表日志存儲的數據中讀取。 "file":從表文件存儲的數據中讀取。 |
streaming-read-overwrite | false | 布爾值 | 是否在流模式下從覆蓋中讀取更改。當changelog producer為full-compaction或lookup時,不能設置為true,因為這會讀取重復的更改。 |
streaming.read.snapshot.delay | (none) | 持續時間 | 掃描增量快照時流讀取的延遲持續時間。 |
tag.automatic-completion | false | 布爾值 | 是否自動完成缺失的標簽。 |
tag.automatic-creation | none | 枚舉值 | 是否自動創建標簽以及如何生成標簽。可能的值: "none":無自動創建的標簽。 "process-time":基于機器時間,處理時間經過周期時間加上延遲后創建標簽。 "watermark":基于輸入的水印,水印經過周期時間加上延遲后創建標簽。 "batch":在批處理場景中,任務完成后生成與當前快照對應的標簽。 |
tag.callback.#.param | (none) | 字符串 | 類#構造函數的參數字符串。回調類應自行解析參數。 |
tag.callbacks | (none) | 字符串 | 成功創建標簽后要調用的提交回調類列表。類名用逗號連接(例如:com.test.CallbackA,com.sample.CallbackB)。 |
tag.creation-delay | 0 ms | 持續時間 | 周期結束后創建標簽的延遲時間。這可以允許一些延遲數據進入標簽。 |
tag.creation-period | daily | 枚舉值 | 用于生成標簽的頻率。可能的值: "daily":每天生成一個標簽。 "hourly":每小時生成一個標簽。 "two-hours":每兩小時生成一個標簽。 |
tag.default-time-retained | (none) | 持續時間 | 新創建標簽的默認最長保留時間。它會影響自動創建的標簽和手動(通過過程)創建的標簽。 |
tag.num-retained.max | (none) | 整數 | 保留的最大標簽數量。它僅影響自動創建的標簽。 |
tag.period-formatter | with_dashes | 枚舉值 | 標簽周期的日期格式。可能的值: "with_dashes":帶破折號的日期和小時,例如,'yyyy-MM-dd HH' "without_dashes":不帶破折號的日期和小時,例如,'yyyyMMdd HH' |
target-file-size | (none) | 內存大小 | 文件的目標大小。 主鍵表:默認值為128MB。 追加表:默認值為256MB。 |
write-buffer-for-append | false | 布爾值 | 此選項僅適用于追加表。寫入是否使用寫緩沖區以避免內存不足錯誤。 |
write-buffer-size | 256 mb | 內存大小 | 在轉換為排序的磁盤文件之前,在內存中累積的數據量。 |
write-buffer-spill.max-disk-size | infinite | 內存大小 | 寫緩沖區溢出使用的最大磁盤空間。僅在啟用寫緩沖區溢出時有效。 |
write-buffer-spillable | (none) | 布爾值 | 寫緩沖區是否可溢出。使用對象存儲時默認啟用。 |
write-manifest-cache | 0 bytes | 內存大小 | 寫入初始化時讀取清單文件的緩存大小。 |
write-max-writers-to-spill | 10 | 整數 | 在批處理追加插入中,如果寫入器數量大于此選項,我們將打開緩沖區緩存和溢出功能以避免內存不足。 |
write-only | false | 布爾值 | 如果設置為true,將跳過合并和快照過期。此選項與專用合并作業一起使用。 |
zorder.var-length-contribution | 8 | 整數 | 類型(CHAR、VARCHAR、BINARY、VARBINARY)對zorder排序貢獻的字節數。 |
目錄選項(CatalogOptions)
Paimon目錄的選項。
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
allow-upper-case | (none) | 布爾值 | 指示此目錄是否允許大寫,其默認值取決于特定目錄的實現。 |
cache-enabled | true | 布爾值 | 控制目錄是否緩存數據庫、表和清單。 |
cache.expiration-interval | 1 min | 持續時間 | 控制目錄中數據庫和表的緩存持續時間。 |
cache.manifest.max-memory | (none) | 內存大小 | 控制緩存清單內容的最大內存。 |
cache.manifest.small-file-memory | 128 mb | 內存大小 | 控制緩存小清單文件的緩存內存。 |
cache.manifest.small-file-threshold | 1 mb | 內存大小 | 控制小清單文件的閾值。 |
client-pool-size | 2 | 整數 | 配置連接池的大小。 |
fs.allow-hadoop-fallback | true | 布爾值 | 當找不到針對該方案的文件IO時,是否允許回退到Hadoop文件IO。 |
lineage-meta | (none) | 字符串 | 用于存儲表和數據血緣信息的血緣元數據。可能的值: "jdbc":使用標準jdbc存儲表和數據血緣信息。 "custom":你可以實現LineageMetaFactory和LineageMeta,將血緣信息存儲在自定義存儲中。 |
lock-acquire-timeout | 8 min | 持續時間 | 獲取鎖的最大等待時間。 |
lock-check-max-sleep | 8 s | 持續時間 | 重試檢查鎖時的最大睡眠時間。 |
lock.enabled | (none) | 布爾值 | 啟用目錄鎖。 |
lock.type | (none) | 字符串 | 目錄的鎖類型,如'hive'、'zookeeper'。 |
metastore | "filesystem" | 字符串 | Paimon目錄的元存儲,支持filesystem、hive和jdbc。 |
sync-all-properties | false | 布爾值 | 是否將所有表屬性同步到Hive元存儲。 |
table.type | managed | 枚舉值 | 表的類型。可能的值: "managed":Paimon擁有的表,表數據的整個生命周期由Paimon管理。 "external":Paimon與存儲在外部位置的數據松散耦合的表。 |
uri | (none) | 字符串 | 元存儲服務器的URI。 |
warehouse | (none) | 字符串 | 目錄的倉庫根路徑。 |
文件系統目錄選項(FilesystemCatalogOptions)
文件系統目錄的選項。
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
case-sensitive | true | 布爾值 | 是否區分大小寫。如果不區分大小寫,你需要將此選項設置為false,并且表名和字段將轉換為小寫。 |
Hive目錄選項(HiveCatalogOptions)
Hive目錄的選項。
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
client-pool-cache.eviction-interval-ms | 300000 | 長整數 | 設置客戶端池緩存的逐出間隔(毫秒)。 |
client-pool-cache.keys | (none) | 字符串 | 指定客戶端緩存鍵,多個元素用逗號分隔。 "ugi":表示使用緩存的當前用戶的Hadoop UserGroupInformation實例。 “user_name”類似于UGI,但僅包括由UserGroupInformation#getUserName確定的用戶名。 “conf”:任意配置的名稱。該配置的值將從目錄屬性中提取并添加到緩存鍵中。一個conf元素應以“conf:”前綴開頭,后跟配置名稱。例如,指定“conf:a.b.c”將把“a.b.c”添加到鍵中,這樣具有不同默認目錄的配置就不會共享相同的客戶端池。可以指定多個conf元素。 |
format-table.enabled | false | 布爾值 | 是否支持格式表,格式表對應于常規Hive表,允許進行讀寫操作。但是,在這些過程中,它不會連接到元存儲;因此,新添加的分區不會反映在元存儲中,需要作為單獨的分區操作手動添加。 |
hadoop-conf-dir | (none) | 字符串 | core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml的文件目錄。目前僅支持本地文件系統路徑。如果未配置,嘗試從“HADOOP_CONF_DIR”或“HADOOP_HOME”系統環境加載。配置優先級:1. 從“hadoop-conf-dir”;2. 從HADOOP_CONF_DIR;3. 從HADOOP_HOME/conf;4. HADOOP_HOME/etc/hadoop。 |
hive-conf-dir | (none) | 字符串 | hive-site.xml的文件目錄,用于創建HiveMetastoreClient和安全認證,如Kerberos、LDAP、Ranger等。如果未配置,嘗試從“HIVE_CONF_DIR”環境變量加載。 |
location-in-properties | false | 布爾值 | 設置Hive表/數據庫屬性中的位置。如果在使用對象存儲(如s3、oss)時不想通過Hive的文件系統訪問位置,可以將此選項設置為true。 |
Jdbc目錄選項(JdbcCatalogOptions)
Jdbc目錄的選項。
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
catalog-key | "jdbc" | 字符串 | 自定義jdbc目錄存儲鍵。 |
lock-key-max-length | 255 | 整數 | 設置鎖鍵的最大長度。“lock-key”由“catalog-key”、“database”和“table”三個字段連接而成。 |
Flink目錄選項(FlinkCatalogOptions)
Paimon的Flink目錄選項。
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
default-database | "default" | 字符串 | 默認數據庫名稱。 |
disable-create-table-in-default-db | false | 布爾值 | 如果為true,則不允許在默認數據庫中創建表。默認為false。 |
Flink連接器選項(FlinkConnectorOptions)
Paimon的Flink連接器選項。
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
end-input.watermark | (none) | 長整數 | 批處理模式或有界流情況下可選的endInput水印。 |
lookup.async | false | 布爾值 | 是否啟用異步查找連接。 |
lookup.async-thread-number | 16 | 整數 | 查找異步的線程數。 |
lookup.bootstrap-parallelism | 4 | 整數 | 查找連接中單個任務引導的并行度。 |
lookup.cache | AUTO | 枚舉值 | 查找連接的緩存模式。可能的值: "AUTO" "FULL" |
lookup.dynamic-partition | (none) | 字符串 | 查找的特定動態分區,目前僅支持'max_pt()'。 |
lookup.dynamic-partition.refresh-interval | 1 h | 持續時間 | 查找的特定動態分區刷新間隔,掃描所有分區并獲取相應分區。 |
lookup.refresh.async | false | 布爾值 | 是否在異步線程中刷新查找表。 |
lookup.refresh.async.pending-snapshot-count | 5 | 整數 | 如果掛起的快照計數超過閾值,查找操作符將同步刷新表。 |
partition.end-input-to-done | false | 布爾值 | 當結束輸入時,是否標記完成狀態以指示數據已準備好。 |
partition.idle-time-to-done | (none) | 持續時間 | 設置一個分區在沒有新數據后的時間間隔,經過此時間間隔后,標記完成狀態以指示數據已準備好。 |
partition.time-interval | (none) | 持續時間 | 你可以為分區指定時間間隔,例如,每日分區為“1 d”,每小時分區為“1 h”。 |
scan.infer-parallelism | true | 布爾值 | 如果為false,源的并行度由全局并行度設置。否則,源并行度從分割數(批處理模式)或桶數(流模式)推斷。 |
scan.infer-parallelism.max | 1024 | 整數 | 如果scan.infer-parallelism為true,通過此選項限制源的并行度。 |
scan.parallelism | (none) | 整數 | 為掃描源定義自定義并行度。默認情況下,如果未定義此選項,規劃器將通過考慮全局配置為每個語句單獨推導并行度。如果用戶啟用scan.infer-parallelism,規劃器將通過推斷的并行度推導并行度。 |
scan.push-down | true | 布爾值 | 如果為true,Flink將把投影、過濾器、限制下推到源。代價是在作業中難以重用源。在Flink 1.18或更高版本中,即使進行投影下推,也可以重用源。 |
scan.remove-normalize | false | 布爾值 | 流讀取時是否強制刪除規范化節點。注意:這很危險,如果下游用于計算聚合且輸入不是完整的變更日志,很可能導致數據錯誤。 |
scan.split-enumerator.batch-size | 10 | 整數 | 在StaticFileStoreSplitEnumerator中,每批應分配給子任務多少個分割,以避免超過 |
scan.split-enumerator.mode | fair | 枚舉值 | StaticFileStoreSplitEnumerator分配分割使用的模式。可能的值: "fair":批處理讀取時均勻分配分割,以防止少數任務讀取所有數據。 "preemptive":根據任務的消費速度搶占式分配分割。 |
scan.watermark.alignment.group | (none) | 字符串 | 用于對齊水印的一組源。 |
scan.watermark.alignment.max-drift | (none) | 持續時間 | 對齊水印的最大漂移,在暫停從源/任務/分區消費之前。 |
scan.watermark.alignment.update-interval | 1 s | 持續時間 | 任務應多久通知協調器當前水印,以及協調器應多久宣布最大對齊水印。 |
scan.watermark.emit.strategy | on-event | 枚舉值 | 水印生成的發射策略。可能的值: "on-periodic":定期發射水印,間隔由Flink的'pipeline.auto-watermark-interval'控制。 "on-event":每條記錄發射水印。 |
scan.watermark.idle-timeout | (none) | 持續時間 | 如果在該時間段內流的某個分區沒有記錄流動,則該分區被視為“空閑”,不會阻礙下游操作符中水印的進度。 |
sink.clustering.by-columns | (none) | 字符串 | 指定范圍分區期間用于比較的列名,格式為'columnName1,columnName2'。如果未設置或設置為空字符串,則表示未啟用范圍分區功能。此選項僅對無主鍵且為批處理執行模式的無桶感知表有效。 |
sink.clustering.sample-factor | 100 | 整數 | 指定采樣因子。設S為總樣本數,F為采樣因子,P為sink并行度,則S = F×P。允許的最小采樣因子為20。 |
sink.clustering.sort-in-cluster | true | 布爾值 | 表示范圍分區后是否對屬于每個sink任務的數據進一步排序。 |
sink.clustering.strategy | "auto" | 字符串 | 指定范圍分區使用的比較算法,包括'zorder'、'hilbert'和'order',分別對應z-order曲線算法、hilbert曲線算法和基本類型比較算法。未配置時,將根據'sink.clustering.by-columns'中的列數自動確定算法。1列時使用'order',小于5列時使用'zorder',5列或更多列時使用'hilbert'。 |
sink.committer-cpu | 1.0 | 雙精度浮點數 | Sink提交器的CPU,用于控制全局提交器的CPU核心數。 |
sink.committer-memory | (none) | 內存大小 | Sink提交器的內存,用于控制全局提交器的堆內存。 |
sink.committer-operator-chaining | true | 布爾值 | 允許sink提交器和寫入器操作符鏈接在一起。 |
sink.cross-partition.managed-memory | 256 mb | 內存大小 | 跨分區更新中RocksDB的托管內存權重,Flink將根據該權重計算內存大小,實際使用的內存取決于運行環境。 |
sink.managed.writer-buffer-memory | 256 mb | 內存大小 | 托管內存中寫入器緩沖區的權重,Flink將根據該權重為寫入器計算內存大小,實際使用的內存取決于運行環境。 |
sink.parallelism | (none) | 整數 | 為sink定義自定義并行度。默認情況下,如果未定義此選項,規劃器將通過考慮全局配置為每個語句單獨推導并行度。 |
sink.savepoint.auto-tag | false | 布爾值 | 如果為true,將為Flink保存點創建的快照自動創建一個標簽。 |
sink.use-managed-memory-allocator | false | 布爾值 | 如果為true,Flink sink將為合并樹使用托管內存;否則,它將創建一個獨立的內存分配器。 |
source.checkpoint-align.enabled | false | 布爾值 | 是否將Flink檢查點與Paimon表的快照對齊,如果為true,僅在消耗快照時才進行檢查點。 |
source.checkpoint-align.timeout | 30 s | 持續時間 | 如果檢查點開始觸發時新快照尚未生成,枚舉器將阻塞檢查點并等待新快照。設置最大等待時間以避免無限等待,如果超時,檢查點將失敗。請注意,它應設置為小于檢查點超時時間。 |
streaming-read.shuffle-bucket-with-partition | true | 布爾值 | 流讀取時是否按分區和桶進行洗牌。 |
unaware-bucket.compaction.parallelism | (none) | 整數 | 為無桶感知表合并作業定義自定義并行度。默認情況下,如果未定義此選項,規劃器將通過考慮全局配置為每個語句單獨推導并行度。 |
Spark目錄選項(SparkCatalogOptions)
Paimon的Spark目錄選項。
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
catalog.create-underlying-session-catalog | false | 布爾值 | 如果為true,在使用SparkGenericCatalog時,創建并使用底層會話目錄而不是默認會話目錄。 |
defaultDatabase | "default" | 字符串 | 默認數據庫名稱。 |
Spark連接器選項(SparkConnectorOptions)
Paimon的Spark連接器選項。
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
read.changelog | false | 布爾值 | 是否以變更日志的形式讀取行(在行中添加rowkind列以表示其更改類型)。 |
read.stream.maxBytesPerTrigger | (none) | 長整數 | 單個批次中返回的最大字節數。 |
read.stream.maxFilesPerTrigger | (none) | 整數 | 單個批次中返回的最大文件數。 |
read.stream.maxRowsPerTrigger | (none) | 長整數 | 單個批次中返回的最大行數。 |
read.stream.maxTriggerDelayMs | (none) | 長整數 | 兩個相鄰批次之間的最大延遲,與read.stream.minRowsPerTrigger一起用于創建MinRowsReadLimit。 |
read.stream.minRowsPerTrigger | (none) | 長整數 | 單個批次中返回的最小行數,與read.stream.maxTriggerDelayMs一起用于創建MinRowsReadLimit。 |
write.merge-schema | false | 布爾值 | 如果為true,在寫入數據之前自動合并數據模式和表模式。 |
write.merge-schema.explicit-cast | false | 布爾值 | 如果為true,當兩種數據類型滿足顯式轉換規則時,允許合并數據類型。 |
ORC選項
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
orc.column.encoding.direct | (none) | 整數 | 在orc中要跳過字典編碼的字段的逗號分隔列表。 |
orc.dictionary.key.threshold | 0.8 | 雙精度浮點數 | 如果字典中不同鍵的數量大于非空行總數的此分數,則在orc中關閉字典編碼。使用0始終禁用字典編碼。使用1始終使用字典編碼。 |
orc.write.batch-size | 1024 | 整數 | orc的寫入批大小。 |
RocksDB選項
以下選項允許用戶精細調整RocksDB以獲得更好的性能。你可以在表屬性或動態表提示中指定它們。
鍵 | 默認值 | 類型 | 描述 |
---|---|---|---|
lookup.cache-rows | 10000 | 長整數 | 緩存中存儲的最大行數。 |
lookup.continuous.discovery-interval | (none) | 持續時間 | lookup持續讀取的發現間隔。這用作SQL提示。如果未配置,lookup函數將回退到'continuous.discovery-interval'。 |
rocksdb.block.blocksize | 4 kb | 內存大小 | 每個塊中包裝的用戶數據的近似大小(以字節為單位)。默認塊大小為“4KB”。 |
rocksdb.block.cache-size | 128 mb | 內存大小 | RocksDB中數據塊的緩存量。 |
rocksdb.block.metadata-blocksize | 4 kb | 內存大小 | 每個塊中分區元數據的近似大小。當前在啟用分區索引/過濾器選項時應用于索引塊。默認塊大小為“4KB”。 |
rocksdb.bloom-filter.bits-per-key | 10.0 | 雙精度浮點數 | 布隆過濾器將使用的每個鍵的位數,僅在使用布隆過濾器時生效。默認值為10.0。 |
rocksdb.bloom-filter.block-based-mode | false | 布爾值 | 如果為true,RocksDB將使用基于塊的過濾器而不是完整過濾器,僅在使用布隆過濾器時生效。默認值為“false”。 |
rocksdb.compaction.level.max-size-level-base | 256 mb | 內存大小 | 級別基礎文件總大小的上限(以字節為單位)。默認值為“256MB”。 |
rocksdb.compaction.level.target-file-size-base | 64 mb | 內存大小 | 合并的目標文件大小,它決定了1級文件的大小。默認值為“64MB”。 |
rocksdb.compaction.level.use-dynamic-size | false | 布爾值 | 如果為true,RocksDB將動態選擇每個級別的目標大小。從空數據庫開始,RocksDB將使最后一級成為基礎級別,這意味著將L0數據合并到最后一級,直到它超過max_bytes_for_level_base。然后對倒數第二級重復此過程,依此類推。默認值為“false”。有關更多信息,請參閱RocksDB文檔。 |
rocksdb.compaction.style | LEVEL | 枚舉值 | 數據庫的指定合并樣式。候選合并樣式為LEVEL、FIFO、UNIVERSAL或NONE,Flink選擇“LEVEL”作為默認樣式。可能的值: "LEVEL" "UNIVERSAL" "FIFO" "NONE" |
rocksdb.compression.type | LZ4_COMPRESSION | 枚舉值 | 合并類型。可能的值: "NO_COMPRESSION" "SNAPPY_COMPRESSION" "ZLIB_COMPRESSION" "BZLIB2_COMPRESSION" "LZ4_COMPRESSION" "LZ4HC_COMPRESSION" "XPRESS_COMPRESSION" "ZSTD_COMPRESSION" "DISABLE_COMPRESSION_OPTION" |
rocksdb.files.open | -1 | 整數 | 數據庫可使用的打開文件的最大數量(每個有狀態操作符),“-1”表示無限制。默認值為“-1”。 |
rocksdb.thread.num | 2 | 整數 | 并發后臺刷新和合并作業的最大數量(每個有狀態操作符)。默認值為“2”。 |
rocksdb.use-bloom-filter | false | 布爾值 | 如果為true,每個新創建的SST文件將包含一個布隆過濾器。默認情況下禁用。 |
rocksdb.writebuffer.count | 2 | 整數 | 在內存中建立的寫緩沖區的最大數量。默認值為“2”。 |
rocksdb.writebuffer.number-to-merge | 1 | 整數 | 在寫入存儲之前將合并在一起的最小寫緩沖區數量。默認值為“1”。 |
rocksdb.writebuffer.size | 64 mb | 內存大小 | 在轉換為排序的磁盤文件之前,在內存中累積的數據量(由磁盤上的未排序列表支持)。默認寫緩沖區大小為“64MB”。 |
拓展:
CoreOptions相關拓展:?async-file-write:異步文件寫入在大數據量寫入場景下能顯著提升性能。當開啟異步寫入時,數據可以在后臺線程進行寫入操作,而主線程可以繼續處理其他任務,避免了寫入操作成為整個數據處理流程的瓶頸。例如在高并發的日志數據寫入場景中,異步寫入能夠讓系統在不阻塞其他數據處理邏輯的前提下,高效地將日志數據持久化到存儲中。
changelog-producer:不同的變更日志生成策略適用于不同的業務需求。例如,“lookup”策略適合對數據一致性和實時性要求較高的場景,因為它在提交數據寫入之前就生成變更日志,能保證數據更改的詳細記錄與寫入操作緊密關聯。在金融交易數據處理中,每一筆交易的變更都需要準確記錄,“lookup”策略就可以確保變更日志的及時性和準確性,便于后續的審計和分析。
scan.mode:不同的掃描模式為數據讀取提供了極大的靈活性。“from-timestamp”模式適用于需要從特定時間點開始獲取數據變更的場景,如在監控系統中,可能需要從某個故障發生時間點開始讀取相關數據的變更,以分析故障原因。“compacted-full”模式則在需要獲取經過合并后的最新數據狀態時非常有用,比如在數據倉庫場景中,定期的全量合并可以清理過期數據和合并小文件,“compacted-full”模式能確保讀取到合并后的最新、最優化的數據快照。
CatalogOptions相關拓展:
lock.type:不同的鎖類型適用于不同的部署環境和數據一致性要求。例如,“hive”鎖類型在與Hive集成的環境中能很好地利用Hive已有的鎖機制來保證數據的一致性,適用于需要與Hive生態緊密結合的數據管理場景。而“zookeeper”鎖類型則利用Zookeeper的分布式協調能力,提供了更強大的分布式鎖功能,適用于大規模分布式數據存儲和處理環境,確保在多節點并發訪問時的數據一致性和完整性。
metastore:選擇不同的元存儲方式會影響數據的管理和訪問方式。“filesystem”元存儲簡單直接,適用于小規模、對部署復雜度要求較低的場景,它將元數據存儲在文件系統中,易于理解和維護。“hive”元存儲則可以與Hive的生態系統無縫集成,方便在Hive環境中對Paimon表進行管理和查詢,例如可以利用Hive的SQL接口對Paimon表進行操作,同時共享Hive的元數據管理功能。“jdbc”元存儲則提供了更靈活的元數據存儲方式,可以將元數據存儲在關系型數據庫中,便于進行復雜的元數據查詢和管理,適用于對元數據管理有較高要求的企業級應用。
FlinkConnectorOptions相關拓展:
lookup.async:啟用異步查找連接在大數據量關聯查詢場景下能有效提升性能。在實時數據分析中,常常需要將實時流數據與一個較大的維度表進行關聯,若采用同步查找連接,可能會因為等待維度表的查詢結果而導致流處理延遲。而異步查找連接允許在等待維度表查詢結果的同時,繼續處理其他流數據,當查詢結果返回后再進行相應的處理,從而提高了整個流處理系統的吞吐量和響應速度。
sink.clustering.strategy:不同的范圍分區策略適用于不同的數據分布和查詢需求。“zorder”策略在處理多維數據且需要快速定位特定區域數據的場景中表現出色,例如在地理信息系統(GIS)數據處理中,通過Z-order曲線可以將二維或多維的地理空間數據進行有效的分區和排序,使得查詢特定區域的數據變得更加高效。“hilbert”策略則在處理高維數據時能更好地保持數據的局部性,適用于對數據局部性要求較高的數據分析場景。“order”策略簡單直接,適用于單維度數據的分區和排序,在數據分布較為均勻的單維度數據處理中能發揮較好的性能。
SparkConnectorOptions相關拓展:
read.changelog:以變更日志形式讀取數據在數據版本管理和數據審計場景中非常有用。例如在數據倉庫的緩慢變化維度(SCD)處理中,通過讀取變更日志可以準確記錄每一次數據的變化,包括插入、更新和刪除操作,便于跟蹤數據的演變歷史。在數據質量監控中,變更日志可以幫助分析數據異常變化的原因,通過對比不同時間點的變更日志,可以發現數據質量問題的根源。
write.merge-schema:自動合并數據模式和表模式在數據集成場景中能大大簡化數據寫入操作。當從多個數據源獲取數據并寫入Paimon表時,不同數據源的數據模式可能存在差異。啟用此選項后,系統可以自動將數據模式與目標表模式進行合并,減少了手動進行模式轉換和驗證的工作量,提高了數據集成的效率和準確性。同時,結合“write.merge-schema.explicit-cast”選項,可以在滿足顯式轉換規則的情況下,更加靈活地處理不同數據類型之間的合并,避免了因數據類型不匹配而導致的數據寫入失敗問題。
RocksDB Options相關拓展:
rocksdb.compaction.style:不同的合并樣式對RocksDB的性能和存儲布局有顯著影響。“LEVEL”合并樣式是一種常用的策略,它通過將數據分層存儲,使得不同層次的數據具有不同的合并比和訪問頻率,適用于大多數通用場景,能夠在性能和存儲利用率之間取得較好的平衡。“UNIVERSAL”合并樣式則更注重減少存儲文件的數量,適用于存儲資源有限的場景,但可能會在合并過程中消耗更多的CPU資源。“FIFO”合并樣式按照先進先出的原則進行合并,適用于對數據新鮮度要求較高的場景,確保較新的數據能夠及時得到合并和整理。“NONE”合并樣式則完全禁用合并,適用于對寫入性能要求極高且存儲資源充足的場景,因為它避免了合并過程帶來的開銷,但會占用更多的存儲空間。
rocksdb.use-bloom-filter:布隆過濾器在RocksDB中用于快速判斷數據是否存在于數據庫中。當啟用布隆過濾器時,它可以顯著減少磁盤I/O操作,提高查詢性能。在大規模數據存儲中,數據量可能非常龐大,直接在磁盤上查詢數據是否存在會帶來較高的I/O開銷。布隆過濾器通過在內存中維護一個緊湊的位圖結構,能夠快速判斷某個鍵是否可能存在于數據庫中。雖然布隆過濾器存在一定的誤判率,但在大多數情況下,它可以有效地過濾掉大量不存在的數據查詢請求,從而減少不必要的磁盤I/O,提高系統的整體性能。同時,通過調整“rocksdb.bloom-filter.bits-per-key”和“rocksdb.bloom-filter.block-based-mode”等參數,可以進一步優化布隆過濾器的性能和誤判率,以適應不同的應用場景需求。