Spark-streaming(一)

Spark-Streaming概述

Spark Streaming 用于流式數據的處理。

和?Spark 基于 RDD 的概念很相似,Spark Streaming 使用離散化流(discretized stream)作為抽象表示,叫作?DStream。

DStream 是隨時間推移而收到的數據的序列。

Spark-Streaming的特點:易用、容錯、易整合到spark體系。

Spark-Streaming架構

DStream實操

案例:詞頻統計

idea中運行

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object wordcount {def main(args: Array[String]): Unit = {// 創建 SparkConf 對象,設置運行模式為本地多線程,應用名為 streamingval sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")// 創建 StreamingContext 對象,設置批處理間隔為 3 秒val ssc = new StreamingContext(sparkConf, Seconds(3))// 從指定的主機和端口接收文本流數據val lineStreams = ssc.socketTextStream("node01", 9999)// 將每行文本拆分為單詞val wordStreams = lineStreams.flatMap(_.split(" "))// 為每個單詞映射為 (單詞, 1) 的鍵值對val wordAndOneStreams = wordStreams.map((_, 1))// 按單詞進行分組并對每個單詞的計數進行累加val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)// 打印每個批次中每個單詞的計數結果wordAndCountStreams.print()// 啟動流式計算ssc.start()// 等待計算終止ssc.awaitTermination()}
}

在虛擬機中輸入: nc -lk 9999? ?并輸入數據

結果:

解析:

對數據的操作也是按照 RDD 為單位來進行的

計算過程由 Spark Engine 來完成

DStream 創建

RDD隊列

案例:

循環創建幾個 RDD,將 RDD 放入隊列。通過 SparkStream 創建 Dstream,計算 WordCount

代碼

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutableobject RDD {def main(args: Array[String]): Unit = {// 創建 SparkConf 對象,設置運行模式為本地多線程,應用名為 RDDStreamval sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")// 創建 StreamingContext 對象,設置批處理間隔為 4 秒val ssc = new StreamingContext(sparkConf, Seconds(4))// 創建一個可變隊列,用于存儲 RDDval rddQueue = new mutable.Queue[RDD[Int]]()// 從隊列中創建輸入流,oneAtATime 為 false 表示可以同時處理多個 RDDval inputStream = ssc.queueStream(rddQueue, oneAtATime = false)// 將輸入流中的每個元素映射為 (元素, 1) 的鍵值對val mappedStream = inputStream.map((_, 1))// 按鍵對鍵值對進行聚合,統計每個鍵的出現次數val reducedStream = mappedStream.reduceByKey(_ + _)// 打印每個批次中每個鍵的計數結果reducedStream.print()// 啟動流式計算ssc.start()// 循環 5 次,每次向隊列中添加一個 RDD,并休眠 2 秒for (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}// 等待計算終止ssc.awaitTermination()}
}

運行結果:

自定義數據源

自定義數據源

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsetsimport org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiverimport scala.util.control.NonFatalclass CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit = {new Thread("Socket Receiver") {override def run(): Unit = {receive()}}.start()}def receive(): Unit = {var socket: Socket = nullvar reader: BufferedReader = nulltry {socket = new Socket(host, port)reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))var input: String = reader.readLine()while (!isStopped() && input != null) {store(input)input = reader.readLine()}} catch {case NonFatal(e) =>restart("Error receiving data", e)} finally {if (reader != null) {try {reader.close()} catch {case NonFatal(e) =>println(s"Error closing reader: ${e.getMessage}")}}if (socket != null) {try {socket.close()} catch {case NonFatal(e) =>println(s"Error closing socket: ${e.getMessage}")}}}restart("Restarting receiver")}override def onStop(): Unit = {}
}    

使用自定義的數據源采集數據

object sparkConf {def main(args: Array[String]): Unit = {try {// 創建 SparkConf 對象,設置運行模式為本地多線程,應用名為 streamval sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")// 創建 StreamingContext 對象,設置批處理間隔為 5 秒val ssc = new StreamingContext(sparkConf, Seconds(5))// 使用自定義 Receiver 創建輸入流val lineStream = ssc.receiverStream(new CustomerReceiver("node01", 9999))// 將每行文本拆分為單詞val wordStream = lineStream.flatMap(_.split(" "))// 為每個單詞映射為 (單詞, 1) 的鍵值對val wordAndOneStream = wordStream.map((_, 1))// 按單詞進行分組并對每個單詞的計數進行累加val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)// 打印每個批次中每個單詞的計數結果wordAndCountStream.print()// 啟動流式計算ssc.start()// 等待計算終止ssc.awaitTermination()
}}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/902786.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/902786.shtml
英文地址,請注明出處:http://en.pswp.cn/news/902786.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

CS144 Lab 6 實戰記錄:構建 IP 路由器

1 實驗背景與目標 在 CS144 的 Lab 6 中&#xff0c;我們需要在之前實現的 NetworkInterface&#xff08;Lab 5&#xff09;基礎上構建一個完整的 IP 路由器。路由器的主要任務是根據路由表將接收到的 IP 數據報轉發到正確的網絡接口&#xff0c;并發送給正確的下一跳&#xf…

【網絡安全】社會工程學策略

1. 社會工程學簡介 社會工程攻擊是威脅行為者常用的攻擊方式。這是因為&#xff0c;誘騙人們提供訪問權限、信息或金錢通常比利用軟件或網絡漏洞更容易。 您可能還記得&#xff0c;社會工程學是一種利用人為錯誤來獲取私人信息、訪問權限或貴重物品的操縱技術。它是一個涵蓋性…

【含文檔+PPT+源碼】基于SpringBoot的開放實驗管理平臺設計與實現

項目介紹 本課程演示的是一款基于SpringBoot的開放實驗管理平臺設計與實現&#xff0c;主要針對計算機相關專業的正在做畢設的學生與需要項目實戰練習的 Java 學習者。 1.包含&#xff1a;項目源碼、項目文檔、數據庫腳本、軟件工具等所有資料 2.帶你從零開始部署運行本套系統…

鴻蒙NEXT開發定位工具類 (WGS-84坐標系)(ArkTs)

import geoLocationManager from ohos.geoLocationManager; import { BusinessError, Callback } from ohos.base; import { LogUtil } from ./LogUtil; import { PermissionUtil } from ./PermissionUtil; import { map, mapCommon } from kit.MapKit; /*** 定位工具類 (WGS-8…

SSM從入門到上手-全面講解SSM框架的使用.

一、SSM框架整合 將Spring、Spring MVC和MyBatis結合在一起&#xff0c;形成一個高效且易于維護的Web應用程序架構。具體整合的方式如下&#xff1a; Spring管理Bean&#xff1a;Spring負責管理所有的Java對象&#xff0c;包括Service層、DAO層等。通過Spring的IoC容器進行依賴…

學員答題pk知識競賽小程序怎么做

制作學員答題PK知識競賽小程序&#xff0c;主要有以下步驟&#xff1a; 一、規劃設計 明確需求&#xff1a;確定小程序的使用場景是校園知識競賽、培訓機構考核還是企業內部培訓等。答題功能&#xff0c;規定答題的具體規則&#xff0c;包括題目類型&#xff08;單選、多選、…

視頻分析設備平臺EasyCVR視頻技術驅動下,監控上墻全組件解析與組網應用方案

隨著數字化進程的加速推進&#xff0c;視頻監控技術在工業、商業、社區等諸多領域得到了廣泛應用。盡管不同場景對監控功能的具體需求存在差異&#xff0c;但底層硬件架構具有顯著的共性特征。實際部署中&#xff0c;僅需依據網絡環境等實際情況&#xff0c;靈活調整設備的連接…

idea使用docker插件一鍵部署項目

一、首先保證我們電腦上已經安裝了docker docker -v查看docker版本&#xff0c;如果不能識別&#xff0c;需要先下載docker destop&#xff0c;在官網下載正常安裝即可。 安裝成功就可以使用docker 命令了 二、idea下載docker插件并配置docker參數 我是通過tcp連接docker服務…

SQL Tuning Advisor

什么是SQL Tuning Advisor STA可以用來優化那些已經被發現的高負載SQL. 默認情況下, Oracle數據庫在自動維護窗口中自動認證那些有問題的SQL并且執行優化建議&#xff0c;找尋提升高負載SQL執行計劃性能的方法. ** 如何查看自動優化維護窗口產生的報告? ** SQL> set ser…

uniapp-商城-31-shop頁面中的 我的訂單

前面的章節講了很多關于頁面 布局 的知識。 現在來看看其他欄目&#xff0c;我的訂單頁面。 1 頁面樣式圖 基本的樣式包含shop頁面 我的訂單 點擊我的訂單&#xff0c;跳轉到訂單頁面 點擊訂單的每一條訂單&#xff0c;跳轉到訂單詳情 2、創建訂單頁面 2.1 創建sub頁面文件…

深入探討JavaScript性能瓶頸與優化實戰指南

JavaScript作為現代Web開發的核心語言,其性能直接影響用戶體驗與業務指標。隨著2025年前端應用的復雜性持續增加,性能優化已成為開發者必須掌握的核心技能。本文將從性能瓶頸分析、優化策略、工具使用三個維度,結合實戰案例,系統梳理JavaScript性能優化的關鍵路徑。 一、Ja…

基于AI與drawio的圖表生成技術及其在學術研究中的應用前景分析

一、研究背景與沖突 在當今數字化時代&#xff0c;學術研究與信息傳播的方式發生了深刻變革。隨著數據量的爆炸式增長以及研究內容的日益復雜&#xff0c;高效、精準地呈現研究成果變得至關重要。圖表作為一種直觀、簡潔且信息承載量大的表達方式&#xff0c;在學術研究中扮演著…

uniapp 仿小紅書輪播圖效果

通過對小紅書的輪播圖分析&#xff0c;可得出以下總結&#xff1a; 1.單張圖片時容器根據圖片像素定高 2.多圖時輪播圖容器高度以首圖為錨點 3.比首圖長則固高左右留白 4.比首圖短則固寬上下留白 代碼如下&#xff1a; <template><view> <!--輪播--><s…

【ORACLE】記錄一些ORACLE的merge into語句的BUG

【ORACLE】記錄一些ORACLE的merge into語句的BUG 一、自相矛盾-DML重啟動行為差異,違反acid原則 發現版本&#xff1a;10g ~ 23ai 這個用例在我之前的文章里有提過&#xff0c;ORACLE和PG系關于并發事務行為有一個非常大的差異&#xff0c;就是ORACLE在某些并發沖突的場景下會…

2025上海車展:光峰科技全球首發“靈境”智能車載光學系統

當AI為光賦予思想&#xff0c;汽車將會變成什么樣&#xff1f;深圳光峰科技為您揭曉答案。 2025年4月23日&#xff0c;在剛剛開幕的“2025上海車展”上&#xff0c;全球領先的激光核心器件公司光峰科技舉辦了主題為“AI光影盛宴&#xff0c;智享未來出行”的媒體發布會&#x…

密碼學的hash函數,哈希碰撞, collision resistance, BTC用到的SHA-256簡介

密碼學中的哈希函數、哈希碰撞、抗碰撞性&#xff08;collision resistance&#xff09;以及比特幣中使用的 SHA-256 的簡明介紹&#xff1a; &#x1f9e9; 一、哈希函數&#xff08;Hash Function&#xff09; 定義&#xff1a; 哈希函數是一種將任意長度的輸入&#xff08;…

unity TEngine學習4

上一篇我們學習了UI部分&#xff0c;這一篇我們學習其他部分&#xff0c;按照老規矩還是先打開官方文檔 ResourceModule 在官方文檔里介紹了當前加載的設置&#xff0c;但是我們是小白看不懂&#xff0c;那就不管他內部怎么實現的&#xff0c;我們主要看下面的代碼給的方法&am…

【AI訓練環境搭建】在IDE(Pycharm或VSCode)上使用WSL2+Ubuntu22.04+Conda+Tensorflow+GPU進行機器學習訓練

本次實踐將在IDE&#xff08;Pycharm或VSCode&#xff09;上使用WSL2Ubuntu22.04TensorflowGPU進行機器學習訓練。基本原理是在IDE中拉起WSL2中的Python解釋器&#xff0c;并運行Python程序。要運行CondaTensorflowGPU你可能需要進行以下準備工作。 1. 此示例中將使用一個mnis…

【華為OD機試真題E卷】521、 機器人可活動的最大網格點數目 | 機試真題+思路參考+代碼解析(E卷復用)(C++)

文章目錄 一、題目題目描述輸入輸出樣例1 一、代碼與思路&#x1f9e0;C語言思路?C代碼 一、題目 參考鏈接&#xff1a;https://sars2025.blog.csdn.net/article/details/141748083 題目描述 現有一個機器人口&#xff0c;可放置于MxN的網格中任意位置&#xff0c;每個網格包…

windows端遠程控制ubuntu運行腳本程序并轉發ubuntu端腳本輸出的網頁

背景 對于一些只能在ubuntu上運行的腳本&#xff0c;并且這個腳本會在ubuntu上通過網頁展示運行結果。我們希望可以使用windows遠程操控ubuntu&#xff0c;在windows上查看網頁內容。 方法 start cmd.exe /k "sshpass -p passwd ssh namexxx.xxx.xxx.xxx "cd /hom…