大數據分析與應用實驗任務十一

大數據分析與應用實驗任務十一

實驗目的

  • 通過實驗掌握spark Streaming相關對象的創建方法;

  • 熟悉spark Streaming對文件流、套接字流和RDD隊列流的數據接收處理方法;

  • 熟悉spark Streaming的轉換操作,包括無狀態和有狀態轉換。

  • 熟悉spark Streaming輸出編程操作。

實驗任務

一、DStream 操作概述
  1. 創建 StreamingContext 對象

    登錄 Linux 系統后,啟動 pyspark。進入 pyspark 以后,就已經獲得了一個默認的 SparkConext 對象,也就是 sc。因此,可以采用如下方式來創建 StreamingContext 對象:

    from pyspark.streaming import StreamingContext 
    sscluozhongye = StreamingContext(sc, 1)
    

    image-20231207112253827

    如果是編寫一個獨立的 Spark Streaming 程序,而不是在 pyspark 中運行,則需要在代碼文件中通過類似如下的方式創建 StreamingContext 對象:

    from pyspark import SparkContext, SparkConf 
    from pyspark.streaming import StreamingContext 
    conf = SparkConf() 
    conf.setAppName('TestDStream') 
    conf.setMaster('local[2]') 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 1)
    print("創建成功,lzy防偽")
    

    image-20231207112652285

二、基本輸入源
  1. 文件流
  • 在 pyspark 中創建文件流

    首先,在 Linux 系統中打開第 1 個終端(為了便于區分多個終端,這里記作“數據源終端”),創建一個 logfile 目錄,命令如下:

    cd /root/Desktop/luozhongye/
    mkdir streaming 
    cd streaming 
    mkdir logfile
    

    image-20231207112923323

    其次,在 Linux 系統中打開第二個終端(記作“流計算終端”),啟動進入 pyspark,然后,依次輸入如下語句:

    from pyspark import SparkContext 
    from pyspark.streaming import StreamingContext 
    ssc = StreamingContext(sc, 10) 
    lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') 
    words = lines.flatMap(lambda line: line.split(' ')) 
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) 
    wordCounts.pprint() 
    ssc.start() 
    ssc.awaitTermination()
    

image-20231207113305405

  • 采用獨立應用程序方式創建文件流

    #!/usr/bin/env python3 
    from pyspark import SparkContext, SparkConf 
    from pyspark.streaming import StreamingContext 
    conf = SparkConf() 
    conf.setAppName('TestDStream') 
    conf.setMaster('local[2]') 
    sc = SparkContext(conf = conf) 
    ssc = StreamingContext(sc, 10) 
    lines = ssc.textFileStream('file:///root/Desktop/luozhongye/streaming/logfile') 
    words = lines.flatMap(lambda line: line.split(' ')) 
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b) 
    wordCounts.pprint() 
    ssc.start() 
    ssc.awaitTermination()
    print("2023年12月7日lzy")
    

    保存該文件,并執行以下命令:

    cd /root/Desktop/luozhongye/streaming/logfile/ 
    spark-submit FileStreaming.py
    

image-20231207114014647

  1. 套接字流
  • 使用套接字流作為數據源

    新建一個代碼文件“/root/Desktop/luozhongye/streaming/socket/NetworkWordCount.py”,在NetworkWordCount.py 中輸入如下內容:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingNetworkWordCount")ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)counts.pprint()ssc.start()ssc.awaitTermination()
    

    使用如下 nc 命令生成一個 Socket 服務器端:

    nc -lk 9999
    

    新建一個終端(記作“流計算終端”),執行如下代碼啟動流計算:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
    

image-20231208002212790

  • 使用 Socket 編程實現自定義數據源

    新建一個代碼文件“/root/Desktop/luozhongye/streaming/socket/DataSourceSocket.py”,在 DataSourceSocket.py 中輸入如下代碼:

    #!/usr/bin/env python3 
    import socket# 生成 socket 對象
    server = socket.socket()
    # 綁定 ip 和端口
    server.bind(('localhost', 9999))
    # 監聽綁定的端口
    server.listen(1)
    while 1:# 為了方便識別,打印一個“I’m waiting the connect...”print("I'm waiting the connect...")# 這里用兩個值接收,因為連接上之后使用的是客戶端發來請求的這個實例# 所以下面的傳輸要使用 conn 實例操作conn, addr = server.accept()# 打印連接成功print("Connect success! Connection is from %s " % addr[0])# 打印正在發送數據print('Sending data...')conn.send('I love hadoop I love spark hadoop is good spark is fast'.encode())conn.close()print('Connection is broken.')
    print("2023年12月7日lzy")
    

    執行如下命令啟動 Socket 服務器端:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit DataSourceSocket.py
    

    新建一個終端(記作“流計算終端”),輸入以下命令啟動 NetworkWordCount 程序:

    cd /root/Desktop/luozhongye/streaming/socket 
    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
    

image-20231208003303167

  1. RDD 隊列流

    Linux 系統中打開一個終端,新建一個代碼文件“/root/Desktop/luozhongye/ streaming/rddqueue/ RDDQueueStream.py”,輸入以下代碼:

    #!/usr/bin/env python3 
    import time
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":print("")sc = SparkContext(appName="PythonStreamingQueueStream")ssc = StreamingContext(sc, 2)# 創建一個隊列,通過該隊列可以把 RDD 推給一個 RDD 隊列流rddQueue = []for i in range(5):rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]time.sleep(1)# 創建一個 RDD 隊列流inputStream = ssc.queueStream(rddQueue)mappedStream = inputStream.map(lambda x: (x % 10, 1))reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)reducedStream.pprint()ssc.start()ssc.stop(stopSparkContext=True, stopGraceFully=True)
    

    下面執行如下命令運行該程序:

    cd /root/Desktop/luozhongye/streaming/rddqueue 
    /usr/local/spark/bin/spark-submit RDDQueueStream.py
    

image-20231208004439462

三、轉換操作
  1. 滑動窗口轉換操作

    對“套接字流”中的代碼 NetworkWordCount.py 進行一個小的修改,得到新的代碼文件“/root/Desktop/luozhongye/streaming/socket/WindowedNetworkWordCount.py”,其內容如下:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")ssc = StreamingContext(sc, 10)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/socket/checkpoint")lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)counts.pprint()ssc.start()ssc.awaitTermination()
    

為了測試程序的運行效果,首先新建一個終端(記作“數據源終端”),執行如下命令運行nc 程序:

   cd /root/Desktop/luozhongye/streaming/socket/ nc -lk 9999

然后,再新建一個終端(記作“流計算終端”),運行客戶端程序 WindowedNetworkWordCount.py,命令如下:

   cd /root/Desktop/luozhongye/streaming/socket/ /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999

在數據源終端內,連續輸入 10 個“hadoop”,每個 hadoop 單獨占一行(即每輸入一個 hadoop就按回車鍵),再連續輸入 10 個“spark”,每個 spark 單獨占一行。這時,可以查看流計算終端內顯示的詞頻動態統計結果,可以看到,隨著時間的流逝,詞頻統計結果會發生動態變化。

image-20231208005821701

  1. updateStateByKey 操作

    在“/root/Desktop/luozhongye/streaming/stateful/”目錄下新建一個代碼文件 NetworkWordCountStateful.py,輸入以下代碼:

    #!/usr/bin/env python3 
    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")# RDD with initial state (key, value) pairsinitialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD=initialStateRDD)running_counts.pprint()ssc.start()ssc.awaitTermination()
    

    新建一個終端(記作“數據源終端”),執行如下命令啟動 nc 程序:

    nc -lk 9999
    

    新建一個 Linux 終端(記作“流計算終端”),執行如下命令提交運行程序:

    cd /root/Desktop/luozhongye/streaming/stateful 
    /usr/local/spark/bin/spark-submit NetworkWordCountStateful.py localhost 9999
    

image-20231208010814959

四、把 DStream 輸出到文本文件中

下面對之前已經得到的“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStateful.py”代碼進行簡單的修改,把生成的詞頻統計結果寫入文本文件中。

修改后得到的新代碼文件“/root/Desktop/luozhongye/streaming/stateful/NetworkWordCountStatefulText.py”的內容如下:

#!/usr/bin/env python3 
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContextif __name__ == "__main__":if len(sys.argv) != 3:print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")ssc = StreamingContext(sc, 1)ssc.checkpoint("file:///root/Desktop/luozhongye/streaming/stateful/")# RDD with initial state (key, value) pairs initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])def updateFunc(new_values, last_sum):return sum(new_values) + (last_sum or 0)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))running_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.updateStateByKey(updateFunc, initialRDD=initialStateRDD)running_counts.saveAsTextFiles("file:///root/Desktop/luozhongye/streaming/stateful/output")running_counts.pprint()ssc.start()ssc.awaitTermination()

新建一個終端(記作“數據源終端”),執行如下命令運行nc 程序:

cd /root/Desktop/luozhongye/streaming/socket/ 
nc -lk 9999

新建一個 Linux 終端(記作“流計算終端”),執行如下命令提交運行程序:

cd /root/Desktop/luozhongye/streaming/stateful 
/usr/local/spark/bin/spark-submit NetworkWordCountStatefulText.py localhost 9999

image-20231208012123002

實驗心得

通過本次實驗,我深入理解了Spark Streaming,包括創建StreamingContext、DStream等對象。同時,我了解了Spark Streaming對不同類型數據流的處理方式,如文件流、套接字流和RDD隊列流。此外,我還熟悉了Spark Streaming的轉換操作和輸出編程操作,并掌握了map、flatMap、filter等方法。最后,我能夠自定義輸出方式和格式。總之,這次實驗讓我全面了解了Spark Streaming,對未來的學習和工作有很大的幫助。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/214226.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/214226.shtml
英文地址,請注明出處:http://en.pswp.cn/news/214226.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux 驅動開發需要掌握哪些編程語言和技術?

Linux 驅動開發需要掌握哪些編程語言和技術&#xff1f; 在開始前我有一些資料&#xff0c;是我根據自己從業十年經驗&#xff0c;熬夜搞了幾個通宵&#xff0c;精心整理了一份「Linux從專業入門到高級教程工具包」&#xff0c;點個關注&#xff0c;全部無償共享給大家&#xf…

1. mycat入門

1、mycat介紹 Mycat 是一個開源的分布式數據庫系統&#xff0c;但是由于真正的數據庫需要存儲引擎&#xff0c;而 Mycat 并沒有存 儲引擎&#xff0c;所以并不是完全意義的分布式數據庫系統。MyCat是目前最流行的基于Java語言編寫的數據庫中間件&#xff0c;也可以理解為是數據…

鴻蒙HarmonyOS4.0 入門與實戰

一、開發準備: 熟悉鴻蒙官網安裝DevEco Studio熟悉鴻蒙官網 HarmonyOS應用開發官網 - 華為HarmonyOS打造全場景新服務 應用設計相關資源: 開發相關資源: 例如開發工具 DevEco Studio 的下載 應用發布: 開發文檔:

3易懂AI深度學習算法:長短期記憶網絡(Long Short-Term Memory, LSTM)生成對抗網絡 優化算法進化算法

繼續寫&#xff1a;https://blog.csdn.net/chenhao0568/article/details/134920391?spm1001.2014.3001.5502 1.https://blog.csdn.net/chenhao0568/article/details/134931993?spm1001.2014.3001.5502 2.https://blog.csdn.net/chenhao0568/article/details/134932800?spm10…

LeetCode 1631. 最小體力消耗路徑:廣度優先搜索BFS

【LetMeFly】1631.最小體力消耗路徑&#xff1a;廣度優先搜索BFS 力扣題目鏈接&#xff1a;https://leetcode.cn/problems/path-with-minimum-effort/ 你準備參加一場遠足活動。給你一個二維 rows x columns 的地圖 heights &#xff0c;其中 heights[row][col] 表示格子 (ro…

視頻如何提取文字?這四個方法一鍵提取視頻文案

視頻如何提取文字&#xff1f;你用過哪些視頻提取工具&#xff1f;視頻轉文字工具&#xff0c;又稱為語音識別軟件&#xff0c;是一款能夠將視頻中的語音或對話轉化為文字的實用工具。它運用了尖端的聲音識別和語言理解技術&#xff0c;能精準地捕捉視頻中的音頻&#xff0c;并…

弧形導軌的工作原理

弧形導軌是一種能夠將物體沿著弧形軌道運動的裝置&#xff0c;它由個弧形軌道和沿著軌道運動的物體組成&#xff0c;弧形導軌的工作原理是利用軌道的形狀和物體的運動方式來實現運動&#xff0c;當物體處于軌道上時&#xff0c;它會受到軌道的引導&#xff0c;從而沿著軌道的弧…

Nginx正則表達式

目錄 1.nginx常用的正則表達式 2.location location 大致可以分為三類 location 常用的匹配規則 location 優先級 location 示例說明 優先級總結 3.rewrite rewrite功能 rewrite跳轉實現 rewrite執行順序 語法格式 rewrite示例 實例1&#xff1a; 實例2&#xf…

生活小記錄

上個月項目總算上線了&#xff0c;節奏也慢慢調整正常。發現自己好久沒有記錄生活點滴了&#xff0c;正好寫寫。其實&#xff0c;最近這段日子發生的事情還是挺多的。 流感 媳婦11.24得流感&#xff0c;這件事情特別好笑&#xff0c;大晚上她和我妹妹想喝酒試試&#xff0c;結…

【Python必做100題】之第六題(求圓的周長)

圓的周長公式&#xff1a;C 2 * pi * r 代碼如下&#xff1a; pi 3.14 r float(input("請輸入圓的半徑&#xff1a;")) c 2 * pi *r print(f"圓的周長為{c}") 運行截圖&#xff1a; 總結 1、圓周長的公式&#xff1a;C 2 * pi * r 2、輸出結果注意…

webrtc 工具類

直接上代碼&#xff1b;webrtc 工具類 package com.example.mqttdome;import android.app.Activity; import android.content.Context; import android.content.Intent; import android.media.projection.MediaProjection; import android.media.projection.MediaProjectionMa…

API低代碼開發平臺的實際應用及好處

API低代碼開發平臺是一種快速開發工具&#xff0c;可以幫助企業快速構建和部署應用程序&#xff0c;并提供易于使用的API集成。 實際應用 API低代碼開發平臺的應用范圍非常廣泛&#xff0c;包括但不限于以下幾個方面&#xff1a; 企業級應用程序開發&#xff1a;API低代碼開發…

TypeScript中的類型縮小、類型謂詞

一. 概覽 TypeScript中的類型縮小的方式有typeof、in等方式進行類型縮小。 二. 類型縮小 typeof function test(a: string| number | string []) {if(a) {if(typeof a string) {} else if(typeof a number) {}} }in關鍵字 nterface ISideBar {hide: () >void }interf…

mybatis-plus查詢的字段和mysql關鍵字重名

先看一下這個 TableField("show") 這個注解表示當前屬性對應在數據庫的字段為show&#xff0c;但是show在mysql中為關鍵字&#xff0c;直接查詢會導致語法錯誤 正確寫法應該是 但寫sql由和mybatis-plus理念相違背&#xff0c; 并且無法輕松創建對應方法&#xff0…

第8課 SQL入門之使用數據處理函數

文章目錄 8.1 函數8.2 使用函數8.2.1 文本處理函數8.2.2 日期和時間處理函數8.2.3 數值處理函數 表8-3 常用數值處理函數 這一課介紹什么是函數&#xff0c;DBMS支持何種函數&#xff0c;以及如何使用這些函數&#xff1b;還將講解為什么SQL函數的使用可能會帶來問題。 8.1 函數…

數據結構之----邏輯結構、物理結構

數據結構之----邏輯結構、物理結構 目前我們常見的數據結構分別有&#xff1a; 數組、鏈表、棧、隊列、哈希表、樹、堆、圖 而它們可以從 邏輯結構和物理結構兩個維度進行分類。 什么是邏輯結構&#xff1f; 邏輯結構是指數據元素之間的邏輯關系&#xff0c;而邏輯結構又分為…

HCIA-H12-811題目解析(5)

1、【單選題】 以下關于Hybrid端口說法正確的有&#xff1f; 2、【單選題】使用命令"vlan batch 10 20"和"valn batch 10 to 20"&#xff0c;分別能創建的vlan數量是&#xff1f;&#xff08;&#xff09; 3、【單選題】二層ACL的編號范圍是&#xff1f;…

Scala日志log4j,序列化Gson

一、日志輸出log4j 1. Scala中配置log4j依賴 對于 Maven 項目,可以在 pom.xml 文件中添加以下內容: <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version> </dependency>2.創建…

VueUse工具庫

VueUse VueUse不是Vue.use&#xff0c;它是為Vue 2和3服務的一套Vue Composition API的常用工具集&#xff0c;是目前世界上Star最高的同類型庫之一。它的初衷就是將一切原本并不支持響應式的JS API變得支持響應式&#xff0c;省去程序員自己寫相關代碼。 VueUse 是一個基于 …

Java畢業設計 SSM SpringBoot 在線學習系統

Java畢業設計 SSM SpringBoot 在線學習系統 SSM SpringBoot 在線學習系統 功能介紹 首頁 圖片輪播 視頻推薦 在線學習 學習介紹 評論 收藏 資料中心 資料詳情 下載資料 話題討論 文檔發布 試題中心 系統公告 登錄 注冊學生 個人中心 試題記錄 錯題本 我的收藏 算法演示 結果分…