Spark---DataFrame存儲、Spark UDF函數、UDAF函數

四、DataFrame存儲+Spark UDF函數

1、儲存DataFrame

1)、將DataFrame存儲為parquet文件

2)、將DataFrame存儲到JDBC數據庫

3)、將DataFrame存儲到Hive表

2、UDF:用戶自定義函數

可以自定義類實現UDFX接口

java:

SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Row call(String s) throws Exception {
return RowFactory.create(s);}
});List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD,schema);
df.registerTempTable("user");/*** 根據UDF函數參數的個數來決定是實現哪一個UDF  UDF1,UDF2。。。。UDF1xxx*/
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(String t1) throws Exception {return t1.length();}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();//sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
//
//	/**
//	 * 
//	 */
//	private static final long serialVersionUID = 1L;
//
//	@Override
//	public Integer call(String t1, Integer t2) throws Exception {
//return t1.length()+t2;
//	}
//} ,DataTypes.IntegerType );
//sqlContext.sql("select name ,StrLen(name,10) as length from user").show();sc.stop();	

scala:

1.val spark = SparkSession.builder().master("local").appName("UDF").getOrCreate()
2.val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhaoliu", "tianqi")
3.import spark.implicits._
4.val nameDF: DataFrame = nameList.toDF("name")
5.nameDF.createOrReplaceTempView("students")
6.nameDF.show()
7.
8.spark.udf.register("STRLEN",(name:String)=>{
9.name.length
10.})
11.spark.sql("select name ,STRLEN(name) as length from students order by length desc").show(100)

五、UDAF函數

1、UDAF:用戶自定義聚合函數

1)、實現UDAF函數如果要自定義類要繼承

UserDefinedAggregateFunction類

2)、UDAF原理圖

java:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("udaf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu","zhangsan","zhangsan","lisi"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Row call(String s) throws Exception {return RowFactory.create(s);}
});List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/*** 注冊一個UDAF函數,實現統計相同值得個數* 注意:這里可以自定義一個類繼承UserDefinedAggregateFunction類也是可以的*/
sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {/*** */private static final long serialVersionUID = 1L;/*** 更新 可以認為一個一個地將組內的字段值傳遞進來 實現拼接的邏輯* buffer.getInt(0)獲取的是上一次聚合后的值* 相當于map端的combiner,combiner就是對每一個map task的處理結果進行一次小聚合 * 大聚和發生在reduce端.* 這里即是:在進行聚合的時候,每當有新的值進來,對分組后的聚合如何進行計算*/@Overridepublic void update(MutableAggregationBuffer buffer, Row arg1) {buffer.update(0, buffer.getInt(0)+1);}/*** 合并 update操作,可能是針對一個分組內的部分數據,在某個節點上發生的 但是可能一個分組內的數據,會分布在多個節點上處理* 此時就要用merge操作,將各個節點上分布式拼接好的串,合并起來* buffer1.getInt(0) : 大聚和的時候 上一次聚合后的值       * buffer2.getInt(0) : 這次計算傳入進來的update的結果* 這里即是:最后在分布式節點完成后需要進行全局級別的Merge操作*/@Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));}/*** 指定輸入字段的字段及類型*/@Overridepublic StructType inputSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));}/*** 初始化一個內部的自己定義的值,在Aggregate之前每組數據的初始化結果*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0);}/*** 最后返回一個和DataType的類型要一致的類型,返回UDAF最后的計算結果*/@Overridepublic Object evaluate(Row row) {return row.getInt(0);}@Overridepublic boolean deterministic() {//設置為truereturn true;}/*** 指定UDAF函數計算后返回的結果類型*/@Overridepublic DataType dataType() {return DataTypes.IntegerType;}/*** 在進行聚合操作的時候所要處理的數據的結果的類型*/@Overridepublic StructType bufferSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)));}});sqlContext.sql("select name ,StringCount(name) from user group by name").show();sc.stop();

scala:

1.class MyCount extends UserDefinedAggregateFunction{
2.  //輸入數據的類型
3.  override def inputSchema: StructType =    StructType(List[StructField](StructField("xx",StringType,true)))
4.
5.  //在聚合過程中處理的數據類型
6.  override def bufferSchema: StructType =   StructType(List[StructField](StructField("xx",IntegerType,true)))
7.
8.  //最終返回值的類型,與evaluate返回的值保持一致
9.  override def dataType: DataType = IntegerType
10.
11.  //多次運行數據是否一致
12.  override def deterministic: Boolean = true
13.
14.  //每個分區中每組key 對應的初始值
15.  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,0)
16.
17.  //每個分區中,每個分組內進行聚合操作
18.  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
19.    buffer.update(0,buffer.getInt(0) + 1)
20.  }
21.
22.  //不同的分區中相同的key的數據進行聚合
23.  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
24.    buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0))
25.  }
26.
27.  //聚合之后,每個分組最終返回的值,類型要和dataType 一致
28.  override def evaluate(buffer: Row): Any = buffer.getInt(0)
29.}
30.
31.object Test {
32.  def main(args: Array[String]): Unit = {
33.    val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
34.    val list = List[String]("zhangsan","lisi","wangwu","zhangsan","lisi","zhangsan")
35.
36.    import session.implicits._
37.    val frame = list.toDF("name")
38.    frame.createTempView("mytable")
39.
40.    session.udf.register("MyCount",new MyCount())
41.
42.    val result = session.sql("select name,MyCount(name) from mytable group by name")
43.    result.show()
44.
45.  }
46.}
47.

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/207019.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/207019.shtml
英文地址,請注明出處:http://en.pswp.cn/news/207019.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

案例062:基于微信小程序的健身房私教預約系統

文末獲取源碼 開發語言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 數據庫&#xff1a;mysql 5.7 開發軟件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序開發軟件&#xff1a;HBuilder X 小程序…

模塊式雨水調蓄池施工簡單,無需大型機械,可實現當天開挖當天回填

模塊式雨水調蓄池的施工過程非常簡單&#xff0c;無需大型機械和繁瑣的施工工藝。在施工過程中&#xff0c;只需要進行簡單的開挖和回填即可&#xff0c;而且可以在當天完成。這種施工方式不僅節省了施工時間和成本&#xff0c;還可以避免因大型機械和繁瑣工藝引起的安全隱患。…

MIT_線性代數筆記: 復習一

目錄 問題一問題二問題三問題四 本講為考前復習課&#xff0c;考試范圍就是 Axb 這個單元&#xff0c;重點是長方形矩陣&#xff0c;與此相關的概念包括零空間、左零空間、秩、向量空間、子空間&#xff0c;特別是四個基本子空間。當矩陣為可逆的方陣時&#xff0c;很多性質是一…

二叉樹的層次遍歷

102. 二叉樹的層序遍歷 - 力扣&#xff08;LeetCode&#xff09; 題目描述 給你二叉樹的根節點 root &#xff0c;返回其節點值的 層序遍歷 。 &#xff08;即逐層地&#xff0c;從左到右訪問所有節點&#xff09;。 樣例輸入 示例 1&#xff1a; 輸入&#xff1a;root [3…

php研究課題

對于PHP這門語言而言&#xff0c;可以研究的課題有很多&#xff0c;以下是可能的課題方向和對應的內容&#xff1a; PHP語言基礎研究 PHP語言特性和基本語法PHP的數據類型、變量、運算符和表達式PHP的流程控制語句PHP的函數和引用PHP的面向對象編程和設計模式 PHP與Web開發 …

harmony開發之Text組件的使用

TextInput、TextArea是輸入框組件&#xff0c;通常用于響應用戶的輸入操作&#xff0c;比如評論區的輸入、聊天框的輸入、表格的輸入等&#xff0c;也可以結合其它組件構建功能頁面&#xff0c;例如登錄注冊頁面。 圖片來源黑馬程序員 Text組件的使用&#xff1a; 文本顯示組…

flutter學習-day1-環境搭建和啟動第一個項目

&#x1f4da; 目錄 SDK 下載配置環境變量安裝 flutter搭建 Android 環境SDK 和依賴升級IDE 配置與使用 Android Studio 配置與使用VS Code 配置與使用 真機調試 本文學習和引用自《Flutter實戰第二版》&#xff1a;作者&#xff1a;杜文 1. SDK下載 前置需要操作系統 window …

Spring Cloud + Vue前后端分離-第4章 使用Vue cli 4搭建管理控臺

Spring Cloud Vue前后端分離-第4章 使用Vue cli 4搭建管理控臺 4-1 使用vue cli創建admin項目 Vue 簡介 Vue作者尤雨溪在google工作時&#xff0c;最早只想研究angular的數據綁定功能&#xff0c;后面覺得這個小功能很好用&#xff0c;有前景&#xff0c;就再擴展&#xff…

[MySQL] MySQL復合查詢(多表查詢、子查詢)

前面我們學習了MySQL簡單的單表查詢。但是我們發現&#xff0c;在很多情況下單表查詢并不能很好的滿足我們的查詢需求。本篇文章會重點講解MySQL中的多表查詢、子查詢和一些復雜查詢。希望本篇文章會對你有所幫助。 文章目錄 一、基本查詢回顧 二、多表查詢 2、1 笛卡爾積 2、2…

機器學習筆記 - 基于深度學習計算視頻中演員的出鏡時間

一、基本步驟 這里是使用動畫片貓和老鼠進行計算,基本流程如下: 1、導入并讀取視頻,從中提取幀,并將其另存為圖像 2、標記一些圖像以訓練模型(別擔心,我已經為你做好了) 3、根據訓練數據構建我們的模型 4、對剩余圖像進行預測 5、計算湯姆和杰瑞的屏幕時間 二、基礎環境…

教師未來發展前景如何

作為一名教師&#xff0c;我對未來發展的前景也感到有些迷茫。 不過教育行業仍然是一個穩定的職業&#xff0c;但是隨著社會的變化和科技的發展&#xff0c;傳統的教學模式已經逐漸被在線教育、人工智能等新型教學方式所取代。這使得教師的角色和職責也在發生變化&#xff0c;需…

matplot繪圖時圖像太大報錯但能保存

matplot繪圖時&#xff0c;圖像太大&#xff0c;可能在jupyter里面報錯&#xff0c;但是圖像可以保存。 報錯&#xff1a;Image size of 12237479x675 pixels is too large. It must be less than 2^16 in each direction. 在這里插入圖片描述

Linux中用bash寫腳本

本章主要介紹如何使用bash 了解通配符了解變量了解返回值和數值運算判斷語句 grep的用法是“grep 關鍵字 file”&#xff0c;意思是從file中過濾出含有關鍵字的行 例如&#xff0c;grep root /var/log/messages&#xff0c;意思是從/var/log/messages 中過濾出含有root 的行…

SpringIOC第二課,@Bean用法,DI詳解,常見面試題Autowired VS Resource

一、回顧 但是我們之前MVC時候&#xff0c;在頁面上&#xff0c;為什只用Controller,不用其他的呢&#xff1f; 用其他的好使嗎&#xff1f;(我們可以在這里看到&#xff0c;出現404的字樣&#xff09; Service ResponseBody public class TestController {RequestMapping(&quo…

kubernetes安裝kubesphere

前置默認都安裝了k8s&#xff0c;且k8s都正常 1、nfs文件系統 1.1、安裝nfs-server # 在每個機器。 yum install -y nfs-utils# 在master 執行以下命令 echo "/nfs/data/ *(insecure,rw,sync,no_root_squash)" > /etc/exports# 執行以下命令&#xff0c;啟動 …

數字化和數智化一字之差,究竟有何異同點?

在2023杭州云棲大會的一展臺內&#xff0c;桌子上放著一顆番茄和一個蛋糕&#xff0c;一旁的機器人手臂融入“通義千問”大模型技術后&#xff0c;變得會“思考”&#xff1a;不僅能描述“看”到了什么&#xff0c;還能確認抓取的是番茄而不是蛋糕。 “傳統的機械臂通常都只能基…

Post Quantum Fuzzy Stealth Signatures and Applications

目錄 筆記后續的研究方向摘要引言貢獻模塊化框架模糊構造實施適用于FIDO Post Quantum Fuzzy Stealth Signatures and Applications CCS 2023 筆記 后續的研究方向 摘要 自比特幣問世以來&#xff0c;基于區塊鏈的加密貨幣中的私人支付一直是學術和工業研究的主題。隱形地址…

cmd命令 常用的命令

網絡工作為常年公司里的背鍋俠&#xff0c;不得不集齊十八般武藝很難甩鍋。像cmd命令這種好用又好上手的技術&#xff0c;就是網絡工程師上班常備技能。 只要按下快捷鍵 winR&#xff0c;輸入cmd回車&#xff0c;然后輸入cmd命令。 像我自己&#xff0c;我就經常用cmd命令檢測…

在UBUNTU上使用Qemu和systemd-nspawn搭建RISC-V輕量級用戶模式開發環境

參考鏈接 使用Qemu和systemd-nspawn搭建RISC-V輕量級用戶模式開發環境 - 知乎 安裝Qemu sudo apt updatesudo apt -y install qemu-user-binfmt qemu-user-static systemd-container sudo apt -y install zstd 配置環境 RISCV_FILEarchriscv-2023-10-09.tar.zstwget -c ht…

浪潮信息KeyarchOS——保衛數字未來的安全防御利器

浪潮信息KeyarchOS——保衛數字未來的安全防御利器 前言 眾所周知&#xff0c;目前流行的操作系統有10余種&#xff0c;每一款操作系統都有自己的特點。作為使用者&#xff0c;我們該如何選擇操作系統。如果你偏重操作系統的安全可信和穩定高效&#xff0c;我推薦你使用浪潮信…