7.7晚自習作業

實操作業02:Spark核心開發

作業說明

  • 請嚴格按照步驟操作,并將最終結果文件(命名為:sparkcore_result.txt)于20點前上傳
  • 結果文件需包含每一步的關鍵命令執行結果文本輸出。

一、數據讀取與轉換操作

  1. 上傳賬戶數據$DATA_EXERCISE/accounts到HDFS的/dw/accounts目錄,從HDFS路徑/dw/accounts讀取accounts數據文件
hadoop fs -mkdir -p /dw/accounts
hadoop fs -put $DATA_EXERCISE/accounts /dw/accounts/

  1. 將每行數據按逗號分割成字段數組
  2. 以郵政編碼字段(第9個字段)作為key,創建鍵值對RDD
  3. 查看轉換后的數據結構,顯示前2條記錄

step1. 創建 RDD(讀取所有 part 文件)

// 使用通配符 * 讀取目錄下所有 part 文件
val accountsRDD = sc.textFile("hdfs://master:8020/dw/accounts/accounts/part-*")// 驗證數據加載
accountsRDD.take(2).foreach(println)

step2:數據轉換

// 一、數據轉換操作
val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",")  // 按逗號分割每行數據val zipCode = fields(8)      // 提取第9個字段(索引8)作為郵政編碼(zipCode, fields)            // 返回(郵編, 字段數組)的鍵值對
})// 顯示前2條
zipKeyValueRDD.take(2).foreach { case (zip, fields) => println(s"Key: $zip | Value: ${fields.mkString(",")}")
}

為什么可以確定line是一個數組屬性的變量呢

解析:
line: 是一個 String 類型的變量,表示CSV文件的某一行(例如:"1,John,Doe,10001")
.split(","):
是String類的方法,按指定的分隔符(這里是逗號 ,)切割字符串
返回一個 Array[String](字符串數組)
val fields:
定義一個 不可變變量(Scala中用val聲明)
類型由編譯器自動推斷為 Array[String]
存儲切割后的字段數組可以通過Spark-shell直接驗證類型:
// 示例數據
val line = "1,John,Doe,10001"
// 執行切割
val fields = line.split(",")  // 得到:Array("1", "John", "Doe", "10001")
// 查看類型
println(fields.getClass)  // 輸出:class [Ljava.lang.String; (表示String數組)

這三行代碼也只能輸出一行數據的轉換結果,但是這個是一整個文件的轉換結果,應該是有循環在里面的,循環體現在哪呢

這里的關鍵在于?Spark的分布式計算模型?和?RDD的轉換操作?的運作機制。讓我們深入解析這個"隱式循環"的實現原理:

1. 表面現象 vs 底層實現

你看到的代碼(看似單行處理)

scala

val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",")? // 似乎只處理一行val zipCode = fields(8)(zipCode, fields)
})

實際發生的處理(分布式并行處理)

python

# 偽代碼:Spark集群的實際執行邏輯
for partition in accountsRDD.partitions:    # 遍歷所有數據分區for line in partition:                   # 分區內逐行處理(真正的循環)fields = line.split(",")emit((fields[8], fields))             # 輸出鍵值對

2. 循環的隱藏位置

Spark通過以下機制實現"隱式循環":

組件作用類比傳統編程
RDD.map()定義轉換邏輯相當于循環體
Partitions(分區)數據分片相當于外層循環
Tasks(任務)并行處理單元相當于多線程循環

3. 完整數據處理流程

  1. 輸入階段

    • 假設原始文件有100萬行,被Spark自動劃分為N個分區(如HDFS的block數)

  2. 并行執行階段

    • 每個分區的數據會被不同的Executor進程處理

    • 每個Executor對分配到的分區數據?逐行執行map函數

  3. 輸出階段

    • 所有節點的處理結果合并成新的RDD

4. 關鍵問題解答

Q: 為什么代碼沒有顯式循環卻能處理整個文件?

  • Spark的RDD操作是聲明式的:你只需要定義"做什么"(業務邏輯),Spark框架自動處理"怎么做"(分布式循環)

  • 數據分區和并行計算

    scala

    // 假設文件被分為3個分區
    accountsRDD.partitions.size? // 返回分區數

    每個分區會觸發一個Task,每個Task內部會循環處理其分配到的數據

Q: 如何驗證確實處理了所有數據?

scala

// 查看處理后的記錄數
zipKeyValueRDD.count()?

// 查看分區處理情況(需在Spark UI觀察)
zipKeyValueRDD.toDebugString


5. 與傳統編程的對比

特性Spark RDD傳統Java/Python
循環方式隱式(由框架管理)顯式(for/while)
并行度自動分區并行需手動線程/進程管理
數據規模支持TB/PB級通常單機內存限制
代碼示例rdd.map(x => f(x))for x in list: f(x)
  • 循環確實存在:但被Spark框架隱藏在分布式運行時中

  • 你的代碼是"循環體"map中的邏輯會被應用到每一行數據

  • 優勢:開發者無需關心并行化和數據分發,專注業務邏輯

二、數據聚合操作

  1. 對上述RDD提取每個記錄的姓名字段:
  2. 提取第5個字段(first_name)和第4個字段(last_name)
  3. 將姓和名用逗號連接
val nameByZipRDD = zipKeyValueRDD.mapValues(fields => s"${fields(4)},${fields(3)}"  // 格式化為"姓,名"
)

    在Scala中,s"${fields(4)},${fields(3)}"?是一種稱為?字符串插值(String Interpolation)?的語法

    1. 字符串插值的組成

    部分含義示例
    開頭的s表示啟用字符串插值s"..."
    ${}插入變量/表達式的語法${fields(4)}
    引號內內容固定字符串+動態變量組合"姓,名"

    2. 具體到代碼

    scala

    s"${fields(4)},${fields(3)}"
    • 等效的普通寫法

      scala

      fields(4) + "," + fields(3)  // 直接字符串拼接

    • 執行過程

      1. 取出數組fields的第5個元素(索引4)

      2. 取出第4個元素(索引3)

      3. 用逗號連接兩者

    3. 對比其他語言

    語言類似語法示例
    Scalas"${var}"s"Hello, ${name}"
    Pythonf-stringf"Hello, {name}"
    JavaScript模板字符串`Hello, ${name}`

    1.?map?vs?mapValues?的本質區別

    操作函數簽名輸入 → 輸出在你的代碼中的應用
    map(T) => U整個元素 → 新元素line => (zipCode, fields)
    mapValues(V) => U僅值部分 → 新值(鍵不變)fields => "姓,名"

    2.代碼中兩個階段的解析

    (1)第一階段:數據轉換 (map)

    scala

    val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",")       // String → Array[String]val zipCode = fields(8)           // 提取key(zipCode, fields)                 // 返回: (String, Array[String])
    })
    • line =>?的含義:

      • 輸入:原始字符串(如?"1,John,Doe,10001"

      • 輸出:完全新建的鍵值對?(String, Array[String])

    • 數據流

      text

      "1,John,Doe,10001" → split → ["1","John","Doe","10001"] → 取fields(8)作為key → 輸出 ("10001", ["1","John","Doe","10001",...])
    (2)第二階段:聚合 (mapValues)

    scala

    val nameByZipRDD = zipKeyValueRDD.mapValues(fields => s"${fields(4)},${fields(3)}"  // 僅修改value部分
    )
    • fields =>?的含義:

      • 輸入:已有鍵值對的值部分(即之前的?Array[String]

      • 輸出:僅更新值(鍵?zipCode?保持不變)

    • 數據流

      text

      輸入: ("10001", ["1","John","Doe","10001",...])→ 提取fields(4)和fields(3) → 輸出 ("10001", "Doe,John")  // 鍵未改變!

    3. =>?的本質

    • =>?是Scala中的函數定義符號,表示:

      scala

      val func: InputType => OutputType = (input) => { // 處理input output 
      }
    • 在代碼中:

      • line => ...:定義了一個從?String?到?(String, Array[String])?的函數

      • fields => ...:定義了一個從?Array[String]?到?String?的函數

    1. 按郵政編碼分組
    2. 查看聚合結果,顯示前2條記錄
    val groupedByNameRDD = nameByZipRDD.groupByKey()// 顯示前2組
    groupedByNameRDD.take(2).foreach {case (zip, names) => println(s"$zip -> ${names.mkString("; ")}")
    }

    三、數據排序與展示

    1. 對分組后的RDD按郵政編碼進行升序排列
    2. 取前5條記錄進行展示
    3. 對每條記錄,先打印郵政編碼,然后打印該郵政編碼下的所有姓名列表
    groupedByNameRDD.sortByKey().take(5).foreach {case (zip, names) =>println(s"\n=== 郵政編碼: $zip ===")names.foreach(println)
    }


    四、提交要求

    1. 代碼和結果文件:將代碼及其執行后的輸出結果保存到sparkcore_result.txt文件中

    2. 結果文件應包含

    3. 數據讀取與轉換操作的代碼和輸出結果
    4. 數據聚合操作的代碼和輸出結果
    5. 數據排序與展示的代碼和輸出結果

    本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
    如若轉載,請注明出處:http://www.pswp.cn/bicheng/88023.shtml
    繁體地址,請注明出處:http://hk.pswp.cn/bicheng/88023.shtml
    英文地址,請注明出處:http://en.pswp.cn/bicheng/88023.shtml

    如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

    相關文章

    手機FunASR識別SIM卡通話占用內存和運行性能分析

    手機FunASR識別SIM卡通話占用內存和運行性能分析 --本地AI電話機器人 上一篇:手機無網離線使用FunASR識別SIM卡語音通話內容 下一篇:手機通話語音離線ASR識別商用和優化方向 一、前言 書接上一文《阿里FunASR本地斷網離線識別模型簡析》,…

    虛幻引擎Unreal Engine5恐怖游戲設計制作教程,從入門到精通從零開始完整項目開發實戰詳細講解中英字幕

    和大家分享一個以前收集的UE5虛幻引擎恐怖游戲開發教程,這是國外一個大神制作的視頻教程,教程從零開始到制作出一款完整的游戲。內容講解全面,如藍圖基礎知識講解、角色控制、高級交互系統、高級庫存系統、物品檢查、恐怖環境氛圍設計、過場動…

    多人協同開發時Git使用命令

    拉取倉庫代碼 # 拉取遠程倉庫至本地tar_dir路徑 git clone gitgithub.com:your-repo.git target_dir # 默認是拉取遠程master分支,下面拉取并切換到自己需要開發的分支上 # 假設自己需要開發的分支是/feature/my_branch分支 git checkout -b feature/my_branch orig…

    線性表——雙向鏈表

    線性表——雙向鏈表1. 雙向鏈表的實現1.1 簡單圖例1.2 結點的定義1.3 新結點的創建1.4 鏈表的初始化1.5 結點的插入1.5.1 頭部插入(頭插)1.5.2 尾部插入(尾插)1.5.3 任意位置(前)插入1.6 結點的刪除1.6.1 頭…

    Java后端技術博客匯總文檔

    文章目錄 前言Java后端匯總鏈接Java基礎知識點數據結構算法(Java實現)算法知識點合集算法刷題算法競賽AcWing課程藍橋杯AB組輔導課合集(更新中…) 源碼分析redission 數據庫SQL ServerMySQLRedis -Canal JUC并發編程JVMNetty日志框…

    QT 菜單欄設計使用方法

    目錄 常用設置函數 多個QAction的單選設置 ???????菜單相關類 ??????? 系統菜單的生成和響應 使用代碼添加系統菜單 使用UI設計器設計系統菜單 使用Qt設計及界面時,常用的兩種方式添加菜單,第一使用UI界面添加,第二種 在…

    AIGC領域AI藝術,打造個性化藝術作品

    AIGC領域AI藝術,打造個性化藝術作品 關鍵詞:AIGC、AI藝術、生成對抗網絡、個性化創作、深度學習、藝術風格遷移、創意計算 摘要:本文深入探討了AIGC(人工智能生成內容)在藝術創作領域的應用,重點分析了如何利用AI技術打造個性化藝術作品。文章從技術原理出發,詳細解析了生…

    基于Flask+Jinja2的快捷教務系統(后端鏈接到新版正方教務系統)

    快捷教務系統(Easy Educational Administration Management System, EasyEAMS) 項目簡介 EasyEAMS 是一個基于 Flask Jinja2 的現代化教務系統 Web 應用。學生可通過網頁端登錄,在線查詢個人信息、成績、課表、學業生涯、通知、選課等。系…

    EDM自動化與出海獨立開發實用教程

    隨著互聯網全球化發展,越來越多的獨立開發者(Indie Developer)選擇將自己的產品推向海外市場。如何高效地獲客、激活用戶、提升轉化率,成為出海過程中必須解決的問題。EDM(電子郵件營銷)自動化,…

    「日拱一碼」017 深度學習常用庫——TensorFlow

    目錄 基礎操作 張量操作: tf.constant 用于創建常量張量 tf.Variable 用于創建可訓練的變量張量 tf.reshape 可改變張量的形狀 tf.concat 可將多個張量沿指定維度拼接 tf.split 則可將張量沿指定維度分割 數學運算: tf.add 張量的加運算 tf.su…

    ARM DStream仿真器腳本常用命令

    以下是ARM DStream仿真器腳本中常用的命令及其功能分類,結合調試流程和典型應用場景整理: ?? 一、連接與初始化命令 connect 建立與目標設備的連接,需指定接口類型(如JTAG/SWD)和處理器核心。 示例:conne…

    vscode 調試unity

    lanch.json { “version”: “0.2.0”, “configurations”: [ { “name”: “Attach to Unity”, “type”: “vstuc”, “request”: “attach” } ] }

    金融IT入門知識點

    銀行金融IT核心知識點全解析:架構、技術與實踐 一、金融IT的戰略地位與行業特性 金融IT作為銀行業務的核心支撐體系,其發展水平直接決定了銀行服務的效率、安全性與創新能力。截至 2025年,中國銀行業線上化業務占比已達97%,手機銀…

    C++——手撕智能指針、單例模式、線程池、String

    智能指針今天我們來學習一下C中的智能指針,如果有人不知道C中的智能指針的概念的話:C智能指針是一種基于RAII(Resource Acquisition Is Initialization,資源獲取即初始化)機制的高級內存管理工具,用于自動化…

    Mybatis----留言板

    基礎項目:留言板 截止到目前為止,我們已經學習了 Spring(只學習了DI)、Spring MVC、SpringBoot、Mybatis 這些知識了,已經滿足了做簡單項目的基本要求了,所以接下來我們就從0到1實現表白墻項目。 需求分析…

    Web-API-day3 DOM事件進階

    一、 事件流 1.事件冒泡 const fa document.querySelector(.father)const son document.querySelector(.son)document.addEventListener(click, function () {alert(我是爺爺)})fa.addEventListener(click, function () {alert(我是爸爸)})son.addEventListener(click, fun…

    小波增強型KAN網絡 + SHAP可解釋性分析(Pytorch實現)

    效果一覽一、傳統KAN網絡的痛點與突破 1. 傳統KAN的局限性 傳統Kolmogorov-Arnold網絡(KAN)雖在理論上有可靠的多變量函數逼近能力,但存在顯著瓶頸: 計算效率低:訓練速度慢于MLP,資源消耗大,尤其…

    tomcat部署多個端口以及制定路徑部署-vue3

    vue3項目tomcat部署記錄 使用hash路由 字符串拼接的圖片地址可以使用import.meta.env.BASE_URL 默認8080 如果部署地址為8080/xc 則設置 vite.config.js中設置base為’/xc/’ outDir設置為xc 打包產物直接拖到webapps目錄下 如果另開一個端口 如8081 設置根目錄訪問 conf/ser…

    LeetCode三數之和-js題解

    給你一個整數數組 nums ,判斷是否存在三元組 [nums[i], nums[j], nums[k]] 滿足 i ! j、i ! k且 j ! k ,同時還滿足 nums[i] nums[j] nums[k] 0 。請你返回所有和為 0 且不重復的三元組。 注意:答案中不可以包含重復的三元組。 示例 1&…

    Flink SQLServer CDC 環境配置與驗證

    一、SQL Server 數據庫核心配置 1. 啟用 CDC 功能(Change Data Capture) SQL Server CDC 依賴數據庫級別的 CDC 功能及表級別的捕獲配置,需按以下步驟啟用: 啟用數據庫 CDC -- 以管理員身份連接數據庫 USE master; GO-- 檢查數…