課堂總結。

第三章第六節 Spark-SQL核心編程(五)自定義函數:UDF:val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")//創建SparkSession對象val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._//讀取json文件val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")spark.udf.register("addName",(x:String)=>"Name:"+x)df.createOrReplaceTempView("people")spark.sql("select addName(username),age from people").show()spark.stop()UDAF(自定義聚合函數)強類型的 Dataset 和弱類型的 DataFrame 都提供了相關的聚合函數, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用戶可以設定自己的自定義聚合函數。Spark3.0之前我們使用的是UserDefinedAggregateFunction作為自定義聚合函數,從 Spark3.0 版本后可以統一采用強類型聚合函數 Aggregator實驗需求:計算平均工資實現方式一:RDDval sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)val resRDD: (Int, Int) = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40))).map { case (name, salary) =>{ (salary, 1) }}.reduce { (t1, t2) =>{ (t1._1 + t2._1, t1._2 + t2._2) }}println(resRDD._1/resRDD._2)// 關閉連接sc.stop()實現方式二:弱類型UDAFclass MyAverageUDAF extends UserDefinedAggregateFunction{ def inputSchema: StructType = StructType(Array(StructField("salary",IntegerType))) // 聚合函數緩沖區中值的數據類型(salary,count) def bufferSchema: StructType = { StructType(Array(StructField("sum",LongType),StructField("count",LongType))) } // 函數返回值的數據類型 def dataType: DataType = DoubleType // 穩定性:對于相同的輸入是否一直返回相同的輸出。 def deterministic: Boolean = true // 函數緩沖區初始化 def initialize(buffer: MutableAggregationBuffer): Unit = { // 存薪資的總和 buffer(0) = 0L // 存薪資的個數 buffer(1) = 0L } // 更新緩沖區中的數據 def update(buffer: MutableAggregationBuffer,input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getInt(0) buffer(1) = buffer.getLong(1) + 1 } } // 合并緩沖區 def merge(buffer1: MutableAggregationBuffer,buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // 計算最終結果 def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)}val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")val spark:SparkSession = SparkSession.builder().config(conf).getOrCreate()import spark.implicits._val res :RDD[(String,Int)]= spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40)))val df :DataFrame = res.toDF("name","salary")df.createOrReplaceTempView("user")var myAverage = new MyAverageUDAF//在 spark 中注冊聚合函數spark.udf.register("avgSalary",myAverage)spark.sql("select avgSalary(salary) from user").show()// 關閉連接spark.stop()實現方式三:強類型UDAFcase class Buff(var sum:Long,var cnt:Long)class MyAverageUDAF extends Aggregator[Long,Buff,Double]{ override def zero: Buff = Buff(0,0) override def reduce(b: Buff, a: Long): Buff = { b.sum += a b.cnt += 1 b } override def merge(b1: Buff, b2: Buff): Buff = { b1.sum += b2.sum b1.cnt += b2.cnt b1 } override def finish(reduction: Buff):?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/901548.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/901548.shtml
英文地址,請注明出處:http://en.pswp.cn/news/901548.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

分庫分表-除了hash分片還有別的嗎?

在分庫分表的設計中,除了常見的 Hash 分片,還有多種策略根據業務場景靈活選擇。以下是幾種主流的分庫分表策略及其應用場景、技術實現和優缺點分析,結合項目經驗(如標易行投標服務平臺的高并發場景)進行說明: 一、常見分庫分表策略 1. 范圍分片(Range Sharding) 原理:…

AUTOSAR圖解==>AUTOSAR_SWS_GPTDriver

AUTOSAR GPT驅動 (通用定時器驅動) 分析 AUTOSAR標準軟件規范解析 目錄 1. GPT驅動概述 1.1 GPT驅動在AUTOSAR架構中的位置1.2 GPT驅動主要功能 2. GPT驅動模塊結構3. GPT驅動初始化流程4. GPT驅動狀態機5. GPT驅動錯誤處理6. GPT預定義定時器7. 總結 1. GPT驅動概述 GPT驅動…

MyBatis持久層框架

MyBatis持久層框架 目錄 一、Mybatis簡介 1. 簡介 2. 持久層框架對比 3. 快速入門(基于Mybatis3方式) 二、日志框架擴展 1. 用日志打印替代sout 2. Java日志體系演變 3. 最佳拍檔用法 4. Lombok插件的使用 4.1 Lombok簡介 4.2 Lombok安裝 4.3 …

域控制器升級的先決條件驗證失敗,證書服務器已安裝

出現“證書服務器已安裝”導致域控制器升級失敗時,核心解決方法是卸載已安裝的證書服務?。具體操作如下:? ?卸載證書服務? 以管理員身份打開PowerShell,執行命令: Remove-WindowsFeature -Name AD-Certificate該命令會移除A…

VMware虛擬機常用Linux命令進階指南(一)

摘要:本文涵蓋多方面 Linux 命令的使用。包括用戶與用戶組管理,創建用戶和組并設置權限;目錄結構操作,涉及創建和更改目錄結構;Vim 編輯器及文件歸檔,有文件創建、編譯、合并、打包等任務。 更多優質文章 …

【AI News | 20250415】每日AI進展

AI News 1、字節跳動發布Seaweed-7B視頻模型:70億參數實現音視頻同步生成與多鏡頭敘事 字節跳動推出新一代視頻生成模型Seaweed-7B,該模型僅70億參數卻實現多項突破:支持音視頻同步生成、多鏡頭敘事(保持角色連貫性)、…

如何實現動態請求地址(baseURL)

需求: 在項目中遇到了需要實時更換請求地址,后續使用修改后的請求地址(IP) 例如:原ip請求為http://192.168.1.1:80/xxx,現在需要你點擊或其他操作將其修改為http://192.168.1.2:80/xxx,該如何操作 tips: 修改后需要跳轉( 修改了IP之前的不可使用,需要訪問修改后的地址來操作 …

Open AI 使用篇

一.function Calling 大模型中的 function calling 指的是在人工智能模型(如 GPT-4)中調用外部函數或API,以便模型能夠執行更復雜的任務或獲取外部數據。這種方式允許模型在生成回答時不僅僅依賴于內部的訓練數據,還能夠與外部系…

6.DJI-PSDK:psdk訂閱無人機高度/速度/GPS/RTK/時間/經緯度等消息及問題解決

DJI-PSDK:psdk訂閱無人機高度/速度/GPS/RTK/時間/經緯度等消息 消息訂閱可以獲取絕大多數無人機的動態信息,包括無人機的姿態、速度、加速度、角速度、高度、GPS 位置、云 臺的角度和狀態、飛行模式和飛行狀態、電機和電池等各類關鍵信息。 這些信息并不會“一股腦兒地”全部…

100 個網絡安全基礎知識

1. 什么是網絡安全? 網絡安全是指采取必要措施,防范對網絡的攻擊、侵入、干擾、破壞和非法使用以及意外事故,使網絡處于穩定可靠運行的狀態,保障網絡數據的完整性、保密性、可用性。(參考《中華人民共和國網絡安全法》…

第七屆IEEE通信、信息系統與計算機工程國際會議(CISCE 2025)

重要信息 官網:www.iccisce.com 時間:2025年5月9-11日 地點:中國-廣州 征稿主題 通信技術 信息系統 ?5G/6G通信系統與網絡 ?無線通信與移動網絡 ?光纖通信與光網絡 ?衛星與空間通信 ?通信信號處理與編碼 ?無線傳感器網絡 ?物聯網…

OpenCV 圖像拼接

一、圖像拼接的介紹 圖像拼接是一種將多幅具有部分重疊內容的圖像合并成一幅完整、無縫且具有更廣闊視野或更高分辨率圖像的技術。其目的是通過整合多個局部圖像來獲取更全面、更具信息價值的圖像內容。 二、圖像拼接的原理 圖像拼接的核心目標是將多幅有重疊區域的圖像進行準…

第十一章 網絡編程

在TCP/IP協議中,“IP地址TCP或UDP端口號”唯一標識網絡通訊中的一個進程。 因此可以用Socket來描述網絡連接的一對一關系。 常用的Socket類型有兩種:流式Socket(SOCK_STREAM)和數據報式Socket(SOCK_DGRAM&#xff09…

ffmpeg實現視頻流抽幀

ffmpeg 實現視頻流抽幀 抽取實時視頻幀 如果你的實時視頻是通過 RTSP、UDP 或其他協議獲取的,可以直接調用 FFmpeg 命令來抽取幀。 ffmpeg 命令 示例 1 ffmpeg -i rtsp://your_rtsp_stream_url -vf fps1 -update 1 output.jpg說明: -i rtsp://your…

【GIT】放棄”本地更改,恢復到遠程倉庫的狀態git fetch origin git reset --hard origin/分支名

如果你想完全放棄本地更改,恢復到遠程倉庫的狀態,可以按照以下步驟操作: 獲取遠程最新版本 首先執行: git fetch origin這條命令會把遠程倉庫的最新提交拉取到你的本地,但不會自動合并到你的當前分支。 硬重置你的當前…

flutter doctor 信號號超時

報錯如下: :\Users\Administrator>flutter doctor Doctor summary (to see all details, run flutter doctor -v): [√] Flutter (Channel stable, 3.27.4, on Microsoft Windows [版本 10.0.22631.5189], locale zh-CN) [√] Windows Version (Installed versi…

【Linux】系統入門

【Linux】系統初識 起源開源 閉源版本內核內核編號 Linux的安裝雙系統(不推薦)WindowsLinuxvmware虛擬機vitualbox操作系統的鏡像centos 7/ubuntu云服務器租用 Linux的操作lsmkdir 文件名pwdadduser userdel -rrm文件名cat /proc/cpuinfolinux支持編程vim code.c./a.out 運行程…

mybatis-plus整合springboot與使用方式

注解 TableField(exist false)&#xff1a;表示該屬性不為數據庫表字段&#xff0c;但又是必須使用的。 整合springboot pom <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xs…

[第十六屆藍橋杯 JavaB 組] 真題 + 經驗分享

A&#xff1a;逃離高塔(AC) 這題就是簡單的簽到題&#xff0c;按照題意枚舉即可。需要注意的是不要忘記用long&#xff0c;用int的話會爆。 &#x1f4d6; 代碼示例&#xff1a; import java.io.*; import java.util.*; public class Main {public static PrintWriter pr ne…

GPU服務器聲音很響可以怎么處理

當GPU服務器運行時噪音過大&#xff0c;通常是由于高負載下散熱風扇高速運轉所致。以下是分步驟的解決方案&#xff0c;幫助您有效降低噪音并保持設備穩定運行&#xff1a; 一、排查噪音來源 定位聲源 ? 使用 聲級計 或手機分貝檢測APP&#xff0c;確定最大噪音位置&#xff0…