Spark RDD算子介紹

Spark學習筆記總結

01. Spark基礎

1. 介紹

Spark可以用于批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。
Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足。

2. Spark-Shell

  1. spark-shell是Spark自帶的交互式Shell程序,用戶可以在該命令行下用scala編寫spark程序。
  2. 直接啟動spark-shell,實質是spark的local模式,在master:8080中并未顯示客戶端連接。
  3. 集群模式:
    /usr/local/spark/bin/spark-shell --master spark://172.23.27.19:7077 --executor-memory 2g --total-executor-cores 2
  4. spark-shell中編寫wordcount
    sc.textFile("hdfs://172.23.27.19:9000/wrd/wc/srcdata/").flatMap(.split(" ")).map((,1)).reduceByKey(+).sortBy(_._2,false).collect

3. RDD介紹與屬性

1. 介紹

RDD(Resilient Distributed Dataset)叫做分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變(創建了內容不可變)、可分區、里面的元素可并行計算的集合。

2. 屬性:

nkfed8Z.png

  1. 由多個分區組成。對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度。
  2. 一個計算函數用于每個分區。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。
  3. RDD之間的依賴關系。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系。數據丟失時,根據依賴重新計算丟失的分區而不是整個分區。
  4. 一個Partitioner,即RDD的分片函數。默認是HashPartition
  5. 分區數據的最佳位置去計算。就是將計算任務分配到其所要處理數據塊的存儲位置。數據本地化。
3. 創建方式:
  1. 可通過并行化scala集合創建RDD
    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
  2. 通過HDFS支持的文件系統創建,RDD里沒有真的數據,只是記錄了元數據
    val rdd2 = sc.textFile("hdfs://172.23.27.19:9000/wrd/wc/srcdata/")

查看該rdd的分區數量
rdd1.partitions.length

3. 基礎的transformation和action

RDD中兩種算子:
transformation轉換,是延遲加載的

常用的transformation:
(1)map、flatMap、filter
(2)intersection求交集、union求并集:注意類型要一致
distinct:去重
(3)join:類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
(4)groupByKey:在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
但是效率reduceByKey較高,因為有一個本地combiner的過程。
(5)cartesian笛卡爾積

常用的action
(1)collect()、count()
(2)reduce:通過func函數聚集RDD中的所有元素
(3)take(n):取前n個;top(2):排序取前兩個
(4)takeOrdered(n),排完序后取前n個

4. 較難的transformation和action

參考《http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html》

(1)mapPartitions(func)和
mapPartitions(func):
獨立地在RDD的每一個分片上運行,但是返回值;foreachPartition(func)也常用,不需要返回值

mapPartitionsWithIndex(func):
可以看到分區的編號,以及該分區數據。
類似于mapPartitions,但func帶有一個整數參數表示分片的索引值,func的函數類型必須是
(Int, Interator[T]) => Iterator[U]

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
val func = (index: Int, iter: Iterator[(Int)]) => {iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator}
rdd1.mapPartitionsWithIndex(func).collect

(2)aggregate
action操作,
第一個參數是初始值,
第二個參數:是2個函數[每個函數都是2個參數(第一個參數:先對個個分區進行的操作, 第二個:對個個分區合并后的結果再進行合并), 輸出一個參數]

例子:

rdd1.aggregate(0)(_+_, _+_)
//前一個是對每一個分區進行的操作,第二個是對各分區結果進行的結果rdd1.aggregate(5)(math.max(_, _), _ + _)
//結果:5 + (5+9) = 19val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
//結果:24或者42val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
//結果01或者10

(3)aggregateByKey
將key值相同的,先局部操作,再整體操作。。和reduceByKey內部實現差不多

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
//結果:Array((dog,12), (cat,17), (mouse,6))

PS:
和reduceByKey(+)調用的都是同一個方法,只是aggregateByKey要底層一些,可以先局部再整體操作。

(4)combineByKey
和reduceByKey是相同的效果,是reduceByKey的底層。
第一個參數x:原封不動取出來, 第二個參數:是函數, 局部運算, 第三個:是函數, 對局部運算后的結果再做運算
每個分區中每個key中value中的第一個值,

val rdd1 = sc.textFile("hdfs://master:9000/wordcount/input/").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect

第一個參數的含義:
每個分區中相同的key中value中的第一個值
如:
(hello,1)(hello,1)(good,1)-->(hello(1,1),good(1))-->x就相當于hello的第一個1, good中的1

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect
//每個會多加3個10val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val rdd6 = rdd5.zip(rdd4)
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
//將key相同的數據,放入一個集合中

(5)collectAsMap
Action
Map(b -> 2, a -> 1)//將Array的元祖轉換成Map,以后可以通過key取值

val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd.collectAsMap
//可以下一步使用

(6)countByKey
根據key計算key的數量
Action

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1.countByKey
rdd1.countByValue//將("a", 1)當做一個元素,統計其出現的次數

(7)flatMapValues
對每一個value進行操作后壓平

轉載于:https://www.cnblogs.com/wangrd/p/6216924.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/457215.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/457215.shtml
英文地址,請注明出處:http://en.pswp.cn/news/457215.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

java 寫tb級文件_三管齊下!TB 級文件的上傳性能瞬間被優化 100 倍!

作者 | 中華石杉責編 | 伍杏玲本文經授權轉載石杉的架構筆記(ID:shishan100)這篇文章我們來看看,世界上最優秀的分布式文件系統HDFS,是如何對超大文件的上傳做性能優化的?首先,我們還是通過一張圖來看一下文件上傳的大…

CentOS7下安裝Redis — 單節點

2019獨角獸企業重金招聘Python工程師標準>>> 1. 環境準備 安裝編譯所需要的包: yum install gcc tcl 2. 下載redis http://download.redis.io/releases/redis-3.2.7.tar.gz 3. 安裝redis ## 創建redis的安裝目錄 mkdir /usr/local/redis## 解壓redis tar…

筆記本中美化代碼的方法

這里向大家推薦一個很好用的記筆記軟件,微軟的OneNote,這個筆記軟件,支持分區和分區組的創建,而且入門簡單,界面簡潔,很適合從word過渡過來的人來記筆記! 不過如果直接記筆記,對于程序員來說,可能希望代碼在筆記本上更好看一些,那么應該怎么辦呢?下面提供了在OneNote中,讓代碼…

工具使用——印象(匯總)

作者:桂。 時間:2017-02-09 23:11:30 鏈接:http://www.cnblogs.com/xingshansi/articles/6384097.html 說明:轉載請注明出處,謝謝。 前言 本文僅僅介紹印象筆記的使用,至于挖掘機哪家強,本文不…

java final修飾屬性_Java final關鍵字用來修飾類、方法、屬性

1.final修飾類:這個類不能被繼承。如:String類、StringBuffer類、System類。2.final修飾方法:不能被重寫。如:Object類的getClass()方法。3.final修飾屬性:此屬性就是一個常量,一旦初始化就不可再被賦值。習…

SQL SERVER 數據導出JSON

執行下面的存儲過程: SET ANSI_NULLS ONGOSET QUOTED_IDENTIFIER ONGOCREATE PROCEDURE[dbo].[SerializeJSON](ParameterSQL AS VARCHAR(MAX))ASBEGINDECLARE SQL NVARCHAR(MAX)DECLARE XMLString VARCHAR(MAX)DECLARE XML XMLDECLARE Paramlist NVARCHAR(1000)SET …

JSP+Javabean+Servlet實現用戶注冊

在entity包下新建javabean 也就是實體類User 注意id用 Integer 而不用 int, 因為 int 自動初始化為0 public class User { private Integer id; private String username; private String password; 后面是set和get方法... 在Servlet包下創建servlet 右擊Servlet…

main的方法是Java_Java中的main()方法

在Java中,main()方法是Java應用程序的入口方法,也就是說,程序在運行的時候,第一個執行的方法就是main()方法,這個方法和其他的方法有很大的不同,比如方法的名字必須是main,方法必須是public sta…

深入理解Python的logging模塊:從基礎到高級

在Python編程中,日志記錄是一種重要的調試和錯誤追蹤工具。Python的logging模塊提供了一種靈活的框架,用于發出日志消息,這些消息可以被發送到各種輸出源,如控制臺、文件、HTTP GET/POST位置等。本文將深入探討Python的logging模塊…

http請求連接

1、在Info.plist中添加NSAppTransportSecurity類型Dictionary。2、在NSAppTransportSecurity下添加NSAllowsArbitraryLoads類型Boolean,值設為YES轉載于:https://www.cnblogs.com/liuting-1204/p/5919233.html

數據庫不完全恢復 以及恢復到測試環境:

sample 1: 1.清空歸檔日志 RMAN> crosscheck archivelog all; RMAN> delete achivelog all; 2.清空數據文件。 select name from v$datafile; rm v$datafile 3.恢復數據 ##check file date: ##把db數據恢復到:2017-02-05 00:00:00 ls -lt /ngenprdblog/ ls…

centos7安裝java6_CentOS7.6安裝jdk1.8

2、登錄Linux服務器,通過rz命令將jdk導入服務器如果沒有rz命令 需要先安裝lrzszyum install lrzsz -y3、將jdk壓縮包解壓到指定路徑 -C 指定路徑4、配置環境變量編輯/etc/profile文件 在末尾加上以下內容 wq保存退出source /etc/profile文件 使配置文件生效export J…

ubuntu安裝wkhtmltopdf

下載安裝wkhtmltox系統環境 http://wkhtmltopdf.org/downloads.html wget https://bitbucket.org/wkhtmltopdf/wkhtmltopdf/downloads/wkhtmltox-0.13.0-alpha-7b36694_linux-precise-amd64.deb dpkg -i 安裝包名字 當我把它生成pdf的時候我想讓每個塊都是一頁,經過…

人生苦短,我用python——當我在玩python的時候我玩些什么 -

程序的基本思路 用一個txt文件記錄電腦的一天內累計使用時間累計使用時間超過若干小時就會自動關機程序開機自動運行 為什么我最后選擇了python 想著怎么寫、搜資料的時候就發現Java并不適合,雖然不是不能實現,但有好幾個問題解決起來都有點麻煩。對我這…

IO流的練習5 —— 讀取文件中的字符串,排序后寫入另一文件中

需求:已知s.txt文件中有這樣的一個字符串:“hcexfgijkamdnoqrzstuvwybpl”     請編寫程序讀取數據內容,把數據排序后寫入ss.txt中。分析:   A:讀取文件中的數據   B:把數據存在一個字符串中   C…

java解析未知key json_Gson解析JSON中動態未知字段key的方法

前面一篇文章我介紹了Gson的解析的基本方法。但我們在享受Gson解析的高度封裝帶來的便利時,有時可能會遇到一些特殊情況,比如json數據中的字段key是動態可變的時候,由于Gson是使用靜態注解的方式來設置實體對象的,因此我們很難直接…

Twisted入門教程(5)

2019獨角獸企業重金招聘Python工程師標準>>> 第五部分:由Twited支持的詩歌下載服務客戶端 你可以從這里從頭開始閱讀這個系列 抽象地構建客戶端 在第四部分中,我們構建了第一個使用Twisted的客戶端。它確實能很好地工作,但仍有提高…

Jquery 學習之基礎一

1.添加一個CSS類 $("button").click(function(){ $("#div1").addClass("important blue");}); 2.移除一個類 $("button").click(function(){ $("h1,h2,p").removeClass("blue");}); 3.切換類 $("button&…

**print('人生苦短 我愛Python')**

print(‘人生苦短 我愛Python’) 一、變量 **""" 1.代碼自上而下執行 2_運算符和表達式.一行一句,不要把多個語句寫到一行上,可讀性不好 3中文只能出現在引號里,其他地方不能出現中文 4不能隨意縮進 """**pr…

java線程提高速度_如何在JAVA中減慢線程速度

我有這個類,我在其中運行10次for循環.該類實現了Runnable接口.現在在main()中我創建了2個線程.現在兩個都將循環運行到10.但我想檢查每個線程的循環計數.如果t1超過7,則讓它休眠1秒,以便讓t2完成.但是如何實現這一目標呢?請參閱代碼.我嘗試但看起來完全愚蠢.只是如何…