一、上下文
《Structured-Streaming初識》博客中已經初步認識了Structured-Streaming,Kafka作為目前最流行的一個分布式的實時流消息系統,是眾多實時流處理框架的最優數據源之一。下面我們就跟著官方例子來看看Structured-Streaming是如何集成Kafka的?
二、官方例子
這里我們先把官方例子貼出來,所屬包路徑為:org.apache.spark.examples.sql.streaming
該示例使用Kafka中一個或多個Topic的消息并進行字數統計。
object StructuredKafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 3) {System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +"<subscribe-type> <topics> [<checkpoint-location>]")System.exit(1)}val Array(bootstrapServers, subscribeType, topics, _*) = argsval checkpointLocation =if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toStringval spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()import spark.implicits._// 創建表示來自kafka的輸入行流的DataSetval lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option(subscribeType, topics).load().selectExpr("CAST(value AS STRING)").as[String]// 運行 word countval wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()// 開始運行將運行計數打印到控制臺的查詢val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()query.awaitTermination()}}
三、分析
1、參數解釋
運行該官方示例需要3或4個參數,分別是
- Kafka的bootstrap-servers
- 訂閱Kafka?TopicPartition 的類型
- 訂閱Kafka的Topic
- checkpointLocation(不是必須的)
bootstrap-servers用于連接Kafka集群。
訂閱類型有3種,且只能選擇1種:
- assign:手動指定分區消費,需要自己管理分區的分配和再平衡。需要指定一個Json字符串,例如:{"topicA":[0,1],"topicB":[2,4]}
- subscribe:訂閱一個或多個topic進行消費(逗號分割),Kafka會自動處理分區的分配和再平衡。
- subscribePattern:基于正則的topic訂閱方式,但可能增加一些復雜性和性能開銷。
Topic的指定根據訂閱類型的變化而變化。
checkpointLocation如果不指定默認會在/tmp下存放。
2、將從Kafka訂閱的數據做成一個DataSet
1、構建DataStreamReader
用于從外部存儲系統(如文件系統、鍵值存儲等)加載流式“數據集”的接口。使用`SparkSession.readStream`訪問此內容。
2、指定輸入源格式
默認的輸入源格式是parquet,這里指定的是 kafka,輸入源格式是DataStreamReader中的一個屬性。
private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default").doc("The default data source to use in input/output.").version("1.3.0").stringConf.createWithDefault("parquet")
3、用輸入的3個參數對DataStreamReader添加選項
DataStreamReader中維護了一個Map來接收這些選項,比如:
kafka.bootstrap.servers -> cdh1:9092
assign ->?{"topicA":[0,1],"topicB":[2,4]}
subscribe ->?topicA,topicB
subscribePattern -> topicP*
private var extraOptions = CaseInsensitiveMap[String](Map.empty)
4、加載輸入流數據為DataFrame
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {def load(): DataFrame = loadInternal(None)private def loadInternal(path: Option[String]): DataFrame = {//*******//根據輸入源格式獲取相應的輸入源提供者//這里的 source 為 kafka ,因此會返回KafkaSourceProvider//它是 所有Kafka readers 和 writers 的提供者類//此外還有ConsoleSinkProvider、JdbcRelationProvider、TextSocketSourceProvider等等val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).getConstructor().newInstance()// 我們需要生成V1數據源,以便將其作為勻場傳遞給V2關系。目前我們無法確定是否真的要使用V2,因為我們不知道編寫者,也不知道查詢是否是連續的。val v1DataSource = DataSource(sparkSession,userSpecifiedSchema = userSpecifiedSchema,className = source,options = optionsWithPath.originalMap)val v1Relation = ds match {case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))case _ => None}ds match {//******Dataset.ofRows(sparkSession,StreamingRelationV2( //用于將[[表]]鏈接到流式[[LogicalPlan]]。Some(provider), source, table, dsOptions,table.schema.toAttributes, None, None, v1Relation))//******}}}
并將表中的數據設置成STRING
3、WordCount統計
在第2步的基礎上進行數據處理:
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
4、開始運行并將結果打印到控制臺
val query = wordCounts.writeStream.outputMode("complete").format("console").option("checkpointLocation", checkpointLocation).start()
writeStream是用于將流式數據集的內容保存到外部存儲的接口。將返回一個DataStreamWriter
outputMode是指定如何將流式DataFrame/Dataset的數據寫入流式接收器。
- append:只有流式DataFrame/Dataset中的新行才會寫入接收器
- complete:每次有更新時,流式DataFrame/Dataset中的所有行都將寫入接收器
- update:每次有更新時,只有流式DataFrame/Dataset中更新的行才會寫入接收器。如果查詢不包含聚合,則相當于“append”模式
format是指定外部存儲,這里的取值有6種:memory、foreach、foreachBatch、console、table、noop。
四、運行
1、創建Topic
kafka-topics --create --topic structured-streaming-wc --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
2、啟動程序
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example sql.streaming.StructuredKafkaWordCount cdh1:9092,cdh2:9092 subscribe structured-streaming-wc
3、向topic推送數據
kafka-console-producer --topic structured-streaming-wc --broker-list cdh1:9092,cdh2:9092,cdh3:9092
4、控制臺查看結果
?他和sparksql一樣默認的分區為200個,如果數據量很小,速度非常慢。需要根據數據量來設置自己的分區數。
大多數高校碩博生畢業要求需要參加學術會議,發表EI或者SCI檢索的學術論文會議論文:
可訪問艾思科藍官網,瀏覽即將召開的學術會議列表。會議如下:
第四屆大數據、信息與計算機網絡國際學術會議(BDICN 2025)
- 廣州
- https://ais.cn/u/fi2yym
第四屆電子信息工程、大數據與計算機技術國際學術會議(EIBDCT 2025)
- 青島
- https://ais.cn/u/nuQr6f
第六屆大數據與信息化教育國際學術會議(ICBDIE 2025)
- 蘇州
- https://ais.cn/u/eYnmQr
第三屆通信網絡與機器學習國際學術會議(CNML 2025)
- 南京
- https://ais.cn/u/vUNva2