Sqoop數據采集格式問題
- 一、Sqoop工作原理
- 二、Sqoop命令格式
- 三、Oracle數據采集格式問題
- 四、Sqoop增量采集方案
Apache Sqoop是一款開源的工具,主要用于在Hadoop(Hive)與傳統的數據庫(mysql、postgresql…)間進行數據的傳遞,可以將一個關系型數據庫(例如 : MySQL ,Oracle ,Postgres等)中的數據導進到Hadoop的HDFS中,也可以將HDFS的數據導進到關系型數據庫中。
Sqoop項目開始于2009年,最早是作為Hadoop的一個第三方模塊存在,后來為了讓使用者能夠快速部署,也為了讓開發人員能夠更快速的迭代開發,Sqoop獨立成為一個Apache項目。
一、Sqoop工作原理
-
數據導入:Sqoop通過MapReduce任務來實現數據的并行導入。首先,它會將關系型數據庫中的數據表按照一定的規則進行分區,然后為每個分區啟動一個Map任務,同時從數據庫中讀取相應分區的數據,并將數據寫入到HDFS或其他Hadoop存儲系統中。這樣可以充分利用Hadoop集群的分布式計算能力,提高數據導入的效率。
-
導出過程:與導入類似,Sqoop也會將數據進行分區處理,然后通過Map任務將Hadoop中的數據讀取出來,并按照目標關系型數據庫的格式要求,將數據寫入到數據庫中。
Sqoop通過創建一個數據傳輸的MR程序,進而實現數據傳輸。
Sqoop安裝:
- JAVA環境配置
- Hadoop環境配置
- 相關數據庫驅動包
只要環境滿足以上設置,直接解壓Sqoop安裝包即可安裝,修改配置后即可使用。
二、Sqoop命令格式
基礎使用語法:
sqoop import | export \
--數據庫連接參數
--HDFS或者Hive的連接參數
--配置參數
數據傳輸常用參數:
選項 | 參數 |
---|---|
–connect | jdbc:mysql://hostname:3306(數據庫連接URL) |
–username | 數據庫用戶名 |
–password | 數據庫用戶密碼 |
–table | 指定數據表 |
–columns | 指定表列值 |
–where | 數據過濾條件 |
–e/–query | 自定義SQL語句 |
–driver | 指定數據庫驅動 |
–delete-target-dir | 導入數據時,清空目標目錄 |
–target-dir | 指定導入數據的目錄(通常為HDFS路徑) |
–export-dir | 指定導出數據的源目錄(通常為HDFS路徑) |
Sqoop命令的使用方法可以通過sqoop -h命令查看相關使用方法,此處不在贅述了
三、Oracle數據采集格式問題
場景:
-
Step1: 查看業務數據庫中 CISS_SERVICE_WORKORDER 表的數據條數。
select count(1) as cnt from CISS_SERVICE_WORKORDER; 178609條
-
Step2: 采集CISS_SERVICE_WORKORDER的數據到HDFS上
sqoop import \ --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \ --username ciss \ --password 123456 \ --table CISS4.CISS_SERVICE_WORKORDER \ --delete-target-dir \ --target-dir /test/full_imp/ciss4.ciss_service_workorder \ --fields-terminated-by "\001" \ #指定數據分割符 -m 1 #指定并行度
-
Step3: 使用Hive查看導入數據表的行數
create external table test_text( line string # 將導入的數據一行作為表中的一列 ) location '/test/full_imp/ciss4.ciss_service_workorder'; select count(*) from test_text; 195825條
問題:
Sqoop采集完數據后,HDFS數據中存儲的數據行數跟源數據庫的數據量不符合。
原因:
- sqoop以文本格式導入數據時,默認的換行符是特殊字符。
- Oracle中的數據列中如果出現了\n、\r、\t等特殊字符,就會被劃分為多行
Oracle數據:
id | name | age |
---|---|---|
001 | zhang\nsan | 18 |
Sqoop轉換后的數據:
001 | zhang |
san | 18 |
Hive表中的數據:
id | name | age |
---|---|---|
001 | zhang | |
san | 18 |
解決方法:
- 方案一:
- 刪除或者替換數據中的換行符
- Sqoop參數 --hive-drop-import-delims 刪除換行符
- Sqoop參數 --hive-delims-replacement char 替換換行符
不建議使用,破壞原始數據結構,ODS層數據盡量抱持原結構
- 方案二:
- 采用特殊的存儲格式,AVRO格式
常見的文件格式介紹:
類型 | 介紹 |
---|---|
TextFile | Hive默認的文件格式,最簡單的數據格式,便于查看和編輯,耗費存儲空間,I/O性能較低 |
SequenceFile | 含有鍵值對的二進制文件,優化磁盤利用率和I/O,并行操作數據,查詢效率高,但存儲空間消耗最大 |
AvroFile | 特殊的二進制文件,設計的主要目標是為了滿足schema evolution,Schema和數據保存在一起 |
OrcFile | 列式存儲,Schema存儲在footer中,不支持schema evolution,高度壓縮比并包含索引,查詢速度非常快 |
ParquetFile | 列式存儲,與Orc類似,壓縮比不如Orc,但是查詢性能接近,支持的工具更多,通用性更強 |
Avro格式特點
- 優點
- 二進制數據存儲,性能好、效率高
- 使用JSON描述模式,支持場景更豐富
- Schema和數據統一存儲,消息自描述(將表中的一行數據作為對象存儲,并且Schema為元數據)
- 模式定義允許定義數據的排序
- 缺點
- 只支持Avro自己的序列化格式
- 少量列的讀取性能比較差,壓縮比較低
- 場景:基于行的大規模結構化數據寫入、列的讀取非常多或者Schema變更操作比較頻繁的場景
Sqoop使用Avro格式:
sqoop import \-Dmapreduce.job.user.classpath.first=true \--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \--username ciss \--password 123456 \--table CISS4.CISS_SERVICE_WORKORDER \--delete-target-dir \--target-dir /test/full_imp/ciss4.ciss_service_workorder \--as-avrodatafile \ # 選擇文件存儲格式為AVRO--fields-terminated-by "\001" \-m 1
Hive建表指定文件的存儲格式:
create external table test_avro(
line string
)
stored as avro
location '/test/full_imp/ciss4.ciss_service_workorder';
AVRO 數據以 二進制序列化 存儲,字段通過預定義的 模式(Schema) 解析,而非依賴分隔符,即使字段內容包含逗號、換行符等特殊字符,也不會影響數據結構的正確性。
Schema 定義(JSON 格式),明確描述了字段名稱、類型、順序等信息。
四、Sqoop增量采集方案
Sqoop 支持兩種增量模式:
-
append 模式:
適用于 僅追加數據 的表(如日志表),基于 遞增列(如自增主鍵 id)采集新數據。 -
lastmodified 模式:
適用于 數據會更新 的表(如用戶表),基于 時間戳列(如 last_update_time)采集新增或修改的數據。
append模式要求源數據表具備自增列,如建表時設置的自增id
lastmodified模式要求源數據表具有時間戳字段。
Append模式:
要求:必須有一列自增的值,按照自增的int值進行判斷
特點:只能導入增加的數據,無法導入更新的數據
場景:數據只會發生新增,不會發生更新的場景
sqoop import \ # 執行數據導入操作--connect jdbc:mysql://node3:3306/sqoopTest \ # 連接MySQL數據庫(地址:node3,數據庫名:sqoopTest)--username root \ # 數據庫用戶名:root--password 123456 \ # 數據庫密碼:123456--table tb_tohdfs \ # 要導入的源表:tb_tohdfs--target-dir /sqoop/import/test02 \ # HDFS目標目錄(數據將寫入此路徑)--fields-terminated-by '\t' \ # 字段分隔符為制表符(\t)--check-column id \ # 指定增量檢查列:id(通常是自增主鍵)--incremental append \ # 增量模式為“append”(僅導入新數據)--last-value 0 \ # 上次導入的id最大值(初始值為0,首次導入id>0的數據)-m 1 # 使用1個Map任務(單線程)
appebd模式使用last-value記錄上次導入的數據id最大值,初次導入一般為全量導入,即id>0
此處的last_value需要手動填寫,因此可以使用Sqoop的job管理進行自動記錄。
sqoop job --create my_job -- import ... --incremental append --check-column id --last-value 0
sqoop job --exec my_job # 自動更新 last-value
lastmodified模式:
要求:必須包含動態時間變化這一列,按照數據變化的時間進行判斷
特點:既導入新增的數據也導入更新的數據
場景:表中的記錄會新增或更新,且每次更新都會修改 lastmode 時間戳。一般無法滿足要求,所以不用。
sqoop import \ # 執行數據導入操作--connect jdbc:mysql://node3:3306/sqoopTest \ # 連接MySQL數據庫(地址:node3,數據庫名:sqoopTest)--username root \ # 數據庫用戶名:root--password 123456 \ # 數據庫密碼:123456--table tb_lastmode \ # 要導入的源表:tb_lastmode--target-dir /sqoop/import/test03 \ # HDFS目標目錄(數據將寫入此路徑)--fields-terminated-by '\t' \ # 字段分隔符為制表符(\t)--incremental lastmodified \ # 增量模式為“lastmodified”(采集新增或修改的數據)--check-column lastmode \ # 指定時間戳列:lastmode(記錄數據的更新時間)--last-value '2021-06-06 16:09:32' \ # 上次導入的最大時間值(導入此時間之后的新增/修改數據)-m 1 # 使用1個Map任務(單線程)
lastmodified模式使用時間戳記載數據的更新線。
若同一條記錄被多次更新,且 lastmode 時間超過 --last-value,Sqoop 會多次導入該記錄。
解決方案:添加 --merge-key <主鍵列> 參數,合并新舊數據(基于主鍵去重):
--merge-key id # 假設 id 是主鍵列
自定義模式:
要求:每次運行的輸出目錄不能相同
特點:自己實現增量的數據過濾,可以實現新增和更新數據的采集
場景:一般用于自定義增量采集每天的分區數據到Hive
sqoop import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
--fields-terminated-by '\t' \
-m 1
自定義模式可以根據設置的sql進行數據導入,因此是最常用的場景。