Kafka和Spark Streaming的組合使用學習筆記(Spark 3.5.1)

一、安裝Kafka

1.執行以下命令完成Kafka的安裝:
cd ~  //默認壓縮包放在根目錄
sudo tar -zxf  kafka_2.12-2.6.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.6.0 kafka-2.6.0
sudo chown -R qiangzi ./kafka-2.6.0

二、啟動Kafaka

1.首先需要啟動Kafka,打開一個終端,輸入下面命令啟動Zookeeper服務:
cd  /usr/local/kafka-2.6.0
./bin/zookeeper-server-start.sh  config/zookeeper.properties

注意:以上現象是Zookeeper服務器已經啟動,正在處于服務狀態。不要關閉!

2.打開第二個終端輸入下面命令啟動Kafka服務:
cd  /usr/local/kafka-2.6.0
./bin/kafka-server-start.sh  config/server.properties//加了“&”的命令,Kafka就會在后臺運行,即使關閉了這個終端,Kafka也會一直在后臺運行。
bin/kafka-server-start.sh  config/server.properties  &

注意:同樣不要誤以為死機了,而是Kafka服務器已經啟動,正在處于服務狀態。

三、創建Topic

1.再打開第三個終端,然后輸入下面命令創建一個自定義名稱為“wordsender”的Topic:
cd /usr/local/kafka-2.6.0
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsender
2.然后,可以執行如下命令,查看名稱為“wordsender”的Topic是否已經成功創建:
./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.再新開一個終端(記作“監控輸入終端”),執行如下命令監控Kafka收到的文本:
cd /usr/local/kafka-2.6.0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordsender

注意,所有這些終端窗口都不要關閉,要繼續留著后面使用。

四、Spark準備工作

Kafka和Flume等高級輸入源,需要依賴獨立的庫(jar文件),因此,需要為Spark添加相關jar包。訪問MVNREPOSITORY官網(http://mvnrepository.com),下載spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本號,3.5.1表示Spark版本號。然后,把這兩個文件復制到Spark目錄的jars目錄下(即“/usr/local/spark-3.5.1/jars”目錄)。此外,還需要把“/usr/local/kafka-2.6.0/libs”目錄下的kafka-clients-2.6.0.jar文件復制到Spark目錄的jars目錄下。

cd ~  .jar文件默認放在根目錄
sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo cp /usr/local/kafka-2.6.0/libs/kafka-clients-2.6.0.jar /usr/local/spark-3.5.1/jars/

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下載頁面:

Maven Repository: org.apache.spark ? spark-streaming-kafka-0-10_2.12 ? 3.5.1 (mvnrepository.com)

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下載頁面:

Maven Repository: org.apache.spark ? spark-token-provider-kafka-0-10_2.12 ? 3.5.1 (mvnrepository.com)

進入下載頁面以后,如下圖所示,點擊紅色方框內的“jar”,就可以下載JAR包了。

五、編寫Spark Streaming程序使用Kafka數據源

1.編寫生產者(Producer)程序
(1)新打開一個終端,然后,執行如下命令創建代碼目錄和代碼文件:
cd /usr/local/spark-3.5.1
mkdir mycode
cd ./mycode
mkdir kafka
mkdir -p kafka/src/main/scala
vi kafka/src/main/scala/KafkaWordProducer.scala
(2)使用vi編輯器新建了KafkaWordProducer.scala

它是用來產生一系列字符串的程序,會產生隨機的整數序列,每個整數被當作一個單詞,提供給KafkaWordCount程序去進行詞頻統計。請在KafkaWordProducer.scala中輸入以下代碼:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {def main(args: Array[String]) {if (args.length < 4) {System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +"<messagesPerSec> <wordsPerMessage>")System.exit(1)}val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args// Zookeeper connection propertiesval props = new HashMap[String, Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)// Send some messageswhile(true) {(1 to messagesPerSec.toInt).foreach { messageNum =>val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString).mkString(" ")print(str)println()val message = new ProducerRecord[String, String](topic, null, str)producer.send(message)}Thread.sleep(1000)}}
}
2.編寫消費者(Consumer)程序

在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目錄下創建文件KafkaWordCount.scala,用于單詞詞頻統計,它會把KafkaWordProducer發送過來的單詞進行詞頻統計,代碼內容如下:

cd /usr/local/spark-3.5.1/mycode
vi kafka/src/main/scala/KafkaWordCount.scala
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeobject KafkaWordCount{def main(args:Array[String]){val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")val sc = new SparkContext(sparkConf)sc.setLogLevel("ERROR")val ssc = new StreamingContext(sc,Seconds(10))ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //設置檢查點,如果存放在HDFS上面,則寫成類似ssc.checkpoint("/user/hadoop/checkpoint")這種形式,但是,要啟動Hadoopval kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (true: java.lang.Boolean))val topics = Array("wordsender")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))stream.foreachRDD(rdd => {val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))val lines = maped.map(_._2)val words = lines.flatMap(_.split(" "))val pair = words.map(x => (x,1))val wordCounts = pair.reduceByKey(_+_)wordCounts.foreach(println)})ssc.startssc.awaitTermination}
}
3.在路徑“file:///usr/local/spark/mycode/kafka/”下創建“checkpoint”目錄作為預寫式日志的存放路徑。
cd ./kafka
mkdir checkpoint
4.繼續在當前目錄下創建StreamingExamples.scala代碼文件,用于設置log4j:
cd /usr/local/spark-3.5.1/mycode/
vi kafka/src/main/scala/StreamingExamples.scala/*StreamingExamples.scala*/
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}                                                                                 /** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}                                                                                                                     }                                                                                                                     } 
5.編譯打包程序

現在在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目錄下,就有了如下3個scala文件:

然后,執行下面命令新建一個simple.sbt文件:

cd /usr/local/spark-3.5.1/mycode/kafka/
vim simple.sbt

在simple.sbt中輸入以下代碼:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"

然后執行下面命令,進行編譯打包:

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/sbt-1.9.0/sbt/sbt  package

打包成功界面

6. 運行程序

首先,啟動Hadoop,因為如果前面KafkaWordCount.scala代碼文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")這種形式,這時的檢查點是被寫入HDFS,因此需要啟動Hadoop。啟動Hadoop的命令如下:

cd  /usr/local/hadoop-2.10.1
./sbin/start-dfs.sh
或者
start-dfs.sh
start-yarn.sh

啟動Hadoop成功以后,就可以測試剛才生成的詞頻統計程序了。
要注意,之前已經啟動了Zookeeper服務和Kafka服務,因為之前那些終端窗口都沒有關閉,所以,這些服務一直都在運行。如果不小心關閉了之前的終端窗口,那就參照前面的內容,再次啟動Zookeeper服務,啟動Kafka服務。
然后,新打開一個終端,執行如下命令,運行“KafkaWordProducer”程序,生成一些單詞(是一堆整數形式的單詞):

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/sime-project_2.12-1.0.jar localhost:9092 wordsender  3  5

注意,上面命令中,“localhost:9092 wordsender 3 5”是提供給KafkaWordProducer程序的4個輸入參數,第1個參數“localhost:9092”是Kafka的Broker的地址,第2個參數“wordsender”是Topic的名稱,我們在KafkaWordCount.scala代碼中已經把Topic名稱寫死掉,所以,KafkaWordCount程序只能接收名稱為“wordsender”的Topic。第3個參數“3”表示每秒發送3條消息,第4個參數“5”表示每條消息包含5個單詞(實際上就是5個整數)。
執行上面命令后,屏幕上會不斷滾動出現類似如下的新單詞:

不要關閉這個終端窗口,讓它一直不斷發送單詞。然后,再打開一個終端,執行下面命令,運行KafkaWordCount程序,執行詞頻統計:

cd  /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordCount" ./target/scala-2.12/simple-oject_2.12-1.0.jar
運行上面命令以后,就啟動了詞頻統計功能,屏幕上就會顯示如下類似信息:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/10533.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/10533.shtml
英文地址,請注明出處:http://en.pswp.cn/web/10533.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

計算機畢業設計Python地震預測系統 地震數據分析可視化 地震爬蟲 大數據畢業設計 Flink Hadoop 深度學習 機器學習 人工智能 知識圖譜

學生信息 姓名&#xff1a;  祁浩 題目&#xff1a; 基于Python的中國地震數據分析與可視化系統的設計與實現 學號&#xff1a; 2020135211 班級&#xff1a; 20大數據本科2班 指導教師&#xff1a; 劉思思 答辯過程 學生開題陳述 為了讓學習者更好的了解了解地震…

Coze扣子開發指南:AI零代碼編程創建插件

在Coze扣子中創建插件&#xff0c;有兩種方式&#xff0c;一是用API&#xff0c;具體方式參照上一篇文章《Coze扣子開發指南&#xff1a;用免費API自己創建插件》&#xff0c;還有一種方式就是編程&#xff0c;不過有了AI的幫助&#xff0c;即使不會編程的人&#xff0c;也可以…

HarmonyOS開發案例:【生活健康app之獲取成就】(3)

獲取成就 本節將介紹成就頁面。 功能概述 成就頁面展示用戶可以獲取的所有勛章&#xff0c;當用戶滿足一定的條件時&#xff0c;將點亮本頁面對應的勛章&#xff0c;沒有得到的成就勛章處于熄滅狀態。共有六種勛章&#xff0c;當用戶連續完成任務打卡3天、7天、30天、50天、…

用大于meilisearch-java-0.7.0.jar的報錯的解決

Elasticsearch 做為老牌搜索引擎&#xff0c;功能基本滿足&#xff0c;但復雜&#xff0c;重量級&#xff0c;適合大數據量。 MeiliSearch 設計目標針對數據在 500GB 左右的搜索需求&#xff0c;極快&#xff0c;單文件&#xff0c;超輕量。 所以&#xff0c;對于中小型項目來說…

阿里云服務器在線安裝nginx

??個人主頁: 蒾酒 &#x1f525;系列專欄&#xff1a;《nginx實戰》 目錄 內容簡介 安裝步驟 1.root用戶登錄連接阿里云服務器 2.在usr/local下新建nginx目錄 3.安裝 1安裝下載工具 2下載nginx壓縮包 3解壓 4安裝nginx依賴的庫 5編譯并安裝 6啟動nginx 7開啟…

藍橋杯-遞增三元組(三種解法,二分, 雙指針, 前綴和)

給定三個整數數組 A[A1,A2,…AN], B[B1,B2,…BN], C[C1,C2,…CN], 請你統計有多少個三元組 (i,j,k) 滿足&#xff1a; 1≤i,j,k≤N Ai<Bj<Ck 輸入格式 第一行包含一個整數 N。 第二行包含 N 個整數 A1,A2,…AN。 第三行包含 N 個整數 B1,B2,…BN。 第四行包含 N …

【圖像畸變校正】

接上篇文章&#xff1a;【魚眼&#xff0b;普通相機】相機標定 附代碼&#xff1a; 方法一&#xff1a; 使用cv2.undistort """Create May 11, 2024author Wang Jiajun """import cv2 import numpy as npdef correct(img,camera_fileE:/cali…

怎么使用遠程桌面傳輸文件?

微軟提供的遠程桌面功能是一項強大的工具&#xff0c;可讓您在同一網絡下遠程訪問和管理其他計算機。除了遠程控制&#xff0c;它還支持文件傳輸功能&#xff0c;為Windows用戶提供了極大的便利。在接下來的內容中&#xff0c;我們將介紹如何使用遠程桌面傳輸文件。 如何從遠程…

PADS:生成自交叉平面區域

根據板外形鋪銅方法&#xff1a; pads根據板外形鋪銅_鋪銅如何根據板子形狀改變-CSDN博客 根據板外形創建平面區域出現問題&#xff1a; 解決方法&#xff1a;去找結構&#xff0c;讓他把出圖之前把線合并了

【數據結構】順序棧

順序棧 一、相關概念 棧和隊列是操作受限的線性表&#xff0c;是限定性的數據結構&#xff1b;棧分為順序棧和鏈式棧棧只能在一端進行操作&#xff08;插入、刪除&#xff09;棧是限定僅在表尾進行插入或刪除操作的線性表&#xff0c;因此&#xff0c;對棧來說&#xff0c;表…

https免費證書獲取

獲取免費證書的網址&#xff1a; Certbot 1. 進入你的linux系統&#xff0c;先安裝snapd&#xff0c; yum install snapd 2. 啟動snapd service snapd start 3.安裝 Certbot snap install --classic certbot 注意如下出現此錯誤時&#xff0c;需要先建立snap 軟連接后&am…

山東大學軟件學院創新項目實訓開發日志——第11周

山東大學軟件學院創新項目實訓開發日志——第11周 項目名稱&#xff1a;ModuFusion Visionary&#xff1a;實現跨模態文本與視覺的相關推薦 -------項目目標&#xff1a; 本項目旨在開發一款跨模態交互式應用&#xff0c;用戶可以上傳圖片或視頻&#xff0c;并使用文本、點、…

Golang | Leetcode Golang題解之第84題柱狀圖中最大的矩形

題目&#xff1a; 題解&#xff1a; func largestRectangleArea(heights []int) int {n : len(heights)left, right : make([]int, n), make([]int, n)for i : 0; i < n; i {right[i] n}mono_stack : []int{}for i : 0; i < n; i {for len(mono_stack) > 0 &&am…

SQLite索引名稱重復(index already exists)

文章目錄 概述報錯信息解決方案 概述 SQLite中創建單列索引的方式&#xff0c;跟MySQL類似&#xff1a; CREATE INDEX index_name ON table_name (column_name);但是也有不同的地方&#xff1a; MySQL中索引名稱在表內部不重復即可。 SQLite中索引名稱在整個庫中必須是不重復…

整理項目中經常用到的正則

目錄 1、手機號碼 2、Email 郵箱 3、QQ 號碼 4、非零正整數 5、URL 地址 6、身份證號 項目中難免會經常使用到表單&#xff0c;而表單項校驗就需要用到正則&#xff0c; 所以整理總結一下自己項目中使用比較頻繁的一些正則校驗邏輯。 正則表達式 是由一些具有特殊含義的…

JavaScript之數據類型(3)——object進階

前言&#xff1a; 利用基礎知識來構建對象會發現十分復雜&#xff0c;我們可以結合其他的知識點來為我們object的構建進行優化。 <1>工廠法&#xff1a; 基本格式&#xff1a; function creatObject(屬性值1,屬性值2,屬性值3,...,屬性值n) {var 對象名 new Object();對…

在IDEA中使用 Spring Initializr 新建 spring boots 項目

【在IDEA中使用 Spring Initializr 新建 spring boots 項目 - CSDN Apphttp://t.csdnimg.cn/mVs5P Spring Initializr 創建spring boots項目 添加到pom.xml <dependency> <groupId>mysql</groupId> <artifactId>mysql-connec…

Python | Leetcode Python題解之第84題柱狀圖中最大的矩形

題目&#xff1a; 題解&#xff1a; class Solution:def largestRectangleArea(self, heights: List[int]) -> int:n len(heights)left, right [0] * n, [n] * nmono_stack list()for i in range(n):while mono_stack and heights[mono_stack[-1]] > heights[i]:righ…

代碼隨想錄算法訓練營day21 | 513.找樹左下角的值、112. 路徑總和、106.從中序與后序遍歷序列構造二叉樹

513.找樹左下角的值 迭代法比較簡單&#xff0c;層序遍歷&#xff0c;找到最下面一層的第一個節點。題目已經說明節點數>1了 class Solution:def findBottomLeftValue(self, root: Optional[TreeNode]) -> int:queue collections.deque()queue.append(root)result ro…

LeetCode題練習與總結:復原IP地址--93

一、題目描述 有效 IP 地址 正好由四個整數&#xff08;每個整數位于 0 到 255 之間組成&#xff0c;且不能含有前導 0&#xff09;&#xff0c;整數之間用 . 分隔。 例如&#xff1a;"0.1.2.201" 和 "192.168.1.1" 是 有效 IP 地址&#xff0c;但是 &qu…