datax底層原理_Datax 插件加載原理

Datax 插件加載原理

插件類型

Datax有好幾種類型的插件,每個插件都有不同的作用。

reader, 讀插件。Reader就是屬于這種類型的

writer, 寫插件。Writer就是屬于這種類型的

transformer, 目前還未知

handler, 主要用于任務執行前的準備工作和完成的收尾工作。

插件類型由PluginType枚舉表示

1

2

3public enum PluginType {

READER("reader"), TRANSFORMER("transformer"), WRITER("writer"), HANDLER("handler");

}

根據運行類型,又可以分為Job級別的插件和Task級別的插件。uml如下圖所示

插件配置讀取

ConfigParser首先會讀取配置文件,提取需要使用的reader,writer,prehandler 和 posthandler的名稱。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34public static Configuration parse(final String jobPath){

Configuration configuration = ConfigParser.parseJobConfig(jobPath);

// 合并 conf/core.json文件的配置, false 表示不覆蓋原有的配置

configuration.merge(

//CoreConstant.DATAX_CONF_PATH的值為conf/core.json

ConfigParser.parseCoreConfig

(CoreConstant.DATAX_CONF_PATH),

false);

// 獲取job.content列表的第一個reader

String readerPluginName = configuration.getString(

//CoreConstant.DATAX_JOB_CONTENT_READER_NAME的值為job.content[0].reader.name

CoreConstant.DATAX_JOB_CONTENT_READER_NAME);

// 獲取job.content列表的第一個writer

String writerPluginName = configuration.getString(

//CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME的值為job.content[0].writer.name

CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);

// 讀取job.preHandler.pluginName

String preHandlerName = configuration.getString(

//CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME的值為job.preHandler.pluginName

CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);

// 讀取job.postHandler.pluginName

String postHandlerName = configuration.getString(

//CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME的值為job.postHandler.pluginName

CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

Set pluginList = new HashSet();

pluginList.add(readerPluginName);

pluginList.add(writerPluginName);

......

// 調用parsePluginConfig生成plugin的配置,然后合并

configuration.merge(parsePluginConfig(new ArrayList(pluginList)), false);

......

return configuration;

}

提取完插件名稱后,會去reader目錄和writer目錄,尋找插件的位置。目前Datax只支持reader和writer插件,因為它只從這兩個目錄中尋找。如果想自己擴展其他類型插件的話,比如handler類型的, 需要修改parsePluginConfig的代碼。每個插件目錄會有一個重要的配置文件 plugin.json ,它定義了插件的名稱和對應的類,在LoadUtils類加載插件的時候會使用到。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62public static Configuration parsePluginConfig(List wantPluginNames){

Configuration configuration = Configuration.newDefault();

......

// 遍歷plugin.reader目錄下的文件夾

for (final String each : ConfigParser

.getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {

// 調用 parseOnePluginConfig解析單個plugin配置

Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);

if(eachReaderConfig!=null) {

configuration.merge(eachReaderConfig, true);

complete += 1;

}

}

// 遍歷plugin.writer目錄下的文件夾

for (final String each : ConfigParser

.getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {

// 調用 parseOnePluginConfig解析單個plugin配置

Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames);

if(eachWriterConfig!=null) {

configuration.merge(eachWriterConfig, true);

complete += 1;

}

}

......

return configuration;

}

// 讀取plugin目錄下的plugin.json 文件

public static Configuration parseOnePluginConfig(final String path, final String type, Set pluginSet, List wantPluginNames){

String filePath = path + File.separator + "plugin.json";

Configuration configuration = Configuration.from(new File(filePath));

String pluginPath = configuration.getString("path");

String pluginName = configuration.getString("name");

if(!pluginSet.contains(pluginName)) {

pluginSet.add(pluginName);

} else {

......

}

//不是想要的插件,就不生成配置,直接返回

if (wantPluginNames != null && wantPluginNames.size() > 0 && !wantPluginNames.contains(pluginName)) {

return null;

}

// plugin.json的path路徑,是指插件的jar包。如果沒有指定,則默認為和plugin.json文件在同一個目錄下

boolean isDefaultPath = StringUtils.isBlank(pluginPath);

if (isDefaultPath) {

configuration.set("path", path);

}

Configuration result = Configuration.newDefault();

// 最后保存在puligin.{type}.{pluginName}路徑下

result.set(

String.format("plugin.%s.%s", type, pluginName),

configuration.getInternal());

return result;

}

動態加載插件

插件的加載都是使用ClassLoader動態加載。 為了避免類的沖突,對于每個插件的加載,對應著獨立的加載器。加載器由JarLoader實現,插件的加載接口由LoadUtil類負責。當要加載一個插件時,需要實例化一個JarLoader,然后切換thread class loader之后,才加載插件。

JarLoader 類

JarLoader繼承URLClassLoader,擴充了可以加載目錄的功能。可以從指定的目錄下,把傳入的路徑、及其子路徑、以及路徑中的jar文件加入到class path。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80public class JarLoader extends URLClassLoader{

public JarLoader(String[] paths){

this(paths, JarLoader.class.getClassLoader());

}

public JarLoader(String[] paths, ClassLoader parent){

// 調用getURLS,獲取所有的jar包路徑

super(getURLs(paths), parent);

}

// 獲取所有的jar包

private static URL[] getURLs(String[] paths) {

// 獲取包括子目錄的所有目錄路徑

List dirs = new ArrayList();

for (String path : paths) {

dirs.add(path);

// 獲取path目錄和其子目錄的所有目錄路徑

JarLoader.collectDirs(path, dirs);

}

// 遍歷目錄,獲取jar包的路徑

List urls = new ArrayList();

for (String path : dirs) {

urls.addAll(doGetURLs(path));

}

return urls.toArray(new URL[0]);

}

// 遞歸的方式,獲取所有目錄

private static void collectDirs(String path, List collector){

// path為空,終止

if (null == path || StringUtils.isBlank(path)) {

return;

}

// path不為目錄,終止

File current = new File(path);

if (!current.exists() || !current.isDirectory()) {

return;

}

// 遍歷完子文件,終止

for (File child : current.listFiles()) {

if (!child.isDirectory()) {

continue;

}

collector.add(child.getAbsolutePath());

collectDirs(child.getAbsolutePath(), collector);

}

}

private static List doGetURLs(final String path){

File jarPath = new File(path);

// 只尋找文件以.jar結尾的文件

FileFilter jarFilter = new FileFilter() {

@Override

public boolean accept(File pathname){

return pathname.getName().endsWith(".jar");

}

};

File[] allJars = new File(path).listFiles(jarFilter);

List jarURLs = new ArrayList(allJars.length);

for (int i = 0; i < allJars.length; i++) {

try {

jarURLs.add(allJars[i].toURI().toURL());

} catch (Exception e) {

throw DataXException.asDataXException(

FrameworkErrorCode.PLUGIN_INIT_ERROR,

"系統加載jar包出錯", e);

}

}

return jarURLs;

}

}

LoadUtil 類

LoadUtil管理著插件的加載器,調用getJarLoader返回插件對應的加載器。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31public class LoadUtil{

// 加載器的HashMap, Key由插件類型和名稱決定, 格式為plugin.{pulginType}.{pluginName}

private static Map jarLoaderCenter = new HashMap();

public static synchronized JarLoader getJarLoader(PluginType pluginType, String pluginName){

Configuration pluginConf = getPluginConf(pluginType, pluginName);

JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,

pluginName));

if (null == jarLoader) {

// 構建加載器JarLoader

// 獲取jar所在的目錄

String pluginPath = pluginConf.getString("path");

jarLoader = new JarLoader(new String[]{pluginPath});

//添加到HashMap中

jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),

jarLoader);

}

return jarLoader;

}

private static final String pluginTypeNameFormat = "plugin.%s.%s";

// 生成HashMpa的key值

private static String generatePluginKey(PluginType pluginType,

String pluginName){

return String.format(pluginTypeNameFormat, pluginType.toString(),

pluginName);

}

當獲取類加載器,就可以調用LoadUtil來加載插件。LoadUtil提供了 loadJobPlugin 和 loadTaskPlugin 兩個接口,加載Job 和 Task 的兩種插件。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43// 加載Job類型的Plugin

public static AbstractJobPlugin loadJobPlugin(PluginType pluginType, String pluginName){

// 調用loadPluginClass方法,加載插件對應的class

Class extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Job);

// 實例化Plugin,轉換為AbstractJobPlugin

AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz.newInstance();

// 設置Job的配置,路徑為plugin.{pluginType}.{pluginName}

jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));

return jobPlugin;

}

// 加載Task類型的Plugin

public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType, String pluginName){

// 調用loadPluginClass方法,加載插件對應的class

Class extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Task);

// 實例化Plugin,轉換為AbstractTaskPlugin

AbstractTaskPlugin taskPlugin = (AbstracTasktTaskPlugin) clazz.newInstance();

// 設置Task的配置,路徑為plugin.{pluginType}.{pluginName}

taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));

}

// 加載插件類

// pluginType 代表插件類型

// pluginName 代表插件名稱

// pluginRunType 代表著運行類型,Job或者Task

private static synchronized Class extends AbstractPlugin> loadPluginClass(

PluginType pluginType, String pluginName,

ContainerType pluginRunType) {

// 獲取插件配置

Configuration pluginConf = getPluginConf(pluginType, pluginName);

// 獲取插件對應的ClassLoader

JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);

try {

// 加載插件的class

return (Class extends AbstractPlugin>) jarLoader

.loadClass(pluginConf.getString("class") + "$"

+ pluginRunType.value());

} catch (Exception e) {

throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);

}

}

切換類加載器

ClassLoaderSwapper類,提供了比較方便的切換接口。

1

2

3

4

5

6

7

8

9// 實例化

ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();

ClassLoader classLoader1 = new URLClassLoader();

// 切換加載器classLoader1

classLoaderSwapper.setCurrentThreadClassLoader(classLoader1);

Class extends MyClass> myClass = classLoader1.loadClass("MyClass");

// 切回加載器

classLoaderSwapper.restoreCurrentThreadClassLoader();

ClassLoaderSwapper的源碼比較簡單, 它有一個屬性storeClassLoader, 用于保存著切換之前的ClassLoader。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24public final class ClassLoaderSwapper{

// 保存切換之前的加載器

private ClassLoader storeClassLoader = null;

public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader){

// 保存切換前的加載器

this.storeClassLoader = Thread.currentThread().getContextClassLoader();

// 切換加載器到classLoader

Thread.currentThread().setContextClassLoader(classLoader);

return this.storeClassLoader;

}

public ClassLoader restoreCurrentThreadClassLoader(){

ClassLoader classLoader = Thread.currentThread()

.getContextClassLoader();

// 切換到原來的加載器

Thread.currentThread().setContextClassLoader(this.storeClassLoader);

// 返回切換之前的類加載器

return classLoader;

}

}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/394056.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/394056.shtml
英文地址,請注明出處:http://en.pswp.cn/news/394056.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

mysql windows身份驗證_SQL Server 2005 怎么就不能用Windows身份驗證方式登錄呢?

SQL Server 2005 自從裝到我的電腦上始終無法使用Windows身份驗證的方式登錄,由于使用用戶名和密碼登錄還算順暢,所以一直忽略了這SQL Server 2005 自從裝到我的電腦上始終無法使用Windows身份驗證的方式登錄,由于使用用戶名和密碼登錄還算順暢,所以一直忽略了這個問題,直到又有…

JavaScript正則表達式快速簡單的指南

Interested in learning JavaScript? Get my ebook at jshandbook.com有興趣學習JavaScript嗎&#xff1f; 在jshandbook.com上獲取我的電子書 正則表達式簡介 (Introduction to Regular Expressions) A regular expression (also called regex for short) is a fast way to w…

leetcode104. 二叉樹的最大深度(dfs)

給定一個二叉樹&#xff0c;找出其最大深度。二叉樹的深度為根節點到最遠葉子節點的最長路徑上的節點數。說明: 葉子節點是指沒有子節點的節點。示例&#xff1a; 給定二叉樹 [3,9,20,null,null,15,7]&#xff0c;3/ \9 20/ \15 7 返回它的最大深度 3 。代碼 class Soluti…

[解讀REST] 3.基于網絡應用的架構

鏈接上文[解讀REST] 2.REST用來干什么的&#xff1f;&#xff0c;上文中解釋到什么是架構風格和應該以怎樣的視角來理解REST&#xff08;Web的架構風格&#xff09;。本篇來介紹一組自洽的術語&#xff0c;用它來描述和解釋軟件架構&#xff1b;以及列舉下對于基于網絡的應用來…

js判斷對象還是數組

1.對于Javascript 1.8.5&#xff08;ECMAScript 5&#xff09;&#xff0c;變量名字.isArray( )可以實現這個目的 var a[]; var b{}; Array.isArray(a);//true Array.isArray(b)//false 2.如果你只是用typeof來檢查該變量&#xff0c;不論是array還是object&#xff0c;都將返回…

mysql 除去列名打印_sql – 使用beeline時避免在列名中打印表名

在beeline中使用hive時使用簡單的select查詢我想在列名中返回沒有表名的表作為默認值.例數據CREATE TABLE IF NOT EXISTS employee ( eid int, name String,salary String, destination String)COMMENT Employee detailsROW FORMAT DELIMITEDFIELDS TERMINATED BY \tLINES TERM…

移動應用程序和網頁應用程序_如何開發感覺像本機移動應用程序的漸進式Web應用程序...

移動應用程序和網頁應用程序by Samuele Dassatti通過薩穆爾達薩蒂 如何開發感覺像本機移動應用程序的漸進式Web應用程序 (How you can develop Progressive Web Apps that feel like native mobile apps) I’m currently developing a Progressive Web App that will also ser…

leetcode1162. 地圖分析(bfs)

你現在手里有一份大小為 N x N 的「地圖」&#xff08;網格&#xff09; grid&#xff0c;上面的每個「區域」&#xff08;單元格&#xff09;都用 0 和 1 標記好了。其中 0 代表海洋&#xff0c;1 代表陸地&#xff0c;請你找出一個海洋區域&#xff0c;這個海洋區域到離它最近…

mysql修改root密碼的方法

在 Navicat for MySQL 下面直接執行 SET PASSWORD FOR rootlocalhost PASSWORD(newpass); 就可以 方法1&#xff1a; 用SET PASSWORD命令 mysql -u root mysql> SET PASSWORD FOR rootlocalhost PASSWORD(newpass); 方法2&#xff1a;用mysqladmin mysqladmin -u root …

android 上下偏差怎么寫_詳解 Android 熱更新升級如何突破底層結構差異?

知道了 native 替換方式兼容性問題的原因&#xff0c;我們是否有辦法尋求一種新的方式&#xff0c;不依賴于 ROM 底層方法結構的實現而達到替換效果呢&#xff1f;我們發現&#xff0c;這樣 native 層面替換思路&#xff0c;其實就是替換 ArtMethod 的所有成員。那么&#xff0…

Python3 Flask+nginx+Gunicorn部署(上)

前言&#xff1a;一般在本地運行flask項目通常是直接python3 文件名.py&#xff0c;然后打開&#xff1a;http://127.0.0.1:5000 查看代碼結果 這次主要是記錄flask在python3 環境結合nginx gunicorn在服務器上進行項目的部署 &#xff08;一&#xff09;運行環境&#xff1a;虛…

NOIP2011 鋪地毯

題目描述 為了準備一個獨特的頒獎典禮&#xff0c;組織者在會場的一片矩形區域&#xff08;可看做是平面直角坐標系的第一象限&#xff09;鋪上一些矩形地毯&#xff0c;一共有n張地毯&#xff0c;編號從 1 到n。現在將這些地毯按照編號從小到大的順序平行于坐標軸先后鋪設&…

java lock可重入_Java源碼解析之可重入鎖ReentrantLock

本文基于jdk1.8進行分析。ReentrantLock是一個可重入鎖&#xff0c;在ConcurrentHashMap中使用了ReentrantLock。首先看一下源碼中對ReentrantLock的介紹。如下圖。ReentrantLock是一個可重入的排他鎖&#xff0c;它和synchronized的方法和代碼有著相同的行為和語義&#xff0c…

matlab的qammod函數_基于-MATLAB下的16QAM仿真.doc

1.課程設計目的隨著現代通信技術的發展&#xff0c;特別是移動通信技術高速發展&#xff0c;頻帶利用率問題越來越被人們關注。在頻譜資源非常有限的今天&#xff0c;傳統通信系統的容量已經不能滿足當前用戶的要求。正交幅度調制QAM(Quadrature Amplitude Modulation)以其高頻…

POJ3264 【RMQ基礎題—ST-線段樹】

ST算法Code&#xff1a; //#include<bits/stdc.h> #include<cstdio> #include<math.h> #include<iostream> #include<queue> #include<algorithm> #include<string.h> using namespace std; typedef long long LL;const int N5e410;…

leetcode199. 二叉樹的右視圖(bfs)

給定一棵二叉樹&#xff0c;想象自己站在它的右側&#xff0c;按照從頂部到底部的順序&#xff0c;返回從右側所能看到的節點值。示例:輸入: [1,2,3,null,5,null,4] 輸出: [1, 3, 4] 解釋:1 <---/ \ 2 3 <---\ \5 4 <---解題思…

開發人員工作周報_如何增加找到開發人員工作的機會

開發人員工作周報In a recent job as a senior developer, I helped interview and hire many of my employer’s development team members. This is a brain dump of my advice based on those interviews.在最近擔任高級開發人員的工作中&#xff0c;我幫助面試和雇用了許多…

安全專家教你如何利用Uber系統漏洞無限制的免費乘坐?

本文講的是安全專家教你如何利用Uber系統漏洞無限制的免費乘坐&#xff1f;&#xff0c;近日&#xff0c;根據外媒報道&#xff0c;美國一名安全研究人員發現Uber上存在一處安全漏洞&#xff0c;允許發現這一漏洞的任何用戶在全球范圍內免費享受Uber乘車服務。據悉&#xff0c;…

flume介紹

flume 1.flume是什么 Flume:** Flume是Cloudera提供的一個高可用的&#xff0c;高可靠的&#xff0c;分布式的海量日志采集、傳輸、聚合的系統。** Flume僅僅運行在linux環境下** flume.apache.org(Documentation--Flume User Guide) Flume體系結構(Architecture)&#xff1a; …

threadx 信號量 應用_操作系統及ThreadX簡介.ppt

操作系統及ThreadX簡介操作系統及ThreadX簡介 軟件二部 2006.09 主要內容 多任務操作系統概述 ThreadX簡介 關于驅動的交流 操作系統概述 什么是操作系統 管理計算機的所有資源&#xff0c;并為應用程序提供服務的最重要的系統軟件 操作系統的目的 為用戶編程提供簡單的接口&am…