Spark的核心就是RDD,對SPARK的使用入門也就是對RDD的使用,包括action和transformation
對于Java的開發者,單單看文檔根本是沒有辦法理解每個API的作用的,所以每個SPARK的新手,最好按部就班直接學習scale,
?那才是一個高手的必經之路,但是由于項目急需使用,沒有閑工夫去學習一門語言,只能從JAVA入門的同學,
?福利來了。。。。
對API的解釋:
1.1?transform
l??map(func):對調用map的RDD數據集中的每個element都使用func,然后返回一個新的RDD,這個返回的數據集是分布式的數據集
l??filter(func) :?對調用filter的RDD數據集中的每個元素都使用func,然后返回一個包含使func為true的元素構成的RDD
l??flatMap(func):和map差不多,但是flatMap生成的是多個結果
l??mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition
l??mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應該有index
l??sample(withReplacement,faction,seed):抽樣
l??union(otherDataset):返回一個新的dataset,包含源dataset和給定dataset的元素的集合
l??distinct([numTasks]):返回一個新的dataset,這個dataset含有的是源dataset中的distinct的element
l??groupByKey(numTasks):返回(K,Seq[V]),也就是Hadoop中reduce函數接受的key-valuelist
l??reduceByKey(func,[numTasks]):就是用一個給定的reduce func再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數
l??sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean類型
1.2?action
l??reduce(func):說白了就是聚集,但是傳入的函數是兩個參數輸入返回一個值,這個函數必須是滿足交換律和結合律的
l??collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組
l??count():返回的是dataset中的element的個數
l??first():返回的是dataset中的第一個元素
l??take(n):返回前n個elements
l??takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素,隨機種子seed
l??saveAsTextFile(path):把dataset寫到一個text file中,或者hdfs,或者hdfs支持的文件系統中,spark把每條記錄都轉換為一行記錄,然后寫到file中
l??saveAsSequenceFile(path):只能用在key-value對上,然后生成SequenceFile寫到本地或者hadoop文件系統
l??countByKey():返回的是key對應的個數的一個map,作用于一個RDD
l??foreach(func):對dataset中的每個元素都使用func
以下是案例:
package com.leoao;
import org.apache.spark.SparkConf;
/**
* Created by chengtao on 16/12/27.
*/
public class Test2 {
public static void main( String[] args ) {
SparkConf conf = new SparkConf().setAppName("App").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// JavaRDD<String> rdd = sc.textFile("/Users/chengtao/downloads/worldcount/ctTest.txt");
String C = "c 3";
String D = "d 4";
String E = "e 5";
ArrayList<String> listA = new ArrayList<String>();
listA.add("a 1");
listA.add("b 2");
listA.add(C);
listA.add(D);
listA.add(E);
JavaRDD<String> rdd = sc.parallelize(listA);
System.out.println("listA ----> " + listA); // listA ----> [a 1, b 2, c 3, d 4, e 5]
List list = rdd.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println("rdd ----> " + list.get(i));
}
// rdd ----> a 1
// rdd ----> b 2
// rdd ----> c 3
// rdd ----> d 4
// rdd ----> e 5
ArrayList<String> listb = new ArrayList<String>();
listb.add("aa 11");
listb.add("bb 22");
listb.add(C);
listb.add(D);
listb.add(E);
JavaRDD<String> rdd2 = sc.parallelize(listb);
// -------transform
testSparkCoreApiMap(rdd);
testSparkCoreApiFilter(rdd);
testSparkCoreApiFlatMap(rdd);
testSparkCoreApiUnion(rdd,rdd2);
testSparkCoreApiDistinct(rdd,rdd2);
testSparkCoreApiMaptoPair(rdd);
testSparkCoreApiGroupByKey(rdd,rdd2);
testSparkCoreApiReduceByKey(rdd);
// -------action
testSparkCoreApiReduce(rdd);
}
//Map主要是對數據進行處理,不進行數據集的增減:本案例實現,打印所有數據,并在結束加上"test"
private static void testSparkCoreApiMap(JavaRDD<String> rdd){
JavaRDD<String> logData1=rdd.map(new Function<String,String>(){
public String call(String s){
return s + " test";
}
});
List list = logData1.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*方法輸出:
a 1 test
b 2 test
c 3 test
d 4 test
e 5 test
*/
//filter主要是過濾數據的功能,本案例實現:過濾含有a的那行數據
private static void testSparkCoreApiFilter(JavaRDD<String> rdd){
JavaRDD<String> logData1=rdd.filter(new Function<String,Boolean>(){
public Boolean call(String s){
if(!(s.contains("a"))){
return true;
}
//return (s.split(" "))[0].equals("a");
return false;
}
});
List list = logData1.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*方法輸出:
b 2
c 3
d 4
e 5
*/
//flatMap 用戶行轉列,本案例實現:打印所有的字符
private static void testSparkCoreApiFlatMap(JavaRDD<String> rdd){
JavaRDD<String> words=rdd.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
List list = words.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*方法輸出:
a
1
b
2
c
3
d
4
e
5
*/
//合并兩個RDD
private static void testSparkCoreApiUnion(JavaRDD<String> rdd,JavaRDD<String> rdd2){
JavaRDD<String> unionRdd=rdd.union(rdd2);
unionRdd.foreach(new VoidFunction<String>(){
public void call(String lines){
System.out.println(lines);
}
});
}
/*方法輸出:
a 1
b 2
c 3
d 4
e 5
aa 11
bb 22
c 3
d 4
e 5
*/
//對RDD去重
private static void testSparkCoreApiDistinct(JavaRDD<String> rdd,JavaRDD<String> rdd2){
JavaRDD<String> unionRdd=rdd.union(rdd2).distinct();
unionRdd.foreach(new VoidFunction<String>(){
public void call(String lines){
System.out.println(lines);
}
});
}
/*方法輸出:
e 5
d 4
c 3
aa 11
a 1
bb 22
b 2
*/
//把RDD映射為鍵值對類型的數據
private static void testSparkCoreApiMaptoPair(JavaRDD<String> rdd){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], st[1]);
}
});
pairRdd.foreach(new VoidFunction<Tuple2<String, Integer>>(){
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._2());
}
});
}
/*方法輸出:
1
3
2
4
5
*/
// 對鍵值對類型的數據進行按鍵值合并
private static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd,JavaRDD<String> rdd1){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});
JavaPairRDD<String, Integer> pairRdd1=rdd1.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});
JavaPairRDD<String, Iterable<Integer>> pairrdd2= pairRdd.union(pairRdd1).groupByKey();
pairrdd2.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>(){
@Override
public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
Iterable<Integer> iter = t._2();
for (Integer integer : iter) {
System.out.println(integer);
}
}
});
}
/*方法輸出:
5
5
1
4
4
11
22
2
3
3
*/
//對鍵值對進行按鍵相同的對值進行操作
private static void testSparkCoreApiReduceByKey(JavaRDD<String> rdd){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});
JavaPairRDD<String, Integer> pairrdd2 =pairRdd.union(pairRdd).reduceByKey(
new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
}
).sortByKey() ;
pairrdd2.foreach(new VoidFunction<Tuple2<String, Integer>>(){
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._2());
}
});
}
/*方法輸出:
2
4
6
10
8
*/
// 對RDD進行遞歸調用
private static void testSparkCoreApiReduce(JavaRDD<String> rdd){
//由于原數據是String,需要轉為Integer才能進行reduce遞歸
JavaRDD<Integer> rdd1=rdd.map(new Function<String,Integer>(){
@Override
public Integer call(String v1) throws Exception {
return Integer.valueOf(v1.split(" ")[1]);
}
});
Integer a= rdd1.reduce(new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1,Integer v2) throws Exception {
return v1+v2;
}
});
System.out.println("a ----> " + a);
}
/*方法輸出:
a ----> 15
*/
}
?
?