【無標題】spark編程

Value類型:

9) distinct

??函數簽名

def distinct()(implicit?ord: Ordering[T] = null): RDD[T]

def distinct(numPartitions: Int)(implicit?ord: Ordering[T] = null): RDD[T]

??函數說明

將數據集中重復的數據去重

?

val?dataRDD?=?sparkContext.makeRDD(List(
?1,2,3,4,1,2
))
val?dataRDD1 =?dataRDD.distinct()
val?dataRDD2 =?dataRDD.distinct(2)

?

10) coalesce

??函數簽名

def coalesce(numPartitions: Int, shuffle: Boolean = false,

partitionCoalescer: Option[PartitionCoalescer] =?Option.empty)?

(implicit?ord: Ordering[T] = null)

: RDD[T]

??函數說明

根據數據量縮減分區,用于大數據集過濾后,提高小數據集的執行效率

當?spark 程序中,存在過多的小任務的時候,可以通過 coalesce 方法,收縮合并分區,減少分區的個數,減小任務調度成本

val?dataRDD?=?sparkContext.makeRDD(List(
?1,2,3,4,1,2
),6)
val?dataRDD1 =?dataRDD.coalesce(2)

?

11) repartition

??函數簽名

def repartition(numPartitions: Int)(implicit?ord: Ordering[T] = null): RDD[T]

??函數說明

該操作內部其實執行的是?coalesce 操作,參數 shuffle 的默認值為 true。無論是將分區數多的RDD 轉換為分區數少的 RDD,還是將分區數少的 RDD 轉換為分區數多的 RDD,repartition操作都可以完成,因為無論如何都會經?shuffle 過程。

val?dataRDD?=?sparkContext.makeRDD(List(
?1,2,3,4,1,2
),2)
val?dataRDD1 =?dataRDD.repartition(4)

?

12)?sortBy

??函數簽名

def?sortBy[K](

f: (T) => K,

ascending: Boolean = true,

numPartitions: Int =?this.partitions.length)?

(implicit?ord: Ordering[K],?ctag:?ClassTag[K]): RDD[T]

??函數說明

該操作用于排序數據。在排序之前,可以將數據通過f 函數進行處理,之后按照 f 函數處理的結果進行排序,默認為升序排列。排序后新產生的?RDD 的分區數與原 RDD 的分區數一致。中間存在?shuffle 的過程。

val?dataRDD?=?sparkContext.makeRDD(List(
?1,2,3,4,1,2
),2)
val?dataRDD1 =?dataRDD.sortBy(num=>num,?false,?4)
val?dataRDD2 =?dataRDD.sortBy(num=>num,?true,?4)

?

?

雙Value類型:

13) intersection

??函數簽名

def intersection(other: RDD[T]): RDD[T]

??函數說明

對源?RDD 和參數 RDD 求交集后返回一個新的 RDD

val?dataRDD1 =?sparkContext.makeRDD(List(1,2,3,4))
val?dataRDD2 =?sparkContext.makeRDD(List(3,4,5,6))
val?dataRDD?= dataRDD1.intersection(dataRDD2)

?

14) union

??函數簽名

def union(other: RDD[T]): RDD[T]

??函數說明

對源?RDD 和參數 RDD 求并集后返回一個新的 RDD(重復數據不會去重)

val?dataRDD1 =?sparkContext.makeRDD(List(1,2,3,4))
val?dataRDD2 =?sparkContext.makeRDD(List(3,4,5,6))
val?dataRDD?= dataRDD1.union(dataRDD2)

?

15) subtract

??函數簽名

def subtract(other: RDD[T]): RDD[T]

??函數說明

以源?RDD 元素為主,去除兩個 RDD 中重復元素,將源RDD的其他元素保留下來。(求差集)

val?dataRDD1 =?sparkContext.makeRDD(List(1,2,3,4))
val?dataRDD2 =?sparkContext.makeRDD(List(3,4,5,6))
val?dataRDD?= dataRDD1.subtract(dataRDD2)

?

16) zip

??函數簽名

def zip[U:?ClassTag](other: RDD[U]): RDD[(T, U)]

??函數說明

將兩個?RDD 中的元素,以鍵值對的形式進行合并。其中,鍵值對中的 Key 為第 1?個?RDD

中的元素,Value 為第 2?個?RDD 中的相同位置的元素。

val?dataRDD1 =?sparkContext.makeRDD(List("a","b","c","d"))
val?dataRDD2 =?sparkContext.makeRDD(List(1,2,3,4))
val?dataRDD?= dataRDD1.zip(dataRDD2)

flatMap

??函數簽名

def?flatMap[U:?ClassTag](f: T =>?TraversableOnce[U]): RDD[U]

??函數說明

將處理的數據進行扁平化后再進行映射處理,所以算子也稱之為扁平映射。

val?dataRDD?=?sparkContext.makeRDD(List(
?List(1,2),List(3,4)
),1)
val?dataRDD1?=?dataRDD.flatMap(
?list => list
)

?

map和flatMap的區別:

?

map會將每一條輸入數據映射為一個新對象。

?

flatMap包含兩個操作:會將每一個輸入對象輸入映射為一個新集合,然后把這些新集合連成一個大集合。

partitionBy

??函數簽名

def?partitionBy(partitioner: Partitioner): RDD[(K, V)]

??函數說明

將數據按照指定?Partitioner 重新進行分區。Spark 默認的分區器是?HashPartitioner

val?rdd: RDD[(Int,?String)] =
?sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)

val?rdd2: RDD[(Int,?String)] =
?rdd.partitionBy(new?HashPartitioner(2))

函數說明

將數據源的數據根據?key 對 value 進行分組

val?dataRDD1 =
?sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val?dataRDD2 = dataRDD1.groupByKey()
val?dataRDD3 = dataRDD1.groupByKey(2)
val?dataRDD4 = dataRDD1.groupByKey(new?HashPartitioner(2))

可以將數據按照相同的?Key 對 Value 進行聚合

val?dataRDD1 =?sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val?dataRDD2 = dataRDD1.reduceByKey(_+_)
val?dataRDD3 = dataRDD1.reduceByKey(_+_,?2)

將數據根據不同的規則進行分區內計算和分區間計算val?dataRDD1 =
?sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val?dataRDD2 =
?dataRDD1.aggregateByKey(0)(_+_,_+_)

val?dataRDD1 =
?sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val?dataRDD2 = dataRDD1.foldByKey(0)(_+_)

現有數據 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),求每個key的總值及每個key對應鍵值對的個數

val?list:?List[(String, Int)] =?List(("a",?88), ("b",?95), ("a",?91), ("b",?93),("a",?95), ("b",?98))
val?input: RDD[(String, Int)] =?sc.makeRDD(list,?2)
val?combineRDD: RDD[(String, (Int, Int))] =?input.combineByKey(
?(_,?1),?//a=>(a,1)
?(acc: (Int, Int), v) => (acc._1 + v, acc._2 +?1),?//acc_1為數據源的value,acc_2為key出現的次數,二者進行分區內部的計算
?(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)?//將分區內部計算的結果進行分區間的匯總計算,得到每個key的總值以及每個key出現的次數
)

在一個(K,V)的 RDD 上調用,K 必須實現 Ordered 接口(特質),返回一個按照 key 進行排序

val?dataRDD1 =?sc.makeRDD(List(("a",1),("b",2),("c",3)))
val?sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val?sortRDD2: RDD[(String, Int)] = dataRDD1.sortByKey(false)

?

?

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/75719.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/75719.shtml
英文地址,請注明出處:http://en.pswp.cn/web/75719.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

GPT-2 語言模型 - 模型訓練

本節代碼是一個完整的機器學習工作流程,用于訓練一個基于GPT-2的語言模型。下面是對這段代碼的詳細解釋: 文件目錄如下 1. 初始化和數據準備 設置隨機種子 random.seed(1002) 確保結果的可重復性。 定義參數 test_rate 0.2 context_length 128 tes…

架構師面試(二十九):TCP Socket 編程

問題 今天考察網絡編程的基礎知識。 在基于 TCP 協議的網絡 【socket 編程】中可能會遇到很多異常,在下面的相關描述中說法正確的有哪幾項呢? A. 在建立連接被拒絕時,有可能是因為網絡不通或地址錯誤或 server 端對應端口未被監聽&#x…

HTTP實現心跳模塊

HTTP實現心跳模塊 使用輕量級的cHTTP庫cpp-httplib重現實現HTTP心跳模塊 頭文件HttplibHeartbeat.h #ifndef HTTPLIB_HEARTBEAT_H #define HTTPLIB_HEARTBEAT_H#include <string> #include <thread> #include <atomic> #include <chrono> #include …

openharmony—release—4.1開發環境搭建(踩坑記錄)

環境開發需要分別在window以及ubuntu下進行相應設置 一、window 1.安裝DevEco Device Tool OpenAtom OpenHarmony 二、ubuntu 1.將Ubuntu Shell環境修改為bash ls -l /bin/sh 2.打開終端工具&#xff0c;執行如下命令&#xff0c;輸入密碼&#xff0c;然后選擇No&#xff0…

Go學習系列文章聲明

本次學習是基于B站的視頻&#xff0c;【Udemy高分熱門付費課程】Golang&#xff1a;完整開發者指南&#xff08;基礎知識和高級特性&#xff09;中英文字幕_嗶哩嗶哩_bilibili 本人會嘗試輸出視頻中的內容&#xff0c;如有錯誤歡迎指出 next page: Go installation process

error: RPC failed; HTTP 408 curl 22 The requested URL returned error: 408

在git push時報錯&#xff1a;error: RPC failed; HTTP 408 curl 22 The requested URL returned error: 408 原因&#xff1a;可能是推送的文件太大&#xff0c;要么是緩存不夠&#xff0c;要么是網絡不行。 解決方法&#xff1a; 將本地 http.postBuffer 數值調整到500MB&…

Android.bp中添加條件判斷編譯方式

背景&#xff1a; 馬哥學員朋友以前在vip群里&#xff0c;有問道如何在Android.bp中添加條件判斷&#xff0c;在工作中經常需要一套代碼兼容發貨目標版本&#xff0c;即代碼都是公共的一套&#xff0c;但是需要用這一套代碼集成到各個產品設備上 但是這個產品設備可能面臨比…

swift ui基礎

一個樸實無華的目錄 今日學習內容&#xff1a;1.三種布局&#xff08;可以相互包裹&#xff09;1.1 vstack&#xff08;豎直&#xff09;&#xff1a;先寫的在上面1.1 hstack&#xff08;水平&#xff09;&#xff1a;先寫的在左邊1.1 zstack&#xff08;前后&#xff09;&…

第16屆藍橋杯單片機模擬試題Ⅲ

試題 代碼 sys.h #ifndef __SYS_H__ #define __SYS_H__#include <STC15F2K60S2.H> //sys.c extern unsigned char UI; //界面標志(0濕度界面、1參數界面、2時間界面) extern unsigned char time; //時間間隔(1s~10S) extern bit ssflag; //啟動/停止標志…

Node.js中URL模塊詳解

Node.js 中 URL 模塊全部 API 詳解 1. URL 類 const { URL } require(url);// 1. 創建 URL 對象 const url new URL(https://www.example.com:8080/path?queryvalue#hash);// 2. URL 屬性 console.log(協議:, url.protocol); // https: console.log(主機名:, url.hos…

Java接口性能優化面試問題集錦:高頻考點與深度解析

1. 如何定位接口性能瓶頸&#xff1f;常用哪些工具&#xff1f; 考察點&#xff1a;性能分析工具的使用與問題定位能力。 核心答案&#xff1a; 工具&#xff1a;Arthas&#xff08;在線診斷&#xff09;、JProfiler&#xff08;內存與CPU分析&#xff09;、VisualVM、Prometh…

WheatA小麥芽:農業氣象大數據下載器

今天為大家介紹的軟件是WheatA小麥芽&#xff1a;專業純凈的農業氣象大數據系統。下面&#xff0c;我們將從軟件的主要功能、支持的系統、軟件官網等方面對其進行簡單的介紹。 主要內容來源于軟件官網&#xff1a;WheatA小麥芽的官方網站是http://www.wheata.cn/ &#xff0c;…

Python10天突擊--Day 2: 實現觀察者模式

以下是 Python 實現觀察者模式的完整方案&#xff0c;包含同步/異步支持、類型注解、線程安全等特性&#xff1a; 1. 經典觀察者模式實現 from abc import ABC, abstractmethod from typing import List, Anyclass Observer(ABC):"""觀察者抽象基類""…

CST1019.基于Spring Boot+Vue智能洗車管理系統

計算機/JAVA畢業設計 【CST1019.基于Spring BootVue智能洗車管理系統】 【項目介紹】 智能洗車管理系統&#xff0c;基于 Spring Boot Vue 實現&#xff0c;功能豐富、界面精美 【業務模塊】 系統共有三類用戶&#xff0c;分別是&#xff1a;管理員用戶、普通用戶、工人用戶&…

Windows上使用Qt搭建ARM開發環境

在 Windows 上使用 Qt 和 g++-arm-linux-gnueabihf 進行 ARM Linux 交叉編譯(例如針對樹莓派或嵌入式設備),需要配置 交叉編譯工具鏈 和 Qt for ARM Linux。以下是詳細步驟: 1. 安裝工具鏈 方法 1:使用 MSYS2(推薦) MSYS2 提供 mingw-w64 的 ARM Linux 交叉編譯工具鏈…

Python爬蟲教程011:scrapy爬取當當網數據開啟多條管道下載及下載多頁數據

文章目錄 3.6.4 開啟多條管道下載3.6.5 下載多頁數據3.6.6 完整項目下載3.6.4 開啟多條管道下載 在pipelines.py中新建管道類(用來下載圖書封面圖片): # 多條管道開啟 # 要在settings.py中開啟管道 class DangdangDownloadPipeline:def process_item(self, item, spider):…

Mysql -- 基礎

SQL SQL通用語法&#xff1a; SQL分類&#xff1a; DDL: 數據庫操作 查詢&#xff1a; SHOW DATABASES&#xff1b; 創建&#xff1a; CREATE DATABASE[IF NOT EXISTS] 數據庫名 [DEFAULT CHARSET字符集] [COLLATE 排序規則]&#xff1b; 刪除&#xff1a; DROP DATABA…

實操(環境變量)Linux

環境變量概念 我們用語言寫的文件編好后變成了程序&#xff0c;./ 運行的時候他就會變成一個進程被操作系統調度并運行&#xff0c;運行完畢進程相關資源被釋放&#xff0c;因為它是一個bash的子進程&#xff0c;所以它退出之后進入僵尸狀態&#xff0c;bash回收他的退出結果&…

torch.cat和torch.stack的區別

torch.cat 和 torch.stack 是 PyTorch 中用于組合張量的兩個常用函數&#xff0c;它們的核心區別在于輸入張量的維度和輸出張量的維度變化。以下是詳細對比&#xff1a; 1. torch.cat (Concatenate) 作用&#xff1a;沿現有維度拼接多個張量&#xff0c;不創建新維度 輸入要求…

深入解析@Validated注解:Spring 驗證機制的核心工具

一、注解出處與核心定位 1. 注解來源 ? 所屬框架&#xff1a;Validated 是 Spring Framework 提供的注解&#xff08;org.springframework.validation.annotation 包下&#xff09;。 ? 核心定位&#xff1a; 作為 Spring 對 JSR-380&#xff08;Bean Validation 2.0&#…