- 01 引言
- 02 监控功能
- 2.1 ErrorRecordChecker
- 2.2 ErrorRecordChecker源码
- 2.3 ErrorRecordChecker检查时机
- 03 汇报功能
- 3.1 汇报运行流程
- 3.2 汇报的运行流程
- 3.2.1 汇报的几个角色
- 3.2.2 汇报的流程
- 3.3 什么时候写信息内容
- 3.4 Channel通讯信息接收
- 04 文末
通过前面的博文,我们对DataX
有了一定的深入了解了:
- 《DataX教程(01)- 入门》
- 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》
- 《DataX教程(03)- 源码解读(超详细版)
- 《DataX教程(04)- 配置完整解读》
- 《DataX教程(05)- DataX Web项目实践》
- 《DataX教程(06)- DataX调优》
- 《DataX教程(07)- 图解DataX任务分配及执行流程》
本文主要讲解DataX
的监控与汇报功能。
在JobContainer
类里面,可以看到引用了一个类ErrorRecordChecker
,它在JobContainer
初始化的时候做了初始操作。 ErrorChecker是一个监控类,主要用来检查任务是否到达错误记录限制。有检查条数(
recordLimit
)和百分比(percentageLimit
)两种方式:
errorRecord
表示出错条数不能大于限制数,当超过时任务失败。比如errorRecord
为0表示不容许任何脏数据;errorPercentage
表示出错比例,在任务结束时校验;errorRecord
优先级高于errorPercentage
。
Control+O
可以看到ErrorRecordChecker
,有如下几个方法: 这里主要做简要描述,
① 构造函数ErrorRecordChecker(Configuration configuration)
:主要就是从任务配置文件job.json
里面获取errorLimit.record
错误记录数限制及errorLimit.percentage
错误记录百分比的值:
public ErrorRecordChecker(Configuration configuration) {
this(configuration.getLong(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_RECORD),
configuration.getDouble(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_PERCENT));
}
② 检查错误记录数限制checkRecordLimit(Communication communication)
:主要就是从communication
里获取总共的错误记录数,然后判断是否超出配置的值,如果是,则抛出异常
public void checkRecordLimit(Communication communication) {
if (recordLimit == null) {
return;
}
long errorNumber = CommunicationTool.getTotalErrorRecords(communication);
if (recordLimit 0 && ((double) error / (double) total) > percentageLimit) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_DIRTY_DATA_LIMIT_EXCEED,
String.format("脏数据百分比检查不通过,限制是[%f],但实际上捕获到[%f].",
percentageLimit, ((double) error / (double) total)));
}
}
好了,这里就讲完了ErrorRecordChecker
的功能了,注意check
方法里面有一个Communication
类,这是一个通讯类,主要用来保存当前任务的状态信息的,接下来也会讲解。
Control
点击可以看到ErrorRecordChecker
被JobContainer
调用(初始化,前面已讲),以及在AbstractScheduler
任务任务调度schedule方法
执行的时候调用了。 再看看
check
方法在哪里调用了,经过追踪,可以分析得出:
- 在
JobContainer
的schedule
方法结束后会调用,检查整个任务的错误记录数 - 在
AbstractScheduler
的schedule
方法,里面开了一个while
死循环,不断去采集任务的状态,检查的时间间隔配置(core.container.job.sleepInterval
)在core.json里面的job.sleepInterval里配置。
最后贴下,AbstractScheduler的schedule方法实现实时采集的代码:
while (true) {
/**
* step 1: collect job stat
* step 2: getReport info, then report it
* step 3: errorLimit do check
* step 4: dealSucceedStat();
* step 5: dealKillingStat();
* step 6: dealFailedStat();
* step 7: refresh last job stat, and then sleep for next while
*
* above steps, some ones should report info to DS
*
*/
Communication nowJobContainerCommunication = this.containerCommunicator.collect();
nowJobContainerCommunication.setTimestamp(System.currentTimeMillis());
LOG.debug(nowJobContainerCommunication.toString());
//汇报周期
long now = System.currentTimeMillis();
if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {
Communication reportCommunication = CommunicationTool
.getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);
this.containerCommunicator.report(reportCommunication);
lastReportTimeStamp = now;
lastJobContainerCommunication = nowJobContainerCommunication;
}
errorLimit.checkRecordLimit(nowJobContainerCommunication);
if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {
LOG.info("Scheduler accomplished all tasks.");
break;
}
if (isJobKilling(this.getJobId())) {
dealKillingStat(this.containerCommunicator, totalTasks);
} else if (nowJobContainerCommunication.getState() == State.FAILED) {
dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());
}
Thread.sleep(jobSleepIntervalInMillSec);
}
03 汇报功能
3.1 汇报运行流程
友情提示:可能图片较大,建议下载下来使用图片编辑器查看。
首先贴上一张图,里面描述的是Scheduler
调度器与ErrorRecordChecker
错误检查器及Communicator
通讯者的整个调用关系,从上往下看:
汇报主要有几个重要的角色:
AbstractCommunicator
通讯者抽象类:主要用来做通讯的协调;Communication
通讯的信息载体:主要用来存放通讯过程中产生的信息,为单例;LocalTGCommunicationManager
通讯信息载体工厂:根据任务id来获取通讯信息载体单例的工厂;CommunicationTool
信息载体工具类:此工具类是通讯业务层的处理,主要用来收集当前信息,并写入到Communication
通讯的信息载体;AbstractReporter
信息上报:用来上报通讯信息。
简要的流程描述:
- 首先根据配置
new
一个通讯者对象,有两种,分别为“StandAloneJobContainerCommunicator
”、“StandAloneTGContainerCommunicator
”,生成后,注入进Scheduler
调度者,此时,Scheduler
就有了一个Communicator
工具了; - 通讯者
Communicator
使用collect
方法生成通讯的载体,也就是Communication
,用来存放任务的相关信息,ErrorRecorder
就是从这个Communication
里获取当前任务的信息的; Scheduler
调度器类里面,使用Communicator
通讯工具的collect
方法来获取communication
通讯载体单例(获取单例方法在LocalTGCommunicationManager
类,里面定义了Map
,key
为任务id
,value
为Communication
通讯载体);Scheduler
获取到Communication
通讯载体后,使用CommunicationTool
工具类把当前任务的状态信息写入;- 最后使用
reporter
来上报Communication
信息。
前面的3.1
和3.2
只做到了通讯类Communicator
和通讯信息载体Communication
的初始化,以及上报的流程,但是没有针对到哪里写入内容到Communication
?这里直接看写入信息到Communication
的地方,核心内容在TaskGroupContainer里面,下面来看看:
①首先根据任务id
获取Communication
的代码地方,在内部类TaskExecutor
构造函数的地方: ②把
Communication
注入进Channel
通道类,Channel
通道类主要做内容的记录(核心:统计和限速都在这里): ③
Channel
注入进了BufferedRecordExchanger
或BufferedRecordTransformerExchanger
, 而这连个Exchanger
主要是为了记录RecordSender
记录发送者、RecordReceiver
记录接收者、TransformerExchanger
的内容,就是记录ETL这3个模块里面的内容
根据流程,可以看到Channel
类使用来收集ETL
的信息的,那么看看Channel
这个类的一些核心方法。
Channel
类有很多的方法,Control+O
可以看到: 举个例子,可以看看
Channel
的push(final Record r)
方法:
public void push(final Record r) {
Validate.notNull(r, "record不能为空.");
this.doPush(r);
this.statPush(1L, r.getByteSize());
}
进入statPush
方法:
private void statPush(long recordSize, long byteSize) {
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
recordSize);
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
byteSize);
//在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数
currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
return;
}
long lastTimestamp = lastCommunication.getTimestamp();
long nowTimestamp = System.currentTimeMillis();
long interval = nowTimestamp - lastTimestamp;
if (interval - this.flowControlInterval >= 0) {
long byteLimitSleepTime = 0;
long recordLimitSleepTime = 0;
if (isChannelByteSpeedLimit) {
long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
if (currentByteSpeed > this.byteSpeed) {
// 计算根据byteLimit得到的休眠时间
byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
- interval;
}
}
if (isChannelRecordSpeedLimit) {
long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
if (currentRecordSpeed > this.recordSpeed) {
// 计算根据recordLimit得到的休眠时间
recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
- interval;
}
}
// 休眠时间取较大值
long sleepTime = byteLimitSleepTime 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
lastCommunication.setTimestamp(nowTimestamp);
}
}
可以看到把内容都设置进Communication
信息载体了,这里还有其它的方法如pushAll
等。大家Control鼠标点一下就能trace
整个调用链了,其实就是不同的插件调用触发Exchanger
方法,然后在Exchanger
里面调用Channel
的方法来记录到Communication
信息载体。
好了,到此把DataX
的监控与汇报功能讲解完毕了,有疑问的童鞋欢迎留言,谢谢大家的阅读,本文完!