實操作業02:Spark核心開發
作業說明
- 請嚴格按照步驟操作,并將最終結果文件(命名為:
sparkcore_result.txt
)于20點前上傳。 - 結果文件需包含每一步的關鍵命令執行結果文本輸出。
一、數據讀取與轉換操作
- 上傳賬戶數據
$DATA_EXERCISE/accounts
到HDFS的/dw/accounts
目錄,從HDFS路徑/dw/accounts
讀取accounts數據文件
hadoop fs -mkdir -p /dw/accounts
hadoop fs -put $DATA_EXERCISE/accounts /dw/accounts/
- 將每行數據按逗號分割成字段數組
- 以郵政編碼字段(第9個字段)作為key,創建鍵值對RDD
- 查看轉換后的數據結構,顯示前2條記錄
step1. 創建 RDD(讀取所有 part 文件)
// 使用通配符 * 讀取目錄下所有 part 文件
val accountsRDD = sc.textFile("hdfs://master:8020/dw/accounts/accounts/part-*")// 驗證數據加載
accountsRDD.take(2).foreach(println)
step2:數據轉換
// 一、數據轉換操作
val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",") // 按逗號分割每行數據val zipCode = fields(8) // 提取第9個字段(索引8)作為郵政編碼(zipCode, fields) // 返回(郵編, 字段數組)的鍵值對
})// 顯示前2條
zipKeyValueRDD.take(2).foreach { case (zip, fields) => println(s"Key: $zip | Value: ${fields.mkString(",")}")
}
為什么可以確定line是一個數組屬性的變量呢
解析:
line: 是一個 String 類型的變量,表示CSV文件的某一行(例如:"1,John,Doe,10001")
.split(","):
是String類的方法,按指定的分隔符(這里是逗號 ,)切割字符串
返回一個 Array[String](字符串數組)
val fields:
定義一個 不可變變量(Scala中用val聲明)
類型由編譯器自動推斷為 Array[String]
存儲切割后的字段數組可以通過Spark-shell直接驗證類型:
// 示例數據
val line = "1,John,Doe,10001"
// 執行切割
val fields = line.split(",") // 得到:Array("1", "John", "Doe", "10001")
// 查看類型
println(fields.getClass) // 輸出:class [Ljava.lang.String; (表示String數組)
這三行代碼也只能輸出一行數據的轉換結果,但是這個是一整個文件的轉換結果,應該是有循環在里面的,循環體現在哪呢
這里的關鍵在于?Spark的分布式計算模型?和?RDD的轉換操作?的運作機制。讓我們深入解析這個"隱式循環"的實現原理:
1. 表面現象 vs 底層實現
你看到的代碼(看似單行處理)
scala
val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",")? // 似乎只處理一行val zipCode = fields(8)(zipCode, fields)
})
實際發生的處理(分布式并行處理)
python
# 偽代碼:Spark集群的實際執行邏輯
for partition in accountsRDD.partitions: # 遍歷所有數據分區for line in partition: # 分區內逐行處理(真正的循環)fields = line.split(",")emit((fields[8], fields)) # 輸出鍵值對
2. 循環的隱藏位置
Spark通過以下機制實現"隱式循環":
組件 | 作用 | 類比傳統編程 |
---|---|---|
RDD.map() | 定義轉換邏輯 | 相當于循環體 |
Partitions(分區) | 數據分片 | 相當于外層循環 |
Tasks(任務) | 并行處理單元 | 相當于多線程循環 |
3. 完整數據處理流程
-
輸入階段
-
假設原始文件有100萬行,被Spark自動劃分為N個分區(如HDFS的block數)
-
-
并行執行階段
-
每個分區的數據會被不同的Executor進程處理
-
每個Executor對分配到的分區數據?逐行執行map函數
-
-
輸出階段
-
所有節點的處理結果合并成新的RDD
-
4. 關鍵問題解答
Q: 為什么代碼沒有顯式循環卻能處理整個文件?
-
Spark的RDD操作是聲明式的:你只需要定義"做什么"(業務邏輯),Spark框架自動處理"怎么做"(分布式循環)
-
數據分區和并行計算:
scala
// 假設文件被分為3個分區
accountsRDD.partitions.size? // 返回分區數每個分區會觸發一個Task,每個Task內部會循環處理其分配到的數據
Q: 如何驗證確實處理了所有數據?
scala
// 查看處理后的記錄數
zipKeyValueRDD.count()?
// 查看分區處理情況(需在Spark UI觀察)
zipKeyValueRDD.toDebugString
5. 與傳統編程的對比
特性 | Spark RDD | 傳統Java/Python |
---|---|---|
循環方式 | 隱式(由框架管理) | 顯式(for/while) |
并行度 | 自動分區并行 | 需手動線程/進程管理 |
數據規模 | 支持TB/PB級 | 通常單機內存限制 |
代碼示例 | rdd.map(x => f(x)) | for x in list: f(x) |
-
循環確實存在:但被Spark框架隱藏在分布式運行時中
-
你的代碼是"循環體":
map
中的邏輯會被應用到每一行數據 -
優勢:開發者無需關心并行化和數據分發,專注業務邏輯
二、數據聚合操作
- 對上述RDD提取每個記錄的姓名字段:
- 提取第5個字段(first_name)和第4個字段(last_name)
- 將姓和名用逗號連接
val nameByZipRDD = zipKeyValueRDD.mapValues(fields => s"${fields(4)},${fields(3)}" // 格式化為"姓,名"
)
在Scala中,s"${fields(4)},${fields(3)}"
?是一種稱為?字符串插值(String Interpolation)?的語法
1. 字符串插值的組成
部分 | 含義 | 示例 |
---|---|---|
開頭的s | 表示啟用字符串插值 | s"..." |
${} | 插入變量/表達式的語法 | ${fields(4)} |
引號內內容 | 固定字符串+動態變量組合 | "姓,名" |
2. 具體到代碼
scala
s"${fields(4)},${fields(3)}"
-
等效的普通寫法:
scala
fields(4) + "," + fields(3) // 直接字符串拼接
-
執行過程:
-
取出數組
fields
的第5個元素(索引4) -
取出第4個元素(索引3)
-
用逗號連接兩者
-
3. 對比其他語言
語言 | 類似語法 | 示例 |
---|---|---|
Scala | s"${var}" | s"Hello, ${name}" |
Python | f-string | f"Hello, {name}" |
JavaScript | 模板字符串 | `Hello, ${name}` |
1.?map
?vs?mapValues
?的本質區別
操作 | 函數簽名 | 輸入 → 輸出 | 在你的代碼中的應用 |
---|---|---|---|
map | (T) => U | 整個元素 → 新元素 | line => (zipCode, fields) |
mapValues | (V) => U | 僅值部分 → 新值(鍵不變) | fields => "姓,名" |
2.代碼中兩個階段的解析
(1)第一階段:數據轉換 (map
)
scala
val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",") // String → Array[String]val zipCode = fields(8) // 提取key(zipCode, fields) // 返回: (String, Array[String])
})
-
line =>
?的含義:-
輸入:原始字符串(如?
"1,John,Doe,10001"
) -
輸出:完全新建的鍵值對?
(String, Array[String])
-
-
數據流:
text
"1,John,Doe,10001" → split → ["1","John","Doe","10001"] → 取fields(8)作為key → 輸出 ("10001", ["1","John","Doe","10001",...])
(2)第二階段:聚合 (mapValues
)
scala
val nameByZipRDD = zipKeyValueRDD.mapValues(fields => s"${fields(4)},${fields(3)}" // 僅修改value部分
)
-
fields =>
?的含義:-
輸入:已有鍵值對的值部分(即之前的?
Array[String]
) -
輸出:僅更新值(鍵?
zipCode
?保持不變)
-
-
數據流:
text
輸入: ("10001", ["1","John","Doe","10001",...])→ 提取fields(4)和fields(3) → 輸出 ("10001", "Doe,John") // 鍵未改變!
3. =>
?的本質
-
=>
?是Scala中的函數定義符號,表示:scala
val func: InputType => OutputType = (input) => { // 處理input output }
-
在代碼中:
-
line => ...
:定義了一個從?String
?到?(String, Array[String])
?的函數 -
fields => ...
:定義了一個從?Array[String]
?到?String
?的函數
-
- 按郵政編碼分組
- 查看聚合結果,顯示前2條記錄
val groupedByNameRDD = nameByZipRDD.groupByKey()// 顯示前2組
groupedByNameRDD.take(2).foreach {case (zip, names) => println(s"$zip -> ${names.mkString("; ")}")
}
三、數據排序與展示
- 對分組后的RDD按郵政編碼進行升序排列
- 取前5條記錄進行展示
- 對每條記錄,先打印郵政編碼,然后打印該郵政編碼下的所有姓名列表
groupedByNameRDD.sortByKey().take(5).foreach {case (zip, names) =>println(s"\n=== 郵政編碼: $zip ===")names.foreach(println)
}
四、提交要求
-
代碼和結果文件:將代碼及其執行后的輸出結果保存到
sparkcore_result.txt
文件中 -
結果文件應包含:
- 數據讀取與轉換操作的代碼和輸出結果
- 數據聚合操作的代碼和輸出結果
- 數據排序與展示的代碼和輸出結果