Spark-06:共享變量

目錄

1.廣播變量(broadcast variables)

2.累加器(accumulators)


? ? ? 在分布式計算中,當在集群的多個節點上并行運行函數時,默認情況下,每個任務都會獲得函數中使用到的變量的一個副本。如果變量很大,這會導致網絡傳輸占用大量帶寬,并且在每個節點上都占用大量內存空間。為了解決這個問題,Spark引入了共享變量的概念。

????????共享變量允許在多個任務之間共享數據,而不是為每個任務分別復制一份變量。這樣可以顯著降低網絡傳輸的開銷和內存占用。Spark提供了兩種類型的共享變量:廣播變量(broadcast variables)和累加器(accumulators)。

1.廣播變量(broadcast variables)

????????通常情況下,Spark程序運行時,通常會將數據以副本的形式分發到每個執行器(Executor)的任務(Task)中,但當變量較大時,這會導致大量的內存和網絡開銷。通過使用廣播變量,Spark將變量只發送一次到每個節點,并在多個任務之間共享這個副本,從而顯著降低了內存占用和網絡傳輸的開銷。

Scala 實現:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

Java 實現:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});broadcastVar.value();
// returns [1, 2, 3]

2.累加器(accumulators)

????????累加器是Spark中的一種特殊類型的共享變量,主要用來把Executor端變量信息聚合到Driver端。在Driver程序中定義的變量,在Executor端的每個task都會得到這個變量的一份新的副本,每個task更新這些副本的值后,傳回Driver端進行merge。累加器支持的數據類型僅限于數值類型,包括整數和浮點數等。

Scala 實現:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 sscala> accum.value
res2: Long = 10

Java 實現:

LongAccumulator accum = jsc.sc().longAccumulator();sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 saccum.value();
// returns 10

? ? ? ? 內置累加器功能有限,但可以通過繼承AccumulatorV2來創建自己的類型。AccumulatorV2抽象類有幾個方法必須重寫:reset用于將累加器重置為零,add用于向累加器中添加另一個值,merge用于將另一個相同類型的累加器合并到此累加器。

自定義累加器Scala實現:

package com.yichenkeji.demo.sparkscalaimport org.apache.spark.util.AccumulatorV2class CustomAccumulator extends AccumulatorV2[Int, Int]{//初始化累加器的值private var sum = 0override def isZero: Boolean = sum == 0override def copy(): AccumulatorV2[Int, Int] = {val newAcc = new CustomAccumulator()newAcc.sum = sumnewAcc}override def reset(): Unit = sum = 0override def add(v: Int): Unit = sum += voverride def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.valueoverride def value: Int = sum
}

自定義累加器Java實現:

package com.yichenkeji.demo.sparkjava;import org.apache.spark.util.AccumulatorV2;public class CustomAccumulator extends AccumulatorV2<Integer, Integer> {// 初始化累加器的值private Integer sum = 0;@Overridepublic boolean isZero() {return sum == 0;}@Overridepublic AccumulatorV2<Integer, Integer> copy() {CustomAccumulator customAccumulator = new CustomAccumulator();customAccumulator.sum = this.sum;return customAccumulator;}@Overridepublic void reset() {this.sum = 0;}@Overridepublic void add(Integer v) {this.sum += v;}@Overridepublic void merge(AccumulatorV2<Integer, Integer> other) {this.sum += ((CustomAccumulator) other).sum;}@Overridepublic Integer value() {return sum;}
}

自定義累加器的使用:

package com.yichenkeji.demo.sparkjava;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;import java.util.Arrays;
import java.util.List;public class AccumulatorTest {public static void main(String[] args) {//1.初始化SparkContext對象SparkConf sparkConf = new SparkConf().setAppName("Spark Java").setMaster("local[*]");JavaSparkContext sc = new JavaSparkContext(sparkConf);CustomAccumulator customAccumulator = new CustomAccumulator();//注冊自定義累加器才能使用sc.sc().register(customAccumulator);sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).foreach(x -> customAccumulator.add(x));System.out.println(customAccumulator.value());//5.停止SparkContextsc.stop();}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/165670.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/165670.shtml
英文地址,請注明出處:http://en.pswp.cn/news/165670.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

開啟數據庫審計(db,extended級別或os級別),并將審計文件存放到/home/oracle/audit下

文章目錄 開啟數據庫審計&#xff08;db,extended級別或os級別&#xff09;&#xff0c;并將審計文件存放到/home/oracle/audit下一. 簡介二. 配置2.1. 審計是否安裝2.2. 審計表空間遷移2.3. 審計參數2.4. 審計級別2.5. 其他審計選項2.6. 審計相關視圖 三. 使用3.1. 開啟/關閉審…

成為獨立開發者有多難

首先自我介紹&#xff1a;我是一名前端開發工程師&#xff0c;7年的前端開發經驗。CSDN 九段刀客_js,vue,ReactNative-CSDN博客,80多萬的訪問量&#xff0c;1萬多的粉絲。 相信80%的程序員的終極夢想都是成為一名獨立開發者&#xff0c;不用找工作有自己的產品可以有睡后收入。…

深度學習模型訓練計算量的估算

深度學習模型訓練計算量的估算 方法1&#xff1a;基于網絡架構和批處理數量計算算術運算次數前向傳遞計算和常見層的參數數量全連接層&#xff08;Fully connected layer&#xff09;參數浮點數計算量 CNN參數浮點數計算量 轉置CNN參數浮點數計算量 RNN參數浮點數計算量 GRU參數…

刷題學習記錄(含2023ISCTFweb題的部分知識點)

[SWPUCTF 2021 新生賽]sql 進入環境 查看源碼&#xff0c;發現是get傳參且參數為wllm fuzz測試&#xff0c;發現空格&#xff0c;&#xff0c;and被過濾了 同樣的也可以用python腳本進行fuzz測試 import requests fuzz{length ,,handler,like,select,sleep,database,delete,h…

java學習part09類的構造器

1. 2.默認構造器 如果沒有顯式定義任何構造器&#xff0c;系統會默認加一個默認構造器。 如果定義了&#xff0c;則不會有默認構造器。 默認構造器的權限和類的權限一樣&#xff0c;類是public構造器就是public&#xff0c;類是缺省默認構造器就是缺省 反編譯之后添加的構造…

解決DaemonSet沒法調度到master節點的問題

最近在kubernetes部署一個springcloud微服務項目&#xff0c;到了最后一步部署邊緣路由&#xff1a;使用nginx-ingress和traefik都可以&#xff0c;必須使用DaemonSet部署&#xff0c;但是發現三個節點&#xff0c;卻總共只有兩個pod。 換句話說&#xff0c; DaemonSet沒法調度…

UML建模圖文詳解教程05——包圖

版權聲明 本文原創作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl本文參考資料&#xff1a;《UML面向對象分析、建模與設計&#xff08;第2版&#xff09;》呂云翔&#xff0c;趙天宇 著 包圖概述 包圖(package diagram)是用來描述模型中的…

一個最簡單的工業通訊數據分析例子

1.背景 對工業設備的通訊協議進行分析可以幫助我們更好地理解其工作原理和相關技術&#xff0c;并且有助于以下幾個方面&#xff1a; 1. 優化工業設備的通訊效率&#xff1a;了解通訊協議的細節可以幫助我們找到通訊效率低下的原因并進行優化&#xff0c;提高設備的通訊效率和…

Axioss筆記

一、Get請求 // 請求頭攜帶參數&#xff0c;案例&#xff1a;?uid1001 axios.get(http://localhost:8080/user/api/v1/user/query, {params: {uid: 1001}}).then(res > {console.log(res.data) }).catch(err > {console.log("請求錯誤" err) }).finally(() …

MySQL 8 配置文件詳解與最佳實踐

MySQL 8 是一款強大的關系型數據庫管理系統&#xff0c;通過適當的配置文件設置&#xff0c;可以充分發揮其性能潛力。在這篇博客中&#xff0c;我們將深入探究 MySQL 8 常用的配置文件&#xff0c;并提供一些建議&#xff0c;幫助您優化數據庫性能。 配置文件概覽 在 MySQL …

【華為OD題庫-030】阿里巴巴找黃金寶箱(V)-java

題目 一貧如洗的樵夫阿里巴巴在去砍柴的路上&#xff0c;無意中發現了強盜集團的藏寶地&#xff0c;藏寶地有編號從0-N的箱子&#xff0c;每個箱子上面貼有一個數字.阿里巴巴念出一個咒語數字k(k<N),找出連續k個寶箱數字和的最大值&#xff0c;并輸出該最大值。 輸入描述 第…

攔截器的使用

攔截器&#xff08;Interceptor&#xff09;是一種在應用程序中用于干預、修改或攔截請求和響應的組件&#xff0c;是AOP 編程的一種實踐&#xff0c;和過濾器一樣都是一種具體的AOP實現。它可以在請求被發送到目標處理程序之前或之后&#xff0c;對請求進行預處理或對響應進行…

【數據結構】二叉樹概念 | 滿二叉樹 | 完全二叉樹

二叉樹的概念 二叉樹在實踐中用的很多。 一棵二叉樹是結點的一個有限集合&#xff0c;該集合&#xff1a; 或者為空&#xff1b;由一個根結點加上兩棵別稱為左子樹和右子樹的二叉樹組成。二叉樹最多兩個孩子。 這里注意&#xff1a;二叉樹并不是度為2的樹。 二叉樹的度最大值是…

Go lumberjack 日志輪換和管理

在開發應用程序時&#xff0c;記錄日志是一項關鍵的任務&#xff0c;以便在應用程序運行時追蹤問題、監視性能和保留審計記錄。Go 語言提供了靈活且強大的日志記錄功能&#xff0c;可以通過多種方式配置和使用。其中一個常用的日志記錄庫是 github.com/natefinch/lumberjack&am…

python selenium 模擬瀏覽器自動操作搶購腳本

每逢秒殺&#xff0c;都在遺憾網速和手速慢沒能搶購到商品吧。 手寫一個腳本&#xff0c;讓程序幫你搶&#xff0c;搶到的概率會大大提升。 廢話不多說&#xff0c;直接上代碼。 本實例以華為官網搶購手機為例 """ 模擬瀏覽器操作華為官網(1) 【只需要安裝一…

【JAVA】我們該如何規避代碼中可能出現的錯誤?(二)

個人主頁&#xff1a;【&#x1f60a;個人主頁】 系列專欄&#xff1a;【??初識JAVA】 文章目錄 前言異常方法&#xff08;Throwable類&#xff09;Throwable類的方法 捕獲異常多重捕獲塊 前言 異常是程序中的一些錯誤&#xff0c;但并不是所有的錯誤都是異常&#xff0c;并…

git-3

1.如何讓工作區的文件恢復為和暫存區一樣&#xff1f; 工作區所作的變更還不及暫存區的變更好&#xff0c;想從暫存區拷貝到工作區&#xff0c;變更工作區(恢復成和暫存區一樣的狀態)&#xff0c;想到用git checkout -- 文件名 2.怎樣取消暫存區部分文件的更改&#xff1f; 如…

無損壓縮技巧:減小PDF文件尺寸的有效方法

我們在制作pdf文檔的時候&#xff0c;會加入許多內容&#xff0c;文字、圖片等等&#xff0c;素材添加的過多之后就會導致pdf文檔特別大&#xff0c;在上傳或者儲存時&#xff0c;就會特別不方便&#xff0c;所以今天就告訴大家一個pdf壓縮的方法&#xff0c;使用pdf在線壓縮工…

4-Docker命令之docker info

后續為大家逐個講解一下docker常用命令及其相關用法。docker常用命令查看如下: [root@centos79 ~]# docker --helpUsage: docker [OPTIONS] COMMANDA self-sufficient runtime for containersCommon Commands:run Create and run a new container from an imageexec…

洛谷 P1883 函數

P1883 函數 - 洛谷 | 計算機科學教育新生態 (luogu.com.cn) Error Curves - 洛谷 | 計算機科學教育新生態 (luogu.com.cn) 這兩題是一模一樣的&#xff0c;過一題水兩題。 分析 主要難點在于證明F(x)是一個單峰函數可以被三分&#xff0c;但是我隨便畫了幾個f(x)之后發現好像…