Spark-SQL概述:是Spark用于結構化數據處理的模塊,前身是Shark。Shark基于Hive開發,使SQL-on-Hadoop性能大幅提升,但對Hive依賴制約了Spark發展。SparkSQL汲取Shark優點并重新開發,在數據兼容、性能優化和組件擴展上優勢顯著。2014年6月,Shark停止開發,發展出SparkSQL和Hive on Spark兩個支線。
Spark-SQL特點:易整合,能無縫整合SQL查詢和Spark編程;統一數據訪問,以相同方式連接不同數據源;兼容Hive,可在已有倉庫運行SQL或HQL;支持標準數據連接,能通過JDBC或ODBC連接。
DataFrame介紹:是基于RDD的分布式數據集,類似二維表格,與RDD區別在于帶有schema元信息,支持嵌套數據類型,API更友好,性能比RDD高,因為查詢計劃經Spark catalyst optimiser優化。
DataSet介紹:是分布式數據集合,是DataFrame的擴展。兼具RDD強類型、使用lambda函數的能力和Spark SQL優化執行引擎的優點,用樣例類定義結構信息,是強類型的,DataFrame是DataSet的特例(DataFrame = DataSet[Row]) ,可通過?as?方法相互轉換。
SparkSession:是Spark最新的SQL查詢起始點,它整合了SQLContext和HiveContext的功能,內部封裝了SparkContext,負責實際的計算。在spark-shell中,系統會自動創建名為spark的SparkSession對象。
DataFrame
創建方式:可通過Spark的數據源創建
SQL語法查詢:使用SQL語法風格查詢數據時,需要借助臨時視圖或全局視圖。先讀取JSON文件創建DataFrame,如?val df1 = spark.read.json("data/user.json")?;接著對DataFrame創建臨時表?df1.createOrReplaceTempView("people")?,之后就能通過SQL語句查詢,如?val sqlDF = spark.sql("select * from people")? ,并使用?sqlDF.show?展示結果。創建全局表時,首次運行可能報錯,需將hive-site.xml文件復制到spark的conf路徑下(最好把整個hive目錄放在本地文件系統中),完成配置后創建全局表?df1.createGlobalTempView("people1")?,可通過?spark.sql("SELECT * FROM global_temp.people1").show()?等語句查詢展示數據。
DataFrame的DSL語法:DataFrame提供DSL管理結構化數據,使用時無需創建臨時視圖。操作包括創建DataFrame;選取特定列,如?df.select("username").show()?;進行列運算?df.select($"username",$"age" + 1).show?;篩選數據?df.filter($"age">18).show?;按列分組統計?df.groupBy("age").count.show? 。
2.?RDD轉換為DataFrame:在IDEA開發時,RDD與DataFrame或DataSet相互操作需引入?import spark.implicits._? (spark為SparkSession對象變量名,且必須用val修飾),spark-shell中自動導入。可直接將RDD轉為DataFrame?;實際開發常借助樣例類轉換,定義?case class User(name:String, age:Int)? 后,通過?sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show?實現轉換 。
DataFrame轉換為RDD:DataFrame可直接獲取內部RDD?。獲取的RDD存儲類型為Row,可通過?collect?方法收集數據,如?val array = rdd.collect?,還能通過?getAs?方法按列名獲取數據。
DataSet操作
創建DataSet:可使用樣例類序列,如定義?case class Person(name: String, age: Long)?后,通過?Seq(Person("zhangsan",2)).toDS()?創建;也能用基本類型序列創建,如?Seq(1,2,3,4,5).toDS? ,但實際更多從RDD獲取DataSet。
RDD與DataSet相互轉換:包含case類的RDD能自動轉為DataSet,如?sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS?;DataSet可直接獲取內部RDD,如?val rdd = res3.rdd? 。
DataFrame和DataSet轉換:DataFrame是DataSet的特例,二者可相互轉換。DataFrame轉DataSet,定義樣例類后用?as?方法,如?val ds = df.as[User]?;DataSet轉DataFrame使用?toDF?方法,如?val df = ds.toDF?。
RDD、DataFrame、DataSet關系
版本產生順序:Spark1.0推出RDD,Spark1.3出現DataFrame,Spark1.6引入DataSet 。
共性:都是分布式彈性數據集,有惰性機制、共同函數,操作需?import spark.implicits._?,會自動緩存運算,都有分區概念,DataFrame和DataSet可模式匹配獲取字段信息。
區別:RDD常與spark mllib使用,不支持sparksql操作;DataFrame每行類型為Row,訪問列值需解析,支持SparkSQL操作和便捷保存方式;DataSet與DataFrame成員函數相同,每行數據類型自定義,獲取行信息更自由。三者可相互轉換。
?
?