超强总结:搞定最终一致性的4个方案,有一个少有人用过,你都知道吗?

很多人认为电商场景对于一致性要求非常高。一定有很多高深技术保证强一致性。这是一种错误的理解,大多数电商业务场景只需要保证最终一致性,往往不需要强一致性。而保证最终一致性的最重要方法就是重试。(电商业务往往要求系统最终严格一致……即最终一致性 + 较强的可靠性)

业务系统中保证最终一致性的主要方式是不断地进行重试。大部分系统的问题都是由于超时导致的,而重试通常可以解决大多数问题。

下面列举 3 个场景——通过重试解决最终一致性。

  1. 会员订单履约:系统给用户发放优惠券失败时,可以通过重试(采用幂等性操作)来保证订单履约最终成功。(用户已经支付成功,难道 1 次履约超时,就给用户退钱吗?)

  2. 库存回滚: 当订单退款成功后,需要回滚库存。如果库存回滚超时,可以通过重试来保证最终回滚成功(幂等)。

  3. 库存回滚: 当订单扣减库存成功后,提单后续流程失败,需要回滚库存。在这种情况下,系统不需要等待库存回滚成功后再返回用户提单失败的结果,而是可以采用异步回滚库存的方式,如果超时,则不断进行重试。

  4. 会员订单退款:先将用户的虚拟资产退回,然后调用订单退款API出现超时。在这种情况下,不需要回滚虚拟资产的状态到正常状态,只需要重试订单退款API(一定要幂等)即可。

接下来我将详细介绍四种非常通用的重试方案。

1. Spring Retryable 框架

Spring 提供 Retryable 注解提供重试能力,通过 Retryable 注解声明的 Aop 切面,在切面抛出异常后触发重试策略,通过重试保证系统的最终一致性。

1.1 使用示例

@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 100, multiplier = 2))
public int handle(int userId, String orderId) {
   //do something

   return 1;//返回结果
}

@Recover
public int recover(Exception e) {
   //最终失败的处理。一般是打印日志或者上报异常
   return 1;
}

1.2 使用方式

Retryable 配置在失败后需要重试的方法上。

  • value: 指定捕获哪类异常进行重试
  • maxAttempts: 指定最大重试次数。
  • backoff: 指定重试策略,其中
    • delay: 指定了延迟多久重试
    • multiplier: 指定了两次重试的间隔时间的乘积。例如为 2,第一次重试前间隔 100毫秒,第二次重试间隔 200毫秒,第三次间隔 400毫秒。

1.3 最终失败的处理逻辑放在 Recover 中

通过 Retryable 重试超过最大重试次数后,如果依然失败,则会调用被 Recover 修饰的方法。Recover 中不再适合再次重试,因为已经重试多次后依然失败,再重试也没意义,Recover 中上报异常到其他系统或记录到日志中心。稍后需要人工介入,排查问题。

需要注意的是无需在 Retryable 注解中声明 Recover 方法。Recover 方法只需要和 Retryable 方法在同一个类中,方法声明为 public,并且还需要满足保持返回值一致、参数列表一致。

  • 参数列表中第一个是需捕获的异常,后续的其他参数需要和重试函数的参数列表保持一致。(不一致Recover 注解会失效)
  • Recover 方法和重试方法的返回值应该保持一致。

1.4配置方式

1.4.1 pom 依赖

<dependency>
  <groupId>org.springframework.retry</groupId>
  <artifactId>spring-retry</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

1.4.2 开启重试

@EnableRetry
@SpringBootApplication
public class AppStarter {

    public static void main(String[] args) {
        SpringApplication.run(AppStarter.class, args);
    }

}

1.5 异步重试

如果需要异步重试,可以通过在重试方法中添加 @Async注解,Spring 会自动将目标方法封装为 Task 提交到线程池中执行。

2. MQ 重试

Spring 重试可以解决大部分的重试问题,然而遇到复杂的业务场景,例如订单履约系统的重试间隔非常长,例如重试间隔从 1 秒重试到 10 分钟,如此长期的间隔时间要求系统具备较高的可靠性。如果通过 Spring 单机式的重试,当遇到服务发布时,很容易阻断重试过程。为此需要想到其他办法保证长期重试的可靠性。

方法异常后,将重试的内容发送到MQ 中,通过消费 MQ 消息实现重试能力,是非常可靠的办法。需要注意的是 MQ 需要具备延迟消息能力,因为上一次重试失败后,需要间隔一段时间才能再次重试,这要求重试消息应该是延迟类消息。

重试消息中应该包括重试次数,每重试失败一次,发送重试消息时,应该递增重试次数。当消费重试消息时,超过最大重试次数应该终止重试,并且设置合理的策略记录重试消息,人工介入排查问题。

2.1 通用的 MQ 重试组件

MQ 重试能力是一个通用性的需求,目前 Spring 只有单机版的 Retryable 重试能力。各位读者可自行根据公司的现状,开发一个通用性的 MQ 重试工具,体现个人的技术实力。我在此仅仅提供想法和关键代码。

2.2 定义注解

定义的注解内容和 Spring Retryable注解极其相似,提供了如下能力

  • 可自定义要捕获的异常类型
  • 可配置延迟重试、重试的间隔时间、重试间隔的递增倍数。
  • 可配置 重试最终失败的 fallback 方法。
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MqRetry {
    Class<? extends Throwable>[] value() default {Exception.class};
    int delayMills() default 500;
    int maxAttempts() default 5;
    int multiplier() default 1;
    String fallbackMethodName();
}

2.3 定义异常捕获的切面

@Pointcut("@annotation(com.xxx.Retry)")
public void retry() {
}

2.4 切面的拦截逻辑

@Around("retry()")
public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
   try {
      return joinPoint.proceed();
   } catch (Throwable throwable) {
      MethodSignature signature = (MethodSignature) joinPoint.getSignature();
      Method method = signature.getMethod();
      Retry retry = method.getAnnotation(Retry.class);
      Class<? extends Throwable>[] throwables = retry.value();

      boolean retryEnable = false;
      //检查捕获的异常是否需要重试
      for (Class<? extends Throwable> throwableClass : throwables) {
         if (throwableClass.isInstance(throwable)) {
            retryEnable = true;
         }
      }
      //不需要重试,则抛出异常。
      if (!retryEnable) {
         throw throwable;
      }

      //上报重试消息
      sendRetryMessage(joinPoint, retry);

      //处理各种类型返回值。建议重试消息返回值为 Void
      return Defaults.defaultValue(method.getReturnType());
   }
}

2.5 上报重试消息

重试消息关键内容如下

  • Bean ClassName。要求Retry 方法所在类必须是 Spring 的 Bean,故障消息只记录了 Bean 的类型名,在消费逻辑中,系统需要通过 Bean ClassName 从 Spring容器中获取 Bean。
joinPoint.getTarget().getClass().getName(); 获取 Bean 的 ClassName
applicationContext.getBean(ClassUtils.getClass(className)); 根据 className 获取 Bean

  • methodName。 重试方法名。 joinPoint.getSignature().getName()
  • 参数类型的列表。获取各个参数类型的全类名。
Class<?>[] parameterClazzs = ((MethodSignature) joinPoint.getSignature()).getMethod().getParameterTypes();
List<String> parameterClassNames = new ArrayList<>(parameterClazzs.length);
for (Class<?> parameterClass : parameterClazzs) {
    parameterClassNames.add(parameterClass.getName());
}

  • 参数值列表。各个参数JSON 序列化
List<String> argumentList = new ArrayList<>(joinPoint.getArgs().length);
for (Object argument : joinPoint.getArgs()) {
    argumentList.add(JSON.toJSONString(argument));
}

以上是重试消息格式和构造方式。各位读者可根据公司内的实际情况,自行编写发消息代码。

值得一提的是:发送消息需要指定微服务名称吗?每个微服务可能都需要这个能力, MQ 重试框架是通用性的诉求。所有的服务都放到一个共用的Topic,还是每个服务有专属的 Topic 呢?

这两个方式各有优劣。前者有优点,由于共用 Topic,新的服务接入时,无需申请新的 Topic,缺点是:消费时各服务需要过滤其他服务的消息,各个服务之间互相影响,如果重试消息量级过大,对每个服务的压力都很大。 如果使用后者:每个服务有专属的 topic,优劣恰好相反。读者可自行决策。

2.5 重试消息的消费逻辑

重试消息发送到 MQ 后,当达到延迟时间后,消费者可以消费消息,实现重试。下面放一些关键代码(只包含了重试的关键代码,没有MQ 消费订阅代码)

  • 获取 Bean

applicationContext.getBean(ClassUtils.getClass(className));

  • 获取要重试的方法
Class<?>[] parameterClazzs = null;
if (CollectionUtils.isNotEmpty(message.getParameterClazzs())) {
    parameterTypes = new Class[message.getParameterClazzs().size()];
    for(int i = 0; i < message.getParameterClazzs().size(); ++i) {
        parameterClazzs[i] = ClassUtils.getClass(message.getParameterClazzs().get(i));
    }
}
// 获取方法
Method method = ReflectionUtils.findMethod(bean, message.getMethodName(), parameterClazzs);

  • 获取参数列表
Object[] arguments = null;
if (CollectionUtils.isNotEmpty(message.argumentList())) {
    arguments = new Object[message.getParameterClazzs().size()];

    for(int i = 0; i < parameterClazzs.length; ++i) {
        arguments[i] = JSONObject.parseObject(message.argumentList().get(i), parameterClazzs[i]);
    }
}

  • 调用 方法

method.invoke(bean, arguments);

以上是 通用 MQ 重试组件的关键代码,还有关于 fallback 方法的部分,各位读者可以自行脑补代码实现。

以上 Spring Retryable 和 MQ 重试组件都有一个共同的痛点————重试失败后没有很好的解决办法。 这个问题曾经一直困扰我,因为最终失败时我能做的只有 打印异常日志、上报异常消息到公司故障群,稍后人工介入处理异常场景。

从我的经验来看,重试失败的频率并不高,但是每次出现都需要人工介入处理真的很烦人。处理问题往往很棘手,需要在线上手动执行一些命令,是比较危险的人肉运维工作。

所以我一直想做一个通用的故障管理平台,提供可视化页面进行故障的修复能力。下图是我在公司做的故障修复后台,故障上报后,我可以收到通知,在后台页面发起重试或者回滚等动作。

image.png

3. 故障修复平台

3.1 故障可视化管理

提供可视化页面处理故障,可以有效提高安全性,避免人肉运维工作。解决思路是系统故障发生后,统一上报到故障管理平台。然后故障管理平台提供可视化页面展示故障列表,故障详情。在可视化页面提供修复工具。一般的修复工具是继续重试,如果有需求回滚故障,也可以提供回滚工具。

系统的架构图如下

image.png

系统交互共存在三个角色 业务系统、故障管理后台、故障管理后台页面。一个故障的处理流程是这样的。

当请求一直重试失败超过最大重试次数时,业务系统会上报到故障MQ,故障管理平台消费MQ,收集故障并落库。研发同学收到故障通知,同时在故障管理后台页面可以看到故障列表、故障详情。 排查问题原因、敦促相关同事修复问题后,点击重试按钮。故障管理后台收到重试请求,会通过 Rpc SPI 调用到业务系统 重试故障,并告知管理后台成功和失败结果。

后台页面大致参考 image.png

3.2 故障管理处理流程

3.2.1 上报故障的处理流程

sequenceDiagram
业务系统 ->> 业务系统: 重试一直失败
业务系统 ->> MQ: 上报故障
MQ ->> 故障管理后台: 收集故障,落库存储
故障管理后台->> IM: 通知到公司故障群

3.2.2 故障重试的处理流程

sequenceDiagram
研发 ->> 后台页面: 浏览故障列表
研发 ->> 研发:  排查问题原因、敦促相关同事修复问题
研发 ->> 后台页面: 点击重试按钮
后台页面 ->> 故障管理后台: 重试故障 Http接口
故障管理后台->> 业务系统: 调用 Rpc SPI 重试故障
业务系统 -->> 故障管理后台: 返回处理结果,成功或失败原因
故障管理后台-->> 后台页面: 返回处理结果
后台页面-->> 研发: 失败继续排查原因。成功则结束

上报故障和重试故障是两个核心流程比较关键的点包括

  1. 上报故障使用 MQ 的方式,可以实现业务系统和故障后台的解耦、隔离。不妨碍业务流程。

  2. 业务系统需要实现故障重试SPI。业务系统负责重试的业务逻辑

  3. 故障管理后台是通用平台,通过故障类型区分各种故障。通过故障类型关联业务系统。重试故障时,根据故障类型查找到对应的业务系统 Rpc Client,并调用SPI方法 重试故障。

  4. 需要有前端页面实现故障列表页检索、故障详情页展示、故障重试按钮等功能。这是重中之重的环节,没有可视化页面,故障管理后台就没有存在的意义了。

3.3 系统关键实现

3.3.1 故障收集任务

CREATE TABLE `fault_collect_task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
  `misId` bigint(20) NOT NULL DEFAULT '0' COMMENT '创建人 ',
   `taskCode` varchar(256) NOT NULL DEFAULT '' COMMENT '故障类型code码,上报时需指定,全局唯一',
  `taskName` varchar(128) NOT NULL DEFAULT '' COMMENT '任务名',
  `taskComment` text comment '任务备注',
  `appkey` varchar(1024) DEFAULT '' COMMENT '业务系统微服务唯一键',
  `port`  int(11) NOT NULL DEFAULT '0' COMMENT '业务系统spi port',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态',
  `ext_config` text COMMENT '扩展字段',
  `op_log` text COMMENT '操作记录',
  `ctime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
  `utime` int(11) NOT NULL DEFAULT '0' COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uni_idx_id` (`taskId`, `identityId`,`key` )
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='故障收集任务表'

故障收集任务是某一类故障的集合,任务会关联业务系统,上报故障时候,需要指定taskCode,用来标记故障属于哪一个任务。

3.3.2 故障表

故障表结构如下

CREATE TABLE `fault_item` (
 `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID',
 `user_id` bigint(20) NOT NULL DEFAULT 0 COMMENT 'userId',
 `biz_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '业务方',
 `unique_key` varchar(256) NOT NULL DEFAULT '' COMMENT '业务定义的唯一键',
 `task_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '故障收集任务id',
 `task_code` varchar(256) NOT NULL DEFAULT '' COMMENT '故障收集任务code码,上报时需指定,全局唯一',
 `context` text COMMENT '故障内容',
 `status` int(11) NOT NULL DEFAULT 0 COMMENT '状态 0 初始化,1. 已重试成功,2 上次重试失败,3 忽略 ',
 `ext_config` text COMMENT '扩展字段, 被索引字段也放在里面',
 `comment` text COMMENT '备注',
`retry_result` text COMMENT '重试结果',
`version` int(11) NOT NULL DEFAULT 0 COMMENT 'version',
 `ctime` int(11) NOT NULL DEFAULT 0 COMMENT '创建时间',
 `utime` int(11) NOT NULL DEFAULT 0 COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uni_idx_taskid_id` (`task_id`, `unique_key`),
KEY `task_code_ctime_idx` (`task_code`, `ctime`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = '故障表';

  1. taskId是故障收集任务id,task_code是故障任务的编码。
  2. context 是故障内容。业务系统自定义故障内容,重试故障时自己解析故障内容。管理后台不限制故障的格式。
  3. uniqukeKey 是故障自定义的唯一键,系统使用 taskId+uniqueKey 作为唯一键。当上报的故障重复时,系统会覆盖原有的故障内容。

3.3.3 注册Rpc Client 进入Spring

业务系统需要实现故障重试的SPI,故障管理后台需要创建 Rpc Client 的Bean,注册进Spring管理。

首先构建 BeanDefinitionBuilder,需要声明该bean的类型,声明bean的重要参数。 将 Spring ApplicationContext 转化为 BeanFactory。调用 registerBeanDefinition 注册bean进上下文。

ConfigurableApplicationContext configurableApplicationContext = ((ConfigurableApplicationContext) applicationContext);
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) configurableApplicationContext.getBeanFactory();

BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(ThriftProxy.class);
beanDefinitionBuilder.addPropertyReference("mtThriftPoolConfig", "thriftPoolConfig");
beanDefinitionBuilder.addPropertyValue("timeout", );
beanDefinitionBuilder.addPropertyValue("appKey", "");
beanDefinitionBuilder.addPropertyValue("port", );

beanFactory.registerBeanDefinition(config.getBeanName(), beanDefinitionBuilder.getRawBeanDefinition());

4. 任务表重试

任务表的实现方案和 MQ 重试组件技术原理类似。例如都需要

  1. 定义重试注解
  2. 收集重试上下文(包括bean 类名,方法名,参数列表、参数值等等)的方式也是类似的。
  3. 解析重试上下文,执行重试方法的方法也是类似的。

它们在重试上下文的处理方式上有所不同:MQ重试组件选择将重试上下文发送到消息队列(MQ),而任务表重试选择将重试上下文记录到数据库中。然而,由于重试间隔要求具备延迟重试的能力,因此需要有扫库任务来扫描即将需要重试的记录。

相比之下,基于数据库的任务表重试组件的难度要更大,因为它需要自己实现扫库任务,而MQ重试组件只需要增加MQ订阅逻辑即可。

此外该组件还有其他缺点

  1. 如果重试消息非常多,写入数据库会面临很大的压力,而MQ消息队列具备削峰填谷的高并发写能力。
  2. 如果重试消息非常多,在订阅消费方面也会存在问题。基于MQ,多个实例可以订阅消息并同时消费。然而,任务表的扫库线程只能单机执行(分布式扫库的开发难度更大),所以只能单机处理海量的重试任务。为了解决这个问题,需要开启多线程同时处理重试任务,但是需要考虑设置多大的线程数和队列长度,这也是一个难题。
  3. 如果多个微服务使用这个组件,那么每个微服务在接入时,需要自己重新建表。接入成本高~

以上难题和痛点在 MQ 重试组件中是完全不存在的。

我前公司同事实现了基于数据库的重试组件,并在组内进行了推广,然而我们遇到了上述的种种难题,遗憾的是,最终也没有来得及解决这些难题(公司大裁员)。因此,我强烈建议大家组合使用前三个方案。

你可以先尝试使用 Spring Retryable 注解实现短期场景的重试。

遇到可靠性要求更高的长期重试场景,引入 MQ 重试组件。

当重试失败的频率非常高,运维成本高的情况发生后,你可以考虑建设故障修复平台,提供可视化页面修复故障。

最后啰嗦一句:假如 MQ 崩溃了,故障消息发送失败怎么办,重试过程不就阻断了吗?所以如果建设更可靠的最终一致性,需要其他运维手段。

  • 提供特定的运维任务,如指定用户 id 和 orderId 进行退款操作。
  • 建设运维后台,例如用户订单后台,提供可视化界面,以便进行各类运维操作。
  • 提供扫库任务,例如编写扫库任务,重试某个时间段内或者在订单列表中的失败订单。

保证系统最终一致性和可靠性是一个不断精进的过程。相对于前面提到的运维手段,Spring Retry、MQ 故障重试、故障修复平台是比较通用的,可以快速构建强大的最终一致性方案。这三种方式很少出现失效的情况,只有在以下场景下可能会失效:

  1. MQ 消息队列崩溃,导致无法发送重试消息。
  2. 代码出现 Bug,发生异常却没有抛出异常,所以没有重试。

扫库任务的一个致命缺点是:

  • 当有大量订单失败时,需要大量的重试操作,这会给数据库带来过大的压力。对于如何尽快处理重试任务,需要进一步考虑。(因为重试可能会超时、可能失败,会占用大量的线程资源,所以单机重试的压力非常大!)

各有优劣,很难有完美的方案,各位读者可根据公司的实际情况,自行选择适合自己的方案。

原文地址:https://juejin.cn/post/7324505713740021771