實驗03-Spark批處理開發

使用Spark Shell探索RDD

啟動并使用Scala Spark Shell

在終端窗口,啟動Scala Spark shell:

spark-shell --master local

查看對象:

scala> sc
scala> spark

輸入spark.[TAB]然后可以看到所有可用的方法。

讀并顯示文本文件

查看文本$DATA_EXERCISE/frostroad.txt

讀取本地文件來創建RDD。Spark并沒有讀文件,直到你執行了action操作,比如統計數據集行數。

嘗試執行collect操作來顯示RDD的所有數據。

輸入mydata.[TAB]可以看到所有可用的轉換操作。

輸入exit退出。

操作代碼

[root@master ~]# spark-shell --master localscala> val fr_rdd = sc.textFile("file:/root/dataExercise/frostroad.txt")
fr_rdd: org.apache.spark.rdd.RDD[String] = file:/root/dataExercise/frostroad.txt MapPartitionsRDD[1] at textFile at <console>:24scala> fr_rdd.take(2)
res0: Array[String] = Array(Two roads diverged in a yellow wood,, And sorry I could not travel both)scala> fr_rdd.collect()scala> fr_rdd.collect().foreach(println)scala> fr_rdd.
++                    first                    max                  take              
aggregate             flatMap                  min                  takeAsync         
barrier               fold                     name                 takeOrdered       
cache                 foreach            

使用RDD來轉換數據集

探索Web日志文件

日志文件/dw/weblogs示例:

22.252.151.164 - 23251 [15/Mar/2014:00:00:30 +0100] "GET /KBDOC-00150.html HTTP/1.0" 200 19203 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F11L"
22.252.151.164 - 23251 [15/Mar/2014:00:00:30 +0100] "GET /theme.css HTTP/1.0" 200 10684 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F11L"

從數據文件創建RDD。

創建只包含請求圖片JPG文件的RDD。

使用take查看前10行數據。

在日志中返回每行的長度。

把每一行映射成一個數組,查看前5條。

定義新的RDD,日志文件的每一行只包含IP地址。

最后,保存IP地址列表到/dw/iplist

在終端窗口或Hue文件瀏覽器,列出/dw/iplist目錄內容。你可以看到多個part-xxxxx文件。查看文件內容確認結果是正確的。

如果需要節約內存,可以停止cloudera服務

service cloudera-scm-server stop

service cloudera-scm-agent stop

scala> val logs = sc.textFile("/dw/weblogs")
logs: org.apache.spark.rdd.RDD[String] = /dw/weblogs MapPartitionsRDD[1] at textFile at <console>:24scala> logs.take(2)
res0: Array[String] = Array(34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /ifruit_3a_sales.html HTTP/1.0" 200 11416 "http://www.loudacre.com"  "Loudacre Mobile Browser Titanic 2300", 34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /theme.css HTTP/1.0" 200 14933 "http://www.loudacre.com"  "Loudacre Mobile Browser Titanic 2300")scala> val jpglogs = logs.filter(line => line.contains(".jpg"))scala> jpglogs.take(2)
res1: Array[String] = Array(34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /ifruit_3a.jpg HTTP/1.0" 200 12554 "http://www.loudacre.com"  "Loudacre Mobile Browser Titanic 2300", 242.13.139.123 - 66694 [01/Mar/2014:23:54:48 +0100] "GET /sorrento_f10l.jpg HTTP/1.0" 200 649 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F10L")scala> val logsLen = logs.map(line => line.length)
logsLen: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:25scala> logsLen.take(5)
res2: Array[Int] = Array(161, 150, 148, 154, 160)scala> val logs_split = logs.map(line => line.split(" "))
logs_split: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:25scala> logs_split.take(2)scala> val ip_list = logs_split.map(ar => ar(0))
ip_list: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:25scala> ip_list.take(2)
res5: Array[String] = Array(34.28.1.122, 34.28.1.122)scala> ip_list.saveAsTextFile("/dw/iplist")

使用Spark處理數據文件

檢查數據

檢查$DATA_EXERCISE/activations里的數據,每個XML文件包含了客戶在指定月份活躍的設備數據。

拷貝數據到HDFS的/dw目錄

樣本數據示例:

<activations><activation timestamp="1225499258" type="phone"><account-number>316</account-number><device-id>d61b6971-33e1-42f0-bb15-aa2ae3cd8680</device-id><phone-number>5108307062</phone-number><model>iFruit 1</model></activation>…
</activations>

處理文件

讀取XML文件并抽取賬戶號和設備型號,把結果保存到/dw/account-models,格式為account_number:model

輸出示例:

1234:iFruit 1
987:Sorrento F00L
4566:iFruit 1
…

提供了解析XML的函數如下:

// Stub code to copy into Spark Shellimport scala.xml._// Given a string containing XML, parse the string, and 
// return an iterator of activation XML records (Nodes) contained in the stringdef getActivations(xmlstring: String): Iterator[Node] = {val nodes = XML.loadString(xmlstring) \\ "activation"nodes.toIterator
}// Given an activation record (XML Node), return the model name
def getModel(activation: Node): String = {(activation \ "model").text
}// Given an activation record (XML Node), return the account number
def getAccount(activation: Node): String = {(activation \ "account-number").text
}

操作命令

上傳文件?hdfs dfs -put $DATA_EXERCISE/activations /dw

scala> val xmls = sc.wholeTextFiles("/dw/activations")
xmls: org.apache.spark.rdd.RDD[(String, String)] = /dw/activations MapPartitionsRDD[12] at wholeTextFiles at <console>:27scala> xmls.take(1)
res9: Array[(String, String)] =
Array((hdfs://master:8020/dw/activations/2013-01.xml,<activations><activation timestamp="1359697709" type="phone"><account-number>97349</account-number><device-id>e17ff6a8-0899-4a87-972b-30230ebfa6b9</device-id><phone-number>4247767545</phone-number><model>iFruit 4</model></activation><activation timestamp="1359697637" type="phone"><account-number>97068</account-number><device-id>49beb012-d410-40c8-84b6-a5753b68c607</device-id><phone-number>5592763034</phone-number><model>Ronin S1</model></activation><activation timestamp="1359696681" type="phone"><account-number>82601</account-number><device-id>ed58b95d-a7f3-4333-be06-d53890ef1a08</device-id><phone-number>503470...
scala> scala> val xmls_flat = xmls.flatMap(pair => getActivations(pair._2))
xmls_flat: org.apache.spark.rdd.RDD[scala.xml.Node] = MapPartitionsRDD[13] at flatMap at <console>:30scala> xmls_flat.take(1)
res10: Array[scala.xml.Node] =                                                  
Array(<activation type="phone" timestamp="1359697709"><account-number>97349</account-number><device-id>e17ff6a8-0899-4a87-972b-30230ebfa6b9</device-id><phone-number>4247767545</phone-number><model>iFruit 4</model></activation>)scala> val acc_model = xmls_flat.map(act => getAccount(act)+":"+getModel(act))
acc_model: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at map at <console>:32scala> acc_model.take(1)
res11: Array[String] = Array(97349:iFruit 4)scala> acc_model.saveAsTextFile("/dw/account-models")

使用Pair RDD來連接2個數據集

探索Web日志文件

使用map-reduce,統計每個用戶的請求。

  • 使用map創建Pair RDD,User ID作為key,整數1作為value(user ID是每行的第三個字段)

  • 匯總每個用戶的value

使用countByKey來確定對不同的頻率有多少用戶訪問了網站。即有多少用戶訪問了1次、兩次或者三次等等

  • 使用map來倒轉key和value,類似于:
(userID, count) => (count, userID)
  • 使用countByKey來返回(頻率:用戶數)鍵值對的Map

創建一個RDD,用戶id為key,用戶訪問的ip地址作為value。 - 提示:Map為(userid,ipaddress)并使用groupByKey。

scala> val logs = sc.textFile("/dw/weblogs")
logs: org.apache.spark.rdd.RDD[String] = /dw/weblogs MapPartitionsRDD[17] at textFile at <console>:27scala> val user_reqs = logs.map(line => line.split(" ")).map(ar => (ar(2),1)).reduceByKey((v1,v2) => v1+v2)
user_reqs: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:28scala> user_reqs.take(2)
res15: Array[(String, Int)] = Array((92694,4), (49368,4))                       scala> user_reqs.map(pair => (pair._2, pair._1)).countByKey()
res18: scala.collection.Map[Int,Long] = Map(138 -> 6, 170 -> 1, 5 -> 19, 120 -> 6, 10 -> 228, 142 -> 9, 14 -> 20, 110 -> 2, 152 -> 6, 164 -> 1, 106 -> 3, 132 -> 10, 116 -> 10, 
…………)scala> val user_ips = logs.map(line => line.split(" ")).map(ar => (ar(2),ar(0))).groupByKey()
user_ips: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[32] at groupByKey at <console>:28scala> user_ips.take(2)
res20: Array[(String, Iterable[String])] = Array((92694,CompactBuffer(84.206.178.154, 84.206.178.154, 84.206.178.154, 84.206.178.154)), (49368,CompactBuffer(34.179.95.142, 34.179.95.142, 34.179.95.142, 34.179.95.142)))   scala> for (pair <- user_ips.take(2)){| println("userid:"+pair._1)| for (ip <- pair._2) println("-- ip:"+ip)| }
userid:92694
-- ip:84.206.178.154
-- ip:84.206.178.154
-- ip:84.206.178.154
-- ip:84.206.178.154
userid:49368
-- ip:34.179.95.142
-- ip:34.179.95.142
-- ip:34.179.95.142
-- ip:34.179.95.142
解析:
map(line => line.split(" ")) - 將每行日志按空格分割成數組
map(ar => (ar(2),1)) - 提取第三個字段(索引2)作為用戶ID,創建鍵值對(userID, 1)格式假設為:[IP地址] [其他字段] [用戶ID] [其他字段]...
reduceByKey((v1,v2) => v1+v2) - 對相同用戶ID的值進行累加
結果得到(userID, 總請求次數)的Pair RDD示例輸出:
Array((92694,4), (49368,4))3. 統計訪問頻率分布
scala
user_reqs.map(pair => (pair._2, pair._1)).countByKey()
詳細步驟:
map(pair => (pair._2, pair._1)) - 反轉鍵值對,從(userID, count)變為(count, userID)
countByKey() - 統計每個訪問次數(count)有多少不同的用戶
返回一個Map,鍵是訪問次數,值是對應的用戶數量示例輸出:
Map(138 -> 6, 170 -> 1, 5 -> 19, 120 -> 6, ...)表示:
有6個用戶訪問了138次
有1個用戶訪問了170次
有19個用戶訪問了5次
有6個用戶訪問了120次5. 打印用戶IP信息
scala
for (pair <- user_ips.take(2)){println("userid:"+pair._1)for (ip <- pair._2) println("-- ip:"+ip)
}
詳細步驟:
user_ips.take(2) - 獲取前兩個用戶的記錄
對于每個用戶:
打印用戶ID
遍歷該用戶的所有IP地址并打印

連接賬戶數據和Web日志文件

上傳賬戶數據$DATA_EXERCISE/accounts到HDFS的/dw/accounts目錄。 第一個字段是用戶ID。其他字段包含了賬戶明細,比如創建日期,姓名等。

// 加載賬戶數據
val accounts = sc.textFile("/dw/accounts/accounts")// 創建Pair RDD (userid -> 賬戶信息數組)
val accountsRDD = accounts.map(line => {val fields = line.split(",")(fields(0), fields) // (userid, Array[賬戶信息])
})// 查看前2條賬戶數據
accountsRDD.take(2).foreach { case (userid, fields) =>println(s"UserID: $userid, Name: ${fields(3)} ${fields(4)}")
}

用map,將accounts里的line作為input,經過計算得到(fields(0), fields)的輸出,其中fields是將每行的line用逗號分割成數組

輸入:
accounts 是一個 RDD[String],每行代表一個賬戶記錄
示例輸入行:"32438,2012-08-22 20:40:31.0,\N,Violet,Searcy,2601 Twin Oaks Drive..."處理邏輯:
line.split(","):將每行字符串按逗號,分割成一個字符串數組fields
示例結果:Array("32438", "2012-08-22 20:40:31.0", "\N", "Violet", "Searcy", ...)
fields(0):取數組的第一個元素(用戶ID)
(fields(0), fields):創建一個鍵值對(Tuple2)輸出:
返回一個新的 Pair RDD,類型為 RDD[(String, Array[String])]
示例輸出元素:("32438", Array("32438", "2012-08-22...))
數據轉換示例:
輸入行:
text
"32438,2012-08-22 20:40:31.0,\N,Violet,Searcy,2601 Twin Oaks Drive..."
經過轉換后變成:
scala
("32438",  // 用戶ID作為keyArray(    // 所有字段作為value"32438","2012-08-22 20:40:31.0","\\N","Violet","Searcy","2601 Twin Oaks Drive",...)
)

連接weblog數據和賬戶數據,得到以user ID為key的數據集。包含了用戶賬戶信息和網頁數量。

基于賬戶數據創建RDD,由key/value-array對(userid, [values…])組成。

// 這是第一步已經創建的RDD
val user_reqs = logs.map(line => line.split(" ")).map(ar => (ar(2), 1)).reduceByKey(_ + _)
第一步:logs.map(line => line.split(" "))
作用:將原始日志的每一行按空格分割成數組輸入:line(原始日志行,如 "34.28.1.122 - 65255 [01/Mar/2014:23:57:51 ...")處理:split(" ") 按空格分割輸出:字符串數組 Array(34.28.1.122, -, 65255, [01/Mar/2014:23:57:51, ...)關鍵字段位置:
ar(0):IP地址(如 34.28.1.122)
ar(1):分隔符(-)
ar(2):用戶ID(如 65255)← 這是我們需要的字段第二步:.map(ar => (ar(2), 1))
作用:生成 (用戶ID, 1) 的鍵值對輸入:上一步的數組 ar處理:取 ar(2)(用戶ID)作為 key,固定值 1 作為 value輸出:Pair RDD 格式 (String, Int),如 ("65255", 1)為什么是1:每個日志行代表一次訪問,用 1 表示一次計數,方便后續聚合。第三步:.reduceByKey(_ + _)
作用:對相同用戶ID的計數求和輸入:上一步的 (用戶ID, 1) 對處理:
reduceByKey 將相同 key(用戶ID)的 value 合并
_ + _ 是簡寫,等價于 (v1, v2) => v1 + v2輸出:最終統計結果 (用戶ID, 總訪問次數),如 ("65255", 42)數據流示例
假設原始日志有3行:text
"1.1.1.1 - 101 ..."  // 用戶101訪問1次
"2.2.2.2 - 102 ..."  // 用戶102訪問1次 
"3.3.3.3 - 101 ..."  // 用戶101再訪問1次
分割后:scala
Array("1.1.1.1", "-", "101", ...)  
Array("2.2.2.2", "-", "102", ...)
Array("3.3.3.3", "-", "101", ...)
生成計數對:scala
("101", 1)
("102", 1)
("101", 1)
聚合結果:scala
("101", 2)  // 用戶101總計2次
("102", 1)  // 用戶102總計1次

連接PairRDD和上一步計算的userid/hitcount鍵值對數據集

// 連接賬戶數據和點擊量數據
val joinedData = accountsRDD.join(user_reqs)// 查看連接后的數據結構
joinedData.take(2).foreach { case (userid, (accountFields, count)) =>println(s"UserID: $userid, Count: $count, Name: ${accountFields(3)} ${accountFields(4)}")
}
第一步:accountsRDD.join(user_reqs)
作用:基于用戶ID關聯賬戶信息和訪問次數
輸入:
accountsRDD: (String, Array[String]) ← (用戶ID, 賬戶字段數組)
user_reqs: (String, Int) ← (用戶ID, 訪問次數)處理:
通過 用戶ID(key) 內連接(inner join)兩個RDD
自動匹配兩個RDD中相同的用戶ID輸出:
新RDD格式:(String, (Array[String], Int))
結構示例:("32438", (Array("32438", "2012-08-22..."), 4))
第二步:joinedData.take(2).foreach
作用:查看前2條連接結果并格式化輸出數據解構:
scala
case (userid, (accountFields, count)) =>// userid: 用戶ID(String)// accountFields: 賬戶字段數組(Array[String])// count: 訪問次數(Int)
字段索引(根據賬戶數據格式):
accountFields(3): 用戶的名(如 "Violet")
accountFields(4): 用戶的姓(如 "Searcy")輸出示例:
text
UserID: 32438, Count: 4, Name: Violet Searcy
UserID: 32439, Count: 25, Name: Eunice Myers數據流詳解
假設有以下數據:
賬戶數據(accountsRDD):
scala
("32438", Array("32438", "2012-08-22", "\\N", "Violet", "Searcy", ...))
("32439", Array("32439", "2012-12-15", "...", "Eunice", "Myers", ...))訪問次數(user_reqs):
scala
("32438", 4)
("32439", 25)連接后結果(joinedData):
scala
("32438", (Array("32438", "2012-08-22", ..., "Violet", "Searcy"), 4)
)
("32439",(Array("32439", "2012-12-15", ..., "Eunice", "Myers"), 25)
)
完整數據映射圖
accountsRDD (用戶ID -> 賬戶詳情)       user_reqs (用戶ID -> 訪問次數)
+-----------------------------+       +---------------------+
| "32438" -> [...,Violet,...] |       | "32438" -> 4        |
| "32439" -> [...,Eunice,...] | JOIN  | "32439" -> 25       |
+-----------------------------+       +---------------------+|v
joinedData (用戶ID -> (賬戶詳情, 訪問次數))
+--------------------------------------------------+
| "32438" -> ([...,Violet,...], 4) -> "Violet 4"   |
| "32439" -> ([...,Eunice,...], 25) -> "Eunice 25" |
+--------------------------------------------------+

顯示用戶ID,點擊量和姓名,比如:

userid1 4 Jack Cheng
userid2 25 John Doe
// 格式化輸出:userid, 點擊量, 姓名
val formattedResults = joinedData.map { case (userid, (accountFields, count)) => s"$userid $count ${accountFields(3)} ${accountFields(4)}"
}// 顯示前10條結果
formattedResults.take(10).foreach(println)
1. 輸入數據結構
joinedData 的格式為:
RDD[(String, (Array[String], Int))]
即每個元素是:
(用戶ID, (賬戶字段數組, 訪問次數))示例數據:
scala
("32438", (Array("32438", "2012-08-22", "\\N", "Violet", "Searcy", ...), 4)
)
2. map 操作
作用:對 joinedData 的每個元素進行轉換模式匹配:
case (userid, (accountFields, count)) 解構嵌套元組:
userid:用戶ID(如 "32438")
accountFields:賬戶字段數組
count:訪問次數(如 4)3. 字符串模板
s"$userid $count ${accountFields(3)} ${accountFields(4)}"字段索引:
accountFields(3):名字(如 "Violet")
accountFields(4):姓氏(如 "Searcy")輸出格式:
用戶ID 訪問次數 名 姓
示例:"32438 4 Violet Searcy"
數據轉換流程
原始數據 → 提取字段 → 格式化字符串("32438", (Array["32438",...,"Violet","Searcy",...], 4)) 
→ 提取 "32438"、4、"Violet"、"Searcy"  
→ 拼接成 "32438 4 Violet Searcy"

編寫和運行Spark應用

編寫一個簡單的程序來統計web日志文件中JPG請求的數量。文件名將作為參數傳遞到程序中。

使用Python編寫Spark應用

創建CountJPGs.py文件,實現統計JPG請求的功能。

import sys
from pyspark import SparkContextif __name__ == "__main__":if len(sys.argv) < 2:print >> sys.stderr, "Usage: CountJPGs.py <logfile>"exit(-1)# TODO: 從參數讀取日志文件,實現統計JPG請求的功能
import sys
from pyspark import SparkContextif __name__ == "__main__":if len(sys.argv) < 2:print >> sys.stderr, "Usage: CountJPGs.py <logfile>"exit(-1)sc = SparkContext()logfile = sys.argv[1]sc.setLogLevel("WARN")count = sc.textFile(logfile).filter(lambda line: '.jpg' in line).count()print "Number of JPG requests: ", countsc.stop()

運行程序

在終端中運行Python程序:

# 使用spark-submit運行
spark-submit --master local[*] CountJPGs.py /dw/weblogs

運行并查看結果:

測試成功后,程序將輸出統計的JPG請求數量。

提交Spark應用到集群

默認,spark-submit在本地運行應用。在這個部分,運行應用到YARN集群上。

重新運行程序,指定—master yarn參數

從運行日志中找到應用ID,并運行使用當前ID來查看結果

spark-submit --master yarn --deploy-mode cluster CountJPGs1.py /dw/weblogs

配置Spark應用

使用之前的實驗中使用的CountJPGs.py程序

在命令行設置配置選項

重新運行CountJPGs.py程序,指定應用名'Count JPGs'

訪問RM UI并注意命令行指定的應用名

在屬性文件中設置配置選項

使用文本編輯器,創建$CODE_EXERCISE/myspark.conf文件,并添加如下配置:

spark.app.name "My Spark App1"
spark.master yarn
spark.executor.memory 600M

使用屬性文件myspark.conf重新運行應用

當應用正在運行,查看YARN UI并確認Spark應用名正確的顯示為"My Spark App1"

設置日志級別

修改/etc/spark/conf/log4j.properties

編輯log4j.properties。第一行替換為DEBUG:

log4j.rootCategory=DEBUG, console

重新運行Spark應用。

spark-submit --master local[*] CountJPGs.py /dw/weblogs

注意到輸出包含INFO和DEBUG消息,比如

編輯log4j.properties文件,替換DEBUG為WARN并重新運行。注意只有WARN消息出來。

在Spark應用UI中查看Jobs和Stages

探索基于文件RDD的分區

啟動Spark Shell,為了模擬實際中的多節點集群,使用2個線程運行在本地模式

使用Hue或命令行重新查看賬戶數據集(/dw/accounts/)。注意文件數量

在數據集中基于單個文件創建RDD,比如,/dw/accounts/part-m-00000。然后調用toDebugString。在結果RDD中有多少分區?

重復這個流程,但指定最小3個分區:sc.textFile(filename,3)。RDD是否有3個分區?

最后,基于賬戶數據集的所有文件創建RDD。比較一下文件數和RDD的分區數?

設置作業

創建accounts RDD,key是賬戶id,value是姓名

創建userreqs RDD,統計每個用戶頁面點擊總數

通過user ID進行連接,并基于名字、姓和點擊總量重構新的RDD

打印出accounthits.toDebugString的結果并查看輸出,基于這個信息,看是否能確定:

  1. 作業中有多少stage?
  2. Stage之間的依賴關系?
  3. 每個stage包含多少tasks?

運行作業

通過瀏覽器查看Spark應用UI,http://master:4040

在Spark應用UI中檢查Job

在Spark UI中,確保選擇了Jobs標簽。

重新運行shell并通過執行action(saveAsTextFile)來啟動作業

重新加載Spark UI Jobs頁面。

點擊job description來查看stages。

點擊stages來查看stage詳情。

當作業完成后,返回Jobs標簽查看執行的任務的最終統計和作業花費的時間。

持久化RDD

在上一個實驗的基礎上來完成后面的步驟

統計用戶點擊量大于5的賬戶數量

調用accounthits.persist()來緩存RDD

在瀏覽器中,查看Spark應用UI并選擇Storage標簽。現在你已經標記RDD被持久化,但是還沒有執行action操作使得它持久化。

在Spark Shell,執行count操作

查看RDD的toDebugString。注意輸出包含了選擇的持久化級別。

重新加載Storage標簽,注意持久化的RDD顯示出來。點擊RDD ID查看分區和持久化的明細。

點擊executors標簽并使用的內存量和可用的工作節點。

scala> val accounthits = joined.filter(pair => pair._2._1 > 5)scala> accounthits.persist()
res0: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> accounthits.count()
res1: Long = 3721scala> accounthits.toDebugString
res2: String =
(15) MapPartitionsRDD[15] at filter at <console>:25 [Memory Deserialized 1x Replicated]|        CachedPartitions: 15; MemorySize: 643.5 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B|   MapPartitionsRDD[13] at join at <console>:27 [Memory Deserialized 1x Replicated]|   MapPartitionsRDD[12] at join at <console>:27 [Memory Deserialized 1x Replicated]|   CoGroupedRDD[11] at join at <console>:27 [Memory Deserialized 1x Replicated]|   ShuffledRDD[4] at reduceByKey at <console>:25 [Memory Deserialized 1x Replicated]+-(15) MapPartitionsRDD[3] at map at <console>:25 [Memory Deserialized 1x Replicated]|   MapPartitionsRDD[2] at map at <console>:25 [Memory Deserialized 1x Replicated]|   /dw/weblogs MapPartitionsRDD[1] at textFile at <console>:24 [Memory De...
--清理緩存
scala> accounthits.unpersist()
res3: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevelscala> accounthits.persist(StorageLevel.DISK_ONLY)
res4: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> accounthits.count()

Spark SQL處理Hive表數據

數據探索任務

讀取 Hive 表?db_userapp.user_base_info,并查看表結構和部分數據。

檢查新的DataFrame的schema

創建新的DataFrame,選擇?user_id?和?user_name?列

將DataFrame轉換為Pair RDD,字段為?user_id?和?user_name?列

scala> val df_user = spark.read.table("db_userapp.user_base_info")
scala> df_user.printSchema
root|-- user_id: long (nullable = true)|-- user_name: string (nullable = true)|-- age: integer (nullable = true)|-- gender_id: integer (nullable = true)|-- edu_id: integer (nullable = true)|-- marital_id: integer (nullable = true)|-- property_cert_count: integer (nullable = true)|-- car_count: integer (nullable = true)scala> val df_user2 = df_user.select($"user_id",$"user_name")scala> df_user.select($"age"+10).show(4)
+----------+
|(age + 10)|
+----------+
|        95|
|        36|
|        28|
|        80|
+----------+
only showing top 4 rowsscala> df_user.sort($"age".desc).show(4)
+-------+---------+---+---------+------+----------+-------------------+---------+
|user_id|user_name|age|gender_id|edu_id|marital_id|property_cert_count|car_count|
+-------+---------+---+---------+------+----------+-------------------+---------+
|     27|   宋凌寒| 85|        1|     5|         4|                  0|        0|
|      1|   周雅芙| 85|        1|     4|         2|                  0|        0|
|     41|   黃冰萍| 85|        1|     6|         3|                  4|        0|
|     22|   韓映雪| 85|        2|     3|         3|                  0|        0|
+-------+---------+---+---------+------+----------+-------------------+---------+
only showing top 4 rowsscala> df_user.registerTempTable("tbl_user")
warning: there was one deprecation warning; re-run with -deprecation for detailsscala> spark.sql("select user_name,age from tbl_user limit 3").show()
+---------+---+
|user_name|age|
+---------+---+
|   周雅芙| 85|
|   王從夢| 26|
|   孫憶翠| 18|
+---------+---+scala> val rdd_user = df_user.rdd.map(row => (row(0).toString,row(1).toString))

關于代碼 val rdd_user = df_user.rdd.map(row => (row(0).toString,row(1).toString)的解析
1. 原始RDD數據結構
scala
Array([1,周雅芙,85,1,4,2,0,0],  // 第一行[2,王從夢,26,1,3,1,0,0]   // 第二行
)
每行是一個org.apache.spark.sql.Row對象字段順序對應DataFrame的Schema:
scala
root|-- user_id: long (index 0)|-- user_name: string (index 1)|-- age: integer (index 2)|-- gender_id: integer (index 3)|-- edu_id: integer (index 4)|-- marital_id: integer (index 5)|-- property_cert_count: integer (index 6)|-- car_count: integer (index 7)2. 轉換操作詳解
原始代碼:
scala
val rdd_user = df_user.rdd.map(row => (row(0).toString, row(1).toString)
)
對第一行 [1,周雅芙,85,...] 的處理:
row(0) → 取第0個字段:1 (Long型)
.toString → 轉為字符串:"1"
row(1) → 取第1個字段:"周雅芙" (已是String)
最終生成鍵值對:("1", "周雅芙")對第二行 [2,王從夢,26,...] 的處理:
同理生成:("2", "王從夢")3. 轉換后的RDD內容
執行rdd_user.take(2)會得到:
Array(("1", "周雅芙"), ("2", "王從夢")
)

數據變換任務

新增年齡分組字段:根據age字段,將用戶分為“未成年”(<18)、“青年”(18-29)、“中年”(30-49)、“老年”(≥50)四類,新增age_group字段。

拼接姓名與年齡:將user_nameage字段拼接為新字段name_age,格式如“張三-25”。

篩選有房且有車的用戶:篩選property_cert_count>0且car_count>0的用戶,輸出其user_iduser_nameproperty_cert_countcar_count

val df_user3 = df_user.withColumn(
"age_group",
when(col("age") <18, "未成年")
.when(col("age") <30, "青年")
.when(col("age") <50, "中年")
.otherwise("老年")
)
年齡分組代碼的解析:
2.1 withColumn 方法
功能:向DataFrame添加新列或替換現有列參數:
第一個參數:新列名(此處為"age_group")
第二個參數:列表達式(此處為when-otherwise條件鏈)2.2 when 條件表達式
工作方式:類似SQL的CASE WHEN語句
鏈式調用:多個when可以串聯,最后以otherwise結束
執行順序:從上到下依次判斷,第一個滿足的條件即返回對應值2.3 col("age") 列引用
指向DataFrame中的age列
所有比較操作都基于該列值3. 條件邏輯分解
條件判斷	分組標簽	對應年齡段
age < 18	未成年	小于18歲
age between 18 and 29	青年	18-29歲(含)
age between 30 and 49	中年	30-49歲(含)
以上都不滿足(otherwise)	老年	50歲及以上4. 執行過程示例
以原始數據中的兩行為例:
[1,周雅芙,85,...]  // age=85
[2,王從夢,26,...]  // age=26
第一行處理:
85 < 18? → 否
85 between 18-29? → 否
85 between 30-49? → 否
執行otherwise → "老年"第二行處理:
26 < 18? → 否
26 between 18-29? → 是 → "青年"

統計分析任務

統計不同學歷(edu_id)用戶的數量。

統計不同性別(gender_id)用戶的平均年齡。

統計擁有房產證數量大于0的用戶比例。

將處理結果保存為 Parquet 和 JSON 格式到 HDFS 指定目錄。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/88036.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/88036.shtml
英文地址,請注明出處:http://en.pswp.cn/web/88036.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【R語言】Can‘t subset elements that don‘t exist.

Error in select(): ? In argument: all_of(label_col). Caused by error in all_of(): ! Cant subset elements that dont exist. ? Element Label doesnt exist. Run rlang::last_trace() to see where the error occurred.原文中文解釋涉及關鍵詞Error in select()報錯發生…

Spring的依賴注入(xml)

引入 首先先明白&#xff0c;依賴注入描述的是在容器中建立bean與bean之間的依賴關系&#xff0c;本質就是將一個類中和別的類解耦的方式&#xff0c;就是把別的類&#xff0c;寫在成員變量位置&#xff0c;再對外提供可以給成員變量賦值的方法&#xff0c;外界就直接調用來給…

docker運行的一些常用命令

docker images 顯示可以加載的鏡像docker ps 顯示運行的docker容器 加-a顯示所有的容器docker run --name 容器名字 -d 鏡像名字docker start 容器名/ID 開啟容器docker stop 容器名/ID 關閉容器docker exec -it dock…

Django跨域

步驟 1&#xff1a;安裝 django-cors-headerspip install django-cors-headers步驟 2&#xff1a;修改 Django 配置 在 settings.py 中添加&#xff1a;INSTALLED_APPS [...,"corsheaders", # 新增 ]MIDDLEWARE [...,"corsheaders.middleware.CorsMiddleware…

20250706-10-Docker快速入門(下)-Harbor鏡像倉庫_筆記

一、Harbor鏡像倉庫搭建與使用1. Harbor概述&#xfeff;&#xfeff;定義: 由VMWare公司開源的容器鏡像倉庫系統技術基礎: 在Docker Registry基礎上進行企業級擴展核心特性:提供管理用戶界面(GUI)基于角色的訪問控制(RBAC)支持&#xfeff;AD/LDAP\mathrm{AD}/\mathrm{LDAP}AD…

JavaScript之數組方法詳解

JavaScript之數組方法詳解一、數組的創建與基礎特性1.1 數組的創建方式1.2 數組的核心特性二、修改原數組的方法2.1 添加/刪除元素2.1.1 push()&#xff1a;尾部添加元素2.1.2 pop()&#xff1a;尾部刪除元素2.1.3 unshift()&#xff1a;頭部添加元素2.1.4 shift()&#xff1a;…

品牌增長困局突圍:大模型時代,AI 如何幫我的品牌少走彎路?

AI時代對企業戰略的沖擊與機遇 在當今瞬息萬變的商業環境中&#xff0c;大模型的崛起正以前所未有的力量重塑著各行各業的競爭格局。傳統的市場營銷、品牌傳播模式正在被顛覆&#xff0c;消費者獲取信息、認知品牌的方式發生了根本性變化。如果說過去十年是“互聯網”的時代&am…

從單體到微服務:Spring Cloud 開篇與微服務設計

一、單體架構的核心痛點與微服務化目標 1. 單體架構的致命缺陷問題表現后果可維護性差百萬行代碼耦合&#xff0c;修改一處需全量測試迭代周期長&#xff0c;創新停滯擴展性受限無法按模塊獨立擴縮容&#xff08;如訂單模塊需擴容時&#xff0c;用戶模塊被迫一起擴容&#xff0…

篇二 OSI七層模型,TCP/IP四層模型,路由器與交換機原理

一 前言 本章節主要介紹OSI七層模型&#xff0c;TCP/IP四層模型劃分&#xff0c;以及日常使用的路由器&#xff0c;交換機的一些基礎知識 二 OSI 七層 OSI&#xff08;Open Systems Interconnection Model&#xff09;即開放式系統互聯模型&#xff0c;是國際標準化組織提出的&…

【JavaSE面試篇】Java集合部分高頻八股匯總

目錄 概念 1. 說說Java中的集合&#xff1f; 2. Java中的線程安全的集合有什么&#xff1f; 3. Collections和Collection的區別&#xff1f; 4. 集合遍歷的方法有哪些&#xff1f; List 5. 講一下java里面list的幾種實現&#xff0c;幾種實現有什么不同&#xff1f; 6.…

利用低空無人機影像進行樹種實例分割

在本項先導研究中,我們開發了一個基于低空無人機影像的本地樹種機器學習實例分割模型,用于生態調查。該實例分割包括單株樹冠的描繪和樹種的分類。我們利用無人機影像對20個樹種及其對應的學名進行了訓練,并收集了這些樹種的學名用于機器學習。為了評估該機器學習模型的準確…

二、Flutter基礎

目錄1. 什么是Widget&#xff1f;Flutter中的Widget分為哪幾類&#xff1f;2. StatelessWidget和StatefulWidget的區別3. StatefulWidget生命周期4. 什么是BuildContext&#xff1f;5. 如何優化Widget重建&#xff1f;6. Flutter布局機制7. Row/Column的主軸和交叉軸8. Expande…

設計模式筆記_創建型_建造者模式

1. 建造者模式介紹 建造者模式是一種創建型設計模式&#xff0c;旨在通過將復雜對象的構建過程與其表示分離&#xff0c;使得同樣的構建過程可以創建不同的表示。它通常用于構造步驟固定但具體實現可能變化的對象。 1.1 功能&#xff1a; 封裝復雜對象的創建過程&#xff1a;適…

【ROS2 自動駕駛學習】03-ROS2常用命令

目錄 1. ros2 pkg list 2. ros2 node list 3. ros2 node info 節點名稱 4. ros2 topic list 5. ros2 topic info 話題名 6. ros2 topic type 話題名 7. ros2 topic find 消息類型 8. ros2 service list 9. ros2 service type 服務名稱 10. ros2 service find 服…

MyBatis-Plus:提升數據庫操作效率的利器

在Java開發中&#xff0c;MyBatis是一個非常流行的持久層框架&#xff0c;它簡化了數據庫操作&#xff0c;提供了靈活的SQL映射功能。然而&#xff0c;隨著項目規模的擴大和業務復雜度的增加&#xff0c;開發者需要更高效、更便捷的方式來處理數據庫操作。MyBatis-Plus應運而生…

App爬蟲實戰篇-以華為真機手機爬取集換社的app為例

前言 在開始學習這篇文章之前,建議你先按照之前2篇文章(App爬蟲工具篇-Appium安裝和App爬蟲工具篇-appium配置),配置必要的環境,才可以繼續完成本章節內容。 電腦連接手機 可以通過usb連接電腦。如果通過adb devices命令,發現沒有連接上,就需要手動配置一些信息 華為…

Vue3組合式API應用:狀態共享與邏輯復用最佳實踐

Vue3組合式API應用&#xff1a;狀態共享與邏輯復用最佳實踐 在Vue3中&#xff0c;組合式API的引入為我們提供了一種全新的方式來編寫Vue組件&#xff0c;并有效地解決了混入和繁瑣邏輯復用的問題。本文將為您介紹如何在Vue3中使用組合式API來實現狀態共享與邏輯復用的最佳實踐&…

在linux 上使用tcpdump監聽http 端口的報文并分析

這里寫目錄標題 1. 使用 tcpdump(原始報文捕獲)觀察:報文翻譯與分析(按行解釋)第一段:客戶端請求報文HTTP 請求頭JSON 請求體第二段:服務器響應報文HTTP 響應頭響應體關鍵問題分析在 Linux 上監聽 HTTP 端口的報文,有多種工具可以實現。以下是幾種常用方法的詳細說明:…

XSStrike 進行 XSS 漏洞測試

XSStrike 是一個功能強大的 XSS 漏洞測試工具&#xff0c;專為檢測、驗證和利用反射型、存儲型、DOM型 XSS 漏洞而設計&#xff0c;適合配合手工測試&#xff0c;也可用于自動化發現。 &#x1f6e0;? 1. 安裝 XSStrike 確保系統中有 Python3 和 git&#xff1a; git clone ht…

any實現(基于LLVM中libcxx實現分析)

本文根據LLVM中libcxx的實現&#xff0c;分析了std::any和std::variant的具體實現。 1 簡介 在 C17 標準中&#xff0c;std::any提供了一種類型安全的方式來存儲任意類型的值。它使用類型擦除&#xff08;type erasure&#xff09;技術實現&#xff0c;使得一個對象可以包含任…