Spark(2)-基礎tranform算子(一)

一、算子列表

編號名稱
1map算子
2flatMap算子
3filter算子
4mapPartitions算子
5mapPartitionsWithIndex算子
6keys算子
7values算子
8mapValues算子
9flatMaplValues算子
10union算子
11reducedByKey算子
12combineByKey算子
13groupByKey算子
14foldByKey算子
15aggregateByKey算子
16ShuffledRDD算子
17distinct算子
18partitionBy算子

?二、代碼示例

package sparkCoreimport org.apache.hadoop.mapreduce.task.reduce.Shuffle
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext, TaskContext}/*** spark基本算子*/object basi_transform_02 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//1. map算子val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7),2)val map_rdd: RDD[Int] = rdd1.map(_ * 2)println("*****1. map算子************")map_rdd.foreach(println(_))//2.flatMap算子println("*****2.flatMap算子************")val arr: Array[String] = Array("Hive python spark","Java Hello Word")val rdd2: RDD[String] = sc.makeRDD(arr, 2)val flatMap_rdd: RDD[String] = rdd2.flatMap(_.split(" "))flatMap_rdd.foreach(println(_))//3.filter算子println("*****3.filter算子***********")val rdd3: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10))val filter_rdd :RDD[Int]= rdd3.filter(_ % 2 == 0)filter_rdd.foreach(println(_))//4. mapPartitions算子:將數據以分區的形式返回,進行map操作,一個分區對應一個迭代器// 應用場景: 比如在進行數據庫操作時,在操作數據之前,需要通過JDBC方式連接數據庫,如果使用map,那每條數據處理之前//         都需要連接一次數據庫,效率顯然很低.如果使用mapPartitions,則每個分區連接一次即可println("*****4. mapPartitions算子**********")val rdd4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10),2)val mapParition_rdd: RDD[Int] = rdd4.mapPartitions(iter => {print("模擬數據庫連接操作")iter.map(_ * 2)})mapParition_rdd.foreach(println(_))//5. mapPartitionsWithIndex算子,類似于mapPartitions,不過有兩個參數//  第一個參數是分區索引,第二個是對應的迭代器// 注意:函數返回的是一個迭代器println("*****5. mapPartitionsWithIndex算子**********")val rdd5: RDD[Int] = sc.parallelize(List(10, 20, 30, 40, 60),2)val mapPartitionWithIndex_Rdd: RDD[String] = rdd5.mapPartitionsWithIndex((index, it) => {it.map(e => s"partition:$index,val:$e")})mapPartitionWithIndex_Rdd.foreach(println(_))//6.keys算子: RDD中的數據是【對偶元組】類型,返回【對偶元組】的全部keyprintln("*****6.keys算子**********")val lst: List[(String, Int)] = List(("spark", 1), ("spark", 3), ("hive", 2),("Java", 1), ("Scala", 3), ("Python", 2))val rdd6: RDD[(String, Int)] = sc.parallelize(lst)val keysRdd: RDD[String] = rdd6.keyskeysRdd.foreach(println(_))//7.values: RDD中的數據是【對偶元組】類型,返回【對偶元組】的全部valueprintln("*****7.values算子**********")val values_RDD: RDD[Int] = rdd6.valuesvalues_RDD.foreach(println(_))//8.mapValues: RDD中的數據為對偶元組類型, 將value進行計算,然后與原Key進行組合返回(即返回的仍然是元組)println("*****8.mapValues算子**********")val lst2: List[(String, Int)] = List(("Hello", 1), ("world", 2),("I", 2), ("love", 3), ("you", 2))val rdd8: RDD[(String, Int)] = sc.parallelize(lst2, 2)val mapValues_rdd: RDD[(String, Int)] = rdd8.mapValues(_ * 10)mapValues_rdd.foreach(println(_))//9.flatMaplValues:RDD是對偶元組,將value應用傳入flatMap打平后,再與key組合println("*****9.flatMaplValues算子**********")// ("spark","1 2 3") => ("spark",1),("spark",2),("spark",3)val lst3: List[(String,String )] = List(("Hello", "1 2 3"), ("world", "4 5 6"),)val rdd9: RDD[(String, String)] = sc.parallelize(lst3)// 第一個_是指初始元組中的value;第二個_是指value拆分后的每一個值(轉換成整數)val flatMapValues: RDD[(String, Int)] = rdd9.flatMapValues(_.split(" ").map(_.toInt))flatMapValues.foreach(println(_))//10.union:將兩個類型一樣的RDD合并到一起,返回一個新的RDD,新的RDD分區數量是兩個RDD分區數量之和println("*****10.union算子**********")val union_rdd1 = sc.parallelize(List(1, 2, 3), 2)val union_rdd2 = sc.parallelize(List(4, 5, 6), 3)val union_rdd: RDD[Int] = union_rdd1.union(union_rdd2)union_rdd.foreach(println(_))//11.reducedByKey,在每個分區中進行局部分組聚合,然后將每個分區聚合的結果從上游拉到下游再進行全局分組聚合println("*****11.reducedByKey算子**********")val lst4: List[(String, Int)] = List(("spark", 1), ("spark", 1), ("hive", 3),("Python", 1), ("Java", 1), ("Scala", 3),("flink", 1), ("Mysql", 1), ("hive", 3))val rdd11: RDD[(String, Int)] = sc.parallelize(lst4, 2)val reduced_rdd: RDD[(String, Int)] = rdd11.reduceByKey(_ + _)reduced_rdd.foreach(println(_))//12.combineByKey:相比reducedByKey更底層的方法,后者分區內和分區之間相同Key對應的value值計算邏輯相同,但是前者可以分別定義不同的//   的計算邏輯.combineByKey 需要傳入三個函數作為參數:// 其中第一個函數:key在上游分區第一次出現時,對應的value該如何處理// 第二個函數:分區內相同key對應value的處理邏輯// 第三個函數: 分區間相同Key對應value的處理邏輯println("*****12.combineByKey算子**********")val f1 = (v:Int) => {val stage = TaskContext.get().stageId()val partition = TaskContext.getPartitionId()println(s"f1 function invoked in stage: $stage,partiton:$partition")v}//分區內相同key對應的value使用乘積val f2 = (a:Int,b:Int) => {val stage = TaskContext.get().stageId()val partition = TaskContext.getPartitionId()println(s"f2 function invoked in stage: $stage,partiton:$partition")a * b}//分區間相同key對應的value使用加法val f3 = (m:Int,n:Int) => {val stage = TaskContext.get().stageId()val partition = TaskContext.getPartitionId()println(s"f3 function invoked in stage: $stage,partiton:$partition")m + n}val rdd12: RDD[(String, Int)] = sc.parallelize(lst4,2)val combineByKey_rdd: RDD[(String, Int)] = rdd12.combineByKey(f1, f2, f3)combineByKey_rdd.foreach(println(_))//13.groupByKey:按key進行分組,返回的是(key,iter(value集合)println("*****13.groupByKey算子**********")val rdd13: RDD[(String, Int)] = sc.parallelize(lst4, 3)val groupByKey_rdd: RDD[(String, Iterable[Int])] = rdd13.groupByKey()groupByKey_rdd.foreach(println(_))//14.foldByKey:每個分區應??次初始值,先在每個進?局部聚合,然后再全局聚合(注意全局聚合的時候,初始值并不會被用到)// 局部聚合的邏輯與全局聚合的邏輯相同println("*****14.foldByKey算子**********")val lst5: List[(String, Int)] = List(("maple", 1), ("kelly", 1), ("Avery", 1),("maple", 1), ("kelly", 1), ("Avery", 1))val rdd14: RDD[(String, Int)] = sc.parallelize(lst5)val foldByKey_rdd: RDD[(String, Int)] = rdd14.foldByKey(1)(_ + _)foldByKey_rdd.foreach(println(_))//15.aggregateByKey:foldByKey,并且可以指定初始值,每個分區應??次初始值,傳?兩個函數,分別是局部聚合的計算邏輯// 和全局聚合的邏輯println("*****15.aggregateByKey算子**********")val rdd15: RDD[(String, Int)] = sc.parallelize(lst5)val aggregateByKey_rdd: RDD[(String, Int)] = rdd15.aggregateByKey(1)(_ + _,_ * _ )aggregateByKey_rdd.foreach(print(_))//16 ShuffledRDD:reduceByKey、combineByKey、aggregateByKey、foldByKey底層都是使?的ShuffledRDD,// 并且 mapSideCombine = trueprintln("*****16.ShuffledRDD算子**********")val rdd16: RDD[(String, Int)] = sc.parallelize(lst5,2)val partitioner = new HashPartitioner(rdd16.partitions.length)// 對rdd16按照指定分區器進行分區// String是rdd16中Key的數據類型,第一個Int是rdd16中value的數據類型,第二個Int是中間結果的數據類型(當然前提是傳入聚合器-里面包含計算邏輯// [可以據此知曉中間結果的數據類型])val shuffledRDD: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String, Int, Int](rdd16,partitioner)// 設置一個聚合器: 指定rdd16的計算邏輯(包含三個函數,分別是分區內一個key對應value的處理邏輯;分區內相同key對應value計算邏輯// 和分區間相同Key對應value計算邏輯)val aggregator: Aggregator[String, Int, Int] = new Aggregator[String, Int, Int](f1, f2, f3)// 給shuffledRDD設置聚合器shuffledRDD.setAggregator(aggregator)shuffledRDD.setMapSideCombine(true) // 設置Map端聚合println(shuffledRDD.collect().toList)// 17.distinct算子:對RDD元素進行去重println("*****17.distinct算子**********")val lst6: Array[String] = Array("spark", "spark", "hive","Python", "Python", "Java")val rdd17: RDD[String] = sc.parallelize(lst6)val distinct_rdd: RDD[String] = rdd17.distinct()println(distinct_rdd.collect().toList)// 18.partitionBy: 按照指定的分區器進行分區(底層使用的是ShuffleRDD)println("***** 18.partitionBy算子**********")val rdd18: RDD[(String,Int)] = sc.parallelize(lst5,2)val partitioner2 = new HashPartitioner(rdd18.partitions.length)val partitioned_rdd: RDD[(String, Int)] = rdd18.partitionBy(partitioner2)println(partitioned_rdd.collect().toList)sc.stop()}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/717700.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/717700.shtml
英文地址,請注明出處:http://en.pswp.cn/news/717700.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

深度學習工具之tokens計算器

1.什么是Token Token是GPT處理文本的基本單位。Token可以是一個字、一個詞語或特定語言中的一個字符。它們負責將輸入的文本數據轉換為 GPT 可以處理的數據格式。每個 GPT 模型都有一個預設的最大 Tokens 數量,例如,GPT-3 每次調用允許處理的最大 Token…

韋東山嵌入式Liunx入門驅動開發五

文章目錄 一、驅動程序基石1-1 休眠與喚醒1-2 POLL機制1-3 異步通知(1) 異步通知程序解析(2) 異步通知機制內核代碼詳解 1-4 阻塞與非阻塞1-5 定時器(1) 內核函數(2) 定時器時間單位 1-6 中斷下半部 tasklet 本人學習完韋老師的視頻,因此來復習鞏固,寫以…

華為OD技術面試案例7-2024年

記錄一下我面試od的面試過程. 1、第一個是hr電話面試, 其實也就是od的hr致電, 簡單了解一下個人情況, 問我要一些個人信息, 這塊沒啥問題; 2、第二個就是機考了, 根據我提供的信息, od的hr給我發了一個機考的鏈接, 并告訴我7天內有效, 可以在考試之前先刷刷題, 刷題地址參考…

《幻獸帕魯》游戲對服務器性能的具體要求是什么?

《幻獸帕魯》游戲對服務器性能的具體要求是什么? CPU:官方最低要求為i5-3570K,但在多人游玩時可能會有明顯卡頓。此外,還有建議選擇4核或更高性能的處理器,以確保游戲運行流暢。 內存:對于不同人數的聯機&…

超越想象:人工智能的奇跡與可能性

超越想象:人工智能的奇跡與可能性 人工智能(Artificial Intelligence,AI)作為當今科技領域的熱門話題,其奇跡和可能性正在不斷被揭示和拓展,超越了人們的想象。從智能機器人到自動駕駛汽車,從語…

蘋果ios群控軟件開發常用源代碼分享!

在移動軟件開發領域,蘋果設備由于其封閉性和安全性受到了廣大開發者的青睞,然而,這也為開發者帶來了一些挑戰,特別是在進行群控軟件開發時。 群控軟件是指可以同時控制多臺設備的軟件,這在自動化測試、批量操作等場景…

數據要素:數字化轉型中的新“金礦”及其發展潛力

作為一名在數字化轉型項目中摸爬滾打的實踐者,我們見證了數據從簡單的信息處理工具逐漸演變為驅動經濟社會發展的關鍵要素。近日,多部門聯合發布的《“數據要素”三年行動計劃(2024—2026年)》更是將數據要素的重要性提升到了新的…

C++ //練習 10.15 編寫一個lambda,捕獲它所在函數的int,并接受一個int參數。lambda應該返回捕獲的int和int參數的和。

C Primer(第5版) 練習 10.15 練習 10.15 編寫一個lambda,捕獲它所在函數的int,并接受一個int參數。lambda應該返回捕獲的int和int參數的和。 環境:Linux Ubuntu(云服務器) 工具:v…

Linux:進入vim編輯模式

vim 是一個強大的文本編輯器。 三種模式: 普通模式(Normal mode) 插入模式(Insert mode) 命令行模式(Command-line mode) 當你打開一個文件時,vim 默認處于普通模式。 插入模式&a…

十六、異常和File

異常和File 一、異常1.1異常的分類1.2 異常的作用1.3 異常的處理方式1.3.1 JVM默認的處理方式1.3.2 自己處理(捕獲異常)1.3.3 自己處理(靈魂四問) 1.4 異常中的常見方法1.5 拋出異常綜合練習(鍵盤錄入數據)…

基于springboot+vue的社區養老服務平臺

博主主頁:貓頭鷹源碼 博主簡介:Java領域優質創作者、CSDN博客專家、阿里云專家博主、公司架構師、全網粉絲5萬、專注Java技術領域和畢業設計項目實戰,歡迎高校老師\講師\同行交流合作 ?主要內容:畢業設計(Javaweb項目|小程序|Pyt…

黑馬點評-商戶查詢業務

緩存原理 本文的業務就是redis的經典應用,標準的操作方式就是查詢數據庫之前先查詢緩存,如果緩存數據存在,則直接從緩存中返回,如果緩存數據不存在,再查詢數據庫,然后將數據存入redis。 緩存更新策略 根據…

Spring重點記錄

文章目錄 1.Spring的組成2.Spring優點3.IOC理論推導4.IOC本質5.IOC實現:xml或者注解或者自動裝配(零配置)。6.hellospring6.1beans.xml的結構為:6.2.Spring容器6.3對象的創建和控制反轉 7.IOC創建對象方式7.1以有參構造的方式創建…

【OneAPI】貓狗類別檢測API

OneAPI新接口發布:貓狗類別檢測 45種狗狗類別和15種貓貓類別檢測。 API地址:POST https://oneapi.coderbox.cn/openapi/api/detect/dogcat 請求參數(body) 參數名類型必填含義說明imageUrlstring是圖片地址網絡圖片地址&#…

Vue路由(黑馬程序員)

路由介紹 將資代碼/vue-project(路由)/vue-project/src/views/tlias/DeptView.vue拷貝到我們當前EmpView.vue同級,其結構如下: 此時我們希望,實現點擊側邊欄的部門管理,顯示部門管理的信息,點擊員工管理,顯…

【周總結平淡但不平凡的周末】

上周總結 根據系統生產環境的日志文件,寫了個腳本統計最近使用我們系統的用戶的手機型號以及系統,幫助聚焦主要測試的機型,以及系統類型 依然是根據時區不同對項目進行改造,還有一個開發好的接口需要下周聯調 2024/3/3 晴…

QT Mingw32/64編譯ffmpeg源碼生成32/64bit庫以及測試

文章目錄 前言下載msys2ysamFFmpeg 搭建編譯環境安裝msys2安裝QT Mingw編譯器到msys環境中安裝ysam測試 編譯FFmpeg測試 前言 FFmpeg不像VLC有支持QT的庫文件,它僅提供源碼,需要使用者自行編譯成對應的庫,當使用QTFFmpeg實現播放視頻以及視頻…

連接 mongodb集群的集中方式

mongodb 連接到復制集 mongodb://node1,node2,node3.../database?[options]mongodb 連接到分片集 mongodb://mongos1,mongos2,mongos3.../database?[options]使用 mongosrv 通過域名解析得到所有的 mongos 或 節點的地址, 而不是把這些寫在連接字符串中. mongodbsrv://se…

經典的算法面試題(1)

題目: 給定一個整數數組 nums,編寫一個算法將所有的0移到數組的末尾,同時保持非零元素的相對順序。 示例: 輸入: [0,1,0,3,12] 輸出: [1,3,12,0,0] 注意:必須在原數組上操作,不能拷貝額外的數組。盡量減少操作次數。 這…

數據處理——一維數組轉列向量(分割時間序列為數據塊時的問題)

記錄在處理數據時被磕絆了一下的一個處理細節。 1.想要達到的要求 在某次滑動窗口取樣時間序列數據時,我得到如下一個以一維數組為元素的列表: 對于如上輸出列表中的每個一維數組,我希望將其轉換為下圖中的形式,簡單說就是希望他…