Spark RDD/Core 編程 API入門系列 之rdd實戰(rdd基本操作實戰及transformation和action流程圖)(源碼)(三)...

本博文的主要內容是:

1、rdd基本操作實戰

2、transformation和action流程圖

3、典型的transformation和action

?

?

?

RDD有3種操作:

1、? Trandformation ?????對數據狀態的轉換,即所謂算子的轉換

2、? Action??? 觸發作業,即所謂得結果的

3、? Contoller? 對性能、效率和容錯方面的支持,如cache、persist、checkpoint

Contoller包括cache、persist、checkpoint。

?

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

傳入類型是T,返回類型是U。

?

?

?

元素之間,為什么reduce操作,要符合結合律和交換律?
答:因為,交換律,不知,哪個數據先過來。所以,必須符合交換律。
在交換律基礎上,想要reduce操作,必須要符合結合律。

/**

* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

RDD.scala(源碼)


這里,新建包com.zhouls.spark.cores

package com.zhouls.spark.cores

/**
* Created by Administrator on 2016/9/27.
*/
object TextLines {

}


下面,開始編代碼

本地模式

自動 ,會寫好

源碼來看,

所以,?val lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\textlines.txt") //通過HadoopRDD以及MapPartitionsRDD獲取文件中每一行的內容本身

?

?

val lineCount = lines.map(line => (line,1)) //每一行變成行的內容與1構成的Tuple


val textLines = lineCount.reduceByKey(_+_)


textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))

?成功!



?現在,將此行代碼,

     textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
 

總結:

本地模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
運行正常,因為在本地模式下,是jvm,但這樣書寫,是不正規的。

?

 

?

集群模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
運行無法通過,因為結果是分布在各個節點上。
 
collect源碼:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

得出,collect后array中就是一個元素,只不過這個元素是一個Tuple。
Tuple是元組。通過concat合并!


foreach源碼:

/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
  


?

rdd實戰(rdd基本操作實戰)至此!

?

?

?

?

 

?rdd實戰(transformation流程圖)

?拿wordcount為例!

?

啟動hdfs集群

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh

?

?

?啟動spark集群

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh

?

?

啟動spark-shell

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g

?

?

scala> val partitionsReadmeRdd = ?sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt")

?或者

?scala> val readmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md")

?scala> ?val partitionsReadmeRdd = readmeRdd.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1)

.saveAsTextFile("~/partition1README.txt")

?

注意,~目錄,不是這里。

?

?

?

?為什么,我的,不是這樣的顯示呢?

?

?

?

RDD的transformation和action執行的流程圖

?

?

典型的transformation和action

轉載于:https://www.cnblogs.com/zlslch/p/5913334.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/254337.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/254337.shtml
英文地址,請注明出處:http://en.pswp.cn/news/254337.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

用GDB調試程序

GDB概述GDB 是GNU開源組織發布的一個強大的UNIX下的程序調試工具。或許,各位比較喜歡那種圖形界面方式的,像VC、BCB等IDE的調試,但如果你是在 UNIX平臺下做軟件,你會發現GDB這個調試工具有比VC、BCB的圖形化調試器更強大的功能。所…

燈塔的出現給那些有想法,有能力而又缺乏資金的社區人士提供了一條途徑

2019獨角獸企業重金招聘Python工程師標準>>> 在上個月,BCH社區傳出基于比特幣現金的眾籌平臺Lighthouse(燈塔)正在復活的消息,并且有網友在論壇上貼出了部分網站圖片。當消息被證實為真,官網和項目的審核細…

PID 算法理解

PID 算法 使用環境:受到外界的影響不能按照理想狀態發展。如小車的速度不穩定的調節,盡快達到目標速度。 條件:閉環系統->有反饋 要求:快準狠 分類:位置式、增量式 增量式 輸入:前次速度、前前次速度、前…

C#字符串的基本操作

文章目錄簡介字符串判斷是否相等語法實例字符串比較大小語法實例判斷字符串變量是否包含指定字符或字符串語法實例查找字符串變量中指定字符或字符串出現的位置語法實例取子串語法實例插入子串語法實例刪除子串語法實例替換子串語法實例去除字符串空格語法實例博主寫作不容易&a…

C++利用SOCKET傳送文件

C利用SOCKET傳送文件 /*server.h*/ #pragma comment(lib, "WS2_32") #include <WinSock2.h> #include <iostream> //#include <stdio.h> #include <assert.h> #ifndef COMMONDEF_H #define COMMONDEF_H #define MAX_PACKET_SIZE 10240 …

三種方式在CentOS 7搭建KVM虛擬化平臺

KVM 全稱是基于內核的虛擬機&#xff08;Kernel-based Virtual Machine&#xff09;&#xff0c;它是一個 Linux的一個內核模塊&#xff0c;該內核模塊使得 Linux變成了一個Hypervisor&#xff1a;它由 Quramnet開發&#xff0c;該公司于 2008年被 Red Hat 收購 KVM的整體結構&…

(五)EasyUI使用——datagrid數據表格

DataGrid以表格形式展示數據&#xff0c;并提供了豐富的選擇、排序、分組和編輯數據的功能支持。DataGrid的設計用于縮短開發時間&#xff0c;并且使開發人員不需要具備特定的知識。它是輕量級的且功能豐富。單元格合并、多列標題、凍結列和頁腳只是其中的一小部分功能。具體功…

拾取模型的原理及其在THREE.JS中的代碼實現

1. Three.js中的拾取 1.1. 從模型轉到屏幕上的過程說開 由于圖形顯示的基本單位是三角形&#xff0c;那就先從一個三角形從世界坐標轉到屏幕坐標說起&#xff0c;例如三角形abc 乘以模型視圖矩陣就進入了視點坐標系&#xff0c;其實就是相機所在的坐標系&#xff0c;如下圖&am…

StringBuilder-C#字符串對象

博主寫作不容易&#xff0c;孩子需要您鼓勵 萬水千山總是情 , 先點個贊行不行 在C# 中&#xff0c;string是引用類型&#xff0c;每次改變string類對象的值&#xff0c;即修改字符串變量對應的字符串&#xff0c;都需要在內存中為新的字符串重新分配空間。在默寫特定的情況…

java 19 - 11 異常的注意事項

1 /*2 * 異常注意事項:3 * A:子類重寫父類方法時&#xff0c;子類的方法必須拋出相同的異常或父類異常的子類。(父親壞了,兒子不能比父親更壞)4 * B:如果父類拋出了多個異常,子類重寫父類時,只能拋出相同的異常或者是他的子集,子類不能拋出父類沒有的異常5 * C:如果被重寫的…

數組去重的各種方式對比

數組去重&#xff0c;是一個老生常談的問題了&#xff0c;在各廠的面試中也會有所提及&#xff0c;接下來就來細數一下各種數組去重的方式吧&#xff1b; 對于以下各種方式都統一命名為 unique&#xff0c;公用代碼如下&#xff1a; // 生成一個包含100000個[0,50000)隨機數的數…

Linux平臺Makefile文件的編寫基礎篇和GCC參數詳解

問&#xff1a;gcc中的-I.是什么意思。。。。看到了有的是gcc -I. -I/usr/xxxxx..那個-I.是什么意思呢 最佳答案 答&#xff1a;-Ixxx 的意思是除了默認的頭文件搜索路徑(比如/usr/include等&#xff09;外&#xff0c;同時還在路徑xxx下搜索需要被引用的頭文件。 所以你的gcc …

舊知識打造新技術--AJAX學習總結

AJAX是將舊知識在新思想的容器內進行碰撞產生的新技術&#xff1a;推翻傳統網頁的設計技術。改善用戶體驗的技術。 學習AJAX之初寫過一篇《與Ajax的初次謀面》。當中都僅僅是一些自己淺顯的理解&#xff0c;這次就總結一下它在歷史長河中的重要地位。 【全】 AJAX全稱為Asnychr…

C#數組基本操作

文章目錄簡介數組排序和反轉語法實例查找數組元素語法實例數組元素求和、最大值、最小值、平均值語法實例數組字符串相互轉化語法實例在字符串中查找、刪除字符數組元素語法實例博主寫作不容易&#xff0c;孩子需要您鼓勵 萬水千山總是情 , 先點個贊行不行 簡介 C#提供了許…

redis(一)--認識redis

Redis官網對redis的定義是&#xff1a;“Redis is an open source, BSD licensed, advanced key-value cache and store”&#xff0c;可以看出&#xff0c;Redis是一種鍵值系統&#xff0c;可以用來緩存或存儲數據。Redis是“Remote Dictionary Server”&#xff08;遠程字典服…

轉:如何用gcc編譯生成動態鏈接庫*.so文件 動態庫

轉&#xff1a;如何編譯.so動態庫問&#xff1a;我源文件為main.c, x.c, y.c, z.c,頭文件為x.h,y.h,z.h如何編譯成.so動態庫&#xff1f;編譯器用gcc最好能給出詳細參數解釋&#xff0c;謝謝答&#xff1a;# 聲稱動代連接庫&#xff0c;假設名稱為libtest.sogcc x.c y.c z.c -f…

工業鏡頭的主要參數與選型

文章目錄簡介1、鏡頭的分類(1) 以鏡頭安裝分類(2) 以攝像頭鏡頭規格分類(3) 以鏡頭光圈分類(4) 以鏡頭的視場大小分類(5) 從鏡頭焦距上分2、選擇鏡頭的技術依據(1) 鏡頭的成像尺寸(2) 鏡頭的分辨率(3) 鏡頭焦距與視野角度(4) 光圈或通光量3、變焦鏡頭&#xff08;zoom lens&…

SQLSEVER 中的那些鍵和約束

SQL Server中有五種約束類型&#xff0c;各自是 PRIMARY KEY約束、FOREIGN KEY約束、UNIQUE約束、DEFAULT約束、和CHECK約束。查看或者創建約束都要使用到 Microsoft SQL Server Managment Studio。1. PRIMARY KEY約束 在表中常有一列或多列的組合&#xff0c;其值能唯一標識表…

數據庫 sqlite 進階

http://www.cppblog.com/czy463/archive/2013/12/16/204816.html 董淳光 前序&#xff1a; Sqlite3 的確很好用。小巧、速度快。但是因為非微軟的產品&#xff0c;幫助文檔總覺得不夠。這些天再次研究它&#xff0c;又有一些收獲&#xff0c;這里把我對 sqlite3 的研究列出來&a…

形象的列舉-C# 枚舉

文章目錄簡介例子分析點撥博主寫作不容易&#xff0c;孩子需要您鼓勵 萬水千山總是情 , 先點個贊行不行 簡介 枚舉類型用于聲明一組命名常數。 定義枚舉類型語法格式如下&#xff1a;enum 枚舉數組名{枚舉成員列表};例如&#xff1a; enum week{星期一&#xff0c;星期二…