Spark-streaming核心編程

1.導入依賴?

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>

<version>3.0.0</version>

</dependency>

2.編寫代碼?

創建SparkConfStreamingContext

定義Kafka相關參數,如bootstrap serversgroup idkeyvaluedeserializer

使用KafkaUtils.createDirectStream方法創建DStream,該方法接受StreamingContext、位置策略、消費者策略等參數。

提取數據中的value部分,并進行word count計算。

啟動StreamingContext并等待其終止。

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DirectAPI {

? def main(args: Array[String]): Unit = {

??? val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")

??? val ssc = new StreamingContext(sparkConf,Seconds(3))

??? //定義kafka相關參數

??? val kafkaPara :Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG

????? ->"node01:9092,node02:9092,node03:9092",

????? ConsumerConfig.GROUP_ID_CONFIG->"kafka",

????? "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",

????? "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"

??? )

??? //通過讀取kafka數據,創建DStream

??? val kafkaDStream:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](

????? ssc,LocationStrategies.PreferConsistent,

????? ConsumerStrategies.Subscribe[String,String](Set("kafka"),kafkaPara)

??? )

??? //提取出數據中的value部分

??? val valueDStream :DStream[String] = kafkaDStream.map(record=>record.value())

??? //wordCount計算邏輯

??? valueDStream.flatMap(_.split(" "))

????? .map((_,1))

????? .reduceByKey(_+_)

????? .print()

??? ssc.start()

??? ssc.awaitTermination()

? }

? }

3.運行程序?

開啟Kafka集群。

4.使用Kafka生產者產生數據。

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka

?5運行Spark Streaming程序,接收Kafka生產的數據并進行處理。

6.查看消費進度?

使用Kafka提供的kafka-consumer-groups.sh腳本查看消費組的消費進度。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/76975.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/76975.shtml
英文地址,請注明出處:http://en.pswp.cn/web/76975.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Kafka的ISR機制是什么?如何保證數據一致性?

一、Kafka ISR機制深度解析 1. ISR機制定義 ISR&#xff08;In-Sync Replicas&#xff09;是Kafka保證數據一致性的核心機制&#xff0c;由Leader副本&#xff08;復雜讀寫&#xff09;和Follower副本(負責備份)組成。當Follower副本的延遲超過replica.lag.time.max.ms&#…

Docker 基本概念與安裝指南

Docker 基本概念與安裝指南 一、Docker 核心概念 1. 容器&#xff08;Container&#xff09; 容器是 Docker 的核心運行單元&#xff0c;本質是一個輕量級的沙盒環境。它基于鏡像創建&#xff0c;包含應用程序及其運行所需的依賴&#xff08;如代碼、庫、環境變量等&#xf…

數據庫監控 | MongoDB監控全解析

PART 01 MongoDB&#xff1a;靈活、可擴展的文檔數據庫 MongoDB作為一款開源的NoSQL數據庫&#xff0c;憑借其靈活的數據模型&#xff08;基于BSON的文檔存儲&#xff09;、水平擴展能力&#xff08;分片集群&#xff09;和高可用性&#xff08;副本集架構&#xff09;&#x…

OpenFeign和Gateway

OpenFeign和Gateway 一.OpenFeign介紹二.快速上手1.引入依賴2.開啟openfeign的功能3.編寫客戶端4.修改遠程調用代碼5.測試 三.OpenFeign參數傳遞1.傳遞單個參數2.多個參數、傳遞對象和傳遞JSON字符串3.最佳方式寫代碼繼承的方式抽取的方式 四.部署OpenFeign五.統一服務入口-Gat…

spark-streaming(二)

DStream創建&#xff08;kafka數據源&#xff09; 1.在idea中的 pom.xml 中添加依賴 <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version> </…

JAVA聚焦OutOfMemoryError 異常

個人主頁 文章專欄 在正文開始前&#xff0c;我想多說幾句&#xff0c;也就是吐苦水吧…最近這段時間一直想寫點東西&#xff0c;停下來反思思考一下。 心中萬言&#xff0c;真正執筆時又不知先寫些什么。通常這個時候&#xff0c;我都會隨便寫寫&#xff0c;文風極像散文&…

如何在Spring Boot中配置自定義端口運行應用程序

Spring Boot 應用程序默認在端口 8080 上運行嵌入式 Web 服務器&#xff08;如 Tomcat、Jetty 或 Undertow&#xff09;。然而&#xff0c;在開發、測試或生產環境中&#xff0c;開發者可能需要將應用程序配置為在自定義端口上運行&#xff0c;例如避免端口沖突、適配微服務架構…

linux嵌入式(進程與線程1)

Linux進程 進程介紹 1. 進程的基本概念 定義&#xff1a;進程是程序的一次執行過程&#xff0c;擁有獨立的地址空間、資源&#xff08;如內存、文件描述符&#xff09;和唯一的進程 ID&#xff08;PID&#xff09;。 組成&#xff1a; 代碼段&#xff1a;程序的指令。 數據…

智馭未來:NVIDIA自動駕駛安全白皮書與實驗室創新實踐深度解析

一、引言&#xff1a;自動駕駛安全的范式革新 在當今數字化浪潮的推動下&#xff0c;全球自動駕駛技術正大步邁入商業化的深水區。隨著越來越多的自動駕駛車輛走上道路&#xff0c;其安全性已成為整個行業乃至社會關注的核心命題。在這個關鍵的轉折點上&#xff0c;NVIDIA 憑借…

多模態大模型 Qwen2.5-VL 的學習之旅

Qwen-VL 是阿里云研發的大規模視覺語言模型&#xff08;Large Vision Language Model, LVLM&#xff09;。Qwen-VL 可以以圖像、文本、檢測框作為輸入&#xff0c;并以文本和檢測框作為輸出。Qwen-VL 系列模型性能強大&#xff0c;具備多語言對話、多圖交錯對話等能力&#xff…

Redis 與 Memcache 全面對比:功能、性能與應用場景解析

Redis 和 Memcache 都是常用的內存數據庫&#xff0c;以下是它們在多個方面的能力比較&#xff1a; 一、數據類型 Redis&#xff1a;支持豐富的數據類型&#xff0c;如字符串&#xff08;String&#xff09;、哈希&#xff08;Hash&#xff09;、列表&#xff08;List&#x…

Oracle--PL/SQL編程

前言&#xff1a;本博客僅作記錄學習使用&#xff0c;部分圖片出自網絡&#xff0c;如有侵犯您的權益&#xff0c;請聯系刪除 PL/SQL&#xff08;Procedural Language/SQL&#xff09;是Oracle數據庫中的一種過程化編程語言&#xff0c;構建于SQL之上&#xff0c;允許編寫包含S…

新增優惠券

文章目錄 概要整體架構流程技術細節小結 概要 接口分析 一個基本的新增接口&#xff0c;按照Restful風格設計即可&#xff0c;關鍵是請求參數。之前表分析時已經詳細介紹過這個頁面及其中的字段&#xff0c;這里不再贅述。 需要特別注意的是&#xff0c;如果優惠券限定了使…

力扣面試經典150題(第二十三題)- KMP算法

問題 給你兩個字符串 haystack 和 needle &#xff0c;請你在 haystack 字符串中找出 needle 字符串的第一個匹配項的下標&#xff08;下標從 0 開始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;則返回 -1 。 示例 1&#xff1a; 輸入&#xff1a;haysta…

PostgreSQL 的 MVCC 機制了解

PostgreSQL 的 MVCC 機制了解 PostgreSQL 使用多版本并發控制(MVCC)作為其核心并發控制機制&#xff0c;這是它與許多其他數據庫系統的關鍵區別之一。MVCC 允許讀操作不阻塞寫操作&#xff0c;寫操作也不阻塞讀操作&#xff0c;從而提供高度并發性。 一 MVCC 基本原理 1.1 M…

互聯網大廠Java面試:RocketMQ、RabbitMQ與Kafka的深度解析

互聯網大廠Java面試&#xff1a;RocketMQ、RabbitMQ與Kafka的深度解析 面試場景 面試官&#xff1a;馬架構&#xff0c;您好&#xff01;歡迎參加我們的面試。今天我們將圍繞消息中間件展開討論&#xff0c;尤其是RocketMQ、RabbitMQ和Kafka。您有十年的Java研發和架構設計經…

《巧用DeepSeek快速搞定數據分析》書籍分享

文章目錄 前言內容簡介作者簡介購書鏈接書籍目錄 前言 隨著大數據時代的到來&#xff0c;數據分析和人工智能技術正迅速改變著各行各業的運作方式。DeepSeek作為先進的人工智能模型&#xff0c;不僅在自然語言處理領域具有廣泛應用&#xff0c;還在數據分析、圖像識別、推薦系…

4.Three.js 中 Camera 攝像機詳解

一、什么是 Camera&#xff1f; 在 Three.js 中&#xff0c;Camera&#xff08;攝像機&#xff09;決定了我們如何觀察三維場景。 你可以把它理解為我們“眼睛”的位置和方向&#xff0c;場景中的物體再復雜&#xff0c;如果沒有攝像機&#xff0c;就沒有“觀察角度”&#x…

gem5-gpu教程03 當前的gem5-gpu軟件架構(因為涉及太多專業名詞所以用英語表達)

Current gem5-gpu Software Architecture 這是當前gem5-gpu軟件架構的示意圖。 Ruby是在gem5-gpu上下文中用于處理CPU和GPU之間內存訪問的高度可配置的內存系統 CudaCore (src/gpu/gpgpu-sim/cuda_core.*, src/gpu/gpgpu-sim/CudaCore.py) Wrapper for GPGPU-Sim shader_cor…

負載均衡的實現方式有哪些?

負載均衡實現方式常見的有: 軟件負載均衡、硬件負載均衡、DNS負載均衡 擴展 二層負載均衡&#xff1a;在數據鏈路層&#xff0c;基于MAC地址進行流量分發&#xff0c;較少見于實際應用中 三層負載均衡&#xff1a;在網絡層&#xff0c;基于IP地址來分配流量&#xff0c;例如某…