上周在組內分享了一下這個主題, 我覺得還是摘出一部分當文章輸出出來
分享主要包括三個方面: 1. 項目背景 2.Spark 原理 3. Spark 實戰
項目背景
主要是將海量日志進行多維度處理;
項目難點
1、數據量大(壓縮包數量 6TB,60 億條數據);
2、在 cos 上的目錄不固定;
3、計算方式復雜,各種過濾、聚合、匯總邏輯;
4、處理時間有限,需在 4h 內完成;
基于上述的項目背景和難點, 最終決定采用 Spark,首先數據量大及計算方式復雜, 如果使用傳統的服務方式, 需要大量的服務器資源, 而目錄不固定, 使數據讀取變的復雜, 且普通服務不太可能在 4h 內處理完畢; 綜合考慮決定使用 Spark。
那么就要講講Spark 是什么, 以及在這些挑戰中的優勢了
Apache Spark
快速、通用、可擴展的大數據引擎;
優勢:
1、快速: Spark 可以中集群中并行處理數據, 重復利用多臺機器的計算能力,顯著提高處理速度, 對于我們的大數據量場景尤為重要;
2、易于使用的 API, 支持 Java、Python、Scala API; Spark 原生只支持 Scala 和 Java,僅中外圍包裝通過 PySpark 中間件實現對 Python 語言的支持;
3、通用:提供多種計算模型,如: 批處理、交互式查詢(Spark SQL)、實時流處理、機器學習、圖計算等,可以靈活應對復雜的計算需求;
4、靈活: 支持多種數據源,如 HDFS、COS、Kafka、HBase 像我們的數據存儲在 COS 上, 是可以直接讀取 COS 目錄, 且對于不確定路徑, 可以直接使用* 代替,Hadoop-COS實現了以騰訊云 COS 作為底層文件系統運行上層計算任務的功能,支持使用Hadoop、Spark以及Tez等處理存儲在騰訊云COS對象存儲系統上的數據,地址如下:
https://github.com/tencentyun/hadoop-cos
Spark 架構
前面了解了 Spark 是什么, 這里講一下 Spark 的架構
1、Driver是spark應用程序入口,是spark核心,負責spark集群的鏈接和資源管理。
2、ClusterManager負責所有Executor的資源管理和調度,Spark可以與多種集群管理器配合使用, 比如yarn k8s。
3、Executor 負責具體作業計算任務。
當我們提交一個任務開始執行時,是如何作業的:
1、啟動 Driver 程序,會解析編寫的程序,并序列化字節級別代碼, 通過 SparkSession 的一個成員變量: SparkContext 向Cluster Manager 發出命令,Cluster Manager 會將當前的資源情況分配合適的資源給 Driver。
2、 Drvier 的字節級別代碼會分發至將要執行的 Executor 上, 這些計算過程實際上是在每個節點本地計算并完成,每個spark會在集群中有一個或多個Executor,Executor 之間也可能會有數據的傳輸,比如一些聚合函數執行。
3、一旦整個執行過程完成,Driver 收集所有 Executor 返回的結果, 結束整個作業,同時像 ClusterManager 釋放資源。
4、在整個過程中,Cluster Manager 扮演了資源管理和任務調度的關鍵角色。它確保了 Spark 作業能夠高效地利用集群資源,調度任務到合適的 Executor 上執行,從而實現分布式計算的優勢。
通過這種方式, Spark 可以高效利用集群資源, 實現大規模數據的分布式處理
Spark 核心組件
1、Spark Core是Spark基礎,提供內存計算能力, 是分布式處理大數據的基礎,它將分布式數據抽象為彈性分布式數據集(RDD),并為運行在其上的上層組件提供 API。所有 Spark 的上層組件都建立在 Spark Core 的基礎之上。
2、Spark Streaming 是一個用于處理動態數據流的 Spark 組件。它能夠開發出強大的交互和數據查詢程序。在處理動態數據流時,流數據會被分割成微小的批處理,這些微小批處理將會在 Spark Core 上按時間順序快速執行。
3、Spark SQL 是一個用于處理結構化數據的 Spark 組件。它允許使用 SQL 語句查詢數據。Spark 支持多種數據源,包括 Hive 表、Parquet 和 JSON 等。
Spark 核心數據結構
前面我們帶過 rdd, rdd 全稱為彈性分布式數據集, 是 spark 的核心數據結構,一個不可變的分布式對象集合,
雖然名字帶了分布式,但是在使用的時候,是感受不到分布式,就跟操作本地數據集一樣操作在分布式存儲中的數據。
RDD 特性有三種:
1、彈性
容錯分兩部分: 1> 機器層面的容錯, 節點出錯自動重試, 2>RDD層面的容錯;也就是血統, rdd 的依賴關系, 有寬依賴和窄依賴, 可以通過血統信息重新計算丟失的分區,而不需要重新計算整個RDD;但是當計算邏輯復雜時,就會引起依賴鏈過長,重算代價就很高,可以適當使用rdd檢查點;
2、分布式
數據分布在集群的多個節點上,RDD的分區(partition)是指將數據集劃分成多個部分,以便在集群中的不同節點上并行處理。分區數與集群中的節點數無關,很可能集群中的單個節點有幾個數據分區。
3、不可變性
Rdd只能基于在穩定物理存儲中的數據集和其他已有的rdd上執行確定性操作來創建。
RDD 支持操作
rdd支持兩種
1、轉換,從現有數據集創建一個新的數據集:如map將數據集每個元素傳遞給函數,返回一個新的分布式數據集表示結果。
RDD的所有轉換都是惰性的, 也就是說并不會直接計算,他們只是記住這些應用到基礎數據集(比如一個文件)的轉換動作, 只有當發生一個要求返回結果給Driver的動作, 才會真正執行; 比如map創建一個新數據集,并在reduce中使用,最終只返回reduce的結果給Driver,而不是整個大的新數據集。
這樣Spark就可以了解所有執行的算子, 從而設定并優化執行計劃。
2、動作: 如reduce 將所有元素疊加起來,將最終結果返回給Driver
轉換算子返回的還是RDD,但是行動算子返回的是非RDD類型值,比如整數,或者無返回值
RDD 依賴關系
1、窄依賴
每一個parent RDD的Partition最多被子RDD的一個Partition使用
2、寬依賴(也稱Shuffle依賴)
多個子RDD的Partition會依賴同一個parent RDD的Partition
Shuffle 是指在分布式計算過程中,數據在不同的分區之間重新分配的過程。Shuffle 通常發生在需要跨分區進行數據交換的操作中,例如 groupByKey、reduceByKey、join 等。這些操作需要將數據從一個分區移動到另一個分區,以便進行合并或聚合
在書上截的一個圖, 還是很清晰的:
RDD 邏輯計算圖
這里結合我們的項目背景: 萬象圖片請求數據, 對海量日志進行多維度處理、計算、分析,我們來了解一下 rdd 的邏輯計算
我們的日志都是以壓縮包的方式,json 的格式存儲在 cos 上; 首先從 cos讀取出來的數據也就是第一步創建 RDD,其中解析 json,確定 key 以及 filter 過濾邏輯, 是 RDD 的轉換操作;
轉換完成后,進行按桶粒度聚合或者統計,是action動作,生成運算結果, 轉換和執行在Executor上操作的;
每個Executor處理其中的一部分RDD,最終將執行結果又寫回 COS 上;
RDD 緩存
Spark 速度非常快的原因之一就是 RDD 緩存。
我們看右側的這個圖, 以場景來說明:
RDD0 過濾生成RDD1, 在RDD1基礎上, 進行不同的聚合計算, 常規情況下, 要做兩次filter;
首先進行了RDD0→RDD1→RDD2的計算作業,那么計算結束時,RDD1就已經緩存在系統中了。在進行RDD0→RDD1→RDD3的計算作業時,由于RDD1已經緩存在系統中,因此RDD0→RDD1的轉換不會重復進行,計算作業只須進行RDD1→RDD3的計算就可以了,因此計算速度可以得到很大提升
所以在不同操作中在內存中持久化(或緩存)一個RDD后,每個節點就將計算的分片結果保存在內存中,對次數據集進行的其他操作中重用。
緩存有可能丟失, 或者基于內存的數據由于內存不足被刪除, RDD的緩存機制,保證了即使緩存丟失也能保證計算的正確執行。
Spark 配置及調優
先講資源配置:
Executor.memory: 設置過大, 部分任務分配到資源等待, 設置過小,頻繁gc,影響性能;
Executor-cores: 每個Executor可以使用的cpu核心數每個Executor可以并行執行多個任務,核心數越多,Executor的并行處理能力越強。
在代碼中的一些使用: 數據處理優化;
數據分區設置: 分區數決定了數據集被劃分成多少個部分,影響到并行度和任務調度。過多: 上下文頻繁切換;過少,并行度不足,任務處理數據量大,影響作業完成時間;
那么如何合理設置分區數:
分區數應根據數據量和集群的計算資源來設置。一個常見的經驗法則是每個分區的數據量在 128MB 到 256MB 之間。在實際運行中,監控作業的執行情況,觀察任務的執行時間、資源利用率等指標,根據實際情況進行調整。
數據傾斜:數據傾斜會導致某些任務處理數據量過大, 以reduceByKey 和groupByKey 為例:
1> 內存使用
groupByKey 會將所有具有相同鍵的值聚集到一個列表中,這可能會導致大量的數據在內存中存儲,尤其是當某個鍵的值非常多時。這可能會導致內存溢出或性能下降。
reduceByKey 在每個分區內先進行局部聚合(即在每個分區內對相同鍵的值進行合并),然后再將結果發送到 reducer。這種方式減少了需要傳輸的數據量,從而降低了內存使用和網絡傳輸的開銷。
2> 網絡傳輸
groupByKey 會將所有相同鍵的值發送到同一個節點,這可能會導致大量的數據在網絡中傳輸。
reduceByKey 通過在每個分區內進行局部合并,減少了需要在網絡中傳輸的數據量,從而提高了性能。
緩存: 將常用數據進行緩存,緩存有幾種形式, 比如都放內存中, 可以選擇節省空間的級別,序列化對象等多種級別。
1> 比如在使用filter算子后,通常數據會被打碎成很多個小分區,這會影響后面的執行操作,可以先對后面的數據用coalesce算子進行一次合并。
2>像在實際處理cos 文件, 文件只有幾十 k,但是十幾萬的數據, 光遍歷讀 COS 就需要 1h+, 處理加工只需要 30min;
到這里, 就結束了!
附錄 COS 使用 demo:
https://cloud.tencent.com/document/product/436/79146