spark-streaming(二)

DStream創建(kafka數據源)

1.在idea中的 pom.xml 中添加依賴

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version>
</dependency>

2.創建一個新的object,并寫入以下代碼

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord/*** 通過 DirectAPI 0 - 10 消費 Kafka 數據* 消費的 offset 保存在 _consumer_offsets 主題中*/
object DirectAPI {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")val ssc = new StreamingContext(sparkConf, Seconds(3))// 定義 Kafka 相關參數val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",ConsumerConfig.GROUP_ID_CONFIG -> "kafka","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer])// 通過讀取 Kafka 數據,創建 DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara))// 提取出數據中的 value 部分val valueDStream = kafkaDStream.map(record => record.value())// WordCount 計算邏輯valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()ssc.awaitTermination()}
}    

3.在虛擬機中,開啟kafka、zookeeper、yarn、dfs集群

4.創建一個新的topic---kafka,用于接下來的操作

查看所有的topic(是否創建成功)

開啟kafka生產者,用于產生數據

啟動idea中的代碼,在虛擬機中輸入數據

輸入后可以在idea中查看到

查看消費進度

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/76969.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/76969.shtml
英文地址,請注明出處:http://en.pswp.cn/web/76969.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

JAVA聚焦OutOfMemoryError 異常

個人主頁 文章專欄 在正文開始前&#xff0c;我想多說幾句&#xff0c;也就是吐苦水吧…最近這段時間一直想寫點東西&#xff0c;停下來反思思考一下。 心中萬言&#xff0c;真正執筆時又不知先寫些什么。通常這個時候&#xff0c;我都會隨便寫寫&#xff0c;文風極像散文&…

如何在Spring Boot中配置自定義端口運行應用程序

Spring Boot 應用程序默認在端口 8080 上運行嵌入式 Web 服務器&#xff08;如 Tomcat、Jetty 或 Undertow&#xff09;。然而&#xff0c;在開發、測試或生產環境中&#xff0c;開發者可能需要將應用程序配置為在自定義端口上運行&#xff0c;例如避免端口沖突、適配微服務架構…

linux嵌入式(進程與線程1)

Linux進程 進程介紹 1. 進程的基本概念 定義&#xff1a;進程是程序的一次執行過程&#xff0c;擁有獨立的地址空間、資源&#xff08;如內存、文件描述符&#xff09;和唯一的進程 ID&#xff08;PID&#xff09;。 組成&#xff1a; 代碼段&#xff1a;程序的指令。 數據…

智馭未來:NVIDIA自動駕駛安全白皮書與實驗室創新實踐深度解析

一、引言&#xff1a;自動駕駛安全的范式革新 在當今數字化浪潮的推動下&#xff0c;全球自動駕駛技術正大步邁入商業化的深水區。隨著越來越多的自動駕駛車輛走上道路&#xff0c;其安全性已成為整個行業乃至社會關注的核心命題。在這個關鍵的轉折點上&#xff0c;NVIDIA 憑借…

多模態大模型 Qwen2.5-VL 的學習之旅

Qwen-VL 是阿里云研發的大規模視覺語言模型&#xff08;Large Vision Language Model, LVLM&#xff09;。Qwen-VL 可以以圖像、文本、檢測框作為輸入&#xff0c;并以文本和檢測框作為輸出。Qwen-VL 系列模型性能強大&#xff0c;具備多語言對話、多圖交錯對話等能力&#xff…

Redis 與 Memcache 全面對比:功能、性能與應用場景解析

Redis 和 Memcache 都是常用的內存數據庫&#xff0c;以下是它們在多個方面的能力比較&#xff1a; 一、數據類型 Redis&#xff1a;支持豐富的數據類型&#xff0c;如字符串&#xff08;String&#xff09;、哈希&#xff08;Hash&#xff09;、列表&#xff08;List&#x…

Oracle--PL/SQL編程

前言&#xff1a;本博客僅作記錄學習使用&#xff0c;部分圖片出自網絡&#xff0c;如有侵犯您的權益&#xff0c;請聯系刪除 PL/SQL&#xff08;Procedural Language/SQL&#xff09;是Oracle數據庫中的一種過程化編程語言&#xff0c;構建于SQL之上&#xff0c;允許編寫包含S…

新增優惠券

文章目錄 概要整體架構流程技術細節小結 概要 接口分析 一個基本的新增接口&#xff0c;按照Restful風格設計即可&#xff0c;關鍵是請求參數。之前表分析時已經詳細介紹過這個頁面及其中的字段&#xff0c;這里不再贅述。 需要特別注意的是&#xff0c;如果優惠券限定了使…

力扣面試經典150題(第二十三題)- KMP算法

問題 給你兩個字符串 haystack 和 needle &#xff0c;請你在 haystack 字符串中找出 needle 字符串的第一個匹配項的下標&#xff08;下標從 0 開始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;則返回 -1 。 示例 1&#xff1a; 輸入&#xff1a;haysta…

PostgreSQL 的 MVCC 機制了解

PostgreSQL 的 MVCC 機制了解 PostgreSQL 使用多版本并發控制(MVCC)作為其核心并發控制機制&#xff0c;這是它與許多其他數據庫系統的關鍵區別之一。MVCC 允許讀操作不阻塞寫操作&#xff0c;寫操作也不阻塞讀操作&#xff0c;從而提供高度并發性。 一 MVCC 基本原理 1.1 M…

互聯網大廠Java面試:RocketMQ、RabbitMQ與Kafka的深度解析

互聯網大廠Java面試&#xff1a;RocketMQ、RabbitMQ與Kafka的深度解析 面試場景 面試官&#xff1a;馬架構&#xff0c;您好&#xff01;歡迎參加我們的面試。今天我們將圍繞消息中間件展開討論&#xff0c;尤其是RocketMQ、RabbitMQ和Kafka。您有十年的Java研發和架構設計經…

《巧用DeepSeek快速搞定數據分析》書籍分享

文章目錄 前言內容簡介作者簡介購書鏈接書籍目錄 前言 隨著大數據時代的到來&#xff0c;數據分析和人工智能技術正迅速改變著各行各業的運作方式。DeepSeek作為先進的人工智能模型&#xff0c;不僅在自然語言處理領域具有廣泛應用&#xff0c;還在數據分析、圖像識別、推薦系…

4.Three.js 中 Camera 攝像機詳解

一、什么是 Camera&#xff1f; 在 Three.js 中&#xff0c;Camera&#xff08;攝像機&#xff09;決定了我們如何觀察三維場景。 你可以把它理解為我們“眼睛”的位置和方向&#xff0c;場景中的物體再復雜&#xff0c;如果沒有攝像機&#xff0c;就沒有“觀察角度”&#x…

gem5-gpu教程03 當前的gem5-gpu軟件架構(因為涉及太多專業名詞所以用英語表達)

Current gem5-gpu Software Architecture 這是當前gem5-gpu軟件架構的示意圖。 Ruby是在gem5-gpu上下文中用于處理CPU和GPU之間內存訪問的高度可配置的內存系統 CudaCore (src/gpu/gpgpu-sim/cuda_core.*, src/gpu/gpgpu-sim/CudaCore.py) Wrapper for GPGPU-Sim shader_cor…

負載均衡的實現方式有哪些?

負載均衡實現方式常見的有: 軟件負載均衡、硬件負載均衡、DNS負載均衡 擴展 二層負載均衡&#xff1a;在數據鏈路層&#xff0c;基于MAC地址進行流量分發&#xff0c;較少見于實際應用中 三層負載均衡&#xff1a;在網絡層&#xff0c;基于IP地址來分配流量&#xff0c;例如某…

MyBatis 和 MyBatis-Plus 在 Spring Boot 中的配置、功能對比及 SQL 日志輸出的詳細說明,重點對比日志輸出的配置差異

以下是 MyBatis 和 MyBatis-Plus 在 Spring Boot 中的配置、功能對比及 SQL 日志輸出的詳細說明&#xff0c;重點對比日志輸出的配置差異&#xff1a; 1. MyBatis 和 MyBatis-Plus 核心對比 特性MyBatisMyBatis-Plus定位基礎持久層框架MyBatis 的增強版&#xff0c;提供代碼生…

《數據結構世界的樂高積木:順序表的奇幻旅程》

目錄 1. 線性表 2. 順序表 2.1 概念與結構 2.2 分類 2.2.1 靜態順序表 2.2.2 動態順序表 2.3 動態順序表的實現 1. 線性表 線性表&#xff08;linear list&#xff09;是n個具有相同特性的數據元素的有限序列。線性表是?種在實際中?泛使?的數據結構&#xff0c;常?的…

RHCE 練習二:通過 ssh 實現兩臺主機免密登錄以及 nginx 服務通過多 IP 區分多網站

一、題目要求 1.配置ssh實現A&#xff0c;B主機互相免密登錄 2.配置nginx服務&#xff0c;通過多ip區分多網站 二、實驗 實驗開始前需準備兩臺 linux 主機便于充當服務端以及客戶端&#xff0c;兩臺主機 IP 如下圖&#xff1a; 實驗1&#xff1a;配置 ssh 實現 A&#xff0…

第十五屆藍橋杯 2024 C/C++組 好數

題目&#xff1a; 題目描述&#xff1a; 題目鏈接&#xff1a; 好數 思路&#xff1a; 第一種思路詳解&#xff1a; 因為每次檢查數都是從個位開始&#xff0c;所以對于每一個數都是先檢查奇數位再檢查偶數位&#xff0c;即存在先檢查奇數位再檢查偶數位的循環。注意一次完…

展銳Android13狀態欄默認顯示電池電量百分比

展銳Android13電池狀態默認不顯示電池電量百分比&#xff0c;打開 /frameworks/base/packages/SettingsProvider/res/values/defaults.xml 在xml的文件最后&#xff0c;增加一項配置def_show_battery_percent&#xff1a; <?xml version"1.0" encoding"u…