1 為什么使用廣播變量 和 累加器
變量存在的問題:在spark程序中,當一個傳遞給Spark操作(例如map和reduce)的函數在遠程節點上面運行時,Spark操作實際上操作的是這個函數所用變量的一個獨立副本。這些變量會被復制到每臺機器上,并且這些變量在遠程機器上的所有更新都不會傳遞回驅動程序,通常跨任務的讀寫變量是低效的。
廣播變量的目的就是解決變量存在的問題,變量聲明為廣播變量,那么知識每個executor擁有一份,這個executor啟動的task會共享這個變量,節省了通信的成本和服務器的資源。
總的來說:累加器是用來對信息進行聚合,廣播變量是用來分發較大的只讀對象。
?
2 如何定義? 和? 還原? 廣播變量
int a = 3; Broadcast<Integer> broadcast = sc.broadcast(a); //定義廣播變量
int c = broadcast.value; //還原廣播變量
?
3 廣播變量注意事項
(1)變量一旦被定義為一個廣播變量,那么這個變量只能讀,不能修改
(2)能不能將一個RDD使用廣播變量廣播出去?
?????? 不能,因為RDD是不存儲數據的。可以將RDD的結果廣播出去。
(3) 廣播變量只能在Driver端定義,不能在Executor端定義。
(4) 在Driver端可以修改廣播變量的值,在Executor端無法修改廣播變量的值。
(5)如果executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。
(6)如果Executor端用到了Driver的變量,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本。
?
4 廣播變量的優化
當廣播一個比較大的值時,選擇既快又好的序列化格式是很重要的。因為如果序列化對象的時間很長或者傳送時間太久,這段時間很容易出現性能瓶頸。
默認情況下,spark會使用java內建的序列化庫。建議選擇kryo序列化工具,使用方法設置spark.serializer為org.apache.spark.serializer.KryoSerializer;
最好強制要求這種注冊,設置spark.kryo.registrationRequired為true;
SparkConf conf = new SparkConf();conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");conf.set("spark.kryo.registrationRequired","true");conf.registerKryoClasses(Array(classOf[myClass]),classOf(MyOtherClass));
這樣還會有其他的問題,如果代碼中引用的類沒有序列化,會報異常,最簡單的方式是實現序列化接口。
?
5 累加器和定義和還原
累加器只是一個只寫變量
LongAccumulator accumulator = new LongAccumulator();accumulator.add(1);long count = accumulator.count();
?
?
參考文獻:扎心了,老鐵