一、UDF
一進一出函數
/**語法:SparkSession.udf.register(func_name: String, op: T => K)
*/
object TestSparkSqlUdf {
  def main(args: Array[String]): Unit = {
    // Build the Spark SQL session (local mode, all cores).
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Bring the session's implicit conversions (toDF, $"col", ...) into scope.
    import spark.implicits._

    val df: DataFrame = spark.read.json("data/user.json")

    // Requirement: prepend a prefix to every value of the `username` column.
    // BUG FIX: the function literal needs an explicit parameter type — Scala
    // cannot infer the type of `name` in `name => ...` passed to udf.register,
    // which fails to compile with "missing parameter type".
    spark.udf.register("prefixName", (name: String) => "Name: " + name)

    df.createOrReplaceTempView("user")
    spark.sql("select prefixName(username), age from user").show()

    // Release session resources.
    spark.close()
  }
}
二、UDAF
多進一出函數,即聚合函數
1. 弱類型函數
/**自定義步驟:1.繼承 UserDefinedAggregateFunction 抽象類(已過時)2.重寫 8 個方法
*/
object TestSparkSqlUdaf {
  def main(args: Array[String]): Unit = {
    // Set up the Spark SQL session (local mode, all cores).
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    // Session implicits (toDF, $"col", ...).
    import spark.implicits._

    val df: DataFrame = spark.read.json("data/user.json")

    // Requirement: average the `age` column with a custom (untyped) UDAF,
    // registered under the SQL name "ageAvg".
    spark.udf.register("ageAvg", new MyAvgUdaf())

    df.createOrReplaceTempView("user")
    spark.sql("select ageAvg(age) from user").show()

    // Release session resources.
    spark.close()
  }
}
// Untyped (Row-based) aggregate function computing the average age.
// NOTE: UserDefinedAggregateFunction is deprecated since Spark 3.0 in
// favour of Aggregator + functions.udaf.
class MyAvgUdaf extends UserDefinedAggregateFunction {
  // Input schema: a single Long column named "age".
  def inputSchema: StructType =
    StructType(StructField("age", LongType) :: Nil)

  // Buffer schema: running sum of ages plus row count.
  def bufferSchema: StructType =
    StructType(StructField("totalAge", LongType) :: StructField("count", LongType) :: Nil)

  // Result type of the aggregation (the average).
  def dataType: DataType = DoubleType

  // Same input always produces the same output.
  def deterministic: Boolean = true

  // Reset the buffer to the empty aggregation state.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0L)
    buffer.update(1, 0L)
  }

  // Fold a single input row into the buffer.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer.update(0, buffer.getLong(0) + input.getLong(0))
    buffer.update(1, buffer.getLong(1) + 1L)
  }

  // Combine two partial buffers (e.g. from different partitions).
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
    buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
  }

  // Final result: sum / count as a Double.
  def evaluate(buffer: Row): Any =
    buffer.getLong(0).toDouble / buffer.getLong(1)
}
2. 強類型函數
2.1 Spark3.0 之前
/**自定義步驟:1.繼承 Aggregator 抽象類,定義泛型IN:輸入數據類型BUF:緩沖區類型OUT:輸出數據類型2.重寫 6 個方法
*/
object TestSparkSqlUdaf1 {
  def main(args: Array[String]): Unit = {
    // Set up the Spark SQL session (local mode, all cores).
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Session implicits — also provide the Encoder needed by df.as[User].
    import spark.implicits._

    val df: DataFrame = spark.read.json("data/user.json")

    // Requirement: average the `age` column with a strongly-typed UDAF.
    // Before Spark 3.0 such an Aggregator is only usable through the DSL,
    // so first convert the DataFrame into a typed Dataset.
    val userDs = df.as[User]
    // Turn the aggregator into a typed column usable inside select().
    val avgCol: TypedColumn[User, Double] = new MyAvgUdaf().toColumn
    userDs.select(avgCol).show()

    // Release session resources.
    spark.close()
  }
}
// One row of the input JSON file.
case class User(username: String, age: Long)

// Mutable aggregation buffer: running sum of ages and row count.
case class Buff(var totalAge: Long, var count: Long)

// Strongly-typed aggregate function computing the average age.
// BUG FIX: the OUT type parameter must be Double, not Long — `finish`
// returns a Double, `outputEncoder` is Encoder[Double], and the caller
// annotates the column as TypedColumn[User, Double]; with OUT = Long the
// class does not compile.
class MyAvgUdaf extends Aggregator[User, Buff, Double] {
  // Initial (empty) buffer.
  override def zero: Buff = Buff(0L, 0L)

  // Fold one input row into the buffer.
  override def reduce(buff: Buff, in: User): Buff = {
    buff.totalAge += in.age
    buff.count += 1
    buff
  }

  // Combine two partial buffers from different partitions.
  override def merge(buff1: Buff, buff2: Buff): Buff = {
    buff1.totalAge += buff2.totalAge
    buff1.count += buff2.count
    buff1
  }

  // Final result: mean age as a Double.
  override def finish(buff: Buff): Double = buff.totalAge.toDouble / buff.count

  // Encoders Spark uses to (de)serialize the buffer and the output.
  // Encoders.product handles any case class; scalaDouble handles Double.
  override def bufferEncoder: Encoder[Buff] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
2.2 Spark3.0 之后
/**自定義步驟:1.繼承 Aggregator 抽象類,定義泛型IN:輸入數據類型BUF:緩沖區類型OUT:輸出數據類型2.重寫 6 個方法
*/
object TestSparkSqlUdaf1 {
  def main(args: Array[String]): Unit = {
    // Set up the Spark SQL session (local mode, all cores).
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Session implicits (toDF, $"col", ...).
    import spark.implicits._

    val df: DataFrame = spark.read.json("data/user.json")

    // Requirement: average the `age` column with a strongly-typed UDAF.
    // Since Spark 3.0 an Aggregator can be used directly in SQL once it is
    // wrapped with functions.udaf(...) at registration time.
    spark.udf.register("ageAvg", functions.udaf(new MyAvgUdaf()))

    df.createOrReplaceTempView("user")
    spark.sql("select ageAvg(age) from user").show()

    // Release session resources.
    spark.close()
  }
}
// Mutable aggregation buffer: running sum of ages and row count.
case class Buff(var totalAge: Long, var count: Long)

// Strongly-typed aggregate function computing the average age; IN is the
// raw Long value of the `age` column (SQL usage via functions.udaf).
// BUG FIX: the OUT type parameter must be Double, not Long — `finish`
// returns a Double and `outputEncoder` is Encoder[Double]; with OUT = Long
// the class does not compile.
class MyAvgUdaf extends Aggregator[Long, Buff, Double] {
  // Initial (empty) buffer.
  override def zero: Buff = Buff(0L, 0L)

  // Fold one input value into the buffer.
  override def reduce(buff: Buff, in: Long): Buff = {
    buff.totalAge += in
    buff.count += 1
    buff
  }

  // Combine two partial buffers from different partitions.
  override def merge(buff1: Buff, buff2: Buff): Buff = {
    buff1.totalAge += buff2.totalAge
    buff1.count += buff2.count
    buff1
  }

  // Final result: mean age as a Double.
  override def finish(buff: Buff): Double = buff.totalAge.toDouble / buff.count

  // Encoders Spark uses to (de)serialize the buffer and the output.
  // Encoders.product handles any case class; scalaDouble handles Double.
  override def bufferEncoder: Encoder[Buff] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}