SparkKV轉換算子實戰解析


目錄

KV算子

parallelizePairs

mapToPair

mapValues

groupByKey

reduceByKey

sortByKey

算子應用理解

reduceByKey和groupByKey的區別

groupByKey+mapValues實現KV數據的V的操作

改進用reduceByKey

groupby通過K和通過V分組的模板代碼

問題集錦

寶貴的經驗


這里會講到之前還未講到過的KV算子。我們之前的操作都是單值操作,這一篇我們會著重講到KV操作、行動算子和持久化等知識。

KV算子

作用:操作KV流數據,能夠分別操作K和V

出現JavaPairRDD就表示出現了成對KV數據流

parallelizePairs

作用:封裝Tuple2集合形成RDD

細節源碼如下


?

mapToPair


作用:配合parallelizePairs方法
1.單值數據轉化成KV對數據
2.Tuple元組整體轉化成KV鍵值對形式

?


兩者一起的代碼

        JavaPairRDD<String, Integer> JRD = sc.parallelizePairs(Arrays.asList(a, a1, a2, a3));JRD.mapToPair( tuple -> new Tuple2<>(tuple._1, tuple._2*2)).collect().forEach(System.out::println);

mapValues

作用:K不變,操作KV流中的V,并且只要類型是JavaPairRDD就可以用此方法

????????

示意圖

????????
代碼實現

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);sc.parallelize(list).mapToPair(integer -> new Tuple2<Integer,Integer>(integer, integer * 2)).mapValues(int1 -> int1 * 2).collect().forEach(System.out::println);

這里配合一個wordcount案例加深一下理解


思考鏈條:
讀取文件textFile --> flatmap扁平化流數據(String[] -> String)->groupby分組 ->mapValues按照V來計算


代碼

//TODO 寫一個wordcountJavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));JavaRDD<String> rdd = javaSparkContext.textFile("E:\\ideaProjects\\spark_project\\data\\wordcount");rdd.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {return Arrays.asList(s.split(" ")).iterator();}}).groupBy(n -> n).mapValues(iter -> {int sum = 0;for (String word : iter) {sum++;}return sum;}).collect().forEach(System.out::println);javaSparkContext.close();

?所以,整個轉換過程是:
? ?- 輸入:一行字符串(`String`)
? ?- 用`split`方法:將該行字符串分割成字符串數組(`String[]`)
? ?- 用`Arrays.asList`:將字符串數組轉換為字符串列表(`List<String>`)
? ?- 調用列表的`iterator`方法:得到字符串的迭代器(`Iterator<String>`)
? ?- 在`flatMap`中,Spark會遍歷這個迭代器,將每個字符串(單詞)作為新元素放入結果RDD。

flatmap本質:都是將數組轉換成一個可以逐個訪問其元素的迭代器

groupByKey

作用:將KV對按照K對V進行分組


代碼實現
?

        JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Tuple2<String, Integer> a = new Tuple2<>("a", 1);Tuple2<String, Integer> b = new Tuple2<>("b", 2);Tuple2<String, Integer> c = new Tuple2<>("a", 3);Tuple2<String, Integer> d = new Tuple2<>("b", 4);javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d)).collect().forEach(System.out::println);System.out.println();javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d)).groupByKey(3).collect().forEach(System.out::println);

reduceByKey

作用:在KV對中,按照K對V進行聚合操作,(底層會在分區內進行預聚合優化)


代碼實現
對二元組進行按照K對V相加的聚合操作

????????

                javaSparkContext.parallelizePairs(tuple2s).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}}).collect().forEach(System.out::println);

sortByKey

????????
作用:按照K進行XXX的升序/降序排列

代碼實現

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Tuple2<String, Integer> aa0 = new Tuple2<>("a", 4);Tuple2<String, Integer> aa1 = new Tuple2<>("a", 1);Tuple2<String, Integer> aa2 = new Tuple2<>("a", 2);Tuple2<String, Integer> bb1 = new Tuple2<>("b", 2);Tuple2<String, Integer> aa3 = new Tuple2<>("a", 3);Tuple2<String, Integer> bb2 = new Tuple2<>("b", 1);ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>(Arrays.asList(aa0,aa1, aa2, aa3, bb1, bb2));javaSparkContext.parallelizePairs(tuple2s).sortByKey().collect().forEach(System.out::println);javaSparkContext.close();

傳入參數為false時



Comparable接口的使用

利用自定義類型進行排序操作

        JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Artist at1 = new Artist("小王", 100);Artist at2 = new Artist("小李", 1000);Artist at3 = new Artist("小趙", 10000);Artist at4 = new Artist("小宇", 100000);Tuple2<Artist, Integer> artistIntegerTuple2 = new Tuple2<>(at1, 1);Tuple2<Artist, Integer> artistIntegerTuple3 = new Tuple2<>(at2, 2);Tuple2<Artist, Integer> artistIntegerTuple4 = new Tuple2<>(at3, 3);Tuple2<Artist, Integer> artistIntegerTuple5 = new Tuple2<>(at4, 4);JavaPairRDD<Artist, Integer> artistIntegerJavaPairRDD = javaSparkContext.parallelize(Arrays.asList(artistIntegerTuple2, artistIntegerTuple3, artistIntegerTuple4, artistIntegerTuple5)).mapToPair(t -> t);artistIntegerJavaPairRDD.sortByKey().collect().forEach(System.out::println);javaSparkContext.close();class Artist implements Serializable, Comparable<Artist> {String name;int salary;public Artist(String name, int salary) {this.name = name;this.salary = salary;}@Overridepublic int compareTo(Artist o) {return o.salary - this.salary;}@Overridepublic String toString() {return "Artist{" +"name='" + name + '\'' +", salary=" + salary +'}';}
}

coalesce

????????
作用:縮減分區,不會自動進行shuffle


示意圖


代碼實現

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);javaSparkContext.parallelize(tuple2s).coalesce(2).collect().forEach(System.out::println);javaSparkContext.close();


repartition

????????
作用:調整分區數,等價于coalesce的shuffle=true時

示意圖


代碼實現
?

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);javaSparkContext.parallelize(tuple2s).repartition(2).saveAsTextFile("out2");javaSparkContext.close();

算子應用理解

reduceByKey和groupByKey的區別

?

性能更高:在shuffle之前有一個預聚合的功能Combine,可以將分區中的小文件合并,減少shuffle落盤的數據量
因此在實際開發中





groupByKey+mapValues實現KV數據的V的操作

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Tuple2<String, Integer> a = new Tuple2<>("a", 1);Tuple2<String, Integer> b = new Tuple2<>("b", 2);Tuple2<String, Integer> c = new Tuple2<>("a", 3);Tuple2<String, Integer> d = new Tuple2<>("b", 4);System.out.println();javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d)).groupByKey(3).mapValues(new Function<Iterable<Integer>, Integer>() {@Overridepublic Integer call(Iterable<Integer> v1) throws Exception {int sum = 0;for (Integer v2 : v1) {sum += v2;}return sum;}}).collect().forEach(System.out::println);javaSparkContext.close();

改進用reduceByKey

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Tuple2<String, Integer> a = new Tuple2<>("a", 1);Tuple2<String, Integer> b = new Tuple2<>("b", 1);Tuple2<String, Integer> c = new Tuple2<>("a", 2);Tuple2<String, Integer> d = new Tuple2<>("b", 2);ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>(Arrays.asList(a, b, c, d));
javaSparkContext.parallelizePairs(tuple2s).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}}).collect().forEach(System.out::println);javaSparkContext.close();

groupby通過K和通過V分組的模板代碼
?

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));Tuple2<String, Integer> a = new Tuple2<>("a", 1);Tuple2<String, Integer> b = new Tuple2<>("b", 1);Tuple2<String, Integer> c = new Tuple2<>("a", 2);Tuple2<String, Integer> d = new Tuple2<>("b", 2);System.out.println();javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d)).groupBy(new Function<Tuple2<String, Integer>, Integer>() {@Overridepublic Integer call(Tuple2<String, Integer> v1) throws Exception {return v1._2(); //通過Values分組 將2改為1就是通過K分組}}).collect().forEach(System.out::println);javaSparkContext.close();

數據轉換圖

問題集錦


1.iterator迭代器怎么迭代,它在mapValues方法中的傳出類型是iterator類型,并且在將Lambda和匿名內部類互轉的時候注意傳出泛型即可。(其中封裝了兩種迭代方法)

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = sc.parallelize(list).groupBy(n -> n % 2, 2);groupByRDD.mapValues(new Function<Iterable<Integer>, Integer>() {public Integer call(Iterable<Integer> integers) {int sum = 0;Iterator<Integer> iterator = integers.iterator();while (iterator.hasNext()) {sum += iterator.next();}return sum;
//                        int sum = 0;
//                        for (Integer i : integers) {
//                            sum += i;
//                        }
//                        return sum;}}).collect().forEach(System.out::println);

寶貴的經驗

1.function函數傳入泛型不能修改,但是傳出泛型可以修改



2.正則表達式可以通過中括號將多次分割的邏輯封裝到一行代碼中



3.RDD采用了和javaIO一樣的設計模式-裝飾者設計模式,將對象嵌套實現功能


本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/917751.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/917751.shtml
英文地址,請注明出處:http://en.pswp.cn/news/917751.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

深度解析 TCP 三次握手與四次揮手:從原理到 HTTP/HTTPS 的應用

TCP 的三次握手和四次揮手是網絡通信的基石&#xff0c;無論是 HTTP 還是 HTTPS&#xff0c;它們都依賴 TCP 提供可靠的傳輸層服務。本文將用萬字篇幅&#xff0c;結合 Mermaid 圖表和代碼示例&#xff0c;深入講解 TCP 三次握手、四次揮手的原理、過程、狀態變化&#xff0c;以…

Hyper-V + Centos stream 9 搭建K8s集群(一)

一、創建虛擬機一臺32G內存&#xff0c;16核心的Win11&#xff0c;已經安裝了Hyper-V 管理器。然后也下載了CentOS-Stream-9-latest-x86_64-dvd1.iso的鏡像文件。這里Hyper-V創建虛擬機的過程就不贅述了&#xff0c;如果出現虛擬機加載不到鏡像的問題&#xff0c;先把這個使用安…

Pygame如何制作小游戲

以下是 Pygame 的詳細使用指南&#xff0c;從安裝到開發完整游戲的步驟說明&#xff0c;包含代碼示例和最佳實踐&#xff1a; 一、安裝與環境配置 1. 安裝 Pygame pip install pygame2. 驗證安裝 import pygame pygame.init() print(pygame.version.ver) # 應輸出版本號&am…

@【JCIDS】【需求論證】聯合能力集成與開發系統知識圖譜

JCIDS(聯合能力集成與開發系統)知識圖譜 1. JCIDS概述 2. JCIDS的提出背景 3. JCIDS核心流程 4. JCIDS分析方法 5. JCIDS優勢 6. JCIDS與采辦系統的關系 7. JCIDS知識圖譜結構 8. 對我的啟示 9.JCIDS(聯合能力集成與開發系統)相關術語列表 10. 參考文獻 1. JCIDS概述 定義:…

每天學一個Linux命令(38):vi/vim

每天學一個 Linux 命令(38):vi/vim vi 和 vim(Vi IMproved)是 Linux 和 Unix 系統中功能強大的文本編輯器。vim 是 vi 的增強版,提供語法高亮、多級撤銷、插件支持等更多功能。掌握 vi/vim 是 Linux 系統管理員的必備技能之一。 1. 命令簡介 vi:經典的文本編輯器,幾乎…

【PZ-ZU49DR-KFB】:璞致電子 UltraScale+ RFSoC 架構下的軟件無線電旗艦開發平臺

璞致電子 PZ-ZU49DR-KFB 開發板基于 Xilinx ZYNQ UltraScale RFSoC XCZU49DR 主控制器&#xff0c;以 "ARMFPGA 異構架構" 為核心&#xff0c;融合高帶寬信號采集、高速數據處理與靈活擴展能力&#xff0c;專為專業工程師打造的軟件無線電&#xff08;SDR&#xff09…

力扣106:從中序與后序遍歷序列構造二叉樹

力扣106:從中序與后序遍歷序列構造二叉樹題目思路代碼題目 給定兩個整數數組 inorder 和 postorder &#xff0c;其中 inorder 是二叉樹的中序遍歷&#xff0c; postorder 是同一棵樹的后序遍歷&#xff0c;請你構造并返回這顆 二叉樹 。 思路 我們首先要知道中序遍歷和后序…

IDEA JAVA工程入門

Maven配置&#xff1a; IDEA -> settings -> Build, Execution, Deployment -> Build Tools -> MavenMaven home pathUser setting file : 特定倉庫下載依賴包&#xff0c;自動下載(界面右邊M圖標點開&#xff0c;)local repository &#xff08;本地倉庫&#xff…

Spring依賴注入:從原理到實踐的自學指南

Spring依賴注入&#xff1a;從原理到實踐的自學指南 一、什么是依賴注入&#xff1f; 依賴注入&#xff08;Dependency Injection, DI&#xff09;是Spring框架實現控制反轉&#xff08;IoC&#xff09;的核心手段。其核心思想是&#xff1a;對象不再自己創建依賴項&#xff…

3_軟件重構_組件化開發實例方法論

1、上期回顧上次內容核心的地方有兩個&#xff0c;①是C多態基類的指針指向派生類&#xff0c;用于初始化各個插件。②是使用C語言的dlopen函數“動態加載”各個插件&#xff0c;實現用戶根據契約接口自定義開發插件&#xff0c;極大程度地實現了軟件上的解耦。③再進一步&…

C#接口的定義與使用

第1章 接口&#xff08;interface&#xff09;是什么1.1 定義? 接口是一組“能力”或“契約”的抽象描述&#xff0c;只規定“能做什么”&#xff0c;不規定“怎么做”。? 在 C# 中&#xff0c;接口是一種完全抽象的類型&#xff08;fully abstract type&#xff09;。 ? 關…

【STM32】HAL庫中的實現(三):PWM(脈沖寬度調制)

&#x1f527; HAL庫中的實現&#xff1a;PWM&#xff08;脈沖寬度調制&#xff09; PWM&#xff08;Pulse Width Modulation&#xff09;是基于定時器&#xff08;TIM&#xff09;產生的周期性脈沖信號&#xff0c;廣泛應用于&#xff1a;① 電機調速&#xff1b;② LED 亮度控…

GitHub 趨勢日報 (2025年08月03日)

&#x1f680; GitHub 趨勢日報 (2025年08月03日) &#x1f4ca; 由 TrendForge 系統生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日報中的項目描述已自動翻譯為中文 &#x1f4c8; 今日獲星趨勢圖 今日獲星趨勢圖751dyad362LLMs-from-scratch291…

Java后端高頻面試題

Java后端高頻面試題 目錄 Java集合框架Java并發編程JVM相關MySQL數據庫Redis緩存Spring框架 Java集合框架 HashMap的數據結構是什么&#xff0c;為什么在JDK8要引入紅黑樹&#xff1f; HashMap數據結構&#xff1a; JDK7&#xff1a;數組 鏈表JDK8&#xff1a;數組 鏈表…

37. line-height: 1.2 與 line-height: 120% 的區別

概述 line-height 是 CSS 中用于控制文本行間距的重要屬性。雖然 line-height: 1.2 和 line-height: 120% 看似相同&#xff0c;但它們在計算方式上存在關鍵區別&#xff0c;尤其是在繼承和計算值方面。1. 計算方式不同寫法類型計算方式說明line-height: 1.2無單位&#xff08;…

藍橋杯----DS1302實時時鐘

&#xff08;六&#xff09;、DS1302實時時鐘1、原理&#xff08;圖 二十六&#xff09;DS1302通過三線串行接口與單片機進行通信。微控制器可以通過設置RST引腳為高電平來使能DS1302&#xff0c;并通過SCK引腳提供串行時鐘信號&#xff0c;然后通過I/O引腳進行數據的讀寫操作。…

C++對象訪問有訪問權限是不是在ide里有效

在C中&#xff0c;對象的訪問權限&#xff08;即公有&#xff08;public&#xff09;、保護&#xff08;protected&#xff09;和私有&#xff08;private&#xff09;成員的訪問&#xff09;是編譯時的一部分&#xff0c;而不是運行時。這意味著&#xff0c;無論是在IDE&#…

CubeMX安裝芯片包

1.點擊HELP2.選擇公理嵌入式軟件包3.選擇并下載芯片包

【面向對象】面向對象七大原則

設計模式 設計模式是什么&#xff1f; 設計模式是一種針對于反復提出問題的解決方案&#xff0c;是經過長時間經驗和試錯而總結出來的一套業務流程&#xff1b; 其目的是為了提高代碼的可重用性和可維護性&#xff0c;讓代碼更容易讓人理解&#xff0c;保證代碼可靠性&#…

《計算機“十萬個為什么”》之 面向對象 vs 面向過程:編程世界的積木與流水線

《計算機“十萬個為什么”》之 面向對象 vs 面向過程&#xff1a;編程世界的積木與流水線 &#x1f916; 想象你要造一輛汽車&#x1f527;&#xff1a; 面向過程 按手冊一步步擰螺絲&#xff1a;擰緊螺栓A → 安裝輪胎B → 焊接車架C 面向對象 召喚汽車人戰隊&#xff1a;引…