使用场景
- 用户下单成功后,超过N分钟未支付则自动取消订单;
- 秒杀活动开始前N分钟对设置了提醒的用户进行推送提醒;
- 预定工作会议,会议开始前N分钟进行通知参会人员。
实现延迟队列的两种方式:
- 死信队列 + 消息过期
- 延迟消息插件(rabbitmq-delayed-message-exchange)
基于死信队列(DLX+TTL)实现
实现的原理就是:给一个消息设定 TTL,但是不消费这个消息,等消息过期,过期后进入死信队列,然后再监听死信队列的消息进行消费。
实现步骤
定义一个死信交换机(
dead-letter-exchange
,即DLX),其实就是个普通的交换机,专门用来处理死信消息;
定义死信队列(
dead-letter-queue
),其实也就是个普通的队列,并在交换机上绑定:
定义延迟队列(
delay-queue-test
),并配置以下参数- 添加
x-dead-letter-exchange
参数,值为dead-letter-exchange
- 添加
x-dead-letter-routing-key
参数,值为dead-letter-routing
- 添加
x-message-ttl
参数,值为你需要延迟的毫秒数,这里用 3000 演示
- 添加
- 绑定延迟队列到交换机(自己选择合适的交换机):
测试
- 保证
delay-queue-test
这个队列无客户端消费,在项目监听死信队列dead-letter-queue
:
@Component
@Slf4j
public class DeadConsumer {
@RabbitListener(queues = "dead-letter-queue")
public void handleDeadQueue(Channel channel, Message message) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("死信消息消费:{}", msg);
}
}
- 在控制台或者编写代码发送消息测试:
@Service
@AllArgsConstructor
@Slf4j
public class TestServiceImpl implements ITestService {
private final RabbitTemplate rabbitTemplate;
@Override
public boolean sendTestMq(String msg) {
rabbitTemplate.convertAndSend("hong-exchange", "hong.routing.test.dead", "测试延迟消息");
log.info("发送消息结束......");
return true;
}
}
输出如下:
2023-11-03 20:47:57.951 INFO 37144 --- [nio-9018-exec-1] c.l.a.m.test.service.TestServiceImpl : 发送消息结束......
2023-11-03 20:48:01.027 INFO 37144 --- [ntContainer#2-1] c.l.assistant.consumer.DeadConsumer : 死信消息消费:"测试延迟消息"
从输出日志中可看到,消息是3秒后消费的,确实实现了延迟消息。
注意事项
基于死信队列可以实现延迟消息,但是存在一个问题,那就是可能会造成队头阻塞,队列是先进先出的,每次只会判断队头的消息是否过期,如果队头的消息时间很长,一直都不消费,那就会阻塞整个队列,影响后面的消息消费,及时后面的消息过期了,也依然会被阻塞。
另外,此方案是给队列设置TTL的,不能灵活动态配置,如果业务中需要不同的TTL,那就得配置不同的队列,增加系统复杂性。
rabbitmq-delayed-message-exchange
插件
rabbitmq-delayed-message-exchange
是官方提供的一个插件,用于实现延迟消息。
插件下载地址:rabbitmq-delayed-message-exchange/releases,选择对应的版本下载。
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez
# 进入 docker 容器查看插件目录
docker exec -it <container_name> /bin/bash
# 查看插件目录并退出容器
rabbitmq-plugins directories -s
输出如下:
Plugin archives directory: /opt/rabbitmq/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@master-rabbit-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins
拷贝:
docker cp rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/opt/rabbitmq/plugins
启用插件:
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
输出如下:
Enabling plugins on node rabbit@master-rabbit:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_prometheus
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@master-rabbit...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
至此,插件安装完成。在控制台中可看到 exchange 的类型中出现了 x-delayed-message
:
测试
- 在控制台新增交换机
hong.delay.topic
:
rabbitmq-delayed-message-exchange
创建队列并绑定交换机:
项目中配置
监听队列消费端:
@RabbitListener(queues = "hong.delay.queue.test")
public void handleDelayMessage(Channel channel, Message message) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("延迟消息消费:{}", msg);
}
延迟消息生产端:
private final RabbitTemplate rabbitTemplate;
@Override
public boolean sendTestMq(String msg) {
// 设置延迟时间为3000ms
rabbitTemplate.convertAndSend("hong.delay.exchange", "hong.delay.routing.test",msg, new HongMessagePostProcessor(3000));
log.info("发送延迟消息结束......");
return true;
}
其中 HongMessagePostProcessor
代码如下,重点是在 header 中指定延迟时间 x-delay
:
public class HongMessagePostProcessor implements MessagePostProcessor {
private long delay = -1L;
public HongMessagePostProcessor() {
}
public HongMessagePostProcessor(long delay) {
this.delay = delay;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
if (this.delay > 0L) {
// 设置延迟
message.getMessageProperties().setHeader("x-delay", this.delay);
}
return message;
}
}
运行代码测试,输出如下,成功实现延迟消息。
2023-11-04 20:56:44.228 INFO 9048 --- [nio-9018-exec-9] c.l.a.m.test.service.TestServiceImpl : 发送延迟消息结束......
2023-11-04 20:56:47.249 INFO 9048 --- [ntContainer#2-1] c.l.assistant.consumer.DeadConsumer : 延迟消息消费:"测试延迟消息-啦啦啦"
注意事项
- 这个插件最大的延迟时间是 (2^32)-1 毫秒(约49天),超过这个时间会被立即消费;
- 目前该插件的设计并不真正适合具有大量延迟消息(例如数百、数千或数百万)的场景,详细信息请参见 #72;
- 另外该插件的一个可变来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器后,它们开始争用调度程序资源,并且时间漂移不断累积。
更多的信息请查看官方描述:Limitations。
总结
基于死信队列与插件实现的延迟消息均有一定的局限性,如果消息TTL是相同的,则可以考虑使用死信队列来实现;如果想要消息TTL是动态可变的,那推荐使用插件方式实现的延迟消息,这种方式应该也满足更多的应用场景,不过对于大量的延迟消息,可能不太合适。