SparkRDD常用算子實踐(附運行效果圖)

  • 目錄
    • 1、簡單算子說明
    • 2、復雜算子說明

目錄

SparkRDD算子分為兩類:Transformation與Action.
Transformation:即延遲加載數據,Transformation會記錄元數據信息,當計算任務觸發Action時,才會真正開始計算。
Action:即立即加載數據,開始計算。
創建RDD的方式有兩種:
1、通過sc.textFile(“/root/words.txt”)從文件系統中創建 RDD。
2、#通過并行化scala集合創建RDD:val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

1、簡單算子說明

這里先說下簡單的Transformation算子

//通過并行化scala集合創建RDD
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

//查看該rdd的分區數量
rdd1.partitions.length

//map方法同scala中的一樣,將List中的每個數據拿出來做函數運算。
//sortBy:將數據進行排序
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)

//filter:將List中的每個數據進行函數造作,挑選出大于10的值。
val rdd3 = rdd2.filter(_>10)

//collect:將最終結果顯示出來
//flatMap:對數據先進行map操作,再進行flat(碾壓)操作。
rdd4.flatMap(_.split(’ ‘)).collect
運行效果圖
這里寫圖片描述


val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)
val rdd3 = rdd2.filter(_>10)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+”“,true)
val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)
這里寫圖片描述


//intersection求交集
val rdd9 = rdd6.intersection(rdd7)
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 2), (“kitty”, 3)))
val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7)))
這里寫圖片描述


//join
val rdd3 = rdd1.join(rdd2)
這里寫圖片描述
val rdd3 = rdd1.leftOuterJoin(rdd2)
這里寫圖片描述
val rdd3 = rdd1.rightOuterJoin(rdd2)


//union:求并集,注意類型要一致
val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
rdd8.distinct.sortBy(x=>x).collect
這里寫圖片描述


//groupByKey
val rdd3 = rdd1 union rdd2
rdd3.groupByKey
rdd3.groupByKey.map(x=>(x._1,x._2.sum))
這里寫圖片描述


//cogroup
val rdd1 = sc.parallelize(List((“tom”, 1), (“tom”, 2), (“jerry”, 3), (“kitty”, 2)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))
val rdd3 = rdd1.cogroup(rdd2)
val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))
這里寫圖片描述


//cartesian笛卡爾積
val rdd1 = sc.parallelize(List(“tom”, “jerry”))
val rdd2 = sc.parallelize(List(“tom”, “kitty”, “shuke”))
val rdd3 = rdd1.cartesian(rdd2)
這里寫圖片描述


接下來說下簡單的Action算子
val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)

#collect
rdd1.collect

#reduce
val rdd2 = rdd1.reduce(+)

#count
rdd1.count

#top
rdd1.top(2)

#take
rdd1.take(2)

#first(similer to take(1))
rdd1.first

#takeOrdered
rdd1.takeOrdered(3)
這里寫圖片描述

2、復雜算子說明

mapPartitionsWithIndex : 把每個partition中的分區號和對應的值拿出來, 看源碼
val func = (index: Int, iter: Iterator[(Int)]) => {
iter.toList.map(x => “[partID:” + index + “, val: ” + x + “]”).iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func).collect
這里寫圖片描述


aggregate

def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: ” + x + “]”).iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func1).collect
###是action操作, 第一個參數是初始值, 二:是2個函數(第一個函數:先對個個分區進行合并, 第二個函數:對個個分區合并后的結果再進行合并)
###0 + (0+1+2+3+4 + 0+5+6+7+8+9)

rdd1.aggregate(0)(_+_, _+_)

這里寫圖片描述


rdd1.aggregate(0)(math.max(, ), _ + _)
###0分別與0和1分區的List元素對比得到每個分區中的最大值,在這里分別是3和7,然后將0+3+7=10
這里寫圖片描述


###5和1比, 得5再和234比得5 –> 5和6789比,得9 –> 5 + (5+9)
rdd1.aggregate(5)(math.max(, ), _ + _)


val rdd3 = sc.parallelize(List(“12”,”23”,”345”,”4567”),2)
rdd3.aggregate(“”)((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
######### “”.length分別與兩個分區元素的length進行比較得到0分區為字符串”2”,1分區為字符串”4”,然而結果返回不分先后,所以結果是24或42
這里寫圖片描述


val rdd4 = sc.parallelize(List(“12”,”23”,”345”,”“),2)
rdd4.aggregate(“”)((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
######## “”.length的為0,與“12”比較后得到字符串“0”,然后字符串“0”再與“23”比較得到最小值為1.
這里寫圖片描述


aggregateByKey

val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => “[partID:” + index + “, val: ” + x + “]”).iterator
}
pairRDD.mapPartitionsWithIndex(func2).collect
這里寫圖片描述


pairRDD.aggregateByKey(0)(math.max(, ), _ + _).collect
########## 先對0號分區中的各個數據進行操作(拿初始值和各個數據進行比較)得到(cat,5)(mouse,4).然后再對1號分區中的數據進行操作得到(cat,12)(dog,12)(mouse,2)。然后再對兩個分區的數據進行相加得到最終結果
這里寫圖片描述


coalesce
#coalesce(2, false)代表將數據重新分成2個區,不進行shuffle(將數據重新進行隨機分配,數據通過網絡可分配在不同的機器上)
val rdd1 = sc.parallelize(1 to 10, 10)
val rdd2 = rdd1.coalesce(2, false)
rdd2.partitions.length
這里寫圖片描述


repartition
repartition效果等同于coalesce(x, true)


collectAsMap : Map(b -> 2, a -> 1)
val rdd = sc.parallelize(List((“a”, 1), (“b”, 2)))
rdd.collectAsMap
這里寫圖片描述


combineByKey : 和reduceByKey是相同的效果
###第一個參數x:原封不動取出來, 第二個參數:是函數, 局部運算, 第三個:是函數, 對局部運算后的結果再做運算
###每個分區中每個key中value中的第一個值, (hello,1)(hello,1)(good,1)–>(hello(1,1),good(1))–>x就相當于hello的第一個1, good中的1

val rdd1 = sc.textFile(“hdfs://master:9000/wordcount/input/”).flatMap(.split(” “)).map((, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd1.collect
這里寫圖片描述


###當input下有3個文件時(有3個block塊分三個區, 不是有3個文件就有3個block, ), 每個會多加3個10
val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect
這里寫圖片描述


val rdd4 = sc.parallelize(List(“dog”,”cat”,”gnu”,”salmon”,”rabbit”,”turkey”,”wolf”,”bear”,”bee”), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
這里寫圖片描述


//第一個參數List(_)代表的是將第一個元素轉換為一個List,第 二個參數x: List[String], y: String) => x :+ y,代表將元素y加入到這個list中。第三個參數:(m: List[String], n: List[String]) => m ++ n),代表將兩個分區的各個list合并成新的List。
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
這里寫圖片描述


countByKey

val rdd1 = sc.parallelize(List((“a”, 1), (“b”, 2), (“b”, 2), (“c”, 2), (“c”, 1)))
rdd1.countByKey
rdd1.countByValue

這里寫圖片描述


filterByRange

val rdd1 = sc.parallelize(List((“e”, 5), (“c”, 3), (“d”, 4), (“c”, 2), (“a”, 1)))
val rdd2 = rdd1.filterByRange(“b”, “d”)
rdd2.collect
這里寫圖片描述


flatMapValues : Array((a,1), (a,2), (b,3), (b,4))
val rdd3 = sc.parallelize(List((“a”, “1 2”), (“b”, “3 4”)))
val rdd4 = rdd3.flatMapValues(_.split(” “))
rdd4.collect
這里寫圖片描述


foldByKey

val rdd1 = sc.parallelize(List(“dog”, “wolf”, “cat”, “bear”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
val rdd3 = rdd2.foldByKey(“”)(+)
這里寫圖片描述


keyBy : 以傳入的參數做key
val rdd1 = sc.parallelize(List(“dog”, “salmon”, “salmon”, “rat”, “elephant”), 3)
val rdd2 = rdd1.keyBy(_.length)
rdd2.collect
這里寫圖片描述


keys values
val rdd1 = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect
rdd2.values.collect

這里寫圖片描述


以下是一些方法的英文解釋
#

map(func)
Return a new distributed dataset formed by passing each element of the source through a function func.

filter(func)
Return a new dataset formed by selecting those elements of the source on which func returns true.

flatMap(func)(內部執行順序是從右往左,先執行Map再執行Flat)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T.

mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator) => Iterator when running on an RDD of type T.

sample(withReplacement, fraction, seed)
Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

union(otherDataset)
Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)
Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks]))
Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.

reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also called groupWith.

cartesian(otherDataset)
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

pipe(command, [envVars])
Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions)
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartitionAndSortWithinPartitions(partitioner)
Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

(K,(Iterable,Iterable))

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/456731.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/456731.shtml
英文地址,請注明出處:http://en.pswp.cn/news/456731.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

six庫是什么

Utilities for writing code that runs on Python 2 and 3”“” 它是一個專門用來兼容 Python 2 和 Python 3 的庫。它解決了諸如 urllib 的部分方法不兼容, str 和 bytes 類型不兼容等“知名”問題。

Kali-linux使用Nessus

Nessus號稱是世界上最流行的漏洞掃描程序,全世界有超過75000個組織在使用它。該工具提供完整的電腦漏洞掃描服務,并隨時更新其漏洞數據庫。Nessus不同于傳統的漏洞掃描軟件,Nessus可同時在本機或遠端上遙控,進行系統的漏洞分析掃描…

HDFS讀寫數據的原理

目錄1 概述2 HDFS寫數據流程3 HDFS讀數據流程 目錄 最近由于要準備面試,就把之前學過的東西好好整理下,權當是復習。 下面說下HDFS讀寫數據的原理。 1 概述 HDFS集群分為兩大角色:NameNode、DataNode NameNode負責管理整個文件系統的元數…

理解列存儲索引

版權聲明:原創作品,謝絕轉載!否則將追究法律責任。 優點和使用場景 SQL Server 內存中列存儲索引通過使用基于列的數據存儲和基于列的查詢處理來存儲和管理數據。 列存儲索引適合于主要執行大容量加載和只讀查詢的數據倉庫工作負荷…

Django項目部署到阿里云服務器上無法發送郵件STMP

部署好項目之后發送郵件無法發送,多方查閱之后,解決問題。 阿里云服務器禁用了25端口,導致無法發送郵件。 25端口申請開放的難度很大,直接放棄。 解決: 在 django項目的 settings.py文件中x修改port端口 。

美國誠實簽經驗——IMG全球醫療險,TODO

那么,誠實簽最關鍵的4個要點 是什么呢? 第一,證明你有一定的經濟實力。 可能需要房產、存款等證明,也需要銀行信用卡或借記卡半年流水證明(讓人信服的每月進帳和消費能力)。 這些是為了證明,你可…

大數據開發初學者學習路線

目錄前言導讀:第一章:初識Hadoop第二章:更高效的WordCount第三章:把別處的數據搞到Hadoop上第四章:把Hadoop上的數據搞到別處去第五章:快一點吧,我的SQL第六章:一夫多妻制第七章&…

Python的虛擬環境配置(pyenv+virtualenv)

一、為什么需要配置虛擬環境 Python 2和Python 3之間存在著較大的差異,并且,由于各種原因導致了Python 2和Python 3的長期共存。在實際工作過程中,我們可能會同時用到Python 2和Python 3,因此,也需要經常在Python 2和P…

安卓屏幕適配問題

屏幕適配是根據屏幕密度,dpi為單位的,而不是分辨率。 手機會根據不同手機的密度,自己去不同資源目錄下去找對應的資源 比如:   每個圖片目錄下的圖片資源都是一樣的,只是大小不一樣   比如drawable-sw800dp-mdpi目錄&#xff…

MapReduce原理全剖析

MapReduce剖析圖 如上圖所示是MR的運行詳細過程 首先mapTask讀文件是通過InputFormat(內部是調RecordReader()–>read())來一次讀一行,返回K,V值。(默認是TextInputFormat,還可以輸入其他的類型如:音視頻&…

利用selenium webdriver點擊alert提示框

在進行元素定位時常常遇到這樣的alert框: 那么該如何定位并點擊確定或取消按鈕呢?stackoverflow上找到了這個問題的答案。 OK, Show you the code: 1 driver.findElement(By.id("updateButton")).click(); 2 //pop up w…

Log 日志的使用與重要性

開發過程中出現bug是必不可免的,你會怎樣debug?從第1行代碼開始看么?還是有個文件里面記錄著哪里錯了更方便呢!!!log日志 Python中有個logging模塊可以完成相關信息的記錄,在debug時用它往往事…

webdriver 的三種等待方式

1、顯式等待 一個顯式等待是你定義的一段代碼,用于等待某個條件發生然后再繼續執行后續代碼。 from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWait #…

Django的核心思想ORM

元類實現ORM 1. ORM是什么 ORM 是 python編程語言后端web框架 Django的核心思想,“Object Relational Mapping”,即對象-關系映射,簡稱ORM。 一個句話理解就是:創建一個實例對象,用創建它的類名當做數據表名&#x…

Secondary Namenode的Check point機制以及Namenode、Datanode工作機制說明

目錄前言:1、NameNode的工作機制2、DataNode的工作機制3、Secondary Namenode的Check point機制 目錄 前言: 在說明checkpoint機制之前,先要了解下namenode、datanode的一些功能和職責。 1、NameNode的工作機制 問題場景: 1…

表單驗證的初步實現和省市級聯

1.表單驗證的初步實現 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd"><html xmlns"http://www.w3.org/1999/xhtml" lang"en"><head><meta http-equiv"Conte…

抓包軟件:Charles

修正&#xff1a;手機不必一定連接電腦分享的熱點&#xff0c;只需要手機和電腦在同一個局域網下就可以了&#xff0c;手機代理IP設置為電腦的IP。 之前寫過一篇通過Wireshark進行抓包&#xff0c;分析網絡連接的文章《通過WireShark抓取iOS聯網數據實例分析》&#xff1a;htt…

Hive的相關介紹

目錄前言&#xff1a;1、Hive簡介2、Hive架構3、Hive與Hadoop的關系4、Hive與傳統數據庫對比5、Hive的數據存儲總結&#xff1a; 目錄 前言&#xff1a; 為什么使用Hive 直接使用hadoop所面臨的問題 人員學習成本太高 項目周期要求太短 MapReduce實現復雜查詢邏輯開發難…

數據結構實驗之排序七:選課名單

數據結構實驗之排序七&#xff1a;選課名單 Time Limit: 1000MS Memory Limit: 65536KB Submit Statistic Problem Description 隨著學校規模的擴大&#xff0c;學生人數急劇增加&#xff0c;選課名單的輸出也成為一個繁重的任務&#xff0c;我校目前有在校生3萬多名&#xff0…

Java第五次作業--面向對象高級特性(抽象類和接口)

一、學習要點 認真看書并查閱相關資料&#xff0c;掌握以下內容&#xff1a; 掌握抽象類的設計掌握接口的設計理解簡單工廠設計模式理解抽象類和接口的區別掌握包裝類的應用掌握對象的比較方法和比較器的使用學習使用日期操作類學習匿名內部類的使用二、作業要求 發布一篇隨筆&…