前面的課程我們學習了如何從csv文件中讀入數據,這相當于是對csv這種類型的數據的操作。那么接下來,我們一起看看,如何寫Spark程序來操作mysql數據庫。先來給大家介紹一下我們這節課的主要學習內容:
(1)安裝mysql數據庫。
(2)寫Spark程序連接mysql數據庫并進行讀寫操作。
(一)準備mysql環境
我們計劃在hadoop001這臺設備上安裝mysql服務器,(當然也可以重新使用一臺全新的虛擬機)。
以下是具體步驟:
- 使用finalshell連接hadoop001.
- 查看是否已安裝MySQL。命令是: rpm -qa|grep mariadb
若已安裝,需要先做卸載MySQL的操作命令是:rpm -e --nodeps mariadb-libs
- 把mysql的安裝包上傳到虛擬機。
- 進入/opt/software/目錄,解壓上傳的.tar文件。
cd /opt/software
解壓文件
tar -xf mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar
得到的效果如下
- 安裝工具包
yum install -y ??perl ??perl-Data-Dumper ??perl-Digest-MD5 ??net-tools libaio
如果安裝成功,或者顯示以下內容,即可繼續安裝步驟:
軟件包 libaio-0.3.109-13.el7.x86_64 已安裝并且是最新版本
無須任何處理
6.安裝mysql。依次輸入以下5條命令:
rpm -ivh mysql-community-common-5.7.28-1.el7.x86_64.rpm
rpm -ivh mysql-community-libs-5.7.28-1.el7.x86_64.rpm
rpm -ivh mysql-community-libs-compat-5.7.28-1.el7.x86_64.rpm
rpm -ivh mysql-community-client-5.7.28-1.el7.x86_64.rpm
rpm -ivh mysql-community-server-5.7.28-1.el7.x86_64.rpm
命令說明:
rpm -ivh 是 Linux 中用于安裝 RPM(Red Hat Package Manager)軟件包的命令。具體來說,rpm -ivh 中的每個選項都有特定的含義:
rpm:RPM 包管理工具,用于安裝、查詢、驗證、更新和刪除軟件包。 ?
-i:表示安裝(install)軟件包。 ?
-v:表示顯示詳細(verbose)信息,提供更多安裝過程中的輸出信息。 ?
-h:表示在安裝過程中顯示進度條,以 # 符號表示安裝進度。
- 初始化數據庫
使用的命令是: mysqld --initialize --user=mysql
- 查看臨時密碼
安裝完成之后,它會在一個日志文件中保存臨時密碼,通過cat命令來查看這個密碼。具體的操作是:cat /var/log/mysqld.log
將臨時密碼復制,或者暫時存到某處
- 啟動MySQL服務。對應的命令是:systemctl start mysqld
- 登錄MySQL數據庫。對應的命令是:mysql -uroot -p
- 輸入臨時密碼。此時會要求輸入密碼。
Enter password: 臨時密碼。注意,在輸入密碼的過程中,密碼并不可見。
- 登陸成功后,修改密碼為000000。初始密碼太難記了,我們先修改一下密碼。對應的命令如下:
mysql> set password = password("000000");
- 使root允許任意ip連接
mysql> update mysql.user set host='%' where user='root';
mysql> flush privileges;
- 查看已有的數據庫。通過命令:show databases;
- (二)創建數據庫和表
-
接下來,我們去創建一個新的數據庫,數據表,并插入一條數據。
參考代碼如下:
-- 創建數據庫
CREATE DATABASE spark;
-- 使用數據庫USE spark;
-- 創建表
create table person(id int, name char(20), age int);
-- 插入示例數據
insert into person values(1, 'jam', 20), (2,'judi', 21);
-- 查看所有數據
select * from person;
-- 退出
quit
?
提醒:use spark;的作用是使用當前數據庫;
(三)Spark連接MySQL數據庫
- 新建項目,或者使用之前的項目也可以。
- 修改pom.xml文件。
【教師講解這個三個依賴的作用,強調可以刪除spark-core這個包】
補充三個依賴:
(1)scala-library 是 Scala 語言的基礎庫,是編寫 Scala 程序的必要條件。
(2)spark-sql_2.12 提供了 Spark SQL 的功能,用于高效的數據處理和分析。
(3)mysql-connector-java 提供了與 MySQL 數據庫交互的能力。
<dependency>
????????????<groupId>org.scala-lang</groupId>
????????????<artifactId>scala-library</artifactId>
????????????<version>2.12.15</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.apache.spark</groupId>
????????????<artifactId>spark-sql_2.12</artifactId>
????????????<version>3.3.1</version>
?????????</dependency>
????????<dependency>
????????????<groupId>mysql</groupId>
????????????<artifactId>mysql-connector-java</artifactId>
????????????<version>8.0.33</version>
????????</dependency>
?
?
請注意,這里并沒沒有單獨添加spark_core的依賴,因為在spark-sql中已經包含了spark_core。
- 寫Spark程序連接mysql
-
核心步驟:
- 創建Properties對象,設置數據庫的用戶名和密碼
- 使用spark.read.jbdc方法,連接數據庫
-
import org.apache.spark.sql.SparkSessionobject SparkSQL01 {/*sparksql DataFrame:一種分布式的數據集,類似于二維表格(mysql 數據庫中的數據表)1.讀入現成的結構化數據:csv 數據庫 json2.通過RDD轉換而來DataFrame:1.調用API來實現相關功能2.調用SQL語句來實現相關功能*/def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkSQL01").master("local[*]").getOrCreate()//讀入文件val df = spark.read.option("header", "true").csv("input/ecommerce_data.csv") // // //選出quantity > 5 的記錄 // val df1 = df.filter(df("quantity") > 5)// df1.show()//選出 quantity > 5 的記錄:使用SQL來實現相關功能//1.生成一個臨時數據表df.createOrReplaceTempView("ecommerce")//2.執行SQL語句val df2 = spark.sql("select * from t1 where quantity > 5")df2.show()}}
-
park添加數據到mysql
前面演示了數據的查詢,現在來看看添加數據到mysql。
【教師講
import org.apache.spark.sql.SparkSessionimport java.util.Propertiesobject SparkSQL02 {//連接hadoop100上的mysql數據庫,讀出spark數據庫中的person表中的內容def main(args: Array[String]): Unit = {val spark =SparkSession.builder().appName("SparkSQL02").master("local[*]").getOrCreate()//創建一個properties對象,用來儲存mysql的連接信息val prop = new Properties()prop.setProperty("user","root")prop.setProperty("password","000000")//添加一條數據到數據庫val data = Seq((3,"zhangsan",30),(4,"lisi",40),(5,"wangwu",50))val df1 = spark.createDataFrame(data).toDF("id","name","age")df1.write.mode("append").jdbc("jdbc:mysql://hadoop100:3306/spark","person",prop)//讀取mysql數據庫中的數據val df = spark.read.jdbc("jdbc:mysql://hadoop100:3306/spark","person",prop)df.show()}}