一、轉換算子和行動算子
1、Transformations轉換算子
1)、概念
Transformations類算子是一類算子(函數)叫做轉換算子,如map、flatMap、reduceByKey等。Transformations算子是延遲執行,也叫懶加載執行。
2)、Transformation類算子
filter :過濾符合條件的記錄數,true保留,false過濾掉
map:將一個RDD中的每個數據項,通過map中的函數映射變為一個新的元素。特點:輸入一條,輸出一條數據。
flatMap:先map后flat。與map類似,每個輸入項可以映射為0到多個輸出項。
sample:隨機抽樣算子,根據傳進去的小數按比例進行又放回或者無放回的抽樣。
reduceByKey:將相同的Key根據相應的邏輯進行處理。
sortByKey/sortBy:作用在K,V格式的RDD上,對Key進行升序或者降序排序。
2、Action行動算子
1)、概念:
Action類算子也是一類算子(函數)叫做行動算子,如foreach,collect,count等。Transformations類算子是延遲執行,Action類算子是觸發執行。一個application應用程序中有幾個Action類算子執行,就有幾個job運行。
2)、Action類算子
count:返回數據集中的元素數。會在結果計算完成后回收到Driver端。
take(n):返回一個包含數據集前n個元素的集合。
first:first=take(1),返回數據集中的第一個元素。
foreach:循環遍歷數據集中的每個元素,運行相應的邏輯。
collect:將計算結果回收到Driver端。
3)、demo:動態統計出現次數最多的單詞個數,過濾掉。
- 一千萬條數據量的文件,過濾掉出現次數多的記錄,并且其余記錄按照出現次數降序排序。
假設有一個records.txt文件
hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark1
hello Spark
hello Spark
hello Spark2
hello Spark
hello Spark
hello Spark
hello Spark3
hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark4
hello Spark
hello Spark
hello Spark5
hello Spark
hello Spark
代碼處理:
package com.bjsxt.demo;import java.util.Arrays;
import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;
/*** 動態統計出現次數最多的單詞個數,過濾掉。* @author root**/
public class Demo1 {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("demo1");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile("./records.txt");JavaRDD<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Iterable<String> call(String t) throws Exception {return Arrays.asList(t.split(" "));}});JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String,String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String t) throws Exception {return new Tuple2<String, Integer>(t, 1);}});JavaPairRDD<String, Integer> sample = mapToPair.sample(true, 0.5);final List<Tuple2<String, Integer>> take = sample.reduceByKey(new Function2<Integer,Integer,Integer>(){/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws Exception {return new Tuple2<String, Integer>(t._2, t._1);}}).take(1);System.out.println("take--------"+take);JavaPairRDD<String, Integer> result = mapToPair.filter(new Function<Tuple2<String,Integer>, Boolean>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<String, Integer> v1) throws Exception {return !v1._1.equals(take.get(0)._1);}}).reduceByKey(new Function2<Integer,Integer,Integer>(){/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws Exception {return new Tuple2<String, Integer>(t._2, t._1);}});result.foreach(new VoidFunction<Tuple2<String,Integer>>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t);}});jsc.stop();}
}
3、Spark代碼流程
1)、創建SparkConf對象
可以設置Application name。
可以設置運行模式。
可以設置Spark application的資源需求。
2)、創建SparkContext對象
3)、基于Spark的上下文創建一個RDD,對RDD進行處理。
4)、應用程序中要有Action類算子來觸發Transformation類算子執行。
5)、關閉Spark上下文對象SparkContext。
二、Spark持久化算子
1、控制算子
1)、概念
控制算子有三種,cache,persist,checkpoint,以上算子都可以將RDD持久化,持久化單位是partition。cache和persist都是懶執行的。必須有一個action類算子觸發執行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系。
2)、cache
默認將RDD的數據持久化到內存中。cache是懶執行。
注意:chche()=persist()=persist(StorageLevel.Memory_Only)
測試cache文件:
測試代碼:
1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("CacheTest");
3.JavaSparkContext jsc = new JavaSparkContext(conf);
4.JavaRDD<String> lines = jsc.textFile("persistData.txt");
5.
6.lines = lines.cache();
7.long startTime = System.currentTimeMillis();
8.long count = lines.count();
9.long endTime = System.currentTimeMillis();
10.System.out.println("共"+count+ "條數據,"+"初始化時間+cache時間+計算時間="+
11.(endTime-startTime));
12.
13.long countStartTime = System.currentTimeMillis();
14.long countrResult = lines.count();
15.long countEndTime = System.currentTimeMillis();
16.System.out.println("共"+countrResult+ "條數據,"+"計算時間="+ (countEndTime-
17.countStartTime));
18.
19.jsc.stop();
persist:
可以指定持久化的級別。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2“表示有副本數。
持久化級別如下:
2、cache和persist的注意事項
1)、cache和persist都是懶執行,必須有一個action類算子觸發執行。
2)、cache和persist算子的返回值可以賦值給一個變量,在其他job中直接使用這個變量就是使用持久化的數據了。持久化的單位是partition。
3)、cache和persist算子后不能立即緊跟action算子。
4)、cache和persist算子持久化的數據當applilcation執行完成之后會被清除。
錯誤:rdd.cache().count() 返回的不是持久化的RDD,而是一個數值了。
3、checkpoint
checkpoint將RDD持久化到磁盤,還可以切斷RDD之間的依賴關系。checkpoint目錄數據當application執行完之后不會被清除。
- persist(StorageLevel.DISK_ONLY)與Checkpoint的區別?
1)、checkpoint需要指定額外的目錄存儲數據,checkpoint數據是由外部的存儲系統管理,不是Spark框架管理,當application完成之后,不會被清空。
2)、cache() 和persist() 持久化的數據是由Spark框架管理,當application完成之后,會被清空。
3)、checkpoint多用于保存狀態。
- checkpoint 的執行原理:
1)、當RDD的job執行完畢后,會從finalRDD從后往前回溯。
2)、當回溯到某一個RDD調用了checkpoint方法,會對當前的RDD做一個標記。
3)、Spark框架會自動啟動一個新的job,重新計算這個RDD的數據,將數據持久化到HDFS上。
- 優化:對RDD執行checkpoint之前,最好對這個RDD先執行cache,這樣新啟動的job只需要將內存中的數據拷貝到HDFS上就可以,省去了重新計算這一步。
- 使用:
1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("checkpoint");
3.JavaSparkContext sc = new JavaSparkContext(conf);
4.sc.setCheckpointDir("./checkpoint");
5.JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
6.parallelize.checkpoint();
7.parallelize.count();
8.sc.stop();