SparkStreaming在實時處理的兩個場景示例

簡介

Spark Streaming是Apache Spark生態系統中的一個組件,用于實時流式數據處理。它提供了類似于Spark的API,使開發者可以使用相似的編程模型來處理實時數據流。

Spark Streaming的工作原理是將連續的數據流劃分成小的批次,并將每個批次作為RDD(彈性分布式數據集)來處理。這樣,開發者可以使用Spark的各種高級功能,如map、reduce、join等,來進行實時數據處理。Spark Streaming還提供了內置的窗口操作、狀態管理、容錯處理等功能,使得開發者能夠輕松處理實時數據的復雜邏輯。

Spark Streaming支持多種數據源,包括Kafka、Flume、HDFS、S3等,因此可以輕松地集成到各種數據管道中。它還能夠與Spark的批處理和SQL引擎進行無縫集成,從而實現流式處理與批處理的混合使用。
在這里插入圖片描述

本文以 TCP、kafka場景講解spark streaming的使用

消息隊列下的信息鋪抓

類似消息隊列的有redis、kafka等核心組件。
本文以kafka為例,向kafka中實時抓取數據,

pom.xml中添加以下依賴

<dependencies><!-- Spark Core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.0</version></dependency><!-- Spark Streaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.2.0</version></dependency><!-- Spark SQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.2.0</version></dependency><!-- Kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Spark Streaming Kafka Connector --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.2.0</version></dependency><!-- PostgreSQL JDBC --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.24</version></dependency>
</dependencies>

創建項目編寫以下代碼實現功能

package org.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;public class SparkStreamingKafka {public static void main(String[] args) throws InterruptedException {// 創建 Spark 配置SparkConf sparkConf = new SparkConf().setAppName("spark_kafka").setMaster("local[*]").setExecutorEnv("setLogLevel", "ERROR");//設置日志等級為ERROR,避免日志增長導致的磁盤膨脹// 創建 Spark Streaming 上下文JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 間隔兩秒撲捉一次// 創建 Spark SQL 會話SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();// 設置 Kafka 相關參數Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092");kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("auto.offset.reset", "earliest");// auto.offset.reset可指定參數有// latest:從分區的最新偏移量開始讀取消息。// earliest:從分區的最早偏移量開始讀取消息。// none:如果沒有有效的偏移量,則拋出異常。kafkaParams.put("enable.auto.commit", true);  //采用自動提交offset 的模式kafkaParams.put("auto.commit.interval.ms",2000);//每隔離兩秒提交一次commited-offsetkafkaParams.put("group.id", "spark_kafka"); //消費組名稱// 創建 Kafka streamCollection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主題名稱JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)  //訂閱kafka);//定義數據結構StructType schema = new StructType().add("key", DataTypes.LongType).add("value", DataTypes.StringType);kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {// 轉換為 DataFrameDataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {return RowFactory.create(record.offset(), record.value());  //將偏移量和value聚合}), schema);// 寫入到 PostgreSQLdf.write()//選擇寫入數據庫的模式.mode(SaveMode.Append)//采用追加的寫入模式//協議.format("jdbc")//option 參數.option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 連接 URL//確定表名.option("dbtable", "public.spark_kafka")//指定表名.option("user", "postgres") // PostgreSQL 用戶名.option("password", "postgres") // PostgreSQL 密碼.save();});// 啟動 Spark StreamingstreamingContext.start();// 等待 Spark Streaming 應用程序終止streamingContext.awaitTermination();}
}

在執行代碼前,向創建名為spark_kafka的topic

kafka-topics.sh --create --topic spark_kafka --bootstrap-server 10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

向spark_kafka 主題進行隨機推數

kafka-producer-perf-test.sh --topic spark_kafka --thrghput 10 --num-records 10000 --record-size 100000 --producer-props bootstrap.servers=10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

運行過程中消費的offset會一直被提交到每一個分區
在這里插入圖片描述

此時在數據庫中查看,數據已經實時落地到庫中
在這里插入圖片描述

TCP

TCP環境下,實時監控日志的輸出,可用于監控設備狀態、環境變化等。當監測到異常情況時,可以實時發出警報。

package org.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;public class SparkStreamingKafka {public static void main(String[] args) throws InterruptedException {// 創建 Spark 配置SparkConf sparkConf = new SparkConf().setAppName("spark_kafka") // 設置應用程序名稱.setMaster("local[*]") // 設置 Spark master 為本地模式,[*]表示使用所有可用核心// 設置日志等級為ERROR,避免日志增長導致的磁盤膨脹.setExecutorEnv("setLogLevel", "ERROR");// 創建 Spark Streaming 上下文JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 間隔兩秒撲捉一次// 創建 Spark SQL 會話SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();// 設置 Kafka 相關參數Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092"); // Kafka 服務器地址kafkaParams.put("key.deserializer", StringDeserializer.class); // key 反序列化器類kafkaParams.put("value.deserializer", StringDeserializer.class); // value 反序列化器類kafkaParams.put("auto.offset.reset", "earliest"); // 從最早的偏移量開始消費消息kafkaParams.put("enable.auto.commit", true);  // 采用自動提交 offset 的模式kafkaParams.put("auto.commit.interval.ms", 2000); // 每隔兩秒提交一次 committed-offsetkafkaParams.put("group.id", "spark_kafka"); // 消費組名稱// 創建 Kafka streamCollection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主題名稱JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)  // 訂閱 Kafka);// 定義數據結構StructType schema = new StructType().add("key", DataTypes.LongType).add("value", DataTypes.StringType);kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {// 轉換為 DataFrameDataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {return RowFactory.create(record.offset(), record.value());  // 將偏移量和 value 聚合}), schema);// 寫入到 PostgreSQLdf.write()// 選擇寫入數據庫的模式.mode(SaveMode.Append) // 采用追加的寫入模式// 協議.format("jdbc")// option 參數.option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 連接 URL// 確定表名.option("dbtable", "public.spark_kafka") // 指定表名.option("user", "postgres") // PostgreSQL 用戶名.option("password", "postgres") // PostgreSQL 密碼.save();});// 啟動 Spark StreamingstreamingContext.start();// 等待 Spark Streaming 應用程序終止streamingContext.awaitTermination();}
}

在10.0.0.108 打開9999端口鍵入數值 ,使其被spark接收到并進行運算

nc -lk 9999

開啟端口可以鍵入數值 此時會在IDEA的控制臺顯示其計算值
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/714359.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/714359.shtml
英文地址,請注明出處:http://en.pswp.cn/news/714359.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

適配器模式 詳解 設計模式

適配器模式 適配器模式是一種結構型設計模式&#xff0c;其主要作用是解決兩個不兼容接口之間的兼容性問題。適配器模式通過引入一個適配器來將一個類的接口轉換成客戶端所期望的另一個接口&#xff0c;從而讓原本由于接口不匹配而無法協同工作的類能夠協同工作。 結構 適配…

想要調用淘寶開放平臺API,沒有申請應用怎么辦?

用淘寶自定義API接口可以訪問淘寶開放平臺API。 custom-自定義API操作 taobao.custom 公共參數 注冊賬號獲取API請求地址 名稱類型必須描述keyString是調用key&#xff08;必須以GET方式拼接在URL中&#xff09;secretString是調用密鑰api_nameString是API接口名稱&#xf…

Docker與虛擬機比較

在對比Docker和虛擬機前&#xff0c;先簡單了解下虛擬化&#xff0c;明確Docker和虛擬機分別對應的虛擬化級別&#xff0c;然后對Docker和虛擬機進行比較。需要注意的是&#xff0c;Docker和虛擬機并沒有什么可比性&#xff0c;而是Docker使用的容器技術和虛擬機使用的虛擬化技…

【K8S類型系統】一文梳理 K8S 各類型概念之間的關系(GVK/GVR/Object/Schema/RestMapper)

參考 k8s 官方文檔 https://kubernetes.io/zh-cn/docs/reference/https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.29/ 重點 Kubernetes源碼學習-kubernetes基礎數據結構 - 知乎 重點 Kubernetes類型系統 | 李乾坤的博客 重點 k8s源碼學習-三大核心數…

前端學習第二天-html提升

達標要求 了解列表的分類 熟練掌握列表的用法 熟練掌握表格的結構構成 合并單元格 表單的組成 熟練掌握表單控件分類的使用 1.列表 1.1 無序列表 <ul>&#xff1a;定義無序列表&#xff0c;并且只能包含<li>子元素。 <li>&#xff1a;定義列表項&a…

LZO索引文件失效說明

在hive中創建lzo文件和索引時&#xff0c;進行查詢時會出現問題.hive的默認輸入格式是開啟小文件合并的&#xff0c;會把索引也合并進來。所以要關閉hive小文件合并功能&#xff01;

Matlab:元胞自動機

元胞自動機是一種基于離散空間的動態系統&#xff0c;由許多簡單單元按照某些規則進行相互作用和演化而形成的復雜結構。元胞自動機可以用于模擬物理、生物、社會等領域的現象&#xff0c;以及進行優化、圖像處理、噪聲生成等方面的應用。 例1&#xff1a;生命游戲 nextState…

maven項目報錯Cannot resolve plugin org.apache.maven.plugins:maven-war-plugin:2.2

如果IDEA整合maven沒有問題&#xff0c;還是報這個錯誤&#xff0c;很大可能是由于在下載過程中存在網絡問題&#xff0c;導致文件下載一半而停止&#xff0c;但是已經在倉庫中存在這個文件夾&#xff0c;解決方法是刪除文件夾重新下載即可。 刪除本地倉庫下的\org\apache\mav…

(算法)位運算

常見的位運算符&#xff1a; 給定一個數n判斷他的二進制第x位是0還是1 把第x位修改為1 因為是只是修改n的某個位置&#xff0c;所以不應該移動改變n 既然修改為1&#xff0c;那么就要想到 | 運算符 把第x位修改為0 因為修改為0,所以要用&運算符 位圖思想 判定字符串…

C++17之std::invoke: 使用和原理探究(全)

目錄 1.概述 2.輔助類 3.原理分析 4.總結 1.概述 在之前的 C 版本中&#xff0c;要調用不同類型的可調用對象&#xff0c;需要使用不同的語法&#xff0c;例如使用函數調用運算符 () 來調用函數或函數指針&#xff0c;使用成員訪問運算符 -> 或 . 來調用成員函數。這樣的…

二維碼門樓牌管理系統技術服務的深度解析

文章目錄 前言一、標準地址名稱的定義與重要性二、二維碼門樓牌管理系統的核心技術三、標準地址名稱在二維碼門樓牌管理中的應用四、二維碼門樓牌管理系統的優勢與挑戰五、展望未來 前言 在數字化浪潮中&#xff0c;二維碼門樓牌管理系統以其高效、便捷的特性&#xff0c;正逐…

【一】【算法分析與設計】基礎測試

排列式 題目描述 7254是一個不尋常的數&#xff0c;因為它可以表示為7254 39 x 186&#xff0c;這個式子中1~9每個數字正好出現一次 輸出所有這樣的不同的式子&#xff08;乘數交換被認為是相同的式子&#xff09; 結果小的先輸出&#xff1b;結果相同的&#xff0c;較小的乘…

js 實戰小案例

實戰 時間 js 格式化時間 <script type"text/javascript">function formatDate(date) { let year date.getFullYear(); let month String(date.getMonth() 1).padStart(2, 0); // getMonth() 返回的月份是從0開始的&#xff0c;所以要加1&#xff0c;并…

【go從入門到精通】go包,內置類型和初始化順序

大家好&#xff0c;這是我給大家準備的新的一期專欄&#xff0c;專門講golang&#xff0c;從入門到精通各種框架和中間件&#xff0c;工具類庫&#xff0c;希望對go有興趣的同學可以訂閱此專欄。 go基礎 。 Go文件名&#xff1a; 所有的go源碼都是以 ".go" 結尾&…

Mamba 環境安裝:causal-conv1d和mamba-ssm報錯解決辦法

問題描述&#xff1a; 在執行命令 pip install causal_conv1d 和 mamba_ssm 出錯&#xff1a; 解決方案&#xff1a; 1、使用網友配置好的Docker環境&#xff0c;參考&#xff1a;解決causal_conv1d和mamba_ssm無法安裝 -&#xff1e; 直接使用Mamba基礎環境docker鏡像 DockH…

java實現圖片轉pdf,并通過流的方式進行下載(前后端分離)

首先需要導入相關依賴&#xff0c;由于具體依賴本人也不是記得很清楚了&#xff0c;所以簡短的說一下。 iText&#xff1a;PDF 操作庫&#xff0c;用于創建和操作 PDF 文件。可通過 Maven 或 Gradle 引入 iText 依賴。 MultipartFile&#xff1a;Spring 框架中處理文件上傳的類…

一臺工控機的能量

使用Docker搭建EPICS的IOC記錄 Zstack EPICS Archiver在小課題組的使用經驗 以前電子槍調試&#xff0c;用一臺工控機跑起束測后臺&#xff0c;這次新光源用的電子槍加工回來又是測試&#xff0c;又是用一臺工控機做起重復的事&#xff0c;不過生命在于折騰&#xff0c;重復的…

stm32——hal庫學習筆記(IIC)

一、IIC總線協議介紹&#xff08;掌握&#xff09; 二、AT24C02介紹&#xff08;了解&#xff09; 三、AT24C02讀寫時序&#xff08;掌握&#xff09; 四、AT24C02驅動步驟&#xff08;掌握&#xff09; 五、編程實戰&#xff08;掌握&#xff09; myiic.c #include "./B…

汽車虛擬仿真技術的實現、應用和未來

汽車虛擬仿真技術是一種利用計算機模擬汽車運行的技術&#xff0c;以實現對汽車行為的分析、評估和改進。汽車虛擬仿真技術是汽車工業中重要的開發設計和測試工具&#xff0c;可以大大縮短產品研發周期、降低研發成本和提高產品質量。本文將從汽車虛擬仿真技術的實現過程、應用…

Ubuntu18.04 系統上配置并運行SuperGluePretrainedNetwork(僅使用CPU)

SuperGlue是Magic Leap在CVPR 2020上展示的研究項目&#xff0c;它是一個圖神經網絡&#xff08;Graph Neural Network&#xff09;和最優匹配層&#xff08;Optimal Matching layer&#xff09;的結合&#xff0c;訓練用于對兩組稀疏圖像特征進行匹配。這個項目提供了PyTorch代…