一、背景
為了理解Spark Streaming提供的語義,我們先回顧西Spark RDD的基本容錯語義學。
- RDD是一個不可變的、確定性可重新計算的分布式數據集。每個RDD都記住在容錯輸入數據集上用于創建它的確定性操作的沿襲。
- 如果RDD的任何分區由于工作節點故障而丟失,則可以使用操作沿襲從原始容錯數據集重新計算該分區。
- 假設所有RDD轉換都是確定性的,最終轉換后的RDD中的數據將始終相同,而不管Spark集群中的故障如何
Spark對HDFS或S3等容錯文件系統中的數據進行操作。因此,從容錯數據生成的所有RDD也是容錯的。然而,Spark Streaming并非如此,因為在大多數情況下,數據是通過網絡接收的(使用fileStream時除外)。為了實現所有生成的RDD的相同容錯屬性,接收到的數據將在集群中工作節點的多個Spark executors 之間復制(默認復制因子為2)。這導致系統中有兩種數據需要在發生故障時恢復:
- 接收和復制的數據-此數據在單個工作節點發生故障時幸存下來,因為它的副本存在于其他節點之一上
- 已接收但為復制而緩沖的數據-由于未復制,因此恢復此數據的唯一方法是從源再次獲取它
此外,我們應該關注兩種失敗:
- 工作節點的故障-任何運行執行器的工作節點都可能發生故障,并且這些節點上的所有內存數據都將丟失。如果任何接收器在故障節點上運行,那么它們的緩沖數據將丟失。
- 驅動程序節點的故障-如果運行Spark Streaming應用程序的驅動程序節點發生故障,那么顯然SparkContext丟失了,并且所有具有內存數據的執行程序都丟失了。
有了這些基礎知識,我們下面開始學習Spark Streaming的容錯語義學
二、整體語義
流系統的語義學通常是根據系統可以處理每條記錄的次數來捕獲的。系統可以在所有可能的操作條件下(盡管有故障等)提供三種類型的保證。
- 最多處理一次:每條記錄要么處理一次,要么根本不處理
- 至少一次:每條記錄將被處理一次或多次。這比最多一次更強,因為它確保沒有數據丟失。但可能會有重復。
- 精確一次:每條記錄將被精確處理一次 - -- 沒有數據丟失,也沒有數據被多次處理。這顯然是三者中最強大的保證。
在任何流處理系統中,廣義上講,處理數據有三個步驟。
- 接收數據:使用接收器或其他方式從源接收數據。
- 轉換數據:使用DStream和RDD轉換轉換接收到的數據
- 推送數據:最終轉換后的數據被推送到外部系統,如文件系統、數據庫、儀表板等
如果一個流式應用程序必須實現端到端的精確一次保證,那么每個步驟都必須提供精確一次保證。也就是說,每條記錄必須精確接收一次,精確轉換一次,并精確推送到下游系統一次。Spark Streaming采用的是RDD來處理數據,RDD中間的轉化操作都是迭代器模式,可以保證所有接收到的數據將只處理一次。即使出現故障,只要接收到的輸入數據是可訪問的,最終轉換的RDD將始終具有相同的內容。這樣就剩下接收數據和推送數據的保證,這兩點我們再后面結合不同的輸入源提供的保證以及下游系統的不同來進行詳細分析。
三、接收數據語義
不同的輸入源提供不同的保證,從至少一次到恰好一次。
1、輸入源是文件
如果所有輸入數據都已經存在于像HDFS這樣的容錯文件系統中,Spark Streaming總是可以從任何故障中恢復并處理所有數據。這給出了一次語義學,這意味著無論發生什么故障,所有數據都將被處理一次。
2、輸入源是接收器(Receiver)
對于基于接收器的輸入源,容錯語義學取決于故障場景和接收器類型。正如我們之前討論的,有兩種類型的接收器:
- 可靠的接收器——這些接收器只有在確保接收到的數據已經被復制后才會確認可靠的來源。如果這樣的接收器發生故障,源將不會收到緩沖(未復制)數據的確認。因此,如果接收器重新啟動,源將重新發送數據,并且不會因故障而丟失數據。
- 不可靠的接收器-這種接收器不發送確認,因此當它們由于工作人員或驅動程序故障而失敗時可能會丟失數據
根據使用的接收器類型,如果工作節點發生故障,那么可靠的接收器不會丟失數據。對于不可靠的接收器,接收但未復制的數據可能會丟失。如果driver?發生故障,那么除了這些丟失之外,所有過去在內存中接收和復制的數據都將丟失。這將影響有狀態轉換的結果。
為了避免過去接收到的數據丟失,Spark 1.2引入了預寫日志,將接收到的數據保存到容錯存儲中。由于啟用了預寫日志和可靠的接收器,數據丟失為零。就語義學而言,它提供了至少一次保證。
因此推薦采用的模式為:帶有預寫日志的Spark 1.2或更高版本
3、輸入源是Kafka的Direct API
在Spark 1.3中,引入了一個新的Kafka Direct API,它可以確保Spark Streaming只接收一次所有Kafka數據。
四、輸出數據語義
輸出操作(如foreachRDD)至少有一次語義學,也就是說,在worker 節點失敗的情況下,轉換后的數據可能會多次寫入外部實體。雖然這對于使用saveAs***Files操作保存到文件系統是可以接受的(因為文件將被相同的數據覆蓋),但可能需要額外的努力來實現一次語義學。有兩種方法。
1、冪等更新:多次嘗試總是寫入相同的數據。例如,SaveAs***Files總是將相同的數據寫入生成的文件。
2、事務性更新:所有更新都是以事務性方式進行的,因此更新僅以原子方式進行一次。
- 使用批處理時間(在foreachRDD中可用)和RDD的分區索引來創建標識符。此標識符唯一標識流應用程序中的blob數據
- 使用標識符以事務方式(即僅一次原子方式)使用此blob更新外部系統。也就是說,如果標識符尚未提交,請原子方式提交分區數據和標識符。否則,如果已經提交,請跳過更新。
dstream.foreachRDD { (rdd, time) =>rdd.foreachPartition { partitionIterator =>val partitionId = TaskContext.get.partitionId()val uniqueId = generateUniqueId(time.milliseconds, partitionId)// 使用此uniqueId在partitionIterator中事務性提交數據}
}
大多數高校碩博生畢業要求需要參加學術會議,發表EI或者SCI檢索的學術論文會議論文:
可訪問艾思科藍官網,瀏覽即將召開的學術會議列表。會議如下:
第四屆大數據、信息與計算機網絡國際學術會議(BDICN 2025)
- 廣州
- https://ais.cn/u/fi2yym
第四屆電子信息工程、大數據與計算機技術國際學術會議(EIBDCT 2025)
- 青島
- https://ais.cn/u/nuQr6f
第六屆大數據與信息化教育國際學術會議(ICBDIE 2025)
- 蘇州
- https://ais.cn/u/eYnmQr
第三屆通信網絡與機器學習國際學術會議(CNML 2025)
- 南京
- https://ais.cn/u/vUNva2