Structured-Streaming集成Kafka

一、上下文

《Structured-Streaming初識》博客中已經初步認識了Structured-Streaming,Kafka作為目前最流行的一個分布式的實時流消息系統,是眾多實時流處理框架的最優數據源之一。下面我們就跟著官方例子來看看Structured-Streaming是如何集成Kafka的?

二、官方例子

這里我們先把官方例子貼出來,所屬包路徑為:org.apache.spark.examples.sql.streaming

該示例使用Kafka中一個或多個Topic的消息并進行字數統計。

object StructuredKafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 3) {System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +"<subscribe-type> <topics> [<checkpoint-location>]")System.exit(1)}val Array(bootstrapServers, subscribeType, topics, _*) = argsval checkpointLocation =if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toStringval spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()import spark.implicits._// 創建表示來自kafka的輸入行流的DataSetval lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option(subscribeType, topics).load().selectExpr("CAST(value AS STRING)").as[String]// 運行 word countval wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()// 開始運行將運行計數打印到控制臺的查詢val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()query.awaitTermination()}}

三、分析

1、參數解釋

運行該官方示例需要3或4個參數,分別是

  • Kafka的bootstrap-servers
  • 訂閱Kafka?TopicPartition 的類型
  • 訂閱Kafka的Topic
  • checkpointLocation(不是必須的)

bootstrap-servers用于連接Kafka集群。

訂閱類型有3種,且只能選擇1種:

  1. assign:手動指定分區消費,需要自己管理分區的分配和再平衡。需要指定一個Json字符串,例如:{"topicA":[0,1],"topicB":[2,4]}
  2. subscribe:訂閱一個或多個topic進行消費(逗號分割),Kafka會自動處理分區的分配和再平衡。
  3. subscribePattern:基于正則的topic訂閱方式,但可能增加一些復雜性和性能開銷。

Topic的指定根據訂閱類型的變化而變化。

checkpointLocation如果不指定默認會在/tmp下存放。

2、將從Kafka訂閱的數據做成一個DataSet

1、構建DataStreamReader

用于從外部存儲系統(如文件系統、鍵值存儲等)加載流式“數據集”的接口。使用`SparkSession.readStream`訪問此內容。

2、指定輸入源格式

默認的輸入源格式是parquet,這里指定的是 kafka,輸入源格式是DataStreamReader中的一個屬性。

private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
  val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default").doc("The default data source to use in input/output.").version("1.3.0").stringConf.createWithDefault("parquet")

3、用輸入的3個參數對DataStreamReader添加選項

DataStreamReader中維護了一個Map來接收這些選項,比如:

kafka.bootstrap.servers -> cdh1:9092

assign ->?{"topicA":[0,1],"topicB":[2,4]}

subscribe ->?topicA,topicB

subscribePattern -> topicP*

private var extraOptions = CaseInsensitiveMap[String](Map.empty)

4、加載輸入流數據為DataFrame

final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {def load(): DataFrame = loadInternal(None)private def loadInternal(path: Option[String]): DataFrame = {//*******//根據輸入源格式獲取相應的輸入源提供者//這里的 source 為 kafka ,因此會返回KafkaSourceProvider//它是 所有Kafka readers 和 writers 的提供者類//此外還有ConsoleSinkProvider、JdbcRelationProvider、TextSocketSourceProvider等等val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).getConstructor().newInstance()// 我們需要生成V1數據源,以便將其作為勻場傳遞給V2關系。目前我們無法確定是否真的要使用V2,因為我們不知道編寫者,也不知道查詢是否是連續的。val v1DataSource = DataSource(sparkSession,userSpecifiedSchema = userSpecifiedSchema,className = source,options = optionsWithPath.originalMap)val v1Relation = ds match {case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))case _ => None}ds match {//******Dataset.ofRows(sparkSession,StreamingRelationV2( //用于將[[表]]鏈接到流式[[LogicalPlan]]。Some(provider), source, table, dsOptions,table.schema.toAttributes, None, None, v1Relation))//******}}}

并將表中的數據設置成STRING

3、WordCount統計

在第2步的基礎上進行數據處理:

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

4、開始運行并將結果打印到控制臺

val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()

writeStream是用于將流式數據集的內容保存到外部存儲的接口。將返回一個DataStreamWriter

outputMode是指定如何將流式DataFrame/Dataset的數據寫入流式接收器。

  1. append:只有流式DataFrame/Dataset中的新行才會寫入接收器
  2. complete:每次有更新時,流式DataFrame/Dataset中的所有行都將寫入接收器
  3. update:每次有更新時,只有流式DataFrame/Dataset中更新的行才會寫入接收器。如果查詢不包含聚合,則相當于“append”模式

format是指定外部存儲,這里的取值有6種:memory、foreach、foreachBatch、console、table、noop。

四、運行

1、創建Topic

kafka-topics --create --topic structured-streaming-wc --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

2、啟動程序

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/

bin/run-example sql.streaming.StructuredKafkaWordCount cdh1:9092,cdh2:9092 subscribe structured-streaming-wc

3、向topic推送數據

kafka-console-producer --topic structured-streaming-wc --broker-list cdh1:9092,cdh2:9092,cdh3:9092

4、控制臺查看結果

?他和sparksql一樣默認的分區為200個,如果數據量很小,速度非常慢。需要根據數據量來設置自己的分區數。


大多數高校碩博生畢業要求需要參加學術會議,發表EI或者SCI檢索的學術論文會議論文:
可訪問艾思科藍官網,瀏覽即將召開的學術會議列表。會議如下:

第四屆大數據、信息與計算機網絡國際學術會議(BDICN 2025)

  • 廣州
  • https://ais.cn/u/fi2yym

第四屆電子信息工程、大數據與計算機技術國際學術會議(EIBDCT 2025)

  • 青島
  • https://ais.cn/u/nuQr6f

第六屆大數據與信息化教育國際學術會議(ICBDIE 2025)

  • 蘇州
  • https://ais.cn/u/eYnmQr

第三屆通信網絡與機器學習國際學術會議(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/65299.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/65299.shtml
英文地址,請注明出處:http://en.pswp.cn/web/65299.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Spring Boot 項目中集成 Kafka-03

在 Spring Boot 項目中集成 Kafka 有多種方式&#xff0c;適應不同的應用場景和需求。以下將詳細介紹幾種常用的集成方法&#xff0c;包括&#xff1a; 使用 Spring Kafka (KafkaTemplate 和 KafkaListener)使用 Spring Cloud Stream 與 Kafka Binder使用 Spring for Apache K…

生物醫學信號處理--緒論

前言 參考書籍&#xff1a;劉海龍&#xff0c;生物醫學信號處理&#xff0c;化學工業出版社 生物醫學信號分類 1、由生理過程自發或者誘發產生的電生理信號和非電生理信號 ? 電生理信號&#xff1a;ECG/心電、EEG/腦電、EMG/肌電、 EGG/胃電、 EOG/眼電 ? 非電生理信號&am…

unity 播放 序列幀圖片 動畫

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄 前言一、方法一&#xff1a;代碼控制播放序列幀1、設置圖片屬性2、創建Image組件3、簡單的代碼控制4、掛載代碼并賦值 二、方法二&#xff1a;直接使用1.Image上添加…

QT c++ 自定義按鈕類 加載圖片 美化按鈕

如果你有需要利用圖片美化按鈕的情況&#xff0c;本文能幫助你。 鼠標左鍵按下按鈕和松開&#xff0c;按鈕顯示不同的圖片。 1.按鈕類 //因為此類比較簡單&#xff0c;1個頭文件搞定&#xff0c;沒有cpp文件 #ifndef CUSTOMBUTTON_H #define CUSTOMBUTTON_H #include <Q…

web漏洞之文件包含漏洞

一、文件包含漏洞 1、把DVWA頁面改為low級別&#xff0c;然后點擊File Inclusion頁面 文件包含漏洞有四種include()/require()/include_once()/require_once() 常見的文件包含漏洞代碼如下 <?php$file$_GET[filename]; filename隨意定義include($file); ?> -----…

小程序與物聯網(IoT)融合:開啟智能生活新篇章

一、引言 隨著移動互聯網技術的飛速發展&#xff0c;小程序作為一種輕量級的應用形式&#xff0c;憑借其無需下載安裝、即用即走的特點&#xff0c;迅速滲透到人們生活的各個領域。與此同時&#xff0c;物聯網&#xff08;IoT&#xff09;技術也在不斷進步&#xff0c;將各種物…

Ubuntu無法創建python venv環境

排查步驟如下 1. python3 -m venv venv he virtual environment was not created successfully because ensurepip is not available. On Debian/Ubuntu systems, you need to install the python3-venv package using the following command.apt install python3.8-venvYou…

如何很快將文件轉換成另外一種編碼格式?編碼?按指定編碼格式編譯?如何檢測文件編碼格式?Java .class文件編碼和JVM運行期內存編碼?

如何很快將文件轉換成另外一種編碼格式? 利用VS Code右下角的"選擇編碼"功能&#xff0c;選擇"通過編碼保存"可以很方便將文件轉換成另外一種編碼格式。尤其&#xff0c;在測試w/ BOM或w/o BOM, 或者ANSI編碼和UTF編碼轉換&#xff0c;特別方便。VS文件另…

PCL點云庫入門——PCL庫點云特征之PFH點特征直方圖(Point Feature Histograms -PHF)

1、算法原理 PFH點&#xff08;Point Feature Histogram&#xff09;特征直方圖的原理涉及利用參數化查詢點與鄰域點之間的空間差異&#xff0c;并構建一個多維直方圖以捕捉點的k鄰域幾何屬性。這個高維超空間為特征表示提供了一個可度量的信息空間&#xff0c;對于點云對應曲面…

5. CSS引入方式

5.1 CSS的三種樣式 按照 CSS 樣式書寫的位置(或者引入的方式)&#xff0c;CSS樣式表可以分為三大類&#xff1a; 1.行內樣式表&#xff08;行內式&#xff09; 2.內部樣式表&#xff08;嵌入式&#xff09; 3. 外部樣式表&#xff08;鏈接式&#xff09; 5.2 內部樣式表 …

為什么ip屬地一會河南一會江蘇

在使用互聯網的過程中&#xff0c;許多用戶可能會遇到這樣一個問題&#xff1a;自己的IP屬地一會兒顯示為河南&#xff0c;一會兒又變成了江蘇。這種現象可能會讓人感到困惑&#xff0c;甚至產生疑慮&#xff0c;擔心自己的網絡活動是否受到了某種影響。為了解答這一疑問&#…

unity3d-搞個場景漫游如何實現Alpha

要處理兩個問題&#xff1a; 如何設置地面人不掉下去 方法一、 游戲物體加剛體&#xff0c;將游戲物體和地面加collider。如果是地形&#xff0c;可以使用 Terrain Collider&#xff1b;如果是簡單的平面&#xff0c;可以添加 Box Collider 或者 Mesh Collider&#xff08;如果…

git merge rebase

merge操作 Git自己分支合并dev分支 rebase 操作 git rebase

doris 2.1 temporay partition 測試及總結

測試步驟 創建表 drop table order_info_shuffle; CREATE TABLE order_info_shuffle ( order_id varchar(20), user_id varchar(20), goods_id

jmeter性能測試例子

目錄 一、介紹 二、操作例子 設置線程數 添加同步定時器 添加聚合報告 一、介紹 在軟件測試中&#xff0c;一般用jmeter來對接口做性能測試&#xff0c;對對接口進行一個壓力的測試。 簡述&#xff1a; 在接口的線程中設置線程的數量和時間&#xff0c;添加一個定時器…

C# 設計模式(行為型模式):解釋器模式

C# 設計模式&#xff08;行為型模式&#xff09;&#xff1a;解釋器模式 (Interpreter Pattern) 什么是解釋器模式&#xff1f; 解釋器模式&#xff08;Interpreter Pattern&#xff09;是一種行為型設計模式&#xff0c;用于定義一種語言的語法表示&#xff0c;并提供一個解釋…

ubuntu16 重啟之后lvm信息丟失故障恢復

一、背景 1、問題背景 業務有一臺物理開發服務器&#xff0c;文件系統有損壞&#xff1b;由于重啟時沒有檢查&#xff0c;導致重啟卡住。后面通過斷電重新啟動之后&#xff0c;無法進入系統&#xff1b;進入救援模式&#xff0c;注釋數據盤掛載。重啟之后進入系統&#xff0c…

React函數組件中與生命周期相關Hooks詳解

React 函數組件及其鉤子渲染流程是 React 框架中的核心概念之一。以下是對該流程的詳細解析&#xff1a; 一、React 函數組件基礎 定義&#xff1a; React 函數組件是一個接收 props 作為參數并返回 React 元素的函數。它們通常用于表示 UI 的一部分&#xff0c;并且不保留內部…

水一篇水水水

為了拿推廣卷&#xff0c;但不想把我原本完整的文章拆成零散的多篇&#xff0c;只能出此下策隨便發一篇&#xff0c;認真寫的都筆記專欄里 網絡是由若干節點和連接這些節點的鏈路構成&#xff0c;表示諸多對象及其相互聯系。 在1999年之前&#xff0c;人們一般認為網絡的結構都…