文章目錄
- SparkSQL與Hive的整合
- 1.1. Spark On Hive
- 1.1.1. Hive的準備工作
- 1.1.2. Spark的準備工作
- 1.1.3. Spark代碼開發
- 1.1.4. Spark On Hive案例
- 1.2. Hive On Spark
- 1.3. SparkSQL命令行
- 1.4. SparkSQL分布式查詢引擎
- 1.4.1. 開啟ThriftServer服務
- 1.4.2. beeline連接ThriftServer
- 1.4.3. 代碼連接
- 1.4.4. 任務查看
SparkSQL與Hive的整合
1.1. Spark On Hive
SparkSQL其實就是一個Spark框架下的執行引擎,可以對結構化的數據使用SQL的方式,將SQL翻譯成為SparkCore的代碼去完成計算。SparkSQL支持不同的數據源,可以讀取各種數據文件的數據、可以通過JDBC讀取MySQL的數據,在實際開發過程中,有時候我們需要使用SparkSQL去處理Hive中的數據。這就是SparkSQL與Hive的整合方式之一:Spark On Hive。
其實Spark只是一個計算引擎,本身是沒有元數據管理的功能的。而我們在前面使用到的無論是DSL風格的處理方式,還是SQL風格的處理方式,所謂的“元數據”、“表”,其實都是向DataFrame注冊的。DataFrame中記錄了“表”、“字段”、“類型”等信息,就可以將SQL語句解析成為Spark程序來運行了。
但是Hive不同,Hive本身就是有一個元數據庫(MetaStore)的,因此我們需要使用SparkSQL處理Hive的數據的時候,無需再注冊表、注冊字段等信息,直接從Hive的元數據庫(MetaStore)中獲取元數據即可。
1.1.1. Hive的準備工作
-
配置Hive的元數據服務:修改hive的配置文件
hive-site.xml
<!-- 配置Hive的MetaStore服務,使用thrift協議,設置好主機名和端口號 --> <property><name>hive.metastore.uris</name><value>thrift://qianfeng01:9083</value> </property>
-
啟動Hive的元數據服務
# 開啟Hive的metastore服務 # 這種方式開啟的服務是一個前臺進程,不方便使用 hive --service metastore# 開啟Hive的metastore服務,并設置為后臺進程 # 這種方式開啟的元數據服務是后臺進程,方便交互了,但是不方便查看日志,并且隨著session的退出,服務會中斷 hive --service metastore &# 啟動后臺進程,將日志輸出到指定位置 nohup hive --service metastore > /var/log/metastore.log 2>&1 &
1.1.2. Spark的準備工作
-
在spark的conf目錄下,創建hive-site.xml文件,存放連接到hive的配置信息
<?xml version="1.0" encoding="UTF-8" standalone="no"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration><property><name>hive.metastore.warehouse.dir</name><value>/user/hive/warehouse</value></property><property><name>hive.metastore.uris</name><value>thrift://qianfeng01:9083</value></property> </configuration>
Spark程序在運行的時候,相關的配置信息的加載次序:
- 首先加載
conf
目錄下的配置文件。 - 再加載代碼中進行的配置。
其實只需要讓SparkSQL程序知道metastore服務在哪里就可以了,如果不配置上面的這個文件也可以,不過就需要在代碼中配置了。為了避免每一次在寫程序的時候,都在代碼里面去配置,簡單起見,就直接創建這個文件,將連接到Hive元數據服務的配置都放進去。這樣每次Spark程序在啟動的時候,都可以自動的加載到。
- 首先加載
-
準備MySQL的驅動包
因為Hive的元數據保存到了MySQL數據庫,Spark SQL程序如果需要訪問的話,肯定需要從MySQL數據庫中讀取元數據信息。此時就必須要這個jar包了。
將準備好的mysql-connector-java-8.0.26.jar文件存放到spark的jars目錄下。
注意:
- 如果需要運行本地模式,那么本地的Spark的jars目錄下需要存放有這個jar包。
- 如果需要運行集群模式,那么集群中的Spark的jars目錄下需要存放有這個jar包。
1.1.3. Spark代碼開發
# @Author : 千鋒大數據教研院
# @Company : 北京千鋒互聯科技有限公司from pyspark.sql import SparkSession# 這里的 .enableHiveSupport() 表示的就是打開Hive支持,此時就可以訪問到Hive的數據了。
# 注意:
# 如果沒有在spark的conf目錄下面創建hive-site.xml并正確的設置hive的元數據服務
# 那么在創建SparkSession對象的時候,就必須要設置hive的元數據服務信息
# .config("spark.sql.warehouse.dir", "hdfs://qianfeng01:9820/user/hive/warehouse")
# .config("hive.metastore.uris", "thrift://qianfeng01:9083")
spark = SparkSession.builder\.master("local[*]")\.appName("hive-enable")\.enableHiveSupport()\.getOrCreate()# spark.sql("select * from mydb.emp").show()
spark.sql("select * from mydb.emp join mydb.dept on mydb.emp.deptno = mydb.dept.deptno;").show()spark.stop()
1.1.4. Spark On Hive案例
基本的Spark On Hive的程序就編寫完成了。我們也可以結合之前的內容,整合其他的數據源與Hive配合使用
在Hive中有一張表,存儲了用戶的名字與身份證號。讀取這個表中的數據,通過身份證號解析出生日、性別、年齡等信息,并將結果保存到Hive中。
"""
需求: 從 Hive 的mydb.users表中通過身份證號,解析出用戶的生日、年齡、性別信息,并將結果存入到一個新的表中
res:usernameidcardphonebirthdayagegender
create table if not exists mydb.res(username string,idcard string,phone string,birthday string,age string,gender string
)
row format delimited
fields terminated by ','
"""import os
import re
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import MapType, StringTypeos.environ.setdefault("HADOOP_USER_NAME", "root")def calculate_age(year, month, day) -> int:now = datetime.datetime.now()age = now.year - yearif now.month < month:age -= 1elif now.month == month and now.day < day:age -= 1return agedef parse_idcard(idcard: str) -> dict:# 1. 驗證身份證號碼是否合法m = re.fullmatch(r'(\d{6})'r'(?P<year>(19|20)\d{2})'r'(?P<month>0[1-9]|1[0-2])'r'(?P<day>[012][0-9]|10|20|30|31)'r'\d{2}'r'(?P<gender>\d)'r'[0-9xX]', idcard)if m is None:return {}# 2. 解析每一部分year = m.group('year')month = m.group('month')day = m.group('day')age = calculate_age(int(year), int(month), int(day))gender = '男' if int(m.group('gender')) % 2 != 0 else '女'birthday = '-'.join([year, month, day])return {"birthday": birthday, "age": age, "gender": gender}with SparkSession.builder.master("local[*]").appName("exercise").enableHiveSupport().getOrCreate() as spark:# 注冊 UDF 函數spark.udf.register("parse_idcard", parse_idcard, MapType(StringType(), StringType()))# 查詢數據res = spark.sql("""selectusername, idcard,phone,parse_idcard(idcard)['birthday'] as birthday,parse_idcard(idcard)['age'] as age,parse_idcard(idcard)['gender'] as genderfrom mydb.users""")# 將查詢結果寫出到 Hive 指定的表中,這個表需要提前存在res.write.insertInto("mydb.res")
1.2. Hive On Spark
其實Hive On Spark的意思就是,將Hive的底層計算引擎替換成Spark!Hive默認的計算引擎是MapReduce,而這個是可以替換的。只需要使用set hive.execution.engine=spark
即可完成替換,同時需要指定Spark的Master。
# 使用Hive On Spark非常簡單
# 只要用set hive.execution.engine命令設置Hive的執行引擎為spark即可
# 默認是mr
set hive.execution.engine=spark;
# 這里,是完全可以將其設置為Spark Master的URL地址的
set spark.master=spark://192.168.10.101:7077
# 注意上面這種配置是只適用于匹配的版本才可以,如果高版本的話現在是沒有這種功能的,需要自行編譯
# 參考官方文檔:https://cwiki.apache.org//confluence/display/Hive/Hive+on+Spark:+Getting+Started
但是需要注意,HiveOnSpark并不是適合所有場景的,因為Spark是內存計算的計算引擎,需要消耗大量的內存資源,不利于其他程序的計算應用。因此需要使用Spark來處理Hive的數據的時候,SparkOnHive是一個比較常見的選擇。
1.3. SparkSQL命令行
在Spark的bin目錄下,有一個腳本文件spark-sql
,這個腳本文件會啟動一個命令交互界面,可以使得我們在命令行上直接使用Spark來操作Hive的數據。
在3.3.1.章節的部分,已經在spark的conf
目錄下面創建出來一個hive-site.xml
文件,其中定義了hive的元數據相關的信息,這樣我們就可以直接使用了。
1.4. SparkSQL分布式查詢引擎
在Spark中有一個服務是ThriftServer服務,通過這個服務,用戶可以通過JDBC連接ThriftServer來訪問SparkSQL的數據。連接后可以直接通過編寫SQL語句訪問SparkSQL的數據。在配置ThriftServer的時候,至少需要配置ThriftServer的主機名和端口號,如果需要使用Hive的數據的話,還需要再提供Hive的Metastore的URIs。
如果你前面已經配置完成了Spark On Hive,那么在你的Spark的conf目錄下已經存在了一個文件:hive-site.xml,在這個文件中已經配置好了Hive的Metastore的URIs了。
1.4.1. 開啟ThriftServer服務
$SPARK_HOME/bin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10000\
--hiveconf hive.server2.thrift.bind.host=qianfeng01\
--master local[*]
這里的
--master
可以設置為local模式、Standalone模式或者YARN模式。
1.4.2. beeline連接ThriftServer
ThriftServer服務啟動之后,默認件監聽的是10000端口,我們可以使用一些客戶端工具來連接到這個服務。例如beeline。
1.4.3. 代碼連接
如果需要需要使用ThriftServer連接到SparkSQL,進而操作Hive的數據的話,我們需要安裝Hive的依賴。
pip3 install pyhive
# @Author : 千鋒大數據教研院
# @Company : 北京千鋒互聯科技有限公司from pyhive import hive# 通過Spark ThriftServer,創建到Hive的連接對象,
conn = hive.Connection(host="qianfeng01", port=10000, username="root", database="mydb")
# 創建一個光標對象,用來操作hive
cursor = conn.cursor()with conn, cursor:# 執行SQL語句cursor.execute("select * from emp join dept on emp.deptno = dept.deptno")result = cursor.fetchall()for r in result:print(r)
1.4.4. 任務查看
ThriftServer提交到Spark的任務,我們可以通過http://192.168.10.101:4040/jobs/來查看到。