使用Spark Shell探索RDD
啟動并使用Scala Spark Shell
在終端窗口,啟動Scala Spark shell:
spark-shell --master local
查看對象:
scala> sc
scala> spark
輸入spark.[TAB]
然后可以看到所有可用的方法。
讀并顯示文本文件
查看文本$DATA_EXERCISE/frostroad.txt
讀取本地文件來創建RDD。Spark并沒有讀文件,直到你執行了action操作,比如統計數據集行數。
嘗試執行collect操作來顯示RDD的所有數據。
輸入mydata.[TAB]
可以看到所有可用的轉換操作。
輸入exit
退出。
操作代碼
[root@master ~]# spark-shell --master localscala> val fr_rdd = sc.textFile("file:/root/dataExercise/frostroad.txt")
fr_rdd: org.apache.spark.rdd.RDD[String] = file:/root/dataExercise/frostroad.txt MapPartitionsRDD[1] at textFile at <console>:24scala> fr_rdd.take(2)
res0: Array[String] = Array(Two roads diverged in a yellow wood,, And sorry I could not travel both)scala> fr_rdd.collect()scala> fr_rdd.collect().foreach(println)scala> fr_rdd.
++ first max take
aggregate flatMap min takeAsync
barrier fold name takeOrdered
cache foreach
使用RDD來轉換數據集
探索Web日志文件
日志文件/dw/weblogs
示例:
22.252.151.164 - 23251 [15/Mar/2014:00:00:30 +0100] "GET /KBDOC-00150.html HTTP/1.0" 200 19203 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F11L"
22.252.151.164 - 23251 [15/Mar/2014:00:00:30 +0100] "GET /theme.css HTTP/1.0" 200 10684 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F11L"
從數據文件創建RDD。
創建只包含請求圖片JPG文件的RDD。
使用take查看前10行數據。
在日志中返回每行的長度。
把每一行映射成一個數組,查看前5條。
定義新的RDD,日志文件的每一行只包含IP地址。
最后,保存IP地址列表到/dw/iplist
在終端窗口或Hue文件瀏覽器,列出/dw/iplist
目錄內容。你可以看到多個part-xxxxx文件。查看文件內容確認結果是正確的。
如果需要節約內存,可以停止cloudera服務
service cloudera-scm-server stop
service cloudera-scm-agent stop
scala> val logs = sc.textFile("/dw/weblogs")
logs: org.apache.spark.rdd.RDD[String] = /dw/weblogs MapPartitionsRDD[1] at textFile at <console>:24scala> logs.take(2)
res0: Array[String] = Array(34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /ifruit_3a_sales.html HTTP/1.0" 200 11416 "http://www.loudacre.com" "Loudacre Mobile Browser Titanic 2300", 34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /theme.css HTTP/1.0" 200 14933 "http://www.loudacre.com" "Loudacre Mobile Browser Titanic 2300")scala> val jpglogs = logs.filter(line => line.contains(".jpg"))scala> jpglogs.take(2)
res1: Array[String] = Array(34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /ifruit_3a.jpg HTTP/1.0" 200 12554 "http://www.loudacre.com" "Loudacre Mobile Browser Titanic 2300", 242.13.139.123 - 66694 [01/Mar/2014:23:54:48 +0100] "GET /sorrento_f10l.jpg HTTP/1.0" 200 649 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F10L")scala> val logsLen = logs.map(line => line.length)
logsLen: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:25scala> logsLen.take(5)
res2: Array[Int] = Array(161, 150, 148, 154, 160)scala> val logs_split = logs.map(line => line.split(" "))
logs_split: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:25scala> logs_split.take(2)scala> val ip_list = logs_split.map(ar => ar(0))
ip_list: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:25scala> ip_list.take(2)
res5: Array[String] = Array(34.28.1.122, 34.28.1.122)scala> ip_list.saveAsTextFile("/dw/iplist")
使用Spark處理數據文件
檢查數據
檢查$DATA_EXERCISE/activations
里的數據,每個XML文件包含了客戶在指定月份活躍的設備數據。
拷貝數據到HDFS的/dw
目錄
樣本數據示例:
<activations><activation timestamp="1225499258" type="phone"><account-number>316</account-number><device-id>d61b6971-33e1-42f0-bb15-aa2ae3cd8680</device-id><phone-number>5108307062</phone-number><model>iFruit 1</model></activation>…
</activations>
處理文件
讀取XML文件并抽取賬戶號和設備型號,把結果保存到/dw/account-models
,格式為account_number:model
。
輸出示例:
1234:iFruit 1
987:Sorrento F00L
4566:iFruit 1
…
提供了解析XML的函數如下:
// Stub code to copy into Spark Shellimport scala.xml._// Given a string containing XML, parse the string, and
// return an iterator of activation XML records (Nodes) contained in the stringdef getActivations(xmlstring: String): Iterator[Node] = {val nodes = XML.loadString(xmlstring) \\ "activation"nodes.toIterator
}// Given an activation record (XML Node), return the model name
def getModel(activation: Node): String = {(activation \ "model").text
}// Given an activation record (XML Node), return the account number
def getAccount(activation: Node): String = {(activation \ "account-number").text
}
操作命令
上傳文件?hdfs dfs -put $DATA_EXERCISE/activations /dw
scala> val xmls = sc.wholeTextFiles("/dw/activations")
xmls: org.apache.spark.rdd.RDD[(String, String)] = /dw/activations MapPartitionsRDD[12] at wholeTextFiles at <console>:27scala> xmls.take(1)
res9: Array[(String, String)] =
Array((hdfs://master:8020/dw/activations/2013-01.xml,<activations><activation timestamp="1359697709" type="phone"><account-number>97349</account-number><device-id>e17ff6a8-0899-4a87-972b-30230ebfa6b9</device-id><phone-number>4247767545</phone-number><model>iFruit 4</model></activation><activation timestamp="1359697637" type="phone"><account-number>97068</account-number><device-id>49beb012-d410-40c8-84b6-a5753b68c607</device-id><phone-number>5592763034</phone-number><model>Ronin S1</model></activation><activation timestamp="1359696681" type="phone"><account-number>82601</account-number><device-id>ed58b95d-a7f3-4333-be06-d53890ef1a08</device-id><phone-number>503470...
scala> scala> val xmls_flat = xmls.flatMap(pair => getActivations(pair._2))
xmls_flat: org.apache.spark.rdd.RDD[scala.xml.Node] = MapPartitionsRDD[13] at flatMap at <console>:30scala> xmls_flat.take(1)
res10: Array[scala.xml.Node] =
Array(<activation type="phone" timestamp="1359697709"><account-number>97349</account-number><device-id>e17ff6a8-0899-4a87-972b-30230ebfa6b9</device-id><phone-number>4247767545</phone-number><model>iFruit 4</model></activation>)scala> val acc_model = xmls_flat.map(act => getAccount(act)+":"+getModel(act))
acc_model: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at map at <console>:32scala> acc_model.take(1)
res11: Array[String] = Array(97349:iFruit 4)scala> acc_model.saveAsTextFile("/dw/account-models")
使用Pair RDD來連接2個數據集
探索Web日志文件
使用map-reduce,統計每個用戶的請求。
-
使用map創建Pair RDD,User ID作為key,整數1作為value(user ID是每行的第三個字段)
-
匯總每個用戶的value
使用countByKey來確定對不同的頻率有多少用戶訪問了網站。即有多少用戶訪問了1次、兩次或者三次等等
- 使用map來倒轉key和value,類似于:
(userID, count) => (count, userID)
- 使用countByKey來返回(頻率:用戶數)鍵值對的Map
創建一個RDD,用戶id為key,用戶訪問的ip地址作為value。 - 提示:Map為(userid,ipaddress)并使用groupByKey。
scala> val logs = sc.textFile("/dw/weblogs")
logs: org.apache.spark.rdd.RDD[String] = /dw/weblogs MapPartitionsRDD[17] at textFile at <console>:27scala> val user_reqs = logs.map(line => line.split(" ")).map(ar => (ar(2),1)).reduceByKey((v1,v2) => v1+v2)
user_reqs: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:28scala> user_reqs.take(2)
res15: Array[(String, Int)] = Array((92694,4), (49368,4)) scala> user_reqs.map(pair => (pair._2, pair._1)).countByKey()
res18: scala.collection.Map[Int,Long] = Map(138 -> 6, 170 -> 1, 5 -> 19, 120 -> 6, 10 -> 228, 142 -> 9, 14 -> 20, 110 -> 2, 152 -> 6, 164 -> 1, 106 -> 3, 132 -> 10, 116 -> 10,
…………)scala> val user_ips = logs.map(line => line.split(" ")).map(ar => (ar(2),ar(0))).groupByKey()
user_ips: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[32] at groupByKey at <console>:28scala> user_ips.take(2)
res20: Array[(String, Iterable[String])] = Array((92694,CompactBuffer(84.206.178.154, 84.206.178.154, 84.206.178.154, 84.206.178.154)), (49368,CompactBuffer(34.179.95.142, 34.179.95.142, 34.179.95.142, 34.179.95.142))) scala> for (pair <- user_ips.take(2)){| println("userid:"+pair._1)| for (ip <- pair._2) println("-- ip:"+ip)| }
userid:92694
-- ip:84.206.178.154
-- ip:84.206.178.154
-- ip:84.206.178.154
-- ip:84.206.178.154
userid:49368
-- ip:34.179.95.142
-- ip:34.179.95.142
-- ip:34.179.95.142
-- ip:34.179.95.142
解析:
map(line => line.split(" ")) - 將每行日志按空格分割成數組
map(ar => (ar(2),1)) - 提取第三個字段(索引2)作為用戶ID,創建鍵值對(userID, 1)格式假設為:[IP地址] [其他字段] [用戶ID] [其他字段]...
reduceByKey((v1,v2) => v1+v2) - 對相同用戶ID的值進行累加
結果得到(userID, 總請求次數)的Pair RDD示例輸出:
Array((92694,4), (49368,4))3. 統計訪問頻率分布
scala
user_reqs.map(pair => (pair._2, pair._1)).countByKey()
詳細步驟:
map(pair => (pair._2, pair._1)) - 反轉鍵值對,從(userID, count)變為(count, userID)
countByKey() - 統計每個訪問次數(count)有多少不同的用戶
返回一個Map,鍵是訪問次數,值是對應的用戶數量示例輸出:
Map(138 -> 6, 170 -> 1, 5 -> 19, 120 -> 6, ...)表示:
有6個用戶訪問了138次
有1個用戶訪問了170次
有19個用戶訪問了5次
有6個用戶訪問了120次5. 打印用戶IP信息
scala
for (pair <- user_ips.take(2)){println("userid:"+pair._1)for (ip <- pair._2) println("-- ip:"+ip)
}
詳細步驟:
user_ips.take(2) - 獲取前兩個用戶的記錄
對于每個用戶:
打印用戶ID
遍歷該用戶的所有IP地址并打印
連接賬戶數據和Web日志文件
上傳賬戶數據$DATA_EXERCISE/accounts
到HDFS的/dw/accounts
目錄。 第一個字段是用戶ID。其他字段包含了賬戶明細,比如創建日期,姓名等。
// 加載賬戶數據
val accounts = sc.textFile("/dw/accounts/accounts")// 創建Pair RDD (userid -> 賬戶信息數組)
val accountsRDD = accounts.map(line => {val fields = line.split(",")(fields(0), fields) // (userid, Array[賬戶信息])
})// 查看前2條賬戶數據
accountsRDD.take(2).foreach { case (userid, fields) =>println(s"UserID: $userid, Name: ${fields(3)} ${fields(4)}")
}
用map,將accounts里的line作為input,經過計算得到(fields(0), fields)的輸出,其中fields是將每行的line用逗號分割成數組
輸入:
accounts 是一個 RDD[String],每行代表一個賬戶記錄
示例輸入行:"32438,2012-08-22 20:40:31.0,\N,Violet,Searcy,2601 Twin Oaks Drive..."處理邏輯:
line.split(","):將每行字符串按逗號,分割成一個字符串數組fields
示例結果:Array("32438", "2012-08-22 20:40:31.0", "\N", "Violet", "Searcy", ...)
fields(0):取數組的第一個元素(用戶ID)
(fields(0), fields):創建一個鍵值對(Tuple2)輸出:
返回一個新的 Pair RDD,類型為 RDD[(String, Array[String])]
示例輸出元素:("32438", Array("32438", "2012-08-22...))
數據轉換示例:
輸入行:
text
"32438,2012-08-22 20:40:31.0,\N,Violet,Searcy,2601 Twin Oaks Drive..."
經過轉換后變成:
scala
("32438", // 用戶ID作為keyArray( // 所有字段作為value"32438","2012-08-22 20:40:31.0","\\N","Violet","Searcy","2601 Twin Oaks Drive",...)
)
連接weblog數據和賬戶數據,得到以user ID為key的數據集。包含了用戶賬戶信息和網頁數量。
基于賬戶數據創建RDD,由key/value-array對(userid, [values…])組成。
// 這是第一步已經創建的RDD
val user_reqs = logs.map(line => line.split(" ")).map(ar => (ar(2), 1)).reduceByKey(_ + _)
第一步:logs.map(line => line.split(" "))
作用:將原始日志的每一行按空格分割成數組輸入:line(原始日志行,如 "34.28.1.122 - 65255 [01/Mar/2014:23:57:51 ...")處理:split(" ") 按空格分割輸出:字符串數組 Array(34.28.1.122, -, 65255, [01/Mar/2014:23:57:51, ...)關鍵字段位置:
ar(0):IP地址(如 34.28.1.122)
ar(1):分隔符(-)
ar(2):用戶ID(如 65255)← 這是我們需要的字段第二步:.map(ar => (ar(2), 1))
作用:生成 (用戶ID, 1) 的鍵值對輸入:上一步的數組 ar處理:取 ar(2)(用戶ID)作為 key,固定值 1 作為 value輸出:Pair RDD 格式 (String, Int),如 ("65255", 1)為什么是1:每個日志行代表一次訪問,用 1 表示一次計數,方便后續聚合。第三步:.reduceByKey(_ + _)
作用:對相同用戶ID的計數求和輸入:上一步的 (用戶ID, 1) 對處理:
reduceByKey 將相同 key(用戶ID)的 value 合并
_ + _ 是簡寫,等價于 (v1, v2) => v1 + v2輸出:最終統計結果 (用戶ID, 總訪問次數),如 ("65255", 42)數據流示例
假設原始日志有3行:text
"1.1.1.1 - 101 ..." // 用戶101訪問1次
"2.2.2.2 - 102 ..." // 用戶102訪問1次
"3.3.3.3 - 101 ..." // 用戶101再訪問1次
分割后:scala
Array("1.1.1.1", "-", "101", ...)
Array("2.2.2.2", "-", "102", ...)
Array("3.3.3.3", "-", "101", ...)
生成計數對:scala
("101", 1)
("102", 1)
("101", 1)
聚合結果:scala
("101", 2) // 用戶101總計2次
("102", 1) // 用戶102總計1次
連接PairRDD和上一步計算的userid/hitcount鍵值對數據集
// 連接賬戶數據和點擊量數據
val joinedData = accountsRDD.join(user_reqs)// 查看連接后的數據結構
joinedData.take(2).foreach { case (userid, (accountFields, count)) =>println(s"UserID: $userid, Count: $count, Name: ${accountFields(3)} ${accountFields(4)}")
}
第一步:accountsRDD.join(user_reqs)
作用:基于用戶ID關聯賬戶信息和訪問次數
輸入:
accountsRDD: (String, Array[String]) ← (用戶ID, 賬戶字段數組)
user_reqs: (String, Int) ← (用戶ID, 訪問次數)處理:
通過 用戶ID(key) 內連接(inner join)兩個RDD
自動匹配兩個RDD中相同的用戶ID輸出:
新RDD格式:(String, (Array[String], Int))
結構示例:("32438", (Array("32438", "2012-08-22..."), 4))
第二步:joinedData.take(2).foreach
作用:查看前2條連接結果并格式化輸出數據解構:
scala
case (userid, (accountFields, count)) =>// userid: 用戶ID(String)// accountFields: 賬戶字段數組(Array[String])// count: 訪問次數(Int)
字段索引(根據賬戶數據格式):
accountFields(3): 用戶的名(如 "Violet")
accountFields(4): 用戶的姓(如 "Searcy")輸出示例:
text
UserID: 32438, Count: 4, Name: Violet Searcy
UserID: 32439, Count: 25, Name: Eunice Myers數據流詳解
假設有以下數據:
賬戶數據(accountsRDD):
scala
("32438", Array("32438", "2012-08-22", "\\N", "Violet", "Searcy", ...))
("32439", Array("32439", "2012-12-15", "...", "Eunice", "Myers", ...))訪問次數(user_reqs):
scala
("32438", 4)
("32439", 25)連接后結果(joinedData):
scala
("32438", (Array("32438", "2012-08-22", ..., "Violet", "Searcy"), 4)
)
("32439",(Array("32439", "2012-12-15", ..., "Eunice", "Myers"), 25)
)
完整數據映射圖
accountsRDD (用戶ID -> 賬戶詳情) user_reqs (用戶ID -> 訪問次數)
+-----------------------------+ +---------------------+
| "32438" -> [...,Violet,...] | | "32438" -> 4 |
| "32439" -> [...,Eunice,...] | JOIN | "32439" -> 25 |
+-----------------------------+ +---------------------+|v
joinedData (用戶ID -> (賬戶詳情, 訪問次數))
+--------------------------------------------------+
| "32438" -> ([...,Violet,...], 4) -> "Violet 4" |
| "32439" -> ([...,Eunice,...], 25) -> "Eunice 25" |
+--------------------------------------------------+
顯示用戶ID,點擊量和姓名,比如:
userid1 4 Jack Cheng
userid2 25 John Doe
// 格式化輸出:userid, 點擊量, 姓名
val formattedResults = joinedData.map { case (userid, (accountFields, count)) => s"$userid $count ${accountFields(3)} ${accountFields(4)}"
}// 顯示前10條結果
formattedResults.take(10).foreach(println)
1. 輸入數據結構
joinedData 的格式為:
RDD[(String, (Array[String], Int))]
即每個元素是:
(用戶ID, (賬戶字段數組, 訪問次數))示例數據:
scala
("32438", (Array("32438", "2012-08-22", "\\N", "Violet", "Searcy", ...), 4)
)
2. map 操作
作用:對 joinedData 的每個元素進行轉換模式匹配:
case (userid, (accountFields, count)) 解構嵌套元組:
userid:用戶ID(如 "32438")
accountFields:賬戶字段數組
count:訪問次數(如 4)3. 字符串模板
s"$userid $count ${accountFields(3)} ${accountFields(4)}"字段索引:
accountFields(3):名字(如 "Violet")
accountFields(4):姓氏(如 "Searcy")輸出格式:
用戶ID 訪問次數 名 姓
示例:"32438 4 Violet Searcy"
數據轉換流程
原始數據 → 提取字段 → 格式化字符串("32438", (Array["32438",...,"Violet","Searcy",...], 4))
→ 提取 "32438"、4、"Violet"、"Searcy"
→ 拼接成 "32438 4 Violet Searcy"
編寫和運行Spark應用
編寫一個簡單的程序來統計web日志文件中JPG請求的數量。文件名將作為參數傳遞到程序中。
使用Python編寫Spark應用
創建CountJPGs.py
文件,實現統計JPG請求的功能。
import sys
from pyspark import SparkContextif __name__ == "__main__":if len(sys.argv) < 2:print >> sys.stderr, "Usage: CountJPGs.py <logfile>"exit(-1)# TODO: 從參數讀取日志文件,實現統計JPG請求的功能
import sys
from pyspark import SparkContextif __name__ == "__main__":if len(sys.argv) < 2:print >> sys.stderr, "Usage: CountJPGs.py <logfile>"exit(-1)sc = SparkContext()logfile = sys.argv[1]sc.setLogLevel("WARN")count = sc.textFile(logfile).filter(lambda line: '.jpg' in line).count()print "Number of JPG requests: ", countsc.stop()
運行程序
在終端中運行Python程序:
# 使用spark-submit運行
spark-submit --master local[*] CountJPGs.py /dw/weblogs
運行并查看結果:
測試成功后,程序將輸出統計的JPG請求數量。
提交Spark應用到集群
默認,spark-submit
在本地運行應用。在這個部分,運行應用到YARN集群上。
重新運行程序,指定—master yarn
參數
從運行日志中找到應用ID,并運行使用當前ID來查看結果
spark-submit --master yarn --deploy-mode cluster CountJPGs1.py /dw/weblogs
配置Spark應用
使用之前的實驗中使用的CountJPGs.py程序
在命令行設置配置選項
重新運行CountJPGs.py程序,指定應用名'Count JPGs'
訪問RM UI并注意命令行指定的應用名
在屬性文件中設置配置選項
使用文本編輯器,創建$CODE_EXERCISE/myspark.conf
文件,并添加如下配置:
spark.app.name "My Spark App1"
spark.master yarn
spark.executor.memory 600M
使用屬性文件myspark.conf重新運行應用
當應用正在運行,查看YARN UI并確認Spark應用名正確的顯示為"My Spark App1"
設置日志級別
修改/etc/spark/conf/log4j.properties
編輯log4j.properties
。第一行替換為DEBUG:
log4j.rootCategory=DEBUG, console
重新運行Spark應用。
spark-submit --master local[*] CountJPGs.py /dw/weblogs
注意到輸出包含INFO和DEBUG消息,比如
編輯log4j.properties
文件,替換DEBUG為WARN并重新運行。注意只有WARN消息出來。
在Spark應用UI中查看Jobs和Stages
探索基于文件RDD的分區
啟動Spark Shell,為了模擬實際中的多節點集群,使用2個線程運行在本地模式
使用Hue或命令行重新查看賬戶數據集(/dw/accounts/)。注意文件數量
在數據集中基于單個文件創建RDD,比如,/dw/accounts/part-m-00000
。然后調用toDebugString
。在結果RDD中有多少分區?
重復這個流程,但指定最小3個分區:sc.textFile(filename,3)
。RDD是否有3個分區?
最后,基于賬戶數據集的所有文件創建RDD。比較一下文件數和RDD的分區數?
設置作業
創建accounts RDD,key是賬戶id,value是姓名
創建userreqs RDD,統計每個用戶頁面點擊總數
通過user ID進行連接,并基于名字、姓和點擊總量重構新的RDD
打印出accounthits.toDebugString
的結果并查看輸出,基于這個信息,看是否能確定:
- 作業中有多少stage?
- Stage之間的依賴關系?
- 每個stage包含多少tasks?
運行作業
通過瀏覽器查看Spark應用UI,http://master:4040
在Spark應用UI中檢查Job
在Spark UI中,確保選擇了Jobs標簽。
重新運行shell并通過執行action(saveAsTextFile)來啟動作業
重新加載Spark UI Jobs頁面。
點擊job description來查看stages。
點擊stages來查看stage詳情。
當作業完成后,返回Jobs標簽查看執行的任務的最終統計和作業花費的時間。
持久化RDD
在上一個實驗的基礎上來完成后面的步驟
統計用戶點擊量大于5的賬戶數量
調用accounthits.persist()
來緩存RDD
在瀏覽器中,查看Spark應用UI并選擇Storage標簽。現在你已經標記RDD被持久化,但是還沒有執行action操作使得它持久化。
在Spark Shell,執行count操作
查看RDD的toDebugString
。注意輸出包含了選擇的持久化級別。
重新加載Storage標簽,注意持久化的RDD顯示出來。點擊RDD ID查看分區和持久化的明細。
點擊executors標簽并使用的內存量和可用的工作節點。
scala> val accounthits = joined.filter(pair => pair._2._1 > 5)scala> accounthits.persist()
res0: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> accounthits.count()
res1: Long = 3721scala> accounthits.toDebugString
res2: String =
(15) MapPartitionsRDD[15] at filter at <console>:25 [Memory Deserialized 1x Replicated]| CachedPartitions: 15; MemorySize: 643.5 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B| MapPartitionsRDD[13] at join at <console>:27 [Memory Deserialized 1x Replicated]| MapPartitionsRDD[12] at join at <console>:27 [Memory Deserialized 1x Replicated]| CoGroupedRDD[11] at join at <console>:27 [Memory Deserialized 1x Replicated]| ShuffledRDD[4] at reduceByKey at <console>:25 [Memory Deserialized 1x Replicated]+-(15) MapPartitionsRDD[3] at map at <console>:25 [Memory Deserialized 1x Replicated]| MapPartitionsRDD[2] at map at <console>:25 [Memory Deserialized 1x Replicated]| /dw/weblogs MapPartitionsRDD[1] at textFile at <console>:24 [Memory De...
--清理緩存
scala> accounthits.unpersist()
res3: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevelscala> accounthits.persist(StorageLevel.DISK_ONLY)
res4: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> accounthits.count()
Spark SQL處理Hive表數據
數據探索任務
讀取 Hive 表?db_userapp.user_base_info
,并查看表結構和部分數據。
檢查新的DataFrame的schema
創建新的DataFrame,選擇?user_id
?和?user_name
?列
將DataFrame轉換為Pair RDD,字段為?user_id
?和?user_name
?列
scala> val df_user = spark.read.table("db_userapp.user_base_info")
scala> df_user.printSchema
root|-- user_id: long (nullable = true)|-- user_name: string (nullable = true)|-- age: integer (nullable = true)|-- gender_id: integer (nullable = true)|-- edu_id: integer (nullable = true)|-- marital_id: integer (nullable = true)|-- property_cert_count: integer (nullable = true)|-- car_count: integer (nullable = true)scala> val df_user2 = df_user.select($"user_id",$"user_name")scala> df_user.select($"age"+10).show(4)
+----------+
|(age + 10)|
+----------+
| 95|
| 36|
| 28|
| 80|
+----------+
only showing top 4 rowsscala> df_user.sort($"age".desc).show(4)
+-------+---------+---+---------+------+----------+-------------------+---------+
|user_id|user_name|age|gender_id|edu_id|marital_id|property_cert_count|car_count|
+-------+---------+---+---------+------+----------+-------------------+---------+
| 27| 宋凌寒| 85| 1| 5| 4| 0| 0|
| 1| 周雅芙| 85| 1| 4| 2| 0| 0|
| 41| 黃冰萍| 85| 1| 6| 3| 4| 0|
| 22| 韓映雪| 85| 2| 3| 3| 0| 0|
+-------+---------+---+---------+------+----------+-------------------+---------+
only showing top 4 rowsscala> df_user.registerTempTable("tbl_user")
warning: there was one deprecation warning; re-run with -deprecation for detailsscala> spark.sql("select user_name,age from tbl_user limit 3").show()
+---------+---+
|user_name|age|
+---------+---+
| 周雅芙| 85|
| 王從夢| 26|
| 孫憶翠| 18|
+---------+---+scala> val rdd_user = df_user.rdd.map(row => (row(0).toString,row(1).toString))
關于代碼 val rdd_user = df_user.rdd.map(row => (row(0).toString,row(1).toString)的解析
1. 原始RDD數據結構
scala
Array([1,周雅芙,85,1,4,2,0,0], // 第一行[2,王從夢,26,1,3,1,0,0] // 第二行
)
每行是一個org.apache.spark.sql.Row對象字段順序對應DataFrame的Schema:
scala
root|-- user_id: long (index 0)|-- user_name: string (index 1)|-- age: integer (index 2)|-- gender_id: integer (index 3)|-- edu_id: integer (index 4)|-- marital_id: integer (index 5)|-- property_cert_count: integer (index 6)|-- car_count: integer (index 7)2. 轉換操作詳解
原始代碼:
scala
val rdd_user = df_user.rdd.map(row => (row(0).toString, row(1).toString)
)
對第一行 [1,周雅芙,85,...] 的處理:
row(0) → 取第0個字段:1 (Long型)
.toString → 轉為字符串:"1"
row(1) → 取第1個字段:"周雅芙" (已是String)
最終生成鍵值對:("1", "周雅芙")對第二行 [2,王從夢,26,...] 的處理:
同理生成:("2", "王從夢")3. 轉換后的RDD內容
執行rdd_user.take(2)會得到:
Array(("1", "周雅芙"), ("2", "王從夢")
)
數據變換任務
新增年齡分組字段:根據age
字段,將用戶分為“未成年”(<18)、“青年”(18-29)、“中年”(30-49)、“老年”(≥50)四類,新增age_group
字段。
拼接姓名與年齡:將user_name
和age
字段拼接為新字段name_age
,格式如“張三-25”。
篩選有房且有車的用戶:篩選property_cert_count
>0且car_count
>0的用戶,輸出其user_id
、user_name
、property_cert_count
、car_count
。
val df_user3 = df_user.withColumn(
"age_group",
when(col("age") <18, "未成年")
.when(col("age") <30, "青年")
.when(col("age") <50, "中年")
.otherwise("老年")
)
年齡分組代碼的解析:
2.1 withColumn 方法
功能:向DataFrame添加新列或替換現有列參數:
第一個參數:新列名(此處為"age_group")
第二個參數:列表達式(此處為when-otherwise條件鏈)2.2 when 條件表達式
工作方式:類似SQL的CASE WHEN語句
鏈式調用:多個when可以串聯,最后以otherwise結束
執行順序:從上到下依次判斷,第一個滿足的條件即返回對應值2.3 col("age") 列引用
指向DataFrame中的age列
所有比較操作都基于該列值3. 條件邏輯分解
條件判斷 分組標簽 對應年齡段
age < 18 未成年 小于18歲
age between 18 and 29 青年 18-29歲(含)
age between 30 and 49 中年 30-49歲(含)
以上都不滿足(otherwise) 老年 50歲及以上4. 執行過程示例
以原始數據中的兩行為例:
[1,周雅芙,85,...] // age=85
[2,王從夢,26,...] // age=26
第一行處理:
85 < 18? → 否
85 between 18-29? → 否
85 between 30-49? → 否
執行otherwise → "老年"第二行處理:
26 < 18? → 否
26 between 18-29? → 是 → "青年"
統計分析任務
統計不同學歷(edu_id)用戶的數量。
統計不同性別(gender_id)用戶的平均年齡。
統計擁有房產證數量大于0的用戶比例。
將處理結果保存為 Parquet 和 JSON 格式到 HDFS 指定目錄。