1.reduce?
功能?:聚集RDD中的所有元素,先聚合分區內數據,再聚合分區間數據。
示例?:rdd.reduce(_+_)?將RDD中的所有整數相加。
2.collect?
功能?:在驅動程序中,以數組Array的形式返回數據集的所有元素。
示例?:rdd.collect()?返回RDD中所有元素的數組。
3.foreach?
功能?:分布式遍歷RDD中的每一個元素,調用指定函數。
示例?:rdd.collect().foreach(println)?先收集RDD元素,然后逐個打印。
4.count?
功能?:返回RDD中元素的個數。
示例?:rdd.count()?返回RDD中的元素數量。
5.first?
功能?:返回RDD中的第一個元素。
示例?:rdd.first()?返回RDD中的第一個元素。
6.take?
功能?:返回一個由RDD的前n個元素組成的數組。
示例?:rdd.take(2)?返回RDD中的前兩個元素組成的數組。
7.takeOrdered?
功能?:返回RDD排序后的前n個元素組成的數組。
示例?:rdd.takeOrdered(2)?返回RDD中排序后的前兩個元素。
8.aggregate?
功能?:分區的數據通過初始值和分區內的數據進行聚合,然后再和初始值進行分區間的數據聚合。
示例?:rdd.aggregate(0)(_+_, _+_)?將RDD的所有元素相加。
9.fold?
功能?:折疊操作,aggregate的簡化版操作。
示例?:rdd.fold(0)(_+_)?將RDD的所有元素相加,與aggregate的示例類似但更簡潔。
10.countByKey?
功能?:統計每種key的個數,適用于RDD[(K, V)]類型。
示例?:rdd.countByKey()?統計每種key出現的次數。
11.save相關算子?
功能?:將數據保存到不同格式的文件中,包括文本文件、對象文件和序列文件。
示例?:
rdd.saveAsTextFile("path")?保存為文本文件。rdd.saveAsObjectFile("path")?保存為對象文件。
rdd.saveAsSequenceFile("path")?保存為序列文件(了解即可)。
12.累加器(Accumulator)
?實現原理?:
累加器用于把Executor端的變量信息聚合到Driver端。在Driver程序中定義的變量,在Executor端的每個Task都會得到這個變量的一份新的副本。每個Task更新這些副本的值后,傳回Driver端進行merge操作。
?常用方法?:
sparkContext.longAccumulator(name: String): 創建一個長整型累加器。
sparkContext.doubleAccumulator(name: String): 創建一個雙精度浮點型累加器。
自定義累加器:通過繼承AccumulatorV2類,實現自定義的累加邏輯。
13.廣播變量(Broadcast Variable)
?實現原理?:
廣播變量用于高效分發較大的只讀對象。它向所有工作節點發送一個較大的只讀值,以供一個或多個Spark操作使用。Spark會為每個任務分別發送該變量,但在多個并行操作中可以共享同一個廣播變量,從而提高效率。
?常用方法?:
sparkContext.broadcastT](value: T)
: 創建一個廣播變量。
總結?:
累加器適用于在分布式計算過程中聚合數據,如統計和、最大值、最小值等。
廣播變量適用于在多個任務之間共享大對象,以減少數據傳輸開銷,提高計算效率。