一、上下文
《Spark-Streaming初識》中的NetworkWordCount示例只能統計每個微批下的單詞的數量,那么如何才能統計從開始加載數據到當下的所有數量呢?下面我們就來通過官方例子學習下Spark-Streaming有狀態計算。
二、官方例子
所屬包:org.apache.spark.examples.streaming
object StatefulNetworkWordCount {def main(args: Array[String]): Unit = {if (args.length < 2) {System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")System.exit(1)}StreamingExamples.setStreamingLogLevels()val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")//創建微批為 1 秒的上下文val ssc = new StreamingContext(sparkConf, Seconds(1))//指定 checkpoint 目錄ssc.checkpoint(".")// 用一個 List 初始化一個 RDDval initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))// 在目標ip:port上創建一個ReceiverInputDStream,并對分隔測試的輸入流中的單詞進行計數(例如由'nc'生成)val lines = ssc.socketTextStream(args(0), args(1).toInt)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))// 使用mapWithState更新累積計數這將給出一個由狀態組成的DStream(即單詞的累積計數)val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {val sum = one.getOrElse(0) + state.getOption.getOrElse(0)val output = (word, sum)state.update(sum)output}val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))stateDstream.print()ssc.start()ssc.awaitTermination()}
}
三、分析
1、構建SparkConf
它是Spark應用程序的配置,用于設置Spark的各種參數。支持鏈式設置
new SparkConf().setMaster("local").setAppName("My app")
?一旦SparkConf對象傳遞給Spark,用戶就不能再對其進行修改。Spark不支持在運行時修改配置
2、構建StreamingContext
它是Spark Streaming功能的主要入口點,且提供了從各種輸入源創建[[org.apache.spark.streaming.dstream.DStream]] 的方法。
創建和轉換DStreams后,可以分別使用start()、stop()啟動和停止流計算,awaitTermination()允許當前線程通過stop()或異常等待上下文的終止。
3、設置checkpoint
StreamingContext最終還是通過SparkContext來設置checkpoint,但其實都是為各自的checkpointDir設置checkpoint路徑,在有狀態計算中checkpoint是必須的。
所謂有狀態計算就必須要把歷史狀態給存儲下來,spark中使用使用checkpoint來實現這個存儲,每個微批的數據的計算都要更新到歷史狀態中。
class SparkContext(config: SparkConf) extends Logging {private[spark] var checkpointDir: Option[String] = None}
class StreamingContext private[streaming] (_sc: SparkContext,_cp: Checkpoint,_batchDur: Duration) extends Logging {private[streaming] var checkpointDir: String = {if (isCheckpointPresent) {sc.setCheckpointDir(_cp.checkpointDir)_cp.checkpointDir} else {null}}}
4、初始化一個RDD
為什么要初始化一個RDD呢?我們看看下面是如何用到的。
5、創建一個ReceiverInputDStream
這里是從TCP源hostname:port創建輸入流。使用TCP套接字接收數據,并使用給定的轉換器將接收字節解釋為對象
6、處理單詞
從源碼中可以看出會把這樣的文本
hadoop spark flink kafka hadoop spark-streaming
處理成這樣的格式
hadoop 1
spark 1
flink 1
kafka 1
hadoop 1
spark-streaming 1
6、使用mapWithState更新累積計數
該算子可以維護并更新每個key的狀態。
這里用到一個新對象:StateSpec,且用到了它的兩個方法,initialState和function
initialState:設置包含“mapWithState”將使用的初始狀態的RDD`
function:設置實際的狀態更新操作
//第1個參數:狀態 key 的類別
//第2個參數:狀態 value 的類別
//第3個參數:狀態 數據 的類別
//第4個參數:狀態 處理完要返回 的類別
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {// 使用state.exists()、state.get()、state.update()和state.remove()來管理狀態,并返回必要的字符串
}
四、運行
運行Netcat
nc -lk 9999
新建一個窗口運行官方例子
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount cdh1 9999
大多數高校碩博生畢業要求需要參加學術會議,發表EI或者SCI檢索的學術論文會議論文:
可訪問艾思科藍官網,瀏覽即將召開的學術會議列表。會議如下:
第四屆大數據、信息與計算機網絡國際學術會議(BDICN 2025)
- 廣州
- https://ais.cn/u/fi2yym
第四屆電子信息工程、大數據與計算機技術國際學術會議(EIBDCT 2025)
- 青島
- https://ais.cn/u/nuQr6f
第六屆大數據與信息化教育國際學術會議(ICBDIE 2025)
- 蘇州
- https://ais.cn/u/eYnmQr
第三屆通信網絡與機器學習國際學術會議(CNML 2025)
- 南京
- https://ais.cn/u/vUNva2