大數據分析與應用實驗任務九
實驗目的
-
進一步熟悉pyspark程序運行方式;
-
熟練掌握pysaprkRDD基本操作相關的方法、函數,解決基本問題。
實驗任務
進入pyspark實驗環境,打開命令行窗口,輸入pyspark,完成下列任務:
在實驗環境中自行選擇路徑新建以自己姓名拼音命名的文件夾,后續代碼中涉及的文件請保存到該文件夾下(需要時文件夾中可以創建新的文件夾)。
一、參考書中相應代碼,練習RDD持久性、分區及寫入文件(p64、67、80頁相應代碼)。
1.持久化
迭代計算經常需要多次重復使用同一組數據。下面就是多次計算同一個RDD的例子。
listlzy=["Hadoop","Spark","Hive","Darcy"]
rddlzy=sc.parallelize(listlzy)
print(rddlzy.count())#行動操作,觸發一次真正從頭到尾的計算
print(','.join(rddlzy.collect()))#行動操作,觸發一次真正從頭到尾的計算
一般而言,使用cache()方法時,會調用persist(MEMORY_ONLY)。針對上面的實例,增加持久化語句以后的執行過程如下:
listlzy=["Hadoop","Spark","Hive","Darcy"]
rdd=sc.parallelize(listlzy)
rdd.cache()#會調用persist(MEMORY_ONLY),但是,語句執行到這里,并不會緩存rdd,因為這時rdd還沒有被計算生成
print(rdd.count())#第一次行動操作,觸發一次真正從頭到尾的計算,這時上面的rdd.cache()才會被執行,把這個rdd放到緩存中
print(','.join(rdd.collect()))#第二次行動操作,不需要觸發從頭到尾的計算,只需要重復使用上面緩存中的rdd
2.分區
-
設置分區的個數
在調用textFile()和parallelize()方法的時候手動指定分區個數即可,語法格式如下:
sc.textFile(path,partitionNum)
其中,path參數用于指定要加載的文件的地址,partitionNum參數用于指定分區個數。下面是一個分區的實例。
listlzy=[5,2,0,1,3,1,4] rddlzy=sc.parallelize(listlzy,2)//設置兩個分區
-
使用repartition方法重新設置分區個數
通過轉換操作得到新RDD時,直接調用repartition方法即可。例如
datalzy=sc.parallelize([1,2,3,4,5],2)
len(datalzy.glom().collect())#顯示datalzy這個RDD的分區數量
rdd = datalzy.repartition(1) #對 data 這個 RDD 進行重新分區
len(rdd.glom().collect()) #顯示 rdd 這個 RDD 的分區數量
-
自定義分區方法
下面是一個實例,要求根據 key 值的最后一位數字將 key 寫入到不同的文件中,比如,10 寫入到 part-00000,11 寫入到 part-00001,12 寫入到 part-00002。打開一個 Linux 終端,使用 vim 編輯器創建一個代碼文件“/root/Desktop/luozhongye/TestPartitioner.py”,輸入以下代碼:
from pyspark import SparkConf, SparkContextdef MyPartitioner(key):print("MyPartitioner is running")print('The key is %d' % key)return key % 10def main():print("The main function is running")conf = SparkConf().setMaster("local").setAppName("MyApp")sc = SparkContext(conf=conf)data = sc.parallelize(range(10), 5)data.map(lambda x: (x, 1)).partitionBy(10, MyPartitioner).map(lambda x: x[0]).saveAsTextFile("file:///root/Desktop/luozhongye/partitioner")if __name__ == '__main__':main()
使用如下命令運行 TestPartitioner.py:
cd /root/Desktop/luozhongye
python3 TestPartitioner.py
或者,使用如下命令運行 TestPartitioner.py:
cd /root/Desktop/luozhongye
spark-submit TestPartitioner.py
程序運行后會返回如下信息:
3.文件數據寫入
- 把 RDD 寫入到文本文件中
textFile = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt")
textFile.saveAsTextFile("file:///root/Desktop/luozhongye/writeback")
其中wordlzy.txt的內容
Hadoop is good
Spark is fast
Spark is better
luozhongye is handsome
Spark 采用惰性機制。可以使用如下的“行動”類型的操作查看 textFile 中的內容:
textFile.first()
正因為 Spark 采用了惰性機制,在執行轉換操作的時候,即使輸入了錯誤的語句,pyspark 也不會馬上報錯,而是等到執行“行動”類型的語句啟動真正的計算時,“轉換”操作語句中的錯誤才會顯示出來,比如:
textFile = sc.textFile("file:///root/Desktop/luozhongye/wordcount/word123.txt")
- 把 RDD 寫入到文本文件中
可以使用 saveAsTextFile()方法把 RDD 中的數據保存到文本文件中。下面把 textFile 變量中的內容再次寫回到另外一個目錄 writeback 中,命令如下:
textFile = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt")
textFile.saveAsTextFile("file:///root/Desktop/luozhongye/writeback")
進入到“/root/Desktop/luozhongye/writeback”目錄查看
cd /root/Desktop/luozhongye/writeback
ls
二、逐行理解并運行4.4.2實例“文件排序”。
新建多個txt文件file1.txt 、file2.txt 、file3.txt ,其內容分別如下:
33
37
12
40
4
16
39
5
1
45
25
要求讀取所有文件中的整數,進行排序后,輸出到一個新的文件中,輸出的內容為每行兩個整數,第一個整數為第二個整數的排序位次,第二個整數為原待排序的整數。
實現上述功能的代碼文件“/root/Desktop/luozhongye/FileSort.py”的內容如下:
#!/usr/bin/env python3
from pyspark import SparkConf, SparkContextindex = 0def getindex():global indexindex += 1return indexdef main():conf = SparkConf().setMaster("local[1]").setAppName("FileSort")sc = SparkContext(conf=conf)lines = sc.textFile("file:///root/Desktop/luozhongye/file*.txt")index = 0result1 = lines.filter(lambda line: (len(line.strip()) > 0))result2 = result1.map(lambda x: (int(x.strip()), ""))result3 = result2.repartition(1)result4 = result3.sortByKey(True)result5 = result4.map(lambda x: x[0])result6 = result5.map(lambda x: (getindex(), x))result6.foreach(print)result6.saveAsTextFile("file:///root/Desktop/luozhongye/sortresult")if __name__ == '__main__':main()
三、完成p96實驗內容3,即“編寫獨立應用程序實現求平均值問題”,注意每位同學自己修改題目中的數據。
每個輸入文件表示班級學生某個學科的成績,每行內容由兩個字段組成,第一個字段是學生名字,第二個字段是學生的成績;編寫 Spark 獨立應用程序求出所有學生的平均成績,并輸出到一個新文件中。
數學成績.txt:
小羅 110
小紅 107
小新 100
小麗 99
英語成績.txt:
小羅 95
小紅 81
小新 82
小麗 76
政治成績.txt:
小羅 65
小紅 71
小新 61
小麗 66
408成績.txt:
小羅 100
小紅 103
小新 94
小麗 110
實現代碼如下:
from pyspark import SparkConf, SparkContext# 初始化Spark配置和上下文
conf = SparkConf().setAppName("AverageScore")
sc = SparkContext(conf=conf)# 讀取數學成績文件
math_rdd = sc.textFile("數學成績.txt").map(lambda x: (x.split()[0], int(x.split()[1])))# 讀取英語成績文件
english_rdd = sc.textFile("英語成績.txt").map(lambda x: (x.split()[0], int(x.split()[1])))# 讀取政治成績文件
politics_rdd = sc.textFile("政治成績.txt").map(lambda x: (x.split()[0], int(x.split()[1])))# 讀取408成績文件
computer_rdd = sc.textFile("408成績.txt").map(lambda x: (x.split()[0], int(x.split()[1])))# 合并所有成績數據
all_scores_rdd = math_rdd.union(english_rdd).union(politics_rdd).union(computer_rdd)# 計算每個學生的成績總和和成績數量
sum_count_rdd = all_scores_rdd.combineByKey(lambda value: (value, 1),lambda acc, value: (acc[0] + value, acc[1] + 1),lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))# 計算平均成績
average_scores_rdd = sum_count_rdd.mapValues(lambda x: x[0] / x[1])# 輸出到新文件
average_scores_rdd.saveAsTextFile("平均成績")# 關閉Spark上下文
sc.stop()
實驗心得
在這次實驗中,我進一步熟悉了使用PySpark進行大數據處理和分析的方法,并深入了解了PySpark RDD的基本操作。學會了分區、持久化、數據寫入文件,并解決實際問題。這次實驗讓我對PySpark有了更深入的理解,并增強了我處理和分析大數據的能力。