大數據分析與應用實驗任務十一
實驗目的
-
通過實驗掌握spark Streaming相關對象的創建方法;
-
熟悉spark Streaming對文件流、套接字流和RDD隊列流的數據接收處理方法;
-
熟悉spark Streaming的轉換操作,包括無狀態和有狀態轉換。
-
熟悉spark Streaming輸出編程操作。
實驗任務
一、DStream 操作概述
-
創建 StreamingContext 對象
登錄 Linux 系統后,啟動 pyspark。進入 pyspark 以后,就已經獲得了一個默認的 SparkConext 對象,也就是 sc。因此,可以采用如下方式來創建 StreamingContext 對象:
from pyspark.streaming import StreamingContext sscluozhongye = StreamingContext(sc, 1)
如果是編寫一個獨立的 Spark Streaming 程序,而不是在 pyspark 中運行,則需要在代碼文件中通過類似如下的方式創建 StreamingContext 對象:
from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext conf = SparkConf() conf.setAppName('TestDStream') conf.setMaster('local[2]') sc = SparkContext(conf = conf) ssc = StreamingContext(sc, 1) print("創建成功,lzy防偽")
二、基本輸入源
- 文件流
-
在 pyspark 中創建文件流
首先,在 Linux 系統中打開第 1 個終端(為了便于區分多個終端,這里記作“數據源終端”),創建一個 logfile 目錄,命令如下:
cd /root/Desktop/luozhongye/ mkdir streaming cd streaming mkdir logfile
其次,在 Linux 系統中打開第二個終端(記作“流計算終端”),啟動進入 pyspark,然后,依次輸入如下語句:
from pyspark import SparkContext from pyspark.streaming import StreamingContext ssc = StreamingContext(sc, 10) lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') words = lines.flatMap(lambda line: line.split(' ')) wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) wordCounts.pprint() ssc.start() ssc.awaitTermination()
-
采用獨立應用程序方式創建文件流
#!/usr/bin/env python3 from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext conf = SparkConf() conf.setAppName('TestDStream') conf.setMaster('local[2]') sc = SparkContext(conf = conf) ssc = StreamingContext(sc, 10) lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') words = lines.flatMap(lambda line: line.split(' ')) wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) wordCounts.pprint() ssc.start() ssc.awaitTermination() print("2023年12月7日lzy")
保存該文件,并執行以下命令:
cd /root/Desktop/luozhongye/streaming/logfile/ spark-submit FileStreaming.py
- 套接字流
-
使用套接字流作為數據源
新建一個代碼文件“/root/Desktop/luozhongye/streaming/socket/NetworkWordCount.py”,在NetworkWordCount.py 中輸入如下內容:
#!/usr/bin/env python3 from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingNetworkWordCount")ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)counts.pprint()ssc.start()ssc.awaitTermination()
使用如下 nc 命令生成一個 Socket 服務器端:
nc -lk 9999
新建一個終端(記作“流計算終端”),執行如下代碼啟動流計算:
cd /root/Desktop/luozhongye/streaming/socket /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
-
使用 Socket 編程實現自定義數據源
新建一個代碼文件“/root/Desktop/luozhongye/streaming/socket/DataSourceSocket.py”,在 DataSourceSocket.py 中輸入如下代碼:
#!/usr/bin/env python3 import socket# 生成 socket 對象 server = socket.socket() # 綁定 ip 和端口 server.bind(('localhost', 9999)) # 監聽綁定的端口 server.listen(1) while 1:# 為了方便識別,打印一個“I’m waiting the connect...”print("I'm waiting the connect...")# 這里用兩個值接收,因為連接上之后使用的是客戶端發來請求的這個實例# 所以下面的傳輸要使用 conn 實例操作conn, addr = server.accept()# 打印連接成功print("Connect success! Connection is from %s " % addr[0])# 打印正在發送數據print('Sending data...')conn.send('I love hadoop I love spark hadoop is good spark is fast'.encode())conn.close()print('Connection is broken.') print("2023年12月7日lzy")
執行如下命令啟動 Socket 服務器端:
cd /root/Desktop/luozhongye/streaming/socket /usr/local/spark/bin/spark-submit DataSourceSocket.py
新建一個終端(記作“流計算終端”),輸入以下命令啟動 NetworkWordCount 程序:
cd /root/Desktop/luozhongye/streaming/socket /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
-
RDD 隊列流
Linux 系統中打開一個終端,新建一個代碼文件“/root/Desktop/luozhongye/ streaming/rddqueue/ RDDQueueStream.py”,輸入以下代碼:
#!/usr/bin/env python3 import time from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ == "__main__":print("")sc = SparkContext(appName="PythonStreamingQueueStream")ssc = StreamingContext(sc, 2)# 創建一個隊列,通過該隊列可以把 RDD 推給一個 RDD 隊列流rddQueue = []for i in range(5):rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]time.sleep(1)# 創建一個 RDD 隊列流inputStream = ssc.queueStream(rddQueue)mappedStream = inputStream.map(lambda x: (x % 10, 1))reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)reducedStream.pprint()ssc.start()ssc.stop(stopSparkContext=True, stopGraceFully=True)
下面執行如下命令運行該程序:
cd /root/Desktop/luozhongye/streaming/rddqueue /usr/local/spark/bin/spark-submit RDDQueueStream.py
三、轉換操作
-
滑動窗口轉換操作
對“套接字流”中的代碼 NetworkWordCount.py 進行一個小的修改,得到新的代碼文件“/root/Desktop/luozhongye/streaming/socket/WindowedNetworkWordCount.py”,其內容如下:
#!/usr/bin/env python3 from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")ssc = StreamingContext(sc, 10)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/socket/checkpoint")lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)counts.pprint()ssc.start()ssc.awaitTermination()
為了測試程序的運行效果,首先新建一個終端(記作“數據源終端”),執行如下命令運行nc 程序:
cd /root/Desktop/luozhongye/streaming/socket/ nc -lk 9999
然后,再新建一個終端(記作“流計算終端”),運行客戶端程序 WindowedNetworkWordCount.py,命令如下:
cd /root/Desktop/luozhongye/streaming/socket/ /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999
在數據源終端內,連續輸入 10 個“hadoop”,每個 hadoop 單獨占一行(即每輸入一個 hadoop就按回車鍵),再連續輸入 10 個“spark”,每個 spark 單獨占一行。這時,可以查看流計算終端內顯示的詞頻動態統計結果,可以看到,隨著時間的流逝,詞頻統計結果會發生動態變化。
-
updateStateByKey 操作
在“/root/Desktop/luozhongye/streaming/stateful/”目錄下新建一個代碼文件 NetworkWordCountStateful.py,輸入以下代碼:
#!/usr/bin/env python3 from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")# RDD with initial state (key, value) pairsinitialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD=initialStateRDD)running_counts.pprint()ssc.start()ssc.awaitTermination()
新建一個終端(記作“數據源終端”),執行如下命令啟動 nc 程序:
nc -lk 9999
新建一個 Linux 終端(記作“流計算終端”),執行如下命令提交運行程序:
cd /root/Desktop/luozhongye/streaming/stateful /usr/local/spark/bin/spark-submit NetworkWordCountStateful.py localhost 9999
四、把 DStream 輸出到文本文件中
下面對之前已經得到的“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStateful.py”代碼進行簡單的修改,把生成的詞頻統計結果寫入文本文件中。
修改后得到的新代碼文件“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStatefulText.py”的內容如下:
#!/usr/bin/env python3
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")# RDD with initial state (key, value) pairs initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD=initialStateRDD)running_counts.saveAsTextFiles("file:///root/Desktop/luozhongye/streaming/stateful/output")running_counts.pprint()ssc.start()ssc.awaitTermination()
新建一個終端(記作“數據源終端”),執行如下命令運行nc 程序:
cd /root/Desktop/luozhongye/streaming/socket/
nc -lk 9999
新建一個 Linux 終端(記作“流計算終端”),執行如下命令提交運行程序:
cd /root/Desktop/luozhongye/streaming/stateful
/usr/local/spark/bin/spark-submit NetworkWordCountStatefulText.py localhost 9999
實驗心得
通過本次實驗,我深入理解了Spark Streaming,包括創建StreamingContext、DStream等對象。同時,我了解了Spark Streaming對不同類型數據流的處理方式,如文件流、套接字流和RDD隊列流。此外,我還熟悉了Spark Streaming的轉換操作和輸出編程操作,并掌握了map、flatMap、filter等方法。最后,我能夠自定義輸出方式和格式。總之,這次實驗讓我全面了解了Spark Streaming,對未來的學習和工作有很大的幫助。