使用场景

  1. 用户下单成功后,超过N分钟未支付则自动取消订单;
  2. 秒杀活动开始前N分钟对设置了提醒的用户进行推送提醒;
  3. 预定工作会议,会议开始前N分钟进行通知参会人员。

实现延迟队列的两种方式:

基于死信队列(DLX+TTL)实现

实现的原理就是:给一个消息设定 TTL,但是不消费这个消息,等消息过期,过期后进入死信队列,然后再监听死信队列的消息进行消费。

实现步骤

  1. 定义一个死信交换机(dead-letter-exchange,即DLX),其实就是个普通的交换机,专门用来处理死信消息;
    定义一个死信交换机

  2. 定义死信队列(dead-letter-queue),其实也就是个普通的队列,并在交换机上绑定:
    绑定队列

  3. 定义延迟队列(delay-queue-test),并配置以下参数

    • 添加 x-dead-letter-exchange 参数,值为 dead-letter-exchange
    • 添加 x-dead-letter-routing-key 参数,值为 dead-letter-routing
    • 添加 x-message-ttl 参数,值为你需要延迟的毫秒数,这里用 3000 演示

定义延迟队列

  1. 绑定延迟队列到交换机(自己选择合适的交换机):
    绑定延迟队列到交换

测试

  1. 保证 delay-queue-test 这个队列无客户端消费,在项目监听死信队列 dead-letter-queue
@Component
@Slf4j
public class DeadConsumer {

    @RabbitListener(queues = "dead-letter-queue")
    public void handleDeadQueue(Channel channel, Message message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("死信消息消费:{}", msg);
    }
}
  1. 在控制台或者编写代码发送消息测试:
@Service
@AllArgsConstructor
@Slf4j
public class TestServiceImpl implements ITestService {

    private final RabbitTemplate rabbitTemplate;

    @Override
    public boolean sendTestMq(String msg) {
        rabbitTemplate.convertAndSend("hong-exchange", "hong.routing.test.dead", "测试延迟消息");
        log.info("发送消息结束......");
        return true;
    }
}

输出如下:

2023-11-03 20:47:57.951  INFO 37144 --- [nio-9018-exec-1] c.l.a.m.test.service.TestServiceImpl     : 发送消息结束......
2023-11-03 20:48:01.027  INFO 37144 --- [ntContainer#2-1] c.l.assistant.consumer.DeadConsumer      : 死信消息消费:"测试延迟消息"

从输出日志中可看到,消息是3秒后消费的,确实实现了延迟消息。

注意事项

基于死信队列可以实现延迟消息,但是存在一个问题,那就是可能会造成队头阻塞,队列是先进先出的,每次只会判断队头的消息是否过期,如果队头的消息时间很长,一直都不消费,那就会阻塞整个队列,影响后面的消息消费,及时后面的消息过期了,也依然会被阻塞。

另外,此方案是给队列设置TTL的,不能灵活动态配置,如果业务中需要不同的TTL,那就得配置不同的队列,增加系统复杂性。

rabbitmq-delayed-message-exchange 插件

rabbitmq-delayed-message-exchange 是官方提供的一个插件,用于实现延迟消息。

插件下载地址:rabbitmq-delayed-message-exchange/releases,选择对应的版本下载。

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez
# 进入 docker 容器查看插件目录
docker exec -it <container_name> /bin/bash
# 查看插件目录并退出容器
rabbitmq-plugins directories -s

输出如下:

Plugin archives directory: /opt/rabbitmq/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@master-rabbit-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins

拷贝:

docker cp rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/opt/rabbitmq/plugins

启用插件:

docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

输出如下:

Enabling plugins on node rabbit@master-rabbit:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@master-rabbit...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.

至此,插件安装完成。在控制台中可看到 exchange 的类型中出现了 x-delayed-message

x-delayed-message类型

测试

  1. 在控制台新增交换机 hong.delay.topic
    新增交换机

rabbitmq-delayed-message-exchange

  1. 创建队列并绑定交换机:
    绑定交换机

  2. 项目中配置

监听队列消费端:

@RabbitListener(queues = "hong.delay.queue.test")
public void handleDelayMessage(Channel channel, Message message) {
    String msg = new String(message.getBody(), StandardCharsets.UTF_8);
    log.info("延迟消息消费:{}", msg);
}

延迟消息生产端:

private final RabbitTemplate rabbitTemplate;

@Override
public boolean sendTestMq(String msg) {
    // 设置延迟时间为3000ms
    rabbitTemplate.convertAndSend("hong.delay.exchange", "hong.delay.routing.test",msg,  new HongMessagePostProcessor(3000));
    log.info("发送延迟消息结束......");
    return true;
}

其中 HongMessagePostProcessor 代码如下,重点是在 header 中指定延迟时间 x-delay

public class HongMessagePostProcessor implements MessagePostProcessor {

    private long delay = -1L;

    public HongMessagePostProcessor() {
    }

    public HongMessagePostProcessor(long delay) {
        this.delay = delay;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        if (this.delay > 0L) {
            // 设置延迟
            message.getMessageProperties().setHeader("x-delay", this.delay);
        }
        return message;
    }
}

运行代码测试,输出如下,成功实现延迟消息。

2023-11-04 20:56:44.228  INFO 9048 --- [nio-9018-exec-9] c.l.a.m.test.service.TestServiceImpl     : 发送延迟消息结束......
2023-11-04 20:56:47.249  INFO 9048 --- [ntContainer#2-1] c.l.assistant.consumer.DeadConsumer      : 延迟消息消费:"测试延迟消息-啦啦啦"

注意事项

  1. 这个插件最大的延迟时间是 (2^32)-1 毫秒(约49天),超过这个时间会被立即消费;
  2. 目前该插件的设计并不真正适合具有大量延迟消息(例如数百、数千或数百万)的场景,详细信息请参见 #72
  3. 另外该插件的一个可变来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器后,它们开始争用调度程序资源,并且时间漂移不断累积。

更多的信息请查看官方描述:Limitations

总结

基于死信队列与插件实现的延迟消息均有一定的局限性,如果消息TTL是相同的,则可以考虑使用死信队列来实现;如果想要消息TTL是动态可变的,那推荐使用插件方式实现的延迟消息,这种方式应该也满足更多的应用场景,不过对于大量的延迟消息,可能不太合适。

参考文档

  1. Scheduling Messages with RabbitMQ
  2. Delayed messages with RabbitMQ
  3. rabbitmq/rabbitmq-delayed-message-exchange
文章目录