死信队列
死信的概念
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
死信的来源
消息 TTL 过期
TTL是 Time To Live 的缩写, 也就是生存时间
队列达到最大长度
队列满了,无法再添加数据到 MQ 中
消息被拒绝
(basic.reject 或 basic.nack) 并且 requeue = false
死信实战
在RabbitMQ中,死信队列(Dead Letter Exchange, DLX)是一种特殊类型的队列,用于存储无法被正常消费的消息。当消息在正常队列中无法被消费(比如因为消息被拒绝且不重新入队,或者消息在队列中停留的时间超过了设置的TTL),它就会被发送到死信队列。
定义死信队列和死信交换机:
- 死信队列和普通的队列类似,但是它需要绑定到一个死信交换机。
- 死信交换机可以是任何类型的交换机,包括
direct
。
配置消息队列:
- 消息队列需要设置
deadLetterExchange
参数,指定当消息无法被消费时应该发送到哪个死信交换机。
- 消息队列需要设置
配置死信队列:
- 死信队列需要绑定到死信交换机,并指定相应的路由键。
csharp
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;
public class RabbitMQService
{
private readonly IRabbitMQConnection _connection;
public RabbitMQService(IRabbitMQConnection connection)
{
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
}
public async Task SetupDLXAsync()
{
using var channel = _connection.CreateChannel();
// 死信交换机
string dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(dlxExchangeName, ExchangeType.Direct);
// 死信队列
string dlxQueueName = "dlx_queue";
channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(dlxQueueName, dlxExchangeName, ""); // 绑定到死信交换机
// 消息队列
string messageQueueName = "message_queue";
var messageQueueArgs = new Dictionary<string, object>
{
{"x-dead-letter-exchange", dlxExchangeName}, // 设置死信交换机
{"x-message-ttl", 30000} // 设置消息TTL为30秒
};
channel.QueueDeclare(messageQueueName, durable: true, exclusive: false, autoDelete: false, arguments: messageQueueArgs);
channel.QueueBind(messageQueueName, dlxExchangeName, ""); // 绑定到死信交换机
// 消费者
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
// 处理消息
// 如果消息处理失败,拒绝消息并重新入队
// channel.BasicReject(ea.DeliveryTag, true);
// 如果消息处理成功,确认消息
// channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(messageQueueName, false, consumer);
}
public async Task SendAsync(string exchange, string routingKey, object message)
{
using var channel = _connection.CreateChannel();
var messageJson = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(messageJson);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
await channel.BasicPublishAsync(exchange, routingKey, false, properties, body);
}
}
示例中:
dlx_exchange
是死信交换机。dlx_queue
是死信队列,它绑定到死信交换机。message_queue
是消息队列,它设置了死信交换机和消息TTL。- 当消息在
message_queue
中无法被消费(比如因为TTL超时),它会被发送到dlx_exchange
,然后路由到dlx_queue
。