Skip to content

延迟队列

延迟队列介绍

延迟队列概念:

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。

延迟队列使用场景:

  1. 订单在十分钟之内未支付则自动取消

  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒

  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒

  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员

  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;那我们一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于「如果账单一周内未支付则进行自动结算」这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:「订单十分钟内未支付则关闭」,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

TTL的两种设置

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。

换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为「死信」。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

队列设置 TTL

在创建队列的时候设置队列的 x-message-ttl 属性

csharp
Map<String, Object> params = new HashMap<>();
params.put("x-message-ttl",5000);
return QueueBuilder.durable("QA").withArguments(args).build(); // QA 队列的最大存活时间位 5000 毫秒

消息设置 TTL

针对每条消息设置 TTL

csharp
rabbitTemplate.converAndSend("X","XC",message,correlationData -> {
    correlationData.getMessageProperties().setExpiration("5000");
});

两个代码块来自下方的案例

两者区别

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间,具体看下方案例。

另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃

整合Dotnet

在RabbitMQ中,延迟队列(也称为延迟消息或定时消息)是一种特殊类型的队列,它允许消息在被消费之前等待一段指定的时间。这种队列非常有用,比如在需要执行延时任务、实现定时提醒等场景中。

RabbitMQ本身不直接支持延迟队列,但可以通过几种方式来实现延迟消息的功能:

1. 使用RabbitMQ的TTL和死信队列(DLX)

这是实现延迟队列最常用的方法。通过设置消息的TTL(Time-To-Live)和队列的死信交换机,可以让消息在一定时间后自动从原队列移动到另一个队列进行处理。

  • 设置消息TTL:让消息在队列中等待一段时间后自动过期。
  • 设置死信队列:消息过期后会被发送到死信队列。

2. 使用RabbitMQ插件

RabbitMQ有一些插件可以实现延迟队列的功能,比如rabbitmq_delayed_message_exchange插件。这个插件允许你直接在消息上设置延迟时间,而不需要依赖TTL和死信队列。

3. 自定义实现

如果你需要更灵活的控制,可以自定义实现延迟队列。例如,使用数据库或外部存储来存储消息和预计的处理时间,然后通过定时任务检查并处理消息。

示例:使用TTL和死信队列实现延迟队列

csharp
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

public class RabbitMQService
{
    private readonly IRabbitMQConnection _connection;

    public RabbitMQService(IRabbitMQConnection connection)
    {
        _connection = connection ?? throw new ArgumentNullException(nameof(connection));
    }

    public async Task SetupDelayedQueueAsync()
    {
        using var channel = _connection.CreateChannel();

        // 死信交换机
        string dlxExchangeName = "dlx_exchange";
        channel.ExchangeDeclare(dlxExchangeName, ExchangeType.Direct);

        // 死信队列
        string dlxQueueName = "dlx_queue";
        channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        channel.QueueBind(dlxQueueName, dlxExchangeName, "");

        // 延迟队列
        string delayedQueueName = "delayed_queue";
        var delayedQueueArgs = new Dictionary<string, object>
        {
            {"x-dead-letter-exchange", dlxExchangeName}, // 设置死信交换机
            {"x-message-ttl", 60000} // 设置消息TTL为60秒
        };
        channel.QueueDeclare(delayedQueueName, durable: true, exclusive: false, autoDelete: false, arguments: delayedQueueArgs);
        channel.QueueBind(delayedQueueName, dlxExchangeName, "");
    }

    public async Task SendDelayedMessageAsync(string exchange, string routingKey, object message, int delaySeconds)
    {
        using var channel = _connection.CreateChannel();
        var messageJson = JsonSerializer.Serialize(message);
        var body = Encoding.UTF8.GetBytes(messageJson);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;
        properties.Expiration = delaySeconds.ToString(); // 设置消息TTL
        await channel.BasicPublishAsync(exchange, routingKey, false, properties, body);
    }
}

示例中:

  • dlx_exchange是死信交换机。
  • dlx_queue是死信队列,用于接收从延迟队列过期的消息。
  • delayed_queue是延迟队列,设置了死信交换机和消息TTL。
  • SendDelayedMessageAsync方法允许你发送一个延迟消息,通过设置消息的TTL来实现延迟。