datax源码阅读二:Engine流程
生活随笔
收集整理的这篇文章主要介绍了
datax源码阅读二:Engine流程
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
一、根据前面python文件知道,java的main函数是com.alibaba.datax.core.Engine
public static void main(String[] args) throws Exception {int exitCode = 0;try {Engine.entry(args);} catch (Throwable e) {exitCode = 1;String trace = ExceptionTracker.trace(e);String errDesc = "未知datax错误,参考堆栈内容分析。";LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + trace);if (e instanceof DataXException) {DataXException tempException = (DataXException) e;ErrorCode errorCode = tempException.getErrorCode();errDesc = errorCode.getDescription();if (errorCode instanceof FrameworkErrorCode) {FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;exitCode = tempErrorCode.toExitValue();}}System.exit(exitCode);}System.exit(exitCode); }main函数主要catch了一下异常,并将异常信息打印出来,实际执行在entry函数中
public static void entry(final String[] args) throws Throwable {Options options = new Options();options.addOption("job", true, "Job config.");options.addOption("jobid", true, "Job unique id.");options.addOption("mode", true, "Job runtime mode.");BasicParser parser = new BasicParser();CommandLine cl = parser.parse(options, args);String jobPath = cl.getOptionValue("job");// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1String jobIdString = cl.getOptionValue("jobid");RUNTIME_MODE = cl.getOptionValue("mode");Configuration configuration = ConfigParser.parse(jobPath);long jobId;if (!"-1".equalsIgnoreCase(jobIdString)) {jobId = Long.parseLong(jobIdString);} else {// only for dsc & ds & datax 3 updateString dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,dsJobUrlPatternString, dsTaskGroupUrlPatternString);jobId = parseJobIdFromUrl(patternStringList, jobPath);}boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);if (!isStandAloneMode && jobId == -1) {// 如果不是 standalone 模式,那么 jobId 一定不能为-1throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");}configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);//打印vmInfoVMInfo vmInfo = VMInfo.getVmInfo();if (vmInfo != null) {LOG.info(vmInfo.toString());}LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");LOG.debug(configuration.toJSON());ConfigurationValidate.doValidate(configuration);Engine engine = new Engine();engine.start(configuration); }entry函数主要功能:
1、解析了java命令行的三个参数,分别是job、jobid和mode,其中job是用户配置的json文件路径,jobid和mode是python文件带进来的,单机模式下可以忽略改参数 2、读取用户配置的json文件,转化为内部的configuration配置 3、打印相关信息,并校验json文件的合法性 4、启动engine执行entry执行完毕之后,进入start函数,关键代码如下:
public void start(Configuration allConf) {// 绑定column转换信息ColumnCast.bind(allConf);/*** 初始化PluginLoader,可以获取各种插件配置*/LoadUtil.bind(allConf);*************container = new JobContainer(allConf);*************container.start(); }start函数中主要包括:
1、列转换默认值,即动态在configuration中注入默认值 2、初始化插件的LoadUtil,后面classLoader相关操作都会依赖这个函数 3、初始化JobContainer并启动总结
以上是生活随笔为你收集整理的datax源码阅读二:Engine流程的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 一键压缩脚本
- 下一篇: Apache 服务器存在高危提权漏洞,请