01?背景
推薦系統的模型,通過學習用戶歷史行為來達到個性化精準推薦的目的,因此模型訓練依賴的樣本數據,需要包括用戶特征、服務端推薦的視頻特征,以及用戶在推薦視頻上是否有一系列的消費行為。
推薦模型數據流,即為推薦模型提供帶特征和優化目標的訓練樣本,包括兩個模塊,一是Label Join模塊,負責用戶行為的采集。二是feature extract模塊,從原始日志中抽取特征,并基于用戶行為計算模型優化的目標label。
在B站早期的推薦模型數據流架構中,如下圖所示,采樣兩階段特征補齊設計。Label Join模塊除了完成用戶行為的采集,還需要查詢實時特征,補齊訓練樣本依賴的部分原始特征數據,一般是秒級更新的實時特征,存儲在Redis中。而Feature Extract模塊在計算樣本之前,補齊另外一部分原始特征數據,一般是批量更新的特征數據,存儲在KFC中(B站自研的KV系統)
02?問題分析
上述的推薦模型數據流架構,存在“不一致”問題,包括“數據不一致”和“計算不一致”
2.1 數據不一致
在這種自反饋系統中,推理輸入的特征版本和訓練輸入的特征版本,如果有差異的話,會影響模型的準確性。數據不一致有3個原因:
-
訪問時間差異。推理服務和Label Join/Feature Extract模塊訪問同一個特征的時間是不一樣的。特別是秒級更新的實時特征,這種數據差異會被放的更大
-
新稿件問題。在線推理服務有稿件服務可獲取新稿件的特征。而離線沒有這個服務,所以離線缺少新稿件的特征,導致新稿件的推薦不準確
-
特征穿越問題。LabelJoin有N分鐘的固定時間窗,所以Feature Extract在查詢批量特征的時候,可能批量特征版本已經更新,查到的是最新版本特征,從而導致特征穿越。一般需要有經驗的算法工程師,在離線批量特征更新上加上時間延遲,規避穿越問題。但這種規避可能會進一步加劇數據不一致
2.2 計算不一致
這里“計算”指的是從原始數據,生成特征的過程。特征可以用于在線推理和離線訓練。特征計算有3個地方:
-
在線推理,一個c++的服務。特征計算使用c++實現
-
Online Feature Extract,實時的樣本計算,基于FLINK,特征計算使用java實現
-
Offline Feature Extract,離線樣本回溯,基于Spark,特征計算使用python實現
此外這3個地方的數據格式也是不一樣的,不是簡單的語言之間的轉換。需要用戶開發3種特征計算邏輯,并且要很小心的對齊
03.? 一致性架構
為了解決上述的“數據不一致”和“計算不一致”,我們將B站推薦模型數據流升級成一致性架構:
-
數據一致性:將在線推理的原始特征現場snapshot,dump到近線。近線基于snapshot做Label Join和Feature Extract。因為在離線用的同一份數據,可保證數據完全一致
-
計算一致性:特征計算邏輯算子化,基于tenforflow實現一個c++ lib。推理服務直接調用特征抽取lib,離線Feature Extract通過java JNI調用c++ lib。特征計算都基于同一個c++ lib,用戶只需要定義一次,可保證計算完全一致
3.1 整體架構
-推薦服務,將推理的原始特征現場,打包成snapshot,通過數據集成工具,從在線服務同步到近線Kafka
-在近線,基于flink latency join功能,實現Label Join,為每條請求join用戶在稿件上的各種行為label,e.g. 點擊、播放、后驗等
-Label Join任務輸出Shitu,同時寫到kafka和hive
-在近線,基于flink實現實時樣本計算,消費kafka Shitu,產出訓練樣本,寫到kafka,再接入實時訓練。產出模型用于線上推理
-在離線,基于flink/spark實現的批量樣本計算,從hive表load Shitu,產出樣本寫到hive。訓練任務依賴hive表作批量的模型訓練
3.2 Label Join
Label Join,使用flink letency join(延時拼接)功能,完成每個請求稿件上的用戶行為采集。用戶行為作為訓練樣本的label,即模型的優化目標。
flink letency join基于時間驅動,以一個固定時間窗做數據下發。e.g.固定時間窗是N分鐘,即每條請求數據到flink任務后,等待N分鐘后輸出數據
在最新的一致性數據流上,我們在Label Join上增加了事件驅動,增加數據下發的時效性,將數據流的時效性提升60%
-
定義下發事件:原則上按用戶不會再看到視頻作下發規則。對用戶連續請求,按時間排序。如果最新的請求是清屏請求,那這次請求前的所有請求都可以下發。如果是普通請求,那這次請求往前第5個請求可以下發
-
采用事件驅動+時間驅動的數據下發方式,優先事件驅動下發,沒有被事件觸發的請求,走時間驅動,超時下發
3.3 樣本計算
樣本計算是基于Label Join產出的Shitu數據,計算訓練樣本,分兩種模式:
-online extract:實時的樣本計算,flink streaming計算引擎,讀kafka寫kafka
-offline extract:離線的批量樣本計算,支持flink batch/spark batch兩種計算引擎。offline extract支持兩種樣本計算模式:1) 無新增特征的樣本計算,直接讀Shitu hive table產也訓練樣本。2) 有新增特征的樣本計算,用戶挖掘的新特征,不在Shitu里。訓練樣本依賴Shitu和新挖掘特征
目前一致性的樣本計算框架支持兩種模型:
(1)直接計算:一般用于精排模型。整個樣本計算過程抽象成幾個算子:
-selector:數據篩選。過濾請求或者稿件
-calculate label : 通過用戶行為label,計算每個視頻的train label
-刷內item采樣:在一刷請求內,對稿件進行采樣,e.g. 按正負例
-pyfe:調用fealib,生成模型特征
每個算子,都可以支持算法同學根據業務需求自定義
(2)有外部采樣的樣本計算:一般在召回模型上使用
-calculate label : 通過用戶行為label,計算每個視頻的train label
-外接一個采樣稿件候選池,根據稿件的train label,進行采樣。采樣邏輯按算法需求可定制
-從KFC查詢采樣稿件的特征,并組裝一條完整的snapshot
-pyfe:調用fealib,生成模型特征
3.4 BackFill
BackFill特征回填,指的是算法同學調研新特征在模型上的收益,流程如下:
-
對于NoDelta模式,直接讀Base Shitu,生成全量的訓練樣本
-
對HasDelta模式,用戶挖掘一批新增的特征(delta snapshot)
-
基線Shitu join delta snapshot,生成一份新Shitu
-
基于新Shitu,作全量的特征計算,生成全量訓練樣本
-
模型訓練樣本并評估auc,效果不符合預期重新設計數據和特征
同時我們提供了一套python sdk,支持用戶在鏡像或者jupyter上自己訂制特征回填特征的邏輯和流程
3.5 基于protobuf wireformat的partial decode優化
對于在線推理現場snapshot,采用了protobuf組織數據,包含了模型特征需要所有原始數據,單條數據超過250KB,有上千個字段。在樣本計算階段,對snapshot有兩個處理邏輯:
調用protobuf ParseFrom接口,將snapshot bytes 反序列化成Message,平均耗時7~8ms
將snapshot所有稿件類的特征做裁剪:一刷請求n個稿件,其中m個稿件參與訓練,平均耗時5~6ms
通過性能分析,樣本計算中有50%的時間消耗在上述snapshot protobuf解析和處理上。但實際樣本計算相關邏輯上,并不需要所有snapshot字段,所以我們使用protobuf wireformat,對snapshot做partial decode,只解析需要的field。最終將snapshot處理的性能從14ms優化到1.5ms,樣本計算的cpu資源降低了30%+
04 未來工作
4.1 基于Iceberg批流一體的訓練樣本計算框架
如3.1章節的數據流架構中,通過FLINK實時計算產出的訓練樣本,會同時寫到Kafka和Hive表,分別用于實時訓練和批量訓練。同時離線回溯也可以產出訓練樣本寫到Hive表。這種架構存在兩個問題:
(1) 需要額外的FLINK資源,把Kafka中的樣本備份到Hive表中,即一個實驗樣本流,需要搭建兩個FLINK任務
(2) 實時樣本和離線樣本,輸入輸出的介質不同,框架層面需要適配。下游訓練模塊也需要適配不同的樣本源,無法做到批流一體
未來我們計劃引入iceberg實現樣本計算框架的批流一體,解決上述問題。?Apache Iceberg? 是一種用于大型分析表的高性能格式,旨在解決數據存儲和計算引擎之間的適配問題,其核心特性之一是支持同時處理流數據和批數據,提供統一的讀寫接口
框架如下圖所示:
-
Label Join產出數據,實時寫到iceberg Shitu表
-
樣本計算框架,從iceberg Shitu讀數據,可以實時計算或批量計算,產出數據寫到iceberg樣本表
-
訓練框架讀iceberg樣本表,可online training或者batch training
4.2 基于Iceberg MOR的增量特征回填優化
如3.4節的BackFill功能,將全量Shitu和Delta Snapshot拼接之后,再進行樣本計算。這個邏輯存在2個問題,一是Shitu數據量比較多,拼接效率低。二是每次都需要全量計算所有特征,性能開銷大。當然可以做增量特征計算,在和基線樣本拼接。但樣本數據量比較大,Hive表拼接性能較差,在某些情況下,可能比全量計算特征慢。
為此我們計劃在4.1工作基礎上,利用iceberg的MOR技術,優化BackFill的性能:
-
維護一份基線樣本的iceberg表
-
在基線樣本iceberg表新建一個branch,增加新特征列
-
基于Shitu和delta snapshot,只做增量特征計算,并將增量特征寫到新特征列。這一步只計算增量特征,不需要join,可極大提升性能
-
訓練模塊讀樣本表,利用Iceberg MOR的能力,讀基線特征+增量特征,再merge成完整的特征列表,完成訓練
-End-
作者丨lixiaowei、正鼎