Processing Data Files with Spark
Reviewing the Data
Review the data in $DATA_EXERCISE/activations. Each XML file contains the data for devices activated by customers during a specific month.
Copy the data to the /dw directory in HDFS.
Sample data:
<activations>
  <activation timestamp="1225499258" type="phone">
    <account-number>316</account-number>
    <device-id>d61b6971-33e1-42f0-bb15-aa2ae3cd8680</device-id>
    <phone-number>5108307062</phone-number>
    <model>iFruit 1</model>
  </activation>
  …
</activations>
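As a quick illustration of how scala.xml navigates this structure, here is a minimal sketch; the string below is the sample record above, completed into a well-formed single-record document:

import scala.xml._

// The sample record from above, wrapped as a complete document
val sample =
  """<activations>
    |  <activation timestamp="1225499258" type="phone">
    |    <account-number>316</account-number>
    |    <device-id>d61b6971-33e1-42f0-bb15-aa2ae3cd8680</device-id>
    |    <phone-number>5108307062</phone-number>
    |    <model>iFruit 1</model>
    |  </activation>
    |</activations>""".stripMargin

val doc = XML.loadString(sample)
// "\\" selects matching descendants at any depth; "\" selects direct children only
val activation = (doc \\ "activation").head
println((activation \ "account-number").text) // 316
println((activation \ "model").text)          // iFruit 1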
Processing the Files
Read the XML files, extract the account number and device model from each activation record, and save the results to /dw/account-models in the format account_number:model.
Sample output:
1234:iFruit 1
987:Sorrento F00L
4566:iFruit 1
…
The following XML parsing functions are provided:
// Stub code to copy into Spark Shell
import scala.xml._

// Given a string containing XML, parse the string, and
// return an iterator of activation XML records (Nodes) contained in the string
def getActivations(xmlstring: String): Iterator[Node] = {
  val nodes = XML.loadString(xmlstring) \\ "activation"
  nodes.toIterator
}

// Given an activation record (XML Node), return the model name
def getModel(activation: Node): String = {
  (activation \ "model").text
}

// Given an activation record (XML Node), return the account number
def getAccount(activation: Node): String = {
  (activation \ "account-number").text
}
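Note: toIterator is deprecated in favor of .iterator as of Scala 2.13; in the Scala 2.11/2.12 shells bundled with most Spark releases it works as written.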
Uploading the Data
# 1. Create the target HDFS directory if it does not exist
hdfs dfs -mkdir -p /dw

# 2. Upload the local data to HDFS (replace $DATA_EXERCISE with the actual path)
hdfs dfs -put $DATA_EXERCISE/activations /dw/

# 3. Confirm the files were uploaded
hdfs dfs -ls /dw/activations
Define the parsing functions provided with the exercise
import scala.xml._

def getActivations(xmlstring: String): Iterator[Node] = {
  (XML.loadString(xmlstring) \\ "activation").toIterator
}
def getModel(activation: Node): String = (activation \ "model").text
def getAccount(activation: Node): String = (activation \ "account-number").text
Read the data (each file is read whole, as a single record)
val xmlRDD = sc.wholeTextFiles("/dw/activations/*.xml")
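Why wholeTextFiles rather than textFile: each element is a (path, contents) pair covering an entire file, so every XML document stays intact for XML.loadString, whereas textFile would split the documents into lines. A quick look at the pair structure (the file name in the comment is illustrative):

// Each element is (file path, entire file contents)
val (path, contents) = xmlRDD.first()
println(path)               // e.g. hdfs://.../dw/activations/2008-10.xml (illustrative name)
println(contents.take(100)) // first 100 characters of the file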
Test the parsing (inspect the first record)
val firstRecord = getActivations(xmlRDD.first()._2).next()
println(s"測試解析結果: ${getAccount(firstRecord)}:${getModel(firstRecord)}")
Process the full dataset
val resultRDD = xmlRDD.flatMap { case (_, xml) =>
  getActivations(xml).map(act => s"${getAccount(act)}:${getModel(act)}")
}
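Optionally, compare the number of input files with the number of output records to confirm that flatMap expanded each file into many activation lines:

// One input pair per file vs. one output line per <activation> element
println(s"files: ${xmlRDD.count()}, activations: ${resultRDD.count()}")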
Preview the results (first 10 records)
resultRDD.take(10).foreach(println)
Save the results (removing any previous output first)
import org.apache.hadoop.fs._
val outputPath = "/dw/account-models"
val fs = FileSystem.get(sc.hadoopConfiguration)
if (fs.exists(new Path(outputPath))) fs.delete(new Path(outputPath), true)
resultRDD.saveAsTextFile(outputPath)
println(s"結果已保存到 hdfs://$outputPath")
Verify the results (run in a Linux terminal)
# View the first few output records
hdfs dfs -cat /dw/account-models/part-* | head -n 10

# Optionally merge the output into a single local file
hdfs dfs -getmerge /dw/account-models ./account_models.txt
head account_models.txt