一、介紹
1. 不同的數據處理
- 從數據處理的方式:
- 流式數據處理(Streaming)
- 批量數據處理(Batch)
- 從數據處理的延遲:
- 實時數據處理(毫秒級別)
- 離線數據處理(小時或天級別)
2. 簡介
- SparkStreaming 是一個準實時(秒或分鐘級別)、微批量的數據處理框架
- SparkStreaming 支持的很多數據輸入源,如: Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等。數據輸入后可以用 Spark 的高度抽象原語,如: map、 reduce、 join、 window 等進行運算。結果能保存在很多地方,如 HDFS,數據庫等
- SparkStreaming 使用離散化流 (discretized stream) 作為抽象表示,稱為 DStream,它是對 RDD 在實時數據處理場景的一種封裝
3. 特點
- 易用
- 容錯
- 易整合到 Spark 體系
二、基本架構
1. 背壓機制
- Spark 1.5 以前版本:通過設置靜態配制參數
spark.streaming.receiver.maxRate
來限制 Receiver 的數據接收速率,來解決生產和消費速率不對等造成的內存溢出等問題,但當數據生產和數據消費的能力都高于 maxRate 時會造成資源利用率下降等問題 - Spark 1.5 版本及以后版本:為了動態控制數據接收速率來適配集群數據處理能力,引入了背壓機制 (Spark Streaming Backpressure),即根據 JobScheduler 反饋作業的執行信息來動態調整 Receiver 數據接收率
- 通過屬性
spark.streaming.backpressure.enabled
來配置啟用 backpressure 機制,默認值為 false,即不啟用
三、入門 WordCount 案例
需求:使用 netcat 工具向 9999 端口不斷的發送數據,通過 SparkStreaming 讀取端口數據并統計不同單詞出現的次數
1. 引入依賴
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version>
</dependency>
2. 代碼實現
object SparkStreamingWC {def main(args: Array[String]): Unit = {// 1.創建 SparkStreaming 環境對象val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")/*創建 StreamingContext 對象需要傳遞兩個參數1.SparkConf:配置對象2.Duration:批處理的周期,即數據采集周期,單位為毫秒,內置有 Seconds/Minute 等對象 */val ssc = new StreamingContext(conf, Seconds(3))// 2.邏輯處理val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)val words = line.flatMap(_.split(" "))val wordAsOne = words.map((_, 1))val wordCount: DStream[(String, Int)] = wordAsOne.reduceByKey(_ + _)wordCount.print()// 3.運行采集器并等待關閉/*采集器是一個長期運行的任務,所以不能關閉 ssc,也不能讓 main 方法執行完畢*/ssc.start()ssc.awaitTermination()}
}
3. 測試
- 打開 cmd 命令窗口,執行
nc -lp 9999
命令(Linux 下為nc -lk 999
) - 運行程序 main 方法
- 在窗口中輸入測試字符串(以空格分隔),觀察程序命令行輸出結果