SpringBoot整合Datax數據同步
文章目錄
- SpringBoot整合Datax數據同步
- 1.簡介
- 設計理念
- DataX3.0框架設計
- DataX3.0核心架構
- 核心模塊介紹
- DataX調度流程
- 2.DataX3.0插件體系
- 3.數據同步
- 1.編寫job的json文件
- 2.進入bin目錄下,執行文件
- 4.SpringBoot整合DataX生成Job文件并執行
- 1.準備工作
- 2.文件目錄如圖
- 3.Mysql數據同步
- 4.Elasticsearch寫入Mysql數據
- 5.Job文件參數說明
- 1.MysqlReader
- 2.MysqlWriter
- 3.ElasticsearchWriter
1.簡介
DataX 是阿里云 DataWorks數據集成 的開源版本,在阿里巴巴集團內被廣泛使用的離線數據同步工具/平臺。DataX 實現了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各種異構數據源之間高效的數據同步功能。
DataX 是一個異構數據源離線同步工具,致力于實現各種異構數據源之間穩定高效的數據同步功能。
Download DataX下載地址
Github主頁地址:https://github.com/alibaba/DataX
請點擊:Quick Start
-
設計理念
為了解決異構數據源同步問題,DataX將復雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。
DataX3.0框架設計
DataX本身作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。
- Reader:Reader為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。
- Writer: Writer為數據寫入模塊,負責不斷向Framework取數據,并將數據寫入到目的端。
- Framework:Framework用于連接reader和writer,作為兩者的數據傳輸通道,并處理緩沖,流控,并發,數據轉換等核心技術問題。
DataX3.0核心架構
DataX 3.0 開源版本支持單機多線程模式完成同步作業運行,本小節按一個DataX作業生命周期的時序圖,從整體架構設計非常簡要說明DataX各個模塊相互關系。
核心模塊介紹
- DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
- DataXJob啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便于并發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
- 切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的并發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的并發運行完畢分配好的所有Task,默認單個任務組的并發數量為5。
- 每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
- DataX作業運行起來之后, Job監控并等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0
DataX調度流程
舉例來說,用戶提交了一個DataX作業,并且配置了20個并發,目的是將一個100張分表的mysql數據同步到odps里面。 DataX的調度決策思路是:
- DataXJob根據分庫分表切分成了100個Task。
- 根據20個并發,DataX計算共需要分配4個TaskGroup。
- 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個并發共計運行25個Task。
2.DataX3.0插件體系
DataX目前已經有了比較全面的插件體系,主流的RDBMS數據庫、NOSQL、大數據計算系統都已經接入,目前支持數據如下圖,詳情請點擊:DataX數據源參考指南
類型 | 數據源 | Reader(讀) | Writer(寫) | 文檔 |
---|---|---|---|---|
RDBMS 關系型數據庫 | MySQL | √ | √ | 讀 、寫 |
Oracle | √ | √ | 讀 、寫 | |
OceanBase | √ | √ | 讀 、寫 | |
SQLServer | √ | √ | 讀 、寫 | |
PostgreSQL | √ | √ | 讀 、寫 | |
DRDS | √ | √ | 讀 、寫 | |
Kingbase | √ | √ | 讀 、寫 | |
通用RDBMS(支持所有關系型數據庫) | √ | √ | 讀 、寫 | |
阿里云數倉數據存儲 | ODPS | √ | √ | 讀 、寫 |
ADB | √ | 寫 | ||
ADS | √ | 寫 | ||
OSS | √ | √ | 讀 、寫 | |
OCS | √ | 寫 | ||
Hologres | √ | 寫 | ||
AnalyticDB For PostgreSQL | √ | 寫 | ||
阿里云中間件 | datahub | √ | √ | 讀 、寫 |
SLS | √ | √ | 讀 、寫 | |
圖數據庫 | 阿里云 GDB | √ | √ | 讀 、寫 |
Neo4j | √ | 寫 | ||
NoSQL數據存儲 | OTS | √ | √ | 讀 、寫 |
Hbase0.94 | √ | √ | 讀 、寫 | |
Hbase1.1 | √ | √ | 讀 、寫 | |
Phoenix4.x | √ | √ | 讀 、寫 | |
Phoenix5.x | √ | √ | 讀 、寫 | |
MongoDB | √ | √ | 讀 、寫 | |
Cassandra | √ | √ | 讀 、寫 | |
數倉數據存儲 | StarRocks | √ | √ | 讀 、寫 |
ApacheDoris | √ | 寫 | ||
ClickHouse | √ | √ | 讀 、寫 | |
Databend | √ | 寫 | ||
Hive | √ | √ | 讀 、寫 | |
kudu | √ | 寫 | ||
selectdb | √ | 寫 | ||
無結構化數據存儲 | TxtFile | √ | √ | 讀 、寫 |
FTP | √ | √ | 讀 、寫 | |
HDFS | √ | √ | 讀 、寫 | |
Elasticsearch | √ | 寫 | ||
時間序列數據庫 | OpenTSDB | √ | 讀 | |
TSDB | √ | √ | 讀 、寫 | |
TDengine | √ | √ | 讀 、寫 |
3.數據同步
1.編寫job的json文件
mysql數據抽取到本地內存
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","name","amount",],"connection": [{"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/datax?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"],"table": ["user"]}],"password": "root","username": "root"}},"writer": {"name": "streamwriter","parameter": {"print": false,"encoding": "UTF-8"}}}],"setting": {"speed": {"channel": "1"}}}
}
mysqlWriter數據寫入
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "streamreader","parameter": {"column" : [{"value": "DataX","type": "string"},{"value": 19880808,"type": "long"},{"value": "1988-08-08 08:08:08","type": "date"},{"value": true,"type": "bool"},{"value": "test","type": "bytes"}],"sliceRecordCount": 1000}},"writer": {"name": "mysqlwriter","parameter": {"writeMode": "insert","username": "root","password": "root","column": ["id","name"],"session": ["set session sql_mode='ANSI'"],"preSql": ["delete from test"],"connection": [{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk","table": ["test"]}]}}}]}
}
mysql數據同步
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","project_code","category"],"connection": [{"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"],"table": ["project_index"]}],"password": "root","username": "root"}},"writer": {"name": "mysqlwriter","parameter": {"column": ["id","project_code","category"],"connection": [{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai","table": ["project_index"]}],"password": "root","username": "root","writeMode": "update"}}}],"setting": {"speed": {"channel": "1"}}}
}
2.進入bin目錄下,執行文件
需python環境
python datax.py {YOUR_JOB.json}
4.SpringBoot整合DataX生成Job文件并執行
1.準備工作
下載datax,安裝lib下的datax-common和datax-core的jar到本地maven倉庫
依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.54</version></dependency><dependency><groupId>com.alibaba.datax</groupId><artifactId>datax-core</artifactId><version>0.0.1-SNAPSHOT</version></dependency><dependency><groupId>com.alibaba.datax</groupId><artifactId>datax-common</artifactId><version>0.0.1-SNAPSHOT</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.25</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.21</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency>
spring:datasource:url: jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Drivertype: com.alibaba.druid.pool.DruidDataSourceserver:port: 8080# datax 相關配置,在生成文件時使用
datax:home: D:/software/datax/# job文件存儲位置save-path: D:/software/datax/job/
屬性配置
/*** datax配置* @author moshangshang*/
@Data
@Component
@ConfigurationProperties("datax")
public class DataXProperties {private String home;private String savePath;}
公共實體
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Content {private Reader reader;private Writer writer;}
@Data
public class DataXJobRoot {private Job job;
}
@Data
public class Job {private List<Content> content;private Setting setting = new Setting();}
@Data
public class Setting {private Speed speed = new Speed();@Datapublic static class Speed {private String channel = "1";}
}
public abstract class Parameter {
}
/*** 讀取抽象類* @author moshangshang*/
@Data
public abstract class Reader {private String name;private Parameter parameter;}
/*** 寫入抽象類* @author moshangshang*/
@Data
public abstract class Writer {private String name;private Parameter parameter;
}
公共處理接口
public interface DataXInterface {/*** 獲取讀對象*/Reader getReader(String table);/*** 獲取寫對象*/Writer getWriter(String table);/*** 同類型讀取寫入,如mysql到mysql*/String getJobTaskName(String readerTable,String writeTable);/*** 自定義讀取寫入* @param reader 讀處理* @param write 寫處理* @param suffix 文件名*/String getJobTaskName(Reader reader,Writer write, String suffix);}
/*** 接口抽象類* @author moshangshang*/
@Component
public abstract class AbstractDataXHandler implements DataXInterface {@Autowiredprivate DataXProperties dataXProperties;/*** 自定義讀取寫入* @param reader 讀處理* @param write 寫處理* @param suffix 文件名*/@Overridepublic String getJobTaskName(Reader reader, Writer write, String suffix) {DataXJobRoot root = new DataXJobRoot();Job job = new Job();root.setJob(job);Content content = new Content(reader,write);job.setContent(Collections.singletonList(content));String jsonStr = JSONUtil.parse(root).toJSONString(2);String fileName = "datax_job_"+ UUID.randomUUID().toString().replaceAll("-","") +"_"+suffix+".json";File file = FileUtil.file(dataXProperties.getSavePath(),fileName);FileUtil.appendString(jsonStr, file, "utf-8");return fileName;}}
工具方法
@Repository
public class DatabaseInfoRepository {private final JdbcTemplate jdbcTemplate;@Autowiredpublic DatabaseInfoRepository(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}/*** 獲取所有表名*/public List<String> getAllTableNames() {String sql = "SHOW TABLES";return jdbcTemplate.queryForList(sql, String.class);}/*** 根據表名獲取字段信息*/public List<Map<String, Object>> getTableColumns(String tableName) {String sql = "SHOW FULL COLUMNS FROM " + tableName;return jdbcTemplate.queryForList(sql);}
}
@Slf4j
@Service
public class DatabaseInfoService {private final DatabaseInfoRepository databaseInfoRepository;@Autowiredpublic DatabaseInfoService(DatabaseInfoRepository databaseInfoRepository) {this.databaseInfoRepository = databaseInfoRepository;}public void printAllTablesAndColumns() {// 獲取所有表名List<String> tableNames = databaseInfoRepository.getAllTableNames();// 遍歷表名,獲取并打印每個表的字段信息for (String tableName : tableNames) {System.out.println("Table: " + tableName);// 獲取當前表的字段信息List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);// 遍歷字段信息并打印for (Map<String, Object> column : columns) {System.out.println(" Column: " + column.get("Field") + " (Type: " + column.get("Type") + ")" + " (Comment: " + column.get("Comment") + ")");}// 打印空行作為分隔System.out.println();}}/** 查詢指定表的所有字段列表 */public List<String> getColumns(String tableName) {List<String> list = new ArrayList<>();// 獲取當前表的字段信息List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);// 遍歷字段信息并打印for (Map<String, Object> column : columns) {list.add(column.get("Field").toString());}return list;}/** 查詢指定表的所有字段列表,封裝成HdfsWriter格式 */public List<HdfsWriter.Column> getHdfsColumns(String tableName) {List<HdfsWriter.Column> list = new ArrayList<>();// 獲取當前表的字段信息List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);// 遍歷字段信息并打印for (Map<String, Object> column : columns) {String name = column.get("Field").toString();String typeDb = column.get("Type").toString();String type = "string";if (typeDb.equals("bigint")) {type = "bigint";} else if (typeDb.startsWith("varchar")) {type = "string";} else if (typeDb.startsWith("date") || typeDb.endsWith("timestamp")) {type = "date";}HdfsWriter.Column columnHdfs = new HdfsWriter.Column();columnHdfs.setName(name);columnHdfs.setType(type);list.add(columnHdfs);}return list;}
}
datax的job任務json執行方法
/*** 執行器* @author moshangshang*/
@Slf4j
@Component
public class DataXExecuter {@Autowiredprivate DataXProperties dataXProperties;public void run(String fileName) throws IOException {System.setProperty("datax.home", dataXProperties.getHome());String filePath = dataXProperties.getSavePath()+fileName;String dataxJson = JSONUtil.parse(FileUtils.readFileToString(new File(filePath),"UTF-8")).toJSONString(2);log.info("datax log:{}",dataxJson);String[] dataxArgs = {"-job", filePath, "-mode", "standalone", "-jobid", "-1"};try {Engine.entry(dataxArgs);}catch (DataXException e){log.error("執行失敗",e);} catch (Throwable throwable) {log.error("DataX執行異常,error cause::\n" + ExceptionTracker.trace(throwable));}}}
2.文件目錄如圖
3.Mysql數據同步
1.編寫mysql讀寫對象,繼承讀寫接口
@Data
public class MysqlReader extends Reader {public String getName() {return "mysqlreader";}@Datapublic static class MysqlParameter extends Parameter {private List<String> column;private List<Connection> connection;private String password;private String username;private String splitPk;private String where;}@Datapublic static class Connection {private List<String> jdbcUrl;private List<String> table;private List<String> querySql;}
}
@EqualsAndHashCode(callSuper = true)
@Data
public class MysqlWriter extends Writer {public String getName() {return "mysqlwriter";}@EqualsAndHashCode(callSuper = true)@Datapublic static class MysqlParameter extends Parameter {private List<String> column;private List<Connection> connection;private String password;private String username;private String writeMode = "update";}@Datapublic static class Connection {private String jdbcUrl;private List<String> table;}
}
2.配置mysql讀和寫的數據庫信息
/*** mysql讀寫配置* @author moshangshang*/
@Data
@ConfigurationProperties("datax.mysql.reader")
public class DataXMysqlReaderProperties {private String url;private String password;private String username;}
/*** mysql讀寫配置* @author moshangshang*/
@Data
@ConfigurationProperties("datax.mysql.writer")
public class DataXMysqlWriterProperties {private String url;private String password;private String username;}
# datax 相關配置,在生成文件時使用
datax:mysql:reader:url: jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghaiusername: rootpassword: rootwriter:url: jdbc:mysql://127.0.0.1:3306/ruoyi_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghaiusername: rootpassword: root
2.編寫mysql處理類,繼承抽象處理接口。生成job文件
/*** mysql讀寫處理* @author moshangshang*/
@Component
@EnableConfigurationProperties({DataXMysqlReaderProperties.class, DataXMysqlWriterProperties.class})
public class MysqlHandler extends AbstractDataXHandler{@Autowiredprivate DatabaseInfoService databaseInfoService;@Autowiredprivate DataXProperties dataXProperties;@Autowiredprivate DataXMysqlReaderProperties dataXMysqlReaderProperties;@Autowiredprivate DataXMysqlWriterProperties dataXMysqlWriterProperties;@Overridepublic Reader getReader(String table) {MysqlReader reader = new MysqlReader();MysqlReader.MysqlParameter readerParameter = new MysqlReader.MysqlParameter();readerParameter.setPassword(dataXMysqlReaderProperties.getPassword());readerParameter.setUsername(dataXMysqlReaderProperties.getUsername());List<String> readerColumns = databaseInfoService.getColumns(table);readerParameter.setColumn(readerColumns);MysqlReader.Connection readerConnection = new MysqlReader.Connection();readerConnection.setJdbcUrl(Collections.singletonList(dataXMysqlReaderProperties.getUrl()));readerConnection.setTable(Collections.singletonList(table));readerParameter.setConnection(Collections.singletonList(readerConnection));reader.setParameter(readerParameter);return reader;}@Overridepublic Writer getWriter(String table) {MysqlWriter writer = new MysqlWriter();MysqlWriter.MysqlParameter writerParameter = new MysqlWriter.MysqlParameter();writerParameter.setPassword(dataXMysqlWriterProperties.getPassword());writerParameter.setUsername(dataXMysqlWriterProperties.getUsername());List<String> columns = databaseInfoService.getColumns(table);writerParameter.setColumn(columns);MysqlWriter.Connection connection = new MysqlWriter.Connection();connection.setJdbcUrl(dataXMysqlWriterProperties.getUrl());connection.setTable(Collections.singletonList(table));writerParameter.setConnection(Collections.singletonList(connection));writer.setParameter(writerParameter);return writer;}@Overridepublic String getJobTaskName(String readerTable,String writeTable) {DataXJobRoot root = new DataXJobRoot();Job job = new Job();root.setJob(job);Content content = new Content(getReader(readerTable),getWriter(writeTable));job.setContent(Collections.singletonList(content));String jsonStr = JSONUtil.parse(root).toJSONString(2);String fileName = "datax_job_"+ UUID.randomUUID().toString().replaceAll("-","") +"_h2h.json";File file = FileUtil.file(dataXProperties.getSavePath(),fileName);FileUtil.appendString(jsonStr, file, "utf-8");return fileName;}
}
3.調用執行器,執行任務job
@SpringBootTest
public class DataxTest {@Autowiredprivate MysqlHandler mysqlHandler;@Autowiredprivate DataXExecuter dataXExecuter;/*** 讀t_user表同步到user*/@Testpublic void test() throws IOException {String jobTask = mysqlHandler.getJobTaskName("t_user", "user");dataXExecuter.run(jobTask);}/*** 直接執行json文件*/@Testpublic void test2() throws IOException {dataXExecuter.run("datax_job_83798b5f1766406289a44fe681dc8878_m2m.json");}
}
4.執行結果
4.Elasticsearch寫入Mysql數據
注意事項:
- es目前只支持寫入
不支持讀取
- mysql數據寫入es時,需保證es與mysql的
列數column相同
,不支持類似mysql的部分字段寫入 需保證列的順序相同
,寫入時不會根據name名稱字段去自動對應,如果順序不一致,則可能會轉換錯誤。如id,name,寫入name,id
原理:使用elasticsearch的rest api
接口, 批量把從reader讀入的數據寫入elasticsearch
創建es索引映射
PUT datax_data
{"mappings": {"properties": {"name":{"type": "keyword"},"amount":{"type": "long"}}}
}
1.添加es配置和文件
spring:elasticsearch:#username:#password:#path-prefix:uris: http://127.0.0.1:9200#連接elasticsearch超時時間connection-timeout: 60000socket-timeout: 30000
# datax 相關配置,在生成文件時使用
datax:elasticsearch:writer:url: http://127.0.0.1:9200username:password:
/*** es寫配置* @author moshangshang*/
@Data
@ConfigurationProperties("datax.elasticsearch.writer")
public class DataXElasticSearchProperties {private String url;private String username;private String password;}
2.編寫生成job文件實體類
@EqualsAndHashCode(callSuper = true)
@Data
public class ElasticSearchWriter extends Writer {public String getName() {return "elasticsearchwriter";}@EqualsAndHashCode(callSuper = true)@Datapublic static class ElasticSearchParameter extends Parameter {private List<Column> column;private String endpoint;private String accessId;private String accessKey;private String index;private Settings settings;private String type = "default";private boolean cleanup = true;private boolean discovery = false;private Integer batchSize = 1000;private String splitter = ",";}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Column {private String name;private String type;private String analyzer;}@Datapublic static class Settings {private Map<String,Object> index;}}
3.es接口擴展
/*** es接口擴展* @author moshangshang*/
public interface DataXElasticsearchInterface extends DataXInterface {Writer getWriter(String table, Map<String,Object> indexSettings);}
4.es核心處理類
@Component
@EnableConfigurationProperties({DataXElasticSearchProperties.class})
public class ElasticSearchHandler extends AbstractDataXHandler implements DataXElasticsearchInterface{@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;@Autowiredprivate DataXElasticSearchProperties dataXElasticSearchProperties;@Overridepublic Reader getReader(String table) {return null;}/*** 普通寫入* @param index 索引* @return Writer*/@Overridepublic Writer getWriter(String index) {ElasticSearchWriter writer = new ElasticSearchWriter();ElasticSearchWriter.ElasticSearchParameter writerParameter = getElasticSearchWriter(index);writer.setParameter(writerParameter);return writer;}@Overridepublic String getJobTaskName(String readerTable, String writeTable) {return null;}/*** es寫入,帶setting設置*/@Overridepublic Writer getWriter(String index,Map<String,Object> map) {ElasticSearchWriter writer = new ElasticSearchWriter();ElasticSearchWriter.ElasticSearchParameter writerParameter = getElasticSearchWriter(index);ElasticSearchWriter.Settings settings = new ElasticSearchWriter.Settings();settings.setIndex(map);writerParameter.setSettings(settings);writer.setParameter(writerParameter);return writer;}/*** 獲取公共寫入參數*/public ElasticSearchWriter.ElasticSearchParameter getElasticSearchWriter(String index){ElasticSearchWriter.ElasticSearchParameter writerParameter = new ElasticSearchWriter.ElasticSearchParameter();List<Column> columns = getEsColumns(index);writerParameter.setColumn(columns);writerParameter.setEndpoint(dataXElasticSearchProperties.getUrl());writerParameter.setAccessId(dataXElasticSearchProperties.getUsername());writerParameter.setAccessKey(dataXElasticSearchProperties.getPassword());writerParameter.setIndex(index);return writerParameter;}/*** 獲取指定索引的映射字段* 讀取時和創建順序相反*/public List<ElasticSearchWriter.Column> getEsColumns(String index){List<ElasticSearchWriter.Column> columns = new ArrayList<>();// 獲取操作的索引文檔對象IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(index));Map<String, Object> mapping = indexOperations.getMapping();mapping.forEach((k,value) ->{JSONObject json = JSON.parseObject(JSONObject.toJSONString(value));for (Map.Entry<String, Object> entry : json.entrySet()) {String key = entry.getKey();JSONObject properties = JSON.parseObject(JSONObject.toJSONString(entry.getValue()));String type = properties.getString("type");String analyzer = properties.getString("analyzer");columns.add(new ElasticSearchWriter.Column(key,type,analyzer));}});return columns;}}
5.測試
@Testpublic void test3() throws IOException {Map<String,Object> settings = new HashMap<>();settings.put("number_of_shards",1);settings.put("number_of_replicas",1);String jobTask = elasticSearchHandler.getJobTaskName(mysqlHandler.getReader("t_user"), elasticSearchHandler.getWriter("datax_data",settings),"m2e");dataXExecuter.run(jobTask);}
5.Job文件參數說明
1.MysqlReader
參數名 | 描述 | 必選 |
---|---|---|
jdbcUrl | 對端數據庫的JDBC連接信息并支持一個庫填寫多個連接地址。之所以使用JSON數組描述連接信息,是因為阿里集團內部支持多個IP探測,如果配置了多個,MysqlReader可以依次探測ip的可連接性,直到選擇一個合法的IP,如果全部連接失敗,MysqlReader報錯。 注意,jdbcUrl必須包含在connection配置單元中。對于阿里外部使用情況,JSON數組填寫一個JDBC連接即可。 | 是 |
username | 數據源的用戶名 | 是 |
password | 數據源指定用戶名的密碼 | 是 |
table | 所選取的需要同步的表。支持多張表同時抽取,用戶自己需保證多張表是同一schema結構,注意,table必須包含在connection配置單元中。 | 是 |
column | 所配置的表中需要同步的列名集合,使用JSON的數組描述字段信息。用戶使用代表默認使用所有列配置,例如['']。 | 是 |
splitPk | 分區主鍵,DataX因此會啟動并發任務進行數據同步。推薦splitPk用戶使用表主鍵 | 否 |
where | 篩選條件,MysqlReader根據指定的column、table、where條件拼接SQL,并根據這個SQL進行數據抽取。注意:limit不是SQL的合法where子句。where條件可以有效地進行業務增量同步。如果不填寫where語句,包括不提供where的key或者value,均視作同步全量數據。 | 否 |
querySql | 查詢SQL同步。當用戶配置了這一項之后,當用戶配置querySql時,MysqlReader直接忽略table、column、where條件的配置 ,querySql優先級大于table、column、where選項。 | 否 |
2.MysqlWriter
參數名 | 描述 | 必選 |
---|---|---|
jdbcUrl | 目的數據庫的 JDBC 連接信息。作業運行時,DataX 會在你提供的 jdbcUrl 后面追加如下屬性:yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true;在一個數據庫上只能配置一個 jdbcUrl 值。這與 MysqlReader 支持多個備庫探測不同,因為此處不支持同一個數據庫存在多個主庫的情況(雙主導入數據情況) | 是 |
username | 數據源的用戶名 | 是 |
password | 數據源指定用戶名的密碼 | 是 |
table | 目的表的表名稱。支持寫入一個或者多個表。當配置為多張表時,必須確保所有表結構保持一致。table 和 jdbcUrl 必須包含在 connection 配置單元中 | 是 |
column | 目的表需要寫入數據的字段,例如: “column”: [“id”,“name”,“age”]。如果要依次寫入全部列,使用* 表示, 例如: "column": ["*"] 。 | 是 |
session | DataX在獲取Mysql連接時,執行session指定的SQL語句,修改當前connection session屬性 | 否 |
preSql | 寫入數據到目的表前,會先執行這里的標準語句。如果 Sql 中有你需要操作到的表名稱,請使用 @table 表示,這樣在實際執行 Sql 語句時,會對變量按照實際表名稱進行替換。比如你的任務是要寫入到目的端的100個同構分表(表名稱為:datax_00,datax01, … datax_98,datax_99),并且你希望導入數據前,先對表中數據進行刪除操作,那么你可以這樣配置:"preSql":["delete from 表名"] ,效果是:在執行到每個表寫入數據前,會先執行對應的 delete from 對應表名稱 | 否 |
postSql | 寫入數據到目的表后,會執行這里的標準語句 | 否 |
writeMode | 控制寫入數據到目標表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 語句;可選:insert/replace/update,默認insert | 是 |
batchSize | 一次性批量提交的記錄數大小,該值可以極大減少DataX與Mysql的網絡交互次數,并提升整體吞吐量。但是該值設置過大可能會造成DataX運行進程OOM情況。默認:1024 | 否 |
3.ElasticsearchWriter
參數名 | 描述 | 必選 |
---|---|---|
endpoint | ElasticSearch的連接地址 | 是 |
accessId | http auth中的user | 否 |
accessKey | http auth中的password | 否 |
index | elasticsearch中的index名 | 是 |
type | elasticsearch中index的type名,默認index名 | 否 |
cleanup | 是否刪除原表, 默認值:false | 否 |
batchSize | 每次批量數據的條數,默認值:1000 | 否 |
trySize | 失敗后重試的次數, 默認值:30 | 否 |
timeout | 客戶端超時時間,默認值:600000 | 否 |
discovery | 啟用節點發現將(輪詢)并定期更新客戶機中的服務器列表。默認false | 否 |
compression | http請求,開啟壓縮,默認true | 否 |
multiThread | http請求,是否有多線程,默認true | 否 |
ignoreWriteError | 忽略寫入錯誤,不重試,繼續寫入,默認false | 否 |
alias | 數據導入完成后寫入別名 | 否 |
aliasMode | 數據導入完成后增加別名的模式,append(增加模式), exclusive(只留這一個),默認append | 否 |
settings | 創建index時候的settings, 與elasticsearch官方相同 | 否 |
splitter | 如果插入數據是array,就使用指定分隔符,默認值:-,- | 否 |
column | elasticsearch所支持的字段類型,樣例中包含了全部 | 是 |
dynamic | 不使用datax的mappings,使用es自己的自動mappings,默認值: false | 否 |
參考資料:https://blog.csdn.net/wlddhj/article/details/137585979
Github主頁地址:https://github.com/alibaba/DataX