本文中源码来自xxl-job 2.4.0版本
当我们花费很多精力和时间,学习一个工具、组件的源码时,我们可以从中收获什么?编程技巧、框架使用、架构思维等等。每个人都有不同的体会。
从写下第一篇关于xxl-job的文章,到现在已有10多篇了。 细节终会模糊,思维方可长存。今天我们就从大的方面入手,想想xxl-job到底教会了我们什么。
一 调度中心HA设计
xxl-job-admin可以集群部署,保证高可用。但是,这个集群中各个节点地位平等,无主节点。那如何防止并发调度呢?
1.1 数据库表锁
在调度类JobScheduleHelper中,使用Mysql行锁(select * from XXX for update),来避免调度平台并发执行。核心代码如下:
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
// 关闭自动提交
conn.setAutoCommit(false);
// 加表行锁,防止集群下并发
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// 扫描任务表,执行调度
} catch (Exception e) {
} finally {
// commit
if (conn != null) {
try {
conn.commit();
conn.setAutoCommit(connAutoCommit);
}
}
}
xxl_job_lock表仅有一个字段、一条记录,且没有对这条记录的DML操作。
当调度中心节点A加锁成功后,查询待执行任务并修改trigger_next_time值,然后commit事务释放锁。在这个过程中,节点B尝试加锁时将发生阻塞,直到A释放锁,B才能继续执行,A已处理过的任务,不会被B重复触发。
1.2 DB行锁的不足
1.2.1 连接中断或事务未提交
调度中心某个节点获得表行锁,还没来得及commit事务就宕机了。 虽然Mysql最终会回滚事务,但是需要等待net_write_timeout时间(默认60秒),或者等行锁超时释放。此期间其他节点将一直阻塞,任务不能被正常触发。
查询数据库在写入数据时允许的最长时间,单位秒:
SHOW VARIABLES like ‘net_write_timeout’;
查询数据库的行锁超时时间,单位秒:
SHOW VARIABLES like ‘innodb_lock_wait_timeout’;
1.2.2 多节点连续扫描任务
调度中心的一次执行,会捞取5分钟内即将到期的任务。因此,每5分钟执行一次即可;源码中也是如此实现。
但是,当调度中心多节点部署,节点A获得行锁处理中,节点B阻塞在获取锁处。假设A耗时2500秒处理完成并释放锁。此时B获得锁再次处理。可见,在5分钟内触发了两次对任务的扫描。
当然,频繁扫表也不是什么大问题。
二 多种路由策略实现
在服务集群部署时,为了使每个节点承担相近的任务量,负载均衡策略必不可少。在xxl-job中,提供了多种路由策略,定义了任务在执行器集群中的分发规则。
xxl-job-admin中有抽象类ExecutorRouter,声明了route方法,并提供了很多实现。
public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);
其中关于忙碌转移、失效转移、LFU、LRU的实现,都是可借鉴的优秀方案。
如失效转移的实现ExecutorRouteFailover:
- 遍历执行器地址列表,主动发送心跳检查,返回第一个在线的节点。
如忙碌转移的实现ExecutorRouteBusyover:
- 需要执行器自身记录对某类任务的执行状态,执行任务中或有待执行任务,即为忙碌,否则为空闲;
- 调度中心能够主动问询执行器的忙碌状态。
还有ExecutorRouteConsistentHash类对一致性哈希算法的实现,从中我们能学到:
- 优化string的hashCode取值,扩大到2^32范围,减少哈希冲突;
- 如何对真实节点创建virtual node,解决分布不均衡问题;
- 如何使用TreeMap实现哈希环。
三 时间轮算法实现
在xxl-job-admin的JobScheduleHelper类中,周期性查询将在trigger_next_time< now + 5000的任务(即小于未来5秒时间点),分三种情况:
- 已过点5秒的任务,按MisfireStrategyEnum策略,可DO_NOTHING或FIRE_ONCE_NOW;
- 过点不超过5秒的任务,会立即触发一次;
- 还没到点的任务,放入时间轮,等待被触发。
时间轮模型如下:
xxl-job中对时间轮做了实现,也值得借鉴。
// key是秒数(取值0-59),value是jobId集合,时间轮
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
四 任务异步调度
当任务很多时,为了避免调度延迟,调度中心采用线程池并行调度,被阻塞的几率很低。
调度中心每次任务触发时,仅异步发送一次调度请求,执行器先将请求存入“异步执行队列”,立即响应调度中心。
因此,调度过程耗时极短,基本为一次请求的网络开销,实现使用有限的线程支撑大量的Job并发调度。
4.1 快慢线程池
在JobTriggerPoolHelper类中,提供了fast/slow两个调度线程池。
线程池 | corePoolSize | maximumPoolSize | keepAliveTime | workQueue size |
---|---|---|---|---|
fastTriggerPool | 10 | 最小为200,需配置 | 60秒 | 1000 |
slowTriggerPool | 10 | 最小为100,需配置 | 60秒 | 2000 |
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
可见,fastTriggerPool与slowTriggerPool相比,可以有更多的救急线程,任务队列尺寸更小。
调度某个job时,默认使用fastTriggerPool;如果1分钟窗口期内调度耗时达500ms超过10次,该窗口期内判定为慢任务,将自动降级进入”Slow”线程池。避免耗尽调度线 程,提高系统稳定性。
// key是jobId,value是超时计数
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
五 任务异步执行与异步回调
5.1 异步执行
当执行器收到任务调度时,会先将任务放入队列,响应调度成功。因此调度中心执行一次调度的耗时极短。
对每个job,执行器都会分配一个JobThread,实现了不同任务间线程池隔离。
private int jobId;
// 任务体
private IJobHandler handler;
// 任务队列
private LinkedBlockingQueue<TriggerParam> triggerQueue;
JobThread的run方法(简化逻辑如下),会持续从triggerQueue中拉取任务,如果能拉取到则执行,将idleTimes重置为0;否则对idleTimes计数加1。
当连续30次未能拉取到任务时(idleTimes > 30),就回收当前JobThread,释放线程资源。之后有新任务要执行时,再次新建一个JobThread即可。
public void run() {
// 只被线程执行一次
handler.init();
// 消费任务队列
while(!toStop){
running = false;
idleTimes++;
TriggerParam triggerParam = null;
try {
// 非阻塞的,如果没有则返回null
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
running = true;
// 执行任务体
handler.execute()
} else {
// while连续空转30次,即90秒,终止jobThread
if (idleTimes > 30) {
if(triggerQueue.size() == 0) {
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
}
}
}
// 只被线程执行一次
handler.destroy();
// 线程退出
}
IJobHandler的init()、destroy(),只会被同一个JobThread执行一次。
5.2 异步回调
待任务执行完成后,将结果放入回调队列,由回调线程批量异步推送给调度中心。
在TriggerCallbackThread类中,有callBackQueue和两个线程。
// 容量为Integer.MAX_VALUE,相当于无界
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
// 回调线程
private Thread triggerCallbackThread;
// 失败重试线程
private Thread triggerRetryCallbackThread;
回调线程triggerCallbackThread持续消费结果队列,通过AdminBiz.callback()接口,批量推送执行结果给调度中心。
如果回调失败,会将这些消息保存到名为“xxl-job-callback-时间戳.log”的文件,由triggerRetryCallbackThread再次重试回调。
六 总结
调度平台采用HA( high availability)设计,通过数据库锁的简单方案,避免了重复调度问题;又无需引入Zookeeper来做主节点选举。保证系统”轻量级”并且降低学习部署成本。
xxl-job使用全异步化设计,源码中大量使用异步编程:任务触发、执行、结果回调等,都是异步实现。从而提供了更好的性能、更多的特性。如:
- 避免大量任务下的调度延迟;
- 支持多种任务阻塞处理策略、任务超时;
- 结果回调失败重试;
- 不同任务间的线程隔离;