Spark Streaming 概述及入門案例

一、介紹

1. 不同的數據處理

  • 從數據處理的方式:
    • 流式數據處理(Streaming)
    • 批量數據處理(Batch)
  • 從數據處理的延遲:
    • 實時數據處理(毫秒級別)
    • 離線數據處理(小時或天級別)

2. 簡介

  • SparkStreaming 是一個準實時(秒或分鐘級別)、微批量的數據處理框架
  • SparkStreaming 支持的很多數據輸入源,如: Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等。數據輸入后可以用 Spark 的高度抽象原語,如: map、 reduce、 join、 window 等進行運算。結果能保存在很多地方,如 HDFS,數據庫等
  • SparkStreaming 使用離散化流 (discretized stream) 作為抽象表示,稱為 DStream,它是對 RDD 在實時數據處理場景的一種封裝

3. 特點

  • 易用
  • 容錯
  • 易整合到 Spark 體系

二、基本架構

在這里插入圖片描述

1. 背壓機制

  • Spark 1.5 以前版本:通過設置靜態配制參數 spark.streaming.receiver.maxRate 來限制 Receiver 的數據接收速率,來解決生產和消費速率不對等造成的內存溢出等問題,但當數據生產和數據消費的能力都高于 maxRate 時會造成資源利用率下降等問題
  • Spark 1.5 版本及以后版本:為了動態控制數據接收速率來適配集群數據處理能力,引入了背壓機制 (Spark Streaming Backpressure),即根據 JobScheduler 反饋作業的執行信息來動態調整 Receiver 數據接收率
  • 通過屬性 spark.streaming.backpressure.enabled 來配置啟用 backpressure 機制,默認值為 false,即不啟用

三、入門 WordCount 案例

需求:使用 netcat 工具向 9999 端口不斷的發送數據,通過 SparkStreaming 讀取端口數據并統計不同單詞出現的次數

1. 引入依賴

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version>
</dependency>

2. 代碼實現

object SparkStreamingWC {def main(args: Array[String]): Unit = {// 1.創建 SparkStreaming 環境對象val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")/*創建 StreamingContext 對象需要傳遞兩個參數1.SparkConf:配置對象2.Duration:批處理的周期,即數據采集周期,單位為毫秒,內置有 Seconds/Minute 等對象 */val ssc = new StreamingContext(conf, Seconds(3))// 2.邏輯處理val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)val words = line.flatMap(_.split(" "))val wordAsOne = words.map((_, 1))val wordCount: DStream[(String, Int)] = wordAsOne.reduceByKey(_ + _)wordCount.print()// 3.運行采集器并等待關閉/*采集器是一個長期運行的任務,所以不能關閉 ssc,也不能讓 main 方法執行完畢*/ssc.start()ssc.awaitTermination()}
}

3. 測試

  • 打開 cmd 命令窗口,執行 nc -lp 9999 命令(Linux 下為 nc -lk 999)
  • 運行程序 main 方法
  • 在窗口中輸入測試字符串(以空格分隔),觀察程序命令行輸出結果

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/22330.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/22330.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/22330.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在Red Hat Enterprise Linux 9上使用Docker快速安裝并部署RocketMQ

在Red Hat Enterprise Linux 9上快速安裝和部署RocketMQ可以按照以下步驟進行&#xff1a; 1. 安裝Docker 首先&#xff0c;確保Docker已經安裝在你的系統上。 更新系統包并安裝依賴項&#xff1a; sudo yum update -y sudo yum install -y yum-utils device-mapper-persiste…

2024年5月份面試總結

2024年5月份找工作/面試總結&#xff1a; 本人前段時間寫了剛過完年后的一個月內找工作的情況&#xff0c;請查看https://blog.csdn.net/zgaoq/article/details/136236788?spm1001.2014.3001.5501 但是后續寫的總結被和諧了&#xff0c;不知道這篇文章能不能發出來。 1、5月份…

系統架構設計師【第19章】: 大數據架構設計理論與實踐 (核心總結)

文章目錄 19.1 傳統數據處理系統存在的問題19.2 大數據處理系統架構分析19.2.1 大數據處理系統面臨挑戰19.2.2 大數據處理系統架構特征 19.3 Lambda架構19.3.1 Lambda架構對大數據處理系統的理解19.3.2 Lambda架構應用場景19.3.3 Lambda架構介紹19.3.4  Lambda架構的實…

數據庫的換行符到前端不展示了

是這樣的原本數據庫中的數據都是帶有\n換行符的但是頁面卻一直不展示 解決辦法 <el-drawer title"預覽" :visible.sync"drawer" :with-header"false"><div v-for"(item, index) in cardArray" :key"index"><…

如何將 Vue 應用程序部署到 Cloudflare Pages

在現代 Web 開發中&#xff0c;Vue.js 已經成為了一個非常受歡迎的前端框架。它的簡潔、高效和靈活性使得開發人員可以輕松構建出色的用戶界面和交互體驗。而 Cloudflare Pages 提供了一個簡單而強大的方式來托管和部署靜態網站和應用程序。本文將介紹如何將 Vue 應用程序部署到…

Android常見內存泄漏場景總結

一、非靜態內部類造成的內存泄漏 造成原因&#xff1a;非靜態內部類默認會持有外部類的引用&#xff0c;如果內部類的生命周期超過了外部類就會造成內存泄漏。 場景&#xff1a;當Activity銷毀后&#xff0c;由于內部類中存在異步耗時任務還在執行&#xff0c;導致Activity實…

[補題記錄]Leetcode 3.無重復字符的最長子串

傳送門&#xff1a;無重復字符的最長子串 Problem/題意 給一個由英文、數字、符號、空格組成的字符串&#xff0c;找出其中不含有重復字符的最長子串的長度。 比如&#xff1a;abb 包含了重復字符 b&#xff1b;abc 沒有包含重復字符。 注意是子串&#xff0c;不是子序列。 …

內網安全:橫向傳遞攻擊(PTH || PTK || PTT 哈希票據傳遞)

內網安全&#xff1a;橫向傳遞攻擊. 橫向移動就是在拿下對方一臺主機后&#xff0c;以拿下的那臺主機作為跳板&#xff0c;對內網的其他主機再進行后面滲透&#xff0c;利用既有的資源嘗試獲取更多的憑據、更高的權限&#xff0c;一步一步拿下更多的主機&#xff0c;進而達到控…

CodeMirror 創建標簽計算編輯器

在日常開發中對于一些數據計算場景可能會遇到標簽計算的需求&#xff0c;下面關于如何使用CodeMirror實現標簽計算編輯功能。 1&#xff0c;結果圖 2&#xff0c;主體代碼邏輯 大家只需要復制粘貼主要codeMirror使用邏輯即可 <template><el-dialogref"dialogRe…

抖店商家疑惑,自然流量突然下滑,為什么呢?

大家好&#xff0c;我是噴火龍。 很多的抖店商家會遇到一種情況&#xff0c;那就是自己店鋪的流量好好的&#xff0c;不知道怎么的就突然沒流量了&#xff0c;各方面的數據都斷崖式的下降。 為什么會這樣呢&#xff1f;原因有以下幾點&#xff0c;大家可以檢查一下&#xff0…

低代碼和零代碼軟件時代質量管理(QM)和質量管理系統(QMS)

【前言】 質量控制過程的目的是為了確保產品的制造標準得到保持和改進。質量控制過程使公司能夠滿足客戶的期望&#xff0c;同時確保產品質量的一致水平。采用這些標準創造了一種公司文化&#xff0c;鼓勵所有員工努力實現高質量的生產標準。低代碼和零代碼軟件可以成為質量控…

【網絡通信層】華為云連接MQTT設備

本文介紹華為云設備連接到設備的操作。 目錄 一、在華為云創建設備 二、連接MQTT 三、通信 一、在華為云創建設備 現在華為云上可以免費使用部分受限服務&#xff0c;包括免費創建自己的設備連接。 首先&#xff0c;登錄華為云平臺共建智能世界云底座-華為云 (huaweicl…

徐州服務器機柜租用的好處

隨著服務器的廣泛應用&#xff0c;越來越多的企業都選擇服務器托管和租用等服務&#xff0c;在選擇服務器租用之前我們還需要進行機柜租用&#xff0c;便于放置所適用的服務器&#xff0c;那么企業選擇徐州服務器機柜租用的好處有哪些呢&#xff1f; 選擇徐州服務器機柜租用&am…

Qt Window Dialog 無標題欄 ,無邊框,可拖動

1.效果&#xff1a; 2. 主要實現步驟&#xff1a; 設置窗口 flag&#xff1a; this->setWindowFlags(Qt::FramelessWindowHint | Qt::WindowStaysOnTopHint); 創建變量存儲位置 QPoint m_dragPosition; 對鼠標左鍵按下和移動事件做處理 void DraggableDialog::mousePre…

Java 集合中的組內平均值計算

在Java開發中&#xff0c;集合&#xff08;Collection&#xff09;是一個重要的數據結構&#xff0c;廣泛應用于各種場景。計算集合中的組內平均值是一個常見的操作&#xff0c;尤其是在數據分析、統計和處理時更為重要。本文將深入探討如何使用Java來計算集合中的組內平均值&a…

Web 頁面性能衡量指標-以用戶為中心的效果指標

Web 頁面性能衡量指標-以用戶為中心的性能指標 以用戶為中心的性能指標是理解和改進站點體驗的關鍵點 一、以用戶為中心的性能指標 1. 指標是用來干啥的&#xff1f; 指標是用來衡量性能和用戶體驗的 2. 指標類型 感知加載速度&#xff1a;網頁可以多快地加載網頁中的所有…

如何在vs code中安裝JavaFX

目錄 下載JavaFX 配置vs code工程 編寫測試代碼 下載JavaFX 網站鏈接:https://openjfx.io 選擇如下的版本

從1.0到4.0,看看你公司的費控模式是第幾代?

早在2021年9月&#xff0c;艾媒咨詢在《2021H1企業費控報銷服務專題研究報告》中&#xff0c;第一次對企業費用管控模式的進化歷程進行了清晰的劃代&#xff1a;1.0手工模式、2.0網報模式、3.0移動報銷模式、4.0智能費控模式。 2022年&#xff0c;在《中國企業費用管理發展白皮…

vr樣板房實景漫游展示制作解決了地產商難題

家具和軟裝銷售中&#xff0c;如何直觀展示產品優勢一直是老板們的難題。口頭描述往往難以讓客戶真正感受到產品的獨特之處&#xff0c;這不僅影響了銷售效果&#xff0c;也增加了溝通的難度。但現在&#xff0c;我們有了全新的解決方案——樣板房VR全景編輯軟件! 樣板房VR全景…

精打細算:可燃氣體報警器檢驗收費的合理規劃與管理

隨著工業化的快速發展&#xff0c;可燃氣體報警器已經成為各類工業場所不可或缺的安全設備。 它的主要功能是在可燃氣體濃度超標時發出警報&#xff0c;有效預防和減少火災、爆炸等安全事故的發生。 然而&#xff0c;為了確保報警器能夠持續、準確地發揮作用&#xff0c;定期…