詳解 Spark SQL 代碼開發之用戶自定義函數

一、UDF

一進一出函數

/**語法:SparkSession.udf.register(func_name: String, op: T => K)
*/
object TestSparkSqlUdf {def main(args: Array[String]): Unit = {// 創建 sparksql 環境對象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入環境對象中的隱式轉換import spark.implicits._val df: DataFrame = spark.read.json("data/user.json")/*需求:給 username 字段的每個值添加前綴*/spark.udf.register("prefixName", name => "Name: " + name)df.createOrReplaceTempView("user")spark.sql("select prefixName(username), age from user").show()// 關閉環境spark.close()}}

二、UDAF

多進一出函數,即聚合函數

1. 弱類型函數

/**自定義步驟:1.繼承 UserDefinedAggregateFunction 抽象類(已過時)2.重寫 8 個方法
*/
object TestSparkSqlUdaf {def main(args: Array[String]): Unit = {// 創建 sparksql 環境對象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入環境對象中的隱式轉換import spark.implicits._val df: DataFrame = spark.read.json("data/user.json")/*需求:自定義求年齡平均值的udaf函數*/val myAvgUdaf = new MyAvgUdaf()spark.udf.register("ageAvg", myAvgUdaf)df.createOrReplaceTempView("user")spark.sql("select ageAvg(age) from user").show()// 關閉環境spark.close()}}// 自定義聚合函數類,實現求年齡平均值
class MyAvgUdaf extends UserDefinedAggregateFunction {// 輸入數據的結構類型def inputSchema: StructType = {// StructType 是樣例類StructType(Array(// StructField 是樣例類,必傳參數 name: String, dataType: DataTypeStructField("age", LongType)))} // 緩沖區的結構類型def bufferSchema: StructType = {StructType(Array(StructField("totalAge", LongType),StructField("count", LongType)))}// 輸出數據的結構類型def dataType: DataType = DoubleType// 函數穩定性def deterministic: Boolean = true// 緩沖區初始化def initialize(buffer: MutableAggregationBuffer): Unit = {buffer.update(0, 0L)buffer.update(1, 0L)}// 接收輸入數據更新緩沖區數據def update(buffer: MutableAggregationBuffer, input: Row): Unit = {val totalAge = buffer.getLong(0)val count = buffer.getLong(1)val age = input.getLong(0)buffer.update(0, totalAge + age)buffer.update(1, count + 1)}// 合并緩沖區def merge(buffer1: MutableAggregationBuffer,buffer2: Row): Unit = {buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))}// 計算最終結果def evaluate(buffer: Row): Any = {buffer.getLong(0).toDouble/buffer.getLong(1)}}

2. 強類型函數

2.1 Spark3.0 之前
/**自定義步驟:1.繼承 Aggregator 抽象類,定義泛型IN:輸入數據類型BUF:緩沖區類型OUT:輸出數據類型2.重寫 6 個方法
*/
object TestSparkSqlUdaf1 {def main(args: Array[String]): Unit = {// 創建 sparksql 環境對象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入環境對象中的隱式轉換import spark.implicits._val df: DataFrame = spark.read.json("data/user.json")/*需求:自定義求年齡平均值的udaf函數*/// Spark3.0 之前的強類型UDAF函數必須在 DSL 語法中使用val ds = df.as[User]// 將UDAF函數對象轉換成 DSL 語法中的查詢列val col: TypedColumn[User, Double] = new MyAvgUdaf().toColumnds.select(col).show()// 關閉環境spark.close()}}// 定義封裝輸入的一行數據的類
case class User(username: String, age: Long)// 定義緩沖區類
case class Buff(var totalAge: Long, var count: Long)// 自定義聚合函數類,實現求年齡平均值
class MyAvgUdaf extends Aggregator[User, Buff, Long] {// 緩沖區初始化override def zero: Buff = Buff(0L, 0L)// 根據輸入數據更新緩沖區數據override def reduce(buff: Buff, in: User): Buff = {buff.totalAge = buff.totalAge + in.agebuff.count = buff.count + 1buff}// 合并緩沖區override def merge(buff1: Buff, buff2: Buff): Buff = {buff1.totalAge = buff1.totalAge + buff2.totalAgebuff1.count = buff1.count + buff2.countbuff1}// 計算最終結果override def finish(buff: Buff): Double = {buff.totalAge.toDouble/buff.count} //DataSet 默認的編解碼器,用于序列化,固定寫法//自定義類型是 product // 緩沖區編碼操作override def bufferEncoder: Encoder[Buff] = Encoders.product// 輸出數據編碼操作// 自帶類型根據類型選擇override def outputEncoder: Encoder[Double] = Encoders.scalaDouble}
2.2 Spark3.0 之后
/**自定義步驟:1.繼承 Aggregator 抽象類,定義泛型IN:輸入數據類型BUF:緩沖區類型OUT:輸出數據類型2.重寫 6 個方法
*/
object TestSparkSqlUdaf1 {def main(args: Array[String]): Unit = {// 創建 sparksql 環境對象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入環境對象中的隱式轉換import spark.implicits._val df: DataFrame = spark.read.json("data/user.json")/*需求:自定義求年齡平均值的udaf函數*/// Spark3.0 之后的強類型UDAF可以在 SQL 語法中使用val myAvgUdaf = new MyAvgUdaf()// 注冊函數時需要使用 functions.udaf(func) 包裝轉換spark.udf.register("ageAvg", functions.udaf(myAvgUdaf))df.createOrReplaceTempView("user")spark.sql("select ageAvg(age) from user").show()// 關閉環境spark.close()}}// 定義緩沖區類
case class Buff(var totalAge: Long, var count: Long)// 自定義聚合函數類,實現求年齡平均值
class MyAvgUdaf extends Aggregator[Long, Buff, Long] {// 緩沖區初始化override def zero: Buff = Buff(0L, 0L)// 根據輸入數據更新緩沖區數據override def reduce(buff: Buff, in: Long): Buff = {buff.totalAge = buff.totalAge + inbuff.count = buff.count + 1buff}// 合并緩沖區override def merge(buff1: Buff, buff2: Buff): Buff = {buff1.totalAge = buff1.totalAge + buff2.totalAgebuff1.count = buff1.count + buff2.countbuff1}// 計算最終結果override def finish(buff: Buff): Double = {buff.totalAge.toDouble/buff.count} //DataSet 默認的編解碼器,用于序列化,固定寫法//自定義類型是 product // 緩沖區編碼操作override def bufferEncoder: Encoder[Buff] = Encoders.product// 輸出數據編碼操作// 自帶類型根據類型選擇override def outputEncoder: Encoder[Double] = Encoders.scalaDouble}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/21752.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/21752.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/21752.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

subline text3安裝numpy,scipy,matplotlib,pandas,sklearn,ipynb

1,numpy(基礎數值算法) 安裝,要是在cmd直接安裝到最后會報錯, import numpy as np ModuleNotFoundError: No module named numpy 直接進入python環境,輸入python -m pip install numpy就不會報錯…

【SringBoot項目中MyBatis-Plus多數據源應用實踐】

文章目錄 前言 一、Mybatis-Plus是什么? 二、多數據源是什么? 三、使用步驟 1. 新建一個SpringBoot項目 2. 引入必要的MyBatis架包 3. 新建兩個數據庫及兩張表 3.3.1 新建數據庫:DB_A,并創建一張數據表alarm_kind,以及…

云端數據提取:安全、高效地利用無限資源

在當今的大數據時代,企業和組織越來越依賴于云平臺存儲和處理海量數據。然而,隨著數據的指數級增長,數據的安全性和高效的數據處理成為了企業最為關心的議題之一。本文將探討云端數據安全的重要性,并提出一套既高效又安全的數據提…

淺測 長亭雷池 WAF “動態防護”

本文首發于 Anyeの小站 前言 雷池 WAF 社區版的更新速度是真快啊,幾乎一周一個小版本,倆月一個大版本,攻城獅們真的狠啊,沒法測了。 廢話不多說,前兩天看到了 這篇文章,對雷池的“動態防護”功能挺感興趣…

Android應用的基本構造及威脅(apk)

目錄 APK文件是什么 apk文件解壓后的目錄結構 apk文件的存儲位置

去掉el-table表頭右側類名是gutter,width=17px的空白區域(包括表頭樣式及表格奇偶行樣式和表格自動滾動)

代碼如下&#xff1a; <el-table:data"tableData"ref"scroll_Table":header-cell-style"getRowClass":cell-style"styleBack"height"350px"style"width: 100%"><el-table-column prop"id" l…

Scrum團隊在迭代中如何處理計劃外的工作

認為 Scrum 團隊不做計劃其實是一個誤區&#xff0c;實際上很多 Scrum 團隊在沖刺計劃會議以及在細化工作項時均會進行詳細規劃。此外&#xff0c;他們還會創建一個路線圖&#xff0c;以便顯示他們在多個沖刺中的計劃。 Scrum 團隊需要經常進行計劃&#xff0c;以便在不斷變化…

linux學習:進程

目錄 例子1 獲取當前進程的進程標識符 例子2 創建一個新的子進程 例子3 展示了父進程和子進程的進程標識符 例子4 區分父進程和子進程 例子5 區分父進程和子進程的行為 例子6 比較進程標識符來區分父進程和子進程 例子7 子進程如何修改一個變量&…

混合動力電動汽車介紹(二)

接續前一章內容&#xff0c;本篇文章介紹混合動力汽車串聯、并聯和混聯的系統組成和工作原理。 一、串聯混合動力電動汽車的系統組成和工作原理 上圖為串聯混合動力電動汽車的結構簡圖。汽車由電動機-發電機驅動行駛&#xff0c;電機控制器的動力來自油箱-發動機-發電機-發電機…

Python 爬蟲零基礎:探索網絡數據的神秘世界

Python 爬蟲零基礎&#xff1a;探索網絡數據的神秘世界 在數字化時代&#xff0c;網絡數據如同無盡的寶藏&#xff0c;等待著我們去發掘。Python爬蟲&#xff0c;作為獲取這些數據的重要工具&#xff0c;正逐漸走進越來越多人的視野。對于零基礎的學習者來說&#xff0c;如何入…

基于Spring Boot框架的分頁查詢和文件上傳

分頁查詢 分析 要想從數據庫中進行分頁查詢&#xff0c;我們要使用LIMIT關鍵字&#xff0c;格式為&#xff1a;limit 開始索引 每頁顯示的條數 假設一頁想展示10條數據 查詢第1頁數據的SQL語句是&#xff1a; select * from emp limit 0,10; 查詢第2頁數據的SQL語句是&…

【Pytest官方文檔翻譯及學習】2.2 如何在測試中編寫和報告斷言

目錄 2.2 如何在測試中編寫和報告斷言 2.2.1 使用assert語句斷言 2.2.2 關于預期異常的斷言 2.2.3 關于預期警告的斷言 2.2.4 應用上下文相關的比較 2.2.5 為失敗的斷言定義自己的解釋 2.2.6 斷言內省細節 2.2 如何在測試中編寫和報告斷言 2.2.1 使用assert語句斷言 p…

6、架構-服務端緩存

為系統引入緩存之前&#xff0c;第一件事情是確認系統是否真的需要緩 存。從開發角度來說&#xff0c;引入緩存會提 高系統復雜度&#xff0c;因為你要考慮緩存的失效、更新、一致性等問題&#xff1b;從運維角度來說&#xff0c;緩存會掩蓋一些缺 陷&#xff0c;讓問題在更久的…

npm徹底清理緩存

在使用npm過程中&#xff0c;肯定會遇到清緩存的情況&#xff0c;網上的命令一般為 npm cache clear --force有時筆者在清理緩存之后npm install依然失敗&#xff0c;仔細發現&#xff0c;執行該命令之后npm報了一個警告 npm WARN using --force Recommended protections dis…

代碼隨想錄算法訓練營第27天|● 39. 組合總和● 40.組合總和II● 131.分割回文串

組合總和 題目鏈接 39. 組合總和 - 力扣&#xff08;LeetCode&#xff09; 代碼&#xff1a; class Solution {public List<List<Integer>> res new ArrayList<>();public List<Integer> list new ArrayList<>();public int sum 0;/**…

在nginx中配置反向代理

在nginx中配置反向代理&#xff0c;需要使用proxy_pass指令。以下是一個簡單的nginx反向代理配置示例&#xff1a; server {listen 80;server_name example.com;location / {proxy_pass http://backend_server;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote…

LoadRunner 錄制腳本時提示無Internet訪問/加載慢,如何解決?

LoadRunner 錄制腳本時提示無Internet訪問/加載慢&#xff0c;如何解決&#xff1f; 在使用LoadRunner 12.02 進行錄制腳本時提示無Internet訪問&#xff0c;這是如下圖&#xff1a; 翻譯中文如下&#xff1a; 這里&#xff0c;我認為大家應該都已經點過yes了&#xff0c;但是…

python結構化模式匹配switch-case,Python 3.10中引入,Python的模式匹配(pattern matching)語法

增加了采用模式加上相應動作的 match 語句 和 case 語句 的形式的結構化模式匹配。 模式由序列、映射、基本數據類型以及類實例構成。 模式匹配使得程序能夠從復雜的數據類型中提取信息、根據數據結構實現分支&#xff0c;并基于不同的數據形式應用特定的動作。 語法與操作 模…

Linux下配置Pytorch

1.Anaconda 1.1虛擬環境創建 2.Nvidia驅動 3.CUDA驅動安裝 4.Pytorch安裝 具體的步驟如上&#xff1a;可參考另一位博主的博客非常詳細&#xff1a; Linux服務器配置PythonPyTorchCUDA深度學習環境_linux cuda環境配置-CSDN博客https://blog.csdn.net/NSJim/article/detai…

極海APM32F072用Keil5燒錄失敗Error: Flash Download failed -“Cortex-MO+“

在用Keil5燒錄時&#xff0c;出現錯誤彈窗&#xff0c;大概長這樣&#xff1a; 檢查了一圈設置&#xff0c;都搞不好。 先用J-Flash&#xff0c;顯示讀寫保護&#xff08;未截圖&#xff09;&#xff0c;會跳出界面讓選擇是否解除讀寫保護&#xff1a; 1.點擊允許讀操作YES&am…