Contents
1. Broadcast variables
2. Accumulators
In distributed computing, when a function runs in parallel across the nodes of a cluster, each task by default receives its own copy of every variable the function uses. If a variable is large, this wastes network bandwidth shipping it around and memory storing it on every node. To address this, Spark introduces shared variables.
Shared variables let data be shared across tasks instead of shipping a separate copy to each one, which significantly reduces network traffic and memory usage. Spark provides two types of shared variables: broadcast variables and accumulators.
1. Broadcast variables
Normally, when a Spark program runs, the variables referenced by a function are shipped as separate copies to every task on every executor; for large variables this means heavy memory and network overhead. With a broadcast variable, Spark sends the variable to each node only once, and all tasks on that node share the same read-only copy, greatly reducing memory usage and network traffic.
Scala implementation:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Java implementation:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]
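The snippets above only create the broadcast variable and read it back on the driver. In a real job the point is to reference it inside a transformation, so that every task on a node reuses the same copy instead of shipping the data with each closure. Below is a minimal Scala sketch of that pattern; the SparkContext sc and the country-code lookup table are assumptions for illustration only.

// Sketch only: sc is an assumed existing SparkContext, the lookup Map is made up.
val countryNames = Map("CN" -> "China", "DE" -> "Germany", "FR" -> "France")
val countryNamesBc = sc.broadcast(countryNames)

val codes = sc.parallelize(Seq("CN", "FR", "XX"))
// Tasks read the broadcast value; the Map is sent to each node once, not once per task.
val resolved = codes.map(code => countryNamesBc.value.getOrElse(code, "Unknown"))
resolved.collect() // Array(China, France, Unknown)

// Optionally release the broadcast data on the executors once it is no longer needed.
countryNamesBc.unpersist()

Note that a broadcast variable should be treated as read-only: modifying the value after it has been broadcast is not propagated to the executors.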
2. Accumulators
An accumulator is a special kind of shared variable in Spark, used mainly to aggregate information from the executors back to the driver. A variable defined in the driver program is copied to every task on the executors; each task updates its own copy, and the updates are sent back to the driver and merged there. The accumulators Spark provides out of the box are numeric (integer and floating-point values).
Scala implementation:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10
Java implementation:
LongAccumulator accum = jsc.sc().longAccumulator();

jsc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10
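A typical use is a side-channel counter that runs alongside the main computation, for example counting records that fail to parse. The following Scala sketch assumes an existing SparkContext sc and made-up input lines:

import scala.util.Try

// Sketch only: sc is an assumed existing SparkContext, the input lines are made up.
val badRecords = sc.longAccumulator("bad records")

val lines = sc.parallelize(Seq("1", "2", "oops", "4"))
val numbers = lines.flatMap { line =>
  Try(line.toInt).toOption match {
    case Some(n) => Some(n)
    case None    => badRecords.add(1); None // count unparsable lines as a side effect
  }
}

// Accumulators are only updated once an action actually runs the job.
println(numbers.sum())    // 7.0
println(badRecords.value) // 1

Keep in mind that accumulator updates made inside transformations may be applied more than once if a task is re-executed; only updates performed inside actions are guaranteed to be counted exactly once.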
The built-in accumulators are limited in functionality, but you can create your own type by extending AccumulatorV2. The AccumulatorV2 abstract class has several methods that must be overridden: reset sets the accumulator back to zero, add folds another value into the accumulator, and merge combines another accumulator of the same type into this one. As the implementations below show, isZero, copy, and value must be overridden as well.
Custom accumulator, Scala implementation:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.util.AccumulatorV2

class CustomAccumulator extends AccumulatorV2[Int, Int] {
  // the accumulated value, initialized to zero
  private var sum = 0

  override def isZero: Boolean = sum == 0

  override def copy(): AccumulatorV2[Int, Int] = {
    val newAcc = new CustomAccumulator()
    newAcc.sum = sum
    newAcc
  }

  override def reset(): Unit = sum = 0

  override def add(v: Int): Unit = sum += v

  override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value

  override def value: Int = sum
}
Custom accumulator, Java implementation:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.util.AccumulatorV2;

public class CustomAccumulator extends AccumulatorV2<Integer, Integer> {
    // the accumulated value, initialized to zero
    private Integer sum = 0;

    @Override
    public boolean isZero() {
        return sum == 0;
    }

    @Override
    public AccumulatorV2<Integer, Integer> copy() {
        CustomAccumulator customAccumulator = new CustomAccumulator();
        customAccumulator.sum = this.sum;
        return customAccumulator;
    }

    @Override
    public void reset() {
        this.sum = 0;
    }

    @Override
    public void add(Integer v) {
        this.sum += v;
    }

    @Override
    public void merge(AccumulatorV2<Integer, Integer> other) {
        this.sum += ((CustomAccumulator) other).sum;
    }

    @Override
    public Integer value() {
        return sum;
    }
}
Using the custom accumulator (Java):
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class AccumulatorTest {
    public static void main(String[] args) {
        // 1. Initialize the SparkContext
        SparkConf sparkConf = new SparkConf().setAppName("Spark Java").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        CustomAccumulator customAccumulator = new CustomAccumulator();
        // a custom accumulator must be registered before it can be used
        sc.sc().register(customAccumulator);

        sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).foreach(x -> customAccumulator.add(x));
        System.out.println(customAccumulator.value());

        // 2. Stop the SparkContext
        sc.stop();
    }
}
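For symmetry with the Scala implementation above, here is a sketch of registering and driving the Scala CustomAccumulator. The object name, accumulator name, and local[*] master simply mirror the Java example and are illustrative assumptions.

package com.yichenkeji.demo.sparkscala

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the SparkContext
    val conf = new SparkConf().setAppName("Spark Scala").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // a custom accumulator must be registered before it can be used
    val customAccumulator = new CustomAccumulator
    sc.register(customAccumulator, "custom sum")

    sc.parallelize(1 to 10).foreach(x => customAccumulator.add(x))
    println(customAccumulator.value) // 55

    // 2. Stop the SparkContext
    sc.stop()
  }
}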