Spark SQL 中DataFrame DSL的使用

在上一篇文章中已經大致說明了DataFrame APi,下面我們具體介紹DataFrame DSL的使用。DataFrame DSL是一種命令式編寫Spark SQL的方式,使用的是一種類sql的風格語法。

文章鏈接:

一、單詞統計案例引入

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]): Unit = {/*** 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/*** spark sql和spark core的核心數據類型不太一樣** 1、讀取數據構建一個DataFrame,相當于一張表*/val linesDF: DataFrame = sparkSession.read.format("csv") //指定讀取數據的格式.schema("line STRING") //指定列的名和列的類型,多個列之間使用,分割.option("sep", "\n") //指定分割符,csv格式讀取默認是英文逗號.load("spark/data/words.txt") // 指定要讀取數據的位置,可以使用相對路徑/*** DSL: 類SQL語法 api  介于代碼和純sql之間的一種api** spark在DSL語法api中,將純sql中的函數都使用了隱式轉換變成一個scala中的函數* 如果想要在DSL語法中使用這些函數,需要導入隱式轉換**///導入Spark sql中所有的sql隱式轉換函數import org.apache.spark.sql.functions._//導入另一個隱式轉換,后面可以直接使用$函數引用字段進行處理import sparkSession.implicits._//    linesDF.select(explode(split($"line","\\|")) as "word")
//      .groupBy($"word")
//      .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/*** 保存數據*/resultDF.repartition(1).write.format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}

注意:show()可以指定兩個參數,第一個參數為展現的條數,不指定默認展示前20條數據,第二個參數默認為false,代表的是如果數據過長展示就會不完全,可以指定為true,使得數據展示完整,比如 : show(200,truncate = false)

二、數據源獲取

查看官方文檔:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多種數據源的獲取。

?1、csv-->json

    val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("多種類型數據源讀取演示").config("spark.sql.shuffer.partitions", 1) //指定分區數為1,默認分區數是200個.getOrCreate()//導入spark sql中所有的隱式轉換函數import org.apache.spark.sql.functions._//導入sparkSession下的所有隱式轉換函數,后面可以直接使用$函數引用字段import sparkSession.implicits._/*** 讀csv格式的文件-->寫到json格式文件中*///1500100967,能映秋,21,女,文科五班val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age Int,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")studentsDF.write.format("json").mode(SaveMode.Overwrite).save("spark/data/students_out_json.json")

2、json-->parquet

val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分區數為1,默認分區數是200個.getOrCreate()//導入spark sql中所有的隱式轉換函數//導入sparkSession下的所有隱式轉換函數,后面可以直接使用$函數引用字段/*** 讀取json數據格式,因為json數據有鍵值對,會自動的將健作為列名,值作為列值,不需要手動的設置表結構*///1500100967,能映秋,21,女,文科五班//方式1://    val studentsJsonDF: DataFrame = sparkSession.read//      .format("json")//      .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")//方式2:實際上也是調用方式1,只是更簡潔了// def json(paths: String*): DataFrame = format("json").load(paths : _*)val studebtsReadDF: DataFrame = sparkSession.read.json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")studebtsReadDF.write.format("parquet").mode(SaveMode.Overwrite).save("spark/data/students_parquet")

3、parquet-->csv

    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分區數為1,默認分區數是200個.getOrCreate()//導入Spark sql中所有的sql隱式轉換函數import org.apache.spark.sql.functions._//導入另一個隱式轉換,后面可以直接使用$函數引用字段進行處理import sparkSession.implicits._/*** parquet:壓縮的比例由信息熵決定,通俗的說就是數據的重復程度決定*/val studebtsReadDF: DataFrame = sparkSession.read.format("parquet").load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")studebtsReadDF.write.format("csv").mode(SaveMode.Overwrite).save("spark/data/students_csv")

三、DataFrame DSL API的使用

1、select


import org.apache.spark.sql.{DataFrame, SparkSession}object Demo1Select {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("select函數演示").getOrCreate()//導入Spark sql中所有的sql隱式轉換函數import org.apache.spark.sql.functions._//導入另一個隱式轉換,后面可以直接使用$函數引用字段進行處理import sparkSession.implicits._val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age String,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")/*** select函數*///方式1:只能查詢原有字段,不能對字段做出處理,比如加減、起別名之類studentsDF.select("id", "name", "age")//方式2:彌補了方式1的不足studentsDF.selectExpr("id","name","age+1 as new_age")//方式3:使用隱式轉換函數中的$將字段變為一個對象val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")//3.1使用對象對字段進行處理
//    stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show()       //不可使用未變為對象的字段stuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age")                 // +是函數,可以等價于該語句//3.2可以在select中使用sql函數studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))}
}

2、where

    /*** where函數:過濾數據*///方式1:直接將sql中的where語句以字符串形式傳參studentsDF.where("clazz='文科一班' and gender='男'")//方式2:使用$列對象形式過濾/*** 注意在此種方式下:等于和不等于符號與我們平常使用的有所不同* 等于:===* 不等于:=!=*/studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()

3、groupBy和agg

    /*** groupby:分組函數     agg:聚合函數* 注意:* 1、groupby與agg函數通常都是一起使用* 2、分組聚合之后的結果DataFrame中只會包含分組字段與聚合字段* 3、分組聚合之后select中無法出現不是分組的字段*///需求:根據班級分組,求每個班級的人數和平均年齡studentsDF.groupBy($"clazz").agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age").show()

4、join

/*** 5、join:表關聯*/val subjectDF1: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id String,subject_id String,score Int").load("spark/data/score.csv")val subjectDF2: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("sid String,subject_id String,score Int").load("spark/data/score.csv")//關聯場景1:所關聯的字段名字一樣studentsDF.join(subjectDF1,"id")//關聯場景2:所關聯的字段名字不一樣studentsDF.join(subjectDF2,$"id"===$"sid","inner")
//    studentsDF.join(subjectDF2,$"id"===$"sid","left").show()/*** 上面兩種關聯場景默認inner連接方式(內連接),可以指定參數選擇連接方式,比如左連接、右連接、全連接之類* * @param joinType Type of join to perform. Default `inner`. Must be one of:* *                 `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,* *                 `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.*/

5、開窗

    /*** 開窗函數* 1、ROW_NUMBER():為分區中的每一行分配一個唯一的序號。序號是根據ORDER BY子句定義的順序分配的* 2、RANK()和DENSE_RANK():為分區中的每一行分配一個排名。RANK()在遇到相同值時會產生間隙,而DENSE_RANK()則不會。**///需求:統計每個班級總分前三的學生val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")//方式1:在select中使用row_number() over Window.partitionBy().orderBy()stu_scoreDF.groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank").where($"score_rank" <= 3)//方式2:使用withcolumn()函數,會新增一列,但是要預先指定列名stu_scoreDF.repartition(1).groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc)).where($"score_rank" <= 3).show()

注意:

? ? ? DSL API 不直接對應 SQL 的關鍵字執行順序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照構建邏輯查詢的方式來組織代碼,使其與 SQL 查詢的邏輯結構相似。

在構建 Spark DataFrame 轉換和操作時,常用流程介紹:

  1. 選擇數據源:使用?spark.read?或從其他 DataFrame 派生。
  2. 轉換:使用各種轉換函數(如?selectfiltermapflatMapjoin?等)來修改 DataFrame。
  3. 聚合:使用?groupBy?和聚合函數(如?sumavgcount?等)對數據進行分組和匯總。
  4. 排序:使用?orderBy?或?sort?對數據進行排序。
  5. 輸出:使用?showcollectwrite?等函數將結果輸出到控制臺、收集到驅動程序或寫入外部存儲。

四、RDD與DataFrame的轉換

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object RddToDf {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName("Rdd與Df之間的轉換").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._val sparkContext: SparkContext = sparkSession.sparkContextval idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv").map(_.split(",")).map {case Array(id: String, name: String, _, _, _) => (id, name)}/*** Rdd-->DF* 因為在Rdd中不會存儲文件的結構(schema)信息,所以要指定字段*/val idNameDF: DataFrame = idNameRdd.toDF("id", "name")idNameDF.createOrReplaceTempView("idNameTb")sparkSession.sql("select id,name from idNameTb").show()/*** DF-->Rdd*/val idNameRdd2: RDD[Row] = idNameDF.rddidNameRdd2.foreach(println)}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/14488.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/14488.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/14488.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在SpringBoot中添加自定義增強SpringEvent事件組件

場景說明&#xff1a;在使用SpringBoot時&#xff0c;總是要添加一大堆自定義事件&#xff0c;實現ApplicationEvent&#xff0c;來實現事件發送。 這樣寫代碼量非常大。為了方便和避免出錯&#xff0c;封裝自定義的模塊&#xff0c;快速實現泛型中調用SpringEvent實現事件。省…

Xinstall助力實現App間直接跳轉,提升用戶體驗

在移動互聯網時代&#xff0c;App已成為我們日常生活中不可或缺的一部分。然而&#xff0c;在使用各類App時&#xff0c;我們經常會遇到需要在不同App之間切換的情況&#xff0c;這時如果能夠直接跳轉&#xff0c;將會大大提升用戶體驗。而Xinstall正是這樣一款能夠幫助開發者實…

OpenCV 獲取 RTSP 攝像頭視頻流保存至本地

介紹 Java OpenCV 是一個強大的開源計算機視覺庫&#xff0c;它提供了豐富的圖像處理和分析功能&#xff0c;越來越多的應用需要使用攝像頭來獲取實時視頻流進行處理和分析。 在 Java 中使用 OpenCV 打開攝像頭的基本步驟如下&#xff1a; 確保已經安裝了OpenCV庫使用 OpenC…

Raylib 繪制自定義字體的一種套路

Raylib 繪制自定義字體是真的難搞。我的需求是程序可以加載多種自定義字體&#xff0c;英文中文的都有。 我調試了很久成功了&#xff01; 很有用的參考&#xff0c;建議先看一遍&#xff1a; 瞿華&#xff1a;raylib繪制中文內容 個人筆記&#xff5c;Raylib 的字體使用 - …

W801 實現獲取天氣情況

看了小安派&#xff08;AiPi-Eyes 天氣站&#xff09;的源碼&#xff0c;感覺用W801也可以實現。 一、部分源碼 main.c #include "wm_include.h" #include "Lcd_Driver.h"void UserMain(void) {printf("\n user task \n");Lcd_Init();Lcd_Clea…

MySQL主從復制(五):讀寫分離

一主多從架構主要應用場景&#xff1a;讀寫分離。讀寫分離的主要目標是分攤主庫的壓力。 讀寫分離架構 讀寫分離架構一 架構一結構圖&#xff1a; 這種結構模式下&#xff0c;一般會把數據庫的連接信息放在客戶端的連接層&#xff0c;由客戶端主動做負載均衡。也就是說由客戶…

RabbitMQ 消息隊列安裝及入門

市面常見消息隊列中間件對比 技術名稱吞吐量 /IO/并發時效性&#xff08;類似延遲&#xff09;消息到達時間可用性可靠性優勢應用場景activemq萬級高高高簡單易學中小型企業、項目rabbitmq萬級極高&#xff08;微秒&#xff09;高極高生態好&#xff08;基本什么語言都支持&am…

為什么MySQL推薦使用utf8mb4代替utf8?

前言 在MySQL數據庫的世界里&#xff0c;字符集的選擇直接影響著數據的存儲和檢索方式&#xff0c;尤其是對于多語言支持至關重要的應用而言。近年來&#xff0c;utf8mb4字符集逐漸成為MySQL中存儲Unicode字符的標準選擇&#xff0c;逐步取代了傳統的utf8字符集。本文將詳細探…

leetcode124 二叉樹中的最大路徑和-dp

題目 二叉樹中的 路徑 被定義為一條節點序列&#xff0c;序列中每對相鄰節點之間都存在一條邊。同一個節點在一條路徑序列中 至多出現一次 。該路徑 至少包含一個 節點&#xff0c;且不一定經過根節點。 路徑和 是路徑中各節點值的總和。 給你一個二叉樹的根節點 root &…

【Crypto】Rabbit

文章目錄 一、Rabbit解題感悟 一、Rabbit 題目提示很明顯是Rabbit加密&#xff0c;直接解 小小flag&#xff0c;拿下&#xff01; 解題感悟 提示的太明顯了

redis核心面試題二(實戰優化)

文章目錄 10. redis配置mysql實戰優化[重要]11. redis之緩存擊穿、緩存穿透、緩存雪崩12. redis實現分布式session 10. redis配置mysql實戰優化[重要] // 最初實現OverrideTransactionalpublic Product createProduct(Product product) {productRepo.saveAndFlush(product);je…

MQTT 5.0 報文解析 05:DISCONNECT

歡迎閱讀 MQTT 5.0 報文系列 的第五篇文章。在上一篇中&#xff0c;我們已經介紹了 MQTT 5.0 的 PINGREQ 和 PINGRESP 報文。現在&#xff0c;我們將介紹下一個控制報文&#xff1a;DISCONNECT。 在 MQTT 中&#xff0c;客戶端和服務端可以在斷開網絡連接前向對端發送一個 DIS…

手把手教你搭建一個花店小程序商城

如果你是一位花店店主&#xff0c;想要為你的生意搭建一個精美的小程序商城&#xff0c;以下是你將遵循的五個步驟。 步驟1&#xff1a;登錄喬拓云平臺進入后臺 首先&#xff0c;你需要登錄喬拓云平臺的后臺管理頁面。你可以在電腦或移動設備上的瀏覽器中輸入喬拓云的官方網站…

2024.5.26 機器學習周報

目錄 引言 Abstract 文獻閱讀 1、題目 2、引言 3、創新點 4、Motivation 5、naive Lite-HRNet 6、Lite-HRNet 7、實驗 深度學習 解讀SAM(Segment Anything Model) 1、SAM Task 2、SAM Model 2.1、Patch Embedding 2.2、Positiona Embedding 2.3、Transformer …

移動端適配:vw適配方案

vw (Viewport Width) 是一種長度單位&#xff0c;代表視口寬度的百分比。1vw 等于視口寬度的1%。在網頁設計和前端開發中&#xff0c;vw 單位常用于實現響應式設計和屏幕適配&#xff0c;尤其是針對不同尺寸和分辨率的移動設備。 為什么使用vw適配&#xff1f; 響應式: 使用v…

互聯網醫院開發:引領智慧醫療新時代

隨著科技的迅猛發展和互聯網的普及&#xff0c;傳統醫療模式正在迎來一場深刻的變革。互聯網醫院的崛起&#xff0c;打破了時間和空間的限制&#xff0c;為患者和醫療機構帶來了更加便捷、高效、安全的醫療服務體驗。本文將從技術角度深入探討互聯網醫院的開發&#xff0c;包括…

【openpcdet中yaml文件的DATA_AUGMENTOR學習】

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄 前言一、代碼二、詳細解釋DISABLE_AUG_LISTAUG_CONFIG_LIST1. gt_sampling2. random_world_flip3. random_world_rotation4. random_world_scaling 總結 前言 提示…

多線程(八)

一、wait和notify 等待 通知 機制 和join的用途類似,多個線程之間隨機調度,引入 wait notify 就是為了能夠從應用層面上,干預到多個不同線程代碼的執行順序.( 這里說的干預,不是影響系統的線程調度策略 內核里的線程調度,仍然是無序的. 相當于是在應用程序…

Pod容器資源限制和探針

目錄 一、資源限制 1.Pod和容器的資源請求和限制 2.CPU 資源單位 案例一 案例二 二、健康檢查&#xff0c;又稱為探針&#xff08;Probe&#xff09; 1.探針的三種規則 2.Probe支持三種檢查方法 3.探測獲得的三種結果 案例一&#xff1a;exec 案例二&#xff1a;htt…

OneMO同行 心級服務:中移物聯OneMO模組助力客戶終端寒冷環境下的穩定運行

中移物聯OneMO模組以客戶為中心&#xff0c;基于中國移動心級服務要求&#xff0c;開展“OneMO同行 心級服務 標定一流”高標服務主題活動&#xff0c;升級“服務內容““服務方式”和“服務意識”&#xff0c;為行業客戶提供全新的服務體驗。 近日&#xff0c;某車載監控設備…