一、全量同步方案設計
1.1 基礎命令模板
sqoop import \
--connect jdbc:mysql://mysql_host:3306/db_name \
--username user \
--password pass \
--table source_table \
--hive-import \
--hive-table target_table \
--hive-overwrite \ # 覆蓋已有表
--num-mappers 8 \ # 并行度(根據集群資源調整)
--split-by id \ # 分片字段(需為數字類型)
--fields-terminated-by '\001' \
--lines-terminated-by '\n' \
--null-string '\\N' \
--null-non-string '\\N'
1.2 關鍵參數說明
參數 | 說明 | 必須項 |
---|---|---|
--hive-import | 啟用Hive表自動創建 | 否 |
--hive-drop-import-delims | 刪除Hive默認分隔符 | 推薦 |
--mapreduce.job.name | 自定義任務名稱 | 否 |
--autoreset-to-one-mapper | 無主鍵表自動單線程 | 推薦 |
1.3 多表批量處理腳本
#!/bin/bash# 配置參數
DB_CONFIG="mysql_host:3306/db_name"
USER="root"
PASS="password"
HIVE_DB="dw"
TABLES=("user" "order" "product") # 表名列表# 循環處理每個表
for TABLE in "${TABLES[@]}"
doecho "正在同步表: $TABLE"sqoop import \--connect "jdbc:mysql://$DB_CONFIG" \--username $USER \--password $PASS \--table $TABLE \--hive-import \--hive-database $HIVE_DB \--hive-overwrite \--num-mappers $(get_conf $TABLE) \ # 動態獲取并行度--split-by $(get_split_col $TABLE) \ # 動態獲取分片字段--null-string '\\N' \--null-non-string '\\N'
done
二、增量同步方案設計
2.1 Append模式(新增數據)
sqoop import \
--connect jdbc:mysql://mysql_host:3306/db_name \
--username user \
--password pass \
--table source_table \
--hive-import \
--hive-table target_table \
--incremental append \
--check-column update_time \ # 時間戳字段
--last-value '2024-05-01 00:00:00' \
--num-mappers 4
2.2 LastModified模式(更新數據)
sqoop import \
--connect jdbc:mysql://mysql_host:3306/db_name \
--username user \
--password pass \
--table source_table \
--hive-import \
--hive-table target_table \
--incremental lastmodified \
--check-column update_time \
--last-value '2024-05-01 00:00:00' \
--merge-key id \ # 主鍵合并
--num-mappers 4
2.3 自動化增量管理
# 動態獲取最后同步時間
LAST_VALUE=$(hive -e "SELECT MAX(update_time) FROM target_table")# 執行增量同步
sqoop import \
--check-column update_time \
--last-value "$LAST_VALUE" \
--incremental append \
--hive-import \
--hive-table target_table
三、多表同步增強方案
3.1 配置驅動模式
?table_sync.conf?
[user]
table=user
split_col=id
parallel=8[order]
table=order
split_col=order_id
parallel=12[product]
table=product
split_col=product_id
parallel=6
?批量執行腳本?
#!/bin/bashCONFIG_FILE="table_sync.conf"
HIVE_DB="dw"while IFS='=' read -r key value
doif [[ $key == "table" ]]; thenTABLE=${value}echo "處理表: $TABLE"sqoop import \--connect "jdbc:mysql://mysql_host:3306/db_name" \--username user \--password pass \--table $TABLE \--hive-import \--hive-database $HIVE_DB \--hive-overwrite \--num-mappers $(grep "^${TABLE}=" $CONFIG_FILE | cut -d'=' -f2) \--split-by $(grep "^${TABLE}=" $CONFIG_FILE | cut -d'=' -f3) \--null-string '\\N'fi
done < $CONFIG_FILE
3.2 全量+增量混合模式
#!/bin/bash# 全量同步配置
FULL_SYNC_TABLES=("config" "lookup")# 增量同步配置
INCREMENTAL_TABLES=("user" "order")# 執行全量同步
for TABLE in "${FULL_SYNC_TABLES[@]}"
dosqoop import \--table $TABLE \--hive-import \--hive-overwrite
done# 執行增量同步
for TABLE in "${INCREMENTAL_TABLES[@]}"
dosqoop import \--table $TABLE \--incremental append \--check-column update_time \--last-value $(hive -e "SELECT MAX(update_time) FROM $TABLE")
done
四、關鍵問題解決方案
4.1 數據類型映射
# 顯式指定類型映射(解決TINYINT轉BOOLEAN問題)
sqoop import \
--map-column-hive status=STRING \
--map-column-hive is_valid=BOOLEAN
4.2 分區表同步
# 按日期分區
sqoop import \
--hive-partition-key dt \
--hive-partition-value $(date +%Y-%m-%d) \
--where "dt='${DATE}'"
4.3 性能優化參數
# 壓縮傳輸(提升30%網絡效率)
sqoop import \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec# 內存調優(避免OOM)
sqoop import \
--driver-memory 4G \
--executor-memory 8G
五、監控與容錯機制
5.1 同步狀態記錄
-- 創建同步狀態表
CREATE TABLE sync_status (table_name VARCHAR(50) PRIMARY KEY,last_sync_time TIMESTAMP,sync_type VARCHAR(20) -- FULL/APPEND
);
5.2 自動重試策略
MAX_RETRY=3
RETRY_COUNT=0while [ $RETRY_COUNT -lt $MAX_RETRY ]; dosqoop import ... || {RETRY_COUNT=$((RETRY_COUNT+1))sleep 300}
done
5.3 異常檢測腳本
#!/bin/bash# 檢查Hive表行數
HIVE_COUNT=$(hive -e "SELECT COUNT(*) FROM target_table")# 檢查MySQL行數
MYSQL_COUNT=$(mysql -uroot -ppass -D db -e "SELECT COUNT(*) FROM source_table")if [ $HIVE_COUNT -ne $MYSQL_COUNT ]; thenecho "數據不一致!差異行數:$(expr $MYSQL_COUNT - $HIVE_COUNT)"# 觸發告警send_alert "Sqoop同步異常"
fi
六、生產環境最佳實踐
-
?元數據管理?
使用sqoop import-all-tables
同步全庫時,需提前在Hive創建對應數據庫 -
?增量同步策略?
增量頻率 | 適用場景 | 檢查字段 ---------|----------|---------- 每分鐘 | 實時日志 | log_ts 每小時 | 交易流水 | update_time 每天 | 統計報表 | dt
-
?資源隔離方案?
# 為不同業務分配獨立隊列 sqoop import \ --queue hadoop_yarn_queue_olap
-
?版本兼容性?
MySQL版本 推薦Sqoop版本 注意事項 5.7 1.4.7 需添加JDBC驅動 8.0 1.4.7 需升級Connector/J