线上消息队列故障,兜底改造方案

Sherwin.Wei Lv7

线上消息队列故障,兜底改造方案

线上部分接口频繁报错,经排查是因为消息发送超时导致的。我们用的是阿里云的 RocketMQ,去询问客服发现是因为云厂商升级故障使得消息发送频繁超时。

当时没有办法兜底方案处理消息队列故障的场景,后面复盘这部分功能,让我做一定的重构改造。

涉及的业务简单概括:需要在一个 service 方法里执行一些业务,修改了一些数据落库,然后再发送一条 MQ 消息,触发下一个流程。

因此需要保证当前 service 修改的数据事务提交成功,消息一定被发出去,事务提交失败,消息不能发出去

将同步发送消息的逻辑写在事务内部,就能保证发送失败,事务不会提交。但如果消息发送成功了,最后事务提交失败了呢?那发出去的消息还能撤回吗?

因此要解决的第一个问题其实是:当前 service 事务提交后,才能发送消息,不然就可能导致消息发出去了,实际事务是没执行成功的。

那如何保证当前 service 修改的数据事务提交了,消息一定就发出去了呢?万一事务提交了应用就挂了呢?消息不就没了,后续的流程也就中断了。

这归根结底是分布式事务问题,是数据库操作跟 MQ 消息的爱恨情仇,关于这个 RocketMQ 提供了解决方案即事务消息,但是它的侵入性比较大,需要修改接口适配事务消息的实现。

而本地消息表则非常简单,接下来我们开始操作!

首先我们需要建立一张本地消息表(当前这个设计主要是为了MQ消息的事务场景):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE TABLE `message` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`status_delete` tinyint NOT NULL DEFAULT '0' COMMENT '删除标记 0正常 1删除',
`topic` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'topic',
`tag` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'tag',
`msg_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '消息id',
`msg_key` varchar(64) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '消息key',
`data` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息json串',
`try_num` int NOT NULL DEFAULT '0' COMMENT '重试次数',
`status` tinyint NOT NULL DEFAULT '0' COMMENT '发送状态 0-未发送 1-已发送',
`next_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下次驱动开始时间'
PRIMARY KEY (`id`),
KEY `idx_key` (`msg_key`),
KEY `idx_nexttime_status` (`next_time`,`status`),
KEY `idx_msgid` (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='本地消息记录表';

然后再写个 MessageService 来包装下消息的发送流程,把本地消息记录保存封装在里面。(大家需要详细看下代码逻辑)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Service
public class MessageService implements IMessageService{
@Resource
private Producer producer;
@Resource
private MessageMapper messageMapper;

@Override
@Transactional(rollbackFor = Exception.class)
public void send(String topic, String tag, String key, Object obj) {
sendDelay(topic, tag, key, obj, 0L);
}

@Override
@Transactional(rollbackFor = Exception.class)
public void sendDelay(String topic, String tag, String key, Object obj, Long period) {
//计算时间,防止定时任务扫描将还在正常流程中的消息进行重试
int time = (period == 0L ? 10 : period.intValue() / 1000);
Date nextTime = DateUtil.getAfterNewDateSecond(new Date(), time);
String data = JSON.toJSONString(obj);
Message message = new Message()
.setStatusDelete(0)
.setTopic(topic)
.setTag(tag)
.setMsgId("")
.setMsgKey(key)
.setData(data)
.setTryNum(0)
.setStatus(0)
.setNextTime(nextTime);
// 保存本地消息记录
messageMapper.save(message);

// 当前事务提交后,再执行发送消息和更改本地消息记录状态
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
String messageId;
try {
if (period == 0L) {
messageId = producer.send(topic, tag, key, data);
} else {
messageId = producer.sendDelay(topic, tag, key, data, period);
}
Message update = new Message()
.setId(message.getId())
.setMsgId(messageId)
.setStatus(1);
messageMapper.updateById(update);
} catch (Exception e) {
log.error("..");
}
}
}
);
}
}

定时任务的逻辑就很简单了,就是扫描 nextTime 到期且未发送的消息,重新发送即可,这里不多赘述。

最终的使用就非常简单了:

1
2
3
4
5
6
@Transactional(rollbackFor = Exception.class)
public void doSth(xx) {
saveA();
saveB();
messageService.send(xxx);
}

我们来分析一下:

1)假设数据库事务提交失败,那么无事发生,消息也没发出去,此时业务正常。

2)假设数据库操作成功,但是数据库事务提交后,服务宕机了,那么消息没发出去,此时 saveA 和 saveB 都保存成功,那么 message 肯定也插入了(它们在同一个事务中),message 的 status 是 0 ,那么我们有个定时任务,根据 nextTime 和 status 来扫描得到未成功发送的消息,进行重试即可,后续消息可正常发送.

3)假设数据库操作成功,但是数据库事务提交了,MQ有问题,使得消息发不出去,同理第二条,后续定时任务扫描重试即可。

假设依赖的消息队列中间件又产生频繁超时的故障,那么也不会影响业务的正常运行,数据都会正常落库,事务正常提交,部分发送超时的消息,由后续补偿任务自动补偿重试。

可以想象,如果没有这个机制可能会发送两种情况:

  1. 如果消息在事务内发送,由于消息发送出错,那么事务提交失败,业务会直接受到影响,线上频繁报错(还解决不了,因为这是阿里云MQ底层升级导致的问题),妥妥P0故障。
  2. 如果消息在事务提交后发送,又没落库记录,那么消息发送超时,后续流程中断,后续需要手动补数据,能累死个人。

简单补充

一般 service 事务相关方法都用 @Transactional 修饰, messageService.send 也被 @Transactional,默认事务传播级别是 PROPAGATION_REQUIRED,继承外部事务,因此它们处于同一个事务。

然后 TransactionSynchronizationManager 可以管理当前线程的事务,内部的 TransactionSynchronizationAdapter 是一个抽象类

企业微信截图_891c1547-fe9f-43ed-a7b5-584047cfeb3b.png

可以看到,它能让我们在事务提交前、后、暂停等各阶段实现一些自己的逻辑。

Comments
On this page
线上消息队列故障,兜底改造方案