延迟队列
延迟队列介绍
延迟队列概念:
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。
延迟队列使用场景:
订单在十分钟之内未支付则自动取消
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
用户注册成功后,如果三天内没有登陆则进行短信提醒
用户发起退款,如果三天内没有得到处理则通知相关运营人员
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;那我们一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
如果数据量比较少,确实可以这样做,比如:对于「如果账单一周内未支付则进行自动结算」这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:「订单十分钟内未支付则关闭」,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
TTL的两种设置
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。
换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为「死信」。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
队列设置 TTL
在创建队列的时候设置队列的 x-message-ttl 属性
Map<String, Object> params = new HashMap<>();
params.put("x-message-ttl",5000);
return QueueBuilder.durable("QA").withArguments(args).build(); // QA 队列的最大存活时间位 5000 毫秒
消息设置 TTL
针对每条消息设置 TTL
rabbitTemplate.converAndSend("X","XC",message,correlationData -> {
correlationData.getMessageProperties().setExpiration("5000");
});
两个代码块来自下方的案例
两者区别
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间,具体看下方案例。
另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃
整合Dotnet
在RabbitMQ中,延迟队列(也称为延迟消息或定时消息)是一种特殊类型的队列,它允许消息在被消费之前等待一段指定的时间。这种队列非常有用,比如在需要执行延时任务、实现定时提醒等场景中。
RabbitMQ本身不直接支持延迟队列,但可以通过几种方式来实现延迟消息的功能:
1. 使用RabbitMQ的TTL和死信队列(DLX)
这是实现延迟队列最常用的方法。通过设置消息的TTL(Time-To-Live)和队列的死信交换机,可以让消息在一定时间后自动从原队列移动到另一个队列进行处理。
- 设置消息TTL:让消息在队列中等待一段时间后自动过期。
- 设置死信队列:消息过期后会被发送到死信队列。
2. 使用RabbitMQ插件
RabbitMQ有一些插件可以实现延迟队列的功能,比如rabbitmq_delayed_message_exchange
插件。这个插件允许你直接在消息上设置延迟时间,而不需要依赖TTL和死信队列。
3. 自定义实现
如果你需要更灵活的控制,可以自定义实现延迟队列。例如,使用数据库或外部存储来存储消息和预计的处理时间,然后通过定时任务检查并处理消息。
示例:使用TTL和死信队列实现延迟队列
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;
public class RabbitMQService
{
private readonly IRabbitMQConnection _connection;
public RabbitMQService(IRabbitMQConnection connection)
{
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
}
public async Task SetupDelayedQueueAsync()
{
using var channel = _connection.CreateChannel();
// 死信交换机
string dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(dlxExchangeName, ExchangeType.Direct);
// 死信队列
string dlxQueueName = "dlx_queue";
channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(dlxQueueName, dlxExchangeName, "");
// 延迟队列
string delayedQueueName = "delayed_queue";
var delayedQueueArgs = new Dictionary<string, object>
{
{"x-dead-letter-exchange", dlxExchangeName}, // 设置死信交换机
{"x-message-ttl", 60000} // 设置消息TTL为60秒
};
channel.QueueDeclare(delayedQueueName, durable: true, exclusive: false, autoDelete: false, arguments: delayedQueueArgs);
channel.QueueBind(delayedQueueName, dlxExchangeName, "");
}
public async Task SendDelayedMessageAsync(string exchange, string routingKey, object message, int delaySeconds)
{
using var channel = _connection.CreateChannel();
var messageJson = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(messageJson);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Expiration = delaySeconds.ToString(); // 设置消息TTL
await channel.BasicPublishAsync(exchange, routingKey, false, properties, body);
}
}
示例中:
dlx_exchange
是死信交换机。dlx_queue
是死信队列,用于接收从延迟队列过期的消息。delayed_queue
是延迟队列,设置了死信交换机和消息TTL。SendDelayedMessageAsync
方法允许你发送一个延迟消息,通过设置消息的TTL来实现延迟。