spark 快速入門 java API

1.1?transform

l??map(func):對調用map的RDD數據集中的每個element都使用func,然后返回一個新的RDD,這個返回的數據集是分布式的數據集

l??filter(func) :?對調用filter的RDD數據集中的每個元素都使用func,然后返回一個包含使func為true的元素構成的RDD

l??flatMap(func):和map差不多,但是flatMap生成的是多個結果

l??mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition

l??mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應該有index

l??sample(withReplacement,faction,seed):抽樣

l??union(otherDataset):返回一個新的dataset,包含源dataset和給定dataset的元素的集合

l??distinct([numTasks]):返回一個新的dataset,這個dataset含有的是源dataset中的distinct的element

l??groupByKey(numTasks):返回(K,Seq[V]),也就是Hadoop中reduce函數接受的key-valuelist

l??reduceByKey(func,[numTasks]):就是用一個給定的reduce func再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數

l??sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean類型

1.2?action

l??reduce(func):說白了就是聚集,但是傳入的函數是兩個參數輸入返回一個值,這個函數必須是滿足交換律和結合律的

l??collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組

l??count():返回的是dataset中的element的個數

l??first():返回的是dataset中的第一個元素

l??take(n):返回前n個elements

l??takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素,隨機種子seed

l??saveAsTextFile(path):把dataset寫到一個text file中,或者hdfs,或者hdfs支持的文件系統中,spark把每條記錄都轉換為一行記錄,然后寫到file中

l??saveAsSequenceFile(path):只能用在key-value對上,然后生成SequenceFile寫到本地或者hadoop文件系統

l??countByKey():返回的是key對應的個數的一個map,作用于一個RDD

l??foreach(func):對dataset中的每個元素都使用func

以下是案例:

package com.leoao;
import org.apache.spark.SparkConf;
/**
* Created by chengtao on 16/12/27.
*/
public class Test2 {
public static void main( String[] args ) {
SparkConf conf = new SparkConf().setAppName("App").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// JavaRDD<String> rdd = sc.textFile("/Users/chengtao/downloads/worldcount/ctTest.txt");

String C = "c 3";
String D = "d 4";
String E = "e 5";
ArrayList<String> listA = new ArrayList<String>();
listA.add("a 1");
listA.add("b 2");
listA.add(C);
listA.add(D);
listA.add(E);
JavaRDD<String> rdd = sc.parallelize(listA);
System.out.println("listA ----> " + listA); // listA ----> [a 1, b 2, c 3, d 4, e 5]
List list = rdd.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println("rdd ----> " + list.get(i));
}
// rdd ----> a 1
// rdd ----> b 2
// rdd ----> c 3
// rdd ----> d 4
// rdd ----> e 5

ArrayList<String> listb = new ArrayList<String>();
listb.add("aa 11");
listb.add("bb 22");
listb.add(C);
listb.add(D);
listb.add(E);
JavaRDD<String> rdd2 = sc.parallelize(listb);

// -------transform
testSparkCoreApiMap(rdd);
testSparkCoreApiFilter(rdd);
testSparkCoreApiFlatMap(rdd);
testSparkCoreApiUnion(rdd,rdd2);
testSparkCoreApiDistinct(rdd,rdd2);
testSparkCoreApiMaptoPair(rdd);
testSparkCoreApiGroupByKey(rdd,rdd2);
testSparkCoreApiReduceByKey(rdd);
// -------action
testSparkCoreApiReduce(rdd);
}

//Map主要是對數據進行處理,不進行數據集的增減:本案例實現,打印所有數據,并在結束加上"test"
private static void testSparkCoreApiMap(JavaRDD<String> rdd){
JavaRDD<String> logData1=rdd.map(new Function<String,String>(){
public String call(String s){
return s + " test";
}
});
List list = logData1.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*方法輸出:
a 1 test
b 2 test
c 3 test
d 4 test
e 5 test
*/

//filter主要是過濾數據的功能,本案例實現:過濾含有a的那行數據
private static void testSparkCoreApiFilter(JavaRDD<String> rdd){
JavaRDD<String> logData1=rdd.filter(new Function<String,Boolean>(){
public Boolean call(String s){
if(!(s.contains("a"))){
return true;
}
//return (s.split(" "))[0].equals("a");
return false;
}
});
List list = logData1.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*方法輸出:
b 2
c 3
d 4
e 5
*/

//flatMap 用戶行轉列,本案例實現:打印所有的字符
private static void testSparkCoreApiFlatMap(JavaRDD<String> rdd){
JavaRDD<String> words=rdd.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
List list = words.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*方法輸出:
a
1
b
2
c
3
d
4
e
5
*/

//合并兩個RDD
private static void testSparkCoreApiUnion(JavaRDD<String> rdd,JavaRDD<String> rdd2){
JavaRDD<String> unionRdd=rdd.union(rdd2);
unionRdd.foreach(new VoidFunction<String>(){
public void call(String lines){
System.out.println(lines);
}
});
}
/*方法輸出:
a 1
b 2
c 3
d 4
e 5
aa 11
bb 22
c 3
d 4
e 5
*/


//對RDD去重
private static void testSparkCoreApiDistinct(JavaRDD<String> rdd,JavaRDD<String> rdd2){
JavaRDD<String> unionRdd=rdd.union(rdd2).distinct();
unionRdd.foreach(new VoidFunction<String>(){
public void call(String lines){
System.out.println(lines);
}
});
}
/*方法輸出:
e 5
d 4
c 3
aa 11
a 1
bb 22
b 2
*/

//把RDD映射為鍵值對類型的數據
private static void testSparkCoreApiMaptoPair(JavaRDD<String> rdd){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], st[1]);
}

});

pairRdd.foreach(new VoidFunction<Tuple2<String, Integer>>(){
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._2());
}
});
}
/*方法輸出:
1
3
2
4
5
*/


// 對鍵值對類型的數據進行按鍵值合并
private static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd,JavaRDD<String> rdd1){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});

JavaPairRDD<String, Integer> pairRdd1=rdd1.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});

JavaPairRDD<String, Iterable<Integer>> pairrdd2= pairRdd.union(pairRdd1).groupByKey();
pairrdd2.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>(){
@Override
public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
Iterable<Integer> iter = t._2();
for (Integer integer : iter) {
System.out.println(integer);
}
}
});
}

/*方法輸出:
5
5
1
4
4
11
22
2
3
3
*/


//對鍵值對進行按鍵相同的對值進行操作
private static void testSparkCoreApiReduceByKey(JavaRDD<String> rdd){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});

JavaPairRDD<String, Integer> pairrdd2 =pairRdd.union(pairRdd).reduceByKey(
new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
}
).sortByKey() ;
pairrdd2.foreach(new VoidFunction<Tuple2<String, Integer>>(){
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._2());
}
});
}
/*方法輸出:
2
4
6
10
8
*/

// 對RDD進行遞歸調用
private static void testSparkCoreApiReduce(JavaRDD<String> rdd){
//由于原數據是String,需要轉為Integer才能進行reduce遞歸
JavaRDD<Integer> rdd1=rdd.map(new Function<String,Integer>(){
@Override
public Integer call(String v1) throws Exception {
return Integer.valueOf(v1.split(" ")[1]);
}
});

Integer a= rdd1.reduce(new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1,Integer v2) throws Exception {
return v1+v2;
}
});
System.out.println("a ----> " + a);
}
/*方法輸出:
a ----> 15
*/
}










?

轉載于:https://www.cnblogs.com/ctaixw/p/6226187.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:
http://www.pswp.cn/news/371763.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/371763.shtml
英文地址,請注明出處:http://en.pswp.cn/news/371763.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

網頁設計上機考試原題_Dreamweaver上機考試題目dreamweaver試題庫網頁制作試題.doc...

網頁設計上機考試題集注意&#xff1a;所有題目中涉及的素材都在考試文件夾內&#xff0c;其中圖片在下面的pic文件夾中&#xff0c;音樂、flash在media文件夾。1) 在1.html中的頂部添加一個錨點鏈接&#xff0c;點擊之能立即到達頁面最底端。2) 將1.html中的所有鏈接的默認樣式…

35數據結構與算法分析之---最短路徑

本系列是閱讀《數據結構與算法應用實踐教程》第2版 主編 李文書 北京大學出版社 的讀書筆記&#xff0c;加上自己的理解&#xff0c;更多的是學習的記錄與反思&#xff0c;如有不妥&#xff0c;歡迎指正&#xff0c;非常感謝。轉載于:https://www.cnblogs.com/guochaoxxl/p/712…

Quartz 2 Scheduler示例

Quartz是一個開源作業調度框架。 它可用于管理和計劃應用程序中的作業。 步驟1&#xff1a;建立已完成的專案 創建一個Maven項目&#xff0c;如下所示。 &#xff08;可以使用Maven或IDE插件來創建它&#xff09;。 步驟2&#xff1a;圖書館 Quartz依賴項已添加到Maven的po…

sql server 2008 com.microsoft.sqlserver.jdbc.SQLServerException: 通過端口 1433 連接到主機

原內容搬遷到了新網站&#xff0c;給你帶來的不便&#xff0c;敬請諒解&#xff01; 》 http://www.suanliutudousi.com/2017/08/28/sql-server-2008-com-microsoft-sqlserver-jdbc-sqlserverexception-%E9%80%9A%E8%BF%87%E7%AB%AF%E5%8F%A3-1433-%E8%BF%9E%E6%8E%A5%E5%88%B0…

如何通過網線連接兩臺電腦快速傳輸數據?

介紹 我們經常需要拷貝文件會用到類似U盤等工具&#xff0c;但我們有時在傳輸大文件時又苦于沒有&#xff0c;那么大內存的轉存工具。這時候我們就可以通過一條小小的網線連接兩臺電腦&#xff0c;形成一個小的局域網傳輸數據&#xff0c;因為是通過網線傳輸&#xff0c;所以傳…

30分鐘內使用MongoDB

最近&#xff0c;我被NoSQL錯誤咬住了-或是我的同事Mark Atwell提出的“燃燒在哪里&#xff01;” 運動。 盡管我無意于在不久的將來或可預見的將來回避友好的“ SELECT ... WHERE”&#xff0c;但我確實設法弄懂了一些代碼。 在本文中&#xff0c;我分享了我在NoSQL世界中首次…

【Django】--ModelForm組件

ModelForm a.class Meta:model,#對應Model的  fieldsNone,#字段  excludeNone,#排除字段  labelsNone,#提示信息  help_texts None,#幫助提示信息  widgets None,#自定義插件  error_messages None,#自定義錯誤信息(整體錯誤信息from django.core.exceptions im…

mysql實際綜合案例_Mysql綜合案例

Mysql綜合案例考核要點&#xff1a;創建數據表、單表查詢、多表查詢已知&#xff0c;有一個學生表student和一個分數表score&#xff0c;請按要求對這兩個表進行操作。student表和score分數表的表結構分別如表1-1和表1-2所示。表1-1student表結構字段名數據類型主鍵外鍵非空唯一…

2012年I / O之后

從注冊到贈品&#xff0c;每年的I / O瘋狂都在不斷發展。 在今年20分鐘內被出售&#xff0c;并沒有阻止Google贈送更多的東西。 以這種速度并有望在明年發布Google Glass&#xff0c;明年注冊很可能會變得更加混亂&#xff01; 因此&#xff0c;Google&#xff0c;請停止提供免…

h5啟動原生APP總結

許久沒有寫博客了&#xff0c;最近有個H5啟動APP原生頁面的需求&#xff0c;中間遇上一些坑&#xff0c;看了些網上的實現方案&#xff0c;特意來總結下 一、需要判斷客戶端的平臺以及是否在微信瀏覽器中訪問 1、客戶端判斷 在啟動APP時&#xff0c;Android和IOS系統處理的方式…

mysql導入創建表空間_oracle創建表空間 用戶 數據庫導入和導出(轉)

已經安裝orcale 9i 和pl/sql(6.0)OracleJobSchedulerORCL、OracleOraDb10g_home1iSQL*PlusOracleOraDb10g_home1TNSListenerOracleServiceORCL第一個是oem控制臺服務進程第二個是定時器和isql*plus的服務進程第三個是監聽器的服務進程最后是數據庫服務進程1. pl/sql客戶機安裝后…

什么時候使用Apache Camel?

Apache Camel是JVM / Java環境中我最喜歡的開源框架之一。 它可以輕松集成使用多種協議和技術的不同應用程序。 本文介紹了何時使用Apache Camel以及何時使用其他替代方法。 問題&#xff1a;企業應用程序集成&#xff08;EAI&#xff09; 由于新產品和新應用&#xff0c;幾乎…

念整數

念整數&#xff08;5分&#xff09;題目內容&#xff1a; 你的程序要讀入一個整數&#xff0c;范圍是[-100000,100000]。然后&#xff0c;用漢語拼音將這個整數的每一位輸出出來。 如輸入1234&#xff0c;則輸出&#xff1a; yi er san si注意&#xff0c;每個字的拼音之間有一…

python 比較運算符放在列表中_在Python3中將運算符放在列表中

我想把操作符作為一個列表&#xff0c;然后從列表中調用一個元素作為操作符。在如果我沒有在運算符周圍加引號&#xff0c;那么列表中逗號的語法錯誤&#xff1a;File "p22.py", line 24cat [,-,*]^SyntaxError: invalid syntax如果我把引語放在周圍&#xff0c;那么…

軟工個人總結

目錄 一、個人提升二、寫下屬于自己的人月神話三、對下一屆、后來人、自己的建議四、我的團隊——Clover五、關于代碼質量六、學過軟件工程&#xff1f;七、自我介紹八、個性發揮一、個人提升 1. 開學初的目標 希望通過團隊合作領會團隊合作的內在精神&#xff0c;希望在分工完…

Tomcat上下文JUnit @Rule

創建測試上下文的JUnit Rule的初稿。 這可以用Spring上下文規則可用于 這個帖子 創建集成測試一個完整的Spring上下文。 import org.apache.commons.dbcp.BasicDataSource; import org.apache.log4j.Logger; import org.junit.rules.TestRule; import org.junit.runner.Descrip…

排序算法之(7)——堆排序

【堆排序的思路】 堆排序主要是利用了堆的性質。對于大頂堆&#xff1a;堆中的每一個節點的值都不小于它的孩子節點的值&#xff0c;具體可參考我的還有一篇博客http://blog.csdn.net/adminabcd/article/details/46880591&#xff0c;那么大頂堆的堆頂元素就是當前堆中全部元素…

HTML基礎:基本標簽簡介(3)

html中有很多標簽&#xff0c;下面介紹最基本的幾個標簽。 1、meta 是head標簽中的一個輔助性標簽。 有2個重要屬性&#xff1a; &#xff08;1&#xff09;name 可以優化頁面被搜索到的可能性。name中可以指定屬性&#xff0c;content是屬性值。 <html><head><…

java 字符碼_Java字符編碼

編碼原理介紹(中文編碼雜談)&#xff1a;int -> byte可以直接使用強制類型轉換: byte b (byte) aInt;這個操作是直接截取int中最低一個字節&#xff0c;如果int大于255&#xff0c;則值就會變得面目全非了byte -> int這里有兩種情況&#xff0c;一種是要求保持值不變&am…

重新登錄:重新登錄

嗨&#xff0c;我再次回到日志中來&#xff0c;這是任何應用程序設計和開發的固有部分。 我是堅強的基礎知識的忠實擁護者&#xff0c;在我的拙見中&#xff0c;日志記錄是任何企業級應用程序中經常被忽略但基本的關鍵要素之一。 我已經寫在此之前這里 。 為了理解當前文章&…