Spark執行計劃與UI分析

文章目錄

  • 1.Spark任務階段劃分
    • 1.1 job,stage與task
    • 1.2 job劃分
    • 1.3 stage和task劃分
  • 2.任務執行時機
  • 3.task內部數據存儲與流動
  • 4.根據sparkUI了解Spark執行計劃
    • 4.1查看job和stage
    • 4.2 查看DAG圖
    • 4.3查看task

1.Spark任務階段劃分

1.1 job,stage與task

  • 首先根據action()操作順序將應用劃分為作業job。
  • 根據每個job的邏輯處理流程中的ShuffleDependency依賴關系,將job劃分為執行階段stage。
  • 在每個stage中,根據最后生成的RDD的分區個數生成多個計算任務task。

1.2 job劃分

舉一個簡單的例子,在下面這段代碼中:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, col# 初始化SparkSession
spark = SparkSession.builder.appName("MultiJobStageTaskExample").getOrCreate()# 讀取數據(Transformation,不觸發Job)
orders = spark.read.csv("orders.csv",header=True,inferSchema=True
).select("用戶ID", "訂單金額", "支付方式")users = spark.read.csv("users.csv",header=True,inferSchema=True
).select("用戶ID", "所在城市")# 緩存重復使用的數據集(優化性能)
orders.cache()
users.cache()# --------------------------
# Job 1:計算不同支付方式的訂單數和總金額
# --------------------------
payment_analysis = orders.groupBy("支付方式") \.agg(count("用戶ID").alias("訂單數"),  # 聚合操作(寬依賴,觸發Shuffle)sum("訂單金額").alias("總金額"))# Action操作:觸發Job 1
payment_result = payment_analysis.collect()  # Job 1
print("支付方式分析結果:", payment_result)# --------------------------
# Job 2:計算每個城市的平均訂單金額
# --------------------------
city_analysis = orders.join(users, on="用戶ID", how="inner") \  #  join是寬依賴(Shuffle).groupBy("所在城市") \  # 再次寬依賴(Shuffle).agg(sum("訂單金額").alias("城市總金額"),count("用戶ID").alias("城市訂單數")) \.withColumn("平均訂單金額", col("城市總金額") / col("城市訂單數"))# Action操作:觸發Job 2
city_analysis.write.csv("city_avg_order")  # Job 2# --------------------------
# Job 3:統計高消費用戶(訂單總金額>10000)的分布
# --------------------------
high_value_users = orders.groupBy("用戶ID") \  # 寬依賴(Shuffle).agg(sum("訂單金額").alias("用戶總消費")) \.filter(col("用戶總消費") > 10000) \  # 過濾(窄依賴).join(users, on="用戶ID", how="inner")  # 寬依賴(Shuffle)# Action操作:觸發Job 3
high_value_count = high_value_users.count()  # Job 3
print("高消費用戶數量:", high_value_count)spark.stop()

根據payment_analysis.collect(),city_analysis.write.csv(“city_avg_order”)和high_value_count = high_value_users.count(),這段代碼被劃分成了三個job。

1.3 stage和task劃分

如下圖所示,在一個job中,出現了shuffle操作,就會劃分一個stage。再根據每個stage中的分區數量劃分task數量。
在這里插入圖片描述

2.任務執行時機

  • job的提交時間與action()被調用的時間有關,當應用程序執行到rdd.action()時,就會立即將rdd.action()形成的job提交給Spark。這其實也就是為什么有的時候寫完代碼沒有運行的原因,因為沒寫action()操作,job不會被提交到Spark
  • 僅當上游的stage都執行完成后,再執行下游的stage。如果stage之間沒有依賴,則并行執行,例如stage1和stage0是并行執行,當且僅當兩者執行后,stage2才開始執行。
  • stage中每個task因為是獨立而且同構的,可以并行運行沒有先后之分。

3.task內部數據存儲與流動

task是根據分區來劃分的,而一個分區中有很多個record,根據不同record之間的關系,存儲的方式也不同:
在這里插入圖片描述
這是一個task的執行流程的幾種不同的情況:

  • 第一個流程:record之間并沒有相互依賴,因此可以進行流式處理,即record1處理成record1’之后就可以將record1從內存中刪掉,而不用關心record2和record3處理到哪里了。
  • 第二個流程:f()流程無相互依賴,但是g()流程有相互依賴,也就是說record1在處理成record1’‘后,record1’‘會被保存到內存中,直到record2’‘和record3’'被處理完成。
  • 第三個流程:同理,在record1,record2和record3都被算出之后,才能執行f(),而在執行g()時,record1’,record2’和record3’才不會相互依賴。
  • 第四個流程:無法進行流水線處理,每處理完一個操作,才能回收該操作的輸入結果。

4.根據sparkUI了解Spark執行計劃

4.1查看job和stage

在spark的首界面可以看到當前正在執行的job:
在這里插入圖片描述
點擊job的鏈接,可以看到當前job中的stage數量:
在這里插入圖片描述
其中stage 0包含3個task,共Shuffle Write了376.0B,stage 1包含4個task,共Shuffle Write了988.0B,而stage 2包含3個task,一共Shuffle Read了1364.0B=376.0B+988.0B。

4.2 查看DAG圖

將Job鏈接中界面上的DAG Visualization展開,可以看到正在執行的DAG圖:
在這里插入圖片描述
每個黑色實心圓圈代表一個RDD,但這個圖稍顯混亂,stage 0中parallelize操作生成的RDD應該是被stage 2中的partitionBy處理的,與stage 1中的parallelize無關,也就是stage 0到stage 2的橫箭頭并沒有在stage1中作停留生成一個RDD
如果想進一步了解黑色實心圓圈代表哪些RDD,則可以進入stage的UI界面:
在這里插入圖片描述
這張圖展示了每個操作會生成哪些RDD(如join()操作生成了CoGroupedRDD及兩個MapPartitionsRDD),但沒有展示stage之間的連接關系。但是沒有展示Stage的連接關系。

4.3查看task

在某個stage界面,可以看到該stage的task信息:
在這里插入圖片描述
stage 0包含3個task,每個task都進行了Shuffle Write,寫入了2~3個record,也就是說Spark UI中也會統計Shuffle Write/Read的record數目。
在這里插入圖片描述
stage 1包含4個task,每個task都進行了ShuffleWrite,寫入了2個record。
在這里插入圖片描述
stage 2包含3個task,每個task從上游的stage 0/1那里Shuffle Read了5~6個record。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/95901.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/95901.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/95901.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

16-docker的容器監控方案-prometheus實戰篇

文章目錄一.前置知識1.監控與報警2.監控系統的設計3.監控系統的分類二、prometheus概述1.什么是prometheus2.prometheus的歷史3.為什么要學習prometheus4.prometheus的使用場景5.prometheus的宏觀架構圖6.prometheus軟件下載地址三、部署prometheus server監控軟件1.同步集群時…

集成電路學習:什么是Image Processing圖像處理

Image Processing,即圖像處理,是計算機視覺、人工智能、多媒體等領域的重要基礎。它利用計算機對圖像進行分析、加工和處理,以達到預期目的的技術。以下是對圖像處理的詳細解析: 一、定義與分類 定義: 圖像處理是指用計算機對圖像進行分析,以達到所需結果的技術,又稱…

基于Android的隨身小管家APP的設計與實現/基于SSM框架的財務管理系統/android Studio/java/原生開發

基于Android的隨身小管家APP的設計與實現/基于SSM框架/android Studio/java/原生開發

Web 開發 16

1 在 JavaScript(包括 JSX)中,函數體的寫法和返回值處理在 JavaScript(包括 JSX)中,函數體的寫法和返回值處理確實有一些簡潔的語法規則,尤其是在箭頭函數中。這些規則常常讓人混淆,…

超高車輛碰撞預警系統如何幫助提升城市立交隧道安全?

超高車輛帶來的安全隱患立交橋和隧道的設計通常基于常規車輛的高度標準。然而,隨著重型運輸業和超高貨車的增加,很多超高車輛會誤入這些限高區域,造成潛在的安全隱患。超高車輛與立交橋梁或隧道頂蓋發生碰撞時,可能導致結構受損&a…

三種變量類型在局部與全局作用域的區別

一、基本概念作用域(Scope): 全局作用域:定義在所有函數外部的變量或函數,具有文件作用域,生命周期為整個程序運行期間。局部作用域:定義在函數、塊(如 {})或類內部的變量…

InfluxDB 數據遷移工具:跨數據庫同步方案(二)

六、基于 API 的同步方案實戰6.1 API 原理介紹InfluxDB 提供的 HTTP API 是實現數據遷移的重要途徑。通過這個 API,我們可以向 InfluxDB 發送 HTTP 請求,以實現數據的讀取和寫入操作。在數據讀取方面,使用GET請求,通過指定數據庫名…

JVM安全點輪詢匯編函數解析

OpenJDK 17 源碼的實現邏輯,handle_polling_page_exception 函數在方法返回時的調用流程如下:調用流程分析:棧水印檢查觸發跳轉:當線程執行方法返回前的安全點輪詢時(MacroAssembler::safepoint_poll 中 at_returntrue…

Linux怎么查看服務器開放和啟用的端口

在 Linux 系統中,可以通過以下方法查看 服務器開放和啟用的端口。以下是詳細的步驟和工具,適用于不同場景。1. 使用 ss 查看開放的端口ss 是一個現代化工具,用于顯示網絡連接和監聽的端口。1.1 查看正在監聽的端口運行以下命令:ba…

XF 306-2025 阻燃耐火電線電纜檢測

近幾年隨著我國經濟快速的發展,電氣火災呈現高發趨勢,鑒于電線電纜火災的危險性,國家制定了阻燃,耐火電線電纜的標準,為企業,建設方,施工方等的生產,選材提供了指引。XF 306-2025 阻…

【Java|第二十篇】面向對象(十)——枚舉類

目錄 (四)面向對象: 12、枚舉類: (1)概述: (2)枚舉類的定義格式: (3)編譯與反編譯: (4)Enum類…

第二十一天-OLED顯示實驗

一、OLED顯示原理1、OLED名詞解釋OLED可以自發光,無需背光光源。2、正點原子OLED模塊模塊總體概述模塊接口模式選擇MCU與模塊外部連接8080并口讀寫過程OLED顯存因為要進行顯示,所以需要有顯存。顯存容量為128 x 8 byte,一個點用一位表示。SSD…

會議系統核心流程詳解:創建、加入與消息交互

一、系統架構概覽 會議系統采用"主進程線程池進程池"的分層架構,實現高并發與業務隔離: #mermaid-svg-fDJ5Ja5L3rqPkby0 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-fDJ5Ja5L3r…

Spring 創建 Bean 的 8 種主要方式

Spring(尤其是 Spring Boot)提供了多種方式來讓容器創建和管理 Bean。Component、Configuration Bean、EnableConfigurationProperties 都是常見方式。 下面我為你系統地梳理 Spring 創建 Bean 的所有主要方式,并說明它們的使用場景和區別。…

React 第七十節 Router中matchRoutes的使用詳解及注意事項

前言 matchRoutes 是 React Router v6 提供的一個核心工具函數,主要用于匹配路由配置與當前路徑。它在服務端渲染(SSR)、數據預加載、權限校驗等場景中非常實用。下面詳細解析其用法、注意事項和案例分析: 1、基本用法 import { m…

iSCSI服務配置全指南(含服務器與客戶端)

iSCSI服務配置全指南(含服務器與客戶端)一、iSCSI簡介 1. 概念 互聯網小型計算機系統接口(Internet Small Computer System Interface,簡稱iSCSI)是一種基于TCP/IP的協議,其核心功能是通過IP網絡仿真SCSI高…

堆(Heap):高效的優先級隊列實現

什么是堆?堆是一種特殊的完全二叉樹,滿足以下性質:堆序性:每個節點的值與其子節點滿足特定關系最小堆:父節點 ≤ 子節點(根最小)最大堆:父節點 ≥ 子節點(根最大&#xf…

朝花夕拾(四) --------python中的os庫全指南

目錄 Python os模塊完全指南:從基礎到高階文件操作 1. 引言:為什么需要os模塊? 1.1 os模塊的重要性 1.2 適用場景 1.3 os模塊的"瑞士軍刀"特性 2. os模塊基礎功能 2.1 文件與目錄操作 2.1.1 核心方法介紹 2.1.2 避坑指南 …

uniappx 安卓端本地打包的一些總結

本人之前沒用過android studio,因為有打包到安卓端的需求,所以有了這篇文章。下面一些內容不正常工作,也不報錯,是很煩的,根本不知道是哪里出了問題。比如對應的aar包沒有引入。或者沒有注冊信息。 在實現過程中我遇到…

AUTOSAR進階圖解==>AUTOSAR_SWS_UDPNetworkManagement

AUTOSAR UDP網絡管理詳解 基于AUTOSAR標準的UDP網絡管理模塊架構分析與實現指南目錄 1. 概述2. UDP網絡管理架構 2.1 整體架構圖2.2 架構組件詳解 3. UDP網絡管理狀態機 3.1 狀態機圖3.2 狀態詳解 4. UDP網絡管理操作序列 4.1 序列圖4.2 操作流程詳解 5. UDP網絡管理配置模型 …