Table of Contents
KV Operators
parallelizePairs
mapToPair
mapValues
groupByKey
reduceByKey
sortByKey
coalesce
repartition
Understanding the Operators
Difference between reduceByKey and groupByKey
groupByKey + mapValues to operate on the V of KV data
Improvement: use reduceByKey
Template code for groupBy grouping by K or by V
Common Questions
Valuable Lessons
This article covers the KV operators we have not discussed before. All of our previous operations were single-value operations; this piece focuses on KV operations, action operators, persistence, and related topics.
KV Operators
Purpose: operate on KV (key-value) pair data, manipulating K and V separately.
Whenever a JavaPairRDD appears, you are working with a stream of paired KV data.
parallelizePairs
Purpose: wraps a collection of Tuple2 into an RDD of KV pairs (a JavaPairRDD).
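A minimal sketch of how it is used; the overload signatures in the comments are simplified from memory, not the verbatim Spark source, and the app name "pairs" is made up:

// Simplified shape of the overloads on JavaSparkContext (sketch, not verbatim source):
//   <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list)
//   <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list, int numSlices)

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setMaster("local[*]").setAppName("pairs"));
Tuple2<String, Integer> t1 = new Tuple2<>("a", 1);
Tuple2<String, Integer> t2 = new Tuple2<>("b", 2);
JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(Arrays.asList(t1, t2));
pairRDD.collect().forEach(System.out::println); // (a,1) (b,2)
sc.close();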
mapToPair
Purpose: used together with parallelizePairs to
1. turn single-value data into KV pair data, or
2. transform a whole Tuple2 into a new KV pair.

Code using the two together:

JavaPairRDD<String, Integer> JRD = sc.parallelizePairs(Arrays.asList(a, a1, a2, a3));
JRD.mapToPair(tuple -> new Tuple2<>(tuple._1, tuple._2 * 2))
   .collect()
   .forEach(System.out::println);
mapValues
Purpose: leaves K unchanged and operates only on the V of the KV stream; any JavaPairRDD can use this method.

Code:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
sc.parallelize(list)
  .mapToPair(integer -> new Tuple2<Integer, Integer>(integer, integer * 2))
  .mapValues(int1 -> int1 * 2)
  .collect()
  .forEach(System.out::println);
Here is a wordcount example to deepen understanding.

Thought chain:
read the file with textFile --> flatMap to flatten the data (String[] -> String) --> groupBy to group by word --> mapValues to compute the count from the V side
Code:

// TODO: write a wordcount
JavaSparkContext javaSparkContext = new JavaSparkContext(
        new SparkConf().setMaster("local[*]").setAppName("artical4"));
JavaRDD<String> rdd = javaSparkContext.textFile("E:\\ideaProjects\\spark_project\\data\\wordcount");
rdd.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String s) throws Exception {
            return Arrays.asList(s.split(" ")).iterator();
        }
    })
    .groupBy(n -> n)
    .mapValues(iter -> {
        int sum = 0;
        for (String word : iter) {
            sum++; // count occurrences of the key word
        }
        return sum;
    })
    .collect()
    .forEach(System.out::println);
javaSparkContext.close();
So the full transformation chain is:
- Input: one line of text (`String`)
- `split`: splits the line into a string array (`String[]`)
- `Arrays.asList`: converts the array into a list of strings (`List<String>`)
- The list's `iterator` method: yields an iterator over the strings (`Iterator<String>`)
- Inside `flatMap`, Spark walks this iterator and emits each string (word) as an element of the result RDD. That is the essence of flatMap: each array becomes an iterator whose elements can be visited one by one.
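As a cross-check of the chain above, here is the same wordcount with the anonymous inner class replaced by lambdas (a sketch; it assumes the same input path as the example above, and the app name "wordcount-lambda" is made up):

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setMaster("local[*]").setAppName("wordcount-lambda"));
sc.textFile("E:\\ideaProjects\\spark_project\\data\\wordcount")
  .flatMap(line -> Arrays.asList(line.split(" ")).iterator()) // String -> words
  .groupBy(word -> word)                                      // word -> (word, [word, word, ...])
  .mapValues(words -> {
      int count = 0;
      for (String w : words) {
          count++; // count occurrences of the key word
      }
      return count;
  })
  .collect()
  .forEach(System.out::println);
sc.close();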
groupByKey
Purpose: groups the V values of KV pairs by their K.

Code:

JavaSparkContext javaSparkContext = new JavaSparkContext(
        new SparkConf().setMaster("local[*]").setAppName("artical4"));
Tuple2<String, Integer> a = new Tuple2<>("a", 1);
Tuple2<String, Integer> b = new Tuple2<>("b", 2);
Tuple2<String, Integer> c = new Tuple2<>("a", 3);
Tuple2<String, Integer> d = new Tuple2<>("b", 4);
javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d))
    .collect().forEach(System.out::println);
System.out.println();
javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d))
    .groupByKey(3) // 3 = number of result partitions
    .collect().forEach(System.out::println);
reduceByKey
Purpose: aggregates the V values by K within the KV pairs. (Under the hood, values are pre-aggregated inside each partition as an optimization.)

Code: sum the V values of the tuples, grouped by K.

javaSparkContext.parallelizePairs(tuple2s) // tuple2s: a List<Tuple2<String, Integer>> as before
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    })
    .collect().forEach(System.out::println);
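The same aggregation can be written more compactly with a lambda (an equivalent sketch):

javaSparkContext.parallelizePairs(tuple2s)
    .reduceByKey((v1, v2) -> v1 + v2) // or .reduceByKey(Integer::sum)
    .collect().forEach(System.out::println);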
sortByKey
Purpose: sorts the pairs by K, in ascending order (the default) or descending order.

Code:

JavaSparkContext javaSparkContext = new JavaSparkContext(
        new SparkConf().setMaster("local[*]").setAppName("artical4"));
Tuple2<String, Integer> aa0 = new Tuple2<>("a", 4);
Tuple2<String, Integer> aa1 = new Tuple2<>("a", 1);
Tuple2<String, Integer> aa2 = new Tuple2<>("a", 2);
Tuple2<String, Integer> bb1 = new Tuple2<>("b", 2);
Tuple2<String, Integer> aa3 = new Tuple2<>("a", 3);
Tuple2<String, Integer> bb2 = new Tuple2<>("b", 1);
ArrayList<Tuple2<String, Integer>> tuple2s =
        new ArrayList<>(Arrays.asList(aa0, aa1, aa2, aa3, bb1, bb2));
javaSparkContext.parallelizePairs(tuple2s)
    .sortByKey() // ascending by default
    .collect().forEach(System.out::println);
javaSparkContext.close();
Passing false sorts in descending order instead; for example, reusing the tuple2s list above:
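javaSparkContext.parallelizePairs(tuple2s)
    .sortByKey(false) // descending by key
    .collect().forEach(System.out::println);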
Using the Comparable interface
Sorting with a custom key type:

JavaSparkContext javaSparkContext = new JavaSparkContext(
        new SparkConf().setMaster("local[*]").setAppName("artical4"));
Artist at1 = new Artist("小王", 100);
Artist at2 = new Artist("小李", 1000);
Artist at3 = new Artist("小趙", 10000);
Artist at4 = new Artist("小宇", 100000);
Tuple2<Artist, Integer> artistIntegerTuple2 = new Tuple2<>(at1, 1);
Tuple2<Artist, Integer> artistIntegerTuple3 = new Tuple2<>(at2, 2);
Tuple2<Artist, Integer> artistIntegerTuple4 = new Tuple2<>(at3, 3);
Tuple2<Artist, Integer> artistIntegerTuple5 = new Tuple2<>(at4, 4);
JavaPairRDD<Artist, Integer> artistIntegerJavaPairRDD = javaSparkContext
        .parallelize(Arrays.asList(artistIntegerTuple2, artistIntegerTuple3,
                artistIntegerTuple4, artistIntegerTuple5))
        .mapToPair(t -> t);
artistIntegerJavaPairRDD.sortByKey().collect().forEach(System.out::println);
javaSparkContext.close();

class Artist implements Serializable, Comparable<Artist> {
    String name;
    int salary;

    public Artist(String name, int salary) {
        this.name = name;
        this.salary = salary;
    }

    @Override
    public int compareTo(Artist o) {
        return o.salary - this.salary; // descending by salary
    }

    @Override
    public String toString() {
        return "Artist{" +
                "name='" + name + '\'' +
                ", salary=" + salary +
                '}';
    }
}
coalesce
Purpose: shrinks the number of partitions; it does not shuffle by default.

Code:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
javaSparkContext.parallelize(list)
    .coalesce(2) // shrink to 2 partitions, no shuffle
    .collect().forEach(System.out::println);
javaSparkContext.close();
repartition

Purpose: adjusts the partition count; equivalent to coalesce with shuffle = true.

Code:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
javaSparkContext.parallelize(list)
    .repartition(2) // shuffles, then writes 2 partitions
    .saveAsTextFile("out2");
javaSparkContext.close();
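To make the equivalence concrete, the following sketch (the partition counts are arbitrary) shows both spellings side by side:

JavaRDD<Integer> rdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5), 4);
JavaRDD<Integer> viaRepartition = rdd.repartition(2);  // always shuffles
JavaRDD<Integer> viaCoalesce = rdd.coalesce(2, true);  // coalesce with shuffle = true
System.out.println(viaRepartition.getNumPartitions()); // 2
System.out.println(viaCoalesce.getNumPartitions());    // 2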
Understanding the Operators
Difference between reduceByKey and groupByKey
reduceByKey performs better: before the shuffle it has a pre-aggregation (combine) step, which merges the values inside each partition and so reduces the amount of data spilled to disk during the shuffle.
Therefore, in real-world development, prefer reduceByKey over groupByKey whenever the grouping is immediately followed by an aggregation.
groupByKey + mapValues to operate on the V of KV data
JavaSparkContext javaSparkContext = new JavaSparkContext(
        new SparkConf().setMaster("local[*]").setAppName("artical4"));
Tuple2<String, Integer> a = new Tuple2<>("a", 1);
Tuple2<String, Integer> b = new Tuple2<>("b", 2);
Tuple2<String, Integer> c = new Tuple2<>("a", 3);
Tuple2<String, Integer> d = new Tuple2<>("b", 4);
javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d))
    .groupByKey(3)
    .mapValues(new Function<Iterable<Integer>, Integer>() {
        @Override
        public Integer call(Iterable<Integer> v1) throws Exception {
            int sum = 0;
            for (Integer v2 : v1) {
                sum += v2; // sum all values for the key
            }
            return sum;
        }
    })
    .collect().forEach(System.out::println);
javaSparkContext.close();
Improvement: use reduceByKey
JavaSparkContext javaSparkContext = new JavaSparkContext(
        new SparkConf().setMaster("local[*]").setAppName("artical4"));
Tuple2<String, Integer> a = new Tuple2<>("a", 1);
Tuple2<String, Integer> b = new Tuple2<>("b", 1);
Tuple2<String, Integer> c = new Tuple2<>("a", 2);
Tuple2<String, Integer> d = new Tuple2<>("b", 2);
ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>(Arrays.asList(a, b, c, d));
javaSparkContext.parallelizePairs(tuple2s)
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    })
    .collect().forEach(System.out::println);
javaSparkContext.close();
Template code for groupBy grouping by K or by V
JavaSparkContext javaSparkContext = new JavaSparkContext(
        new SparkConf().setMaster("local[*]").setAppName("artical4"));
Tuple2<String, Integer> a = new Tuple2<>("a", 1);
Tuple2<String, Integer> b = new Tuple2<>("b", 1);
Tuple2<String, Integer> c = new Tuple2<>("a", 2);
Tuple2<String, Integer> d = new Tuple2<>("b", 2);
javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d))
    .groupBy(new Function<Tuple2<String, Integer>, Integer>() {
        @Override
        public Integer call(Tuple2<String, Integer> v1) throws Exception {
            return v1._2(); // group by V; to group by K, return v1._1() and change the output generic to String
        }
    })
    .collect().forEach(System.out::println);
javaSparkContext.close();
Common Questions
1. How do you iterate with an iterator? In mapValues the value handed to you is an Iterable; when converting between a lambda and an anonymous inner class, just take care with the output generic type. (The code below shows the two ways to iterate.)
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
JavaPairRDD<Integer, Iterable<Integer>> groupByRDD =
        sc.parallelize(list).groupBy(n -> n % 2, 2);
groupByRDD.mapValues(new Function<Iterable<Integer>, Integer>() {
    public Integer call(Iterable<Integer> integers) {
        // Way 1: explicit Iterator
        int sum = 0;
        Iterator<Integer> iterator = integers.iterator();
        while (iterator.hasNext()) {
            sum += iterator.next();
        }
        return sum;
        // Way 2: enhanced for loop
        // int sum = 0;
        // for (Integer i : integers) {
        //     sum += i;
        // }
        // return sum;
    }
}).collect().forEach(System.out::println);
Valuable Lessons
1. In a Function, the input generic type cannot be changed (it is fixed by the upstream RDD), but the output generic type is yours to choose (see the sketch after this list).
2. A regular expression can use square brackets (a character class) to fold several split steps into a single line of code (also sketched below).
3. RDDs use the same design pattern as Java I/O, the decorator pattern: objects are nested inside one another to build up functionality.
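Minimal sketches of lessons 1 and 2 (hypothetical snippets; the first reuses the groupByRDD from the section above):

// Lesson 1: the input generic (Iterable<Integer>) is fixed by the upstream RDD,
// but the output generic is free to change (String here instead of Integer):
groupByRDD.mapValues(new Function<Iterable<Integer>, String>() {
    @Override
    public String call(Iterable<Integer> vs) {
        return "group: " + vs;
    }
}).collect().forEach(System.out::println);

// Lesson 2: a character class merges several split rules into one regex:
String[] parts = "a,b;c d".split("[,; ]"); // splits on ',', ';', and ' ' in one pass
System.out.println(Arrays.toString(parts)); // [a, b, c, d]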