04-240606Spark筆記

04-240606Spark筆記

1.行動算子-2

  • save相關算子:

格式:

def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit

例子:

 ?val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))
?rdd.saveAsTextFile("output")rdd.saveAsObjectFile("output1")// saveAsSequenceFile方法要求數據的格式必須為K-V類型rdd.saveAsSequenceFile("output2")

輸出結果:

image-20240604225213130

  • foreach

格式:

def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

例子:

 ? ?val rdd = sc.makeRDD(List(1,2,3,4))
?//foreach 其實是Driver端內存集合的循環遍歷方法rdd.collect().foreach(println) //Driverprintln("***************")// foreach 其實是Executor端內存數據打印rdd.foreach(println)    // Executor// 算子 : Operator(操作)// ? ? ? ? RDD的方法和Scala集合對象的方法不一樣// ? ? ? ? 集合對象的方法都是在同一個節點的內存中完成的。// ? ? ? ? RDD的方法可以將計算邏輯發送到Executor端(分布式節點)執行// ? ? ? ? 為了區分不同的處理效果,所以將RDD的方法稱之為算子。// ? ? ?  RDD的方法外部的操作都是在Driver端執行的,而方法內部的邏輯代碼是在Executor端執行。

輸出結果:

image-20240604232824753

2. 序列化

2.1 閉包檢測
  • 閉包檢測

因為Driver需要給兩個Executor共享User方法,共享就需要序列化

案例:

 ?def main(args: Array[String]): Unit = {
?val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)
?val rdd = sc.makeRDD(List[Int]())
?val user = new User()
?// SparkException: Task not serializable// NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User
?// RDD算子中傳遞的函數是會包含閉包操作,那么就會進行檢測功能// 閉包檢測rdd.foreach(num => {println("age = " + (user.age + num))})
?sc.stop()
?}//class User extends Serializable {// 樣例類在編譯時,會自動混入序列化特質(實現可序列化接口)//case class User() {class User {var age : Int = 30}
  • RDD 的分區器

自己來寫分區器:

 ? ?def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf)
?val rdd = sc.makeRDD(List(("nba", "xxxxxxxxx"),("cba", "xxxxxxxxx"),("wnba", "xxxxxxxxx"),("nba", "xxxxxxxxx"),),3)val partRDD: RDD[(String, String)] = rdd.partitionBy( new MyPartitioner )
?partRDD.saveAsTextFile("output")
?sc.stop()}

自定義的分區器:

 ? ?class MyPartitioner extends Partitioner{// 分區數量override def numPartitions: Int = 3
?// 根據數據的key值返回數據所在的分區索引(從0開始)override def getPartition(key: Any): Int = {key match {case "nba" => 0case "wnba" => 1case _ => 2}}}
* 自定義分區器
* 1. 繼承Partitioner
* 2. 重寫方法

輸出結果:

image-20240605170312913

image-20240605170321664

  • RDD 文件讀取與保存

案例1:

 ? ?def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf)
?val rdd = sc.textFile("output1")println(rdd.collect().mkString(","))
?val rdd1 = sc.objectFile[(String, Int)]("output2")println(rdd1.collect().mkString(","))
?val rdd2 = sc.sequenceFile[String, Int]("output3")println(rdd2.collect().mkString(","))
?sc.stop()}

輸出結果:

image-20240605170535800

案例2:

 ? ?def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf)
?val rdd = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))
?rdd.saveAsTextFile("output1")rdd.saveAsObjectFile("output2")rdd.saveAsSequenceFile("output3")
?sc.stop()}

輸出結果:

image-20240605170643956

1. 數據結構:

image-20240605170954358

  • 累加器

累加器用來把 Executor 端變量信息聚合到 Driver 端。

![image-20240605202228850](E:\Files2\Typictures\image-20240605202228850.png

image-20240605202424331

Acc,累加器可以把Excutor端的數據返回到Driver中去:

image-20240605202543334

案例:

 ? ?def main(args: Array[String]): Unit = {
?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
?val rdd = sc.makeRDD(List(1,2,3,4))
?// reduce : 分區內計算,分區間計算//val i: Int = rdd.reduce(_+_)//println(i)var sum = 0rdd.foreach(num => {sum += num})println("sum = " + sum)
?sc.stop()
?}
  • 系統累加器

案例:

 ? ?def main(args: Array[String]): Unit = {
?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
?val rdd = sc.makeRDD(List(1,2,3,4))
?// 獲取系統累加器// Spark默認就提供了簡單數據聚合的累加器val sumAcc = sc.longAccumulator("sum")
?//sc.doubleAccumulator//sc.collectionAccumulator
?rdd.foreach(num => {// 使用累加器sumAcc.add(num)})
?// 獲取累加器的值println(sumAcc.value)
?sc.stop()
?}

累加器的一些特殊情況:

少加:轉換算子中調用累加器,如果沒有行動算子的話,那么不會執行
多加:轉換算子中調用累加器,如果沒有行動算子的話,那么不會執行
一般情況下,累加器會放置在行動算子進
 ? ?def main(args: Array[String]): Unit = {
?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
?val rdd = sc.makeRDD(List(1,2,3,4))
?// 獲取系統累加器// Spark默認就提供了簡單數據聚合的累加器val sumAcc = sc.longAccumulator("sum")
?//sc.doubleAccumulator//sc.collectionAccumulator
?val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})
?// 獲取累加器的值// 少加:轉換算子中調用累加器,如果沒有行動算子的話,那么不會執行// 多加:轉換算子中調用累加器,如果沒有行動算子的話,那么不會執行// 一般情況下,累加器會放置在行動算子進行操作mapRDD.collect()mapRDD.collect()println(sumAcc.value)
?sc.stop()
?}
  • 自定義累加器

分布式共享只寫變量

案例:

 ? ?def main(args: Array[String]): Unit = {
?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
?val rdd = sc.makeRDD(List("hello", "spark", "hello"))
?// 累加器 : WordCount// 創建累加器對象val wcAcc = new MyAccumulator()// 向Spark進行注冊sc.register(wcAcc, "wordCountAcc")
?rdd.foreach(word => {// 數據的累加(使用累加器)wcAcc.add(word)})
?// 獲取累加器累加的結果println(wcAcc.value)
?sc.stop()
?}/*自定義數據累加器:WordCount
?1. 繼承AccumulatorV2, 定義泛型IN : 累加器輸入的數據類型 StringOUT : 累加器返回的數據類型 mutable.Map[String, Long]
?2. 重寫方法(6)*/class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
?private var wcMap = mutable.Map[String, Long]()
?// 判斷是否初始狀態override def isZero: Boolean = {wcMap.isEmpty}
?override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new MyAccumulator()}
?override def reset(): Unit = {wcMap.clear()}
?// 獲取累加器需要計算的值override def add(word: String): Unit = {val newCnt = wcMap.getOrElse(word, 0L) + 1wcMap.update(word, newCnt)}
?// Driver合并多個累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
?val map1 = this.wcMapval map2 = other.value
?map2.foreach{case ( word, count ) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}}
?// 累加器結果override def value: mutable.Map[String, Long] = {wcMap}}
  • 廣播變量

實現原理:

廣播變量用來高效分發較大的對象。在多個并行操作中使用同一個變量,但是 Spark 會為每個任務

分別發送。

案例:

 ? ?def main(args: Array[String]): Unit = {
?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
?val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))
// ? ? ?  val rdd2 = sc.makeRDD(List(
// ? ? ? ? ?  ("a", 4),("b", 5),("c", 6)
// ? ? ?  ))val map = mutable.Map(("a", 4),("b", 5),("c", 6))
?
?
?// join會導致數據量幾何增長,并且會影響shuffle的性能,不推薦使用//val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)//joinRDD.collect().foreach(println)// (a, 1), ?  (b, 2), ?  (c, 3)// (a, (1,4)),(b, (2,5)),(c, (3,6))rdd1.map {case (w, c) => {val l: Int = map.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println)
?
?
?sc.stop()
?}

join會導致數據量幾何增長,并且會影響shuffle的性能,不推薦使用

image-20240606162528164

Spark 中的廣播變量就可以將閉包的數據保存到Executor的內存中

Spark 中的廣播變量不能更改 : 分布式共享只讀變量

image-20240606162609035

封裝廣播變量1

案例:

 ? ?def main(args: Array[String]): Unit = {
?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)
?val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))val map = mutable.Map(("a", 4),("b", 5),("c", 6))
?// 封裝廣播變量val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
?rdd1.map {case (w, c) => {// 方法廣播變量val l: Int = bc.value.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println)
?
?
?sc.stop()
?}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/23547.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/23547.shtml
英文地址,請注明出處:http://en.pswp.cn/web/23547.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Python報錯】已解決NameError: name ‘Image‘ is not defined

解決Python報錯&#xff1a;NameError: name ‘Image’ is not defined 在使用Python進行圖像處理時&#xff0c;我們經常使用Pillow庫&#xff08;PIL的一個分支&#xff09;。如果你在嘗試創建或處理圖像時遇到了NameError: name Image is not defined的錯誤&#xff0c;這通…

史上最易懂的mysql鎖 、mvvc分析

1 mysql中的鎖類型&#xff1a; 1) 表鎖 表共享鎖(S):表級別的讀鎖&#xff0c;表共享鎖之間是兼容的。 表排他鎖(X): 表級別的寫鎖&#xff0c;表排他鎖和任何鎖(包括表排他鎖)都不兼容(不包括意向鎖)。 意向排他鎖(IX): 獲取行排他鎖之前必須獲取的意向排他鎖&#xff0c;這…

關于python包導入問題的重思考

將頂層目錄直接設置為一個包 像這樣&#xff0c;每一個文件從頂層包開始導入 這樣可以解決我的問題&#xff0c;但是要注意的時&#xff0c;要避免使用出現上下級出現同名包的情況&#xff0c;比如&#xff1a; AutoServer--AutoServer--__init__.py--__init__.py這種情況下…

騰訊云的身份證核驗,找不到這個類

系統接入騰訊云的sdk&#xff0c;Class ‘TencentCloud\Common\Credential’ not found 以下方法核對一下看有沒有做錯&#xff0c;如果沒有需要重啟一下守護一般是能解決問的 這個錯誤表明PHP代碼試圖加載一個名為TencentCloud\Common\Credential的類&#xff0c;但是在指定…

綠聯云NAS一些探索(1):SSH、包管理器探測、安裝docker-compose等

綠聯云NAS一些探索SSH、包管理器探測、安裝docker-compose等 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https:…

AI圖書推薦:《如何利用ChatGPT在線賺錢》

這本書《如何利用ChatGPT在線賺錢》&#xff08;$100m ChatGPT_ How To Make Money Online With ChatGPT -- Sharp, Biily -- 2023 &#xff09;主要闡述如何利用ChatGPT這一強大的語言模型工具在互聯網上創造收入。 以下是各章節內容的概要&#xff1a; **引言** - 介紹了Chat…

STM32F103單片機工程移植到航順單片機HK32F103注意事項

一、簡介 作為國內MCU廠商中前三陣營之一的航順芯片&#xff0c;建立了世界首創超低功耗7nA物聯網、萬物互聯核心處理器浩瀚天際10X系列平臺&#xff0c;接受代理商/設計企業/方案商定制低于自主研發十倍以上成本&#xff0c;接近零風險自主品牌產品&#xff0c;芯片設計完成只…

spring整合kafka

原文鏈接&#xff1a;spring整合kafka_spring集成kafka-CSDN博客 1、導入依賴 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.10.RELEASE</version> </depende…

Arthas Profiler 事件監控場景介紹

CPU 使用情況 (cpu) 場景描述&#xff1a; 當應用程序響應緩慢或者CPU使用率異常高時&#xff0c;開發者需要找出導致高CPU消耗的具體方法調用。通過監控CPU使用情況&#xff0c;可以識別出那些占用大量CPU時間的熱點方法。 使用Arthas的步驟&#xff1a; 啟動CPU profiler:…

編譯等底層知識

目錄 一. GCC命令語句大全 二. GCC編譯4個階段 三. makefile的使用 四. CMake 五. GNU工具鏈開發流程圖 六. Keil中的地址段 七. 靜態庫和動態庫 一. GCC命令語句大全 -c只編譯源文件&#xff0c;生成目標文件&#xff08;.o 文件&#xff09;&#xff0c;不進行鏈接。…

CC++內存管理【new和delete操作符的詳細分析】【常見面試題】

C/C內存管理 1.C/C內存分布 我們先來看一段代碼&#xff0c;來了解一下C/C中的數據內存分布。 # include <stdlib.h>int globalVar 1; static int staticGlobalVar 1; // 比globalVar還要先銷毀,同一個文件下后定義的先析構 // 全局變量存在 數據段&#xff08;靜態…

[Unity]播放音頻卡頓問題

記錄一個問題&#xff1a; 游戲內播放完音頻A再去循環播放音頻B&#xff0c;在協程里使用等待n秒來實現拼接&#xff0c;發現在個別手機上會有卡頓的問題&#xff0c;盲猜是和幀率有關。 這是最初的實現方案&#xff1a; IEnumerator IEPlayAudio(){if(ASOnBeginDrag ! null)…

VSCode+Vite+Vue3斷點調試

目錄 lunch.json創建 vite.config.ts 打斷點運行 lunch.json創建 首先&#xff0c;點擊VSCode左上角&#xff0c;甲殼蟲運行的按鈕&#xff0c;然后點擊運行與調試&#xff0c;選擇chrome瀏覽器&#xff0c;修改成一下配置。 { // 使用 IntelliSense 了解相關屬性。 // 懸停…

codeforces round 949 div2

A Turtle and Piggy Are Playing a Game 題目&#xff1a; 思路&#xff1a;輸出2的冪次b使得2^b為最大的不超過x的數 代碼&#xff1a; #include <iostream>using namespace std;const int N 2e5 10;void solve() {int l, r;cin >> l >> r;if(r % 2) …

vscode 運行和調試

vscode使用斷點 1.安裝并激活擴展 Debugger for Chrome (棄用 --> JavaScript Debugger)Debugger for Firefox 2. 配置config文件 打開 config/index.js 并找到 devtool property。將其更新為&#xff1a; 如果你使用的是 Vue CLI 2&#xff0c;請設置并更新 config/in…

SpringBoot Redis讀寫與數據序列化 RedisTemplate 與 StringRedisTemplate 防轉字節

介紹 RedisTemplate 對象在底層默認會轉成字節&#xff0c;造成了內存的開銷很大&#xff0c;這是他底層進行處理的,造成可讀性差&#xff0c;如需要轉成簡單的字符串存儲需要進行序列化的配置。 RedisTemplate 配置類 Configuration public class RedisConfig {Beanpublic …

OpenGL系列(五)紋理貼圖

概述 OpenGL紋理是一種在三維圖形中應用紋理映射的技術。紋理是一張圖像&#xff0c;可以應用到三維模型的表面上&#xff0c;從而使得模型看起來更加真實和具有細節。通過紋理映射&#xff0c;可以將圖像的像素值與三維模型的頂點進行匹配&#xff0c;從而為模型的表面增加細節…

Java并發編程之由于靜態變量錯誤使用可能導致的并發問題

Java并發編程之由于靜態變量錯誤使用可能導致的并發問題 1.1 前言1.2 業務背景1.3 問題分析1.4 為什么呢&#xff1f;1.5 修復方案2 演示示例源碼下載 1.1 前言 我們知道在 Java 后端服務開發中&#xff0c;如果出現并發問題一般都是由于在多個線程中使用了共享的變量導致的。…

JVM相關:Java內存區域

Java 虛擬機&#xff08;JVM)在執行 Java 程序的過程中會把它管理的內存劃分成若干個不同的數據區域。 Java運行時數據區域是指Java虛擬機&#xff08;JVM&#xff09;在執行Java程序時&#xff0c;為了管理內存而劃分的幾個不同作用域。這些區域各自承擔特定的任務&#xff0c…

Day23 自定義對話框服務

?本章節實現了,自定義對話框服務的功能 當現有的對話框服務無法滿足特定需求時,我們可以采用自定義對話框的解決方案,以更好地滿足一些特殊需求。 一.自定義對話框主機服務步驟 在Models 文件夾中,再建立一個 IDialogHostService 接口類,繼承自 IDialogService 對話框服…