最近在GitHub上創建了一個新工程,收集個人在數據工程工作的小工具集合,命名為data_dragon (數據一條龍)。取這個名字的是希望這些腳本或代碼能夠復用,端到端地減少臨時數據處理的時間。
最近因為工作上的一些變化,寫作節奏有點被打亂,已經有2個月沒有更新文章了。這次剛好將最近創建的小工程做個介紹。首先這個工程都是些零散的代碼腳本,目前上傳了3個,有Python,有Bash shell。后期,可能還有用sed寫的HLR上的IMSI處理腳本,Java寫的Hive UDF……總之,就是一些在實際工作中為了避免重復勞動的臨時代碼。
放在線上,就是為了方便以后遇到相同的問題可以重用,該工程的代碼地址是https://github.com/camash/data_dragon。
工程中,每一個文件夾都是一個獨立的小工具,用于解決一個獨立的問題。目前,已經上傳的三個腳本分別用于“快速生成Azkaban任務調度的DAG”,“hive同步數據到ES索引”以及“傳輸大量SFTP文件以及檢查”。以下,分別對上傳的三個腳本做簡要介紹。
generate_azkaban_flow
作用
在Excel中配置任務依賴關系,然后使用shell腳本快速生成Azkaban的job文件。
使用方法
- 創建文件 在Excel或者其它表格軟件中,按如下結構創建
編號 | 任務名稱 | 任務調用腳本 | 依賴 |
---|---|---|---|
000 | start_kettle | ||
001 | test | /home/hadoop/test/kettle/all/test.sh resource | 000_start_kettle |
002 | end_job | /home/hadoop/test/kettle/all/end_job.sh | 000_start_kettle, 001_test |
- 執行轉換 復制到文本文件中,保存為tsv文件,比如test.txt。然后執行shell腳本。
bash?./gen_azkaban_flow.sh?test.txt
執行之后在文件所在路徑內會生成以編號_任務名稱.job
的Azkaban任務文件。文件數量等同于tsv文件中的行數。
ls?-1?*.job
000_start_kettle.job
001_test.job
002_end_job.job
- job文件內容
主要包含執行項和依賴項,依賴項就是最終生成任務DAG的邊。同時,這個腳本中默認會給執行命令加入執行日期參數,若不需要可以通過修改shell命令實現。
$?cat?002_end_job.job?
type=command
dependencies=000_start_kettle,?001_test
command=/bin/bash?/home/hadoop/test/kettle/all/end_job.sh?'${azkaban.flow.start.year}${azkaban.flow.start.month}${azkaban.flow.start.day}'
使用總結
可以通過先在表格中規則的梳理任務流,避免了任務太多時直接寫job文件容易遺漏的情況。梳理完成之后,使用該腳本一次性生成所有的job,秒秒鐘完成。
hive_to_elasticsearch
作用
將Hive表中的數據導入到Elasticsearch的索引中。
使用方法
腳本是通過Python3編寫的,因此使用Python3調用即可。
python3?hive_records_to_es.py
其中,Hive地址和Elasticsearch的地址放在connection.cfg
文件中,樣例如下:
[hive]
host?=?192.168.1.4
port?=?10000
user?=?hadoop
[es]
es_url_1?=?http://192.168.1.6:7200/
es_url_2?=?http://192.168.1.7:7200/
es_url_3?=?http://192.168.1.8:7200/
另外,表名和索引名在腳本中是靜態賦值,后期需要動態傳入。
scp_copy_and_check
作用
從遠程SFTP同步文件指定文件夾下的所有文件至本地的指定路徑。在傳輸前后,可以對源和目標系統上的文件數量進行校驗(也可以支持其它校驗和方式)。同時在傳輸前,對源的文件數量可以設置一定閾值,數量過少直接報異常退出程序。
使用方法
腳本是使用bash shell進行編寫的,需要傳入日期參數來確認文件夾的路徑,參數格式為YYYYMMDD
。若不傳入參數,則會取執行日期做為默認參數。調用方式如下:
bash?scp_log_file.sh?20200916