对xxl-job架构的一点思考 二

本文中源码来自xxl-job 2.4.0版本

当我们花费很多精力和时间,学习一个工具、组件的源码时,我们可以从中收获什么?编程技巧、框架使用、架构思维等等。每个人都有不同的体会。

从写下第一篇关于xxl-job的文章,到现在已有10多篇了。 细节终会模糊,思维方可长存。今天我们就从大的方面入手,想想xxl-job到底教会了我们什么。

一 调度中心HA设计

xxl-job-admin可以集群部署,保证高可用。但是,这个集群中各个节点地位平等,无主节点。那如何防止并发调度呢?

1.1 数据库表锁

在调度类JobScheduleHelper中,使用Mysql行锁(select * from XXX for update),来避免调度平台并发执行。核心代码如下:

	try {

		conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
		connAutoCommit = conn.getAutoCommit();
        // 关闭自动提交
		conn.setAutoCommit(false);

		// 加表行锁,防止集群下并发
		preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
		preparedStatement.execute();

        // 扫描任务表,执行调度
	} catch (Exception e) {
	} finally {
		// commit
		if (conn != null) {
			try {
				conn.commit();
				conn.setAutoCommit(connAutoCommit);
			}
		}
	}

xxl_job_lock表仅有一个字段、一条记录,且没有对这条记录的DML操作。 当调度中心节点A加锁成功后,查询待执行任务并修改trigger_next_time值,然后commit事务释放锁。在这个过程中,节点B尝试加锁时将发生阻塞,直到A释放锁,B才能继续执行,A已处理过的任务,不会被B重复触发。

1.2 DB行锁的不足

1.2.1 连接中断或事务未提交

调度中心某个节点获得表行锁,还没来得及commit事务就宕机了。 虽然Mysql最终会回滚事务,但是需要等待net_write_timeout时间(默认60秒),或者等行锁超时释放。此期间其他节点将一直阻塞,任务不能被正常触发。

查询数据库在写入数据时允许的最长时间,单位秒:

SHOW VARIABLES like ‘net_write_timeout’;

查询数据库的行锁超时时间,单位秒:

SHOW VARIABLES like ‘innodb_lock_wait_timeout’;

1.2.2 多节点连续扫描任务

调度中心的一次执行,会捞取5分钟内即将到期的任务。因此,每5分钟执行一次即可;源码中也是如此实现。

但是,当调度中心多节点部署,节点A获得行锁处理中,节点B阻塞在获取锁处。假设A耗时2500秒处理完成并释放锁。此时B获得锁再次处理。可见,在5分钟内触发了两次对任务的扫描。

当然,频繁扫表也不是什么大问题。

二 多种路由策略实现

在服务集群部署时,为了使每个节点承担相近的任务量,负载均衡策略必不可少。在xxl-job中,提供了多种路由策略,定义了任务在执行器集群中的分发规则。

xxl-job-admin中有抽象类ExecutorRouter,声明了route方法,并提供了很多实现。

public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);

其中关于忙碌转移、失效转移、LFU、LRU的实现,都是可借鉴的优秀方案。

如失效转移的实现ExecutorRouteFailover:

  • 遍历执行器地址列表,主动发送心跳检查,返回第一个在线的节点。

如忙碌转移的实现ExecutorRouteBusyover:

  • 需要执行器自身记录对某类任务的执行状态,执行任务中或有待执行任务,即为忙碌,否则为空闲;
  • 调度中心能够主动问询执行器的忙碌状态。

还有ExecutorRouteConsistentHash类对一致性哈希算法的实现,从中我们能学到:

  1. 优化string的hashCode取值,扩大到2^32范围,减少哈希冲突;
  2. 如何对真实节点创建virtual node,解决分布不均衡问题;
  3. 如何使用TreeMap实现哈希环。

三 时间轮算法实现

在xxl-job-admin的JobScheduleHelper类中,周期性查询将在trigger_next_time< now + 5000的任务(即小于未来5秒时间点),分三种情况:

  1. 已过点5秒的任务,按MisfireStrategyEnum策略,可DO_NOTHING或FIRE_ONCE_NOW;
  2. 过点不超过5秒的任务,会立即触发一次;
  3. 还没到点的任务,放入时间轮,等待被触发。

时间轮模型如下:

xxl-job中对时间轮做了实现,也值得借鉴。

// key是秒数(取值0-59),value是jobId集合,时间轮
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

四 任务异步调度

当任务很多时,为了避免调度延迟,调度中心采用线程池并行调度,被阻塞的几率很低。

调度中心每次任务触发时,仅异步发送一次调度请求,执行器先将请求存入“异步执行队列”,立即响应调度中心。

因此,调度过程耗时极短,基本为一次请求的网络开销,实现使用有限的线程支撑大量的Job并发调度。

4.1 快慢线程池

在JobTriggerPoolHelper类中,提供了fast/slow两个调度线程池。

线程池 corePoolSize maximumPoolSize keepAliveTime workQueue size
fastTriggerPool 10 最小为200,需配置 60秒 1000
slowTriggerPool 10 最小为100,需配置 60秒 2000

xxl.job.triggerpool.fast.max=200

xxl.job.triggerpool.slow.max=100

可见,fastTriggerPool与slowTriggerPool相比,可以有更多的救急线程,任务队列尺寸更小。

调度某个job时,默认使用fastTriggerPool;如果1分钟窗口期内调度耗时达500ms超过10次,该窗口期内判定为慢任务,将自动降级进入”Slow”线程池。避免耗尽调度线 程,提高系统稳定性。

// key是jobId,value是超时计数
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

五 任务异步执行与异步回调

5.1 异步执行

当执行器收到任务调度时,会先将任务放入队列,响应调度成功。因此调度中心执行一次调度的耗时极短。

对每个job,执行器都会分配一个JobThread,实现了不同任务间线程池隔离。

private int jobId;
// 任务体
private IJobHandler handler;
// 任务队列
private LinkedBlockingQueue<TriggerParam> triggerQueue;

JobThread的run方法(简化逻辑如下),会持续从triggerQueue中拉取任务,如果能拉取到则执行,将idleTimes重置为0;否则对idleTimes计数加1。

当连续30次未能拉取到任务时(idleTimes > 30),就回收当前JobThread,释放线程资源。之后有新任务要执行时,再次新建一个JobThread即可。

public void run() {

	// 只被线程执行一次
	handler.init();

	// 消费任务队列
	while(!toStop){
		running = false;
		idleTimes++;

		TriggerParam triggerParam = null;
		try {
			// 非阻塞的,如果没有则返回null
			triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
			if (triggerParam!=null) {
				running = true;
				// 执行任务体
				handler.execute()
			} else {
				// while连续空转30次,即90秒,终止jobThread
				if (idleTimes > 30) {
					if(triggerQueue.size() == 0) {
						XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
					}
				}
			}
		}
	}

	// 只被线程执行一次
	handler.destroy();
  // 线程退出
}

IJobHandler的init()、destroy(),只会被同一个JobThread执行一次。

5.2 异步回调

待任务执行完成后,将结果放入回调队列,由回调线程批量异步推送给调度中心。

在TriggerCallbackThread类中,有callBackQueue和两个线程。

// 容量为Integer.MAX_VALUE,相当于无界
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();

// 回调线程
private Thread triggerCallbackThread;
// 失败重试线程
private Thread triggerRetryCallbackThread;

回调线程triggerCallbackThread持续消费结果队列,通过AdminBiz.callback()接口,批量推送执行结果给调度中心。

如果回调失败,会将这些消息保存到名为“xxl-job-callback-时间戳.log”的文件,由triggerRetryCallbackThread再次重试回调。

六 总结

调度平台采用HA( high availability)设计,通过数据库锁的简单方案,避免了重复调度问题;又无需引入Zookeeper来做主节点选举。保证系统”轻量级”并且降低学习部署成本。

xxl-job使用全异步化设计,源码中大量使用异步编程:任务触发、执行、结果回调等,都是异步实现。从而提供了更好的性能、更多的特性。如:

  1. 避免大量任务下的调度延迟;
  2. 支持多种任务阻塞处理策略、任务超时;
  3. 结果回调失败重试;
  4. 不同任务间的线程隔离;

原文地址:https://juejin.cn/post/7454535223176511551