您当前的位置: 首页 > 

杨林伟

暂无认证

  • 3浏览

    0关注

    3337博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

DataX教程(03)- 源码解读(超详细版)

杨林伟 发布时间:2022-01-26 15:32:55 ,浏览量:3

文章目录
  • 01 引言
  • 02 DataX框架讲解
    • 2.1 DataX设计思想
    • 2.2 DataX运行流程
      • 2.2.1 DataX运行流程解析
      • 2.2.2 DataX运行流程简单举例
  • 03 DataX源码分析
    • 3.1 源码流程描述
    • 3.2 流程对应代码
      • 3.2.1 step1:入口
      • 3.2.2 step2:封装配置
      • 3.2.3 step3:初始化并启动容器
      • 3.2.4 step4:JobContainer运行内容
      • 3.2.5 step5:调度JobContainer
      • 3.2.6 step6:JobContainer详解
      • 3.2.7 step7:业务实现
  • 04 几个关键的类
    • 4.1 LoadUtil 插件加载工具
    • 4.2 ClassLoaderSwapper 类加载器管理者
    • 4.3 ErrorRecordChecker 错误记录检查者
    • 4.4 Communication 所有的状态及信息统计
    • 4.5 AbstractScheduler 调度器
  • 05 文末

01 引言

通过前面的两篇博文,我们可以在IDEA下运行DataX源码项目了:

  • 《DataX教程(01)- 入门》
  • 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》

本文需要讲解的是DataX的源码。

02 DataX框架讲解 2.1 DataX设计思想

DataX框架设计思想 DataX采用Framework + plugin架构构建,将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader做为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
2.2 DataX运行流程 2.2.1 DataX运行流程解析

DataX核心架构 结合上图,描述一下DataX整个运行流程(从左往右看):

  • step1: DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程;
  • step2: 会根据不同的源端切分策略,将Job切分成多个小的Task(子任务);
  • step3: 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将step2拆分成的Task重新组合,组装成TaskGroup(任务组),每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5
  • step4:TaskGroup里面的Task都有TaskGroup来启动,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作;
  • step5:DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
2.2.2 DataX运行流程简单举例

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。

DataX的调度决策思路是:

  • ① DataXJob根据分库分表切分成了100个Task。
  • ② 根据20个并发,DataX计算共需要分配4个TaskGroup(默认单个任务组的并发数量为5)。
  • ③ 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
03 DataX源码分析

在进行源码分析前,首先贴上我自己整理的DataX源码分析流程图: 在这里插入图片描述

3.1 源码流程描述

其实DataX的运行流程也没有我们想的这么复杂,根据上面的流程图,简单描述如下:

  • 【Step1】:首先入口在Engine类,入口参数我们有“mode”、“jobId”、“job.json”(即运行的模式、任务id、任务配置等);
  • 【Step2】:有了入口参数,会先校验,然后统一封装好成Configuration,传给下一步使用;
  • 【Step3】:根据配置初始化不同的容器并启动,目前只有两种容器,分别为JobContainer(作业任务容器)、TaskGroupContainer(分组任务容器),一般都是创建一个作业,先进JobContainer
  • 【Step4】:在JobContainer里面,会根据配置使用类加载器加载readerwriter的实例对象,然后出发加载对象的生命周期方法,如init()prepare()post()等;
  • 【Step5】:JobContainer执行流程里,有一个schedule方法,主要是为了切割任务组,并初始化到TaskContainer,调度使用;
  • 【Step6】:TaskContainer主要是对任务进行初始化、并控制执行的顺序(如:reader -> transfromer -> writer)。
  • 【Step7】:最后,具体的业务实现,就在Reader(即:E)、Transformer(即:T)、Writer(即:L)里面实现。
3.2 流程对应代码 3.2.1 step1:入口

【Step1】:首先入口在Engine类,入口参数我们有“mode”、“jobId”、“job.json”(即运行的模式、任务id、任务配置等)

对应的代码(com.alibaba.datax.core.Engine#main)及详细描述如下: 在这里插入图片描述

3.2.2 step2:封装配置

【Step2】:有了入口参数,会先校验,然后统一封装好成Configuration,传给下一步使用。

对应的代码(com.alibaba.datax.core.Engine#entry)及详细描述如下: 在这里插入图片描述

3.2.3 step3:初始化并启动容器

【Step3】:根据配置初始化不同的容器并启动,目前只有两种容器,分别为JobContainer(作业任务容器)、TaskGroupContainer(分组任务容器),一般都是创建一个作业,先进JobContainer

对应的代码(com.alibaba.datax.core.Engine#start)及详细描述如下: 在这里插入图片描述

3.2.4 step4:JobContainer运行内容

【Step4】:在JobContainer里面,会根据配置使用类加载器加载readerwriter的实例对象,然后出发加载对象的生命周期方法,如init()prepare()post()等;

对应代码(com.alibaba.datax.core.job.JobContainer#start)及详细描述如下: 在这里插入图片描述

3.2.5 step5:调度JobContainer

【Step5】:JobContainer执行流程里,有一个schedule方法,主要是为了切割任务组,并初始化到TaskContainer,调度使用;

对应代码(com.alibaba.datax.core.job.JobContainer#start),内容如step4的图片,具体对应第119行代码:

this.schedule();
3.2.6 step6:JobContainer详解

【Step6】:TaskContainer主要是对任务进行初始化、并控制执行的顺序(如:reader -> transfromer -> writer

对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer#start)及详细描述如下: 在这里插入图片描述 再来看看TaskExecutor是怎么初始化的,对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#TaskExecutor)具体详情如下: 在这里插入图片描述 ok,到这里可以看看doStart()运行的方法,对应代码(com.alibaba.datax.core.taskgroup.TaskGroupContainer.TaskExecutor#doStart),详情如下: 在这里插入图片描述

3.2.7 step7:业务实现

【Step7】:最后,具体的业务实现,就在Reader(即:E)、Transformer(即:T)、Writer(即:L)里面实现。

那么,经过前几个步骤的操作,最终哪里实现业务呢?DataX的例子是使用了StreamReaderStreamWriter的,我们Stream插件里的代码:

① E:对应StreamReader,详情如下: 在这里插入图片描述 ② L:对应StreamWriter,详情如下: 在这里插入图片描述

这个时候,可能很多小伙伴都会提出,还有个T(即Transformer)呢?这里拿个简单的来看看,即ReplaceTransformer(com.alibaba.datax.core.transport.transformer.ReplaceTransformer),代码内容如下: 在这里插入图片描述 好奇宝贝肯定会问,这里的evaluate()方法什么时候调用?我们一直逆向查询调用,会发现调用的流程如下:

Transformer.evaluate() 

最后处理发送到writer前的业务操作(`Transformer.evaluate() `)
04 几个关键的类

到了这一步,我们大概知道了DataX的整个运行流程机制了,那么有一些工具类在这里还是值得说一下的。

4.1 LoadUtil 插件加载工具

描述:这一个工具类是一个插件加载器,大体上分readertransformer(还未实现)和writer三中插件类型,readerwriter在执行时又可能出现JobTask两种运行时(加载的类不同)。

Ctrl+O看看有哪些方法,这里就不再描述里面的每一个方法了,知道怎么用就可以了: 在这里插入图片描述

4.2 ClassLoaderSwapper 类加载器管理者

描述:这个类主要配合LoadUtil使用,主要管理不同线程(即:Task任务)的类加载器。

LoadUtil配合使用的方式(这里说的是线程,但是官方文档说开了个进程,感觉不太合适):

// 初始化ClassLoaderSwapper
private ClassLoaderSwapper classLoaderSwapper 
= ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();

// 注入插件到ClassLoaderSwapper
classLoaderSwapper.setCurrentThreadClassLoader(
	LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName)
);

Ctrl+O看看ClassLoaderSwapper有什么方法,这里看注释,不再描述了: 在这里插入图片描述

4.3 ErrorRecordChecker 错误记录检查者

描述: 检查任务是否到达错误记录限制。有检查条数(recordLimit)和百分比(percentageLimit)两种方式。

  1. errorRecord表示出错条数不能大于限制数,当超过时任务失败。比如errorRecord为0表示不容许任何脏数据。
  2. errorPercentage表示出错比例,在任务结束时校验。
  3. errorRecord优先级高于errorPercentage。

Ctrl+O看看ClassLoaderSwapper有什么方法: 在这里插入图片描述

4.4 Communication 所有的状态及信息统计

描述:主要记录DataX所有的状态及统计信息交互,jobtaskGrouptask等的消息汇报都走该类。

Ctrl+O看看Communication有什么方法(这里方法比较多,其实就是一些DataX的一些运行状态及信息统计): 在这里插入图片描述

4.5 AbstractScheduler 调度器

描述:主要就是根据job配置来调度TaskGroupContainer

Ctrl+O看看AbstractScheduler有什么方法: 在这里插入图片描述

05 文末

本文是对DataX源码解读的一篇分析文章,如果文中有理解错误的,欢迎童鞋们指出,本文完!

原创不易,最后需要声明:

  • 本文原创作者:阿甘兄
  • 作者博客地址:https://yanglinwei.blog.csdn.net/

需要转载的请务必联系我本人!

关注
打赏
1662376985
查看更多评论
立即登录/注册

微信扫码登录

0.0783s