Datax 插件加載原理
插件類型
Datax有好幾種類型的插件,每個插件都有不同的作用。
reader, 讀插件。Reader就是屬于這種類型的
writer, 寫插件。Writer就是屬于這種類型的
transformer, 目前還未知
handler, 主要用于任務執行前的準備工作和完成的收尾工作。
插件類型由PluginType枚舉表示
1
2
3public enum PluginType {
READER("reader"), TRANSFORMER("transformer"), WRITER("writer"), HANDLER("handler");
}
根據運行類型,又可以分為Job級別的插件和Task級別的插件。uml如下圖所示
插件配置讀取
ConfigParser首先會讀取配置文件,提取需要使用的reader,writer,prehandler 和 posthandler的名稱。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public static Configuration parse(final String jobPath){
Configuration configuration = ConfigParser.parseJobConfig(jobPath);
// 合并 conf/core.json文件的配置, false 表示不覆蓋原有的配置
configuration.merge(
//CoreConstant.DATAX_CONF_PATH的值為conf/core.json
ConfigParser.parseCoreConfig
(CoreConstant.DATAX_CONF_PATH),
false);
// 獲取job.content列表的第一個reader
String readerPluginName = configuration.getString(
//CoreConstant.DATAX_JOB_CONTENT_READER_NAME的值為job.content[0].reader.name
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
// 獲取job.content列表的第一個writer
String writerPluginName = configuration.getString(
//CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME的值為job.content[0].writer.name
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
// 讀取job.preHandler.pluginName
String preHandlerName = configuration.getString(
//CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME的值為job.preHandler.pluginName
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
// 讀取job.postHandler.pluginName
String postHandlerName = configuration.getString(
//CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME的值為job.postHandler.pluginName
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
Set pluginList = new HashSet();
pluginList.add(readerPluginName);
pluginList.add(writerPluginName);
......
// 調用parsePluginConfig生成plugin的配置,然后合并
configuration.merge(parsePluginConfig(new ArrayList(pluginList)), false);
......
return configuration;
}
提取完插件名稱后,會去reader目錄和writer目錄,尋找插件的位置。目前Datax只支持reader和writer插件,因為它只從這兩個目錄中尋找。如果想自己擴展其他類型插件的話,比如handler類型的, 需要修改parsePluginConfig的代碼。每個插件目錄會有一個重要的配置文件 plugin.json ,它定義了插件的名稱和對應的類,在LoadUtils類加載插件的時候會使用到。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62public static Configuration parsePluginConfig(List wantPluginNames){
Configuration configuration = Configuration.newDefault();
......
// 遍歷plugin.reader目錄下的文件夾
for (final String each : ConfigParser
.getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {
// 調用 parseOnePluginConfig解析單個plugin配置
Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);
if(eachReaderConfig!=null) {
configuration.merge(eachReaderConfig, true);
complete += 1;
}
}
// 遍歷plugin.writer目錄下的文件夾
for (final String each : ConfigParser
.getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {
// 調用 parseOnePluginConfig解析單個plugin配置
Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames);
if(eachWriterConfig!=null) {
configuration.merge(eachWriterConfig, true);
complete += 1;
}
}
......
return configuration;
}
// 讀取plugin目錄下的plugin.json 文件
public static Configuration parseOnePluginConfig(final String path, final String type, Set pluginSet, List wantPluginNames){
String filePath = path + File.separator + "plugin.json";
Configuration configuration = Configuration.from(new File(filePath));
String pluginPath = configuration.getString("path");
String pluginName = configuration.getString("name");
if(!pluginSet.contains(pluginName)) {
pluginSet.add(pluginName);
} else {
......
}
//不是想要的插件,就不生成配置,直接返回
if (wantPluginNames != null && wantPluginNames.size() > 0 && !wantPluginNames.contains(pluginName)) {
return null;
}
// plugin.json的path路徑,是指插件的jar包。如果沒有指定,則默認為和plugin.json文件在同一個目錄下
boolean isDefaultPath = StringUtils.isBlank(pluginPath);
if (isDefaultPath) {
configuration.set("path", path);
}
Configuration result = Configuration.newDefault();
// 最后保存在puligin.{type}.{pluginName}路徑下
result.set(
String.format("plugin.%s.%s", type, pluginName),
configuration.getInternal());
return result;
}
動態加載插件
插件的加載都是使用ClassLoader動態加載。 為了避免類的沖突,對于每個插件的加載,對應著獨立的加載器。加載器由JarLoader實現,插件的加載接口由LoadUtil類負責。當要加載一個插件時,需要實例化一個JarLoader,然后切換thread class loader之后,才加載插件。
JarLoader 類
JarLoader繼承URLClassLoader,擴充了可以加載目錄的功能。可以從指定的目錄下,把傳入的路徑、及其子路徑、以及路徑中的jar文件加入到class path。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80public class JarLoader extends URLClassLoader{
public JarLoader(String[] paths){
this(paths, JarLoader.class.getClassLoader());
}
public JarLoader(String[] paths, ClassLoader parent){
// 調用getURLS,獲取所有的jar包路徑
super(getURLs(paths), parent);
}
// 獲取所有的jar包
private static URL[] getURLs(String[] paths) {
// 獲取包括子目錄的所有目錄路徑
List dirs = new ArrayList();
for (String path : paths) {
dirs.add(path);
// 獲取path目錄和其子目錄的所有目錄路徑
JarLoader.collectDirs(path, dirs);
}
// 遍歷目錄,獲取jar包的路徑
List urls = new ArrayList();
for (String path : dirs) {
urls.addAll(doGetURLs(path));
}
return urls.toArray(new URL[0]);
}
// 遞歸的方式,獲取所有目錄
private static void collectDirs(String path, List collector){
// path為空,終止
if (null == path || StringUtils.isBlank(path)) {
return;
}
// path不為目錄,終止
File current = new File(path);
if (!current.exists() || !current.isDirectory()) {
return;
}
// 遍歷完子文件,終止
for (File child : current.listFiles()) {
if (!child.isDirectory()) {
continue;
}
collector.add(child.getAbsolutePath());
collectDirs(child.getAbsolutePath(), collector);
}
}
private static List doGetURLs(final String path){
File jarPath = new File(path);
// 只尋找文件以.jar結尾的文件
FileFilter jarFilter = new FileFilter() {
@Override
public boolean accept(File pathname){
return pathname.getName().endsWith(".jar");
}
};
File[] allJars = new File(path).listFiles(jarFilter);
List jarURLs = new ArrayList(allJars.length);
for (int i = 0; i < allJars.length; i++) {
try {
jarURLs.add(allJars[i].toURI().toURL());
} catch (Exception e) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_INIT_ERROR,
"系統加載jar包出錯", e);
}
}
return jarURLs;
}
}
LoadUtil 類
LoadUtil管理著插件的加載器,調用getJarLoader返回插件對應的加載器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class LoadUtil{
// 加載器的HashMap, Key由插件類型和名稱決定, 格式為plugin.{pulginType}.{pluginName}
private static Map jarLoaderCenter = new HashMap();
public static synchronized JarLoader getJarLoader(PluginType pluginType, String pluginName){
Configuration pluginConf = getPluginConf(pluginType, pluginName);
JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
pluginName));
if (null == jarLoader) {
// 構建加載器JarLoader
// 獲取jar所在的目錄
String pluginPath = pluginConf.getString("path");
jarLoader = new JarLoader(new String[]{pluginPath});
//添加到HashMap中
jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
jarLoader);
}
return jarLoader;
}
private static final String pluginTypeNameFormat = "plugin.%s.%s";
// 生成HashMpa的key值
private static String generatePluginKey(PluginType pluginType,
String pluginName){
return String.format(pluginTypeNameFormat, pluginType.toString(),
pluginName);
}
當獲取類加載器,就可以調用LoadUtil來加載插件。LoadUtil提供了 loadJobPlugin 和 loadTaskPlugin 兩個接口,加載Job 和 Task 的兩種插件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43// 加載Job類型的Plugin
public static AbstractJobPlugin loadJobPlugin(PluginType pluginType, String pluginName){
// 調用loadPluginClass方法,加載插件對應的class
Class extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Job);
// 實例化Plugin,轉換為AbstractJobPlugin
AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz.newInstance();
// 設置Job的配置,路徑為plugin.{pluginType}.{pluginName}
jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
return jobPlugin;
}
// 加載Task類型的Plugin
public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType, String pluginName){
// 調用loadPluginClass方法,加載插件對應的class
Class extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Task);
// 實例化Plugin,轉換為AbstractTaskPlugin
AbstractTaskPlugin taskPlugin = (AbstracTasktTaskPlugin) clazz.newInstance();
// 設置Task的配置,路徑為plugin.{pluginType}.{pluginName}
taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
}
// 加載插件類
// pluginType 代表插件類型
// pluginName 代表插件名稱
// pluginRunType 代表著運行類型,Job或者Task
private static synchronized Class extends AbstractPlugin> loadPluginClass(
PluginType pluginType, String pluginName,
ContainerType pluginRunType) {
// 獲取插件配置
Configuration pluginConf = getPluginConf(pluginType, pluginName);
// 獲取插件對應的ClassLoader
JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
try {
// 加載插件的class
return (Class extends AbstractPlugin>) jarLoader
.loadClass(pluginConf.getString("class") + "$"
+ pluginRunType.value());
} catch (Exception e) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
}
}
切換類加載器
ClassLoaderSwapper類,提供了比較方便的切換接口。
1
2
3
4
5
6
7
8
9// 實例化
ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();
ClassLoader classLoader1 = new URLClassLoader();
// 切換加載器classLoader1
classLoaderSwapper.setCurrentThreadClassLoader(classLoader1);
Class extends MyClass> myClass = classLoader1.loadClass("MyClass");
// 切回加載器
classLoaderSwapper.restoreCurrentThreadClassLoader();
ClassLoaderSwapper的源碼比較簡單, 它有一個屬性storeClassLoader, 用于保存著切換之前的ClassLoader。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public final class ClassLoaderSwapper{
// 保存切換之前的加載器
private ClassLoader storeClassLoader = null;
public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader){
// 保存切換前的加載器
this.storeClassLoader = Thread.currentThread().getContextClassLoader();
// 切換加載器到classLoader
Thread.currentThread().setContextClassLoader(classLoader);
return this.storeClassLoader;
}
public ClassLoader restoreCurrentThreadClassLoader(){
ClassLoader classLoader = Thread.currentThread()
.getContextClassLoader();
// 切換到原來的加載器
Thread.currentThread().setContextClassLoader(this.storeClassLoader);
// 返回切換之前的類加載器
return classLoader;
}
}