Spark---轉換算子、行動算子、持久化算子

一、轉換算子和行動算子

1、Transformations轉換算子

1)、概念

Transformations類算子是一類算子(函數)叫做轉換算子,如map、flatMap、reduceByKey等。Transformations算子是延遲執行,也叫懶加載執行。

2)、Transformation類算子

filter :過濾符合條件的記錄數,true保留,false過濾掉

map:將一個RDD中的每個數據項,通過map中的函數映射變為一個新的元素。特點:輸入一條,輸出一條數據。

flatMap:先map后flat。與map類似,每個輸入項可以映射為0到多個輸出項。

sample:隨機抽樣算子,根據傳進去的小數按比例進行又放回或者無放回的抽樣。

reduceByKey:將相同的Key根據相應的邏輯進行處理。

sortByKey/sortBy:作用在K,V格式的RDD上,對Key進行升序或者降序排序。

2、Action行動算子

1)、概念:

Action類算子也是一類算子(函數)叫做行動算子,如foreach,collect,count等。Transformations類算子是延遲執行,Action類算子是觸發執行。一個application應用程序中有幾個Action類算子執行,就有幾個job運行。

2)、Action類算子

count:返回數據集中的元素數。會在結果計算完成后回收到Driver端。

take(n):返回一個包含數據集前n個元素的集合。

first:first=take(1),返回數據集中的第一個元素。

foreach:循環遍歷數據集中的每個元素,運行相應的邏輯。

collect:將計算結果回收到Driver端。

3)、demo:動態統計出現次數最多的單詞個數,過濾掉。

  • 一千萬條數據量的文件,過濾掉出現次數多的記錄,并且其余記錄按照出現次數降序排序。

假設有一個records.txt文件

hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark1
hello Spark
hello Spark
hello Spark2
hello Spark
hello Spark
hello Spark
hello Spark3
hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark4
hello Spark
hello Spark
hello Spark5
hello Spark
hello Spark

代碼處理:

package com.bjsxt.demo;import java.util.Arrays;
import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;
/*** 動態統計出現次數最多的單詞個數,過濾掉。* @author root**/
public class Demo1 {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("demo1");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile("./records.txt");JavaRDD<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Iterable<String> call(String t) throws Exception {return Arrays.asList(t.split(" "));}});JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String,String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String t) throws Exception {return new Tuple2<String, Integer>(t, 1);}});JavaPairRDD<String, Integer> sample = mapToPair.sample(true, 0.5);final List<Tuple2<String, Integer>> take = sample.reduceByKey(new Function2<Integer,Integer,Integer>(){/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws Exception {return new Tuple2<String, Integer>(t._2, t._1);}}).take(1);System.out.println("take--------"+take);JavaPairRDD<String, Integer> result = mapToPair.filter(new Function<Tuple2<String,Integer>, Boolean>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<String, Integer> v1) throws Exception {return !v1._1.equals(take.get(0)._1);}}).reduceByKey(new Function2<Integer,Integer,Integer>(){/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws Exception {return new Tuple2<String, Integer>(t._2, t._1);}});result.foreach(new VoidFunction<Tuple2<String,Integer>>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t);}});jsc.stop();}
}

3、Spark代碼流程

1)、創建SparkConf對象

可以設置Application name。

可以設置運行模式。

可以設置Spark application的資源需求。

2)、創建SparkContext對象

3)、基于Spark的上下文創建一個RDD,對RDD進行處理。

4)、應用程序中要有Action類算子來觸發Transformation類算子執行。

5)、關閉Spark上下文對象SparkContext。

二、Spark持久化算子

1、控制算子

1)、概念

控制算子有三種,cache,persist,checkpoint,以上算子都可以將RDD持久化,持久化單位是partition。cache和persist都是懶執行的。必須有一個action類算子觸發執行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系。

2)、cache

默認將RDD的數據持久化到內存中。cache是懶執行。

注意:chche()=persist()=persist(StorageLevel.Memory_Only)

測試cache文件:

測試代碼:

1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("CacheTest");
3.JavaSparkContext jsc = new JavaSparkContext(conf);
4.JavaRDD<String> lines = jsc.textFile("persistData.txt");
5.
6.lines = lines.cache();
7.long startTime = System.currentTimeMillis();
8.long count = lines.count();
9.long endTime = System.currentTimeMillis();
10.System.out.println("共"+count+ "條數據,"+"初始化時間+cache時間+計算時間="+ 
11.(endTime-startTime));
12.
13.long countStartTime = System.currentTimeMillis();
14.long countrResult = lines.count();
15.long countEndTime = System.currentTimeMillis();
16.System.out.println("共"+countrResult+ "條數據,"+"計算時間="+ (countEndTime-
17.countStartTime));
18.
19.jsc.stop();

persist:

可以指定持久化的級別。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2“表示有副本數。

持久化級別如下:

2、cache和persist的注意事項

1)、cache和persist都是懶執行,必須有一個action類算子觸發執行。

2)、cache和persist算子的返回值可以賦值給一個變量,在其他job中直接使用這個變量就是使用持久化的數據了。持久化的單位是partition。

3)、cache和persist算子后不能立即緊跟action算子。

4)、cache和persist算子持久化的數據當applilcation執行完成之后會被清除。

錯誤:rdd.cache().count() 返回的不是持久化的RDD,而是一個數值了。

3、checkpoint

checkpoint將RDD持久化到磁盤,還可以切斷RDD之間的依賴關系。checkpoint目錄數據當application執行完之后不會被清除。
  • persist(StorageLevel.DISK_ONLY)與Checkpoint的區別?

1)、checkpoint需要指定額外的目錄存儲數據,checkpoint數據是由外部的存儲系統管理,不是Spark框架管理,當application完成之后,不會被清空。

2)、cache() 和persist() 持久化的數據是由Spark框架管理,當application完成之后,會被清空。

3)、checkpoint多用于保存狀態。

  • checkpoint 的執行原理:

1)、當RDD的job執行完畢后,會從finalRDD從后往前回溯。

2)、當回溯到某一個RDD調用了checkpoint方法,會對當前的RDD做一個標記。

3)、Spark框架會自動啟動一個新的job,重新計算這個RDD的數據,將數據持久化到HDFS上。

  • 優化:對RDD執行checkpoint之前,最好對這個RDD先執行cache,這樣新啟動的job只需要將內存中的數據拷貝到HDFS上就可以,省去了重新計算這一步。
  • 使用:
1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("checkpoint");
3.JavaSparkContext sc = new JavaSparkContext(conf);
4.sc.setCheckpointDir("./checkpoint");
5.JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
6.parallelize.checkpoint();
7.parallelize.count();
8.sc.stop();

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/161432.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/161432.shtml
英文地址,請注明出處:http://en.pswp.cn/news/161432.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Jina AI 的 8K 向量模型上線 AWS Marketplace,支持本地部署!

在當前多模態 AI 和大模型技術風頭正勁的背景下&#xff0c;Jina AI 始終領跑于創新前沿&#xff0c;技術領先。2023 年 10 月 30 日&#xff0c;Jina AI 隆重推出 jina-embeddings-v2&#xff0c;這是全球首款支持 8192 輸入長度的開源向量大模型&#xff0c;其性能媲美 OpenA…

匯編-PROC定義子過程(函數)

過程定義 過程用PROC和ENDP偽指令來聲明&#xff0c; 并且必須為其分配一個名字(有效的標識符) 。目前為止&#xff0c; 我們所有編寫的程序都包含了一個main過程&#xff0c; 例如&#xff1a; 當要創建的過程不是程序的啟動過程時&#xff0c; 就用RET指令來結束它。RET強制…

Bean依賴注入注解開發

value Value("xfy")private String userName;private String userName;Value("xiao")public void setUserName(String userName) {this.userName userName;} Autowired // 根據類型進行注入 如果同一類型的Bean有多個&#xff0c;嘗試根基名字進行二次…

AIGC,ChatGPT AI繪畫 Midjourney 注冊流程詳細步驟

AI 繪畫,Midjourney完成高清圖片繪制,輕松掌握AI工具。 前期準備: ① 一個能使用的谷歌賬號 ② 可以訪問外網 Midjourney注冊 1.進入midjourney官網https://www.midjourney.com 點擊左下角”Join the Beta”,就可以注冊,第一次使用的小伙伴會彈出提示,只需要點擊Acc…

2019年12月 Scratch(三級)真題解析#中國電子學會#全國青少年軟件編程等級考試

Scratch等級考試(1~4級)全部真題?點這里 一、單選題(共25題,每題2分,共50分) 第1題 怎樣修改圖章的顏色? A:只需要一個數字來設置顏色 B:設置RGB的值 C:在畫筆中設置顏色、飽和度、亮度 D:在外觀中設置或修改角色顏色特效 答案:D 在外觀中設置或修改角色顏色特…

【深度學習】臉部修復,CodeFormer,論文,實戰

代碼&#xff1a; https://github.com/sczhou/CodeFormer 論文&#xff1a;https://arxiv.org/abs/2206.11253 Towards Robust Blind Face Restoration with Codebook Lookup Transformer 文章目錄 論文摘要1 引言2 相關工作**4 實驗****4.1 數據集****4.2 實驗設置和指標***…

【ArrayList是如何擴容(ArrayList、LinkedList、與Vector的區別)】

ArrayList、LinkedList、與Vector的區別 解讀ArrayList 是一個可改變大小的數組LinkedList 是一個雙向鏈表Vector 屬強同步類 拓展知識面ArrayList是如何擴容&#xff1f;如何利用List實現LRU&#xff1f; 解讀 List主要有ArrayList、LinkedList與Vector幾種實現。這三者都實現…

[論文筆記] Scaling Laws for Neural Language Models

概覽: 一、總結 計算量、數據集大小、模型參數量大小的冪律 與 訓練損失呈現 線性關系。 三個參數同時放大時,如何得到最佳的性能? 更大的模型 需要 更少的樣本 就能達到相同的效果。 </

開源WIFI繼電器之源代碼

源代碼:WiFiRelay: 基于ESP8285的WiFi繼電器代碼

筆記本外接顯示器的一些基本操作

1>&#xff0c;安裝問題直接問客服&#xff0c;正常情況是將顯示屏接上電源&#xff0c;然后用先將顯示屏和筆記本的HDMI接口連接即可。 按下組合鍵 win p ,選擇 “復制”。 2>&#xff0c;接上顯示屏后&#xff0c;原筆記本無聲音&#xff1f; 1、找到筆記本電腦右下…

Doris 建表示例(七)

建表語法 使用 CREATE TABLE 命令建立一個表(Table)。更多詳細參數可以查看&#xff1a; HELP CREATE TABLE; 建表語法&#xff1a; CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name(column_definition1[, column_definition2, ...][, index_definition1[, i…

阿里云99元服務器ECS經濟型e實例性能如何?測評來了

阿里云服務器優惠99元一年&#xff0c;配置為云服務器ECS經濟型e實例&#xff0c;2核2G配置、3M固定帶寬和40G ESSD Entry系統盤&#xff0c;CPU采用Intel Xeon Platinum架構處理器&#xff0c;2.5 GHz主頻&#xff0c;3M帶寬下載速度384KB/秒&#xff0c;上傳速度1028KB/秒&am…

人工智能對我們的生活影響

目錄 前言 一、人工智能的領域 二、人工智能的應用 三、對人工智能的看法 總結 &#x1f308;嗨&#xff01;我是Filotimo__&#x1f308;。很高興與大家相識&#xff0c;希望我的博客能對你有所幫助。 &#x1f4a1;本文由Filotimo__??原創&#xff0c;首發于CSDN&#x1f4…

運算與表達式模板(第一節)

目錄 前言 一、表達式模板簡介 為什么引入表達式模板&#xff1f; 緩式求值&#xff08;Memoization&#xff09; 關系 好處 前言 一個深度學習框架的初步實現為例&#xff0c;討論如何在一個相對較大的項目中深入應用元編程&#xff0c;為系統優化提供更多的可能。 以…

阿里云服務器ECS經濟型e實例優惠99元性能怎么樣?

阿里云服務器ECS經濟型e實例優惠99元性能怎么樣&#xff1f;阿里云服務器優惠99元一年&#xff0c;配置為云服務器ECS經濟型e實例&#xff0c;2核2G配置、3M固定帶寬和40G ESSD Entry系統盤&#xff0c;CPU采用Intel Xeon Platinum架構處理器&#xff0c;2.5 GHz主頻&#xff0…

使用NPOI處理EXCEL文件:例1-關于優化的一些問題

記得有一次處理Excel文件對比&#xff0c;自己前后使用VBA和NPOI對比了下效率。由于涉及到頁面的渲染和刷新&#xff0c;二者的處理速度差了個數量級&#xff08;10多秒和幾十分鐘的差別&#xff09;。當然使用NPOI操作時也做了一定優化。印象這么深刻這次一有需求就想到了NPOI…

千云物流 - 使用k8s負載均衡openelb

openelb的介紹 具體根據官方文檔進行安裝官方文檔,這里作為測試環境的安裝使用. OpenELB 是一個開源的云原生負載均衡器實現,可以在基于裸金屬服務器、邊緣以及虛擬化的 Kubernetes 環境中使用 LoadBalancer 類型的 Service 對外暴露服務。OpenELB 項目最初由 KubeSphere 社區…

redis的性能管理及集群架構(主從復制、哨兵模式)

一、redis的性能管理 1、內存指標info memory 內存指標&#xff08;重要&#xff09; used_memory:853736 數據占用的內存 used_memory_rss:10551296 redis向操作系統申請的內存 used_memory_peak:853736 redis使用內存的峰值 注&#xff1a;單位&#xff1a;字節 系…

策略模式應用(內窺鏡項目播放不同種類的視頻)

新舊代碼對比 策略模式 基本概念 策略模式是一種行為設計模式&#xff0c;它定義了一系列算法&#xff0c;將每個算法封裝起來&#xff0c;并且使它們可以互相替換。策略模式允許客戶端選擇算法的具體實現&#xff0c;而不必改變客戶端的代碼。這樣&#xff0c;客戶端代碼就…

中國區域250米歸一化植被指數數據集(2000-2022)

中國區域250米歸一化植被指數數據集(2000-2022)數據集是中國區域2000至2022年月度歸一化植被指數產品&#xff0c;空間分辨率250米&#xff0c;合成方式采用月最大值合成&#xff0c;每年12期&#xff0c;共275期。本產品是基于Aqua/Terra-MODIS衛星傳感器MOD13Q1產品以及土地利…