SpringBoot整合DataX數據同步(自動生成job文件)

SpringBoot整合Datax數據同步

文章目錄

  • SpringBoot整合Datax數據同步
    • 1.簡介
        • 設計理念
      • DataX3.0框架設計
      • DataX3.0核心架構
        • 核心模塊介紹
        • DataX調度流程
    • 2.DataX3.0插件體系
    • 3.數據同步
      • 1.編寫job的json文件
      • 2.進入bin目錄下,執行文件
    • 4.SpringBoot整合DataX生成Job文件并執行
        • 1.準備工作
        • 2.文件目錄如圖
        • 3.Mysql數據同步
        • 4.Elasticsearch寫入Mysql數據
    • 5.Job文件參數說明
      • 1.MysqlReader
      • 2.MysqlWriter
      • 3.ElasticsearchWriter

1.簡介

DataX 是阿里云 DataWorks數據集成 的開源版本,在阿里巴巴集團內被廣泛使用的離線數據同步工具/平臺。DataX 實現了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各種異構數據源之間高效的數據同步功能。

DataX 是一個異構數據源離線同步工具,致力于實現各種異構數據源之間穩定高效的數據同步功能。

Download DataX下載地址

Github主頁地址:https://github.com/alibaba/DataX

請點擊:Quick Start

在這里插入圖片描述

  • 設計理念

    為了解決異構數據源同步問題,DataX將復雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。

DataX3.0框架設計

在這里插入圖片描述

DataX本身作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。

  • Reader:Reader為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。
  • Writer: Writer為數據寫入模塊,負責不斷向Framework取數據,并將數據寫入到目的端。
  • Framework:Framework用于連接reader和writer,作為兩者的數據傳輸通道,并處理緩沖,流控,并發,數據轉換等核心技術問題。

DataX3.0核心架構

DataX 3.0 開源版本支持單機多線程模式完成同步作業運行,本小節按一個DataX作業生命周期的時序圖,從整體架構設計非常簡要說明DataX各個模塊相互關系。

在這里插入圖片描述

核心模塊介紹
  1. DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
  2. DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便于并發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
  3. 切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的并發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的并發運行完畢分配好的所有Task,默認單個任務組的并發數量為5。
  4. 每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
  5. DataX作業運行起來之后, Job監控并等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0
DataX調度流程

舉例來說,用戶提交了一個DataX作業,并且配置了20個并發,目的是將一個100張分表的mysql數據同步到odps里面。 DataX的調度決策思路是:

  1. DataXJob根據分庫分表切分成了100個Task。
  2. 根據20個并發,DataX計算共需要分配4個TaskGroup。
  3. 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個并發共計運行25個Task。

2.DataX3.0插件體系

DataX目前已經有了比較全面的插件體系,主流的RDBMS數據庫、NOSQL、大數據計算系統都已經接入,目前支持數據如下圖,詳情請點擊:DataX數據源參考指南

類型數據源Reader(讀)Writer(寫)文檔
RDBMS 關系型數據庫MySQL讀 、寫
Oracle讀 、寫
OceanBase讀 、寫
SQLServer讀 、寫
PostgreSQL讀 、寫
DRDS讀 、寫
Kingbase讀 、寫
通用RDBMS(支持所有關系型數據庫)讀 、寫
阿里云數倉數據存儲ODPS讀 、寫
ADB
ADS
OSS讀 、寫
OCS
Hologres
AnalyticDB For PostgreSQL
阿里云中間件datahub讀 、寫
SLS讀 、寫
圖數據庫阿里云 GDB讀 、寫
Neo4j
NoSQL數據存儲OTS讀 、寫
Hbase0.94讀 、寫
Hbase1.1讀 、寫
Phoenix4.x讀 、寫
Phoenix5.x讀 、寫
MongoDB讀 、寫
Cassandra讀 、寫
數倉數據存儲StarRocks讀 、寫
ApacheDoris
ClickHouse讀 、寫
Databend
Hive讀 、寫
kudu
selectdb
無結構化數據存儲TxtFile讀 、寫
FTP讀 、寫
HDFS讀 、寫
Elasticsearch
時間序列數據庫OpenTSDB
TSDB讀 、寫
TDengine讀 、寫

3.數據同步

1.編寫job的json文件

mysql數據抽取到本地內存

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","name","amount",],"connection": [{"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/datax?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"],"table": ["user"]}],"password": "root","username": "root"}},"writer": {"name": "streamwriter","parameter": {"print": false,"encoding": "UTF-8"}}}],"setting": {"speed": {"channel": "1"}}}
}

mysqlWriter數據寫入

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "streamreader","parameter": {"column" : [{"value": "DataX","type": "string"},{"value": 19880808,"type": "long"},{"value": "1988-08-08 08:08:08","type": "date"},{"value": true,"type": "bool"},{"value": "test","type": "bytes"}],"sliceRecordCount": 1000}},"writer": {"name": "mysqlwriter","parameter": {"writeMode": "insert","username": "root","password": "root","column": ["id","name"],"session": ["set session sql_mode='ANSI'"],"preSql": ["delete from test"],"connection": [{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk","table": ["test"]}]}}}]}
}

mysql數據同步

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","project_code","category"],"connection": [{"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"],"table": ["project_index"]}],"password": "root","username": "root"}},"writer": {"name": "mysqlwriter","parameter": {"column": ["id","project_code","category"],"connection": [{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai","table": ["project_index"]}],"password": "root","username": "root","writeMode": "update"}}}],"setting": {"speed": {"channel": "1"}}}
}

2.進入bin目錄下,執行文件

需python環境

python datax.py {YOUR_JOB.json}

4.SpringBoot整合DataX生成Job文件并執行

1.準備工作

下載datax,安裝lib下的datax-commondatax-core的jar到本地maven倉庫

依賴

  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.54</version></dependency><dependency><groupId>com.alibaba.datax</groupId><artifactId>datax-core</artifactId><version>0.0.1-SNAPSHOT</version></dependency><dependency><groupId>com.alibaba.datax</groupId><artifactId>datax-common</artifactId><version>0.0.1-SNAPSHOT</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.25</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.21</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency>
spring:datasource:url: jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Drivertype: com.alibaba.druid.pool.DruidDataSourceserver:port: 8080# datax 相關配置,在生成文件時使用
datax:home: D:/software/datax/# job文件存儲位置save-path: D:/software/datax/job/

屬性配置

/*** datax配置* @author moshangshang*/
@Data
@Component
@ConfigurationProperties("datax")
public class DataXProperties {private String home;private String savePath;}

公共實體

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Content {private Reader reader;private Writer writer;}
@Data
public class DataXJobRoot {private Job job;
}
@Data
public class Job {private List<Content> content;private Setting setting = new Setting();}
@Data
public class Setting {private Speed speed = new Speed();@Datapublic static class Speed {private String channel = "1";}
}
public abstract class Parameter {
}
/***  讀取抽象類*  @author moshangshang*/
@Data
public abstract class Reader {private String name;private Parameter parameter;}
/*** 寫入抽象類*  @author moshangshang*/
@Data
public abstract class Writer {private String name;private Parameter parameter;
}

公共處理接口

public interface DataXInterface {/*** 獲取讀對象*/Reader getReader(String table);/*** 獲取寫對象*/Writer getWriter(String table);/*** 同類型讀取寫入,如mysql到mysql*/String getJobTaskName(String readerTable,String writeTable);/*** 自定義讀取寫入* @param reader 讀處理* @param write 寫處理* @param suffix 文件名*/String getJobTaskName(Reader reader,Writer write, String suffix);}

/*** 接口抽象類* @author moshangshang*/
@Component
public abstract class AbstractDataXHandler implements DataXInterface {@Autowiredprivate DataXProperties dataXProperties;/*** 自定義讀取寫入* @param reader 讀處理* @param write 寫處理* @param suffix 文件名*/@Overridepublic String getJobTaskName(Reader reader, Writer write, String suffix) {DataXJobRoot root = new DataXJobRoot();Job job = new Job();root.setJob(job);Content content = new Content(reader,write);job.setContent(Collections.singletonList(content));String jsonStr = JSONUtil.parse(root).toJSONString(2);String fileName = "datax_job_"+ UUID.randomUUID().toString().replaceAll("-","") +"_"+suffix+".json";File file = FileUtil.file(dataXProperties.getSavePath(),fileName);FileUtil.appendString(jsonStr, file, "utf-8");return fileName;}}

工具方法

@Repository
public class DatabaseInfoRepository {private final JdbcTemplate jdbcTemplate;@Autowiredpublic DatabaseInfoRepository(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}/*** 獲取所有表名*/public List<String> getAllTableNames() {String sql = "SHOW TABLES";return jdbcTemplate.queryForList(sql, String.class);}/*** 根據表名獲取字段信息*/public List<Map<String, Object>> getTableColumns(String tableName) {String sql = "SHOW FULL COLUMNS FROM " + tableName;return jdbcTemplate.queryForList(sql);}
}
@Slf4j
@Service
public class DatabaseInfoService {private final DatabaseInfoRepository databaseInfoRepository;@Autowiredpublic DatabaseInfoService(DatabaseInfoRepository databaseInfoRepository) {this.databaseInfoRepository = databaseInfoRepository;}public void printAllTablesAndColumns() {// 獲取所有表名List<String> tableNames = databaseInfoRepository.getAllTableNames();// 遍歷表名,獲取并打印每個表的字段信息for (String tableName : tableNames) {System.out.println("Table: " + tableName);// 獲取當前表的字段信息List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);// 遍歷字段信息并打印for (Map<String, Object> column : columns) {System.out.println("  Column: " + column.get("Field") + " (Type: " + column.get("Type") + ")" + " (Comment: " + column.get("Comment") + ")");}// 打印空行作為分隔System.out.println();}}/** 查詢指定表的所有字段列表 */public List<String> getColumns(String tableName) {List<String> list = new ArrayList<>();// 獲取當前表的字段信息List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);// 遍歷字段信息并打印for (Map<String, Object> column : columns) {list.add(column.get("Field").toString());}return list;}/** 查詢指定表的所有字段列表,封裝成HdfsWriter格式 */public List<HdfsWriter.Column> getHdfsColumns(String tableName) {List<HdfsWriter.Column> list = new ArrayList<>();// 獲取當前表的字段信息List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);// 遍歷字段信息并打印for (Map<String, Object> column : columns) {String name = column.get("Field").toString();String typeDb = column.get("Type").toString();String type = "string";if (typeDb.equals("bigint")) {type = "bigint";} else if (typeDb.startsWith("varchar")) {type = "string";} else if (typeDb.startsWith("date") || typeDb.endsWith("timestamp")) {type = "date";}HdfsWriter.Column columnHdfs = new HdfsWriter.Column();columnHdfs.setName(name);columnHdfs.setType(type);list.add(columnHdfs);}return list;}
}

datax的job任務json執行方法


/*** 執行器* @author moshangshang*/
@Slf4j
@Component
public class DataXExecuter {@Autowiredprivate DataXProperties dataXProperties;public void run(String fileName) throws IOException {System.setProperty("datax.home", dataXProperties.getHome());String filePath = dataXProperties.getSavePath()+fileName;String dataxJson = JSONUtil.parse(FileUtils.readFileToString(new File(filePath),"UTF-8")).toJSONString(2);log.info("datax log:{}",dataxJson);String[] dataxArgs = {"-job", filePath, "-mode", "standalone", "-jobid", "-1"};try {Engine.entry(dataxArgs);}catch (DataXException e){log.error("執行失敗",e);} catch (Throwable throwable) {log.error("DataX執行異常,error cause::\n" + ExceptionTracker.trace(throwable));}}}
2.文件目錄如圖

在這里插入圖片描述

3.Mysql數據同步

1.編寫mysql讀寫對象,繼承讀寫接口

@Data
public class MysqlReader extends Reader {public String getName() {return "mysqlreader";}@Datapublic static class MysqlParameter extends Parameter {private List<String> column;private List<Connection> connection;private String password;private String username;private String splitPk;private String where;}@Datapublic static class Connection {private List<String> jdbcUrl;private List<String> table;private List<String> querySql;}
}
@EqualsAndHashCode(callSuper = true)
@Data
public class MysqlWriter extends Writer {public String getName() {return "mysqlwriter";}@EqualsAndHashCode(callSuper = true)@Datapublic static class MysqlParameter extends Parameter {private List<String> column;private List<Connection> connection;private String password;private String username;private String writeMode = "update";}@Datapublic static class Connection {private String jdbcUrl;private List<String> table;}
}

2.配置mysql讀和寫的數據庫信息

/*** mysql讀寫配置* @author moshangshang*/
@Data
@ConfigurationProperties("datax.mysql.reader")
public class DataXMysqlReaderProperties {private String url;private String password;private String username;}
/*** mysql讀寫配置* @author moshangshang*/
@Data
@ConfigurationProperties("datax.mysql.writer")
public class DataXMysqlWriterProperties {private String url;private String password;private String username;}
# datax 相關配置,在生成文件時使用
datax:mysql:reader:url: jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghaiusername: rootpassword: rootwriter:url: jdbc:mysql://127.0.0.1:3306/ruoyi_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghaiusername: rootpassword: root

2.編寫mysql處理類,繼承抽象處理接口。生成job文件

/*** mysql讀寫處理* @author moshangshang*/
@Component
@EnableConfigurationProperties({DataXMysqlReaderProperties.class, DataXMysqlWriterProperties.class})
public class MysqlHandler extends AbstractDataXHandler{@Autowiredprivate DatabaseInfoService databaseInfoService;@Autowiredprivate DataXProperties dataXProperties;@Autowiredprivate DataXMysqlReaderProperties dataXMysqlReaderProperties;@Autowiredprivate DataXMysqlWriterProperties dataXMysqlWriterProperties;@Overridepublic Reader getReader(String table) {MysqlReader reader = new MysqlReader();MysqlReader.MysqlParameter readerParameter = new MysqlReader.MysqlParameter();readerParameter.setPassword(dataXMysqlReaderProperties.getPassword());readerParameter.setUsername(dataXMysqlReaderProperties.getUsername());List<String> readerColumns = databaseInfoService.getColumns(table);readerParameter.setColumn(readerColumns);MysqlReader.Connection readerConnection = new MysqlReader.Connection();readerConnection.setJdbcUrl(Collections.singletonList(dataXMysqlReaderProperties.getUrl()));readerConnection.setTable(Collections.singletonList(table));readerParameter.setConnection(Collections.singletonList(readerConnection));reader.setParameter(readerParameter);return reader;}@Overridepublic Writer getWriter(String table) {MysqlWriter writer = new MysqlWriter();MysqlWriter.MysqlParameter writerParameter = new MysqlWriter.MysqlParameter();writerParameter.setPassword(dataXMysqlWriterProperties.getPassword());writerParameter.setUsername(dataXMysqlWriterProperties.getUsername());List<String> columns = databaseInfoService.getColumns(table);writerParameter.setColumn(columns);MysqlWriter.Connection connection = new MysqlWriter.Connection();connection.setJdbcUrl(dataXMysqlWriterProperties.getUrl());connection.setTable(Collections.singletonList(table));writerParameter.setConnection(Collections.singletonList(connection));writer.setParameter(writerParameter);return writer;}@Overridepublic String getJobTaskName(String readerTable,String writeTable) {DataXJobRoot root = new DataXJobRoot();Job job = new Job();root.setJob(job);Content content = new Content(getReader(readerTable),getWriter(writeTable));job.setContent(Collections.singletonList(content));String jsonStr = JSONUtil.parse(root).toJSONString(2);String fileName = "datax_job_"+ UUID.randomUUID().toString().replaceAll("-","") +"_h2h.json";File file = FileUtil.file(dataXProperties.getSavePath(),fileName);FileUtil.appendString(jsonStr, file, "utf-8");return fileName;}
}

3.調用執行器,執行任務job

@SpringBootTest
public class DataxTest {@Autowiredprivate MysqlHandler mysqlHandler;@Autowiredprivate DataXExecuter dataXExecuter;/*** 讀t_user表同步到user*/@Testpublic void test() throws IOException {String jobTask = mysqlHandler.getJobTaskName("t_user", "user");dataXExecuter.run(jobTask);}/*** 直接執行json文件*/@Testpublic void test2() throws IOException {dataXExecuter.run("datax_job_83798b5f1766406289a44fe681dc8878_m2m.json");}
}

4.執行結果

在這里插入圖片描述

4.Elasticsearch寫入Mysql數據

注意事項:

  • es目前只支持寫入不支持讀取
  • mysql數據寫入es時,需保證es與mysql的列數column相同,不支持類似mysql的部分字段寫入
  • 需保證列的順序相同,寫入時不會根據name名稱字段去自動對應,如果順序不一致,則可能會轉換錯誤。如id,name,寫入name,id

原理:使用elasticsearch的rest api接口, 批量把從reader讀入的數據寫入elasticsearch

創建es索引映射

PUT datax_data
{"mappings": {"properties": {"name":{"type": "keyword"},"amount":{"type": "long"}}}
}

1.添加es配置和文件

spring:elasticsearch:#username:#password:#path-prefix:uris: http://127.0.0.1:9200#連接elasticsearch超時時間connection-timeout: 60000socket-timeout: 30000
# datax 相關配置,在生成文件時使用
datax:elasticsearch:writer:url: http://127.0.0.1:9200username:password:

/*** es寫配置* @author moshangshang*/
@Data
@ConfigurationProperties("datax.elasticsearch.writer")
public class DataXElasticSearchProperties {private String url;private String username;private String password;}

2.編寫生成job文件實體類

@EqualsAndHashCode(callSuper = true)
@Data
public class ElasticSearchWriter extends Writer {public String getName() {return "elasticsearchwriter";}@EqualsAndHashCode(callSuper = true)@Datapublic static class ElasticSearchParameter extends Parameter {private List<Column> column;private String endpoint;private String accessId;private String accessKey;private String index;private Settings settings;private String type = "default";private boolean cleanup = true;private boolean discovery = false;private Integer batchSize = 1000;private String splitter = ",";}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Column {private String name;private String type;private String analyzer;}@Datapublic static class Settings {private Map<String,Object> index;}}

3.es接口擴展

/*** es接口擴展* @author moshangshang*/
public interface DataXElasticsearchInterface extends DataXInterface {Writer getWriter(String table, Map<String,Object> indexSettings);}

4.es核心處理類

@Component
@EnableConfigurationProperties({DataXElasticSearchProperties.class})
public class ElasticSearchHandler extends AbstractDataXHandler implements DataXElasticsearchInterface{@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;@Autowiredprivate DataXElasticSearchProperties dataXElasticSearchProperties;@Overridepublic Reader getReader(String table) {return null;}/*** 普通寫入* @param index 索引* @return Writer*/@Overridepublic Writer getWriter(String index) {ElasticSearchWriter writer = new ElasticSearchWriter();ElasticSearchWriter.ElasticSearchParameter writerParameter = getElasticSearchWriter(index);writer.setParameter(writerParameter);return writer;}@Overridepublic String getJobTaskName(String readerTable, String writeTable) {return null;}/*** es寫入,帶setting設置*/@Overridepublic Writer getWriter(String index,Map<String,Object> map) {ElasticSearchWriter writer = new ElasticSearchWriter();ElasticSearchWriter.ElasticSearchParameter writerParameter = getElasticSearchWriter(index);ElasticSearchWriter.Settings settings = new ElasticSearchWriter.Settings();settings.setIndex(map);writerParameter.setSettings(settings);writer.setParameter(writerParameter);return writer;}/*** 獲取公共寫入參數*/public ElasticSearchWriter.ElasticSearchParameter getElasticSearchWriter(String index){ElasticSearchWriter.ElasticSearchParameter writerParameter = new ElasticSearchWriter.ElasticSearchParameter();List<Column> columns = getEsColumns(index);writerParameter.setColumn(columns);writerParameter.setEndpoint(dataXElasticSearchProperties.getUrl());writerParameter.setAccessId(dataXElasticSearchProperties.getUsername());writerParameter.setAccessKey(dataXElasticSearchProperties.getPassword());writerParameter.setIndex(index);return writerParameter;}/*** 獲取指定索引的映射字段* 讀取時和創建順序相反*/public List<ElasticSearchWriter.Column> getEsColumns(String index){List<ElasticSearchWriter.Column> columns = new ArrayList<>();// 獲取操作的索引文檔對象IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(index));Map<String, Object> mapping = indexOperations.getMapping();mapping.forEach((k,value) ->{JSONObject json = JSON.parseObject(JSONObject.toJSONString(value));for (Map.Entry<String, Object> entry : json.entrySet()) {String key = entry.getKey();JSONObject properties = JSON.parseObject(JSONObject.toJSONString(entry.getValue()));String type = properties.getString("type");String analyzer = properties.getString("analyzer");columns.add(new ElasticSearchWriter.Column(key,type,analyzer));}});return columns;}}

5.測試

    @Testpublic void test3() throws IOException {Map<String,Object> settings = new HashMap<>();settings.put("number_of_shards",1);settings.put("number_of_replicas",1);String jobTask = elasticSearchHandler.getJobTaskName(mysqlHandler.getReader("t_user"), elasticSearchHandler.getWriter("datax_data",settings),"m2e");dataXExecuter.run(jobTask);}

5.Job文件參數說明

1.MysqlReader

參數名描述必選
jdbcUrl對端數據庫的JDBC連接信息并支持一個庫填寫多個連接地址。之所以使用JSON數組描述連接信息,是因為阿里集團內部支持多個IP探測,如果配置了多個,MysqlReader可以依次探測ip的可連接性,直到選擇一個合法的IP,如果全部連接失敗,MysqlReader報錯。 注意,jdbcUrl必須包含在connection配置單元中。對于阿里外部使用情況,JSON數組填寫一個JDBC連接即可。
username數據源的用戶名
password數據源指定用戶名的密碼
table所選取的需要同步的表。支持多張表同時抽取,用戶自己需保證多張表是同一schema結構,注意,table必須包含在connection配置單元中。
column所配置的表中需要同步的列名集合,使用JSON的數組描述字段信息。用戶使用代表默認使用所有列配置,例如['']。
splitPk分區主鍵,DataX因此會啟動并發任務進行數據同步。推薦splitPk用戶使用表主鍵
where篩選條件,MysqlReader根據指定的column、table、where條件拼接SQL,并根據這個SQL進行數據抽取。注意:limit不是SQL的合法where子句。where條件可以有效地進行業務增量同步。如果不填寫where語句,包括不提供where的key或者value,均視作同步全量數據。
querySql查詢SQL同步。當用戶配置了這一項之后,當用戶配置querySql時,MysqlReader直接忽略table、column、where條件的配置,querySql優先級大于table、column、where選項。

2.MysqlWriter

參數名描述必選
jdbcUrl目的數據庫的 JDBC 連接信息。作業運行時,DataX 會在你提供的 jdbcUrl 后面追加如下屬性:yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true;在一個數據庫上只能配置一個 jdbcUrl 值。這與 MysqlReader 支持多個備庫探測不同,因為此處不支持同一個數據庫存在多個主庫的情況(雙主導入數據情況)
username數據源的用戶名
password數據源指定用戶名的密碼
table目的表的表名稱。支持寫入一個或者多個表。當配置為多張表時,必須確保所有表結構保持一致。table 和 jdbcUrl 必須包含在 connection 配置單元中
column目的表需要寫入數據的字段,例如: “column”: [“id”,“name”,“age”]。如果要依次寫入全部列,使用*表示, 例如: "column": ["*"]
sessionDataX在獲取Mysql連接時,執行session指定的SQL語句,修改當前connection session屬性
preSql寫入數據到目的表前,會先執行這里的標準語句。如果 Sql 中有你需要操作到的表名稱,請使用 @table 表示,這樣在實際執行 Sql 語句時,會對變量按照實際表名稱進行替換。比如你的任務是要寫入到目的端的100個同構分表(表名稱為:datax_00,datax01, … datax_98,datax_99),并且你希望導入數據前,先對表中數據進行刪除操作,那么你可以這樣配置:"preSql":["delete from 表名"],效果是:在執行到每個表寫入數據前,會先執行對應的 delete from 對應表名稱
postSql寫入數據到目的表后,會執行這里的標準語句
writeMode控制寫入數據到目標表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 語句;可選:insert/replace/update,默認insert
batchSize一次性批量提交的記錄數大小,該值可以極大減少DataX與Mysql的網絡交互次數,并提升整體吞吐量。但是該值設置過大可能會造成DataX運行進程OOM情況。默認:1024

3.ElasticsearchWriter

參數名描述必選
endpointElasticSearch的連接地址
accessIdhttp auth中的user
accessKeyhttp auth中的password
indexelasticsearch中的index名
typeelasticsearch中index的type名,默認index名
cleanup是否刪除原表, 默認值:false
batchSize每次批量數據的條數,默認值:1000
trySize失敗后重試的次數, 默認值:30
timeout客戶端超時時間,默認值:600000
discovery啟用節點發現將(輪詢)并定期更新客戶機中的服務器列表。默認false
compressionhttp請求,開啟壓縮,默認true
multiThreadhttp請求,是否有多線程,默認true
ignoreWriteError忽略寫入錯誤,不重試,繼續寫入,默認false
alias數據導入完成后寫入別名
aliasMode數據導入完成后增加別名的模式,append(增加模式), exclusive(只留這一個),默認append
settings創建index時候的settings, 與elasticsearch官方相同
splitter如果插入數據是array,就使用指定分隔符,默認值:-,-
columnelasticsearch所支持的字段類型,樣例中包含了全部
dynamic不使用datax的mappings,使用es自己的自動mappings,默認值: false

參考資料:https://blog.csdn.net/wlddhj/article/details/137585979

Github主頁地址:https://github.com/alibaba/DataX

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/42327.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/42327.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/42327.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

生產力工具|VS Code安裝及使用指南

一、VS Code介紹 &#xff08;一&#xff09;軟件介紹 Visual Studio Code&#xff08;簡稱VS Code&#xff09;是由Microsoft開發的免費開源代碼編輯器&#xff0c;適用于Windows、macOS和Linux操作系統。它支持多種編程語言&#xff0c;如JavaScript、Python、C等&#xff0…

知識社區在線提問小程序模板源碼

藍色的知識問答&#xff0c;問答交流&#xff0c;知識社區&#xff0c;在線提問手機app小程序網頁模板。包含&#xff1a;社區主頁、提問、我的、綁定手機&#xff0c;實名認證等。 知識社區在線提問小程序模板源碼

ubuntu 檢查硬盤的通電時長、健康度

ubuntu 檢查硬盤的通電時長、健康度 在Ubuntu系統中&#xff0c;檢查硬盤的通電時長和健康度通常需要使用SMART&#xff08;Self-Monitoring, Analysis, and Reporting Technology&#xff09;工具。SMART是硬盤制造商內置的一套硬盤保護技術&#xff0c;用于監控硬盤的健康狀況…

品質至上!中國星坤連接器的發展之道!

在電子連接技術領域&#xff0c;中國星坤以其卓越的創新能力和對品質的不懈追求&#xff0c;贏得了業界的廣泛認可。憑借在高精度連接器設計和制造上的領先地位&#xff0c;星坤不僅獲得了多項實用新型專利&#xff0c;更通過一系列國際質量管理體系認證&#xff0c;彰顯了其產…

【Qt5.12.9】程序無法顯示照片問題(已解決)

問題記錄&#xff1a;Qt5.12.9下無法顯示照片 我的工程名為03_qpainter&#xff0c;照片cd.png存放在工程目錄下的image文件夾中。 /03_qpainter/image/cd.png 因為這是正點原子Linux下Qt書籍中的例程&#xff0c;在通過學習其配套的例程中的項目&#xff0c;發現我的項目少…

【Python】搭建屬于自己 AI 機器人

目錄 前言 1 準備工作 1.1 環境搭建 1.2 獲取 API KEY 2 寫代碼 2.1 引用庫 2.2 創建用戶 2.3 創建對話 2.4 輸出內容 2.5 調試 2.6 全部代碼 2.7 簡短的總結 3 優化代碼 3.1 規范代碼 3.1.1 引用庫 3.1.2 創建提示詞 3.1.3 創建模型 3.1.4 規范輸出&#xf…

在線調試網絡接口的免費網站

免費接口網站 GET接口 https://httpbin.org/get https://httpbin.org/ip https://publicobject.com/helloworld.txt POST接口 https://httpbin.org/post 調試網站 Postman需要下載安裝&#xff0c;還要登錄賬號。對于簡單測試&#xff0c;麻煩&#xff01; http://coolaf.…

西門子1200高速計數器編碼器的應用 接線 組態 編程 調試 測距測速

編碼器的應用、接線、組態、博途1200編程與調試&#xff1a;高速計數器&#xff0c;用于給PLC發高速脈沖&#xff0c;接I點 用來例如&#xff1a;檢測電機轉速&#xff0c;皮帶輸送機運行的距離 &#xff08;粗略定位&#xff09; 360&#xff1a;代表轉一圈會對外發360個脈沖&…

系統化學習 H264視頻編碼(02) I幀 P幀 B幀 引入及相關概念解讀

說明&#xff1a;我們參考黃金圈學習法&#xff08;什么是黃金圈法則?->模型 黃金圈法則&#xff0c;本文使用&#xff1a;why-what&#xff09;來學習音H264視頻編碼。本系列文章側重于理解視頻編碼的知識體系和實踐方法&#xff0c;理論方面會更多地講清楚 音視頻中概念的…

Python類實例的json

web開發中有這么一個場景&#xff0c;我們從數據庫中查詢某一數據的時候&#xff0c;往往需要對數據進行一些轉化之后才能傳給前端。 當然我們可以根據查詢出來的實例對象&#xff0c;構建一個dict返回&#xff0c;這樣會導致我們的代碼非常的臃腫。但是這也確實是一種最直接的…

網絡空間測繪是什么?

網絡空間測繪是一種技術過程&#xff0c;用于探測、分析和可視化互聯網及其他網絡環境中的各種資源和連接。這個概念在2016年開始廣泛使用&#xff0c;它涉及到收集有關網絡節點&#xff08;如服務器、路由器、個人電腦和其他設備&#xff09;的信息&#xff0c;并建立這些節點…

C++ STL 多線程庫用法介紹

目錄 一:Atomic: 二:Thread 1. 創建線程 2. 小心移動(std::move)線程 3. 如何創建帶參數的線程 4. 線程參數是引用類型時,要小心謹慎。 5. 獲取線程ID 6. jthread 7. 如何在線程中使用中斷 stop_token 三:如何解決數據競爭 1.有問題的代碼 2.使用互斥 3.預防…

Vue3+.NET6前后端分離式管理后臺實戰(二十八)

1&#xff0c;Vue3.NET6前后端分離式管理后臺實戰(二十八)

【Linux進階】文件系統6——理解文件操作

目錄 1.文件的讀取 1.1.目錄 1.2.文件 1.3.目錄樹讀取 1.4.文件系統大小與磁盤讀取性能 2.增添文件 2.1.數據的不一致&#xff08;Inconsistent&#xff09;狀態 2.2.日志式文件系統&#xff08;Journaling filesystem&#xff09; 3.Linux文件系統的運行 4、文件的刪…

動態規劃算法-以中學排班管理系統為例

1.動態規劃算法介紹 1.算法思路 動態規劃算法通常用于求解具有某種最優性質的問題。在這類問題中&#xff0c;可能會有許多可行解。每一個解都對應于一個值&#xff0c;我們希望找到具有最優值的解。動態規劃算法與分治法類似&#xff0c;其基本思想也是將待求解問題分解成若…

干貨 | 2024大模型場景下智算平臺的設計與優化實踐(免費下載)

誠摯邀請您微信掃描以下二維碼加入方案驛站知識星球&#xff0c;獲取上萬份PPT/WORD解決方案&#xff01;&#xff01;&#xff01;感謝支持&#xff01;&#xff01;&#xff01;

android pdf框架-11,查看圖片

前10篇文章,9章關于pdf的,pdf解析后,里面也是有各種圖片,于是利用pdf的view來展示圖片,似乎也是個不錯的想法. android手機中的圖片查看功能,有的可以展示,有的不能.比如華為,榮耀對大體積的png是可以顯示的,小米是不顯示,只有縮略圖. 一張png50m大,比如清明上河圖,原圖是tif…

【C++】string的底層原理及實現

文章目錄 string類的存儲結構默認成員函數構造函數析構函數拷貝構造函數賦值重載 容量操作size()capacity()reserve()resize()clear() 遍歷與訪問operator[ ]迭代器范圍與for 增刪查改push_back()pop_back()append()operatorinsert()erase()c_str()find()substr() 非成員函數op…

c#的List<T>的SelectMany 和Select

在C#中&#xff0c;List<T>&#xff08;以及任何實現了IEnumerable<T>的集合&#xff09;的Select和SelectMany擴展方法都是LINQ&#xff08;Language Integrated Query&#xff09;的一部分&#xff0c;用于對集合中的元素進行查詢和轉換。 盡管它們的作用有些相…

Virtualbox和ubuntu之間的關系

1、什么是ubuntu Ubuntu 是一個類似于 Windows 的操作系統&#xff0c;但它是基于 Linux 內核開發的開源操作系統 2、什么是Virtualbox VirtualBox 是一款虛擬機軟件&#xff0c;使我們可以物理機上創建和運行虛擬機 也就是說,VirtualBox 提供了一個可以安裝和運行其他操作系…