SparkStreaming調優
一 、要點
4.1 SparkStreaming運行原理
深入理解
4.2 調優策略
4.2.1 調整BlockReceiver的數量
案例演示:
object MultiReceiverNetworkWordCount {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("NetworkWordCount")val sc = new SparkContext(sparkConf)// Create the context with a 1 second batch sizeval ssc = new StreamingContext(sc, Seconds(5))//創建多個接收器(ReceiverInputDStream),這個接收器接收一臺機器上的某個端口通過socket發送過來的數據并處理val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)val lines = lines1.union(lines2)lines.repartition(100)//處理的邏輯,就是簡單的進行word countval words = lines.repartition(100).flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))//將結果輸出到控制臺wordCounts.print()//啟動Streaming處理流ssc.start()//等待Streaming程序終止ssc.awaitTermination()ssc.stop(false)}
}
??4.2.2 調整Block的數量
batchInterval : 觸發批處理的時間間隔
blockInterval :將接收到的數據生成Block的時間間隔,spark.streaming.blockInterval(默認是200ms),那么,BlockRDD的分區數 = batchInterval / blockInterval,即一個Block就是RDD的一個分區,就是一個task
比如,batchInterval是2秒,而blockInterval是200ms,那么task數為10,如果task的數量太少,比一個executor的core數還少的話,那么可以減少blockInterval,blockInterval最好不要小于50ms,太小的話導致task數太多,那么launch task的時間久多了
4.2.3 調整Receiver的接受速率
pps:permits per second 每秒允許接受的數據量(QPS -> queries per second)
Spark Streaming默認的PPS是沒有限制的,可以通過參數spark.streaming.receiver.maxRate來控制,默認是Long.Maxvalue
??4.2.3 調整數據處理的并行度
BlockRDD的分區數
a. 通過Receiver接受數據的特點決定
b. 也可以自己通過repartition設置
ShuffleRDD的分區數
a. 默認的分區數為spark.default.parallelism(core的大小)
b. 通過我們自己設置決定
val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))
4.2.4 數據的序列化
SparkStreaming兩種需要序列化的數據:
a. 輸入的數據:默認是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存儲在executor上的內存中
b. 緩存的數據:默認是以StorageLevel.MEMORY_ONLY_SER的形式存儲的內存中
使用Kryo序列化機制,比Java序列化機制性能好
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
4.2.5 內存調優
(1)需要內存大小
和transformation的類型有關,如果使用的是updateStateByKey,Window這樣的算子,那么內存就要設置得偏大
(2)數據存儲級別
如果把接收到的數據設置的存儲級別是MEMORY_DISK這種級別,也就是說如果內存不夠可以把數據存儲到磁盤上,其實性能還是不好的,性能最好的就是所有的數據都在內存里面,所以如果在資源允許的情況下,把內存調大一點,讓所有的數據都存在內存里面。
4.2.6 Outout性能
(1)MySQL,HBase
(2)Kafka(0.8版本)
雖然現在的Kafka的版本已經到2.x版本了,但是很多公司因為歷史遺留的原因,公司里面還是會有0.8x的Kafka。比如本人公司里面有兩個Kafka集群,一個是0.8x的kafka,一個是1.x的Kafka。開發的時候有時候需要我們使用SparkStreaming做實時的ETL,然后再把數據打回Kafka,0.8版本的kafka默認是沒有批量提交的功能的。本人公司里面一個真實的案例,一位同學寫的SparkStreaming程序將數據處理完了以后通過ForeachRDD把數據寫回到0.8Kafka。但是數據處理得很慢,經常會收到延時告警。最終發現他把數據寫到Kafka的時候是一條數據一條數據提交的性能很差。最終手動實現了批量提交的功能。從此再也沒有收到過告警。
4.2.7 Backpressure(壓力反饋)
Feedback Loop : 動態使得Streaming app從unstable狀態回到stable狀態
從Spark1.5版本開始:spark.streaming.backpressure.enabled = true
4.2.8 Elastic Scaling(資源動態分配)
動態分配資源:
批處理動態的決定這個application中需要多少個Executors:
- 當一個Executor空閑的時候,將這個Executor殺掉
- 當task太多的時候,動態的啟動Executors
Streaming分配Executor的原則是比對 process time / batchInterval 的比率
如果延遲了,那么就自動增加資源
從Spark2.0有這個功能:: spark.streaming.dynamicAllocation.enabled = true
??4.2.8 數據傾斜調優(重要)
因為SparkStreaming的底層就是RDD,之前SparkCore的所有的數據傾斜的調優策略(見Spark之數據傾斜調優)都適合于SparkStreaming,需要靈活掌握,在實際開發的工作當中用得頻率較高。
二 、總結
面試問題:你在工作當中有SparkStreaming調優過項目嗎?怎么調優的?效果怎么樣?
- 比如舉foreachRDD的例子
- 比如舉個數據傾斜的例子
- 用Xmind整理調優的策略