1.分區規則
1.分區規則
shuffle
1.打亂順序
2.重新組合
1.分區的規則
默認與MapReduce的規則一致,都是按照哈希值取余進行分配.
一個分區可以多個組,一個組的數據必須一個分區
2. 分組的分區導致數據傾斜怎么解決?
- 擴容 讓分區變多
- 修改分區規則
3.HashMap擴容為什么必須是2的倍數?
當不是2的倍數時, 好多的位置取不到
比如 為5 01234 123都取不到
必須保證,相關的位數全是1,所以必定2的倍數 2的n次方
所以位運算不是什么時候都能用的
2.轉換算子
1.單值轉換算子
1.filter過濾器
1.注意
過濾只是將數據進行校驗,而不是修改數據. 結果為true就保留,false就丟棄
2.代碼
JavaSparkContext sc = new JavaSparkContext("local[*]","filter");List<String> dataList = Arrays.asList("giao","giao2","zhangsan","lisi");
JavaRDD<String> rdd1 = sc.parallelize(dataList);
//JavaRDD<String> rddFilter1 = rdd1.filter(null);
JavaRDD<String> rddFilter2= rdd1.filter(s->s.substring(0,1).toLowerCase().equals("g"));
//rddFilter1.collect().forEach(System.out::println);
System.out.println("----------------------------");
rddFilter2.collect().forEach(System.out::println);
2.dinstinct
1.原理
分組
通過使用分組取重,相同的話,都是一個組了,所以Key唯一
應該是先分組,然后吧K提出來就好了
2.代碼
JavaSparkContext sc = new JavaSparkContext("local[*]","Distinct");List<String> dataList = Arrays.asList("giao1","gg1","gg1","gg2","gg2","gg1","gg3","gg1","gg5","gg3");
JavaRDD<String> rdd1 = sc.parallelize(dataList);
JavaRDD<String> rddDistinct = rdd1.distinct();
rddDistinct.collect().forEach(System.out::println);
3.排序
1.介紹
sortby方法需要傳3個參數
參數1 排序規則
參數2 升序還是降序(false) 默認升序(true)
參數3 排序的分區數量(說明方法底層是靠shuffle實現,所以才有改變分區的能力)
2.排序規則
排序規則,是按照結果去排序
其實是用結果生成一個K值,通過K值進行排序,然后展示 V值
或者說權值, 按照權值排序
將Value變成K V
3.代碼
public static void main(String[] args) {JavaSparkContext sc = new JavaSparkContext("local[*]","SparkSort");List<String> dataList = Arrays.asList("kunkun","giaogiao","GSD","JJ","chenzhen","Lixiaolong");JavaRDD<String> rdd1 = sc.parallelize(dataList);JavaRDD<String> rddSort = rdd1.sortBy(s -> {switch (s.substring(0, 1).toLowerCase()) {case "k":return 5;case "g":return 3;case "j":return 1;case "c":return 2;case "l":return 4;}return null;}, false, 3);rddSort.collect().forEach(System.out::println);}
2.鍵值對轉換算子
1.介紹
1.什么是鍵值對轉換算子
如何區分是鍵值對方法還是單值方法呢?
通過參數來判斷, 如果參數是一個值,就是單值,如果是2個,就是鍵值對
2.元組是不是鍵值對?
public static void main(String[] args) {JavaSparkContext sc = new JavaSparkContext("local[*]","KVRDD");List<Integer> dataList = Arrays.asList(1, 2, 3, 4, 5);JavaRDD<Integer> rdd1 = sc.parallelize(dataList);JavaRDD<Tuple2> rddmap = rdd1.map(num -> new Tuple2(num, num));rddmap.collect().forEach(System.out::println);
}
答案是,不是,因為這個的返回值,是一個元組,而元組整體,是一個單值,所以,是單值
只有返回值 是RDD<K1,V1 >的時候,才是鍵值對類型算子
3. 使用Pair轉換鍵值對算子
public static void main(String[] args) {JavaSparkContext sc = new JavaSparkContext("local[*]","RddPair");List<Integer> dataList = Arrays.asList(1, 2, 3, 4, 5);JavaRDD<Integer> rdd = sc.parallelize(dataList);JavaPairRDD<Integer, Integer> rddPair = rdd.mapToPair(num -> new Tuple2<>(num, num));rddPair.collect().forEach(System.out::println);}
4.直接在獲取時轉換鍵值對
這里使用的是parallelizePairs方法 獲取的是JavaPairRDD
public static void main(String[] args) {JavaSparkContext sc = new JavaSparkContext("local[*]","KVRDD");JavaPairRDD<String, Integer> rddPair = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1),new Tuple2<>("a", 2),new Tuple2<>("b", 1),new Tuple2<>("b", 1),new Tuple2<>("c", 2),new Tuple2<>("c", 1)));rddPair.collect().forEach(System.out::println);}
5.分組來獲取鍵值對
```java
public static void main(String[] args) {JavaSparkContext sc = new JavaSparkContext("local[*]","RddPair");List<String> dataList = Arrays.asList("aa","bb","aa","bb","cc");JavaRDD<String> rdd = sc.parallelize(dataList);JavaPairRDD<Object, Iterable<String>> rddGroup = rdd.groupBy(s->s);rddGroup.collect().forEach(System.out::println);
}
2.mapValue方法
1.介紹
直接對value進行操作,不需要管K
當然,也有mapKey方法可以無視Value操作Key
2.代碼演示
public static void main(String[] args) {JavaSparkContext sc = new JavaSparkContext("local[*]","KVRDD");JavaPairRDD<String, Integer> rddPair = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1),new Tuple2<>("a", 2),new Tuple2<>("b", 1),new Tuple2<>("b", 1),new Tuple2<>("c", 2),new Tuple2<>("c", 1)));JavaPairRDD<String, Integer> mapV = rddPair.mapValues(num -> num * 2);mapV.collect().forEach(System.out::println);}
3.WordCount實現
iter.spliterator().estimateSize());
spliterator
Spliterator(Split Iterator)是Java 8引入的一個新接口,用于支持并行遍歷和操作數據。它是Iterator的擴展,可以用于在并行流(Parallel Stream)中對數據進行劃分和遍歷,從而實現更高效的并行處理
spliterator()方法是在Iterable接口中定義的一個默認方法,用于生成一個Spliterator對象,以支持數據的并行遍歷。它的具體作用是將Iterable中的數據轉換為一個可以在并行流中使用的Spliterator對象。
estimateSize
estimateSize()方法是Java中Spliterator接口的一個方法,用于估算Spliterator所包含的元素數量的大小。Spliterator是用于支持并行遍歷和操作數據的接口,而estimateSize()方法提供了一個估計值,用于在處理數據時預測Spliterator包含的元素數量。
public static void main(String[] args) {JavaSparkContext sc = new JavaSparkContext("local[*]","RddPair");List<String> dataList = Arrays.asList("aa","bb","aa","bb","cc");JavaRDD<String> rdd = sc.parallelize(dataList);JavaPairRDD<Object, Iterable<String>> rddGroup = rdd.groupBy(s->s);JavaPairRDD<Object, Long> wordCount = rddGroup.mapValues(iter -> iter.spliterator().estimateSize());wordCount.collect().forEach(System.out::println);
}
3.groupby 與groupByKey
1 .代碼
public static void main(String[] args) {JavaSparkContext sc = new JavaSparkContext("local[*]","G1");JavaPairRDD<String, Integer> rddPair;rddPair = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1),new Tuple2<>("a", 2),new Tuple2<>("b", 1),new Tuple2<>("b", 1),new Tuple2<>("c", 2),new Tuple2<>("c", 1)));JavaPairRDD<String, Iterable<Integer>> rddGroupByKey = rddPair.groupByKey();JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> rddGroupBy = rddPair.groupBy(t -> t._1);rddGroupByKey.collect().forEach(System.out::println);}
2.分析區別
- 1.參數
GroupBy是自選規則 而GroupByKey是將PairRDD的Key當做分組規則 - 2.結果
GroupBy是將作為單值去分組,即使RDD是Pair, 而GroupByKey 則是將K V分開 ,將V作為組成員
3.注意
GroupByKey是不能進行隨意使用的,底層用的含有shuffle,如果計算平均值,就不能通過GroupByKey直接進行計算.
4.reduce與reduceByKey
1.介紹
多個變量進行同樣的運算規則
Stream是1.8新特性,
計算的本質 兩兩結合
reduce
2. 代碼
public static void main(String[] args) {JavaSparkContext sc = new JavaSparkContext("local[*]","Reduce");JavaPairRDD<String, Integer> rddPair;rddPair = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1),new Tuple2<>("a", 2),new Tuple2<>("b", 1),new Tuple2<>("b", 1),new Tuple2<>("c", 2),new Tuple2<>("c", 1)));rddPair.reduceByKey(Integer::sum).collect().forEach(System.out::println);}
3.理解
相同Key值的V進行運算,所以底層是有分組的,所以底層是一定有Shuffle,一定有改變分區的能力,改變分區數量和分區規則.
4.與groupByKey區別
reduceByKey
將相同key的數量中1的V進行兩兩聚合
reduceByKey 相同的key兩兩聚合,在shuffle落盤之前對分區內數據進行聚合,這樣會減少落盤數據量,并不會影響最終結果(預聚合) 這就是combine
有錢先整IBM小型機
Shuffle優化
1.花錢
2.調大緩沖區(溢出次數減少)
3.
sortByKey
想比較必須實現可比較的接口
默認排序規則為升序,
通過K對鍵值對進行排序
行動算子
通過調用RDD方法讓Spark的功能行動起來
map 是在new
轉換算子 得到的是RDD
注意 轉換跑不起來 行動能跑起來 這句話是錯誤的
當使用sort時,也是能跑起來的,但是還是轉換算子
第一行運行占用內存,第一個for 運算需要內存,但是第一行占用了大量內存,所以第一行浪費了,這就需要懶加載,所以第一行的執行時機是在第二個for運行前使用的.
注意map collect 不是懶加載,只是沒人調用他的job(RDD算子內部的代碼)
RDD算子外部的代碼都是在Driver端