(一)什么是分區
【復習提問:RDD的定義是什么?】
在 Spark 里,彈性分布式數據集(RDD)是核心的數據抽象,它是不可變的、可分區的、里面的元素并行計算的集合。
在 Spark 中,分區是指將數據集按照一定的規則劃分成多個較小的子集,每個子集可以獨立地在不同的計算節點上進行處理,這樣可以實現數據的并行處理,提高計算效率。
可以將 Spark 中的分區類比為快遞公司處理包裹的過程。假設你有一批包裹要從一個城市發送到另一個城市,快遞公司會將這些包裹按照一定的規則進行分區,比如按照收件地址的區域劃分。每個分區的包裹會被分配到不同的快遞員或運輸車輛上進行運輸,這些快遞員或車輛可以同時出發,并行地將包裹送到不同的區域。這就類似于 Spark 中的分區,每個分區的數據可以在不同的計算節點上同時進行處理,從而加快整個數據處理的速度。
(二)默認分區的情況
- 從集合創建 RDD(使用 parallelize 方法)
當使用 parallelize 方法從一個集合創建 RDD 時,默認分區數通常取決于集群的配置。
在本地模式下,默認分區數等于本地機器的 CPU 核心數;在集群模式下,默認分區數由 spark.default.parallelism 配置項決定。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("DefaultPartitionExample").setMaster("local")
val sc = new SparkContext(conf)
val data = Seq(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
println(s"默認分區數: ${rdd.partitions.length}")
sc.stop()
2.從外部存儲(如文件)創建 RDD(使用 textFile 方法)
當使用 textFile 方法從外部存儲(如 HDFS、本地文件系統等)讀取文件創建 RDD 時,默認分區數通常由文件的塊大小決定。對于 HDFS 文件,默認分區數等于文件的塊數。例如,一個 128MB 的文件在 HDFS 上被分成 2 個 64MB 的塊,那么創建的 RDD 默認分區數就是 2。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("DefaultPartitionFileExample").setMaster("local")
val sc = new SparkContext(conf)
// 假設文件存在于本地
val rdd = sc.textFile("path/to/your/file.txt")
println(s"默認分區數: ${rdd.partitions.length}")
sc.stop()
【現場演示,如果文件是一個.gz文件,是一個不可拆分的文件,那么默認分區的數量就會是1】
(三)分區的作用
在 Spark 中,RDD 是數據的集合,它會被劃分成多個分區,這些分區可以分布在不同的計算節點上,就像圖書館的書架分布在不同的房間一樣。
這樣做的好處是什么呢?
并行計算:Spark 能夠同時對多個分區的數據進行處理,充分利用集群的計算資源,進而加快作業的執行速度。例如,若一個 RDD 有 10 個分區,且集群有足夠的計算資源,Spark 就可以同時處理這 10 個分區的數據。
數據局部性:分區有助于實現數據局部性,也就是讓計算盡量在數據所在的節點上進行,減少數據在網絡間的傳輸,從而降低網絡開銷。
容錯性:當某個分區的數據處理失敗時,Spark 能夠重新計算該分區,而不需要重新計算整個 RDD。
當使用savaAsTextFile做保存操作時,最終生成的文件個數通常和RDD的分區數一致。
object PartitionExample {
??def main(args: Array[String]): Unit = {
????// 創建 SparkConf 對象,設置應用程序名稱和運行模式
????val conf = new SparkConf().setAppName("PartitionExample").setMaster("local")
????// 使用 SparkConf 創建 SparkContext 對象
????val sc = new SparkContext(conf)
????// 創建一個包含 10 個元素的 Seq
????val data = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
????// 使用 parallelize 方法創建 RDD,并設置分區數為 3
????val rdd = sc.parallelize(data, 3)
????// 將 RDD 保存為文本文件,保存路徑為 "output"
????rdd.saveAsTextFile("output")
????// 停止 SparkContext,釋放資源
????sc.stop()
??}
} ?
在運行代碼后,output 目錄下會生成與 RDD 分區數量相同的文本文件,這里 RDD 分區數設置為 3,所以會生成 3 個文件,文件名通常為 part-00000、part- 00001、part-00002 。
- (四)分區器和默認分區器
分區器是 Spark 中用于決定 RDD 數據如何在不同分區之間進行分布的組件。通過定義分區規則,它能夠將具有鍵值對類型的數據(PairRDD)按照一定策略劃分到不同分區,以實現數據的合理分布,進而提高并行計算的效率。
在大多數涉及鍵值對的轉換操作中,Spark 默認使用 HashPartitioner。例如,reduceByKey、groupByKey 等操作,如果沒有顯式指定分區器,就會使用 HashPartitioner。
HashPartitioner 根據鍵的哈希值來決定數據應該被分配到哪個分區。具體來說,它會對鍵的哈希值取模,模的結果就是分區的編號。假設分區數為 n,鍵為 key,則分區編號的計算公式為 hash(key) % n。
對于鍵值對 RDD,HashPartitioner 是大多數轉換操作的默認分區器,而 RangePartitioner 是 sortByKey 操作的默認分區器。你也可以根據具體需求顯式指定分區器來控制數據的分區方式。
- (五)為什么需要自定義分區
?
數據傾斜:當數據分布不均勻,某些分區數據量過大,導致計算負載不均衡時,可自定義分區器,按照特定規則重新分配數據,避免數據傾斜影響計算性能。比如電商訂單數據中,按地區統計銷售額,若某些熱門地區訂單數遠多于其他地區,使用默認分區器會使部分任務計算量過大。通過自定義分區器,可將熱門地區進一步細分,讓各分區數據量更均衡。
特定業務邏輯:若業務對數據分區有特殊要求,如按時間段將日志數據分區,不同時間段的數據存到不同分區便于后續處理分析;或在社交網絡數據中,按用戶關系緊密程度分區等,都需自定義分區器實現。
(六)自定義分區器的實現步驟
自定義分區器需要:繼承Partitioner抽象類 + 實現其中的兩個方法。
- numPartitions :返回分區的數量,即整個 RDD 將被劃分成多少個分區 。
- getPartition(key: Any) :接收一個鍵值key(對于非鍵值對類型 RDD,可根據數據特征構造合適的鍵 ),根據自定義邏輯返回該鍵值對應的分區索引(從 0 開始,取值范圍為 0 到numPartitions - 1 ) 。
(七)案例
假設要對 NBA 球隊比賽信息進行分區存儲,要求將湖人、火箭兩隊信息單獨存儲,其余球隊信息存放在一個分區。
("勇士", "info1"),
("掘金", "info2"),
("湖人", "info3"),
("火箭", "info4")
示例代碼如下:
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
object CustomPartitionerExample {
??def main(args: Array[String]): Unit = {
????val conf = new SparkConf().setAppName("CustomPartitionerExample").setMaster("local[*]")
????val sc = new SparkContext(conf)
????// 準備數據集,數據為(球隊名稱, 相關信息)形式的鍵值對
????val rdd = sc.parallelize(List(
??????("勇士", "info1"),
??????("掘金", "info2"),
??????("湖人", "info3"),
??????("火箭", "info4")
????))
????// 使用自定義分區器對RDD進行分區
????val partitionedRDD = rdd.partitionBy(new MyPartitioner)
????partitionedRDD.saveAsTextFile("output")
????sc.stop()
??}
}
// 自定義分區器類
class MyPartitioner extends Partitioner {
??// 定義分區數量為3
??override def numPartitions: Int = 3
??// 根據球隊名稱(鍵值)確定分區索引
??override def getPartition(key: Any): Int = {
????key match {
??????case "湖人" => 0
??????case "火箭" => 1
??????case _ => 2
????}
??}
}
核心代碼解釋:
- MyPartitioner類繼承自Partitioner,實現了numPartitions方法指定分區數量為 3 ,實現getPartition方法,根據球隊名稱判斷分區索引,湖人對應分區 0,火箭對應分區 1,其他球隊對應分區 2 。
2.在main方法中,創建包含球隊信息的 RDD,然后調用partitionBy方法并傳入自定義分區器MyPartitioner,對 RDD 進行分區,最后將分區后的數據保存到指定路徑。
?