Spark(29)基礎自定義分區器

什么是分區

【復習提問:RDD的定義是什么?】

在 Spark 里,彈性分布式數據集(RDD)是核心的數據抽象,它是不可變的、可分區的、里面的元素并行計算的集合。

在 Spark 中,分區是指將數據集按照一定的規則劃分成多個較小的子集,每個子集可以獨立地在不同的計算節點上進行處理,這樣可以實現數據的并行處理,提高計算效率。

可以將 Spark 中的分區類比為快遞公司處理包裹的過程。假設你有一批包裹要從一個城市發送到另一個城市,快遞公司會將這些包裹按照一定的規則進行分區,比如按照收件地址的區域劃分。每個分區的包裹會被分配到不同的快遞員或運輸車輛上進行運輸,這些快遞員或車輛可以同時出發,并行地將包裹送到不同的區域。這就類似于 Spark 中的分區,每個分區的數據可以在不同的計算節點上同時進行處理,從而加快整個數據處理的速度。

默認分區的情況

  1. 從集合創建 RDD(使用 parallelize 方法)

當使用 parallelize 方法從一個集合創建 RDD 時,默認分區數通常取決于集群的配置。

在本地模式下,默認分區數等于本地機器的 CPU 核心數;在集群模式下,默認分區數由 spark.default.parallelism 配置項決定。

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("DefaultPartitionExample").setMaster("local")

val sc = new SparkContext(conf)

val data = Seq(1, 2, 3, 4, 5)

val rdd = sc.parallelize(data)

println(s"默認分區數: ${rdd.partitions.length}")

sc.stop()

2.從外部存儲(如文件)創建 RDD(使用 textFile 方法)

當使用 textFile 方法從外部存儲(如 HDFS、本地文件系統等)讀取文件創建 RDD 時,默認分區數通常由文件的塊大小決定。對于 HDFS 文件,默認分區數等于文件的塊數。例如,一個 128MB 的文件在 HDFS 上被分成 2 個 64MB 的塊,那么創建的 RDD 默認分區數就是 2。

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("DefaultPartitionFileExample").setMaster("local")

val sc = new SparkContext(conf)

// 假設文件存在于本地

val rdd = sc.textFile("path/to/your/file.txt")

println(s"默認分區數: ${rdd.partitions.length}")

sc.stop()

【現場演示,如果文件是一個.gz文件,是一個不可拆分的文件,那么默認分區的數量就會是1】

(三)分區的作用

在 Spark 中,RDD 是數據的集合,它會被劃分成多個分區,這些分區可以分布在不同的計算節點上,就像圖書館的書架分布在不同的房間一樣。

這樣做的好處是什么呢?

并行計算:Spark 能夠同時對多個分區的數據進行處理,充分利用集群的計算資源,進而加快作業的執行速度。例如,若一個 RDD 有 10 個分區,且集群有足夠的計算資源,Spark 就可以同時處理這 10 個分區的數據。

數據局部性:分區有助于實現數據局部性,也就是讓計算盡量在數據所在的節點上進行,減少數據在網絡間的傳輸,從而降低網絡開銷。

容錯性:當某個分區的數據處理失敗時,Spark 能夠重新計算該分區,而不需要重新計算整個 RDD。

當使用savaAsTextFile做保存操作時,最終生成的文件個數通常和RDD的分區數一致。

object PartitionExample {

??def main(args: Array[String]): Unit = {

????// 創建 SparkConf 對象,設置應用程序名稱和運行模式

????val conf = new SparkConf().setAppName("PartitionExample").setMaster("local")

????// 使用 SparkConf 創建 SparkContext 對象

????val sc = new SparkContext(conf)

????// 創建一個包含 10 個元素的 Seq

????val data = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

????// 使用 parallelize 方法創建 RDD,并設置分區數為 3

????val rdd = sc.parallelize(data, 3)

????// 將 RDD 保存為文本文件,保存路徑為 "output"

????rdd.saveAsTextFile("output")

????// 停止 SparkContext,釋放資源

????sc.stop()

??}

} ?

在運行代碼后,output 目錄下會生成與 RDD 分區數量相同的文本文件,這里 RDD 分區數設置為 3,所以會生成 3 個文件,文件名通常為 part-00000、part- 00001、part-00002 。

  • (四)分區器和默認分區器

分區器是 Spark 中用于決定 RDD 數據如何在不同分區之間進行分布的組件。通過定義分區規則,它能夠將具有鍵值對類型的數據(PairRDD)按照一定策略劃分到不同分區,以實現數據的合理分布,進而提高并行計算的效率。

在大多數涉及鍵值對的轉換操作中,Spark 默認使用 HashPartitioner。例如,reduceByKey、groupByKey 等操作,如果沒有顯式指定分區器,就會使用 HashPartitioner

HashPartitioner 根據鍵的哈希值來決定數據應該被分配到哪個分區。具體來說,它會對鍵的哈希值取模,模的結果就是分區的編號。假設分區數為 n,鍵為 key,則分區編號的計算公式為 hash(key) % n。

對于鍵值對 RDD,HashPartitioner 是大多數轉換操作的默認分區器,而 RangePartitioner 是 sortByKey 操作的默認分區器。你也可以根據具體需求顯式指定分區器來控制數據的分區方式。

  • (五)為什么需要自定義分區

?

數據傾斜:當數據分布不均勻,某些分區數據量過大,導致計算負載不均衡時,可自定義分區器,按照特定規則重新分配數據,避免數據傾斜影響計算性能。比如電商訂單數據中,按地區統計銷售額,若某些熱門地區訂單數遠多于其他地區,使用默認分區器會使部分任務計算量過大。通過自定義分區器,可將熱門地區進一步細分,讓各分區數據量更均衡。

特定業務邏輯:若業務對數據分區有特殊要求,如按時間段將日志數據分區,不同時間段的數據存到不同分區便于后續處理分析;或在社交網絡數據中,按用戶關系緊密程度分區等,都需自定義分區器實現。

自定義分區器的實現步驟

自定義分區器需要:繼承Partitioner抽象類 + 實現其中的兩個方法。

  1. numPartitions :返回分區的數量,即整個 RDD 將被劃分成多少個分區 。
  2. getPartition(key: Any) :接收一個鍵值key(對于非鍵值對類型 RDD,可根據數據特征構造合適的鍵 ),根據自定義邏輯返回該鍵值對應的分區索引(從 0 開始,取值范圍為 0 到numPartitions - 1 ) 。
案例

假設要對 NBA 球隊比賽信息進行分區存儲,要求將湖人、火箭兩隊信息單獨存儲,其余球隊信息存放在一個分區。

("勇士", "info1"),
("掘金", "info2"),

("湖人", "info3"),

("火箭", "info4")

示例代碼如下:

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object CustomPartitionerExample {

??def main(args: Array[String]): Unit = {

????val conf = new SparkConf().setAppName("CustomPartitionerExample").setMaster("local[*]")

????val sc = new SparkContext(conf)

????// 準備數據集,數據為(球隊名稱, 相關信息)形式的鍵值對

????val rdd = sc.parallelize(List(

??????("勇士", "info1"),

??????("掘金", "info2"),

??????("湖人", "info3"),

??????("火箭", "info4")

????))

????// 使用自定義分區器對RDD進行分區

????val partitionedRDD = rdd.partitionBy(new MyPartitioner)

????partitionedRDD.saveAsTextFile("output")

????sc.stop()

??}

}
// 自定義分區器類

class MyPartitioner extends Partitioner {

??// 定義分區數量為3

??override def numPartitions: Int = 3

??// 根據球隊名稱(鍵值)確定分區索引

??override def getPartition(key: Any): Int = {

????key match {

??????case "湖人" => 0

??????case "火箭" => 1
??????case _ => 2

????}

??}

}

核心代碼解釋:

  1. MyPartitioner類繼承自Partitioner,實現了numPartitions方法指定分區數量為 3 ,實現getPartition方法,根據球隊名稱判斷分區索引,湖人對應分區 0,火箭對應分區 1,其他球隊對應分區 2 。

2.在main方法中,創建包含球隊信息的 RDD,然后調用partitionBy方法并傳入自定義分區器MyPartitioner,對 RDD 進行分區,最后將分區后的數據保存到指定路徑。

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/906805.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/906805.shtml
英文地址,請注明出處:http://en.pswp.cn/news/906805.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

python打卡訓練營打卡記錄day35

知識點回顧: 三種不同的模型可視化方法:推薦torchinfo打印summary權重分布可視化進度條功能:手動和自動寫法,讓打印結果更加美觀推理的寫法:評估模式 作業:調整模型定義時的超參數,對比下效果 1…

【MySQL】07.表內容的操作

1. insert 我們先創建一個表結構,這部分操作我們使用這張表完成我們的操作: mysql> create table student(-> id int primary key auto_increment,-> name varchar(20) not null,-> qq varchar(20) unique-> ); Query OK, 0 rows affec…

使用SQLite Expert個人版VACUUM功能修復數據庫

使用SQLite Expert個人版VACUUM功能修復數據庫 一、SQLite Expert工具簡介 SQLite Expert 是一款功能強大的SQLite數據庫管理工具,分為免費的個人版(Personal Edition)和收費的專業版(Professional Edition)。其核心功…

LM-BFF——語言模型微調新范式

gpt3(GPT3——少樣本示例推動下的通用語言模型雛形)結合提示詞和少樣本示例后,展示出了強大性能。但大語言模型的訓練門檻太高,普通研究人員無力,LM-BFF(Making Pre-trained Language Models Better Few-shot Learners)的作者受gp…

遙感解譯項目Land-Cover-Semantic-Segmentation-PyTorch之二訓練模型

遙感解譯項目Land-Cover-Semantic-Segmentation-PyTorch之一推理模型 背景 上一篇文章了解了這個項目的環境安裝和模型推理,這篇文章介紹下如何訓練這個模型,添加類別 下載數據集 在之前的一篇文章中,也有用到這個數據集 QGIS之三十六Deepness插件實現AI遙感訓練模型 數…

【NLP 71、常見大模型的模型結構對比】

三到五年的深耕,足夠讓你成為一個你想成為的人 —— 25.5.8 模型名稱位置編碼Transformer結構多頭機制Feed Forward層設計歸一化層設計線性層偏置項激活函數訓練數據規模及來源參數量應用場景側重GPT-5 (OpenAI)RoPE動態相對編碼混合專家架構(MoE&#…

[250521] DBeaver 25.0.5 發布:SQL 編輯器、導航器全面升級,新增 Kingbase 支持!

目錄 DBeaver 25.0.5 發布:SQL 編輯器、導航器全面升級,新增 Kingbase 支持! DBeaver 25.0.5 發布:SQL 編輯器、導航器全面升級,新增 Kingbase 支持! 近日,DBeaver 發布了 25.0.5 版本&#xf…

服務器硬盤虛擬卷的處理

目前的情況是需要刪除邏輯卷,然后再重新來弄一遍。 數據已經備份好了,所以不用擔心數據會丟失。 查看服務器的具體情況 使用 vgdisplay 操作查看服務器的卷組情況: --- Volume group ---VG Name vg01System IDFormat …

Flutter 中 build 方法為何寫在 StatefulWidget 的 State 類中

Flutter 中 build 方法為何寫在 StatefulWidget 的 State 類中 在 Flutter 中,build 方法被設計在 StatefulWidget 的 State 類中而非 StatefulWidget 類本身,這種設計基于幾個重要的架構原則和實際考量: 1. 核心設計原因 1.1 生命周期管理…

傳統醫療系統文檔集中標準化存儲和AI智能化更新路徑分析

引言 隨著醫療數智化建設的深入推進,傳統醫療系統如醫院信息系統(HIS)、臨床信息系統(CIS)、護理信息系統(NIS)、影像歸檔與通信系統(PACS)和實驗室信息系統(LIS)已經成為了現代醫療機構不可或缺的技術基礎設施。這些系統各自承擔著不同的功能,共同支撐…

探索常識性概念圖譜:構建智能生活的知識橋梁

目錄 一、知識圖譜背景介紹 (一)基本背景 (二)與NLP的關系 (三)常識性概念圖譜的引入對比 二、常識性概念圖譜介紹 (一)常識性概念圖譜關系圖示例 (二&#xff09…

Linux/aarch64架構下安裝Python的Orekit開發環境

1.背景 國產化趨勢越來越強,從軟件到硬件,從操作系統到CPU,甚至顯卡,就產生了在國產ARM CPU和Kylin系統下部署Orekit的需求,且之前的開發是基于Python的,需要做適配。 2.X86架構下安裝Python/Orekit開發環…

Ctrl+鼠標滾動阻止頁面放大/縮小

項目場景: 提示:這里簡述項目相關背景: 一般在我們做大屏的時候,不希望Ctrl鼠標上下滾動的時候頁面會放大/縮小,那么在有時候,又不希望影響到別的頁面,比如說這個大屏是在另一個管理后臺中&am…

MySQL——復合查詢表的內外連

目錄 復合查詢 回顧基本查詢 多表查詢 自連接 子查詢 where 字句中使用子查詢 單行子查詢 多行子查詢 多列子查詢 from 字句中使用子查詢 合并查詢 實戰OJ 查找所有員工入職時候的薪水情況 獲取所有非manager的員工emp_no 獲取所有員工當前的manager 表的內外…

聊一下CSS中的標準流,浮動流,文本流,文檔流

在網絡上關于CSS的文章中,有時候能聽到“標準流”,“浮動流”,“定位流”等等詞語,還有像“文檔流”,“文本流”等詞,這些流是什么意思?它們是CSS中的一些布局方案和特性。今天我們就來聊一下CS…

python訓練營第33天

MLP神經網絡的訓練 知識點回顧: PyTorch和cuda的安裝查看顯卡信息的命令行命令(cmd中使用)cuda的檢查簡單神經網絡的流程 數據預處理(歸一化、轉換成張量)模型的定義 繼承nn.Module類定義每一個層定義前向傳播流程 定義…

JDK21深度解密 Day 1:JDK21全景圖:關鍵特性與升級價值

【JDK21深度解密 Day 1】JDK21全景圖:關鍵特性與升級價值 引言 歡迎來到《JDK21深度解密:從新特性到生產實踐的全棧指南》系列的第一天。今天我們將探討JDK21的關鍵特性和升級價值。作為近5年最重要的LTS版本,JDK21不僅帶來了性能上的巨大突…

[docker]更新容器中鏡像版本

從peccore-dev倉庫拉取鏡像 docker pull 10.12.135.238:8060/peccore-dev/configserver:v1.13.45如果報錯,請參考docker拉取鏡像失敗,添加倉庫地址 修改/etc/CET/Common/peccore-docker-compose.yml文件中容器的版本,為剛剛拉取的版本 # 配置中心confi…

LVS原理詳解及LVS負載均衡工作模式

什么是虛擬服務器(LVS) 虛擬服務器是高度可擴展且高度可用的服務器 構建在真實服務器集群上。服務器集群的架構 對最終用戶完全透明,并且用戶與 cluster 系統,就好像它只是一個高性能的虛擬 服務器。請考慮下圖。 真實服務器和負…

上位機知識篇---keil IDE操作

文章目錄 前言文件操作按鍵新建打開保存保存所有編輯操作按鍵撤銷恢復復制粘貼剪切全選查找書簽操作按鍵添加書簽跳轉到上一個書簽跳轉到下一個書簽清空所有書簽編譯操作按鍵編譯當前文件構建目標文件重新構建調試操作按鍵進入調試模式復位全速運行停止運行單步調試逐行調試跳出…