日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

datax底层原理_Datax 插件加载原理

發布時間:2023/12/9 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 datax底层原理_Datax 插件加载原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Datax 插件加載原理

插件類型

Datax有好幾種類型的插件,每個插件都有不同的作用。

reader, 讀插件。Reader就是屬于這種類型的

writer, 寫插件。Writer就是屬于這種類型的

transformer, 目前還未知

handler, 主要用于任務執行前的準備工作和完成的收尾工作。

插件類型由PluginType枚舉表示

1

2

3public enum PluginType {

READER("reader"), TRANSFORMER("transformer"), WRITER("writer"), HANDLER("handler");

}

根據運行類型,又可以分為Job級別的插件和Task級別的插件。uml如下圖所示

插件配置讀取

ConfigParser首先會讀取配置文件,提取需要使用的reader,writer,prehandler 和 posthandler的名稱。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34public static Configuration parse(final String jobPath){

Configuration configuration = ConfigParser.parseJobConfig(jobPath);

// 合并 conf/core.json文件的配置, false 表示不覆蓋原有的配置

configuration.merge(

//CoreConstant.DATAX_CONF_PATH的值為conf/core.json

ConfigParser.parseCoreConfig

(CoreConstant.DATAX_CONF_PATH),

false);

// 獲取job.content列表的第一個reader

String readerPluginName = configuration.getString(

//CoreConstant.DATAX_JOB_CONTENT_READER_NAME的值為job.content[0].reader.name

CoreConstant.DATAX_JOB_CONTENT_READER_NAME);

// 獲取job.content列表的第一個writer

String writerPluginName = configuration.getString(

//CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME的值為job.content[0].writer.name

CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);

// 讀取job.preHandler.pluginName

String preHandlerName = configuration.getString(

//CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME的值為job.preHandler.pluginName

CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);

// 讀取job.postHandler.pluginName

String postHandlerName = configuration.getString(

//CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME的值為job.postHandler.pluginName

CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

Set pluginList = new HashSet();

pluginList.add(readerPluginName);

pluginList.add(writerPluginName);

......

// 調用parsePluginConfig生成plugin的配置,然后合并

configuration.merge(parsePluginConfig(new ArrayList(pluginList)), false);

......

return configuration;

}

提取完插件名稱后,會去reader目錄和writer目錄,尋找插件的位置。目前Datax只支持reader和writer插件,因為它只從這兩個目錄中尋找。如果想自己擴展其他類型插件的話,比如handler類型的, 需要修改parsePluginConfig的代碼。每個插件目錄會有一個重要的配置文件 plugin.json ,它定義了插件的名稱和對應的類,在LoadUtils類加載插件的時候會使用到。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62public static Configuration parsePluginConfig(List wantPluginNames){

Configuration configuration = Configuration.newDefault();

......

// 遍歷plugin.reader目錄下的文件夾

for (final String each : ConfigParser

.getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {

// 調用 parseOnePluginConfig解析單個plugin配置

Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);

if(eachReaderConfig!=null) {

configuration.merge(eachReaderConfig, true);

complete += 1;

}

}

// 遍歷plugin.writer目錄下的文件夾

for (final String each : ConfigParser

.getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {

// 調用 parseOnePluginConfig解析單個plugin配置

Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames);

if(eachWriterConfig!=null) {

configuration.merge(eachWriterConfig, true);

complete += 1;

}

}

......

return configuration;

}

// 讀取plugin目錄下的plugin.json 文件

public static Configuration parseOnePluginConfig(final String path, final String type, Set pluginSet, List wantPluginNames){

String filePath = path + File.separator + "plugin.json";

Configuration configuration = Configuration.from(new File(filePath));

String pluginPath = configuration.getString("path");

String pluginName = configuration.getString("name");

if(!pluginSet.contains(pluginName)) {

pluginSet.add(pluginName);

} else {

......

}

//不是想要的插件,就不生成配置,直接返回

if (wantPluginNames != null && wantPluginNames.size() > 0 && !wantPluginNames.contains(pluginName)) {

return null;

}

// plugin.json的path路徑,是指插件的jar包。如果沒有指定,則默認為和plugin.json文件在同一個目錄下

boolean isDefaultPath = StringUtils.isBlank(pluginPath);

if (isDefaultPath) {

configuration.set("path", path);

}

Configuration result = Configuration.newDefault();

// 最后保存在puligin.{type}.{pluginName}路徑下

result.set(

String.format("plugin.%s.%s", type, pluginName),

configuration.getInternal());

return result;

}

動態加載插件

插件的加載都是使用ClassLoader動態加載。 為了避免類的沖突,對于每個插件的加載,對應著獨立的加載器。加載器由JarLoader實現,插件的加載接口由LoadUtil類負責。當要加載一個插件時,需要實例化一個JarLoader,然后切換thread class loader之后,才加載插件。

JarLoader 類

JarLoader繼承URLClassLoader,擴充了可以加載目錄的功能??梢詮闹付ǖ哪夸浵?#xff0c;把傳入的路徑、及其子路徑、以及路徑中的jar文件加入到class path。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80public class JarLoader extends URLClassLoader{

public JarLoader(String[] paths){

this(paths, JarLoader.class.getClassLoader());

}

public JarLoader(String[] paths, ClassLoader parent){

// 調用getURLS,獲取所有的jar包路徑

super(getURLs(paths), parent);

}

// 獲取所有的jar包

private static URL[] getURLs(String[] paths) {

// 獲取包括子目錄的所有目錄路徑

List dirs = new ArrayList();

for (String path : paths) {

dirs.add(path);

// 獲取path目錄和其子目錄的所有目錄路徑

JarLoader.collectDirs(path, dirs);

}

// 遍歷目錄,獲取jar包的路徑

List urls = new ArrayList();

for (String path : dirs) {

urls.addAll(doGetURLs(path));

}

return urls.toArray(new URL[0]);

}

// 遞歸的方式,獲取所有目錄

private static void collectDirs(String path, List collector){

// path為空,終止

if (null == path || StringUtils.isBlank(path)) {

return;

}

// path不為目錄,終止

File current = new File(path);

if (!current.exists() || !current.isDirectory()) {

return;

}

// 遍歷完子文件,終止

for (File child : current.listFiles()) {

if (!child.isDirectory()) {

continue;

}

collector.add(child.getAbsolutePath());

collectDirs(child.getAbsolutePath(), collector);

}

}

private static List doGetURLs(final String path){

File jarPath = new File(path);

// 只尋找文件以.jar結尾的文件

FileFilter jarFilter = new FileFilter() {

@Override

public boolean accept(File pathname){

return pathname.getName().endsWith(".jar");

}

};

File[] allJars = new File(path).listFiles(jarFilter);

List jarURLs = new ArrayList(allJars.length);

for (int i = 0; i < allJars.length; i++) {

try {

jarURLs.add(allJars[i].toURI().toURL());

} catch (Exception e) {

throw DataXException.asDataXException(

FrameworkErrorCode.PLUGIN_INIT_ERROR,

"系統加載jar包出錯", e);

}

}

return jarURLs;

}

}

LoadUtil 類

LoadUtil管理著插件的加載器,調用getJarLoader返回插件對應的加載器。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31public class LoadUtil{

// 加載器的HashMap, Key由插件類型和名稱決定, 格式為plugin.{pulginType}.{pluginName}

private static Map jarLoaderCenter = new HashMap();

public static synchronized JarLoader getJarLoader(PluginType pluginType, String pluginName){

Configuration pluginConf = getPluginConf(pluginType, pluginName);

JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,

pluginName));

if (null == jarLoader) {

// 構建加載器JarLoader

// 獲取jar所在的目錄

String pluginPath = pluginConf.getString("path");

jarLoader = new JarLoader(new String[]{pluginPath});

//添加到HashMap中

jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),

jarLoader);

}

return jarLoader;

}

private static final String pluginTypeNameFormat = "plugin.%s.%s";

// 生成HashMpa的key值

private static String generatePluginKey(PluginType pluginType,

String pluginName){

return String.format(pluginTypeNameFormat, pluginType.toString(),

pluginName);

}

當獲取類加載器,就可以調用LoadUtil來加載插件。LoadUtil提供了 loadJobPlugin 和 loadTaskPlugin 兩個接口,加載Job 和 Task 的兩種插件。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43// 加載Job類型的Plugin

public static AbstractJobPlugin loadJobPlugin(PluginType pluginType, String pluginName){

// 調用loadPluginClass方法,加載插件對應的class

Class extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Job);

// 實例化Plugin,轉換為AbstractJobPlugin

AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz.newInstance();

// 設置Job的配置,路徑為plugin.{pluginType}.{pluginName}

jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));

return jobPlugin;

}

// 加載Task類型的Plugin

public static AbstractTaskPlugin loadTaskPlugin(PluginType pluginType, String pluginName){

// 調用loadPluginClass方法,加載插件對應的class

Class extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(pluginType, pluginName, ContainerType.Task);

// 實例化Plugin,轉換為AbstractTaskPlugin

AbstractTaskPlugin taskPlugin = (AbstracTasktTaskPlugin) clazz.newInstance();

// 設置Task的配置,路徑為plugin.{pluginType}.{pluginName}

taskPlugin.setPluginConf(getPluginConf(pluginType, pluginName));

}

// 加載插件類

// pluginType 代表插件類型

// pluginName 代表插件名稱

// pluginRunType 代表著運行類型,Job或者Task

private static synchronized Class extends AbstractPlugin> loadPluginClass(

PluginType pluginType, String pluginName,

ContainerType pluginRunType) {

// 獲取插件配置

Configuration pluginConf = getPluginConf(pluginType, pluginName);

// 獲取插件對應的ClassLoader

JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);

try {

// 加載插件的class

return (Class extends AbstractPlugin>) jarLoader

.loadClass(pluginConf.getString("class") + "$"

+ pluginRunType.value());

} catch (Exception e) {

throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);

}

}

切換類加載器

ClassLoaderSwapper類,提供了比較方便的切換接口。

1

2

3

4

5

6

7

8

9// 實例化

ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();

ClassLoader classLoader1 = new URLClassLoader();

// 切換加載器classLoader1

classLoaderSwapper.setCurrentThreadClassLoader(classLoader1);

Class extends MyClass> myClass = classLoader1.loadClass("MyClass");

// 切回加載器

classLoaderSwapper.restoreCurrentThreadClassLoader();

ClassLoaderSwapper的源碼比較簡單, 它有一個屬性storeClassLoader, 用于保存著切換之前的ClassLoader。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24public final class ClassLoaderSwapper{

// 保存切換之前的加載器

private ClassLoader storeClassLoader = null;

public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader){

// 保存切換前的加載器

this.storeClassLoader = Thread.currentThread().getContextClassLoader();

// 切換加載器到classLoader

Thread.currentThread().setContextClassLoader(classLoader);

return this.storeClassLoader;

}

public ClassLoader restoreCurrentThreadClassLoader(){

ClassLoader classLoader = Thread.currentThread()

.getContextClassLoader();

// 切換到原來的加載器

Thread.currentThread().setContextClassLoader(this.storeClassLoader);

// 返回切換之前的類加載器

return classLoader;

}

}

總結

以上是生活随笔為你收集整理的datax底层原理_Datax 插件加载原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。