This lesson covers connecting Spark to Hive data. In spark-shell, the connection can be seen to succeed.
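As a quick check (an illustrative snippet, assuming spark-shell was launched with Hive support configured; it is not part of the original run), listing databases and tables from the shell confirms that the Hive metastore is reachable:

// In spark-shell the built-in `spark` session can query the Hive metastore directly
spark.sql("show databases").show()
spark.sql("show tables").show()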
Add the dependency to pom.xml.
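For reference, a minimal sketch of what that dependency entry might look like, assuming Spark 3.0.x built against Scala 2.12 (the version should match the spark-core / spark-sql artifacts already declared in the project):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.0</version>
</dependency>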
Run the code.
A spark-warehouse folder is created.
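If spark.sql.warehouse.dir is not set, Spark falls back to a local spark-warehouse directory next to the project. The location actually in use can be printed with a quick check (an illustrative snippet, not part of the original report):

// Print the warehouse directory the current session is using
println(spark.conf.get("spark.sql.warehouse.dir"))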
To make the database visible in the HDFS web UI at node01:50070, add the following code; the newly created database spark_sql_1 can then be seen.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object HiveSupport {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("hql")
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse")
      .config(sparkConf)
      .getOrCreate()

    // Create the database if it does not exist yet, then switch to it
    if (!spark.catalog.databaseExists("spark_sql_1")) {
      spark.sql("create database spark_sql_1")
    }
    spark.sql("use spark_sql_1")

    // Create a table holding the raw JSON lines
    spark.sql(
      """
        |create table json(data string)
        |row format delimited
        |""".stripMargin)
    spark.sql("load data local inpath 'Spark-SQL/input/movie.txt' into table json")
    spark.sql("select * from json").show()

    // Extract fields from the JSON strings into a new table
    spark.sql(
      """
        |create table movie_info
        |as
        |select get_json_object(data,'$.movie') as movie,
        |get_json_object(data,'$.uid') as uid
        |from json
        |""".stripMargin)
    spark.sql("select * from movie_info").show()

    spark.stop()
  }
}
get_json_object can be used to extract the data.
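For example (an illustrative one-off query on a made-up JSON record, not part of the original run), get_json_object pulls a single field out of a JSON string by path:

// Hypothetical sample record; '$.movie' selects the movie field from the JSON string
spark.sql("""select get_json_object('{"movie":"1193","uid":"1"}', '$.movie') as movie""").show()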
Run result:
Lab report
Place the data in the input directory and run the following code, which outputs the number of valid records and the twenty addresses with the most users.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object uid {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("JsonDataAnalysis")
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse")
      .config(sparkConf)
      .getOrCreate()

    // Use the spark_sql_1 database
    spark.sql("use spark_sql_1")

    // Create a table for storing the raw JSON data
    spark.sql(
      """
        |create table if not exists data_set(data string)
        |row format delimited
        |""".stripMargin)

    // Load the JSON data into the data_set table
    spark.sql("load data local inpath 'D:/school/workspace/workspace-IJ/Spark/Spark-SQL/input/user_login_info.json' into table data_set")

    // If filter_data already exists, drop it (adjust this logic as needed, e.g. reuse it instead of dropping)
    if (spark.catalog.tableExists("spark_sql_1.filter_data")) {
      spark.sql("drop table filter_data")
    }

    // Keep only records where uid, phone and addr are all non-null, and create filter_data
    spark.sql(
      """
        |create table filter_data
        |as
        |select
        |  get_json_object(data, '$.uid') as uid,
        |  get_json_object(data, '$.phone') as phone,
        |  get_json_object(data, '$.addr') as addr
        |from
        |  data_set
        |where
        |  get_json_object(data, '$.uid') is not null
        |  and get_json_object(data, '$.phone') is not null
        |  and get_json_object(data, '$.addr') is not null
        |""".stripMargin)

    // Count the valid records
    val validDataCount = spark.sql("select count(*) from filter_data").collect()(0)(0).toString.toLong
    println(s"Valid record count: $validDataCount")

    // Count users per address, sort descending, and take the top 20
    spark.sql(
      """
        |select
        |  addr,
        |  count(*) as user_count
        |from
        |  filter_data
        |group by
        |  addr
        |order by
        |  user_count desc
        |limit 20
        |""".stripMargin).show()

    spark.stop()
  }
}
Run result: