2023_Spark_實驗二十五:SparkStreaming讀取Kafka數據源:使用Direct方式

SparkStreaming讀取Kafka數據源:使用Direct方式

一、前提工作

  • 安裝了zookeeper

  • 安裝了Kafka

  • 實驗環境:kafka + zookeeper + spark

  • 實驗流程

二、實驗內容

實驗要求:實現的從kafka讀取實現wordcount程序

啟動zookeeper

zk.sh start# zk.sh腳本 參考教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

啟動Kafka

kf.sh start# kf.sh 參照教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

?(測試用,實驗不做)創建Kafka主題,如test,可參考:Kafka的安裝與基本操作

--topic 定義topic名

--replication-factor? 定義副本數

--partitions? 定義分區數

--bootstrap-server??連接的Kafka Broker主機名稱和端口號

--create?創建主題

--describe?查看主題詳細描述

# 創建kafka主題測試
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --create --bootstrap-server hd1:9092 --replication-factor 3 --partitions 1 --topic gnutest2# 再次查看first主題的詳情
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server hd1:9092 --describe --topic gnutest2

啟動Kafka控制臺生產者,可參考:Kafka的安裝與基本操作

# 創建kafka生產者
/opt/module/kafka_2.12-3.0.0/bin/kafka-console-producer.sh --bootstrap-server hd1:9092 --topic gnutest2

創建maven項目

添加kafka依賴

       <!--- 添加streaming依賴 ---><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.13</artifactId><version>${spark.version}</version></dependency><!--- 添加streaming kafka依賴 ---><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.13</artifactId><version>3.4.1</version></dependency>

編寫程序,如下所示:

package examsimport org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeimport java.lang/*** @projectName SparkLearning2023  * @package exams  * @className exams.SparkStreamingReadKafka  * @description ${description}  * @author pblh123* @date 2023/12/1 15:19* @version 1.0**/object SparkStreamingReadKafka {def main(args: Array[String]): Unit = {//  1. 創建spark,sc對象if (args.length != 2) {println("您需要輸入一個參數")System.exit(5)}val musrl: String = args(0)val spark: SparkSession = new SparkSession.Builder().appName(s"${this.getClass.getSimpleName}").master(musrl).getOrCreate()val sc: SparkContext = spark.sparkContext// 生成streamingContext對象val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))//  2. 代碼主體val bststrapServers = args(1)val kafkaParms: Map[String, Object] = Map[String, Object]("bootstrap.servers" -> bststrapServers, //kafka列表"key.deserializer" -> classOf[StringDeserializer], k和v 的序列化類型"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream", //消費者組"auto.offset.reset" -> "latest", //如果沒有記錄偏移量,第一次從最開始讀,有偏移量,接著偏移量讀"enable.auto.commit" -> (true: java.lang.Boolean) // 消費者不自動提交偏移量)val topics = Array("gnutest2", "t100")// createDirectStream: 主動拉取數據val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParms))val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))//kafka 是一個key value 格式的, 默認key 為null ,一般用不上val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _)// 打印resultRDD.print()//  3. 關閉sc,spark對象ssc.start()ssc.awaitTermination()ssc.stop()sc.stop()spark.stop()}
}

配置輸入參數

生產者追加數據

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/207232.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/207232.shtml
英文地址,請注明出處:http://en.pswp.cn/news/207232.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

生成元(Digit Generator, ACM/ICPC Seoul 2005, UVa1583)

如果x加上x的各個數字之和得到y&#xff0c;就說x是y的生成元。 給出n&#xff08;1≤n≤100000&#xff09;&#xff0c;求最小生成元。 無解輸出0。 例如&#xff0c;n216&#xff0c;121&#xff0c;2005時的解分別為198&#xff0c;0&#xff0c;1979。 我的思路很簡單&am…

element-UI中el-scrollbar的使用

在elment-ui中有這么一個滾動條&#xff0c;當鼠標over到內容部分才會顯示&#xff0c;移開鼠標之后滾動條就會隱藏起來&#xff0c;相較于原生的滾動條比較美觀。 <el-scrollbar> //將滾動條的內部的內容放在里面即可 </el-scrollbar> 在使用過程中&#xff…

SNMP陷阱監控工具

SNMP&#xff08;簡單網絡管理協議&#xff09;是網絡管理的一個重要方面&#xff0c;其中網絡設備&#xff08;包括路由器、交換機和服務器&#xff09;在滿足預定義條件時將SNMP陷阱作為異步通知發送到中央管理系統。簡而言之&#xff0c;每當發生關鍵服務器不可用或硬件高溫…

microblaze仿真

verdivcs (1) vlogan/vcs增加編譯選項 -debug_accessall -kdb -lca (2) 在 simulation 選項中加入下面三個選項 -guiverdi UVM_VERDI_TRACE"UVM_AWARERALHIERCOMPWAVE" UVM_TR_RECORD 這里 -guiverdi是啟動verdi 和vcs聯合仿真。UVM_VERDI_TRACE 這里是記錄 U…

第四十二篇,MATLAB on Linux

最近在Ubuntu上安裝了一把MATLAB&#xff0c;以下操作親測有效。 一、版本 Linux&#xff1a;Ubuntu 18.04 MATLAB&#xff1a;R2021a Linux版&#xff0c;910 MATLAB下載鏈接&#xff1a;提取碼MUYU&#xff0c;感謝大佬無私奉獻&#xff01; 二、安裝 詳細的安裝步驟不…

linux高級篇基礎理論七(Tomcat)

??作者&#xff1a;小劉在C站 ??個人主頁&#xff1a; 小劉主頁 ??不能因為人生的道路坎坷,就使自己的身軀變得彎曲;不能因為生活的歷程漫長,就使求索的 腳步遲緩。 ??學習兩年總結出的運維經驗&#xff0c;以及思科模擬器全套網絡實驗教程。專欄&#xff1a;云計算技…

算法題,文本左右對齊

/*** 給定一個單詞數組 words 和一個長度 maxWidth &#xff0c;重新排版單詞&#xff0c;使其成為每行恰好有 maxWidth 個字符&#xff0c;且左右兩端對齊的文本。** 你應該使用 “貪心算法” 來放置給定的單詞&#xff1b;也就是說&#xff0c;盡可能多地往每行中放置單詞。必…

ubuntu22.04系統更改完resolv.conf后 重啟網絡服務后resolv.conf被重置

vi /etc/systemd/resolved.conf&#xff0c; [Resolve] DNS8.8.8.8 114.114.114.114 192.168.4.2 2.重啟域名解析服務 systemctl restart systemd-resolved systemctl enable systemd-resolved 3.備份當前的/etc/resolve.conf&#xff0c;并重新設置/run/systemd/resolve/res…

Docker 安裝 Centos和寶塔

1. 安裝centos docker pull centos:centos7 2. 創建docker容器&#xff1a;newbt 代表容器名 docker run -i -t -d --name newbt -p 2000:20 -p 2100:21 -p 8000:80 -p 4430:443 -p 8880:888 -p 8888:8888 -p 38444:38444 -p 2200:22 -p 2300:23 -p 2500:25 -p 3306:3306 -p 6…

c++ 解析zip文件,實現對流式文件pptx內容的修改

libzip 官網地址&#xff1a;示例代碼 #include <iostream> #include <cstdlib> #include <cstring> #include <ctime> #include <zip.h>//解析原始zip內容&#xff0c;保存為新的zip文件 int ziptest(const char* inputPath, const char* out…

vue pc官網頂部導航欄組件

官網頂部導航分為一級導航和二級導航 導航的樣子 文件的層級 router 文件層級 header 組件代碼 <h1 class"logo-wrap"><router-link to"/"><img class"logo" :src"$config.company.logo" alt"" /><i…

直面雙碳目標,優維科技攜手奧意建筑打造綠色低碳建筑數智云平臺

優維“雙碳”戰略合作建筑 為落實創新驅動發展戰略&#xff0c;增強深圳工程建設領域科技創新能力&#xff0c;促進技術進步、科技成果轉化和推廣應用&#xff0c;根據《深圳市工程建設領域科技計劃項目管理辦法》《深圳市住房和建設局關于組織申報2022年深圳市工程建設領域科…

K8S集群優化的可執行優化

目錄 前期環境優化 1.永久關閉交換分區 2.#加載 ip_vs 模塊 3.調整內核參數 4.#使用Systemd管理的Cgroup來進行資源控制與管理 5.開機自啟kubelet 6.內核參數優化方案 7.etcd優化 默認etcd空間配額大小為 2G&#xff0c;超過 2G 將不再寫入數據。通過給etcd配置 --quo…

IO流(Java)

IO流 在學習IO流之前&#xff0c;我們首先了解一下File File File即文件或文件夾路徑對象&#xff0c;其示例類可以是存在路徑也可以是未創造路徑 File有什么用 用于創建或操作文件或文件夾 File常用API API部分看得懂會查會用即可 IO流 IO(Input 讀數據 Output寫數據…

Qt/QML編程學習之心得:工程中的文件(十二)

Qt生成了工程之后,尤其在QtCreator產生對應的project項目之后,就如同VisualStudio一樣,會產生相關的工程文件,那么這些工程文件都是做什么的呢?這里介紹一下。比如產生了一個Qt Widget application,當然如果Qt Quick Application工程會有所不同。 一、.pro和.pro.user …

企業計算機服務器中了360勒索病毒如何解密,勒索病毒解密數據恢復

網絡技術的不斷應用與發展&#xff0c;為企業的生產運營提供了極大便利&#xff0c;但隨之而來的網絡安全威脅也不斷增加。近期&#xff0c;云天數據恢復中心接到很多企業的求助&#xff0c;企業的計算機服務器遭到了360后綴勒索病毒攻擊&#xff0c;導致企業的所有數據被加密&…

游戲策劃常用的ChatGPT通用提示詞模板

游戲設計&#xff1a;請幫助我設計一個有趣的游戲。 游戲玩法&#xff1a;如何設計游戲的玩法&#xff1f; 游戲機制&#xff1a;如何設計游戲的機制&#xff1f; 游戲平衡&#xff1a;如何平衡游戲中的各種元素&#xff1f; 游戲美術&#xff1a;如何設計游戲的美術風格&a…

『PyTorch學習筆記』如何快速下載huggingface模型/數據—全方法總結

如何快速下載huggingface模型/數據—全方法總結 文章目錄 一. 如何快速下載huggingface大模型1.1. IDM(Windows)下載安裝連接1.2. 推薦 huggingface 鏡像站1.3. 管理huggingface_hub cache-system(緩存系統) 二. 參考文獻 一. 如何快速下載huggingface大模型 推薦 huggingface…

希亦洗地機跟追覓洗地機入手哪個更好?追覓跟希亦洗地機深度評估

近年來&#xff0c;洗地機可以同時處理干濕垃圾&#xff0c;同時降低用戶在清潔過程中的勞動強度&#xff0c;成為了家居清潔的新寵&#xff0c;但是目前市場上的品牌和型號層出不窮。用戶往往很難挑選&#xff0c;本文挑選了兩款目前口碑最好的兩款洗地機給大家做一個全面的評…

Android 記錄一些Framework開發的命令

源碼編譯流程 1. "source build/envsetup.sh" (source可以用 . 代替&#xff0c;即". build/envsetup.sh") 2. "lunch"&#xff0c;并選擇要編譯的項目或者"choosecombo" 3. "make idegen -j4" (這里的 -j4 表示用4線程來…