面试官在面试候选人时,如果发现候选人的简历中写了在项目中使用了 MQ 技术(如 Kafka、RabbitMQ、RocketMQ),基本都会抛出一个问题:在使用 MQ 的时候,怎么确保消息 100% 不丢失?
这个问题在实际工作中很常见,既能考察候选者对于 MQ 中间件技术的掌握程度,又能很好地区分候选人的能力水平。接下来,我们就从这个问题出发,探讨你应该掌握的基础知识和答题思路,以及延伸的面试考点。
场景
目前,我们的电商系统上线了一项新产品发红包功能。当用户在商城的消费额度达到一定标准后,系统会自动给用户发放现金红包,以此来答谢用户的支持,并进一步刺激用户的消费欲望。之前我们提到过,由于发红包这一操作并非当前下单流程的核心环节,所以我们采用了消息队列来进行异步处理。然而,这种处理方式背后隐藏着一些潜在问题:
- 在消息投递过程中,可能会出现消息丢失的情况。一旦消息丢失,用户可能会拨打客服电话进行投诉,反映自己没有收到红包,甚至可能会向有关部门进行投诉,这无疑会对我们的品牌形象造成负面影响。
- 另一个问题是消息重复发送的风险。如果消息被重复发送,用户就会收到多个红包,这不仅会增加公司的运营成本,还可能引发后续的财务和管理问题。
因此, 我们现在面临的关键任务是确保系统生成的消息能够被准确消费,并且只能被消费一次。那么,究竟该如何实现这一目标呢?接下来,我们将深入探讨这一问题,寻找有效的解决方案。
为何消息会丢失?
要实现消息仅被消费一次的目标,首要前提是确保消息在整个传输和处理过程中不丢失。接下来,我们来分析一下,从消息被写入消息队列,到最终被成功消费的整个流程中,有哪些环节可能会导致消息丢失的情况发生。经过仔细分析,我们发现主要有以下三个关键环节:
- 消息从生产者传递到消息队列的过程中,可能由于网络波动、连接中断等原因导致消息丢失。
- 消息在消息队列中存储时,可能会因为队列本身的故障、数据存储异常等问题,造成消息的丢失。
- 消息在被消费的过程中,也可能会出现消费者异常、处理失败等情况,导致消息未能被正确消费,从而造成消息的丢失。
如上,我们分析了共有 3 个消息可能丢失的地方,接下来,我们就具体来分析下每一种情况。
消息在写入消息队列过程中丢失
消息生产者通常是业务系统,而消息队列独立部署在专门的服务器上。因此,业务服务器与消息队列服务器之间可能会出现网络抖动的情况。一旦发生网络抖动,正在传输的消息就有可能丢失。
针对这种情况,一般我们会采用消息重传的策略。也就是说,当检测到发送的消息超时未收到确认时,就重新发送一次该消息。但需要注意的是,不能无限制地重传消息。根据实际经验,如果不是消息队列本身出现故障,或者网络彻底断开连接,通常重试 2 到 3 次就足够了。
然而,这种消息重传方案也存在一定的弊端,那就是有可能导致消息重复。这就会使得消费者接收到重复的消息。比如,消息已经成功发送到消息队列中,但由于消息队列处理消息的速度较慢,或者网络抖动影响了响应时间,在生产端看来,消息发送似乎超时了。于是,生产者就会重新发送当前消息,这样就会出现消息重复的情况。在我们之前提到的电商发红包案例中,就可能导致用户收到两个红包。
消息在消息队列中丢失
即便消息成功发送到了消息队列,也不能保证消息绝对安全,仍然存在丢失的风险。
以 Kafka 为例,Kafka 中的消息存储在本地磁盘上。为了降低消息存储过程中对磁盘的随机 I/O 操作,通常会先将消息写入操作系统的 Page Cache 中,然后在合适的时机再将消息刷新到磁盘上。
例如,Kafka 可以进行相关配置,当达到特定的时间间隔,或者累积到一定数量的消息时,才进行刷盘操作,这就是所谓的 异步刷盘。
不过,这种异步刷盘方式存在一个隐患。如果发生机器意外掉电或者异常重启的情况,那么 Page Cache 中还未来得及刷盘的消息就会丢失。那么该如何解决这个问题呢?有人可能会想到把刷盘的时间间隔设置得很短,或者设置为每累积一条消息就立即刷盘。但这样做会带来一个明显的问题,即频繁刷盘会对系统性能产生较大的影响。而且从实际经验来看,机器出现宕机或者掉电的概率相对较低。所以,并不建议采用这种过于激进的刷盘设置方式。
如果你的电商系统对消息丢失的容忍度很低,那么你可以考虑以集群方式部署 Kafka 服务,通过部署多个副本备份数据,保证消息尽量不丢失。
那么它是怎么实现的呢?Kafka 集群中有一个 Leader 负责消息的写入和消费,可以有多个 Follower 负责数据的备份。Follower 中有一个特殊的集合叫做 ISR(in-sync replicas),当 Leader 故障时,新选举出来的 Leader 会从 ISR 中选择,默认 Leader 的数据会异步地复制给 Follower,这样在 Leader 发生掉电或者宕机时,Kafka 会从 Follower 中消费消息,减少消息丢失的可能。
由于默认消息是异步地从 Leader 复制到 Follower 的,所以一旦 Leader 宕机,那些还没有来得及复制到 Follower 的消息还是会丢失。为了解决这个问题,Kafka 为生产者提供一个选项叫做“acks”,当这个选项被设置为“all”时,生产者发送的每一条消息除了发给 Leader 外还会发给所有的 ISR,并且必须得到 Leader 和所有 ISR 的确认后才被认为发送成功。这样,只有 Leader 和所有的 ISR 都挂了,消息才会丢失。
从上面这张图能够清晰看出,当设置“acks=all”时,1、3、4三个步骤需要同步执行,这对消息生产的性能会产生较为显著的影响。所以,在实际应用过程中,你必须要进行谨慎、全面的权衡考量。这里给出以下建议:
- 倘若你对消息的可靠性要求极高,务必确保消息一条都不能丢失,那么不建议开启消息队列的同步刷盘,而是采用集群部署的方式来保障消息的可靠传输。可以进行相关配置,当所有处于ISR(In-Sync Replicas,同步副本集)中的Follower都成功接收到消息后,才返回成功响应。这样,即使个别节点出现故障,消息也不会丢失。
- 若业务对消息丢失有一定的容忍程度,那么可以不部署集群;即便采用集群方式部署,也建议配置为只要发送给一个Follower成功,就返回成功响应。这种配置方式能够在一定程度上提高消息生产的效率,降低系统开销,同时也能满足业务对消息可靠性的基本要求。
- 通常情况下,我们的业务系统对消息丢失都存在一定的容忍范围。就像前面提到的红包系统,假如红包消息不幸丢失,我们只需在后续对未收到红包的用户进行补发操作即可。这种方式既保证了业务的正常运转,又能在一定程度上降低系统的复杂性和成本。
消费过程中消息存在丢失风险
以Kafka为例进行说明。在Kafka中,消费者消费消息的进度是记录在消息队列集群里的。整个消费过程可以拆解为三个步骤:接收消息、处理消息以及更新消费进度。
在接收消息和处理消息这两个环节,都有可能出现异常或失败的情况。比如,接收消息时若网络发生抖动,就可能导致消息未能正确接收;处理消息时,也可能因业务层面出现异常,致使处理流程无法完整执行。倘若在这种情况下更新了消费进度,那么这条处理失败的消息就会永远得不到处理,从结果上看,等同于消息丢失。
因此, 这里需要特别注意,必须在消息接收和处理都完成之后,才可以更新消费进度。然而,这样做也会引发消息重复的问题。例如,某条消息处理完毕后,消费者突然宕机,由于尚未更新消费进度,当该消费者重启后,就会再次重复消费这条消息。
如何确保消息仅被消费一次
从上述分析可知,为了防止消息丢失,我们需要付出两方面的代价:一是性能方面的损耗;二是可能导致消息被重复消费。
性能损耗在一定程度上是可以接受的,因为通常业务系统只有在处理写请求时才会向消息队列发送操作,而一般系统的写请求量相对不大。但消息一旦被重复消费,就极有可能导致业务逻辑处理出现错误。那么,我们该如何避免消息重复的情况呢?
想要完全杜绝消息重复几乎是不可能的,毕竟网络抖动、机器宕机以及处理异常等状况很难避免,目前工业领域也没有成熟的解决方案。所以,我们可以适当放宽要求,只要确保即便消费到重复消息,最终的消费结果与只消费一次时等同即可,也就是要保证在消息的生产和消费过程具备“幂等性”。
什么是幂等
幂等原本是一个数学概念,它指的是无论对一个操作执行多少次,最终得到的结果都与仅执行一次时相同。这听起来或许有些抽象,下面给你举个形象的例子来帮助理解。
想象一下,男生和女生吵架的场景。女生揪着一个点不放,不断传递“你不在乎我了吗?”(这就好比生产消息)这个信息。即便她多次埋怨“你不在乎我了吗?”(多次生产相同消息),但男生的耳朵(相当于消息处理环节)会自动将这多次重复的信息屏蔽,就好像只听到了一次一样。从结果上看,不管女生重复说多少次,对男生接收信息的最终状态没有影响,这就是幂等性在生活场景中的体现。
再从实际业务角度看,假设消费一条消息的操作是让现有的库存数量减1,那么消费两条相同的消息就会使库存数量减2,这种情况下,该操作就不具备幂等性。但如果消费一条消息后,处理逻辑设定为将库存数量设置为0,或者是当当前库存数量为10时才减1 ,这样无论消费多少条相同的消息,最终库存的结果都是一样的,这就满足了幂等性的要求。
简单来说,“幂等”可以理解为:一件事情不管重复做多少次,所产生的最终结果都和只做一次时完全相同,那么这件事情就具有幂等性。
在生产、消费过程中增加消息幂等性的保证
由于消息在生产和消费过程中都有产生重复的可能性,所以关键在于,要在这两个环节分别增加对消息幂等性的保障措施。如此一来,从最终的实际效果来看,就可以认为消息实际上仅被消费了一次。
在消息生产阶段,Kafka 0.11版本以及Pulsar都支持“producer idempotency”特性,即生产过程的幂等性。这一特性能够确保,即便消息在生产端可能出现重复的情况,但最终存储到消息队列时,只会保留一份。
其实现原理是,给每个生产者分配一个独一无二的ID,同时为生产者发出的每一条消息都赋予一个唯一的ID。消息队列的服务端会保存<生产者ID,最后一条消息ID>这样的映射关系。当某个生产者产生新消息时,消息队列服务端会将新消息的ID与存储的最后一条消息ID进行比对。若两者一致,服务端就判定该消息为重复消息,会自动将其丢弃 。
而在消费端,幂等性的保证会稍微复杂一些,你可以从通用层和业务层两个层面来考虑。
你可以看到,无论是生产端的幂等性保证方式,还是消费端通用的幂等性保证方式,它们的共同特点都是为每一个消息生成一个唯一的 ID,然后在使用这个消息的时候,先比对这个 ID 是否已经存在,如果存在,则认为消息已经被使用过。所以这种方式是一种标准的实现幂等的方式,你在项目之中可以拿来直接使用,它在逻辑上的伪代码就像下面这样:
boolean isIDExisted = selectByID(ID); // 判断ID是否存在
if(isIDExisted) {
return; //存在则直接返回
} else {
process(message); //不存在,则处理消息
saveID(ID); //存储ID
}
不过这样会有一个问题:如果消息在处理之后,还没有来得及写入数据库,消费者宕机了重启之后发现数据库中并没有这条消息,还是会重复执行两次消费逻辑,这时你就需要引入事务机制,保证消息处理和写入数据库必须同时成功或者同时失败,但是这样消息处理的成本就更高了,所以,如果对于消息重复没有特别严格的要求,可以直接使用这种通用的方案,而不考虑引入事务。
在业务层面怎么处理呢?这里有很多种处理方式,其中有一种是增加乐观锁的方式。比如,你的消息处理程序需要给一个人的账号加钱,那么你可以通过乐观锁的方式来解决。
具体的操作方式是这样的:你给每个人的账号数据中增加一个版本号的字段,在生产消息时先查询这个账户的版本号,并且将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号后,在执行更新账户金额 SQL 的时候带上版本号,类似于执行:
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;
你看,我们在更新数据时给数据加了乐观锁,这样在消费第一条消息时,version 值为 1,SQL 可以执行成功,并且同时把 version 值改为了 2;在执行第二条相同的消息时,由于 version 值不再是 1,所以这条 SQL 不能执行成功,也就保证了消息的幂等性。
总结
今天我们主要学习了在消息队列中,消息可能会发生丢失的场景,和我们的应对方法,以及在消息重复的场景下,我们要如何保证,尽量不影响消息最终的处理结果。
文章围绕面试中“使用MQ时如何确保消息100%不丢失”的问题,结合电商发红包案例,深入探讨消息队列中消息丢失和重复的问题,以及确保消息仅被消费一次的方法,主要要点如下:
- 消息队列使用场景及问题:在电商发红包功能中,使用消息队列异步处理发红包操作,存在消息丢失和重复发送的问题,分别会影响用户体验和公司运营成本。
- 消息丢失的原因及解决方法
- 写入消息队列过程中丢失:业务服务器与消息队列服务器间网络抖动可能导致消息丢失,通常采用消息重传策略,但可能引发消息重复。
- 消息在消息队列中丢失:以Kafka为例,采用异步刷盘存储消息,机器掉电或重启可能导致Page Cache中未刷盘的消息丢失。可通过集群部署Kafka服务、设置“acks=all”等方式减少消息丢失,但需权衡性能影响。
- 消费过程中消息丢失:Kafka中,消息接收和处理环节的异常可能导致消息丢失,应在消息接收和处理完成后更新消费进度,不过这可能造成消息重复消费。
- 确保消息仅被消费一次的方法 - 幂等性
- 幂等性概念:多次执行同一操作与执行一次操作的最终结果相同。例如在库存处理业务中,特定逻辑下无论消费多少相同消息,库存最终结果一致,即具备幂等性。
- 生产过程幂等性保证:Kafka 0.11版本及Pulsar支持“producer idempotency”特性,通过给生产者和消息分配唯一ID,服务端比对ID丢弃重复消息。
- 消费过程幂等性保证:消费端可从通用层和业务层考虑。通用层通过为消息生成唯一ID并比对是否已存在来实现幂等,必要时引入事务机制;业务层可采用增加乐观锁的方式,如在账户金额更新时,利用版本号字段实现幂等性。
文章来源: https://study.disign.me/article/202509/7.message-queue-single-consume.md
发布时间: 2025-02-25
作者: 技术书栈编辑