spark將rdd轉為string_八、Spark之詳解Tranformation算子

RDD中的所有轉換(Transformation)算子都是延遲加載的,也就是說,它們并不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。

常用Transformation類算子列表

a484cfc46d844e378e06d4f7817ddad3.png

常用Transformation類算子列表

常用Transformation類算子實例

  • map(func): 返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成, map操作是一對一操作,每進去一個元素,就出來一個元素
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :24# 對每個元素乘以10返回新的rdd2scala> val rdd2 = rdd1.map(_*10)rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at :25scala> rdd2.collectres1: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)# 對每個元素拼接一個字符串,返回新的String類型的RDDscala> val rdd3 = rdd1.map(_+"@map.com")rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at map at :25scala> rdd3.collectres3: Array[String] = Array(1@map.com, 2@map.com, 3@map.com, 4@map.com, 5@map.com, 6@map.com, 7@map.com, 8@map.com, 9@map.com, 10@map.com)
  • filter(func): 過濾。返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成, RDD元素的類型不會改變。
scala> val rdd1 = sc.parallelize(Array("喬峰","段譽","虛竹","鳩摩智","達摩祖師"))rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at :24# filter中為true的會被保留,會false的會被過濾scala> val rdd2 = rdd1.filter(!_.contains("摩"))rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at :25scala> rdd2.collectres4: Array[String] = Array(喬峰, 段譽, 虛竹)
  • flatMap(func): 壓平。類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
scala> val rdd1 = sc.parallelize(Array("say you say me say it together","good good study day day up"))rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at :24# 進去一條,出來多條,是一對多的轉換scala> val rdd2 = rdd1.flatMap(_.split(" "))rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at :25scala> rdd2.collectres5: Array[String] = Array(say, you, say, me, say, it, together, good, good, study, day, day, up)

集合類Transformation算子實例

  • union(otherRDD): 對源RDD和參數RDD求并集后返回一個新的RDD, 需要兩個RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :24# 求兩個RDD的并集scala> val rdd3 = rdd1.union(rdd2)rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[11] at union at :27scala> rdd3.collectres6: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 6)
  • subtract(otherRDD): 對源RDD和參數RDD求差集后返回一個新的RDD, 需要兩個RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :24scala> val rdd3 = rdd1.subtract(rdd2)rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at subtract at :27# rdd1與rdd2的差集是"1"scala> rdd3.collectres7: Array[Int] = Array(1)# rdd2與rdd1的差集是"6"scala> val rdd4 = rdd2.subtract(rdd1)rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at subtract at :27scala> rdd4.collect()res8: Array[Int] = Array(6)
  • intersection(otherRDD): 對源RDD和參數RDD求交集后返回一個新的RDD, 需要有兩個RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(2,3,4,5,6))rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :24# 求兩個RDD的交集返回新的RDDscala> val rdd3 = rdd1.intersection(rdd2)rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[25] at intersection at :27scala> rdd3.collect()res9: Array[Int] = Array(4, 3, 5, 2)
  • distinct(): 對源RDD進行去重后返回一個新的RDD, 只需要一個RDD
scala> val rdd1 = sc.parallelize(Array(1,1,1,2,2,2,3,3,3))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at :24# 在一個RDD中實現去重功能scala> val rdd2 = rdd1.distinct()rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at distinct at :25scala> rdd2.collect()res10: Array[Int] = Array(1, 3, 2)

其底層的實現原理(如下面Java代碼所示)是:mapToPair+reduceByKey+mapToPair =>

import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.*;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;/** * distinct: 對RDD中的元素去重 */public class distinctOperator {    public static void main(String[] args) {        SparkConf conf = new SparkConf()                .setMaster("local")                .setAppName("distinct");        JavaSparkContext sc = new JavaSparkContext(conf);        sc.setLogLevel("WARN");        JavaRDD rdd1 = sc.parallelize(Arrays.asList(                "a", "a", "a", "a",                "b", "b", "b", "b"        ));        /**         * 傳統方式實現RDD元素去重需要三步         *  第一步:把RDD轉換成K,V格式的RDD, K為元素,V為1         *  每二步:對K,V格式的RDD的Key進行分組計算         *  第三步:對得到的RDD只取第一位鍵         */        // [(a,1),(a,1),(a,1),(a,1),(b,1),b,1),b,1),b,1)]        JavaPairRDD mapToPairRDD = rdd1.mapToPair(new PairFunction() {            @Override            public Tuple2 call(String s) throws Exception {                return new Tuple2(s, 1);            }        });        //對每個key進行聚合        //[(a,4),(b,4)]        JavaPairRDD reduceRDD = mapToPairRDD.reduceByKey(new Function2() {            @Override            public Integer call(Integer v1, Integer v2) throws Exception {                return v1 + v2;            }        });        //只取鍵,不要值        JavaRDD mapRDD = reduceRDD.map(new Function, String>() {            @Override            public String call(Tuple2 tuple) throws Exception {                return tuple._1;            }        });        mapRDD.foreach(new VoidFunction() {            @Override            public void call(String s) throws Exception {                System.out.println(s);            }        });        System.out.println("-----------------------------------");        //使用Spark提供的算子distinct實現RDD元素去重        JavaRDD distinctRDD = rdd1.distinct();        distinctRDD.foreach(new VoidFunction() {            @Override            public void call(String s) throws Exception {                System.out.println(s);            }        });        sc.stop();    }}

分組類的轉換算子

groupByKey([numTasks]): 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD。偏底層
scala> val rdd1 = sc.parallelize(List(("張軍",1000),("李軍",2500),("王軍",3000),("張軍",1500)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at :24scala> val rdd2 = rdd1.groupByKey()rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[31] at groupByKey at :25scala> rdd2.collect()res11: Array[(String, Iterable[Int])] = Array((王軍,CompactBuffer(3000)), (張軍,CompactBuffer(1000, 1500)), (李軍,CompactBuffer(2500)))
  • reduceByKey(func, [numTasks]): 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置。調用groupByKey。
scala> val rdd1 = sc.parallelize(Array(("red",10),("red",20),("red",30),("red",40),("red",50),("yellow",100),("yellow",100)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at :24# 按照key進行聚合操作scala> val rdd2 = rdd1.reduceByKey(_+_)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at :25scala> rdd2.collect()res12: Array[(String, Int)] = Array((yellow,200), (red,150))
  • cogroup(otherRDD, [numTasks]): 在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD
scala> val rdd1 = sc.parallelize(Array(("張飛","丈八蛇矛"),("關羽","青龍偃月刀"),("呂布","方天畫戟")))rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at parallelize at :24scala> val rdd2 = sc.parallelize(Array(("張飛",30),("關羽",35),("呂布",45),("劉備",42)))rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[35] at parallelize at :24scala> val rdd3=rdd1.cogroup(rdd2)rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[Int]))] = MapPartitionsRDD[37] at cogroup at :27scala> rdd3.collect()res13: Array[(String, (Iterable[String], Iterable[Int]))] = Array((呂布,(CompactBuffer(方天畫戟),CompactBuffer(45))), (關羽,(CompactBuffer(青龍偃月刀),CompactBuffer(35))), (張飛,(CompactBuffer(丈八蛇矛),CompactBuffer(30))), (劉備,(CompactBuffer(),CompactBuffer(42))))

排序類Transformation算子

sortBy(func,[ascending], [numTasks]): 與sortByKey類似,但是更靈活
scala> val rdd1=sc.parallelize(Array(10,9,8,7,4,6,5,3,1,2))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at :24scala> val rdd2=rdd1.sortBy(x=>x,true)rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at sortBy at :25scala> rdd2.collect()res14: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)# K, V格式的RDDscala> val rdd1=sc.parallelize(Array(("張飛",30),("劉備",42),("關羽",32),("曹操",46),("公孫瓚",62)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at :24scala> val rdd2=rdd1.sortBy(tuple=>tuple._2, false)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[45] at sortBy at :25scala> rdd2.collect()res15: Array[(String, Int)] = Array((公孫瓚,62), (曹操,46), (劉備,42), (關羽,32), (張飛,30))
sortByKey([ascending], [numTasks]): 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
scala> val rdd1=sc.parallelize(Array(("張飛",30),("劉備",42),("關羽",32),("曹操",46),("公孫瓚",62)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at :24# 同樣對rdd1調用,需要進行轉換scala> val rdd2=rdd1.map(tuple=>tuple.swap).sortByKey(false).map(tuple=>tuple.swap)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[48] at map at :25scala> rdd2.collect()res16: Array[(String, Int)] = Array((公孫瓚,62), (曹操,46), (劉備,42), (關羽,32), (張飛,30))

高級類的轉換算子

  • mapPartitionWithIndex(func): 類似于mapPartitions, 但是func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Iterator[T]) => Iterator[U]。其功能是對RDD中的每個分區進行操作,帶有索引下標,可以取到分區號。
    • func: 接收兩個參數,第一個參數代表分區號,第二參數代表分區中的元素。
scala> val rdd1 = sc.parallelize(List("son1","son2","son3","son4","son5","son6","son7","son8","son9","son10","son11","son12"),4)rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at :24scala> val rdd2 = rdd1.mapPartitionsWithIndex((index, iter) => {iter.toList.map(x=> "【分區號為:"+index+", 值為:" + x+ "】").iterator})rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at :25scala> rdd2.collect()res0: Array[String] = Array(【分區號為:0, 值為:son1】, 【分區號為:0, 值為:son2】, 【分區號為:0, 值為:son3】, 【分區號為:1, 值為:son4】, 【分區號為:1, 值為:son5】, 【分區號為:1, 值為:son6】, 【分區號為:2, 值為:son7】, 【分區號為:2, 值為:son8】, 【分區號為:2, 值為:son9】, 【分區號為:3, 值為:son10】, 【分區號為:3, 值為:son11】, 【分區號為:3, 值為:son12】)
  • aggregateByKey: 后面有單獨文章講解此算子

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/533143.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/533143.shtml
英文地址,請注明出處:http://en.pswp.cn/news/533143.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

html的過渡屬性,CSS3屬性transition(過渡)多屬性詳解

transform呈現的是一種變形結果,而transition呈現的是一種過渡,通俗點說就是一種動畫轉換過程,如漸顯、漸弱、動畫快慢等。transition和transform是兩種不同的動畫模型。transition屬性是一個簡寫屬性,用于設置四個過渡屬性transi…

2021年呼和浩特高考段考成績查詢,2019屆呼和浩特市高三段考成績排名分析

原標題:2019屆呼和浩特市高三段考成績排名分析不忘初心 天道酬勤╳?校對:劉姝坤?文稿:王濤老師?聲明:如有轉載請聯系并注明出處好樂(巨人)教育2019高三普文理集訓段考班火熱招生中全呼市唯一一家吃住學一體封閉式管理的學校唯一…

dj打碟怎么學_學DJ要不要去培訓學校?

酒吧學DJ打碟他有很多種的說法,有些酒吧他是自己招學生,當這樣的酒吧在現今是挺少的,也有,但要求很高。還有一種就是說你自己在酒吧里上班的人自己招私人徒弟什么的,那也是就學DJ打碟,那一搬酒吧都是怎么學…

html設置table表格的弧度,用CSS3和table標簽實現一個圓形軌跡的動畫的示例代碼

html:其實就是根據table標簽把幾個實心圓div進行等邊六角形的排布,并放入一個div容器中,然后利用CSS3的循環旋轉的動畫效果對最外層的div容器進行自轉實現,當然不要忘了把div容器的外邊框設置圓形弧度的。BMI色盲色弱心率開始測試…

ios時間相差多少天_上海自駕拉薩,走川進青出,應如何規劃線路?需要多少天時間?...

線路總覽上海自駕西藏拉薩,川進青出,全程約8000公里,需用時18~25天,行程主要分為4段:1、進藏之前:上海—成都,2000公里,3~5天;2、川藏線進:成都—拉薩&#x…

儒林外史每回概括簡短10字_早安心語正能量經典短句 一句話的簡短勵志語錄

1、人生就兩件事, 一件是拿事兒把時間填滿,另一件是拿感覺把心填滿 。早安!早安心語正能量經典短句 一句話的簡短勵志語錄點擊添加圖片描述(最多60個字)2、憑著一股子信念往前沖,到哪兒都是優秀的人。生活它從來不會允諾我們一份輕…

半個小時用計算機怎么算,CPA機考計算器操作指南,掌握這些快捷鍵,考試“延長”半小時!...

原標題:CPA機考計算器操作指南,掌握這些快捷鍵,考試“延長”半小時!注冊會計師考試就是一場在有限的時間內得分最多的比拼游戲!不僅要求考生學習各種專業知識,同時還需要掌握一定的策略。例如注會考試采用機…

c均值聚類matlab程序_機器學習筆記-9-聚類

1 K-means算法K-means是最普及的聚類算法,算法接受一個未標記的數據集,然后將數據聚類成不同的組。它是一個迭代算法,假設我們想要將數據聚類成 n 個組,其方法為:選擇K個隨機的點,稱為聚類中心對于數據集中的每一個數據…

php井字游戲代碼_PHP初級筆試題:Tic-Tac-Toe(n階井字棋)判斷勝負

//Tic-Tac-Toe$n 5;//五階棋盤$res array();function check($arr){$n $GLOBALS[n];$res $GLOBALS[res];//已經下過這一步,返回false;沒有,賦值if (isset($res[$arr[1]][$arr[2]])) {return false;} else {$res[$arr[1]][$arr[2]] $arr[0…

js與html編碼不同,js與html中unicode編碼的使用

【轉】javascript和html中unicode編碼和字符轉義的詳解不是十分理解unicode和html轉義的情況下,可能會誤用,所以下面會對它們再做比較容易理解的解釋: 1.html中的轉義:在html中如果遇到轉義字符(如“ ”),不管你的頁面字符編碼是utf-8 ...javascript和html中unicode編碼和字符轉…

g標簽 怎么設置svg_SVG g元素

SVG 元素SVG 元素用于將SVG形狀分組在一起。分組后,您可以像變形單個形狀一樣變換整個形狀。與 不能單獨成為轉換目標的嵌套 元素相比,這是一個優勢。您還可以設置分組元素的樣式,并像對待單個元素一樣重復使用它們。元素g是用來組合對象的容…

html和css哪個優先,CSS3 | 樣式和優先級

css3一般介紹:CSS注釋:/*CSS*/CSS長度單位:1.px(像素)2.em(倍數,一般用于文字)一、HTML嵌套CSS3樣式:1.外部(推薦)例如HTML文件為index.html將樣式放入另一文件中,index.css以上兩個文件放入同一文件夾下2.…

java上傳視頻到七牛云_Java進階學習:將文件上傳到七牛云中

Java進階學習:將文件上傳到七牛云中通過本文,我們將講述如何利用七牛云官方SDK,將我們的本地文件傳輸到其存儲空間中去。JavaSDK:https://developer.qiniu.com/kodo/sdk/1239/java#server-upload官方SDK:https://devel…

計算機網絡討論4,計算機網絡實驗四

實驗四IEEE 802.3協議分析和以太網一、實驗目的1、分析802.3協議2、熟悉以太網幀的格式二、實驗環境與因特網連接的計算機網絡系統;主機操作系統為windows;Ethereal、IE 等軟件。三、實驗步驟(注:本次實驗先完成前面的“1 俘獲并分析以太網幀…

rust新版組隊指令_Rust最新控制臺命令2017

物品名稱物品代碼電池battery.small骨頭碎片bone.fragments空的豆罐頭can.beans.empty空的金槍魚罐頭can.tuna.empty攝像頭cctv.camera木炭charcoal煤coal石油crude.oil炸藥explosives動物脂肪fat.animal火藥gunpowder高級金屬礦hq.metal.ore金屬碎片metal.fragments金屬礦meta…

python實現mini-batch_Mini-Batch 、Momentum、Adam算法的實現

def random_mini_batches(X,Y,mini_batch_size64,seed0):"""從(X,Y)中創建一個隨機的mini-batch列表參數:X - 輸入數據,維度為(輸入節點數量,樣本的數量)Y - 對應的是X的標簽,【1 | 0】(藍|紅)&#xf…

html5+shim腳本,HTML5探秘:用requestAnimationFrame優化Web動畫

requestAnimationFrame是什么?在瀏覽器動畫程序中,我們通常使用一個定時器來循環每隔幾毫秒移動目標物體一次,來讓它動起來。如今有一個好消息,瀏覽器開發商們決定:“嗨,為什么我們不在瀏覽器里提供這樣一個…

計算機科學與技術的專業論述,關于計算機科學專業的論文題目 計算機科學專業論文題目怎樣定...

【100道】關于關于計算機科學專業的論文題目匯總,作為大學生的畢業生應該明白了計算機科學專業論文題目怎樣定,選一個好的題目后續的計算機科學專業論文寫作起來會更輕松!一、比較好寫的計算機科學專業論文題目:1、計算機科學與技術專業應用型人才培養改革調研分析—…

ming window 交叉編譯_opencv3編譯pc端及交叉編譯arm端

環境: opensuse opencv3.4.1 交叉編譯器arm-openwrt-linux 作者:帥得不敢出門https://github.com/opencv/opencv/tree/3.4.1選擇右邊的"clone or download"按鈕進行下載,選擇下載zip我下的是opencv-3.4.1.zip, 3.4.1的版本號…

鎖定計算機 背景圖片,win7系統電腦更換鎖屏壁紙的方法

當win7系統電腦在一段時間不動的話就進入鎖屏狀態,然而很多用戶覺得默認的鎖屏壁紙不好看,就想要更換自己喜歡的鎖屏壁紙,那么win7怎么更換鎖屏壁紙呢?下面給大家講解一下win7系統電腦更換鎖屏壁紙的方法。1、同時按下窗口鍵winR組…