Spark 鍵值對RDD操作

https://www.cnblogs.com/yongjian/p/6425772.html

概述

鍵值對RDD是Spark操作中最常用的RDD,它是很多程序的構成要素,因為他們提供了并行操作各個鍵或跨界點重新進行數據分組的操作接口。

?

?

創建

Spark中有許多中創建鍵值對RDD的方式,其中包括

  • 文件讀取時直接返回鍵值對RDD
  • 通過List創建鍵值對RDD

在Scala中,可通過Map函數生成二元組

1
2
3
4
5
6
7
8
9
10
val listRDD = sc.parallelize(List(1,2,3,4,5))
val result = listRDD.map(x => (x,1))
result.foreach(println)
//結果
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)

?

?

鍵值對RDD的轉化操作

?

基本RDD轉化操作在此同樣適用。但因為鍵值對RDD中包含的是一個個二元組,所以需要傳遞的函數會由原來的操作單個元素改為操作二元組。

下表總結了針對單個鍵值對RDD的轉化操作,以?{ (1,2) , (3,4) , (3,6) }? 為例,f表示傳入的函數

函數名目的示例結果
reduceByKey(f)合并具有相同key的值rdd.reduceByKey( ( x,y) => x+y ){ (1,2) , (3,10) }
groupByKey()對具有相同key的值分組rdd.groupByKey(){ (1,2) , (3, [4,6] ) }
mapValues(f)對鍵值對中的每個值(value)應用一個函數,但不改變鍵(key)rdd.mapValues(x => x+1){ (1,3) , (3,5) , (3,7) }
combineBy Key( createCombiner, mergeValue, mergeCombiners, partitioner)使用不同的返回類型合并具有相同鍵的值下面有詳細講解-
flatMapValues(f)對鍵值對RDD中每個值應用返回一個迭代器的函數,然后對每個元素生成一個對應的鍵值對。常用語符號化rdd.flatMapValues(x => ( x to 5 ))

{ (1, 2) ,? (1, 3) ,?? (1, 4) , (1, 5) ,? (3, 4) , (3, 5) }

keys()獲取所有keyrdd.keys(){1,3,3}
values()獲取所有valuerdd.values(){2,4,6}
sortByKey()根據key排序rdd.sortByKey(){ (1,2) , (3,4) , (3,6) }

?

?

下表總結了針對兩個鍵值對RDD的轉化操作,以rdd1 = { (1,2) , (3,4) , (3,6)?}? rdd2 = { (3,9) }?為例,

函數名目的示例結果
subtractByKey刪掉rdd1中與rdd2的key相同的元素rdd1.subtractByKey(rdd2){ (1,2) }
join內連接rdd1.join(rdd2)

{(3, (4, 9)), (3, (6, 9))}

leftOuterJoin左外鏈接rdd1.leftOuterJoin (rdd2)

{(3,( Some( 4), 9)), (3,( Some( 6), 9))}

rightOuterJoin右外鏈接rdd1.rightOuterJoin(rdd2)

{(1,( 2, None)), (3, (4, Some( 9))), (3, (6, Some( 9)))}

cogroup將兩個RDD鐘相同key的數據分組到一起rdd1.cogroup(rdd2){(1,([ 2],[])), (3, ([4, 6],[ 9]))}

?

?

combineByKey

combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner,mapSideCombine)

combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner)

combineByKey( createCombiner, mergeValue, mergeCombiners)

?

函數功能:

聚合各分區的元素,而每個元素都是二元組。功能與基礎RDD函數aggregate()差不多,可讓用戶返回與輸入數據類型不同的返回值。

combineByKey函數的每個參數分別對應聚合操作的各個階段。所以,理解此函數對Spark如何操作RDD會有很大幫助。

?

參數解析:

createCombiner:分區內?創建組合函數

mergeValue:分區內?合并值函數

mergeCombiners:多分區?合并組合器函數

partitioner:自定義分區數,默認為HashPartitioner

mapSideCombine:是否在map端進行Combine操作,默認為true

?

工作流程:

  1. combineByKey會遍歷分區中的所有元素,因此每個元素的key要么沒遇到過,要么和之前某個元素的key相同。
  2. 如果這是一個新的元素,函數會調用createCombiner創建那個key對應的累加器初始值
  3. 如果這是一個在處理當前分區之前已經遇到的key,會調用mergeCombiners把該key累加器對應的當前value與這個新的value合并

?

代碼例子:

//統計男女個數

1
2
3
4
5
6
7
8
9
10
val conf = new?SparkConf ().setMaster ("local").setAppName ("app_1")
???val sc = new?SparkContext (conf)
???val people = List(("男", "李四"), ("男", "張三"), ("女", "韓梅梅"), ("女", "李思思"), ("男", "馬云"))
???val rdd = sc.parallelize(people,2)
???val result = rdd.combineByKey(
?????(x: String) => (List(x), 1),? //createCombiner
?????(peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1), //mergeValue
?????(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)) //mergeCombiners
???result.foreach(println)

結果

(男, ( List( 張三,? 李四,? 馬云),3 ) )
(女, ( List( 李思思,? 韓梅梅),2 ) )

?

流程分解:

Spark算子-combineByKey

?

解析:兩個分區,分區一按順序V1、V2、V3遍歷

  • V1,發現第一個key=男時,調用createCombiner,即
    (x: String) => (List(x), 1)
  • V2,第二次碰到key=男的元素,調用mergeValue,即
    (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1)
  • V3,發現第一個key=女,繼續調用createCombiner,即
    (x: String) => (List(x), 1)
  • … …
  • 待各V1、V2分區都計算完后,數據進行混洗,調用mergeCombiners,即
    (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))

?


?

add by jan 2017-02-27?18:34:39

以下例子都基于此RDD

1
2
3
4
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)

reduceByKey(func)

reduceByKey(func)的功能是,使用func函數合并具有相同鍵的值。

比如,reduceByKey((a,b) => a+b),有四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),對具有相同key的鍵值對進行合并后的結果就是:("spark",3)、("hadoop",8)。可以看出,(a,b) => a+b這個Lamda表達式中,a和b都是指value,比如,對于兩個具有相同key的鍵值對("spark",1)、("spark",2),a就是1,b就是2。

1
2
3
4
scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)

  

groupByKey()

roupByKey()的功能是,對具有相同鍵的值進行分組。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),采用groupByKey()后得到的結果是:("spark",(1,2))和("hadoop",(3,5))。

1
2
3
4
5
6
7
scala> pairRDD.groupByKey()
res15:?org.apache.spark.rdd.RDD[(String, Iterable[Int])]?=?ShuffledRDD[15] at groupByKey at <console>:34
//從上面執行結果信息中可以看出,分組后,value被保存到Iterable[Int]中
scala> pairRDD.groupByKey().foreach(println)
(Spark,CompactBuffer(1,?1))
(Hive,CompactBuffer(1))
(Hadoop,CompactBuffer(1))

  

keys

keys只會把鍵值對RDD中的key返回形成一個新的RDD。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)構成的RDD,采用keys后得到的結果是一個RDD[Int],內容是{"spark","spark","hadoop","hadoop"}。

1
2
3
4
5
6
7
scala> pairRDD.keys
res17:?org.apache.spark.rdd.RDD[String]?=?MapPartitionsRDD[17] at keys at <console>:34
scala> pairRDD.keys.foreach(println)
Hadoop
Spark
Hive
Spark

  

values

?values只會把鍵值對RDD中的value返回形成一個新的RDD。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)構成的RDD,采用keys后得到的結果是一個RDD[Int],內容是{1,2,3,5}。

1
2
3
4
5
6
7
8
scala> pairRDD.values
res0:?org.apache.spark.rdd.RDD[Int]?=?MapPartitionsRDD[2] at values at <console>:34
??
scala> pairRDD.values.foreach(println)
1
1
1
1

  

sortByKey()

?sortByKey()的功能是返回一個根據鍵排序的RDD。

1
2
3
4
5
6
7
scala> pairRDD.sortByKey()
res0:?org.apache.spark.rdd.RDD[(String, Int)]?=?ShuffledRDD[2] at sortByKey at <console>:34
scala> pairRDD.sortByKey().foreach(println)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)

  

mapValues(func)

我們經常會遇到一種情形,我們只想對鍵值對RDD的value部分進行處理,而不是同時對key和value進行處理。對于這種情形,Spark提供了mapValues(func),它的功能是,對鍵值對RDD中的每個value都應用一個函數,但是,key不會發生變化。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)構成的pairRDD,如果執行pairRDD.mapValues(x => x+1),就會得到一個新的鍵值對RDD,它包含下面四個鍵值對("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。?

1
2
3
4
5
6
7
scala> pairRDD.mapValues(x?=> x+1)
res2:?org.apache.spark.rdd.RDD[(String, Int)]?=?MapPartitionsRDD[4] at mapValues at <console>:34
scala> pairRDD.mapValues(x?=> x+1).foreach(println)
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)

  

join

join(連接)操作是鍵值對常用的操作。“連接”(join)這個概念來自于關系數據庫領域,因此,join的類型也和關系數據庫中的join一樣,包括內連接(join)、左外連接(leftOuterJoin)、右外連接(rightOuterJoin)等。最常用的情形是內連接,所以,join就表示內連接。
對于內連接,對于給定的兩個輸入數據集(K,V1)和(K,V2),只有在兩個數據集中都存在的key才會被輸出,最終得到一個(K,(V1,V2))類型的數據集。

比如,pairRDD1是一個鍵值對集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一個鍵值對集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的結果就是一個新的RDD,這個新的RDD是鍵值對集合{("spark",1,"fast"),("spark",2,"fast")}。對于這個實例,我們下面在spark-shell中運行一下:

1
2
3
4
5
6
7
8
9
10
11
12
scala>?val?pairRDD1?=?sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
pairRDD1:?org.apache.spark.rdd.RDD[(String, Int)]?=?ParallelCollectionRDD[24] at parallelize at <console>:27
??
scala>?val?pairRDD2?=?sc.parallelize(Array(("spark","fast")))
pairRDD2:?org.apache.spark.rdd.RDD[(String, String)]?=?ParallelCollectionRDD[25] at parallelize at <console>:27
??
scala> pairRDD1.join(pairRDD2)
res9:?org.apache.spark.rdd.RDD[(String, (Int, String))]?=?MapPartitionsRDD[28] at join at <console>:32
??
scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/390456.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/390456.shtml
英文地址,請注明出處:http://en.pswp.cn/news/390456.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

無服務器架構_如何開始使用無服務器架構

無服務器架構Traditionally, when you wanted to build a web app or API, you’d usually have to spend significant time and effort managing servers and ensuring your app scales up to handle large request volumes. Serverless is a cloud computing model which let…

WPF中的動畫——(一)基本概念

原文:WPF中的動畫——&#xff08;一&#xff09;基本概念WPF的一個特點就是支持動畫&#xff0c;我們可以非常容易的實現漂亮大方的界面。首先&#xff0c;我們來復習一下動畫的基本概念。計算機中的動畫一般是定格動畫&#xff0c;也稱之為逐幀動畫&#xff0c;它通過每幀不同…

cloud 異步遠程調用_異步遠程工作的意外好處-以及如何擁抱它們

cloud 異步遠程調用In this article, Ill discuss the positive aspects of being a little out of sync with your team.在本文中&#xff0c;我將討論與您的團隊有點不同步的積極方面。 So you’ve started working from home.因此&#xff0c;您已經開始在家工作。 There …

linux 問題一 apt-get install 被 lock

問題&#xff1a; sudo apt-get install vim E: Could not get lock /var/lib/dpkg/lock - open (11: Resource temporarily unavailable)E: Unable to lock the administration directory (/var/lib/dpkg/), is another process using it? 解決&#xff1a; sudo rm /var/cac…

工信部高級軟件工程師_作為新軟件工程師的信

工信部高級軟件工程師Dear Self, 親愛的自我&#xff0c; You just graduated and you are ready to start your career in the IT field. I cannot spoil anything, but I assure you it will be an interesting ride. 您剛剛畢業&#xff0c;就可以開始在IT領域的職業了。 我…

Python高級網絡編程系列之基礎篇

一、Socket簡介 1、不同電腦上的進程如何通信&#xff1f; 進程間通信的首要問題是如何找到目標進程&#xff0c;也就是操作系統是如何唯一標識一個進程的&#xff01; 在一臺電腦上是只通過進程號PID&#xff0c;但在網絡中是行不通的&#xff0c;因為每臺電腦的IP可能都是不一…

多線程編程和單線程編程_生活與編程的平行線程

多線程編程和單線程編程I’m convinced our deepest desire is, by paying the cost of time, to be shown a glimmer of some fundamental truth about the universe. To hear it whisper its lessons and point towards its purpose.我堅信&#xff0c;我們最深切的愿望是通過…

劍指 Offer 67. 把字符串轉換成整數

寫一個函數 StrToInt&#xff0c;實現把字符串轉換成整數這個功能。不能使用 atoi 或者其他類似的庫函數。 首先&#xff0c;該函數會根據需要丟棄無用的開頭空格字符&#xff0c;直到尋找到第一個非空格的字符為止。 當我們尋找到的第一個非空字符為正或者負號時&#xff0c…

搭建MSSM框架(Maven+Spring+Spring MVC+MyBatis)

https://github.com/easonjim/ssm-framework 先欠著&#xff0c;后續再進行講解&#xff1a; 一、Spring內核集成 二、Spring MVC集成 三、MyBatis集成 四、代碼生成工具集成 >如有問題&#xff0c;請聯系我&#xff1a;easonjim#163.com&#xff0c;或者下方發表評論。<…

4.RabbitMQ Linux安裝

這里使用的Linux是CentOS6.2 將/etc/yum.repo.d/目錄下的所有repo文件刪除 先下載epel源 # wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo 修改epel-erlang.repo文件&#xff0c;如下圖 添加CentOS 的下載源…

freecodecamp_如何對freeCodeCamp文章提供反饋

freecodecampWe at the freeCodeCamp editorial team do our best to ensure articles are as accurate as they can be.我們的freeCodeCamp編輯團隊竭盡所能&#xff0c;以確保文章盡可能準確。 Still, we occasionally miss factual inaccuracies, non-functioning code exa…

如何對接oracle 建立pdb

Oracle數據庫的結構是一個數據庫實例下有許多用戶&#xff0c;每一個用戶有自己的表空間&#xff0c;即每一個用戶相當于MySQL中的一個數據庫。不久前下了oracle 12c的數據庫&#xff0c;安裝之后建user時才知道oracle12c 有一個很大的變動就是引入了pdb可插入數據庫&#xff0…

二、數據庫設計與操作

一、 數據庫設計仿QQ數據庫一共包括5張數據表&#xff0c;每張數據表結構如下&#xff1a;1、 tb_User&#xff08;用戶信息表&#xff09;這張表主要用來存儲用戶的好友關系與信息字段名數據類型是否Null值默認值綁定描述IDint否用戶賬號PwdVarchar(50)否用戶密碼Frie…

hdu 過山車_從機械工程師到軟件開發人員–我的編碼過山車

hdu 過山車There arent many people out there who grew up dreaming of writing code. I definitely didnt. I wanted to design cars. But somehow I ended up building software.很少有人夢見編寫代碼。 我絕對沒有。 我想設計汽車。 但是我最終以某種方式開發了軟件。 I u…

mysql 兩列互換

mysql 如果想互換兩列的值&#xff0c;直接寫 update 表 set col1col2&#xff0c;col2col1 這樣的后果就是兩列都是 col2 的值 注意這和sql server 是不同的&#xff0c; 如果想實現上述功能&#xff0c;添加一個自增列作為標識&#xff08;必須的&#xff09;&#xff0c; u…

劍指 Offer 36. 二叉搜索樹與雙向鏈表

輸入一棵二叉搜索樹&#xff0c;將該二叉搜索樹轉換成一個排序的循環雙向鏈表。要求不能創建任何新的節點&#xff0c;只能調整樹中節點指針的指向。 為了讓您更好地理解問題&#xff0c;以下面的二叉搜索樹為例&#xff1a; 我們希望將這個二叉搜索樹轉化為雙向循環鏈表。鏈表…

游戲引擎開發和物理引擎_視頻游戲開發的最佳游戲引擎

游戲引擎開發和物理引擎In this article, well look at some of the most popular game engines for video game development. Youll get a brief overview of each engine so you can choose which to use for your project.在本文中&#xff0c;我們將介紹一些用于視頻游戲開…

TPS和QPS的區別和理解

TPS和QPS的區別和理解 原創 2016年04月26日 17:11:3114010QPS&#xff1a;Queries Per Second意思是“每秒查詢率”&#xff0c;是一臺服務器每秒能夠相應的查詢次數&#xff0c;是對一個特定的查詢服務器在規定時間內所處理流量多少的衡量標準。 TPS&#xff1a;是Transaction…

1893. 檢查是否區域內所有整數都被覆蓋

theme: healer-readable 給你一個二維整數數組 ranges 和兩個整數 left 和 right 。每個 ranges[i] [starti, endi] 表示一個從 starti 到 endi 的 閉區間 。 如果閉區間 [left, right] 內每個整數都被 ranges 中 至少一個 區間覆蓋&#xff0c;那么請你返回 true &#xff…

004-docker常用命令[二]-容器操作ps,top,attach,export

2.3、容器操作 2.3.1、docker ps docker ps : 列出容器 語法 docker ps [OPTIONS] OPTIONS說明&#xff1a; -a :顯示所有的容器&#xff0c;包括未運行的。 -f :根據條件過濾顯示的內容。 --format :指定返回值的模板文件。 -l :顯示最近創建的容器。 -n :列出最近創建的n…