主鍵表(Table with PK)
PK 是 Primary Key(主鍵)的縮寫。在數據庫中,主鍵是一個或多個列的組合,其值在表中是唯一的,并且不能為 NULL。主鍵的作用是確保每一行記錄的唯一性,便于數據的查找、管理和維護,還可用于建立表與表之間的關系。
概述
如果定義了一個帶有主鍵的表,就可以在表中插入、更新或刪除記錄。
主鍵由一組列組成,這些列中的值對于每條記錄都是唯一的。Paimon通過在每個桶內對主鍵進行排序來強制數據有序,這樣用戶通過在主鍵上應用過濾條件就能實現高性能操作。詳見“創建表(CREATE TABLE)”。
桶(Bucket)
未分區的表,或者分區表中的各個分區,會進一步細分為桶,為數據提供額外的結構,以便更高效地進行查詢。
每個桶目錄包含一個日志結構合并樹(LSM tree)及其變更日志文件。
桶的范圍由記錄中一個或多個列的哈希值決定。用戶可以通過提供 bucket-key
選項來指定分桶列。如果未指定 bucket-key
選項,則主鍵(如果已定義)或完整記錄將用作桶鍵。
桶是讀寫的最小存儲單元,因此桶的數量限制了最大處理并行度。不過,這個數量也不宜過大,否則會導致產生大量小文件,降低讀取性能。一般來說,建議每個桶中的數據大小約為200MB-1GB。
另外,如果在表創建后想要調整桶的數量,可以參考“重新調整桶(rescale bucket)”。
日志結構合并樹(LSM Trees)
Paimon采用日志結構合并樹(log-structured merge-tree,簡稱LSM樹)作為文件存儲的數據結構。本文檔簡要介紹有關LSM樹的概念。
有序段(Sorted Runs)
LSM樹將文件組織成幾個有序段。一個有序段由一個或多個數據文件組成,并且每個數據文件恰好屬于一個有序段。
數據文件內的記錄按其主鍵排序。在一個有序段內,各個數據文件的主鍵范圍從不重疊。
如你所見,不同的有序段可能有重疊的主鍵范圍,甚至可能包含相同的主鍵。在查詢LSM樹時,必須合并所有有序段,并且所有具有相同主鍵的記錄必須根據用戶指定的合并引擎和每條記錄的時間戳進行合并。
寫入LSM樹的新記錄首先會在內存中緩沖。當內存緩沖區已滿時,內存中的所有記錄將被排序并刷新到磁盤。此時會創建一個新的有序段。
拓展:
主鍵(Primary Key):在數據庫設計中,主鍵是用于唯一標識表中每一行記錄的一個或多個字段的集合。主鍵的主要特點是唯一性和非空性,它確保了表中數據的完整性和準確性,便于數據的索引、關聯和操作。例如,在用戶信息表中,可以將“用戶ID”設置為主鍵,這樣就可以通過該主鍵快速定位和管理每個用戶的記錄。在Paimon中,主鍵不僅用于保證數據的唯一性,還通過在桶內排序來優化查詢性能。
日志結構合并樹(Log-Structured Merge-Tree, LSM Tree):這是一種廣泛應用于數據庫系統的數據結構,特別適用于處理高寫入負載的場景。LSM樹的基本原理是將寫入操作先記錄在日志中,然后定期將日志合并到更大的有序結構中。這種設計減少了隨機I/O操作,提高了寫入性能。在Paimon中,LSM樹用于文件存儲,將數據文件組織成有序段,使得數據的寫入和查詢能夠更高效地進行。例如,在處理大量實時數據寫入時,LSM樹能夠快速響應寫入請求,并在后續的合并過程中優化數據存儲結構,以提升查詢效率。
分桶(Bucketing):在數據存儲和查詢優化中,分桶是一種將數據按照特定規則劃分到不同桶中的技術。通過分桶,可以將數據分布到多個較小的單元中,從而提高查詢并行度和效率。在Paimon中,分桶不僅可以基于主鍵,還可以通過用戶指定的桶鍵進行。合理的分桶策略能夠平衡數據存儲和查詢性能,避免因桶數量過多導致小文件過多影響讀取性能,或者桶數量過少無法充分利用并行處理能力。例如,在處理大規模銷售數據時,可以按照銷售地區或時間等維度進行分桶,以便在查詢特定地區或時間段的數據時能夠快速定位和處理。
數據分布(Data Distribution)
桶是讀寫的最小存儲單元,每個桶目錄包含一個日志結構合并樹(LSM樹)。
固定桶(Fixed Bucket)
配置一個大于0的桶數,使用固定桶模式,根據 Math.abs(key_hashcode % numBuckets)
來計算記錄所屬的桶。
只能通過離線流程重新調整桶的數量,詳見“重新調整桶(Rescale Bucket)”。桶的數量過多會導致產生過多小文件,而桶的數量過少則會導致寫入性能不佳。
動態桶(Dynamic Bucket)
主鍵表的默認模式,或者配置 'bucket' = '-1'
。
先到達的鍵會落入舊桶,新的鍵會落入新桶,桶與鍵的分布取決于數據到達的順序。Paimon維護一個索引來確定哪個鍵對應哪個桶。
Paimon會自動擴展桶的數量。
選項1:
'dynamic-bucket.target-row-num'
:控制一個桶的目標行數。選項2:
'dynamic-bucket.initial-buckets'
:控制初始化的桶數量。
動態桶僅支持單個寫入作業。請勿啟動多個作業寫入相同的分區(這可能會導致數據重復)。即使啟用 'write-only'
并啟動一個專門的合并作業,也無法避免。
普通動態桶模式(Normal Dynamic Bucket Mode)
當你的更新不跨分區(無分區,或者主鍵包含所有分區字段)時,動態桶模式使用哈希(HASH)索引來維護鍵到桶的映射,相比固定桶模式,它需要更多內存。
性能:
一般來說,不會有性能損失,但會有一些額外的內存消耗,一個分區中有1億條記錄會多占用1GB內存,不再活躍的分區不占用內存。
對于更新率較低的表,建議使用此模式以顯著提高性能。
普通動態桶模式支持排序合并(Sort Compact)以加速查詢。詳見“排序合并(Sort Compact)”。
跨分區插入更新動態桶模式(Cross Partitions Upsert Dynamic Bucket Mode)
當你需要跨分區插入更新(主鍵不包含所有分區字段)時,動態桶模式直接維護鍵到分區和桶的映射,使用本地磁盤,并在啟動流寫入作業時通過讀取表中所有現有鍵來初始化索引。
不同的合并引擎有不同的行為:
去重(Deduplicate):從舊分區刪除數據,并將新數據插入到新分區。
部分更新(PartialUpdate)和聚合(Aggregation):將新數據插入到舊分區。
首行(FirstRow):如果存在舊值,則忽略新數據。
性能:對于數據量較大的表,性能會有顯著損失。此外,初始化需要很長時間。
如果你的插入更新操作不依賴太舊的數據,可以考慮配置索引生存時間(TTL)來減少索引和初始化時間:
cross-partition-upsert.index-ttl
:RocksDB索引和初始化中的生存時間(TTL),這可以避免維護過多索引導致性能越來越差。
但請注意,這也可能會導致數據重復。
選擇分區字段(Pick Partition Fields)
在數據倉庫中,以下三種類型的字段可定義為分區字段:
創建時間(推薦):創建時間通常是不可變的,因此你可以放心地將其作為分區字段并添加到主鍵中。
事件時間:事件時間是原始表中的一個字段。對于變更數據捕獲(CDC)數據,例如從MySQL CDC同步的表或由Paimon生成的變更日志,它們都是完整的CDC數據,包括
UPDATE_BEFORE
記錄,即使你聲明主鍵包含分區字段,也能實現唯一性效果(需要'changelog-producer'='input'
)。CDC操作時間戳(CDC op_ts):它不能定義為分區字段,因為無法得知先前記錄的時間戳。所以你需要使用跨分區插入更新,這會消耗更多資源。
拓展:
哈希索引(HASH Index):一種通過哈希函數將鍵值映射到特定存儲位置(如桶)的索引結構。在普通動態桶模式中,哈希索引用于快速定位鍵所屬的桶,提高數據存儲和查詢效率。哈希函數的選擇很關鍵,理想情況下應盡量減少哈希沖突,確保數據均勻分布。例如,在處理大量用戶數據時,通過用戶ID的哈希值將用戶記錄分配到不同桶中,使得查詢特定用戶記錄時能快速定位到對應的桶。
生存時間(TTL, Time-To-Live):在計算機系統中,TTL常用于指定數據或資源的有效期限。在跨分區插入更新動態桶模式下,通過配置索引TTL,可以控制索引數據的保留時間,避免因長期積累過多索引數據而導致性能下降。例如,對于一些時效性較強的數據,設置較短的索引TTL,過期后索引數據被清理,可減少內存占用和索引維護成本,但可能會因索引數據丟失而導致部分數據查詢不準確或重復。
變更數據捕獲(CDC, Change Data Capture):一種用于捕獲數據庫中數據變化的技術,它能實時監測數據庫表的插入、更新和刪除操作,并將這些變化的數據捕獲下來。在Paimon中,涉及CDC數據處理時,理解如何利用CDC數據的特性(如事件時間、
UPDATE_BEFORE
記錄等)來設計分區和主鍵,對于確保數據的一致性和高效處理非常重要。例如,從MySQL數據庫同步數據到Paimon時,通過CDC技術捕獲MySQL表的變更,然后在Paimon中根據這些變更進行相應的數據更新和管理。
表模式(Table Mode)
主鍵表的文件結構大致如上圖所示。表或分區包含多個桶,每個桶是一個單獨的日志結構合并樹(LSM)結構,包含多個文件。
LSM的寫入過程大致如下:Flink檢查點刷新L0文件,并根據需要觸發合并以合并數據。根據寫入時的不同處理方式,有三種模式:
讀時合并(Merge On Read, MOR):默認模式,僅執行小范圍合并,讀取時需要進行合并。
寫時復制(Copy On Write, COW):使用
'full-compaction.delta-commits' = '1'
,將同步進行完全合并,這意味著寫入時就完成合并。寫時合并(Merge On Write, MOW):使用
'deletion-vectors.enabled' = 'true'
,在寫入階段,LSM將被查詢以生成數據文件的刪除向量文件,讀取時可直接過濾掉不必要的行。
對于一般的主鍵表,推薦使用寫時合并模式(合并引擎默認為去重)。
讀時合并(Merge On Read)
MOR是主鍵表的默認模式。
當模式為MOR時,讀取時需要合并所有文件,因為所有文件都是有序的,要進行多路合并,其中包括主鍵的比較計算。
這里存在一個明顯的問題,單個LSM樹一次只能由一個線程讀取,因此讀取并行度受限。如果桶中的數據量過大,會導致讀取性能不佳。所以為了保證讀取性能,建議分析查詢需求表,并將桶中的數據量設置在200MB到1GB之間。但如果桶過小,會有大量小文件的讀寫操作,給文件系統帶來壓力。
此外,由于合并過程的存在,不能對非主鍵列執行基于過濾(Filter)的數據跳過操作,否則新數據會被過濾掉,導致舊數據不正確。
寫入性能:非常好。
讀取性能:不太好。
寫時復制(Copy On Write)
ALTER?TABLE?orders?SET?('full-compaction.delta-commits'?=?'1');
將full-compaction.delta-commits
設置為1,意味著每次寫入都會進行完全合并,所有數據都會合并到最高層。此時讀取時無需合并,讀取性能最高。但每次寫入都需要完全合并,寫放大非常嚴重。
寫入性能:非常差。
讀取性能:非常好。
寫時合并(Merge On Write)
ALTER?TABLE?orders?SET?('deletion-vectors.enabled'?=?'true');
得益于Paimon的LSM結構,它具備按主鍵查詢的能力。我們可以在寫入時生成刪除向量文件,表明文件中的哪些數據已被刪除。讀取時這能直接過濾掉不必要的行,相當于進行了合并且不影響讀取性能。
一個簡單的例子:
通過先刪除舊記錄再添加新記錄來更新數據。
寫入性能:好。
讀取性能:好。
可見性保證:在刪除向量模式下的表,L0級別的文件只有在合并后才會可見。所以默認情況下,合并是同步的,如果開啟異步,數據可能會有延遲。
MOR讀優化(MOR Read Optimized)
如果你不想使用刪除向量模式,又希望在MOR模式下查詢足夠快,但只能查詢到較舊的數據,你還可以:
在寫入數據時配置
‘compaction.optimization-interval’
。從讀優化的系統表中查詢。從優化后的文件結果中讀取可避免合并具有相同鍵的記錄,從而提高讀取性能。
你可以在讀取時靈活平衡查詢性能和數據延遲。
拓展:
讀時合并(Merge On Read, MOR):在這種模式下,數據寫入時不立即進行大規模合并,而是在讀取時才將多個有序文件合并。這使得寫入操作相對快速,因為不需要等待合并完成。然而,讀取性能會受到影響,尤其是數據量較大時,由于單線程讀取和合并操作,可能導致讀取速度慢。在一些大數據存儲場景中,如果數據寫入頻繁但讀取不那么頻繁,MOR模式可以在一定程度上提高整體系統性能。例如,在日志數據的存儲中,日志數據不斷寫入,而查詢頻率相對較低,MOR模式可減少寫入時的開銷。
寫時復制(Copy On Write, COW):此模式下,每次寫入操作都會觸發數據的完全合并,將所有數據合并到最高層。雖然這極大地提高了讀取性能,因為讀取時無需再進行合并操作,但嚴重的寫放大問題會導致寫入性能急劇下降。寫放大意味著為了完成一次寫入,需要進行大量額外的I/O操作(如多次讀取和寫入數據塊)。這種模式適用于對讀取性能要求極高且寫入頻率較低的場景,例如某些數據倉庫,主要用于數據分析查詢,寫入操作相對較少。
寫時合并(Merge On Write, MOW):通過在寫入時生成刪除向量文件,記錄哪些數據已被刪除,讀取時可直接過濾掉這些不必要的數據行,從而在保證寫入性能的同時,也能維持較好的讀取性能。刪除向量文件作為一種優化手段,利用了LSM樹按主鍵查詢的特性,有效提升了整體性能。在實際應用中,對于頻繁更新且對讀寫性能都有要求的主鍵表數據,MOW模式是一種較為理想的選擇。例如,在電商系統的訂單表中,訂單數據會不斷更新,采用MOW模式可在不影響寫入速度的前提下,快速讀取最新的訂單狀態。
合并優化間隔(compaction.optimization-interval):在MOR讀優化中,配置該參數可以控制合并優化的時間間隔。通過合理設置這個間隔,可以在寫入時對數據進行適當的預處理,使得后續讀取時能夠更快地獲取數據,同時也能在一定程度上平衡查詢性能和數據延遲。較短的優化間隔可能會提高查詢性能,但可能會增加寫入時的開銷;較長的間隔則相反。這需要根據具體的業務需求和數據特點來調整。
合并引擎(Merge Engine)
概述
當Paimon接收器接收到兩條或更多具有相同主鍵的記錄時,它會將它們合并為一條記錄,以保持主鍵的唯一性。通過指定 merge-engine
表屬性,用戶可以選擇記錄的合并方式。
在Flink SQL表配置中,始終將 table.exec.sink.upsert-materialize
設置為 NONE
,sink upsert-materialize
可能會導致奇怪的行為。當輸入無序時,建議使用序列字段來糾正無序問題。
去重(Deduplicate)
去重合并引擎是默認的合并引擎。Paimon將只保留最新的記錄,并丟棄其他具有相同主鍵的記錄。
具體來說,如果最新的記錄是一條DELETE記錄,則所有具有相同主鍵的記錄都將被刪除。你可以配置 ignore-delete
來忽略它。
拓展:
Paimon接收器(Paimon sink):在數據處理流程中,負責將數據寫入Paimon存儲系統的組件。當它接收到數據時,需要按照特定規則處理重復主鍵的情況,以維護數據的一致性和完整性。例如,在一個實時數據處理任務中,數據從各種數據源流入,通過Flink等流處理框架處理后,由Paimon接收器寫入Paimon表中。
table.exec.sink.upsert-materialize
:這是Flink SQL表配置中的一個屬性,用于控制upsert操作(插入或更新操作)的數據物化方式。設置為NONE
可以避免一些異常行為,確保數據按照預期的方式寫入Paimon。在Flink與Paimon結合使用時,正確配置這個屬性對于保證數據處理的正確性非常重要。例如,如果設置不當,可能會導致數據重復寫入或更新異常。序列字段(Sequence Field):當數據輸入無序時,序列字段可以作為一種標記,用于確定數據的先后順序。在Paimon中,結合去重合并引擎,序列字段可以幫助系統準確判斷哪條記錄是最新的,從而正確執行去重操作。比如在一個電商訂單系統中,訂單數據可能由于網絡延遲等原因無序到達,通過為每條訂單記錄添加一個遞增的序列字段,系統可以根據序列字段的值來確定訂單的實際發生順序,進而在去重時保留最新的訂單狀態。
部分更新(Partial Update)
通過指定'merge-engine' = 'partial-update'
,用戶能夠通過多次更新來更新記錄的列,直至記錄完整。這是通過逐個更新值字段來實現的,使用相同主鍵下的最新數據。不過,在此過程中,空值不會被覆蓋。
例如,假設Paimon接收到三條記錄:
<1,?23.0,?10,?NULL>-
<1,?NULL,?NULL,?'This?is?a?book'>
<1,?25.2,?NULL,?NULL>
假設第一列是主鍵,最終結果將是<1, 25.2, 10, '這是一本書'>。
對于流查詢,部分更新合并引擎必須與查找(lookup)或完全合并變更日志生成器一起使用。(也支持‘input’變更日志生成器,但僅返回輸入記錄。)
默認情況下,部分更新不接受刪除記錄,你可以選擇以下解決方案之一:
配置‘ignore-delete’
以忽略刪除記錄。
配置‘partial-update.remove-record-on-delete’
,在接收到刪除記錄時刪除整行。
配置‘sequence-group’
以撤回部分列。
序列組(Sequence Group)
序列字段可能無法解決具有多個流更新的部分更新表的無序問題,因為在多流更新期間,序列字段可能會被另一個流的最新數據覆蓋。
因此,我們為部分更新表引入序列組機制。它可以解決:
多流更新期間的無序問題。每個流定義自己的序列組。
實現真正的部分更新,而不僅僅是非空更新。
看示例:
CREATE?TABLE?t
(k???INT,a???INT,b???INT,g_1?INT,c???INT,d???INT,g_2?INT,PRIMARY?KEY?(k)?NOT?ENFORCED
)?WITH?('merge?-?engine'?=?'partial?-?update','fields.g_1.sequence?-?group'?=?'a,b','fields.g_2.sequence?-?group'?=?'c,d');INSERT?INTO?t
VALUES?(1,?1,?1,?1,?1,?1,?1);--?g_2為空,c、d不應更新
INSERT?INTO?t
VALUES?(1,?2,?2,?2,?2,?2,?CAST(NULL?AS?INT));SELECT?*
FROM?t;
--?輸出1,?2,?2,?2,?1,?1,?1--?g_1較小,a、b不應更新
INSERT?INTO?t
VALUES?(1,?3,?3,?1,?3,?3,?3);SELECT?*
FROM?t;?--?輸出1,?2,?2,?2,?3,?3,?3
對于fields.<field-name>.sequence-group
,有效的可比較數據類型包括:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP和TIMESTAMP_LTZ。
你還可以在一個序列組中配置多個排序字段,如fields.<field-name1>,<field-name2>.sequence-group
,多個字段將按順序進行比較。
看示例:
CREATE?TABLE?SG
(k???INT,a???INT,b???INT,g_1?INT,c???INT,d???INT,g_2?INT,g_3?INT,PRIMARY?KEY?(k)?NOT?ENFORCED
)?WITH?('merge-engine'?=?'partial-update','fields.g_1.sequence-group'?=?'a,b','fields.g_2,g_3.sequence-group'?=?'c,d');INSERT?INTO?SG
VALUES?(1,?1,?1,?1,?1,?1,?1,?1);--?g_2、g_3不應更新
INSERT?INTO?SG
VALUES?(1,?2,?2,?2,?2,?2,?1,?CAST(NULL?AS?INT));SELECT?*
FROM?SG;
--?輸出1,?2,?2,?2,?1,?1,?1,?1--?g_1不應更新
INSERT?INTO?SG
VALUES?(1,?3,?3,?1,?3,?3,?3,?1);SELECT?*
FROM?SG;
--?輸出1,?2,?2,?2,?3,?3,?3,?1
部分更新的聚合(Aggregation For Partial Update)
你可以為輸入字段指定聚合函數,支持聚合中的所有函數。
看示例:
CREATE?TABLE?t
(k?INT,a?INT,b?INT,c?INT,d?INT,PRIMARY?KEY?(k)?NOT?ENFORCED
)?WITH?('merge-engine'?=?'partial-update','fields.a.sequence-group'?=?'b','fields.b.aggregate-function'?=?'first_value','fields.c.sequence-group'?=?'d','fields.d.aggregate-function'?='sum');
INSERT?INTO?t
VALUES?(1,?1,?1,?CAST(NULL?AS?INT),?CAST(NULL?AS?INT));
INSERT?INTO?t
VALUES?(1,?CAST(NULL?AS?INT),?CAST(NULL?AS?INT),?1,?1);
INSERT?INTO?t
VALUES?(1,?2,?2,?CAST(NULL?AS?INT),?CAST(NULL?AS?INT));
INSERT?INTO?t
VALUES?(1,?CAST(NULL?AS?INT),?CAST(NULL?AS?INT),?2,?2);SELECT?*
FROM?t;?--?輸出1,?2,?1,?2,?3
你還可以為多個排序字段內的序列組配置聚合函數。
看示例:
CREATE?TABLE?AGG
(k???INT,a???INT,b???INT,g_1?INT,c???VARCHAR,g_2?INT,g_3?INT,PRIMARY?KEY?(k)?NOT?ENFORCED
)?WITH?('merge-engine'?=?'partial-update','fields.a.aggregate-function'?='sum','fields.g_1,g_3.sequence-group'?=?'a','fields.g_2.sequence-group'?=?'c');
--?a在序列組g_1、g_3中,使用sum聚合
--?b不在序列組中
--?c在序列組g_2中,無聚合INSERT?INTO?AGG
VALUES?(1,?1,?1,?1,?'1',?1,?1);--?g_2不應更新
INSERT?INTO?AGG
VALUES?(1,?2,?2,?2,?'2',?CAST(NULL?AS?INT),?2);SELECT?*
FROM?AGG;
--?輸出1,?3,?2,?2,?"1",?1,?2--?g_1、g_3不應更新
INSERT?INTO?AGG
VALUES?(1,?3,?3,?2,?'3',?3,?1);SELECT?*
FROM?AGG;
--?輸出1,?6,?3,?2,?"3",?3,?2
你可以使用fields.default-aggregate-function
為所有輸入字段指定默認聚合函數,看示例:
CREATE?TABLE?t
(k?INT,a?INT,b?INT,c?INT,d?INT,PRIMARY?KEY?(k)?NOT?ENFORCED
)?WITH?('merge-engine'?=?'partial-update','fields.a.sequence-group'?=?'b','fields.c.sequence-group'?=?'d','fields.default-aggregate-function'?=?'last_non_null_value','fields.d.aggregate-function'?='sum');INSERT?INTO?t
VALUES?(1,?1,?1,?CAST(NULL?AS?INT),?CAST(NULL?AS?INT));
INSERT?INTO?t
VALUES?(1,?CAST(NULL?AS?INT),?CAST(NULL?AS?INT),?1,?1);
INSERT?INTO?t
VALUES?(1,?2,?2,?CAST(NULL?AS?INT),?CAST(NULL?AS?INT));
INSERT?INTO?t
VALUES?(1,?CAST(NULL?AS?INT),?CAST(NULL?AS?INT),?2,?2);SELECT?*
FROM?t;?--?輸出1,?2,?2,?2,?3
拓展:
部分更新合并引擎(Partial Update Merge Engine):在數據庫操作中,部分更新合并引擎允許用戶對記錄的特定列進行逐步更新,而不是一次性更新整個記錄。這種機制在處理需要多次累積更新的場景時非常有用,比如在一個用戶信息表中,用戶可能分多次更新自己的不同信息字段,使用部分更新合并引擎可以確保每次更新都能正確累積,同時避免覆蓋空值。
序列組(Sequence Group):這是為解決多流更新場景下部分更新表的無序問題而引入的機制。通過為不同的字段組定義序列組,每個序列組內的數據可以按照特定規則進行更新,保證更新的準確性和順序性。例如在一個包含多個屬性的設備狀態表中,不同的屬性可能來自不同的數據流,通過序列組可以確保每個數據流對應的屬性更新按照正確順序進行,避免數據混亂。
聚合函數(Aggregation Function):在部分更新的場景下,聚合函數用于對具有相同主鍵的不同記錄中的字段值進行聚合操作。如示例中展示的
sum
(求和)、first_value
(取第一個值)等函數,通過這些函數可以根據業務需求對數據進行合并處理。在銷售數據統計中,可以使用sum
函數對具有相同產品ID的不同銷售記錄的銷售額進行求和,從而得到該產品的總銷售額。默認聚合函數(Default Aggregation Function):通過
fields.default-aggregate-function
設置的默認聚合函數,為所有未單獨指定聚合函數的輸入字段提供統一的聚合規則。這在批量處理具有相似聚合需求的字段時非常方便,減少了配置的復雜性。例如在一個包含多個數值字段的統計表格中,如果大部分字段都需要使用last_non_null_value
函數進行聚合,設置默認聚合函數可以避免為每個字段單獨配置。
聚合(Aggregation)
注意:在Flink SQL表配置中,始終將 table.exec.sink.upsert-materialize
設置為 NONE
。
有時用戶只關心聚合結果。聚合合并引擎會根據聚合函數,對相同主鍵下的每個值字段逐個與最新數據進行聚合。
每個非主鍵字段都可以指定一個聚合函數,通過 fields.<field-name>.aggregate-function
表屬性指定,否則將默認使用 last_non_null_value
聚合。例如,考慮以下表定義:
Flink:
CREATE?TABLE?my_table?(product_id?BIGINT,price?DOUBLE,sales?BIGINT,PRIMARY?KEY?(product_id)?NOT?ENFORCED
)?WITH?('merge-engine'?=?'aggregation','fields.price.aggregate-function'?='max','fields.sales.aggregate-function'?='sum'
);
price 字段將通過 max 函數進行聚合,sales 字段將通過 sum 函數進行聚合。給定兩條輸入記錄 <1, 23.0, 15> 和 <1, 30.2, 20>,最終結果將是 <1, 30.2, 35>。
聚合函數(Aggregation Functions)
目前支持的聚合函數和數據類型如下:
sum
sum 函數對多行的值進行求和。它支持 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT 和 DOUBLE 數據類型。
product
product 函數可以計算多行的乘積值。它支持 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT 和 DOUBLE 數據類型。
count
在需要統計滿足特定條件的行數的場景中,可以使用 SUM 函數來實現。通過將條件表示為布爾值(TRUE 或 FALSE)并將其轉換為數值,就可以有效地統計行數。在這種方法中,TRUE 轉換為 1,FALSE 轉換為 0。
例如,如果有一個 orders 表,想要統計滿足特定條件的行數,可以使用以下查詢:
SELECT?SUM(CASE?WHEN?condition?THEN?1?ELSE?0?END)?AS?count
FROM?orders;
max
max 函數識別并保留最大值。它支持 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ 數據類型。
min
min 函數識別并保留最小值。它支持 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ 數據類型。
last_value
last_value 函數用最近導入的值替換先前的值。它支持所有數據類型。
last_non_null_value
last_non_null_value 函數用最新的非空值替換先前的值。它支持所有數據類型。
listagg
listagg 函數將多個字符串值連接成一個字符串。它支持 STRING 數據類型。每個非主鍵字段都可以指定一個列表聚合分隔符,通過 fields.<field-name>.list-agg-delimiter
表屬性指定,否則將默認使用 “,”。
bool_and
bool_and 函數評估布爾集合中的所有值是否都為 true。它支持 BOOLEAN 數據類型。
bool_or
bool_or 函數檢查布爾集合中是否至少有一個值為 true。它支持 BOOLEAN 數據類型。
first_value
first_value 函數從數據集中檢索第一個空值。它支持所有數據類型。
first_non_null_value
first_non_null_value 函數選擇數據集中的第一個非空值。它支持所有數據類型。
rbm32
rbm32 函數將多個序列化的 32 位 RoaringBitmap 聚合為一個 RoaringBitmap。它支持 VARBINARY 數據類型。
rbm64
rbm64 函數將多個序列化的 64 位 Roaring64Bitmap 聚合為一個 Roaring64Bitmap。它支持 VARBINARY 數據類型。
nested_update
nested_update 函數將多行收集到一個數組中(即所謂的 “嵌套表”)。它支持 ARRAY 數據類型。
使用 fields.<field-name>.nested-key = pk0,pk1,...
來指定嵌套表的主鍵。如果沒有指定鍵,行將追加到數組中。
collect
collect 函數將元素收集到一個數組中。可以設置 fields.<field-name>.distinct = true
來對元素進行去重。它僅支持 ARRAY 類型。
merge_map
merge_map 函數合并輸入的映射。它僅支持 MAP 類型。
基數草圖類型(Types of cardinality sketches)
Paimon 使用 Apache DataSketches 隨機流算法庫來實現草圖模塊。DataSketches 庫包含各種類型的草圖,每種草圖都旨在解決不同類型的問題。Paimon 支持 HyperLogLog(HLL)和 Theta 基數草圖。
HyperLogLog
HyperLogLog(HLL)草圖聚合器是一種非常緊湊的草圖算法,用于近似基數統計(計算不同值的數量)。你還可以使用 HLL 聚合器來計算 HLL 草圖的并集。
Theta
Theta 草圖是一種用于帶有集合操作的近似基數統計的草圖算法。Theta 草圖允許你計算集合之間的重疊部分,以便你可以計算草圖對象之間的并集、交集或差集。
選擇草圖類型
HLL 和 Theta 草圖都支持近似基數統計;然而,HLL 草圖產生的結果更準確,并且消耗的存儲空間更少。Theta 草圖更靈活,但需要顯著更多的內存。
在為你的用例選擇近似算法時,請考慮以下幾點:
如果你的用例需要基數統計和合并草圖對象,請使用 HLL 草圖。
如果你需要評估并集、交集或差集操作,請使用 Theta 草圖。
你不能將 HLL 草圖與 Theta 草圖合并。
hll_sketch
hll_sketch 函數將多個序列化的 Sketch 對象聚合為一個 Sketch。它支持 VARBINARY 數據類型。
theta_sketch
theta_sketch 函數將多個序列化的 Sketch 對象聚合為一個 Sketch。它支持 VARBINARY 數據類型。
例如:
Flink:
--?源表
CREATE?TABLE?VISITS?(id?INT?PRIMARY?KEY?NOT?ENFORCED,user_id?STRING
);--?聚合表
CREATE?TABLE?UV_AGG?(id?INT?PRIMARY?KEY?NOT?ENFORCED,uv?VARBINARY
)?WITH?('merge-engine'?=?'aggregation','fields.f0.aggregate-function'?=?'theta_sketch'
);--?將以下類注冊為名為?"THETA_SKETCH"?的?Flink?函數,
--?該函數用于將輸入轉換為草圖字節數組:
--
--?public?static?class?ThetaSketchFunction?extends?ScalarFunction?{
--???public?byte[]?eval(String?user_id)?{
--?????UpdateSketch?updateSketch?=?UpdateSketch.builder().build();
--?????updateSketch.update(user_id);
--?????return?updateSketch.compact().toByteArray();
--???}
--?}
--
INSERT?INTO?UV_AGG?SELECT?id,?THETA_SKETCH(user_id)?FROM?VISITS;--?將以下類注冊為名為?"THETA_SKETCH_COUNT"?的?Flink?函數,
--?該函數用于從草圖字節數組中獲取基數:
--?
--?public?static?class?ThetaSketchCountFunction?extends?ScalarFunction?{?
--???public?Double?eval(byte[]?sketchBytes)?{
--?????if?(sketchBytes?==?null)?{
--???????return?0d;?
--?????}?
--?????return?Sketches.wrapCompactSketch(Memory.wrap(sketchBytes)).getEstimate();?
--???}?
--?}
--
--?然后我們可以根據聚合字段獲取用戶基數。
SELECT?id,?THETA_SKETCH_COUNT(UV)?as?uv?FROM?UV_AGG;
對于流查詢,聚合合并引擎必須與查找(lookup)或完全合并變更日志生成器一起使用。(也支持 ‘input’ 變更日志生成器,但僅返回輸入記錄。)
回撤(Retraction)
只有 sum、product、collect、merge_map、nested_update、last_value 和 last_non_null_value 支持回撤(UPDATE_BEFORE 和 DELETE),其他聚合函數不支持回撤。如果你允許某些函數忽略回撤消息,可以配置:'fields.${field_name}.ignore-retract' = 'true'
。
last_value 和 last_non_null_value 在接收到回撤消息時僅將字段設置為 null。
collect 和 merge_map 盡力嘗試處理回撤消息,但不能保證結果準確。在處理回撤消息時可能會出現以下行為:
如果記錄無序,可能無法處理回撤消息。例如,表使用 collect,上游分別發送 +I['A', 'B'] 和 -U['A']。如果表先接收到 -U['A'],則無法處理;然后接收到 +I['A', 'B'],合并結果將是 +I['A', 'B'] 而不是 +I['B']。
來自一個上游的回撤消息將回撤從多個上游合并的結果。例如,表使用 merge_map,一個上游發送 +I[1 -> A],另一個上游發送 +I[1 -> B],隨后發送 -D[1 -> B]。表將首先將兩個插入值合并為 +I[1 -> B],然后 -D[1 -> B] 將回撤整個結果,因此最終結果是一個空映射而不是 +I[1 -> A]。
拓展:
聚合合并引擎(Aggregation Merge Engine):在數據處理中,聚合合并引擎負責按照指定的聚合函數對具有相同主鍵的數據進行聚合操作,以生成最終的聚合結果。這在數據分析和統計場景中廣泛應用,例如在銷售數據分析中,通過聚合合并引擎可以按產品ID統計不同產品的總銷售額、最高價格等信息。
Flink SQL 配置:在使用Flink與Paimon結合進行數據處理時,正確配置Flink SQL的相關屬性(如
table.exec.sink.upsert-materialize
設置為NONE
)對于確保數據處理的準確性和一致性至關重要。這些配置會影響數據的寫入方式和聚合操作的執行邏輯。Apache DataSketches 庫:這是一個用于概率算法和數據草圖的開源庫,提供了高效的近似計算方法,如基數統計等。在Paimon中使用該庫實現的HLL和Theta草圖,能夠在處理大規模數據時,以較小的空間開銷和計算成本獲得近似的統計結果。例如,在統計網站的獨立訪客數量(基數統計)時,使用HLL草圖可以在不存儲所有訪客ID的情況下,快速得到較為準確的訪客數量估計。
回撤(Retraction):在流數據處理中,回撤表示對之前發送的消息進行撤銷或修正。例如,由于數據更新或刪除操作,需要撤回之前插入或更新的數據。不同的聚合函數對回撤的支持情況不同,了解這些差異對于正確處理流數據中的變更非常重要。在實時數據分析場景中,如果不妥善處理回撤消息,可能會導致數據的不一致或錯誤的統計結果。
首行(First Row)
通過指定'merge-engine' = 'first-row'
,用戶可以保留具有相同主鍵的首行記錄。它與去重合并引擎的不同之處在于,首行合并引擎只會生成僅插入的變更日志。
不能指定序列字段(sequence.field)。
不接受 DELETE 和 UPDATE_BEFORE 消息。你可以配置
ignore-delete
來忽略這兩種記錄。可見性保證:使用首行合并引擎的表,L0 級別的文件只有在合并后才會可見。所以默認情況下,合并是同步的,如果開啟異步,數據可能會有延遲。
這在流計算中替代日志去重方面有很大幫助。
拓展:
首行合并引擎(First Row Merge Engine):在數據處理場景中,該引擎為處理具有相同主鍵的數據提供了一種特定的合并策略,即保留首條記錄。這種策略在某些特定業務場景下很有用,比如在記錄事件的先后順序且只關心最早發生的事件記錄時,首行合并引擎能確保保留所需數據。與去重合并引擎相比,它生成的變更日志只有插入操作,簡化了數據變更的記錄方式。
序列字段(sequence.field):在數據處理過程中,序列字段常用于標識數據的順序或版本等信息。在首行合并引擎中不允許指定序列字段,這是因為該引擎的設計初衷就是簡單地保留首條記錄,序列字段可能會引入額外的復雜性,與引擎的簡潔設計原則不符。
可見性保證(Visibility Guarantee):在數據存儲和處理系統中,可見性保證定義了數據何時對查詢操作可見。對于使用首行合并引擎的表,L0 級文件在合并后才可見,這是為了確保數據的一致性和完整性。同步合并可以即時保證數據的可見性,但可能會影響寫入性能;而異步合并雖然能提高寫入性能,但會導致數據可見延遲,用戶需要根據實際業務需求權衡選擇。在實時性要求較高的業務場景中,可能更傾向于同步合并;而在對寫入性能要求較高、對數據可見延遲有一定容忍度的場景中,異步合并可能是更好的選擇。
變更日志生產者(Changelog Producer)
流寫入能夠持續為流讀取生成最新的變更。
在創建表時通過指定 changelog-producer
表屬性,用戶可以選擇從表文件中生成變更的模式。
changelog-producer
可能會顯著降低合并性能,除非必要,請勿啟用它。
無(None)
默認情況下,不會對表的寫入器應用額外的變更日志生成器。Paimon 源只能看到跨快照的合并變更,比如哪些鍵被移除以及某些鍵的新值是什么。
然而,這些合并變更無法形成完整的變更日志,因為我們無法直接從它們當中讀取鍵的舊值。合并變更要求消費者 “記住” 每個鍵的值,并且在看不到舊值的情況下重寫值。然而,一些消費者需要舊值來確保正確性或提高效率。
考慮一個消費者,它對某些分組鍵(可能與主鍵不相等)計算總和。如果消費者只看到新值 5,它無法確定應該將什么值加到求和結果中。例如,如果舊值是 4,它應該將 1 加到結果中。但如果舊值是 6,它反而應該從結果中減去 1。對于這類消費者,舊值很重要。
總而言之,無變更日志生成器最適合像數據庫系統這樣的消費者。Flink 也有一個內置的 “規范化(normalize)” 算子,它會在狀態中持久化每個鍵的值。可以很容易看出,這個算子成本很高,應該避免使用。(你可以通過 'scan.remove-normalize'
強制移除 “規范化” 算子。)
輸入(Input)
通過指定 'changelog-producer' = 'input'
,Paimon 寫入器依賴其輸入作為完整變更日志的來源。所有輸入記錄都將保存在單獨的變更日志文件中,并由 Paimon 源提供給消費者。
當 Paimon 寫入器的輸入是完整的變更日志時,可以使用輸入變更日志生成器,例如來自數據庫變更數據捕獲(CDC),或者由 Flink 有狀態計算生成的情況。
查找(Lookup)
如果你的輸入無法生成完整的變更日志,但你仍然希望擺脫成本高昂的規范化算子,你可以考慮使用 lookup
變更日志生成器。
通過指定 'changelog-producer' = 'lookup'
,Paimon 將在提交數據寫入之前通過 “查找” 生成變更日志。
Lookup 會在內存和本地磁盤上緩存數據,你可以使用以下選項來優化性能:
選項 | 默認值 | 類型 | 描述 |
---|---|---|---|
lookup.cache-file-retention | 1 小時 | 持續時間 | 查找緩存文件的保留時間。文件過期后,如果需要訪問,將從分布式文件系統(DFS)重新讀取以在本地磁盤上構建索引。 |
lookup.cache-max-disk-size | 無限制 | 內存大小 | 查找緩存的最大磁盤大小,你可以使用此選項限制本地磁盤的使用。 |
lookup.cache-max-memory-size | 256 mb | 內存大小 | 查找緩存的最大內存大小。 |
查找變更日志生成器支持 changelog-producer.row-deduplicate
以避免為相同記錄生成 -U
,+U
變更日志。
(注意:請增加 Flink 配置 'execution.checkpointing.max-concurrent-checkpoints'
,這對性能非常重要。)
完全合并(Full Compaction)
如果你認為 lookup
的資源消耗過大,可以考慮使用 full-compaction
變更日志生成器,它可以解耦數據寫入和變更日志生成,更適合高延遲的場景(例如,10 分鐘)。
通過指定 'changelog-producer' = 'full-compaction'
,Paimon 將比較完全合并之間的結果,并將差異作為變更日志生成。變更日志的延遲受完全合并頻率的影響。
通過指定 full-compaction.delta-commits
表屬性,在增量提交(檢查點)后將不斷觸發完全合并。默認設置為 1,所以每個檢查點都會進行一次完全合并并生成一個變更日志。
完全合并變更日志生成器可以為任何類型的源生成完整的變更日志。然而,它不像輸入變更日志生成器那樣高效,并且生成變更日志的延遲可能較高。
完全合并變更日志生成器支持 changelog-producer.row-deduplicate
以避免為相同記錄生成 -U
,+U
變更日志。
(注意:請增加 Flink 配置 'execution.checkpointing.max-concurrent-checkpoints'
,這對性能非常重要。)
拓展:
變更日志生成器(Changelog Producer):在數據處理流程中,變更日志生成器負責生成記錄數據變更的日志,這些日志對于流讀取獲取最新數據狀態至關重要。不同的生成模式(None、Input、Lookup、Full Compaction)適用于不同的業務場景和數據來源,開發者需要根據實際情況選擇,以平衡數據處理的效率、準確性和資源消耗。
規范化算子(Normalize Operator):在Flink中,規范化算子用于在沒有完整變更日志舊值的情況下,通過在狀態中持久化每個鍵的值來處理合并變更。然而,這種方式資源消耗較大,在使用
None
變更日志生成器模式時,為了避免其高成本,可以通過'scan.remove-normalize'
強制移除。這體現了在數據處理中需要根據具體需求權衡不同組件的使用。Lookup:
Lookup
變更日志生成器通過在內存和本地磁盤緩存數據,在寫入前通過查找生成完整變更日志。合理配置其相關緩存參數(如lookup.cache-file-retention
、lookup.cache-max-disk-size
、lookup.cache-max-memory-size
)對于優化性能很關鍵,同時注意配合調整Flink的'execution.checkpointing.max-concurrent-checkpoints'
配置。完全合并(Full Compaction):適用于對延遲有一定容忍度且希望解耦數據寫入和變更日志生成的場景。通過比較完全合并結果生成變更日志,雖然能為各種數據源生成完整日志,但效率相對較低且延遲較高。同樣,使用時需關注相關配置及對Flink配置的調整以提升性能。
序列與行類型(Sequence and Rowkind)
創建表時,你可以通過指定字段來指定'sequence.field'
以確定更新順序,或者指定'rowkind.field'
來確定記錄的變更日志類型。
序列字段(Sequence Field)
默認情況下,主鍵表根據輸入順序確定合并順序(最后輸入的記錄將最后合并)。然而,在分布式計算中,會出現一些導致數據無序的情況。此時,你可以使用一個時間字段作為序列字段,例如:
Flink:
CREATE?TABLE?my_table?(pk?BIGINT?PRIMARY?KEY?NOT?ENFORCED,v1?DOUBLE,v2?BIGINT,update_time?TIMESTAMP
)?WITH?('sequence.field'?=?'update_time'
);
序列字段值最大的記錄將最后合并,如果值相同,則使用輸入順序來確定哪條記錄最后合并。序列字段支持所有數據類型的字段。
你可以為序列字段定義多個字段,例如'update_time,flag'
,多個字段將按順序進行比較。
用戶定義的序列字段與諸如first_row
和first_value
等特性沖突,這可能會導致意外結果。
行類型字段(Row Kind Field)
默認情況下,主鍵表根據輸入行確定行類型。你也可以定義'rowkind.field'
,使用一個字段來提取行類型。
有效的行類型字符串應為'+I'
、'-U'
、'+U'
或'-D'
。
拓展:
序列字段(Sequence Field):在分布式數據處理場景中,由于網絡延遲、并行計算等因素,數據到達的順序可能與實際期望的處理順序不一致。通過指定序列字段,可以明確數據的合并順序,確保數據處理結果的準確性。例如在一個實時訂單處理系統中,訂單數據可能會因為網絡波動而無序到達,使用訂單的更新時間作為序列字段,就可以保證按照時間先后順序處理訂單數據,避免數據更新錯誤。
行類型字段(Row Kind Field):在數據變更日志中,不同的行類型(
+I
表示插入,-U
表示刪除更新前數據,+U
表示更新,'-D'
表示刪除)用于準確記錄數據的變化情況。通過定義行類型字段,可以從數據記錄中提取這些變更類型信息,方便下游數據處理邏輯根據不同的變更類型進行相應操作。例如在數據同步過程中,接收方可以根據行類型字段確定是進行插入、更新還是刪除操作,保證數據的一致性。特性沖突(Feature Conflict):用戶定義的序列字段與
first_row
、first_value
等特性沖突,這是因為first_row
、first_value
等特性已經有其固定的邏輯來確定數據的處理方式,而序列字段的引入改變了數據的默認處理順序,可能導致這些特性無法按預期工作。在實際應用中,開發者需要注意這些特性之間的相互影響,根據業務需求合理選擇和配置相關參數,以避免出現意外結果。
合并(Compaction)
當越來越多的記錄寫入日志結構合并樹(LSM tree)時,有序段(sorted runs)的數量會增加。由于查詢LSM樹需要合并所有的有序段,過多的有序段會導致查詢性能變差,甚至出現內存溢出。
為了限制有序段的數量,我們需要時不時地將幾個有序段合并成一個大的有序段。這個過程就稱為合并。
然而,合并是一個資源密集型的過程,會消耗一定的CPU時間和磁盤I/O,所以過于頻繁的合并反而可能導致寫入速度變慢。這是查詢性能和寫入性能之間的一種權衡。Paimon目前采用了類似于Rocksdb通用合并的策略。
合并解決的問題
減少L0層文件數量,避免查詢性能下降。
通過變更日志生成器(changelog-producer)生成變更日志。
為寫時合并(MOW)模式生成刪除向量。
快照過期、標簽過期、分區過期。
限制
同一分區的合并只能有一個作業進行,否則會導致沖突,一方會拋出異常失敗。
寫入性能幾乎總是會受到合并的影響,因此對其進行調優至關重要。
異步合并(Asynchronous Compaction)
合并本質上是異步的,但如果你希望它完全異步且不阻塞寫入,期望一種能獲得最大寫入吞吐量的模式,合并可以緩慢且不急于完成。你可以為你的表使用以下策略:
num-sorted-run.stop-trigger?=?2147483647
sort-spill-threshold?=?10
lookup-wait?=?false
這種配置會在寫入高峰期生成更多文件,并在寫入低谷期逐漸合并以達到最佳讀取性能。
專用合并作業(Dedicated compaction job)
一般來說,如果你期望多個作業寫入同一個表,就需要分離合并操作。你可以使用專用合并作業。
記錄級過期(Record-Level expire)
在合并過程中,你可以配置記錄級過期時間來使記錄過期,你應該配置:
'record-level.expire-time'
:記錄保留的時間。'record-level.time-field'
:記錄級過期的時間字段。'record-level.time-field-type'
:記錄級過期的時間字段類型,可以是seconds-int
(秒為單位的整數)或millis-long
(毫秒為單位的長整型)。
過期操作在合并時發生,并不能強有力地保證記錄及時過期。
完全合并(Full Compaction)
Paimon合并使用通用合并(Universal-Compaction)。默認情況下,當增量數據過多時,會自動執行完全合并。通常你不必為此擔心。
Paimon還提供了一些配置,允許定期執行完全合并:
‘compaction.optimization-interval’
:表示每隔多久執行一次優化型完全合并,此配置用于確保讀優化系統表的查詢及時性。‘full-compaction.delta-commits’
:增量提交(delta commits)后會不斷觸發完全合并。其缺點是只能同步執行合并,這會影響寫入效率。
合并選項(Compaction Options)
暫停寫入的有序段數量(Number of Sorted Runs to Pause Writing)
當有序段數量較少時,Paimon寫入器會在單獨的線程中異步執行合并,這樣記錄可以持續寫入表中。然而,為了避免有序段無限制增長,當有序段數量達到閾值時,寫入器會暫停寫入。以下表屬性決定這個閾值:
選項 | 是否必需 | 默認值 | 類型 | 描述 |
---|---|---|---|---|
num-sorted-run.stop-trigger | 否 | (none) | Integer | 觸發停止寫入的有序段數量,默認值是 |
num-sorted-run.stop-trigger
的值越大,寫入暫停的頻率就越低,從而提高寫入性能。但是,如果這個值太大,查詢表時將需要更多的內存和CPU時間。如果你擔心內存溢出(OOM)問題,請配置以下選項。其值取決于你的內存大小。
選項 | 是否必需 | 默認值 | 類型 | 描述 |
---|---|---|---|---|
sort-spill-threshold | 否 | (none) | Integer | 如果排序讀取器的最大數量超過此值,將嘗試進行溢出操作。這可以防止過多的讀取器消耗過多內存導致OOM。 |
觸發合并的有序段數量(Number of Sorted Runs to Trigger Compaction)
Paimon使用的LSM樹支持大量更新操作。LSM將文件組織成多個有序段。從LSM樹查詢記錄時,必須合并所有有序段才能生成所有記錄的完整視圖。
很容易看出,過多的有序段會導致查詢性能不佳。為了將有序段數量保持在合理范圍內,Paimon寫入器會自動執行合并。以下表屬性決定觸發合并的最小有序段數量:
選項 | 是否必需 | 默認值 | 類型 | 描述 |
---|---|---|---|---|
num-sorted-run.compaction-trigger | 否 | 5 | Integer | 觸發合并的有序段數量。包括L0層文件(一個文件一個有序段)和高層有序段(一層一個有序段)。 |
num-sorted-run.compaction-trigger
的值越大,合并頻率就越低,從而提高寫入性能。但是,如果這個值太大,查詢表時將需要更多的內存和CPU時間。這是寫入性能和查詢性能之間的一種權衡。
拓展:
日志結構合并樹(LSM Tree)與合并(Compaction):LSM樹是一種適合高寫入負載的存儲結構,通過將寫入操作先記錄在日志中,然后定期合并成更大的有序段來優化存儲。合并在這個過程中起著關鍵作用,它不僅能控制有序段數量以提升查詢性能,還負責處理數據過期、生成變更日志等重要任務。例如,在一個大規模的日志數據存儲系統中,隨著日志數據不斷寫入LSM樹,合并操作可以定期清理過期的日志記錄,同時合并小的有序段,使得查詢特定時間段的日志數據時能更高效地進行。
異步合并(Asynchronous Compaction)策略:采用異步合并策略可以在不阻塞寫入操作的前提下,逐漸優化數據存儲結構以提升讀取性能。通過調整
num-sorted-run.stop-trigger
、sort-spill-threshold
和lookup-wait
等參數,系統可以根據業務場景的讀寫特點進行靈活配置。例如,在一個實時數據寫入頻繁但對讀取實時性要求相對較低的物聯網數據采集系統中,適當增大num-sorted-run.stop-trigger
的值可以減少寫入暫停次數,提高寫入吞吐量,而在寫入低谷期再進行有序段的合并優化讀取性能。專用合并作業(Dedicated Compaction Job):當多個作業同時寫入同一個表時,使用專用合并作業可以避免合并操作之間的沖突,確保數據的一致性和系統的穩定性。這在多用戶或多模塊同時對同一數據集進行操作的復雜應用場景中尤為重要。比如在一個企業級的數據倉庫中,不同部門的數據分析任務可能同時向同一個表寫入數據,專用合并作業可以保證每個部門的寫入操作不受其他部門合并操作的干擾。
記錄級過期(Record-Level Expire):在合并過程中配置記錄級過期,使得系統可以根據業務需求自動清理過期數據,節省存儲空間。通過設置
'record-level.expire-time'
、'record-level.time-field'
和'record-level.time-field-type'
等參數,用戶可以靈活控制哪些記錄在什么時間過期。例如,在一個存儲用戶會話信息的系統中,可以設置會話記錄在一定時間(如24小時)后過期,以保護用戶隱私并減少無用數據的存儲。合并選項調優(Compaction Options Tuning):
num-sorted-run.stop-trigger
和num-sorted-run.compaction-trigger
等合并選項的調整涉及到寫入性能和查詢性能的平衡。在實際應用中,需要根據系統的硬件資源(如內存大小)和業務需求(如讀寫頻率、數據量等)來合理設置這些參數。例如,對于內存有限且寫入操作頻繁的系統,適當降低num-sorted-run.stop-trigger
的值可以避免因過多有序段導致的內存溢出問題,同時合理調整num-sorted-run.compaction-trigger
的值以確保查詢性能不會受到太大影響。