1、數據同步
????????數據同步我們之前在數倉當中使用了多種工具,比如使用 Flume 將日志文件從服務器采集到 Kafka,再通過 Flume 將 Kafka 中的數據采集到 HDFS。使用 MaxWell 實時監聽 MySQL 的 binlog 日志,并將采集到的變更日志(json 格式)保存到 Kafka,同樣再由一個 Flume 同步到 HDFS。使用 DataX 每天 0 點將需要全量同步的數據全量采集到 HDFS。
????????數據同步主要的作用就是實現不同數據源的數據流轉,對于大數據系統來說,包含把數據從業務系統同步進入數據倉庫和把數據從數據倉庫當中同步進入數據服務和數據應用兩個方面。
1.1、三種同步方式
1.1.1、直連同步
????????直連同步是指通過定義好的規范接口 API 和基于動態鏈接庫的方式直接連接業務數據庫,比如 ODBC/JDBC 等規定了統一規范的標準接口,不同數據庫廠商基于這套接口實現了自己的驅動,支持完全相同的函數調用和 SQL 實現。
? ? ? ? 直連同步就是通過 JDBC/ODBC 連接數據庫來往數倉進行寫入,但是這種方式對數據庫系統的性能影響比較大,尤其是執行大批量的數據同步可能會嚴重拖垮業務系統的性能。如果業務系統采用主備策略,則可以從備庫抽取數據,避免對業務系統產生性能影響。但是終究這不是一種好辦法。
1.1.2、數據文件同步
? ? ? ? 通過約定好的文件編碼、大小、格式等,直接從源系統生成數據的文本文件(比如把數據庫的二進制文件轉為文本文件),由專門的文件服務器傳輸到目標系統,對于常見的關系型數據庫,這種方式比較簡單實用。
? ? ? ? 另外,對于互聯網日志數據,通常是以文本文件形式保存的,所以也適合這種方式。
1.1.3、數據庫日志解析同步
? ? ? ? 現在大多數主流的數據庫都已經實現了使用日志文件進行系統恢復,比如 MySQL 的 binlog,通過數據庫日志可以實現增量同步的需求,不僅延遲可以控制在毫秒級別,而且對數據庫性能影響也比較小,目前這種方式也是廣泛應用于從業務系統到數倉的增量同步應用中的。
? ? ? ? 通過數據庫日志解析同步的效率雖然高,但是依然存在一些問題:
- 數據延遲。當業務系統做批量補錄時可能會使數據更新量超出系統處理的峰值,導致數據延遲。
- 投入較大。需要在業務數據庫和數倉之間部署一個專門用來實時同步的系統(比如 MaxWell,Cannal,這倒是也不算太大問題)。
- 數據漂移和遺漏。數據漂移一般是對于增量表而言的。具體解決方案下面會專門介紹。
1.2、阿里數據倉庫的同步方式?
?????????關于阿里云數據倉庫的同步方式這里簡單介紹,對于批量數據同步,阿里云使用的就是人家自研的 DataX;而關于實時數據同步,我們之前使用的是 MaxWell,而阿里云使用的是自家的 TT(TimeTunnel),具有高性能、實時性、高可用、可擴展等特點,被阿里巴巴廣泛應用于日志收集、數據監控、廣告反饋、量子統計、數據庫同步等領域。TT 是一種基于生產者、消費者和 Topic 的消息中間件(基于 HBase),不管是日志服務器中的日志還是業務系統中的數據都可以通過 TT 來進行同步到 MaxCompute。
1.3、數據同步中的問題與決絕方案
這里主要介紹數據漂移
1.3.1、數據漂移問題
????????數據漂移一般是對增量表而言的,它指的是數據在同步到數倉(ODS 層)過程中,由于網絡延遲或者系統壓力的原因,導致上一個分區的數據進入了下一個分區(今天的數據到了明天)。
? ? ? ? 由于 ODS 層有著面向歷史的細節數據查詢需求,這就要求數據采集到 ODS 層后必須按照時間進行分區存儲(離線數倉基本都是按天進行分區)
說明:
????????盡管離線數倉一般是以天為單位來進行數據分析,但并不是說我們就等到每天 0 點才開始同步前一整天的數據。
????????事實上,數據同步策略分為全量/增量同步,對于訂單表這種本身就非常大,而且變化也特別大的表一般都是采用實時同步策略(增量)。阿里巴巴采用 TT(TimeTunnel)來實現對業務數據庫的實時數據同步(原理就是監聽 binlog),但是一般并不是一條數據同步一次,而是累積一定時間間隔進行同步(比如每 15 分鐘)
????這里使用訂單表來說明數據漂移是怎么發生的,對于我們的業務數據表,它并不會像我們在數倉建表那樣為每個業務過程建立一張表,而是通過 update 操作來實現業務過程的變化,比如當 order_status 為已下單時,proc_time 就代表下單時間;當 order_status 為待支付時,modified_time 就代表狀態變化為待支付的時間。
id | order_id | proc_time | order_status | modified_time |
---|---|---|---|---|
1 | 1001 | 下單時間 | 已下單/支付中/支付成功 | 狀態修改時間 |
?通常,用于分區的時間戳字段分為四種:
- 業務表中用于標識數據記錄更新的時間戳字段(modified_time,比如訂單表中當訂單狀態變化為待支付、支付成功的識貨,modified_time 就會發生變化)
- 數據庫日志(binlog)當中用于標識數據記錄更新的時間戳字段(log_time)
- 業務表中用于記錄業務過程發生時間的時間戳字段(proc_time,比如下單時間、支付時間)
- 數據被抽取到的時間戳字段(extract_time, Flume 中的數據在寫入到?Kafka 之后,如果沒有 Event Header ,那么數據的時間默認就是寫入到 Kafka 的時間)
理論上,這幾個時間應該是一致的,但是在現實中,四個時間戳的大小關系為:proc_time<log_time<modified_time<extract_time,造成這些差異的原因有:
- 數據抽取是需要時間的而且得在數據產生之后,所以 extract_time 往往比另外三個時間都晚。
- 關系型數據庫采用預寫日志方式來更新數據,所以更新時間modified_time會晚于log_time
- 業務不能保證 modified_time 一定被更新。
- 由于網絡或系統壓力問題,會導致數據延遲寫入/數據延遲更新。log_time或者modified_time會晚于proc_time
? ? ? ? 通常的做法是選擇其中一個字段來進行分區,這就導致了數據漂移。下面是數據漂移常見的幾種場景:
- 根據 extract_time 進行分區。這種情況下最容易出現數據漂移。(比如 Flume 經過一定延遲把數據寫入到?Kafka 之后,如果沒有 Event Header,那么當 Kafka 的數據被轉為 Flume 格式時,Header 中默認的 timestamp 就是寫入到 Kafka 的時間?)
- 根據 modified_time 進行分區。但是業務不能保證 modified_time 一定被更新。
- 根據 log_time 進行分區。由于網絡或者系統壓力,可能會出現延遲。
- 根據 proc_time 進行分區。如果根據 proc_time 進行分區,我們得到的數據就遺漏了業務過程的變化(比如對于待支付、支付成功這些業務過程都是需要通過 modified_time 和 order_status 來確定的)。
數據漂移問題的解決方案(面試題)
1、多獲取后一天的數據
????????在每個 ODS 表時間分區中多冗余一部分數據,保證數據只多不少,畢竟即使網絡延遲再高,很小概率會超過 15 分鐘,所以我們可以向后冗余 15 分鐘的數據。但是這種方式會有一些誤差,比如我 1 號 0 點之前下單,2 號 1 點取消訂單,那么對于 1 號,我的數據狀態應該是已下單狀態,但是由于我把 2 號的部分數據頁拉到我的分區了,所以就可能導致記錄的狀態為訂單已關閉。所以對于記錄狀態更新頻繁的場景,我們可以創建拉鏈表,用時間(起始時間和結束時間)來約束獲取記錄的狀態。
2、多個時間戳字段限制
① 根據 modified_time 獲取后一天 15 分鐘的數據,并限制多個和業務過程的時間戳(下單、支付、成功)都是當天,然后根據這些數據按照 modified_time 升序排序,獲取每個數據(每個訂單,可以用 order_id 唯一區分)首次數據變更的那條記錄。
② 根據 log_time 分別冗余前一天最后15分鐘的數據和后一天凌晨開始15分鐘的數據,并用 modified_time 過濾非當天數據,并針對每個訂單按照 log_time 進行降序排序,取每個訂單當天最后一次數據變更的那條記錄。
③ 將兩部分數據根據 order_id 做full join 全外連接,將漂移數據回補到當天數據中。
? ? ? ? 總之,數據漂移是不可能杜絕的,畢竟大數據場景下網絡延遲和系統壓力不可避免,所以只能通過一些規則限制獲得相對準確的數據。