引言:流批一體,理想與現實的鴻溝
在數據驅動的今天,“實時”二字仿佛擁有魔力,驅使著無數企業投身于流批一體架構的建設浪潮中。我們渴望實時洞察業務變化,實時響應用戶需求。以 Apache Flink 為代表的流處理引擎,以其強大的功能和極低的延遲,為我們描繪了一幅美好的實時數據藍圖。
然而,理想通往現實的道路往往布滿荊棘。對于許多企業,尤其是IT能力和研發資源并非頂尖的公司而言,構建和維護一套基于 Flink 的流批一體平臺,往往意味著一場“甜蜜的煩惱”:我們得到了實時性,卻也背上了高昂的復雜度和成本。
有沒有一種更簡潔、更優雅的方式來實現流批一體?答案是肯定的。隨著數據庫技術的“文藝復興”,Cloudberry 數據庫中實現的增量物化視圖(Incremental Materialized View, IVM)為代表的“庫內流處理”技術,正成為一把剃除繁雜、直達問題核心的“奧卡姆剃刀”。本文將深入探討這一技術,以及它為何可能成為更多企業流批一體實踐的主流選擇。
傳統流批一體的“重”:Flink 的強大與負擔
在我們探討新范式之前,必須正視現有主流方案的挑戰。以 Flink 為核心的流批一體架構通常遵循下圖中的模式,本次我們主要探討的是有業務狀態變更的場景,這種場景是需要提供源端數據庫的事務保證的,必須提供“單一事實來源”;而事件類的場景,如日志、行為數據、IOT數據則可以直接由應用將消息數據推送給Kafka,這種場景并非數據庫的主戰場,故不在本次討論范圍內。
[圖片]
這個架構功能強大,但其“重量”也體現在多個方面:
- 架構的“縫合感”與高昂運維:整個數據鏈路需要“縫合”多個獨立的分布式系統:應用、MySQL、CDC工具、Kafka、Flink,以及最終的數據湖/數倉。每一個組件都需要專業的知識進行部署、監控和維護,任何一個環節的故障都可能導致整個鏈路的中斷。
- 開發的“雙重負擔”:在經典的 Lambda 架構中,為了保證結果的最終一致性,團隊往往需要維護兩套異構的代碼:一套 Flink 的流處理邏輯,和一套 Spark/Hive 的批處理邏輯。相同的業務口徑,雙份的開發和測試工作,這不僅成本高昂,也極易導致邏輯不一致。
- 技術的“陡峭曲線”:精通 Flink 絕非易事。其背后的狀態管理、時間語義(事件時間/處理時間)、水印(Watermark)、窗口機制以及性能調優,都需要一個高度專業化的團隊來駕馭,這對很多企業來說是一種奢侈。
化繁為簡:增量物化視圖如何重塑流批一體?
面對傳統方案的復雜性,Cloudberry 等現代數據平臺提出了一個新的思路:
為什么不讓最擅長管理數據的數據庫,自己來處理流式計算呢? 這就是“庫內流批一體”的核心思想,其實現如下圖所示。
[圖片]
增量物化視圖(IVM)是實現這一范式的核心武器。它本質上是一個“活”的、能自動更新的查詢結果緩存。
- “批”處理:當你首次執行 CREATE INCREMENTAL MATERIALIZED VIEW 時,Cloudberry 數據庫會對所有存量歷史數據進行一次全量計算,生成視圖的初始狀態。這,就是批處理。
- “流”處理:創建完成后,IVM 引擎開始工作。任何對源表(通常是實時數據流入的 Heap 表)的 INSERT, UPDATE, DELETE 操作,都會被 IVM 捕捉到。引擎只會計算這些“增量”數據對結果的影響,并以準實時的方式(延遲在亞秒到秒級)更新物化視圖。這,就是流式處理。
這一切帶來的改變是立竿見影: 原本復雜的數據流,需要定義Kafka的數據結構和難以復用的Flink的數據結構,以及各種復雜的Flink SQL 代碼(包括定義數據源、窗口、聚合邏輯、維表關聯、結果表等)才能完成的任務,如:
//Kafka數據結構
{
“sales_id”: 8435,
“event_type”: “+I”,
“event_time”: “2025-06-27 07:53:21Z”,
“ticket_number”: 8619628,
“item_sk”: 6687,
“customer_sk”: 69684,
“store_sk”: 238,
“quantity”: 6,
“sales_price”: 179.85,
“ext_sales_price”: 1079.1,
“net_profit”: 672,
“event_source”: “CDC-TO-KAFKA-FIXED”
}
CDC同步給Kafka的數據結構必須由原本的SQL形態轉換成Json形態,但這又無法避免,因為Flink在處理流式數據之前需要這些數據是能持久化的,避免數據在傳輸中丟失,從而影響數據處理的正確性,并且也便于出現問題后的重新執行。
下面的代碼只是呈現Flink在做流式計算的示例,而在實際應用中CDC -> Kafka,和Kafka ->Flink的過程中還要做大量的代碼和配置。
//創建TPC-DS店鋪業績聚合結果輸出表(輸出到控制臺)
CREATE TABLE store_daily_performance (
window_start TIMESTAMP(3), – 窗口開始時間
window_end TIMESTAMP(3), – 窗口結束時間
s_store_sk INT, – TPC-DS店鋪代理鍵
s_store_name STRING, – TPC-DS店鋪名稱
s_state STRING, – TPC-DS州/省份
s_market_manager STRING, – TPC-DS市場經理
sale_date STRING, – 銷售日期
– TPC-DS核心業務指標
total_sales_amount DECIMAL(10,2), – 總銷售額
total_net_profit DECIMAL(10,2), – 總凈利潤
total_items_sold BIGINT, – 總商品數量
transaction_count BIGINT, – 交易筆數
avg_sales_price DECIMAL(7,2), – 平均銷售價格
– 統計時間
process_time TIMESTAMP_LTZ(3) – 處理時間
) WITH (
‘connector’=‘print’,
‘print-identifier’=‘TPCDS-STORE-PERFORMANCE’
);
//核心聚合查詢:實現類似增量聚合效果
INSERT INTO store_daily_performance
SELECT
– 時間窗口信息
window_start,
window_end,
– TPC-DS維度信息
s.ss_store_sk,
COALESCE(sd.s_store_name, CONCAT(‘Store #’, CAST(s.ss_store_sk AS STRING))) as s_store_name,
COALESCE(sd.s_state, ‘Unknown’) as s_state,
COALESCE(sd.s_market_manager, ‘Unknown Manager’) as s_market_manager,
DATE_FORMAT(window_start, ‘yyyy-MM-dd’) as sale_date,
– TPC-DS核心業務指標聚合
SUM(CASEWHEN s.event_type =‘+I’ THEN s.ss_ext_sales_price
WHEN s.event_type =‘-D’ THEN -s.ss_ext_sales_price
ELSE 0 END) as total_sales_amount,
SUM(CASEWHEN s.event_type =‘+I’ THEN s.ss_net_profit
WHEN s.event_type =‘-D’ THEN- s.ss_net_profit
ELSE 0 END) as total_net_profit,
SUM(CASEWHEN s.event_type =‘+I’ THEN s.ss_quantity
WHEN s.event_type =‘-D’ THEN -s.ss_quantity
ELSE 0 END) as total_items_sold,
COUNT(DISTINCT s.ss_ticket_number) as transaction_count,
AVG(s.ss_sales_price) as avg_sales_price,
– 處理時間戳
LOCALTIMESTAMP as process_time
FROMTABLE(
TUMBLE(TABLE sales_events_source, DESCRIPTOR(event_time), INTERVAL ‘1’MINUTE)
) s
LEFT JOIN store_dim sd ON s.ss_store_sk = sd.s_store_sk
WHERE s.event_type IN (’+I’, ‘-D’, ‘U’) – 處理插入、刪除、更新事件
GROUP BY
window_start,
window_end,
s.ss_store_sk,
sd.s_store_name,
sd.s_state,
sd.s_market_manager;
而如果使用Cloudberry IVM,可能只需要一句CREATE INCREMENTAL MATERIALIZED VIEW 即可。
CREATE INCREMENTAL MATERIALIZED VIEW tpcds.store_daily_performance_enriched_ivm
AS
SELECT
– 維度信息 (從維度表中關聯得到)
ss.ss_store_sk store,
s.s_store_name store_name,
s.s_state state,
s.s_market_manager manager,
d.d_date sold_date,
– 核心業務指標 (與之前相同)
SUM(ss.ss_net_paid_inc_tax) AS total_sales_amount,
SUM(ss.ss_net_profit) AS total_net_profit,
SUM(ss.ss_quantity) AS total_items_sold,
COUNT(ss.ss_ticket_number) AS transaction_count
FROM
– 核心事實表與維度表的 JOIN
tpcds.store_sales_heap ss
JOIN
tpcds.date_dim d ON ss.ss_sold_date_sk = d.d_date_sk
JOIN
tpcds.store s ON ss.ss_store_sk = s.s_store_sk
GROUP BY
– 所有非聚合的維度列都需要出現在 GROUP BY 中
ss.ss_store_sk,
s.s_store_name,
s.s_state,
s.s_market_manager,
d.d_date
DISTRIBUTED BY (ss_store_sk);
狀態管理、數據一致性、計算觸發等所有復雜工作,都由數據庫內核透明地完成了,自此告別了中間大量的數據流作業的調度,大幅減少了開發運維成本。
“黃金搭檔”:IVM 與動態表(Dynamic Table)的場景辨析
在 Cloudberry 的工具箱中,除了 IVM,還有另一個強大的武器——動態表。兩者雖都是物化視圖的變體,但應用場景截然不同,是一對完美的“黃金搭檔”。
[圖片]
何時選擇增量物化視圖 (Incremental Materialized View)?
選擇 IVM 的核心決策依據是:您對數據的“新鮮度”和“低延遲”有極致的要求。
場景1:實時監控與分析儀表盤 (Real-time Dashboards)
- 描述:想象一下“雙十一”作戰指揮室里的大屏,需要以秒級刷新展示全國各個區域的實時GMV、訂單量、支付成功率。
- 為何適合IVM: 每一個新的訂單(INSERT到store_sales表)都需要被立刻反映到大屏的聚合指標上。IVM 事件驅動的特性完美匹配這個需求,它可以緊隨源表事務,提供秒級的視圖更新,確保決策者看到的是最新的戰況。動態表5分鐘一次的刷新在這里會顯得“太慢了”。
場景2:在線分析與交易一體化 (HTAP / OLAP on OLTP) - 描述:在一個繁忙的交易系統中(例如我們的 MySQL + CDC 場景),業務方希望在不影響交易性能的前提下,對最新的業務數據進行復雜的分析查詢。
- 為何適合IVM: IVM 將昂貴的聚合和關聯計算與前端查詢進行解耦。它在后臺悄悄地、增量地處理著每一筆交易變更,將結果預先算好。分析師的查詢可以直接命中這個預計算好的 IVM,避免了直接用復雜的分析查詢去沖擊寶貴的在線交易數據庫。
場景3:需要物化復雜中間結果的ETL/數據處理鏈路 - 描述: 在一個數據處理流程中,需要將多張頻繁變更的表進行關聯,并將這個中間結果作為下游多個任務的輸入。
- 為何適合IVM: IVM 可以將這個復雜的中間結果物化下來,并保持準實時更新。下游的所有任務都可以直接從這個穩定、高效的 IVM 中讀取數據,而無需重復進行昂貴的關聯操作,極大地提升了整個數據處理鏈路的效率。
何時選擇動態表 (Dynamic Table)?
選擇動態表的核心決策依據是:業務可以容忍分鐘級或更長的數據延遲,且主要目標是加速復雜查詢或避免對源系統造成持續壓力。
場景1:加速數據湖查詢 (Lakehouse Acceleration) - 它的“主場” - 描述:這是動態表文檔中明確提出的核心場景。您的公司將海量的(TB/PB級)用戶行為日志以 Parquet 格式存儲在 S3 數據湖中。您在 CloudberryDB 中創建了一個指向這批數據的外部表。直接對這個外部表進行聚合查詢非常緩慢,因為每次都需要通過網絡從 S3 拉取大量數據。
- 為何適合DT: 您可以創建一個動態表,SCHEDULE ‘*/30 * * * *’(每30分鐘)對這個外部表進行一次聚合計算,并將結果物化到 Cloudberry 的本地存儲中。分析師們現在可以直接查詢這個本地的動態表,查詢速度將從幾十分鐘縮短到幾秒鐘,體驗與查詢內部表無異。
場景2:常規商業智能與報表 (Periodic BI & Reporting) - 描述:業務方需要一份“每日銷售總結報表”、“每周用戶活躍度報告”或“每月財務對賬報表”。
- 為何適合DT: 這些報表對數據的要求不是“實時”,而是“T+1”或“周/月度”的準確性。使用動態表,配置一個每天凌晨 SCHEDULE ‘0 1 * * *’ 運行的刷新任務,自動生成前一天的報表數據。這相當于一個內置的、無需維護的、輕量級 ETL 作業,非常高效且優雅。
場景3:保護高并發寫入的源系統 - 描述:我們之前討論過,IVM 會給源表的 INSERT/UPDATE 帶來額外的事務開銷。現在假設您的源表是一個寫入并發極高的日志表,任何一點寫入延遲的增加都是不可接受的。
- 為何適合DT: 動態表完美地解決了這個問題。它的刷新任務與源表的寫入事務是完全解耦的。您的日志表可以毫無壓力地進行高頻寫入。動態表只會在調度點(例如每5分鐘)對該表發起一次集中的讀取操作,將計算負載與寫入負載在時間上完全錯開。
結論:互補的“黃金搭檔”
通過以上分析,我們可以清晰地看到: - 增量物化視圖 (IVM) 和 動態表 (DT) 并非互相替代的競爭關系,而是一對功能互補的“黃金搭檔”。
- IVM 是您工具箱里的“手術刀”,用于對需要低延遲、高新鮮度的內部數據進行精準、實時的分析。
- 動態表 (DT) 則是您工具箱里的“搬運車”和“預制工廠”,用于將外部的、或計算昂貴的數據,以周期性的方式高效地“搬運”和“預制”到數據庫內部,供您隨時享用。
直面現實:Cloudberry 增量物化視圖的性能與當前限制
任何技術都不是銀彈。透明地看待其成本與限制,是做出正確架構選擇的前提。
性能開銷:IVM 的即時維護特性,會給源表的 INSERT/UPDATE/DELETE 操作帶來額外的開銷。我們的測試顯示,這種開銷與基表上建立的IVM數量基本成正比。對于寫性能極其敏感的場景,需要審慎評估或采用動態表等其他模式。
關鍵限制:當前版本的 Cloudberry 增量物化視圖還存在一些功能限制,例如: - 不支持 MIN、MAX 聚合函數。
- 不支持 CTE、窗口函數、LEFT/OUTER JOIN 等復雜查詢和連接。
- 不支持分區表。
我們期待并相信,在開源社區的共同努力下,這些限制將在未來的版本中得到逐步完善。
結語:擁抱簡單,回歸本質
對于全球頂尖的互聯網公司而言,用一個龐大的團隊去駕馭 Flink 這樣的“重器”,追求極致的性能和靈活性是值得的。但對于更廣泛的企業來說,其絕大多數的實時分析需求,并不需要如此復雜的“屠龍之技”。
Apache Cloudberry 數據庫提供的增量物化視圖,正是這樣一把返璞歸真的“奧卡姆剃刀”。它讓我們回歸數據處理的本質,用最簡潔、最通用的語言(SQL),在一個統一、自洽的系統內,解決了流批一體的核心難題——數據一致性、開發復雜性和高昂成本。這或許正是能讓實時數據能力在更多企業中真正普及和落地的、最務實的一條路徑。
Github Demo庫代碼(用于理解并比對IVM與Flink流式加工的區別):
https://github.com/darkcatc/Stream-Batch-IVM