每個人的生活都是一個世界,即使最平凡的人也要為他那個世界的存在而戰斗。
——《平凡的世界》
目錄
一、sqoop簡介
1.1 導入流程
1.2 導出流程
二、使用sqoop
2.1 sqoop的常用參數
2.2 連接參數列表
2.3 操作hive表參數
2.4 其它參數
三、sqoop應用 - 導入
3.1 準備測試數據
3.2 sqoop查看數據
3.3 創建Hive表
3.4 多map條件查詢導入HDFS
3.5 全量導入數據
3.6 增量數據導入
四、sqoop應用 - 導出
4.1 Hive中數據導出到MySQL中
五、總結
一、sqoop簡介
sqoop是Apache旗下的一款?hadoop和關系型數據庫服務器之間傳送數據
?的工具
主要的功能:
- 導入數據
- MySQL、Oracle(關系型數據庫)導入數據到hadoop的HDFS、Hive以及Hbase等數據存儲系統
- 導出數據
- 從Hadoop的文件系統(HDFS等)中導出數據到關系型數據庫(MySQL、PostgreSQL)中
1.1 導入流程
1. 首先通過JDBC讀取關系型數據庫元數據信息,獲取到表結構2. 根據元數據信息生成Java類3. 啟動import程序,通過JDBC讀取關系型數據庫數據,并通過上一步的Java類進行序列化4. MapReduce并行寫數據到Hadoop中,并使用Java類進行反序列化
1.2 導出流程
1.sqoop通過JDBC讀取關系型數據庫元數據,獲取到表結構信息,生成Java類2.MapReduce并行讀取HDFS數據,并且通過Java類進行序列化3.export程序啟動,通過Java類反序列化,同時啟動多個map任務,通過JDBC將數據寫入到關系型數據庫中
二、使用sqoop
環境:CDH 6.2.1
快速體驗sqoop
# 前提是你已經下載好了sqoop
# 直接在命令行中輸入以下命令(這個命令類似于你在MySQL中執行 show databases;)
# 格式:sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username 用戶名 --password 密碼
sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root --password 123456
# 查詢指定庫下面所有表(這個命令類似于你在MySQL中指定庫后執行 show tables;)
# 格式:sqoop list-tables --connect jdbc:mysql://localhost:3306/庫名 --username 用戶名 --password 密碼
sqoop list-tables --connect jdbc:mysql://localhost:3306/ecut --username root --password 123456
2.1 sqoop的常用參數
- 指令
sqoop 命令選項 參數
命令名稱 | 對應類 | 命令說明 |
---|---|---|
import | ImportTool | 將關系型數據庫數據導入到HDFS、HIVE、HBASE |
export | ExportTool | 將HDFS上的數據導出到關系型數據庫 |
codegen | CodeGenTool | 獲取數據庫中某張表數據生成Java并打成Jar包 |
create-hive-table | CreateHiveTableTool | 創建hive的表 |
eval | EvalSqlTool | 查看SQL的執行結果 |
list-databases | ListDatabasesTool | 列出所有數據庫 |
list-tables | ListTablesTool | 列出某個數據庫下的所有表 |
help | HelpTool | 打印sqoop幫助信息 |
version | VersionTool | 打印sqoop版本信息 |
2.2 連接參數列表
參數 | 說明 |
---|---|
–connect | 連接關系型數據庫的URL |
–help | 打印幫助信息 |
–username | 連接數據庫的用戶名 |
–password | 連接數據庫的密碼 |
–verbose | 在控制臺打印出詳細信息 |
2.3 操作hive表參數
參數 | 說明 |
---|---|
–hcatalog-database | 指定hive表的數據庫名稱。如果未指定,則使用默認數據庫名稱(default) |
–hcatalog-table | 指定hive表名,該–hcatalog-table選項的存在表示導入或導出作業是使用HCatalog表完成的,并且是HCatalog作業的必需選項。 |
2.4 其它參數
參數 | 含義 |
---|---|
–num-mappers N | 指定啟動N個map進程 |
–table | 指定數據庫表名 |
–query | 編寫sql語句,將查詢的結果導入,如果查詢中有where條件,則條件后必須加上conditions關鍵字。如果使用雙引號包含sql,則condition關鍵字前要加上*$CONDITIONS* 以完成轉義: |
–target-dir | 指定HDFS路徑 |
–delete-target-dir | 若hdfs存放目錄已存在,則自動刪除 |
–fields-terminated-by | 設置字段分隔符 |
–export-dir | 導出到指定HDFS的目錄路徑 |
三、sqoop應用 - 導入
需求:使用sqoop上傳字典表數據到hive中與我們的數據進行關聯查詢。
3.1 準備測試數據
- 在MySQL中創建測試數據(庫名test_ecut,表名products,總共54條數據)
-- 在MySQL客戶端或者圖形化工具里執行下面代碼
drop database if exists test_ecut;
create database if not exists test_ecut char set utf8;
use test_ecut; -- 使用該數據庫create table test_ecut.products (id int auto_increment primary key,product_name varchar(255),price decimal(10, 2)
);-- 插入一些正常數據
insert into test_ecut.products (product_name, price) values ('商品A', 19.99);
insert into test_ecut.products (product_name, price) values ('商品B', 29.99);
insert into test_ecut.products (product_name, price) values ('商品C', 9.99);
insert into test_ecut.products (product_name, price) values ('商品D', 49.99);
insert into test_ecut.products (product_name, price) values ('商品E', 39.99);-- 插入一些包含空值的數據(這里假設price字段允許為空,實際需根據你的表結構定義來確定是否合理)
insert into test_ecut.products (product_name, price) values ('商品F', null);
insert into test_ecut.products (product_name, price) values ('商品G', null);-- 插入一些重復數據
insert into test_ecut.products (product_name, price) values ('商品A', 19.99);
insert into test_ecut.products (product_name, price) values ('商品B', 29.99);-- 繼續插入更多不同情況的數據以湊夠45條示例(以下為隨機模擬更多數據情況)
insert into test_ecut.products (product_name, price) values ('商品H', 59.99);
insert into test_ecut.products (product_name, price) values ('商品I', 15.99);
insert into test_ecut.products (product_name, price) values ('商品J', 25.99);
insert into test_ecut.products (product_name, price) values ('商品K', 69.99);
insert into test_ecut.products (product_name, price) values ('商品L', 89.99);
insert into test_ecut.products (product_name, price) values ('商品M', null);
insert into test_ecut.products (product_name, price) values ('商品N', 35.99);
insert into test_ecut.products (product_name, price) values ('商品O', 45.99);
insert into test_ecut.products (product_name, price) values ('商品P', 79.99);
insert into test_ecut.products (product_name, price) values ('商品Q', 99.99);
insert into test_ecut.products (product_name, price) values ('商品R', 10.99);
insert into test_ecut.products (product_name, price) values ('商品S', 12.99);
insert into test_ecut.products (product_name, price) values ('商品T', 14.99);
insert into test_ecut.products (product_name, price) values ('商品U', 16.99);
insert into test_ecut.products (product_name, price) values ('商品V', 18.99);
insert into test_ecut.products (product_name, price) values ('商品W', 20.99);
insert into test_ecut.products (product_name, price) values ('商品X', 22.99);
insert into test_ecut.products (product_name, price) values ('商品Y', 24.99);
insert into test_ecut.products (product_name, price) values ('商品Z', 26.99);
insert into test_ecut.products (product_name, price) values ('商品AA', 28.99);
insert into test_ecut.products (product_name, price) values ('商品AB', 30.99);
insert into test_ecut.products (product_name, price) values ('商品AC', 32.99);
insert into test_ecut.products (product_name, price) values ('商品AD', 34.99);
insert into test_ecut.products (product_name, price) values ('商品AE', 36.99);
insert into test_ecut.products (product_name, price) values ('商品AF', 38.99);
insert into test_ecut.products (product_name, price) values ('商品AG', 40.99);
insert into test_ecut.products (product_name, price) values ('商品AH', 42.99);
insert into test_ecut.products (product_name, price) values ('商品AI', 44.99);
insert into test_ecut.products (product_name, price) values ('商品AJ', 46.99);
insert into test_ecut.products (product_name, price) values ('商品AK', 48.99);
insert into test_ecut.products (product_name, price) values ('商品AL', 50.99);
insert into test_ecut.products (product_name, price) values ('商品AM', 52.99);
insert into test_ecut.products (product_name, price) values ('商品AN', 54.99);
insert into test_ecut.products (product_name, price) values ('商品AO', 56.99);
insert into test_ecut.products (product_name, price) values ('商品AP', 58.99);
insert into test_ecut.products (product_name, price) values ('商品AQ', 60.99);
insert into test_ecut.products (product_name, price) values ('商品AR', 62.99);
insert into test_ecut.products (product_name, price) values ('商品AS', 64.99);
insert into test_ecut.products (product_name, price) values ('商品AT', 66.99);
insert into test_ecut.products (product_name, price) values ('商品AU', 68.99);
insert into test_ecut.products (product_name, price) values ('商品AV', 70.99);
insert into test_ecut.products (product_name, price) values ('商品AW', 72.99);
insert into test_ecut.products (product_name, price) values ('商品AX', 74.99);
insert into test_ecut.products (product_name, price) values ('商品AY', 76.99);
insert into test_ecut.products (product_name, price) values ('商品AZ', 78.99);select count(1) from test_ecut.products;
3.2 sqoop查看數據
- 可以借助sqoop中eval查看結果
# 通過eval查看:test_ecut庫下的products表前5條數據
sqoop eval \
--connect jdbc:mysql://localhost:3306/test_ecut \
--username root \
--password 123456 \
--query "select * from products limit 5"
3.3 創建Hive表
前提:你需要啟動hadoop集群(hdfs和yarn),以及hive服務(hiveserver2和metastore)
- 1:在hive中你需要先建庫
-- 通過圖形化工具(datagrip等),執行以下命令
create database hive_ecut;
- 2:使用create-hive-table創建hive表
# 基于MySQL表創建hive表
sqoop create-hive-table \
--connect jdbc:mysql://localhost:3306/test_ecut \
--username root \
--password 123456 \
--table products \
--hive-table hive_ecut.goods_table
- 3:然后通過datagrip工具,查看hive中是否存在表
3.4 多map條件查詢導入HDFS
# 語法
sqoop import \
--connect 數據庫連接字符串 \
--username 數據庫用戶名 \
--password 數據庫密碼 \
--target-dir HDFS位置 \
--delete-target-dir 若hdfs存放目錄以及存在,則自動刪除 \
--fields-terminated-by "\t" \
--num-mappers 3 \
--split-by 切分數據依據 \
--query 'select SQL where 查詢條件 and $CONDITIONS'
解釋:
query
?將查詢結果的數據導入,使用時必須伴隨參?--target-dir
?或?--hive-table
?,如果查詢中有?where條件
?,則條件后必須加上$CONDITIONS關鍵字當?
sqoop
?使用?--query
?執行多個maptask并行運行導入數據時,每個maptask將執行一部分數據的導入,原始數據需要使用?--split-by 某個字段'
?來切分數據,不同的數據交給不同的maptask去處理
maptask
?執行sql腳本時,需要在where條件中添加$CONDITIONS條件,這個是linux系統的變量,可以根據?sqoop
?對邊界條件的判斷,來替換成不同的值,這就是說若?split-by id
?,則?sqoop
?會判斷?id
?的最小值和最大值判斷?id
?的整體區間,然后根據maptask的個數來進行區間拆分,每個maptask執行一定?id
?區間范圍的數值導入任務,如下為示意圖。
- 1:導入文本文件
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--target-dir /user/hive/warehouse/hive_ecut.db/goods_table \
--delete-target-dir \
--fields-terminated-by "\001" \
--num-mappers 3 \
--split-by id \
--query 'select * from products where id < 10 and $CONDITIONS'
3.5 全量導入數據
補充:?導入數據可以分為兩步
第一步,將數據導入到HDFS,默認的臨時目錄是/user/當前操作用戶/mysql表名;
第二步,將導入到HDFS的數據遷移到Hive表,如果hive表不存在,sqoop會自動創建內部表;(我們的是在/user/root/products,通過查看job的configuration的outputdir屬性得知)
第二步很重要,因為有時候報錯并不是你的代碼腳本問題,而是臨時文件存在,在調度的時候運行的其實是臨時文件中的配置job,需要刪除才可以(.Trash和.staging 別刪)
- 導入剛剛的商品數據,如果表不存在會自動創建內部表
# 導入命令
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table products \
--num-mappers 1 \
--delete-target-dir \
--hive-import \
--fields-terminated-by "\001" \
--hive-overwrite \
--hive-table hive_ecut.goods_table_test
3.6 增量數據導入
增量數據導入的兩種方法
方法1:append方式
方法2:lastmodified方式,必須要加–append(追加)或者–merge-key(合并,一般填主鍵)
- 1:按照id增量導入數據
-- MySQL添加一條新的數據
insert into test_ecut.products(product_name, price) values ('無敵絕世小學生',999999)
# 按照id增量導入
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table products \
--num-mappers 1 \
--target-dir /user/hive/warehouse/hive_ecut.db/goods_table_test \
--fields-terminated-by "\001" \
--incremental append \
--check-column id \
--last-value 54
參數解釋:
1)incremental:append或lastmodified,使用lastmodified方式導入數據要指定增量數據是要–append(追加)還是要–merge-key(合并)
2)check-column:作為增量導入判斷的列名
3)last-value:指定某一個值,用于標記增量導入的位置,這個值的數據不會被導入到表中,只用于標記當前表中最后的值。(可以看到sqoop腳本中,我設置的id為54,也就意味著要跳過54而直接從55開始存)
- 2:按照時間增量導入數據
–incremental lastmodified
–append
–check-column 日期字段
在MySQL中重新建表,需要時間字段
-- mysql中新建products_update表
create table if not exists test_ecut.products_update(id int auto_increment primary key,product_name varchar(255),price decimal(10, 2),last_update_time datetime default current_timestamp on update current_timestamp
);
insert into test_ecut.products_update (product_name, price) values ('商品H', 59.99);
insert into test_ecut.products_update (product_name, price) values ('商品I', 15.99);
insert into test_ecut.products_update (product_name, price) values ('商品J', 25.99);
導入數據到hive中
# 在命令行中執行,然后在datagrip中查看數據
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table products_update \
--num-mappers 1 \
--delete-target-dir \
--hive-import \
--fields-terminated-by "\001" \
--hive-overwrite \
--hive-table hive_ecut.goods_update_table
隔一段時間,新增一條數據
-- 在MySQL中,新增
insert into test_ecut.products_update (product_name, price) values ('無敵絕世小學生', 999999);
增量導入更新的數據
# 在命令行中執行,在datagrip中查看
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table products_update \
--num-mappers 1 \
--target-dir /user/hive/warehouse/hive_ecut.db/goods_update_table \
--fields-terminated-by "\001" \
--incremental lastmodified \
--check-column last_update_time \
--last-value '2024-12-28 13:18:00' \
--append# 注意:last-value 的設置是把包括 2024-12-28 13:18:00 時間的數據做增量導入。(所以我給2024-12-28 13:17:59加了1秒)
- 3:按照時間增量并按照主鍵合并導入
–incremental lastmodified
–merge-key 用法
如果之前的數據有修改的話可以使用–incremental lastmodified --merge-key進行數據合并執行修改的SQL
更改字段,從而更新時間
-- 在MySQL中更新
update test_ecut.products_update set product_name = '萌神想' where product_name='無敵絕世小學生';
進行合并導入(如果報錯,可能是因為/user/root/_sqoop存在了很多臨時文件,需要刪除這些臨時文件)
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table products_update \
--num-mappers 1 \
--target-dir /user/hive/warehouse/hive_ecut.db/goods_update_table \
--fields-terminated-by "\001" \
--incremental lastmodified \
--check-column last_update_time \
--last-value '2024-12-28 13:20:24' \
--merge-key id# --incremental lastmodified --merge-key的作用:修改過的數據和新增的數據(前提是滿足last-value的條件)都會導入進來,并且重復的數據(不需要滿足last-value的條件)都會進行合并
四、sqoop應用 - 導出
4.1 Hive中數據導出到MySQL中
sqoop的export命令支持 insert、update到關系型數據庫,但是不支持merge
- 1:查看需要導出表的數據
- 2:新建MySQL表用于接收hive中的數據
create table if not exists test_ecut.get_hive_data(id int auto_increment primary key,product_name varchar(255),price decimal(10, 2),last_update_time datetime default current_timestamp on update current_timestamp
);
- 3:導出到MySQL中
# 導出命令
sqoop export \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table get_hive_data \
--export-dir /user/hive/warehouse/hive_ecut.db/goods_update_table \
--num-mappers 1 \
--fields-terminated-by '\001'
補充:sqoop的作用就是負責導入和導出的,我上面所寫的雖然都在虛擬機上運行,但只要改一下localhost就可以實現不同主機之間的數據傳輸(前提是有映射,且可以互通)
五、總結
? 看完上面的操作之后,很容易發現一個特點,Sqoop其實就是個腳本,而且命令很固定,只需改改參數就可以使用,門檻并不高,能用就行,具體它底層怎么實現的,可以去官網看看(Sqoop已經不更新,雖然是apache的項目,但已經被打入冷宮了),值得一提的是,Sqoop 通常只會使用 Map 任務來完成數據的傳輸,不會啟動 Reduce 任務