Spark03-RDD02-常用的Action算子

一、常用的Action算子

1-1、countByKey算子

作用:統計key出現的次數,一般適用于K-V型的RDD。

【注意】:

1、collect()是RDD的算子,此時的Action算子,沒有生成新的RDD,所以,沒有collect()!!!

2、Action算子,返回值不再是RDD,而是字典!

示例:

1-2、collect算子

1-3、reduce算子

示例:

返回結果:15

回顧:reduceByKey的邏輯:Spark03-RDD01-簡介+常用的Transformation算子-CSDN博客

1-4、fold算子

1-5、first算子

示例:

1-6、take算子

功能:獲取RDD的前N個元素組合成list返回給你。

示例:

1-7、top算子

功能:對RDD數據集,先降序,再取前N個。相當于:取最大的前N個數字,返回類型:list

【注意】:item之間的比較,可以自定義比較函數。

1-8、count算子

計算RDD有多少條數據,返回的是一個數字!

1-9、takeSample算子

1. 作用

takeSample 用于從 RDD 中隨機抽取一定數量的元素返回的是一個 Python list(而不是 RDD)。

它常用于數據探索,比如從一個很大的分布式數據集中 隨機取樣 看看大概長什么樣。


2. 函數簽名

RDD.takeSample(withReplacement, num, seed=None)
  • withReplacement: True/False

    • True有放回抽樣(同一個元素可能被多次抽到)

    • False:無放回抽樣(每個元素最多出現一次)

  • num: int

    • 需要抽取的樣本數量

  • seed: int可選

    • 隨機數種子。指定后每次結果一致;不指定時每次運行結果可能不同


3. 返回值

返回的是一個 list,包含抽到的樣本。

?? 注意:不會返回一個 RDD,而是直接把樣本收集到 driver 程序

????????

PySpark 的 takeSample 里,如果是無放回抽樣 (withReplacement=False),且你請求的樣本數量 大于 RDD 總數,即:?num > RDD.count(),結果會直接返回 整個 RDD,不會報錯。


4. 示例代碼

from pyspark import SparkContextsc = SparkContext("local", "TakeSampleExample")data = sc.parallelize(range(1, 101))  # RDD: 1 ~ 100# 無放回抽樣,取 10 個
sample1 = data.takeSample(False, 10)
print("無放回抽樣:", sample1)# 有放回抽樣,取 10 個
sample2 = data.takeSample(True, 10)
print("有放回抽樣:", sample2)# 固定隨機種子
sample3 = data.takeSample(False, 10, seed=42)
print("固定種子:", sample3)

運行可能結果:

無放回抽樣: [57, 3, 85, 21, 92, 38, 44, 71, 5, 66]
有放回抽樣: [21, 21, 45, 72, 3, 98, 45, 12, 7, 7]
固定種子: [63, 2, 73, 82, 23, 18, 47, 74, 96, 94]

5. 特點 & 注意點

  1. 返回 Python list,所以抽樣結果會被拉回 driver 內存。

    • 不適合 num 特別大(比如幾百萬),會導致 driver 內存爆炸

  2. sample 不同

    • sample(withReplacement, fraction, seed)返回 RDD(按比例抽樣)

    • takeSample(withReplacement, num, seed) → 返回 list(按數量抽樣)

    總結:

    • 想要 指定比例抽樣 → 用 sample

    • 想要 指定數量抽樣 → 用 takeSample


6. 使用場景

  • 調試 / 探索:比如 RDD 太大,不可能直接 collect(),就可以 takeSample(False, 20) 隨機取 20 個元素看一眼。

  • 機器學習抽樣:從數據集中隨機取一部分作為訓練集 / 測試集。

  • 模擬實驗:需要隨機數據時快速取一批樣本。

1-10、takeOrder算子


1. 作用

takeOrdered 用于 從 RDD 中取出前 n 個元素返回的是一個 Python list

  • 默認情況下,按 升序 排序后取前 n 個;(最小的前n個)

  • 也可以通過 key 參數指定排序規則。


2. 函數簽名

RDD.takeOrdered(num, key=None)
  • num: int
    要取的元素個數。

  • key: function(可選)
    用來指定排序方式。

    • 不指定 → 默認升序

    • 指定 lambda x: -x → 可以變成降序


3. 返回值

返回一個 list,長度最多是 num,包含排序后的前 n 個元素。

(?? 和 takeSample 一樣,也會把結果拉回到 driver


4. 示例代碼

from pyspark import SparkContext
sc = SparkContext("local", "TakeOrderedExample")data = sc.parallelize([5, 1, 8, 3, 2, 10, 6])# 取前 3 個最小的元素(默認升序)
result1 = data.takeOrdered(3)
print("最小的3個:", result1)# 取前 3 個最大的元素(用 key 參數)
result2 = data.takeOrdered(3, key=lambda x: -x)
print("最大的3個:", result2)# 按元素的平方排序,取前 3 個
result3 = data.takeOrdered(3, key=lambda x: x*x)
print("平方最小的3個:", result3)

可能輸出:

最小的3個: [1, 2, 3]
最大的3個: [10, 8, 6]
平方最小的3個: [1, 2, 3]

5. 特點 & 注意點

  1. 返回 Python list,結果會直接拉到 driver。

    • 如果 num 很大,可能導致內存壓力。

  2. 和其他算子的區別

    • top(n):返回最大的 n 個元素,默認降序。(只能是降序

    • takeOrdered(n):返回最小的 n 個元素,默認升序。

    • sortBy(key, ascending, numPartitions):返回排序后的 RDD,比 takeOrdered 重得多,因為它要分布式全排序

    總結:

    • 只想取 前 n 個 → 用 takeOrderedtop(高效)

    • 想要 全局排序 → 用 sortBy(代價更大)


6. 使用場景

  • Top-N 或 Bottom-N 樣本,比如成績前 10 名、銷售額最高的 5 個商品。

  • 數據探索時快速查看極值(最小/最大值)。

  • 機器學習前的數據預處理,比如截取一部分樣本。


1-11、foreach算子

1. 作用

foreach 用于對 RDD 的每個元素執行一個指定的函數(function),但 不會返回任何結果

它的典型用途是:

  • 在每個分區的 worker 節點上,對數據做副作用操作,比如寫數據庫、寫文件、更新計數器。


2. 函數簽名

RDD.foreach(f)
  • f: 一個函數,接收 RDD 的元素作為輸入,對它進行處理。


3. 特點

  1. 沒有返回值

    • foreach 的返回值是 None,所以你不能像 map 那樣拿到新 RDD。

    • 它是一個 Action 算子,會觸發真正的執行。

  2. 副作用在 Executor 端發生

    • 函數 f 會在集群各個節點(Executor)上執行,而不是在 driver 上。

    • 所以你在 fprint日志會打印到 Executor 的日志里,而不是 driver 的控制臺。

    • 如果你要在 driver 上調試看數據,可以用 collect()

  3. 常用場景

    • 寫數據庫:foreach(lambda x: save_to_mysql(x))

    • 寫文件系統:foreach(lambda x: write_to_hdfs(x))

    • 更新外部存儲:foreach(lambda x: redis_client.set(x[0], x[1]))


4. 示例

from pyspark import SparkContextsc = SparkContext("local", "ForeachExample")data = sc.parallelize([1, 2, 3, 4, 5])def process(x):print(f"處理元素: {x}")# foreach 對每個元素執行 process
data.foreach(process)

?? 注意:

  • 在本地模式(local)下,你可能能在控制臺看到輸出。

  • 集群模式(YARN、Standalone、Mesos),打印信息會在 Executor 日志,driver 控制臺一般看不到。


5. foreach 和 foreachPartition 的區別

  • foreach(f)
    → 每個元素都執行一次 f

  • foreachPartition(f)
    → 每個 分區 執行一次 ff 的輸入是該分區的迭代器。

一般寫數據庫、寫外部存儲時推薦 foreachPartition,這樣可以:

  • 避免頻繁建立連接(每個分區建立一次連接,而不是每條記錄都建立)。

  • 提高性能。


6. 對比 map

算子是否返回新 RDD是否觸發 Action典型用途
map? 是? 否數據轉換
foreach? 否? 是副作用操作(寫庫/打印/發送消息)

1-12、saveAsTextFile算子


1. 基本功能

saveAsTextFile(path)Action算子(觸發計算的算子),用于RDD 的內容 保存到 HDFS、本地文件系統或其他兼容 Hadoop 的文件系統,存儲格式是 文本文件

  • 每個元素會被轉換為一行字符串(調用 str() 方法)

  • 最終生成的結果是 一個目錄,而不是單個文件

  • 目錄中包含多個分區文件(如 part-00000part-00001 …),每個文件對應 RDD 的一個分區


2. 使用方法

# 假設已有一個RDD
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=2)# 保存為文本文件
rdd.saveAsTextFile("output_rdd")

結果目錄結構

output_rdd/├── part-00000├── part-00001└── _SUCCESS
  • part-00000part-00001:存儲 RDD 每個分區的數據

  • _SUCCESS:一個空文件,表示任務成功結束


3. 關鍵注意事項

  1. 路徑必須不存在
    Spark 默認不允許寫入已存在的目錄,否則會報錯:

    org.apache.hadoop.mapred.FileAlreadyExistsException
    

    解決辦法:先刪除舊目錄,再保存。

    import shutil
    shutil.rmtree("output_rdd", ignore_errors=True)
    rdd.saveAsTextFile("output_rdd")
    
  2. 輸出是多個文件
    如果需要單個文件,可以在保存前 合并分區

    rdd.coalesce(1).saveAsTextFile("output_single_file")
    

    輸出目錄下只會有一個 part-00000

  • coalesce(1) 會把 RDD 的所有數據壓縮到 一個分區。“創建一個新的目標分區,然后把數據往里壓”。
  • 保存時 Spark 會根據分區數寫出文件,因此只會生成 一個 part-00000 文件

如果是要交付給外部系統(比如 CSV 文件要交給別人用),那通常會 coalesce(1)

? ? ? ? 3. 數據類型要求

????????示例:

kv_rdd = sc.parallelize([("a", 1), ("b", 2)], 2)
kv_rdd.saveAsTextFile("output_kv")
# 文件內容大概是:
# ('a', 1)  part-00000
# ('b', 2)  part-00001
  • saveAsTextFile 默認調用 str() 轉換元素

  • 如果是 (key, value) 形式的 RDD,輸出會是 (key, value) 的字符串


4. 典型應用場景

  • 保存日志處理結果到 HDFS

  • 將 RDD 轉換為文本存儲,供下游任務(Hive、Spark SQL)使用

  • saveAsSequenceFilesaveAsObjectFile 對比,用于不同場景的持久化存儲


【小結】:

  • foreach
  • saveAsTestFile

這兩個算子是分區(Excutor)直接執行的,跳過Driver,由所在的分區(Excutor)直接執行,性能比較好!

其余的Action算子都會將結果發送至Driver

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/93819.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/93819.shtml
英文地址,請注明出處:http://en.pswp.cn/web/93819.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

[Android] 顯示的內容被導航欄這擋住

上圖中彈出的對話框的按鈕“Cancel/Save”被導航欄遮擋了部分顯示&#xff0c;影響了使用。Root cause: Android 應用的主題是 Theme.AppCompat.Light1. 修改 AndroidManifest.xml 將 application 標簽的 android:theme 屬性指向新的自定義主題&#xff1a;<applicationandr…

分貝單位全指南:從 dB 到 dBm、dBc

引言在射頻、音頻和通信工程中&#xff0c;我們經常會在示波器、頻譜儀或測試報告里看到各種各樣的dB單位&#xff0c;比如 dBm、dBc、dBV、dBFS 等。它們看起來都帶個 dB&#xff0c;實則各有不同的定義和參考基準&#xff1a;有的表示相對功率&#xff0c;有的表示電壓電平&a…

怎么確定mysql 鏈接成功了呢?

asyncio.run(test_connection()) ? Connection failed: cryptography package is required for sha256_password or caching_sha2_password auth methods 根據你提供的錯誤信息,問題出現在 MySQL 的認證插件和加密連接配置上。以下是幾種解決方法: 1. 安裝 cryptography 包…

(5)軟件包管理器 yum | Vim 編輯器 | Vim 文本批量化操作 | 配置 Vim

Ⅰ . Linux 軟件包管理器 yum01 安裝軟件在 Linux 下安裝軟件并不像 Windows 下那么方便&#xff0c;最通常的方式是去下載程序的源代碼并進行編譯&#xff0c;從而得到可執行程序。正是因為太麻煩&#xff0c;所以有些人就把一些常用的軟件提前編譯好并做成軟件包&#xff0c;…

VGG改進(3):基于Cross Attention的VGG16增強方案

第一部分&#xff1a;交叉注意力機制解析1.1 注意力機制基礎注意力機制的核心思想是模擬人類的選擇性注意力——在處理信息時&#xff0c;對重要部分分配更多"注意力"。在神經網絡中&#xff0c;這意味著模型可以學習動態地加權輸入的不同部分。傳統的自注意力(Self-…

代理ip平臺哪家好?專業代理IP服務商測評排行推薦

隨著互聯網的深度發展&#xff0c;通過網絡來獲取全球化的信息資源&#xff0c;已成為企業與機構在競爭中保持優勢的一大舉措。但想要獲取其他地區的信息&#xff0c;可能需要我們通過代理IP來實現。代理IP平臺哪家好&#xff1f;下文就讓我們從IP池資源與技術優勢等細節&#…

PWA》》以京東為例安裝到PC端

如果訪問 瀏覽器右側出現 安裝 或 點擊這個 也可以完成安裝桌面 會出現 如下圖標

Linux系統:C語言進程間通信信號(Signal)

1. 引言&#xff1a;從"中斷"到"信號"想象一下&#xff0c;你正在書房專心致志地寫代碼&#xff0c;這時廚房的水燒開了&#xff0c;鳴笛聲大作。你會怎么做&#xff1f;你會暫停&#xff08;Interrupt&#xff09; 手頭的工作&#xff0c;跑去廚房關掉燒水…

LoRa 網關組網方案(二)

LoRa 網關組網方案 現有需求&#xff1a;網關每6秒接收不同節點的數據&#xff0c;使用SX1262芯片。 以下是完整的組網方案&#xff1a;1. 網絡架構設計 采用星型拓撲&#xff1a; 網關&#xff1a;作為中心節點&#xff0c;持續監聽多個信道節點&#xff1a;分布在網關周圍&am…

服裝外貿系統軟件怎么用才高效防風險?

服裝外貿系統軟件概述 服裝外貿系統軟件&#xff0c;如“艾格文ERP”&#xff0c;是現代外貿企業不可或缺的管理工具。它整合了訂單處理、庫存管理、客戶資源保護、財務控制等多功能模塊&#xff0c;旨在全面提升業務運營效率。通過系統化的管理方式&#xff0c;艾格文ERP能夠從…

【沉浸式解決問題】peewee.ImproperlyConfigured: MySQL driver not installed!

目錄一、問題描述二、原因分析三、解決方案? 推薦&#xff1a;安裝 pymysql&#xff08;純 Python&#xff0c;跨平臺&#xff0c;安裝簡單&#xff09;? 可選&#xff1a;安裝 mysqlclient&#xff08;更快&#xff0c;但需要本地編譯環境&#xff09;? 總結四、mysql-conn…

C++進階-----C++11

作者前言 &#x1f382; ??????&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ?&#x1f382; 作者介紹&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

(論文速讀)航空軸承剩余壽命預測:多生成器GAN與CBAM融合的創新方法

論文題目&#xff1a;Remaining Useful Life Prediction Approach for Aviation Bearings Based on Multigenerator Generative Adversarial Network and CBAM&#xff08;基于多發生器生成對抗網絡和CBAM的航空軸承剩余使用壽命預測方法&#xff09;期刊&#xff1a;IEEE TRAN…

3ds Max 流體模擬終極指南:從創建到渲染,打造真實液體效果

流體模擬是提升 3D 場景真實感的重要技術之一。無論是模擬飛瀑流泉、杯中溢出的飲料&#xff0c;還是黏稠的蜂蜜或熔巖&#xff0c;熟練掌握流體動力學無疑能為你的作品增色不少。本文將以 3ds Max 為例&#xff0c;系統講解流體模擬的創建流程與渲染方法&#xff0c;幫助你實現…

《算法導論》第 35 章-近似算法

大家好&#xff01;今天我們深入拆解《算法導論》第 35 章 ——近似算法。對于 NP 難問題&#xff08;如旅行商、集合覆蓋&#xff09;&#xff0c;精確算法在大規模數據下往往 “力不從心”&#xff0c;而近似算法能在多項式時間內給出 “足夠好” 的解&#xff08;有嚴格的近…

系統架構設計師-操作系統-避免死鎖最小資源數原理模擬題

寫在前面&#xff1a;銀行家算法的核心目標是確保系統始終處于“安全狀態”。一、5個進程各需2個資源&#xff0c;至少多少資源避免死鎖&#xff1f; 解題思路 根據死鎖避免的資源分配公式&#xff0c;不發生死鎖的最少資源數為&#xff1a; 最少資源數k(n?1)1 \text{最少資源…

Preprocessing Model in MPC 2 - 背景、基礎原語和Beaver三元組

參考論文&#xff1a;SoK: Multiparty Computation in the Preprocessing Model MPC (Secure Multi-Party Computation) 博士生入門資料。抄襲必究。 本系列教程將逐字解讀參考論文(以下簡稱MPCiPPM)&#xff0c;在此過程中&#xff0c;將論文中涵蓋的40篇參考文獻進行梳理與講…

ACCESS/SQL SERVER保存軟件版本號為整數類型,轉成字符串

在 Access 中&#xff0c;若已將版本號&#xff08;如1.3.15&#xff09;轉換為整數形式&#xff08;如10315&#xff0c;即1*10000 3*100 15&#xff09;&#xff0c;可以通過 SQL 的數學運算反向解析出原始版本號格式&#xff08;主版本.次版本.修訂號&#xff09;。實現思…

編程語言學習

精通 Java、Scala、Python、Go、Rust、JavaScript ? 1. Java 面向對象編程&#xff08;OOP&#xff09;、異常處理、泛型JVM 原理、內存模型&#xff08;JMM&#xff09;、垃圾回收&#xff08;GC&#xff09;多線程與并發&#xff08;java.util.concurrent&#xff09;Java 8…

軟件測試:如何利用Burp Suite進行高效WEB安全測試

Burp Suite 被廣泛視為 Web 應用安全測試領域的行業標準工具集。要發揮其最大效能&#xff0c;遠非簡單啟動掃描即可&#xff0c;而是依賴于測試者對其模塊化功能的深入理解、有機組合及策略性運用。一次高效的測試流程&#xff0c;始于精細的環境配置與清晰的測試邏輯。測試初…