在上一篇文章中已經大致說明了DataFrame APi,下面我們具體介紹DataFrame DSL的使用。DataFrame DSL是一種命令式編寫Spark SQL的方式,使用的是一種類sql的風格語法。
文章鏈接:
一、單詞統計案例引入
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]): Unit = {/*** 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/*** spark sql和spark core的核心數據類型不太一樣** 1、讀取數據構建一個DataFrame,相當于一張表*/val linesDF: DataFrame = sparkSession.read.format("csv") //指定讀取數據的格式.schema("line STRING") //指定列的名和列的類型,多個列之間使用,分割.option("sep", "\n") //指定分割符,csv格式讀取默認是英文逗號.load("spark/data/words.txt") // 指定要讀取數據的位置,可以使用相對路徑/*** DSL: 類SQL語法 api 介于代碼和純sql之間的一種api** spark在DSL語法api中,將純sql中的函數都使用了隱式轉換變成一個scala中的函數* 如果想要在DSL語法中使用這些函數,需要導入隱式轉換**///導入Spark sql中所有的sql隱式轉換函數import org.apache.spark.sql.functions._//導入另一個隱式轉換,后面可以直接使用$函數引用字段進行處理import sparkSession.implicits._// linesDF.select(explode(split($"line","\\|")) as "word")
// .groupBy($"word")
// .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/*** 保存數據*/resultDF.repartition(1).write.format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}
注意:show()可以指定兩個參數,第一個參數為展現的條數,不指定默認展示前20條數據,第二個參數默認為false,代表的是如果數據過長展示就會不完全,可以指定為true,使得數據展示完整,比如 : show(200,truncate = false)
二、數據源獲取
查看官方文檔:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多種數據源的獲取。
?1、csv-->json
val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("多種類型數據源讀取演示").config("spark.sql.shuffer.partitions", 1) //指定分區數為1,默認分區數是200個.getOrCreate()//導入spark sql中所有的隱式轉換函數import org.apache.spark.sql.functions._//導入sparkSession下的所有隱式轉換函數,后面可以直接使用$函數引用字段import sparkSession.implicits._/*** 讀csv格式的文件-->寫到json格式文件中*///1500100967,能映秋,21,女,文科五班val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age Int,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")studentsDF.write.format("json").mode(SaveMode.Overwrite).save("spark/data/students_out_json.json")
2、json-->parquet
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分區數為1,默認分區數是200個.getOrCreate()//導入spark sql中所有的隱式轉換函數//導入sparkSession下的所有隱式轉換函數,后面可以直接使用$函數引用字段/*** 讀取json數據格式,因為json數據有鍵值對,會自動的將健作為列名,值作為列值,不需要手動的設置表結構*///1500100967,能映秋,21,女,文科五班//方式1:// val studentsJsonDF: DataFrame = sparkSession.read// .format("json")// .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")//方式2:實際上也是調用方式1,只是更簡潔了// def json(paths: String*): DataFrame = format("json").load(paths : _*)val studebtsReadDF: DataFrame = sparkSession.read.json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")studebtsReadDF.write.format("parquet").mode(SaveMode.Overwrite).save("spark/data/students_parquet")
3、parquet-->csv
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分區數為1,默認分區數是200個.getOrCreate()//導入Spark sql中所有的sql隱式轉換函數import org.apache.spark.sql.functions._//導入另一個隱式轉換,后面可以直接使用$函數引用字段進行處理import sparkSession.implicits._/*** parquet:壓縮的比例由信息熵決定,通俗的說就是數據的重復程度決定*/val studebtsReadDF: DataFrame = sparkSession.read.format("parquet").load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")studebtsReadDF.write.format("csv").mode(SaveMode.Overwrite).save("spark/data/students_csv")
三、DataFrame DSL API的使用
1、select
import org.apache.spark.sql.{DataFrame, SparkSession}object Demo1Select {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("select函數演示").getOrCreate()//導入Spark sql中所有的sql隱式轉換函數import org.apache.spark.sql.functions._//導入另一個隱式轉換,后面可以直接使用$函數引用字段進行處理import sparkSession.implicits._val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age String,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")/*** select函數*///方式1:只能查詢原有字段,不能對字段做出處理,比如加減、起別名之類studentsDF.select("id", "name", "age")//方式2:彌補了方式1的不足studentsDF.selectExpr("id","name","age+1 as new_age")//方式3:使用隱式轉換函數中的$將字段變為一個對象val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")//3.1使用對象對字段進行處理
// stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show() //不可使用未變為對象的字段stuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age") // +是函數,可以等價于該語句//3.2可以在select中使用sql函數studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))}
}
2、where
/*** where函數:過濾數據*///方式1:直接將sql中的where語句以字符串形式傳參studentsDF.where("clazz='文科一班' and gender='男'")//方式2:使用$列對象形式過濾/*** 注意在此種方式下:等于和不等于符號與我們平常使用的有所不同* 等于:===* 不等于:=!=*/studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()
3、groupBy和agg
/*** groupby:分組函數 agg:聚合函數* 注意:* 1、groupby與agg函數通常都是一起使用* 2、分組聚合之后的結果DataFrame中只會包含分組字段與聚合字段* 3、分組聚合之后select中無法出現不是分組的字段*///需求:根據班級分組,求每個班級的人數和平均年齡studentsDF.groupBy($"clazz").agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age").show()
4、join
/*** 5、join:表關聯*/val subjectDF1: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id String,subject_id String,score Int").load("spark/data/score.csv")val subjectDF2: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("sid String,subject_id String,score Int").load("spark/data/score.csv")//關聯場景1:所關聯的字段名字一樣studentsDF.join(subjectDF1,"id")//關聯場景2:所關聯的字段名字不一樣studentsDF.join(subjectDF2,$"id"===$"sid","inner")
// studentsDF.join(subjectDF2,$"id"===$"sid","left").show()/*** 上面兩種關聯場景默認inner連接方式(內連接),可以指定參數選擇連接方式,比如左連接、右連接、全連接之類* * @param joinType Type of join to perform. Default `inner`. Must be one of:* * `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,* * `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.*/
5、開窗
/*** 開窗函數* 1、ROW_NUMBER():為分區中的每一行分配一個唯一的序號。序號是根據ORDER BY子句定義的順序分配的* 2、RANK()和DENSE_RANK():為分區中的每一行分配一個排名。RANK()在遇到相同值時會產生間隙,而DENSE_RANK()則不會。**///需求:統計每個班級總分前三的學生val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")//方式1:在select中使用row_number() over Window.partitionBy().orderBy()stu_scoreDF.groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank").where($"score_rank" <= 3)//方式2:使用withcolumn()函數,會新增一列,但是要預先指定列名stu_scoreDF.repartition(1).groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc)).where($"score_rank" <= 3).show()
注意:
? ? ? DSL API 不直接對應 SQL 的關鍵字執行順序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照構建邏輯查詢的方式來組織代碼,使其與 SQL 查詢的邏輯結構相似。
在構建 Spark DataFrame 轉換和操作時,常用流程介紹:
- 選擇數據源:使用?
spark.read
?或從其他 DataFrame 派生。 - 轉換:使用各種轉換函數(如?
select
、filter
、map
、flatMap
、join
?等)來修改 DataFrame。 - 聚合:使用?
groupBy
?和聚合函數(如?sum
、avg
、count
?等)對數據進行分組和匯總。 - 排序:使用?
orderBy
?或?sort
?對數據進行排序。 - 輸出:使用?
show
、collect
、write
?等函數將結果輸出到控制臺、收集到驅動程序或寫入外部存儲。
四、RDD與DataFrame的轉換
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object RddToDf {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName("Rdd與Df之間的轉換").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._val sparkContext: SparkContext = sparkSession.sparkContextval idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv").map(_.split(",")).map {case Array(id: String, name: String, _, _, _) => (id, name)}/*** Rdd-->DF* 因為在Rdd中不會存儲文件的結構(schema)信息,所以要指定字段*/val idNameDF: DataFrame = idNameRdd.toDF("id", "name")idNameDF.createOrReplaceTempView("idNameTb")sparkSession.sql("select id,name from idNameTb").show()/*** DF-->Rdd*/val idNameRdd2: RDD[Row] = idNameDF.rddidNameRdd2.foreach(println)}
}