DataX 框架學習筆記

官方倉庫

https://github.com/alibaba/DataX?tab=readme-ov-file

1. 介紹

1.1. 基本介紹:

DadaX 是阿里云 DataWorks 數據集成 的開源版本(異構數據同步、離線數據同步工具 / 平臺)。主要抽象為 Reader 和 Writer 插件,管理源數據和目標數據的讀和寫。

1.2. DataX 3.0 框架(插件式、Reader、writer ):

Reader:Reader 為數據采集模塊,負責采集數據源的數據,將數據發送給 Framework。

  • Writer: Writer 為數據寫入模塊,負責不斷向 Framework 取數據,并將數據寫入到目的端
  • Framework:Framework 用于連接 reader 和 writer,作為兩者的數據傳輸通道,并處理緩沖,流控,并發,數據轉換等核心技術問題。目前支持單機多線程的模式完成同步作業任務

1.2.1. 官方例子:

  • 單個 Job 可以分為多個 task,一個 TaskGroup 可以按設置的并發度執行任務。
  • 核心模塊介紹:
  1. DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
  2. DataX作業運行起來之后, Job監控并等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0
  3. 每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
  4. 切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的并發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的并發運行完畢分配好的所有Task,默認單個任務組的并發數量為5。
  5. DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便于并發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
  • DataX調度流程:

舉例來說,用戶提交了一個DataX作業,并且配置了20個并發,目的是將一個100張分表的mysql數據同步到odps里面。 DataX的調度決策思路是:

  1. DataXJob根據分庫分表切分成了100個Task。
  2. 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個并發共計運行25個Task。
  3. 根據20個并發,DataX計算共需要分配4個TaskGroup。
  • 數據來源多、封裝成網頁平臺(數據平臺)優先選擇 DataX (單進程多線程、日志完善)

1.3. 安裝使用

  1. 由于 datax 依賴于 java 1.8 及 以上,和 python 2 (3) 均可以,需要提前安裝 JDK 和 Conda 環境。
  2. datax 安裝:直接下載 DataX工具包:DataX下載地址 下載后解壓至本地某個目錄,進入bin目錄,即可運行同步作業:
$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
# 自檢腳本:    
$ python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json

2. 案例 Demo

2.1. Stream To Stream Demo

  1. 可以在datax目錄下使用 bin/datax.py -r YOURreader -w YOURwriter查看當前組合的 json 配置模板。

  1. 書寫配置文件,將配置文件放入 datax/job路徑下,執行python bin/datax.py job/demo.json命令。
{"job": {"content": [{"reader": {"name": "streamreader","parameter": {"column": [{"type": "string","value": "xy"},{"type": "string","value": "25"}],"sliceRecordCount": "10"}},"writer": {"name": "streamwriter","parameter": {"encoding": "UTF-8","print": true}}}],"setting": {"speed": {"channel": "1"}}}
}

2.2. 從 Mysql 讀取數據存放到 HDFS

  • HDFS 是為大數據而生的分布式文件系統,具備高容錯、高吞吐、強擴展性,適合存儲海量的結構化和非結構化數據

  • mysqlreader 參數解析:

  • hdfswrite 參數解析:

  • 其參數配置如下:
{"job": {"content": [{"reader": {"name": "mysqlreader",  "parameter": {"column": [],"connection": [{"jdbcUrl": [],"table": [],"querySql":[]}],"password": "","username": "","where": ""}},"writer": {"name": "hdfswriter","parameter": {"column": [],"compress": "","defaultFS": "","fieldDelimiter": "","fileName": "","fileType": "","path": "","writeMode": ""}}}],"setting": {"speed": {"channel": ""}}}
}
  • 如何鏈路中有部分線程的內容失敗,datax 會回滾部分成功的數據。

2.2.1. Mysql (準備) 建庫建表

$ mysql -uroot -p
create database datax;
use datax;
create table student(id int, name varchar(20));
// 插入數據
insert into student values(1001,'zhangsan'),(1002,'lisi'),(1003,'wangwu');

2.2.2. hdfs 準備

  1. 安裝 hdfs 需要前置準備 Java 8+。
  2. 下載并解壓 Hadoop:
wget https://downloads.apache.org/hadoop/common/hadoop-3.4.1/hadoop-3.4.1.tar.gz
tar -zxvf hadoop-3.4.1.tar.gz -C ~ # 解壓到根路徑下
cd hadoop-3.4.1
  1. 配置環境變量:
vim ~/.bashrc
# 加入以下內容:
export HADOOP_HOME=你的路徑/hadoop-3.4.1
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATHsource ~/.bashrc
  1. 配置 HDFS , 進入 $HADOOP_HOME/etc/hadoop/,修改如下文件 :

core-site.xml:

<configuration><property><name>fs.defaultFS</name><value>hdfs://localhost:9000</value></property>
</configuration>

hdfs-site.xml:

<configuration><property><name>dfs.replication</name><value>1</value>  <!-- 單節點設為 1 --></property><property><name>dfs.namenode.name.dir</name><value>file:///home/YOURUSER/hadoopdata/hdfs/namenode</value></property><property><name>dfs.datanode.data.dir</name><value>file:///home/YOURUSER/hadoopdata/hdfs/datanode</value></property>
</configuration>

替換 youruser 為當前用戶名,并確保創建了對應目錄:

mkdir -p ~/hadoopdata/hdfs/namenode
mkdir -p ~/hadoopdata/hdfs/datanode

5. 格式化并啟動 HDFS,并驗證啟動。

# 格式化 HDFS
hdfs namenode -format# 啟動 NameNode 和 DataNode
start-dfs.sh# 查看 Java 進程(有 namenode 和 datanode 即成功)
jps# 輸出類似:
12345 NameNode
12346 DataNode# 測試命令是否可用:
hdfs dfs -mkdir /demo
hdfs dfs -ls /

6. 測試 JSON,以及測試結果

{"job": {"content": [{"reader": {"name": "mysqlreader",  "parameter": {"column": ["id","name"],"connection": [{"jdbcUrl": ["jdbc:mysql://localhost:3306/datax?useUnicode=true&characterEncoding=UTF-8"],"table": ["student"]}],"password": "123456","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "int"},{"name": "name","type": "string"}],"defaultFS": "hdfs://localhost:9000","fieldDelimiter": "|","fileName": "student.txt","fileType": "text","path": "/","writeMode": "append"}}}],"setting": {"speed": {"channel": "1"}}}
}

內部有保證數據一致性的判斷,當多線程環境執行完后需要對比是否全部任務成功,否則觸發回滾機制。

  • 可以看到管理界面下有成功同步的兩個 student 文件。

3. DataX 原理分析

3.1. datax.py

  • 配置參數:
  • -j 可以指定 jvm 參數,可以用于配置堆內存。
  • 在該文件中啟動了 java 執行 com.alibaba.datax.core.Engine 啟動類。

3.2. JobContainor

有完整的生命周期管理(對應上方結構圖的描述):

  1. init()
    1. reader 初始化。(根據不同的配置文件)
    2. writer 初始化。
  1. prepare()
    1. 調用插件,做一些基本準備工作,清理等。
  1. split()
    1. task 切分邏輯
  1. schedule()
  2. post()

3.2.1. Task 切分

  • 調整 channel 數量(并發數的確定)

在配置中主要分三個模塊 reader writer setting,setting 中可以配置 channel、byte、record(數據條數),源碼中選擇如果設定了 數據量或帶寬的速度,算出來的 channel 以小的為主。直接指定channel 的優先級最低。

3.2.2. Schedule 調度

根據拆分中計算的任務數和并發數取最小值,避免資源浪費。

  • assignFairly 將 task 公平的分配到 taskgroup

3.3. 優化(并發、限流)

  • job.setting.speed.channel : channel 并發數
  • job.setting.speed.record : 全局配置 channel 的 record 限速
  • job.setting.speed.byte:全局配置 channel 的 byte 限速
  • core.transport.channel.speed.record:單個 channel 的 record 限速
  • core.transport.channel.speed.byte:單個 channel 的 byte 限速

3.3.1. 優化1:提升每個 channel 的速度

在 DataX 內部對每個 Channel 會有嚴格的速度控制

  1. 控制每秒同步的記錄數 record。
  2. 每秒同步的字節數 byte,默認的速度限制是 1MB/s,可以根據具體硬件情況設

置這個 byte 速度或者 record 速度,一般設置 byte 速度,比如:我們可以把單個 Channel 的

速度上限配置為 5MB。

3.3.2. 優化 2:提升 DataX Job 內 Channel 并發數

并發數 = taskGroup 的數量 * 每個 TaskGroup 并發執行的 Task 數 (默認為 5)。

提升 job 內 Channel 并發有三種配置方式:

  1. Channel 個數 = 全局 Byte 限速 / 單 Channel Byte 限速。
  2. Channel 個數 = 全局 Record 限速 / 單 Channel Record 限速。
  3. 直接配置 Channel 個數。

3.3.3. 優化 3:提高 JVM 堆內存

當提升 DataX Job 內 Channel 并發數時,內存的占用會顯著增加,因為 DataX 作為數據

交換通道,在內存中會緩存較多的數據。例如 Channel 中會有一個 Buffer,作為臨時的數據

交換的緩沖區,而在部分 Reader 和 Writer 的中,也會存在一些 Buffer,為了防止 OOM 等錯

誤,調大 JVM 的堆內存。

建議將內存設置為 4G 或者 8G,這個也可以根據實際情況來調整。

調整 JVM xms xmx 參數的兩種方式:一種是直接更改 datax.py 腳本;另一種是在啟動

的時候,加上對應的參數,如下:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json

-Xms8G JVM 堆內存 初始大小

-Xmx8G JVM 堆內存 最大大小

設置一致防止內存抖動 。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/909462.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/909462.shtml
英文地址,請注明出處:http://en.pswp.cn/news/909462.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MaxCompute的Logview分析詳解

文章目錄 一、Logview簡介1、概述2、標題與功能3、基礎信息 二、作業詳情1、Job Details2、Fuxi Sensor3、Result①當前作業運行成功&#xff0c;顯示的為運行結果。②當前作業運行失敗&#xff0c;顯示的為失敗原因。 4、SourceXML5、SQL Script6、History7、SubStatusHistory…

HTML5白云飄飄動態效果教程

HTML5白云飄飄動態效果教程 這里寫目錄標題 HTML5白云飄飄動態效果教程效果介紹實現步驟步驟一&#xff1a;創建HTML結構步驟二&#xff1a;設計CSS樣式步驟三&#xff1a;添加JavaScript交互 代碼解析HTML結構解析CSS樣式解析JavaScript功能解析 自定義調整總結 效果介紹 本教…

tcp高難度問題

以下是針對這些問題&#xff0c;在面試場景下&#xff0c;既保證理論扎實、邏輯清晰&#xff0c;又具備交流延展性的回答思路與內容&#xff0c;可根據實際面試節奏和面試官反饋靈活調整展開&#xff1a; 1. 客戶端端口號如何確定的&#xff1f; 面試官您好&#xff0c;客戶端…

廣東省省考備考(第二十八天6.13)—資料分析(第二節課)

基期與現期 官方定義&#xff1a;作為對比參照的是基期&#xff0c;而相對于基期比較的是現期 通俗說法&#xff1a;時間靠前的為基期&#xff0c;時間靠后的為現期 增長量與增長率 增長量用來表述基期量與現期量變化的絕對量&#xff1b; 增長率用來表述基期量與現期量變化…

pytorch 中前向傳播和后向傳播的自定義函數

系列文章目錄 文章目錄 系列文章目錄一、torch.autograd.function代碼實例 在開始正文之前&#xff0c;請各位姥爺動動手指&#xff0c;給小店增加一點訪問量吧&#xff0c;點擊小店&#xff0c;同時希望我的文章對你的學習有所幫助。本文也很簡單&#xff0c;主要講解pytorch的…

【項目實訓#08】HarmonyOS知識圖譜前端可視化實現

【項目實訓#08】HarmonyOS知識圖譜前端可視化實現 文章目錄 【項目實訓#08】HarmonyOS知識圖譜前端可視化實現一、背景簡介二、技術方案與架構設計2.1 技術選型2.2 組件架構設計 三、知識圖譜可視化組件實現3.1 KGResultTab組件設計組件模板結構不同狀態的處理用戶交互控制節點…

【軟件開發】什么是DSL

什么是DSL DSL&#xff08;Domain-Specific Language&#xff0c;領域特定語言&#xff09;是一種為特定領域或任務設計的編程語言&#xff0c;目的在于提高該領域中的表達能力與開發效率。 1 在腳本語言中的 DSL 是什么&#xff1f; 在腳本語言&#xff08;如 Python、Lua、…

JasperReport生成PDF/A類型文檔

當JasperReport導出的文檔為PDF/A模式時&#xff0c;該PDF為只讀可以防止被修改。 設置導出參數 JRPdfExporter exporter new JRPdfExporter();exporter.setExporterInput(SimpleExporterInput.getInstance(jasperPrints));exporter.setExporterOutput(new SimpleOutputStre…

微信小程序使用畫布實現飄落泡泡功能

微信小程序使用畫布實現飄落泡泡功能&#xff1a;從組件封裝到頁面調用的完整實踐 先看示例截圖&#xff1a; 一、背景與技術選型 在微信小程序中實現類似于飄落的泡泡或者櫻花飄落的功能&#xff0c;一般主要有 Canvas 和圖片兩種方案&#xff1a; &#xff08;1&#xff…

使用STM32設置GPIO中斷

使用S? 32設置GPIO中斷 中斷示例按鍵中斷實例設計&#xff1a;EXTI0和EXTI9硬件連接分析STM32代碼實現代碼說明 中斷示例 設計一個按鍵中斷的實例。設置兩個中斷&#xff1a;EXTI0、EXTI9&#xff0c; 在EXTI9的中斷服務之程序中實現LED燈的控制 按鍵中斷實例設計&#xff…

解決在微信小程序中view組件下的text和images設置了樣式display: flex; align-items: center;對不齊

原始代碼的問題 <view style"display: flex; align-items: center;"><text style"line-height: 1;">全國</text><image src"/images/xia.png" style"height: 20rpx; width: 20rpx; display: block;"></im…

歸并排序詳解:優雅的分治藝術

什么&#xff1f;歸并排序&#xff1f;這讓博主想起了大學那會被《數據結構與算法》支配的恐懼… 哈哈言歸正傳&#xff0c;一直想對算法做一個專欄&#xff0c;因為其實工作中很少很少有機會用到算法&#xff0c;倒是很多工具方法底層會使用&#xff0c;工作被各種需求業務“折…

新零售視域下實體與虛擬店融合的技術邏輯與商業模式創新——基于開源AI智能名片與鏈動2+1模式的S2B2C生態構建

摘要&#xff1a;新零售的核心在于打破線上線下邊界&#xff0c;構建“人、貨、場”的全場景融合生態。本文提出&#xff0c;實體線下店與虛擬店的協同發展是新零售的重要演進方向&#xff0c;其底層邏輯在于滿足消費者作為“現實人”的體驗需求與“虛擬人”的效率需求。通過引…

可視化圖解算法51:尋找第K大(數組中的第K個最大的元素)

牛客網 面試筆試 TOP101 | LeetCode 215. 數組中的第K個最大元素 1. 題目 描述 有一個整數數組&#xff0c;請你找出數組中第 k 大的數。 給定一個整數數組 a ,同時給定它的大小n和要找的 k &#xff0c;請返回第 k 大的數(包括重復的元素&#xff0c;不用去重)&…

DataWhale-零基礎網絡爬蟲技術(一)

課程鏈接先給各位 ↓↓↓ &#xff08;點擊即可食用.QAQ Datawhale-學用 AI,從此開始 一、引言 還是在筆記的開始&#xff0c;嘮嘮一些自己的故事 十年前第一次接觸網絡&#xff0c;也可以說是第一次接觸計算機的時候&#xff0c;那時候還是在中學階段&#xff0c;那時候大…

Linux02

目錄 linux常用命令 用戶和權限 壓縮和解壓縮 其他相關命令 Linux中安裝常用軟件 1.1. jdk的安裝 1.1.1. 卸載linux中自帶的open-jdk 1.1.2. 把安裝包上傳到 linux上 1.1.3. 解壓安裝包 1.1.4. 配置環境變量 1.1.5 驗證環境變量 1.3 安裝mysql 1.3.1. 檢查依賴 1.…

JavaSE超詳細筆記-網絡編程篇-基于黑馬

1. 什么是網絡編程【理解】 1.1 概念 在網絡通信協議下&#xff0c;不同計算機上運行的程序&#xff0c;進行的數據傳輸。 應用場景: 即時通信、網游對戰、金融證券、國際貿易、郵件、等等。 不管是什么場景&#xff0c;都是計算機跟計算機之間通過網絡進行數據傳輸Java中可以使…

時序數據庫Influxdb3 core安裝

本文介紹時序數據庫Influxdb3 core(開源版本)的安裝和簡單使用以及調優參數的介紹。 預期&#xff1a; 安裝時序數據庫Influxdb3 core 創建數據庫mydb 寫入數據&#xff1b; 使用influxdb3-cli 和 grafana2種方式查詢寫入的數據 前期準備&#xff1a; linux服務器(本文服…

區間合并:區間合并問題

區間合并&#xff1a;區間合并問題 區間合并 www.acwing.com/problem/content/805/ 按區間的左端點排序 掃描整個區間&#xff0c;在這過程中把可能有交點的區間合并 全包含&#xff1a;不做改動相交&#xff1a;right 后移相離&#xff1a;更新至下一個維護區間 import j…

中國古代數學符號的演進 | 算籌 / 符號 / 算法

注&#xff1a;本文為“中國古代數學符號”相關合輯。 圖片清晰度受引文原圖所限。 略作重排&#xff0c;未整理去重。 如有內容異常&#xff0c;請看原文。 這個中國古代的數學瑰寶&#xff0c;到底厲害在哪&#xff1f; 原創 朱一文 科普中國 2024 年 07 月 31 日 15:30 北…