一 高時效通路
1.1 pathchdumper
實時數據拉取、實時數據處理、5分鐘微批dump來加速時效性,具體來說:
- 實時數據拉取(Fetcher):基于Databus Fetcher基建,直接對接F0層實時拉取最新數據,保證該節點常態秒級延遲;
- 實時數據處理(AdTable):面向Databus數據格式,有效實踐是類似于BS廣告庫服務,基于AdTable實時加載和用戶UDF能力保證數據處理秒級延遲;
- 5分鐘微批dump(Logger):新增Logger功能,通過記錄增量數據觸發的表結構變化,并經過數據格式轉換形成下游所需增量文件;
- 數據固化(Uploader):復用Databus Uploader基建,將5分鐘級Patch文件寫入AFS。
說人話:
-
定時執行heart_beat操作(可配置),對全局idea_map進行遍歷(跟隨ideatable更新),根據status對大寬表進行insert、remove
-
insert/remove的過程中執行logger,將大寬表數據存入buffer,buffer以idea_id為key,在dump固化到磁盤前,再次logger,新數據可覆蓋舊數據
-
loader讀增量文件至文件切換時dump buffer中數據,固化到磁盤
-
uploader上傳文件至afs
1.1.1 為何使用大寬表
-
一條完整的廣告增量是<winfo, idea>,而不是某個單一層級,這就決定了高時效通路中需要有可做層級join的能力。一條廣告是基于idea層級從同unit下優選出一條winfo
-
ann建庫時最終用戶(數據使用方)拿到的是一條完整Join好的有效數據。這意味著,下游拿到的一條Patch數據Record中,包含了有效的Idea層級、Unit層級、Plan層級數據
1.1.2 Heartbeat如何工作
custom_heart_beat的具體邏輯:
定期執行heart_beat操作(可配置),對全局idea_map進行遍歷,根據其status對大寬表進行增刪改
主函數 custom_heart_beat(FeedBsTables* p_handle)
遍歷全局創意map, for (auto& [idea_id, status] : context._global_idea_map)
然后判斷idea對應的狀態標記
1. 狀態為需要移除
- 從寬表中刪除該廣告
- 從全局的創意映射表中移除該創意
2、狀態為其他
- 插入、更新、無變化,調用completeness_check方法進行檢查, completeness_check(p_handle, status.first, idea_id);
其中,completeness_check函數
-
get_common_info讀取廣告基礎信息(user_id、plan_id、unit_id、user_main_version、plan_main_version、unit_main_version、winfo_main_version)
-
校驗USER、PLAN、UNIT、WINFO 四個層級的version,通過check_version函數進行主輔表verison狀態判斷:
-
輔表version不存在(輔表已下發刪除增量),需要刪除業務寬表VERSION_UNEQUAL_REMOVE
-
主輔表version相同,表示字段沒有變動VERSION_EQUAL
-
主輔表version不相等, 需要更新業務寬表VERSION_UNEQUAL_UPDATE
-
-
判斷主輔表狀態
-
如果結果是VERSION_UNEQUAL_REMOVE
-
表示輔表join失敗不做操作
-
return 0;
-
-
如果結果是VERSION_UNEQUAL_UPDATE
-
如果在寬表中的一定不是新廣告, 僅更新version
-
1. winfo優選choose_best_winfo_id
-
優選條件 target_type == 32 && intent_type == 16 && intent_name_id == 999999, 表示智能定向廣告
-
不符合則隨機抽一條
-
-
2. 業務寬表字段填充fill_wide_idea_tuple
-
3. 多樣性控制winfo_customer_control
-
get_freq, 使用map實現頻率計數,并設定key的過期時間
-
-
4. 新廣告插入到業務寬表中 p_handle->wide_idea_table()->insert(tuple);
-
5. version更新update_version
-
-
dump服務中,dump的框架需要能夠實現周期性 dump base,patch數據的能力,以及根據業務需求可以根據定期(按照處理條數或時間間隔)觸發提條heartbeat消息的能力,這樣業務可以根據 heartbeat 消息來實現一些特定的業務邏輯。
那么 trigger 應該由誰來觸發呢?
-
對于業務需求的 heartbeat 而言,則需要單獨增量一個條數和時間的計數器,以便在到達用戶配置的條件時,生成一條heartbeat數據,該邏輯在 loader 中實現也比較合理。
綜上,需要在 loader 中增加 trigger 的能力,來 cover 上述場景。
trigger 只需要在加載增量時進行觸發即可(應該會有返回值,告訴上游,什么時候可以開始啟動服務了)
1.1.3 Logger是如何實現的
對于patch 數據,我們是需要記錄下對 table 的修改,相當于記錄一個 log。 log則是以5分鐘的粒度(和增量文件的粒度對接)進行組織。
增量數據到來后,會對兩類Table 進行修改: MemDataTable 和 IndexTable,因此我們只需要對這兩類 Table 的 modify接口進行封裝,在數據更新時,將數據同時也更新到patch logger 中,就可以實現對table的修改記錄。
-
MemDataTable的 modify 接口包括:
-
insert(tuple)
-
remove(tuple)
-
-
IndexTable 的 modify接口包括:
-
insert(key, tuple)
-
remove(key)
-
為了保證數據記錄Patch Logger的性能,需要在內部為每個需要 dump 的table 配置一個buffer(buffer 大小可配,不同 table可以配不同的 buffer size)。當 buffer滿時,可以自動 spill 到本地磁盤。在 trigger patch dump 時,會將 buffer全部 flush 到本地磁盤。因為 Loader 在處理增量時是單線程的,所以當 Loader 觸發 dump 時,上一條數據已經被處理完,不會有數據丟失的問題。
Buffer 用HashMap來實現,這樣,對于同一個 key 對應的數據,我們可以只保留最新的一個修改,以實現一定程度的 compaction。注意:這里如果一次 patch數據量過多,可能生成多輪 buffer,那么最終的dump 文件中還是可能造成數據的重復的。
1.1.4 patchdump何時工作
5分鐘增量文件讀完后開始dump
-
我們希望 patch的粒度可以和databus 的增量文件對齊,那么loader 來觸發 adtable 的 dump 是一個很直接的想法。當loader可以識別到增量文件的切換,當文件發生切換時,向 PatchLog 發送一次 dump 請求,PatchLog將本次Patch的數據 flush到本地目錄。
-
對于base dump,為了可以和 patch 的時間對齊,同樣也使用增量文件的切換來觸發 base 數據的 dump,即,當前一個5分鐘的增量文件處理完成后,此時滿足需要 dump 的周期,則觸發一次table表的dump到本地目錄。
-
1.1.3 不同表的版本同步問題
CDC(Change Data Capture)數據同步:
-
https://en.wikipedia.org/wiki/Change_data_capture
-
Change Data Capture (CDC): The Complete Introduction | Confluent
基準:ETL
獨立業務寬表的方案,將主表和基于主表的業務寬表獨立。
[流程圖]
如圖所示,在這種方案中,我們一共有三種表:
-
主表:IdeaTable。該表包含所有主表的業務字段,此外還需要存儲相關Right Join輔表的數據版本version。
-
輔表:Right Join的業務表,如UnitTable,PlanTable。該表包含輔表自身的業務字段,此外還包含沒調記錄的數據version。
-
業務大寬表:業務大寬表為最終的業務方也需要的物理實體寬表,該表對接Logger,用于產出最終分片,其上掛載了業務方所需要的所有主表和輔表Field。