线上消息队列故障,兜底改造方案
线上部分接口频繁报错,经排查是因为消息发送超时导致的。我们用的是阿里云的 RocketMQ,去询问客服发现是因为云厂商升级故障使得消息发送频繁超时。
当时没有办法兜底方案处理消息队列故障的场景,后面复盘这部分功能,让我做一定的重构改造。
涉及的业务简单概括:需要在一个 service 方法里执行一些业务,修改了一些数据落库,然后再发送一条 MQ 消息,触发下一个流程。
因此需要保证当前 service 修改的数据事务提交成功,消息一定被发出去,事务提交失败,消息不能发出去。
将同步发送消息的逻辑写在事务内部,就能保证发送失败,事务不会提交。但如果消息发送成功了,最后事务提交失败了呢?那发出去的消息还能撤回吗?
因此要解决的第一个问题其实是:当前 service 事务提交后,才能发送消息,不然就可能导致消息发出去了,实际事务是没执行成功的。
那如何保证当前 service 修改的数据事务提交了,消息一定就发出去了呢?万一事务提交了应用就挂了呢?消息不就没了,后续的流程也就中断了。
这归根结底是分布式事务问题,是数据库操作跟 MQ 消息的爱恨情仇,关于这个 RocketMQ 提供了解决方案即事务消息,但是它的侵入性比较大,需要修改接口适配事务消息的实现。
而本地消息表则非常简单,接下来我们开始操作!
首先我们需要建立一张本地消息表(当前这个设计主要是为了MQ消息的事务场景):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| CREATE TABLE `message` ( `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `status_delete` tinyint NOT NULL DEFAULT '0' COMMENT '删除标记 0正常 1删除', `topic` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'topic', `tag` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'tag', `msg_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '消息id', `msg_key` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '消息key', `data` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息json串', `try_num` int NOT NULL DEFAULT '0' COMMENT '重试次数', `status` tinyint NOT NULL DEFAULT '0' COMMENT '发送状态 0-未发送 1-已发送', `next_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下次驱动开始时间' PRIMARY KEY (`id`), KEY `idx_key` (`msg_key`), KEY `idx_nexttime_status` (`next_time`,`status`), KEY `idx_msgid` (`msg_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='本地消息记录表';
|
然后再写个 MessageService 来包装下消息的发送流程,把本地消息记录保存封装在里面。(大家需要详细看下代码逻辑)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| @Service public class MessageService implements IMessageService{ @Resource private Producer producer; @Resource private MessageMapper messageMapper;
@Override @Transactional(rollbackFor = Exception.class) public void send(String topic, String tag, String key, Object obj) { sendDelay(topic, tag, key, obj, 0L); } @Override @Transactional(rollbackFor = Exception.class) public void sendDelay(String topic, String tag, String key, Object obj, Long period) { //计算时间,防止定时任务扫描将还在正常流程中的消息进行重试 int time = (period == 0L ? 10 : period.intValue() / 1000); Date nextTime = DateUtil.getAfterNewDateSecond(new Date(), time); String data = JSON.toJSONString(obj); Message message = new Message() .setStatusDelete(0) .setTopic(topic) .setTag(tag) .setMsgId("") .setMsgKey(key) .setData(data) .setTryNum(0) .setStatus(0) .setNextTime(nextTime); // 保存本地消息记录 messageMapper.save(message);
// 当前事务提交后,再执行发送消息和更改本地消息记录状态 TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronizationAdapter() { @Override public void afterCommit() { String messageId; try { if (period == 0L) { messageId = producer.send(topic, tag, key, data); } else { messageId = producer.sendDelay(topic, tag, key, data, period); } Message update = new Message() .setId(message.getId()) .setMsgId(messageId) .setStatus(1); messageMapper.updateById(update); } catch (Exception e) { log.error(".."); } } } ); } }
|
定时任务的逻辑就很简单了,就是扫描 nextTime 到期且未发送的消息,重新发送即可,这里不多赘述。
最终的使用就非常简单了:
1 2 3 4 5 6
| @Transactional(rollbackFor = Exception.class) public void doSth(xx) { saveA(); saveB(); messageService.send(xxx); }
|
我们来分析一下:
1)假设数据库事务提交失败,那么无事发生,消息也没发出去,此时业务正常。
2)假设数据库操作成功,但是数据库事务提交后,服务宕机了,那么消息没发出去,此时 saveA 和 saveB 都保存成功,那么 message 肯定也插入了(它们在同一个事务中),message 的 status 是 0 ,那么我们有个定时任务,根据 nextTime 和 status 来扫描得到未成功发送的消息,进行重试即可,后续消息可正常发送.
3)假设数据库操作成功,但是数据库事务提交了,MQ有问题,使得消息发不出去,同理第二条,后续定时任务扫描重试即可。
假设依赖的消息队列中间件又产生频繁超时的故障,那么也不会影响业务的正常运行,数据都会正常落库,事务正常提交,部分发送超时的消息,由后续补偿任务自动补偿重试。
可以想象,如果没有这个机制可能会发送两种情况:
- 如果消息在事务内发送,由于消息发送出错,那么事务提交失败,业务会直接受到影响,线上频繁报错(还解决不了,因为这是阿里云MQ底层升级导致的问题),妥妥P0故障。
- 如果消息在事务提交后发送,又没落库记录,那么消息发送超时,后续流程中断,后续需要手动补数据,能累死个人。
简单补充
一般 service 事务相关方法都用 @Transactional 修饰, messageService.send 也被 @Transactional,默认事务传播级别是 PROPAGATION_REQUIRED,继承外部事务,因此它们处于同一个事务。
然后 TransactionSynchronizationManager 可以管理当前线程的事务,内部的 TransactionSynchronizationAdapter 是一个抽象类

可以看到,它能让我们在事务提交前、后、暂停等各阶段实现一些自己的逻辑。