sparksql的Transformation與 Action操作

Transformation操作

與RDD類似的操作

map、filter、flatMap、mapPartitions、sample、 randomSplit、 limit、

distinct、dropDuplicates、describe,而以上這些都是企業中比較常用的,這里在一個文件中統一論述

val df1 = spark.read.json("src/main/resources/people.json")
// 使用map去除某些字段
df1.map(row => row.getAs[Long](1)).withColumnRenamed("value","age").show()
//df1.map(row => row.getAs[String]("address")).show()
//df1.map(row => row.getString[String](0)).show()// randomSplit,按照數組中的權重將數據集劃分為不同的比例,可用于機器學習
val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
df2(0).count
df2(1).count
df2(2).count// 取10行數據生成新的DataSet
val df3 = df1.limit(5).show()// distinct,去重
val df4 = df1.union(df1)
df4.distinct.count// 這個方法,不需要傳入任何的參數,默認根據所有列進行去重,然后按數據行的順序保留每行數據出現的第一條。
df4.dropDuplicates.show
// 傳入的參數是一個序列。你可以在序列中指定你要根據哪些列的重復元素對數據表進行去重,然后也是返回每一行數據出現的第一條
//def dropDuplicates(colNames: Seq[String])
df4.dropDuplicates("name", "age").show
df4.dropDuplicates("name").show// 返回全部列的統計(count、mean、stddev、min、max)
df4.describe().show// 返回指定列的統計
df4.describe("age").show
df4.describe("name", "age").show
存儲相關

persist、checkpoint、unpersist、cache

備注:Dataset 默認的存儲級別是 MEMORY_AND_DISK

val df1 = spark.read.json("src/main/resources/people.json")
import org.apache.spark.storage.StorageLevel
spark.sparkContext.setCheckpointDir("src/main/resources/data/checkpoint")
df1.show()
df1.checkpoint()// 默認的存儲級別是MEMORY_AND_DISK
df1.cache()
df1.persist(StorageLevel.MEMORY_ONLY)
println(df1.count())
df1.unpersist(true
select相關

列的多種表示:select、selectExpr、drop、withColumn、withColumnRenamed、cast(內置函數)

import spark.implicits._
import org.apache.spark.sql.functions._
val df1 = spark.read.json("src/main/resources/people.json")
// 列的多種表示方法。使用'、""、$""、col()、df("")
// 注意:不要混用;必要時使用spark.implicitis._;并非每個表示在所有的地方都有效
df1.select('name, 'age, 'address).show
df1.select("name", "age", "address").show
df1.select($"name", $"age", $"address").show
df1.select(col("name"), col("age"), col("address")).show
df1.select(df1("name"), df1("age"), df1("address")).show// 下面的寫法無效并且會報錯
// df1.select("name", "age"+10, "address").show
// df1.select("name", "age+10", "address").show// 這樣寫才符合語法
df1.select($"name", $"age"+10, $"address").show
df1.select('name, 'age+10, 'address).show// 可使用expr表達式(expr里面只能使用引號)
df1.select(expr("name"), expr("age+100"), expr("address")).show
df1.selectExpr("name as ename").show
df1.selectExpr("power(age, 2)", "address").show
df1.selectExpr("round(age, -3) as newAge", "name", "address").show// drop、withColumn、 withColumnRenamed、casting
// drop 刪除一個或多個列,得到新的DF
df1.drop("name")
df1.drop("name", "age")// withColumn,修改列值
val df2 = df1.withColumn("age", $"age"+10)
df2.show// withColumnRenamed,更改列名
df1.withColumnRenamed("name", "ename")
// 備注:drop、withColumn、withColumnRenamed返回的是DF// 類型轉化的兩種方式
df1.selectExpr("cast(age as string)").printSchema
import org.apache.spark.sql.types._
df1.select('age.cast(StringType)).printSchema
?where 相關的
val df1 = spark.read.json("src/main/resources/people.json")
// 過濾操作
df1.filter("age>30").show
df1.filter("age>30 and name=='Tom'").show
// 底層調用的就是filter算子
df1.where("age>30").show
df1.where("age>30 and name=='Tom'").show
groupBy相關的

groupBy、agg、max、min、avg、sum、count(后面5個為內置函數)

import spark.implicits._
import org.apache.spark.sql.functions._
val df1 = spark.read.json("src/main/resources/people.json")// 內置的sum max min avg count
df1.groupBy("address").sum("age").show
df1.groupBy("address").max("age").show
df1.groupBy("address").min("age").show
df1.groupBy("address").avg("age").show
df1.groupBy("address").count.show// 類似having子句
df1.groupBy("address").avg("age").where("avg(age) > 20").show
df1.groupBy("address").avg("age").where($"avg(age)" > 20).show// agg
df1.groupBy("address").agg("age"->"max", "age"->"min", "age"->"avg", "age"->"sum", "age"->"count").show// 這種方式更好理解
df1.groupBy("address").agg(max("age"), min("age"), avg("age"),sum("age"), count("age")).show// 給列取別名
df1.groupBy("address").agg(max("age"), min("age"), avg("age"),sum("age"), count("age")).withColumnRenamed("min(age)","minAge").show// 給列取別名,最簡便
df1.groupBy("address").agg(max("age").as("maxAge"),min("age").as("minAge"), avg("age").as("avgAge"),sum("age").as("sumAge"), count("age").as("countAge")).show
?orderBy相關的
import spark.implicits.
val df1 = spark.read.json("src/main/resources/people.json")// sort,以下語句等價
df1.sort("age").show
df1.sort($"age").show
df1.sort($"age".asc).show
df1.sort($"age".desc).show
df1.sort(-$"age").show
df1.sort(-'age, -'name).show// orderBy,底層調用的還是sort
df1.orderBy("age").show
join相關的

目前?Apache?Spark?3.x 版本中,一共支持以下七種 Join 類型:

INNER JOIN

CROSS JOIN

LEFT OUTER JOIN

RIGHT OUTER JOIN

FULL OUTER JOIN

LEFT SEMI JOIN

LEFT ANTI JOIN

在實現上,這七種 Join 對應的實現類分別如下:

object JoinType {def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {case "inner" => Innercase "outer" | "full" | "fullouter" => FullOutercase "leftouter" | "left" => LeftOutercase "rightouter" | "right" => RightOutercase "leftsemi" | "semi" => LeftSemicase "leftanti" | "anti" => LeftAnticase "cross" => Crosscase _ =>val supported = Seq("inner","outer", "full", "fullouter", "full_outer","leftouter", "left", "left_outer","rightouter", "right", "right_outer","leftsemi", "left_semi", "semi","leftanti", "left_anti", "anti","cross")throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +"Supported join types include: " + supported.mkString("'", "', '", "'") + ".")}
}

準備數據

    // 準備數據val order = spark.sparkContext.parallelize(Seq((1, 101, 2500), (2, 102, 1110), (3, 103, 500), (4, 102, 400))).toDF("paymentId", "customerId", "amount")val customer = spark.sparkContext.parallelize(Seq((101, "ds"), (102, "ds_hadoop"), (103, "ds001"), (104, "ds002"), (105, "ds003"), (106, "ds004"))).toDF("customerId", "name")

order 表

customer表

?INNER JOIN

在 Spark 中,如果沒有指定任何 Join 類型,那么默認就是 INNER JOIN。INNER JOIN 只會返回滿足 Join 條件( join condition)的數據,這個在企業中用的應該比較多,具體如下:?

    // inner join// 單字段關聯customer.join(order,"customerId").show// 多字段關聯  Seq(“customerId”, “name”)customer.join(order,Seq("customerId")).show

執行結果

CROSS JOIN

這種類型的 Join 也稱為笛卡兒積(Cartesian Product),Join 左表的每行數據都會跟右表的每行數據進行 Join,產生的結果行數為 m*n,所以在生產環境下盡量不要用這種 Join。下面是 CROSS JOIN 的使用例子:

    // cross join// 笛卡爾積customer.crossJoin(order).show()// 如果兩張表出現相同的字段,可以使用下面的方式進去篩選  類似customer.name  order.amountcustomer.crossJoin(order).select(customer("name"), order("amount") ).show

執行1結果

執行2結果,只顯示select的字段

?LEFT OUTER JOIN

LEFT OUTER JOIN 等價于 LEFT JOIN,這個 Join 的返回的結果相信大家都知道,我就不介紹了。下面三種寫法都是等價的

    // 倆個表關聯字段名一致customer.join(order, Seq("customerId"), "left_outer").showcustomer.join(order, Seq("customerId"), "leftouter").showcustomer.join(order, Seq("customerId"), "left").showval order2 = spark.sparkContext.parallelize(Seq((1, 101, 2500), (2, 102, 1110), (3, 103, 500), (4, 102, 400))).toDF("paymentId", "custId", "amount")// 如果兩張表使用不同的字段進行關聯的話,要使用三等號即===customer.join(order2, $"customerId"===$"custId", "left").show

執行結果

RIGHT OUTER JOIN

和?LEFT OUTER JOIN 類似,RIGHT OUTER JOIN 等價于 RIGHT JOIN,下面三種寫法也是等價的:

    order.join(customer, Seq("customerId"), "right").showorder.join(customer, Seq("customerId"), "right_outer").showorder.join(customer, Seq("customerId"), "rightouter").show

FULL OUTER JOIN

FULL OUTER JOIN 的含義大家應該也都熟悉,會將左右表的數據全部顯示出來。FULL OUTER JOIN 有以下四種寫法:

    order.join(customer, Seq("customerId"), "outer").showorder.join(customer, Seq("customerId"), "full").showorder.join(customer, Seq("customerId"), "full_outer").showorder.join(customer, Seq("customerId"), "fullouter").show
LEFT SEMI JOIN

LEFT SEMI JOIN 只會返回匹配右表的數據,而且 LEFT SEMI JOIN 只會返回左表的數據,右表的數據是不會顯示的,下面三種寫法都是等價的

order.join(customer, Seq("customerId"), "leftsemi").show
order.join(customer, Seq("customerId"), "left_semi").show
order.join(customer, Seq("customerId"), "semi").show

LEFT SEMI JOIN 其實可以用 IN/EXISTS?來改寫

select * from order where customerId in (select customerId from customer)

LEFT ANTI JOIN

與?LEFT SEMI JOIN 相反,LEFT ANTI JOIN 只會返回沒有匹配到右表的左表數據。而且下面三種寫法也是等效的

order.join(customer, Seq("customerId"), "leftanti").show
order.join(customer, Seq("customerId"), "left_anti").show
order.join(customer, Seq("customerId"), "anti").show

LEFT SEMI JOIN 其實可以用 NOT?IN/EXISTS?來改寫

select * from order where customerId not in (select customerId from customer)

?集合相關的

union、unionAll、intersect、except

main{val lst = List(StudentAge(1, "Alice", 18),StudentAge(2, "Andy", 19),StudentAge(3, "Bob", 17),StudentAge(4, "Justin", 21),StudentAge(5, "Cindy", 20))val ds1 = spark.createDataset(lst)ds1.show()val rdd = spark.sparkContext.makeRDD(List(StudentHeight("Alice", 160),StudentHeight("Andy", 159),StudentHeight("Bob", 170),StudentHeight("Cindy", 165),StudentHeight("Rose", 160)))val ds2 = rdd.toDS// union、unionAll、intersect、except。集合的交、并、差val ds3 = ds1.select("name")val ds4 = ds2.select("sname")// union 求并集,不去重  去重使用distinctds3.union(ds4).show// 底層依舊調用的是unionds3.unionAll(ds4).show// intersect 求交ds3.intersect(ds4).show// except 求差ds3.except(ds4).show}// 定義第一個數據集case class StudentAge(sno: Int, name: String, age: Int)// 定義第二個數據集case class StudentHeight(sname: String, height: Int)

交集

差集

空值處理

na.fill、na.drop、na.replace、na.filter

    import spark.implicits._import org.apache.spark.sql.functions._val df1 = spark.read.json("src/main/resources/data/people.json")// NA表示缺失值,即“Missing value”,是“not available”的縮寫// 刪出含有空值的行df1.na.drop.show// 刪除某列的空值和nulldf1.na.drop(Array("age")).show// 對全部列填充df1.na.fill("NULL").show// 對指定單列填充;對指定多列填充df1.na.fill("NULL", Array("address")).showdf1.na.fill(Map("age" -> 0, "address" -> "NULL")).show// 對指定的值進行替換df1.na.replace(Array("address"), Map("NULL" -> "Shanghai")).na.replace(Array("age"), Map(0 -> 100)).show// 查詢空值列或非空值列。isNull、isNotNull為內置函數df1.where("address is null").showdf1.where($"address".isNull).showdf1.where(col("address").isNull).showdf1.filter("address is not null").showdf1.filter(col("address").isNotNull).show

?Action操作

與RDD類似的操作

show、 collect、 collectAsList、 head、 first、 count、 take、 takeAsList、 reduce

    // 隱式轉換import spark.implicits._// show:顯示結果,默認顯示20行,截取(true)spark.read.json("src/main/resources/data/people.json").show(100, false)val df = spark.read.json("src/main/resources/data/people.json")println(df.count())// 輸出數組arr   9df.collect().foreach(println)// 輸出listdf.collectAsList().forEach(println)// 輸出head3條 輸出數組1df.head(3).foreach(println)println(df.head(3))// 輸出第一條 head(1)println(df.first())// 底層調用的就是head  輸出數組3df.take(3).foreach(println)// 底層調用take,再調用headdf.takeAsList(3).forEach(println)
?獲取結構屬性的操作

printSchema、explain、columns、dtypes、col

    val df1 = spark.read.json("src/main/resources/data/people.json")// 結構屬性df1.columns.foreach(println) // 查看列名  address,age,namedf1.dtypes.foreach(println) // 查看列名和類型  (address,StringType) (age,LongType) (name,StringType)df1.explain() // 參看執行計劃println(df1.col("name")) // 獲取某個列df1.printSchema // 常用

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/898435.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/898435.shtml
英文地址,請注明出處:http://en.pswp.cn/news/898435.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

微軟Data Formulator:用AI重塑數據可視化的未來

在數據驅動的時代,如何快速將復雜數據轉化為直觀的圖表是每個分析師面臨的挑戰。微軟研究院推出的開源工具 Data Formulator,通過結合AI與交互式界面,重新定義了數據可視化的工作流。本文將深入解析這一工具的核心功能、安裝方法及使用技巧,助你輕松駕馭數據之美。 一、Dat…

20分鐘上手DeepSeek開發:SpringBoot + Vue2快速構建AI對話系統

20分鐘上手DeepSeek開發:SpringBoot Vue2快速構建AI對話系統 前言 在生成式AI技術蓬勃發展的今天,大語言模型已成為企業智能化轉型和個人效率提升的核心驅動力。作為國產大模型的優秀代表,DeepSeek憑借其卓越的中文語義理解能力和開發者友…

神經網絡中層與層之間的關聯

目錄 1. 層與層之間的核心關聯:數據流動與參數傳遞 1.1 數據流動(Forward Propagation) 1.2 參數傳遞(Backward Propagation) 2. 常見層與層之間的關聯模式 2.1 典型全連接網絡(如手寫數字分類&#xf…

本地部署deepseek-r1建立向量知識庫和知識庫檢索實踐【代碼】

目錄 一、本地部署DS 二、建立本地知識庫 1.安裝python和必要的庫 2.設置主目錄工作區 3.編寫文檔解析腳本 4.構建向量數據庫 三、基于DS,使用本地知識庫檢索 本地部署DS,其實非常簡單,我寫了一篇操作記錄,我終于本地部署了DeepSeek-R1(圖文全過程)-CSDN博客 安裝…

String、StringBuffer、StringBuiler的區別

可變性 String是不可變的,這是因為String內部用于存儲數據的char[]數組用了final關鍵字修飾,而且是private的,并且沒有對外提供修改數組的方法。 StringBuffer和StringBuilder是可變的,它們內部的char數組沒有用final關鍵字修飾。…

Certd自動化申請和部署SSL證書并配置https

服務器使用的華為云,之前SSL證書通過配置Cloudflare的DNS實現的,最近華為云備案提示需修改解析至境內華為云IP,若解析境外IP,域名無需備案,需注銷或取消接入備案信息,改為使用Certd自搭建證書管理工具&…

git tag以及git

git tag 以及git 一、先說收獲吧 1. git bash 在windows上 類似于linux的bash提供的shell命令行窗口,可以執行很多linux命令,cd pwd ls vim cat touch mkdir,還可以用正則匹配查看標簽。相當于在windows上裝了一個小的linux。git init myproj…

ESP8266通過AT指令配置雙向透傳

一、固件燒錄 IO0接地后上電,進入燒錄模式,燒錄完成后去掉即可 二、參數配置 1、服務器端 ATCWMODE_DEF2 ATCWSAP_DEF"ESP8266","12345678",5,3 ATSAVETRANSLINK1,"192.168.4.2",9090,"UDP",8080 2、客戶端…

【3D模型】【游戲開發】【Blender】Blender模型分享-獅頭木雕附導入方法

導入方法: [Blender] 如何導入包含紋理的 .blend 模型文件 在 3D 建模和渲染工作中,Blender 是一款功能強大的免費開源軟件。很多時候,我們需要導入 .blend 后綴的模型文件,同時確保紋理(textures)文件夾…

C# | 超簡單CSV表格讀寫操作(輕松將數據保存到CSV,并支持讀取還原)

C# | 超簡單CSV表格讀寫操作(輕松將數據保存到CSV,并支持讀取還原) 文章目錄 C# | 超簡單CSV表格讀寫操作(輕松將數據保存到CSV,并支持讀取還原)一、上位機開發中的CSV應用背景二、CSV讀寫實戰教學1. 基本對…

Git push后撤銷提交

一、介紹 當某次更改完工程后,push了本地倉庫到云端,但是發現有地方改錯了,想撤銷這次推送,或者某次提交就更改了很小一部分,想和本地這次修改的合并為一次推送,省的在云端顯示特別多次提交,顯得…

Unity導出WebGL,無法顯示中文

問題:中文無法顯示 默認字體無法顯示中文 在編輯器中設置了中文和英文的按鈕,中文按鈕無法顯示 導出后無法顯示中文 解決辦法: 自己添加字體,導入項目,并引用 示例 下載一個字體文件,這里使用的阿里…

閱讀《Vue.js設計與實現》 -- 02

接上一篇文章:閱讀《Vue.js設計與實現》 – 01 文章目錄 第二章提升用戶的開發體驗tips 控制框架代碼的體積Tree-Shaking副作用 框架應該輸出怎樣的構建產物?注意這兩個文件有什么區別? 特性開關如何實現? 處理錯誤TS支持 第二章 …

Mac:Ant 下載+安裝+環境配置(詳細講解)

📌 下載 Ant 下載地址:https://ant.apache.org/bindownload.cgi 📌 無需安裝 Apache官網下載 Ant 壓縮包,無需安裝,下載解壓后放到自己指定目錄下即可。 按我自己的習慣,我會在用戶 jane 目錄下新建了個…

qt圖表背景問題

從代碼來看,這段代碼涉及到設置背景透明度和背景可見性的操作,主要是在一個基于Qt框架的圖形界面程序中對某個圖表控件(fontChart)和視圖控件(fontChartView)進行操作。以下是每行代碼的作用以及它們之間的…

藍橋杯國賽子串2023動態規劃,暴力

#include <bits/stdc.h> using namespace std; // string ss; #define int long long string s; //該方法通過動態規劃&#xff0c;找到2023字串&#xff0c;而2023等于202加3&#xff0c;202等于202&#xff0c;20等于20&#xff1b; int f2() {int dp[4]{0};//dp[0]代表…

uni-app——網絡API

uni-app 網絡API 在 uni-app 開發中&#xff0c;網絡請求是獲取數據與和服務器交互的重要手段。以下介紹 uni-app 中常見的網絡 API&#xff0c;包括發起請求、上傳和下載以及 WebSocket、UDP 通信等方面。 發起請求 在 uni-app 里&#xff0c;使用uni.request(OBJECT)來發起…

計算機網絡筆記再戰——理解幾個經典的協議HTTP章3

理解幾個經典協議——HTTP章3 返回結果的HTTP狀態碼 ? 我們知道&#xff0c;ICMP可以傳遞IP通信時候的狀態如何。HTTP雖然沒有輔助的解析&#xff0c;但是它可以使用狀態碼來表達我們的HTTP請求的結果&#xff0c;標記服務器端的處理是否正常、通知出現的錯誤等工作。這就是…

國產編輯器EverEdit - Hex Dump插件:看到文本的另一面!

1 Hex Dump插件 1.1 應用場景 有時可能需要顯示字母的ASCII編碼&#xff0c;或其他文字的字節編碼&#xff0c;可以使用Hex Dump插件來完成 1.2 使用方法 安裝Hex Dump插件&#xff0c;安裝插件方法參考&#xff1a;擴展管理 在編輯器中選中文本&#xff0c;選擇擴展 -> …

《駕馭MXNet:深度剖析分布式深度學習訓練的高效之道》

在深度學習的迅猛發展進程中&#xff0c;模型的規模和復雜性持續攀升&#xff0c;對計算資源的需求也愈發苛刻。單機訓練在面對大規模數據集和復雜模型結構時&#xff0c;常常顯得力不從心。分布式深度學習訓練成為解決這一困境的關鍵途徑&#xff0c;而MXNet作為一款強大的開源…