Spark算子reduceByKey深度解析

原文地址:http://blog.csdn.net/qq_23660243/article/details/51435257

--------------------------------------------

最近經常使用到reduceByKey這個算子,懵逼的時間占據多數,所以沉下心來翻墻上國外的帖子仔細過了一遍,發現一篇不錯的,在此加上個人的理解整體過一遍這個算子,那么我們開始:

國外的大牛一上來給出這么一句話,個人感覺高度概括了reduceByKey的功能:

[plain]?view plain?copy
?print?在CODE上查看代碼片派生到我的代碼片
  1. Spark?RDD?reduceByKey?function?merges?the?values?for?each?key???
  2. using?an?associative?reduce?function.【Spark的RDD的reduceByKey??
  3. 是使用一個相關的函數來合并每個key的value的值的一個算子(那么主??
  4. 干就是reduceByKey是個算子/函數)】。??

那么這就基本奠定了reduceByKey的作用域是key-value類型的鍵值對,并且是只對每個key的value進行處理,如果含有多個key的話,那么就對多個values進行處理。這里的函數是我們自己傳入的,也就是說是可人為控制的【其實這是廢話,人為控制不了這算子一點用沒有】。那么舉個例子:

??

scala> val x = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1),| ("a", 1), ("b", 1), ("b", 1),| ("b", 1), ("b", 1)), 3)

我們創建了一個Array的字符串,并把其存入Spark的集群上,設置了三個分區【這里我們不關注分區,只關注操作】。那么我們調用reduceByKey并且傳入函數進行相應操作【本處我們對相同key的value進行相加操作,類似于統計單詞出現次數】:

scala> val y = x.reduceByKey((pre, after) => (pre + after))
這里兩個參數我們邏輯上讓他分別代表同一個key的兩個不同values,那么結果想必大家應該猜到了:

scala> y.collect
res0: Array[(String, Int)] = Array((a,3), (b,5))
嗯,到這里大家對reduceByKey有了初步的認識和體會。論壇中有一段寫的也很有幫助,由于英文不好怕翻譯過來誤導大家,所以每次附上原話:

[plain]?view plaincopy
print?在CODE上查看代碼片派生到我的代碼片
  1. Basically?reduceByKey?function?works?only?for?RDDs?which?contains?key?and?value?pairs?kind?of??
  2. ?elements(i.e?RDDs?having?tuple?or?Map?as?a?data?element).?It?is?a?transformation?operation???
  3. which?means?it?is?lazily?evaluated.We?need?to?pass?one?associative?function?as?a?parameter,???
  4. which?will?be?applied?to?the?source?RDD?and?will?create?anew?RDD?as?with?resulting?values(i.e.??
  5. key?value?pair).?This?operation?is?a?wide?operation?as?data?shuffling?may?happen?across?the???
  6. partitions.【本質上來講,reduceByKey函數(說算子也可以)只作用于包含key-value的RDDS上,它是??
  7. transformation類型的算子,這也就意味著它是懶加載的(就是說不調用Action的方法,是不會去計算的??
  8. ),在使用時,我們需要傳遞一個相關的函數作為參數,這個函數將會被應用到源RDD上并且創建一個新的??
  9. RDD作為返回結果,這個算子作為data?Shuffling?在分區的時候被廣泛使用】??

看到這大家對這個算子應該有了更加深入的認識,那么再附上我的Scala的一個小例

子,同樣是統計字母出現次數:

[plain]?view plaincopy
print?在CODE上查看代碼片派生到我的代碼片
  1. import?org.apache.spark.{SparkContext,?SparkConf}??
  2. ??
  3. /**??
  4. ?*?mhc??
  5. ?*?Created?by?Administrator?on?2016/5/17.??
  6. ?*/??
  7. object?MyTest?{??
  8. ??def?main(args:?Array[String])?{??
  9. ????val?conf?=?new?SparkConf().setAppName("MyTestApp").setMaster("local[1]")??
  10. ????val?sc?=?new?SparkContext(conf)??
  11. ????val?x?=?sc.parallelize(List("a",?"b",?"a",?"a",?"b",?"b",?"b",?"b"))??
  12. ????val?s?=?x.map((_,?1))??
  13. ????val?result?=?s.reduceByKey((pre,?after)?=>?pre?+?after)??
  14. ????println(result.collect().toBuffer)??
  15. ??
  16. ??}??
  17. }??

結果是:ArrayBuffer((a,3), (b,5)),很簡單對吧。論壇給出了java和python的版本的,如下:

Java:

packagecom.backtobazics.sparkexamples;importjava.util.Arrays;importorg.apache.spark.api.java.JavaPairRDD;
importorg.apache.spark.api.java.JavaRDD;
importorg.apache.spark.api.java.JavaSparkContext;
importorg.apache.spark.api.java.function.Function2;importscala.Tuple2;public classReduceByKeyExample {public static void main(String[] args) throws Exception {JavaSparkContext sc = new JavaSparkContext();//Reduce Function for sum Function2<Integer, Integer, Integer> reduceSumFunc = (accum, n) -> (accum + n);// Parallelized with 2 partitions JavaRDD<String> x = sc.parallelize(Arrays.asList("a", "b", "a", "a", "b", "b", "b", "b"),3);// PairRDD parallelized with 3 partitions// mapToPair function will map JavaRDD to JavaPairRDD JavaPairRDD<String, Integer> rddX = x.mapToPair(e -> newTuple2<String,Integer>(e, 1));// New JavaPairRDD JavaPairRDD<String, Integer> rddY = rddX.reduceByKey(reduceSumFunc);//Print tuples for(Tuple2<String, Integer> element : rddY.collect()){System.out.println("("+element._1+", "+element._2+")");}}
}// Output:// (b, 5)// (a, 3) 
python:

 Bazic reduceByKey example in python# creating PairRDD x with key value pairs>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1),
... ("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)# Applying reduceByKey operation on x>>> y = x.reduceByKey(lambda accum, n: accum + n)
>>> y.collect()
[('b', 5), ('a', 3)]# Define associative function separately >>>def sumFunc(accum, n):
...     return accum + n
...
>>> y = x.reduceByKey(sumFunc)
>>> y.collect()
[('b', 5), ('a', 3)]
感謝大家捧場,客官慢走。



本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/538590.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/538590.shtml
英文地址,請注明出處:http://en.pswp.cn/news/538590.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

繞固定軸分解_3軸 / 5軸 / 3+2到底是什么......??

一、 什么是32定位加工在一個三軸銑削程序執行時&#xff0c;使用五軸機床的兩個旋轉軸將切削刀具固定在一個傾斜的位置&#xff0c;32加工技術的名字也由此而來&#xff0c;這也叫做定位五軸機床&#xff0c;因為第四個軸和第五個軸是用來確定在固定位置上刀具的方向&#xff…

unix環境高級編程 pdf_UNIX環境高級編程——記錄鎖

引言在多進程環境下&#xff0c;多個進程同時讀寫一個文件&#xff0c;如果不進行同步&#xff0c;就可能導致不期待的結果&#xff0c;如后一個進程覆蓋了前一個進程寫的內容。Unix為此提供了一種強大的解決辦法&#xff1a;記錄鎖記錄鎖記錄鎖本質上就是對文件加讀寫鎖&#…

LNMP源碼安裝腳本

LNMP安裝腳本&#xff0c;腳本環境 #LNMP環境搭建centos6.8 2.6.32-696.28.1.el6.x86_64 nginx:1.12.2 mysql:5.6.36 PHP:5.5.36 #!/bin/bash#LNMP環境搭建centos6.8 2.6.32-696.28.1.el6.x86_64 nginx:1.12.2 mysql:5.6.36 PHP:5.5.36trap echo "error line: $LINE…

啟動spark shell

spark集群安裝教程&#xff1a;http://blog.csdn.net/zengmingen/article/details/72123717 啟動spark shell. 在spark安裝目錄bin文件夾下 ./spark-shell --master spark://nbdo1:7077 --executor-memory 2g --total-executor-cores 2 參數說明&#xff1a; --master spark…

python發送excel文件_Python操作Excel, 開發和調用接口,發送郵件

接口開發&#xff1a; importflaskimporttoolsimportjson,redisimportrandom server flask.Flask(__name__)#新建一個服務&#xff0c;把當前這個python文件當做一個服務 ip 118.24.3.40passwordHK139bc&*r redis.Redis(hostip,passwordpassword,port6379,db10, decode_res…

第一個Spark實例:求PI值

向spark提交jar&#xff0c;需要使用 bin下的spark-submit [hadoopnbdo1 bin]$ ./spark-submit --help Usage: spark-submit [options] <app jar | python file> [app arguments] Usage: spark-submit --kill [submission ID] --master [spark://...] Usage: spark-submi…

go conn 讀取byte數組后是否要_【技術推薦】正向角度看Go逆向

Go語言具有開發效率高&#xff0c;運行速度快&#xff0c;跨平臺等優點&#xff0c;因此正越來越多的被攻擊者所使用&#xff0c;其生成的是可直接運行的二進制文件&#xff0c;因此對它的分析類似于普通C語言可執行文件分析&#xff0c;但是又有所不同&#xff0c;本文將會使用…

Confluence 6 選擇一個外部數據庫

2019獨角獸企業重金招聘Python工程師標準>>> 注意&#xff1a; 選擇一個合適的數據庫通常需要花費很多時間。同時 Confluence 自帶的 XML 數據備份和恢復功能通常也不適合合并和備份有大量數據的數據庫。如果你想在系統運行后進行數據合并&#xff0c;你通常需要使用…

spark中saveAsTextFile如何最終生成一個文件

原文地址&#xff1a;http://www.cnblogs.com/029zz010buct/p/4685173.html ----------------------------------------------------------------------- 一般而言&#xff0c;saveAsTextFile會按照執行task的多少生成多少個文件&#xff0c;比如part-00000一直到part-0000n&…

python爬取內容亂碼_python爬取html中文亂碼

環境&#xff1a; python3.6 爬取代碼&#xff1a; import requests url https://www.dygod.net/html/tv/hytv/ req requests.get(url) print(req.text) 爬取結果&#xff1a; / _-如上&#xff0c;title內容出現亂碼&#xff0c;自己感覺應該是編碼的問題&#xff0c;但是不…

前端每日實戰:34# 視頻演示如何用純 CSS 創作在文本前后穿梭的邊框

效果預覽 按下右側的“點擊預覽”按鈕可以在當前頁面預覽&#xff0c;點擊鏈接可以全屏預覽。 https://codepen.io/comehope/pen/qYepNv 可交互視頻教程 此視頻是可以交互的&#xff0c;你可以隨時暫停視頻&#xff0c;編輯視頻中的代碼。 請用 chrome, safari, edge 打開觀看。…

not support mysql_MYSQL出現quot; Client does not support authentication quot;的解決方法

MYSQL 幫助&#xff1a;A.2.3 Client does not support authentication protocolMySQL 4.1 and up uses an authentication protocol based on a password hashing algorithm that is incompatible with that used by older clients. If you upgrade the server to 4.1, attemp…

spark shell中編寫WordCount程序

啟動hdfs 略http://blog.csdn.net/zengmingen/article/details/53006541 啟動spark 略安裝&#xff1a;http://blog.csdn.net/zengmingen/article/details/72123717 spark-shell&#xff1a;http://blog.csdn.net/zengmingen/article/details/72162821準備數據 vi wordcount.t…

初級英語02

做客 1 Diana,i havent seen you for ages,how have you been? 2 would you like something to drink? 3 give my best to your parents. 4 did you hear what happened?whats the matter with him? 5 id like to applogize for leaving so early,i brought a little gift,…

mysql計算機二級選擇題題庫_全國計算機二級mysql數據庫選擇題及答案

全國計算機二級mysql數據庫選擇題及答案選擇題是全國計算機二級mysql考試里的送分題&#xff0c;下面小編為大家帶來了全國計算機二級mysql數據庫選擇題及答案&#xff0c;歡迎大家閱讀&#xff01;全國計算機二級mysql數據庫選擇題及答案1) 函數 max( ) 表明這是一個什么函數?…

git add 撤銷_更科學地管理你的項目,Git 簡明教程(二)

修改文件內容上回說到&#xff0c;我們已經成功創建并提交了一個 README.md 文件到 FirstGit 版本庫中1、修改文件現在我們更改 README.md 內容2、查看版本庫狀態該文件夾內右鍵運行 Git Bash Here執行命令 git statusGit 提示我們的改動還沒有 commit&#xff0c;并且它給出了…

Eclipse中Copy Qualified Name復制類全名解決辦法

原文鏈接&#xff1a;http://www.cnblogs.com/zyh1994/p/6393550.html ----------------------------------------------------------------------------------------------- Eclipse中 用Copy Qualified Name復制類全名時 總是這樣的/struts1/src/me/edu/HelloAction.java很不…

c 連接mysql錯誤信息_使用C語言訪問MySQL數據 —— 連接和錯誤處理

2011-05-09 wcdj可以通過許多不同的編程語言來訪問MySQL&#xff0c;例如&#xff0c;C&#xff0c;C&#xff0c;Java&#xff0c;Perl&#xff0c;Python&#xff0c;Tcl&#xff0c;PHP等。本文主要總結使用C語言接口如何訪問MySQL數據。(一) 連接例程(二) 錯誤處理(一) 連接…

eclipse編寫wordcount提交spark運行

采用集成了scala的eclipse編寫代碼 代碼&#xff1a; package wordcountimport org.apache.spark.SparkConf import org.apache.spark.SparkContextobject WordCount {def main(args: Array[String]): Unit {//非常重要&#xff0c;是通向Spark集群的入口val confnew SparkCon…

gitlab 刪除分支_如何刪除gitlab上默認受保護的master主分支

今天開發在檢查代碼的時候&#xff0c;發現master分支有問題&#xff0c;現在準備刪除此主分支&#xff0c;并且重新提交正確的代碼&#xff0c;不過在刪除時發現&#xff0c;master分支不能被刪除。ps&#xff1a;主分支一般都是線上分支&#xff0c;需要開發確認后并且做好備…