Spark-Streaming有狀態計算

一、上下文

《Spark-Streaming初識》中的NetworkWordCount示例只能統計每個微批下的單詞的數量,那么如何才能統計從開始加載數據到當下的所有數量呢?下面我們就來通過官方例子學習下Spark-Streaming有狀態計算。

二、官方例子

所屬包:org.apache.spark.examples.streaming

object StatefulNetworkWordCount {def main(args: Array[String]): Unit = {if (args.length < 2) {System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")System.exit(1)}StreamingExamples.setStreamingLogLevels()val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")//創建微批為 1 秒的上下文val ssc = new StreamingContext(sparkConf, Seconds(1))//指定 checkpoint 目錄ssc.checkpoint(".")// 用一個 List 初始化一個 RDDval initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))// 在目標ip:port上創建一個ReceiverInputDStream,并對分隔測試的輸入流中的單詞進行計數(例如由'nc'生成)val lines = ssc.socketTextStream(args(0), args(1).toInt)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))// 使用mapWithState更新累積計數這將給出一個由狀態組成的DStream(即單詞的累積計數)val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {val sum = one.getOrElse(0) + state.getOption.getOrElse(0)val output = (word, sum)state.update(sum)output}val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))stateDstream.print()ssc.start()ssc.awaitTermination()}
}

三、分析

1、構建SparkConf

它是Spark應用程序的配置,用于設置Spark的各種參數。支持鏈式設置

new SparkConf().setMaster("local").setAppName("My app")

?一旦SparkConf對象傳遞給Spark,用戶就不能再對其進行修改。Spark不支持在運行時修改配置

2、構建StreamingContext

它是Spark Streaming功能的主要入口點,且提供了從各種輸入源創建[[org.apache.spark.streaming.dstream.DStream]] 的方法。

創建和轉換DStreams后,可以分別使用start()、stop()啟動和停止流計算,awaitTermination()允許當前線程通過stop()或異常等待上下文的終止。

3、設置checkpoint

StreamingContext最終還是通過SparkContext來設置checkpoint,但其實都是為各自的checkpointDir設置checkpoint路徑,在有狀態計算中checkpoint是必須的。

所謂有狀態計算就必須要把歷史狀態給存儲下來,spark中使用使用checkpoint來實現這個存儲,每個微批的數據的計算都要更新到歷史狀態中。

class SparkContext(config: SparkConf) extends Logging {private[spark] var checkpointDir: Option[String] = None}
class StreamingContext private[streaming] (_sc: SparkContext,_cp: Checkpoint,_batchDur: Duration) extends Logging {private[streaming] var checkpointDir: String = {if (isCheckpointPresent) {sc.setCheckpointDir(_cp.checkpointDir)_cp.checkpointDir} else {null}}}

4、初始化一個RDD

為什么要初始化一個RDD呢?我們看看下面是如何用到的。

5、創建一個ReceiverInputDStream

這里是從TCP源hostname:port創建輸入流。使用TCP套接字接收數據,并使用給定的轉換器將接收字節解釋為對象

6、處理單詞

從源碼中可以看出會把這樣的文本

hadoop spark flink kafka hadoop spark-streaming

處理成這樣的格式

hadoop 1

spark 1

flink 1

kafka 1

hadoop 1

spark-streaming 1

6、使用mapWithState更新累積計數

該算子可以維護并更新每個key的狀態。

這里用到一個新對象:StateSpec,且用到了它的兩個方法,initialState和function

initialState:設置包含“mapWithState”將使用的初始狀態的RDD`

function:設置實際的狀態更新操作

//第1個參數:狀態 key 的類別
//第2個參數:狀態 value 的類別
//第3個參數:狀態 數據 的類別
//第4個參數:狀態 處理完要返回 的類別
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {// 使用state.exists()、state.get()、state.update()和state.remove()來管理狀態,并返回必要的字符串
}

四、運行

運行Netcat

nc -lk 9999

新建一個窗口運行官方例子

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount cdh1 9999


大多數高校碩博生畢業要求需要參加學術會議,發表EI或者SCI檢索的學術論文會議論文:
可訪問艾思科藍官網,瀏覽即將召開的學術會議列表。會議如下:

第四屆大數據、信息與計算機網絡國際學術會議(BDICN 2025)

  • 廣州
  • https://ais.cn/u/fi2yym

第四屆電子信息工程、大數據與計算機技術國際學術會議(EIBDCT 2025)

  • 青島
  • https://ais.cn/u/nuQr6f

第六屆大數據與信息化教育國際學術會議(ICBDIE 2025)

  • 蘇州
  • https://ais.cn/u/eYnmQr

第三屆通信網絡與機器學習國際學術會議(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/64932.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/64932.shtml
英文地址,請注明出處:http://en.pswp.cn/web/64932.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Python 3 輸入與輸出指南

文章目錄 1. 輸入與 input()示例&#xff1a;提示&#xff1a; 2. 輸出與 print()基本用法&#xff1a;格式化輸出&#xff1a;使用 f-string&#xff08;推薦&#xff09;&#xff1a;使用 str.format()&#xff1a;使用占位符&#xff1a; print() 的關鍵參數&#xff1a; 3.…

【SQLi_Labs】Basic Challenges

什么是人生&#xff1f;人生就是永不休止的奮斗&#xff01; Less-1 嘗試添加’注入&#xff0c;發現報錯 這里我們就可以直接發現報錯的地方&#xff0c;直接將后面注釋&#xff0c;然后使用 1’ order by 3%23 //得到列數為3 //這里用-1是為了查詢一個不存在的id,好讓第一…

Swift Combine 學習(四):操作符 Operator

Swift Combine 學習&#xff08;一&#xff09;&#xff1a;Combine 初印象Swift Combine 學習&#xff08;二&#xff09;&#xff1a;發布者 PublisherSwift Combine 學習&#xff08;三&#xff09;&#xff1a;Subscription和 SubscriberSwift Combine 學習&#xff08;四&…

時間序列預測算法---LSTM

目錄 一、前言1.1、深度學習時間序列一般是幾維數據&#xff1f;每個維度的名字是什么&#xff1f;通常代表什么含義&#xff1f;1.2、為什么機器學習/深度學習算法無法處理時間序列數據?1.3、RNN(循環神經網絡)處理時間序列數據的思路&#xff1f;1.4、RNN存在哪些問題? 二、…

leetcode題目(3)

目錄 1.加一 2.二進制求和 3.x的平方根 4.爬樓梯 5.顏色分類 6.二叉樹的中序遍歷 1.加一 https://leetcode.cn/problems/plus-one/ class Solution { public:vector<int> plusOne(vector<int>& digits) {int n digits.size();for(int i n -1;i>0;-…

快速上手LangChain(三)構建檢索增強生成(RAG)應用

文章目錄 快速上手LangChain(三)構建檢索增強生成(RAG)應用概述索引阿里嵌入模型 Embedding檢索和生成RAG應用(demo:根據我的博客主頁,分析一下我的技術棧)快速上手LangChain(三)構建檢索增強生成(RAG)應用 langchain官方文檔:https://python.langchain.ac.cn/do…

[cg] android studio 無法調試cpp問題

折騰了好久&#xff0c;native cpp庫無法調試問題&#xff0c;原因 下面的Deploy 需要選Apk from app bundle!! 另外就是指定Debug type為Dual&#xff0c;并在Symbol Directories 指定native cpp的so路徑 UE項目調試&#xff1a; 使用Android Studio調試虛幻引擎Android項目…

【Windows】powershell 設置執行策略(Execution Policy)禁止了腳本的運行

報錯信息&#xff1a; 無法加載文件 C:\Users\11726\Documents\WindowsPowerShell\profile.ps1&#xff0c;因為在此系統上禁止運行腳本。有關詳細信息&#xff0c;請參 閱 https:/go.microsoft.com/fwlink/?LinkID135170 中的 about_Execution_Policies。 所在位置 行:1 字符…

可編輯37頁PPT |“數據湖”構建汽車集團數據中臺

薦言分享&#xff1a;隨著汽車行業智能化、網聯化的快速發展&#xff0c;數據已成為車企經營決策、優化生產、整合供應鏈的核心資源。為了在激烈的市場競爭中占據先機&#xff0c;汽車集團亟需構建一個高效、可擴展的數據管理平臺&#xff0c;以實現對海量數據的收集、存儲、處…

【快速實踐】類激活圖(CAM,class activation map)可視化

類激活圖可視化&#xff1a;有助于了解一張圖像的哪一部分讓卷積神經網絡做出了最終的分類決策 對輸入圖像生成類激活熱力圖類激活熱力圖是與特定輸出類別相關的二維分數網格&#xff1a;對任何輸入圖像的每個位置都要進行計算&#xff0c;它表示每個位置對該類別的重要程度 我…

ros2 py文件間函數調用

文章目錄 寫在前面的話生成python工程包命令運行python函數命令python工程包的目錄結構目錄結構&#xff08;細節&#xff09; 報錯 1&#xff08; no module name ***&#xff09;錯誤示意 截圖終端輸出解決方法 報錯 2&#xff08; AttributeError: *** object has no attrib…

Milvus×合邦電力:向量數據庫如何提升15%電價預測精度

01. 全球能源市場化改革下的合邦電力 在全球能源轉型和市場化改革的大背景下&#xff0c;電力交易市場正逐漸成為優化資源配置、提升系統效率的關鍵平臺。電力交易通過市場化手段&#xff0c;促進了電力資源的有效分配&#xff0c;為電力行業的可持續發展提供了動力。 合邦電力…

OLED的顯示

一、I2C I2C時序&#xff1a;時鐘線SCL高電平下&#xff1a;SDA由高變低代表啟動信號&#xff0c;開始發送數據&#xff1b;SCL高電平時&#xff0c;數據穩定&#xff0c;數據可以被讀走&#xff0c;開始進行讀操作&#xff0c;SCL低電平時&#xff0c;數據發生改變&#xff1…

VMware運維效率提升50%,RVTools管理更簡單

RVTools 是一款專為 VMware 虛擬化環境量身打造的高效管理工具&#xff0c;基于 .NET 4.7.2 框架開發&#xff0c;并與 VMware vSphere Management SDK 8.0 和 CIS REST API 深度集成&#xff0c;能夠全面呈現虛擬化平臺的各項關鍵數據。該工具不僅能夠詳細列出虛擬機、CPU、內…

JS 中 json數據 與 base64、ArrayBuffer之間轉換

JS 中 json數據 與 base64、ArrayBuffer之間轉換 json 字符串進行 base64 編碼 function jsonToBase64(json) {return Buffer.from(json).toString(base64); }base64 字符串轉為 json 字符串 function base64ToJson(base64) {try {const binaryString atob(base64);const js…

介紹 C++ 中的智能指針及其應用:以 PyTorch框架自動梯度AutogradMeta為例

介紹 C 中的智能指針及其應用&#xff1a;以 AutogradMeta 為例 在 C 中&#xff0c;智能指針&#xff08;Smart Pointer&#xff09;是用于管理動態分配內存的一種工具。它們不僅自動管理內存的生命周期&#xff0c;還能幫助避免內存泄漏和野指針等問題。在深度學習框架如 Py…

python +t kinter繪制彩虹和云朵

python t kinter繪制彩虹和云朵 彩虹&#xff0c;簡稱虹&#xff0c;是氣象中的一種光學現象&#xff0c;當太陽光照射到半空中的水滴&#xff0c;光線被折射及反射&#xff0c;在天空上形成拱形的七彩光譜&#xff0c;由外圈至內圈呈紅、橙、黃、綠、藍、靛、紫七種顏色。事實…

Zabbix5.0版本(監控Nginx+PHP服務狀態信息)

目錄 1.監控Nginx服務狀態信息 &#xff08;1&#xff09;通過Nginx監控模塊&#xff0c;監控Nginx的7種狀態 &#xff08;2&#xff09;開啟Nginx狀態模塊 &#xff08;3&#xff09;配置監控項 &#xff08;4&#xff09;創建模板 &#xff08;5&#xff09;用默認鍵值…

Python入門教程 —— 字符串

字符串介紹 字符串可以理解為一段普通的文本內容,在python里,使用引號來表示一個字符串,不同的引號表示的效果會有區別。 字符串表示方式 a = "Im Tom" # 一對雙引號 b = Tom said:"I am Tom" # 一對單引號c = Tom said:"I\m Tom" # 轉義…

AcWing練習題:差

讀取四個整數 A,B,C,D&#xff0c;并計算 (AB?CD)的值。 輸入格式 輸入共四行&#xff0c;第一行包含整數 A&#xff0c;第二行包含整數 B&#xff0c;第三行包含整數 C&#xff0c;第四行包含整數 D。 輸出格式 輸出格式為 DIFERENCA X&#xff0c;其中 X 為 (AB?CD) 的…