Skip to content

死信队列

死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

死信的来源

  • 消息 TTL 过期

    TTL是 Time To Live 的缩写, 也就是生存时间

  • 队列达到最大长度

    队列满了,无法再添加数据到 MQ 中

  • 消息被拒绝

    (basic.reject 或 basic.nack) 并且 requeue = false

死信实战

在RabbitMQ中,死信队列(Dead Letter Exchange, DLX)是一种特殊类型的队列,用于存储无法被正常消费的消息。当消息在正常队列中无法被消费(比如因为消息被拒绝且不重新入队,或者消息在队列中停留的时间超过了设置的TTL),它就会被发送到死信队列。

  1. 定义死信队列和死信交换机

    • 死信队列和普通的队列类似,但是它需要绑定到一个死信交换机。
    • 死信交换机可以是任何类型的交换机,包括direct
  2. 配置消息队列

    • 消息队列需要设置deadLetterExchange参数,指定当消息无法被消费时应该发送到哪个死信交换机。
  3. 配置死信队列

    • 死信队列需要绑定到死信交换机,并指定相应的路由键。
csharp
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

public class RabbitMQService
{
    private readonly IRabbitMQConnection _connection;

    public RabbitMQService(IRabbitMQConnection connection)
    {
        _connection = connection ?? throw new ArgumentNullException(nameof(connection));
    }

    public async Task SetupDLXAsync()
    {
        using var channel = _connection.CreateChannel();

        // 死信交换机
        string dlxExchangeName = "dlx_exchange";
        channel.ExchangeDeclare(dlxExchangeName, ExchangeType.Direct);

        // 死信队列
        string dlxQueueName = "dlx_queue";
        channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        channel.QueueBind(dlxQueueName, dlxExchangeName, ""); // 绑定到死信交换机

        // 消息队列
        string messageQueueName = "message_queue";
        var messageQueueArgs = new Dictionary<string, object>
        {
            {"x-dead-letter-exchange", dlxExchangeName}, // 设置死信交换机
            {"x-message-ttl", 30000} // 设置消息TTL为30秒
        };
        channel.QueueDeclare(messageQueueName, durable: true, exclusive: false, autoDelete: false, arguments: messageQueueArgs);
        channel.QueueBind(messageQueueName, dlxExchangeName, ""); // 绑定到死信交换机

        // 消费者
        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            // 处理消息
            // 如果消息处理失败,拒绝消息并重新入队
            // channel.BasicReject(ea.DeliveryTag, true);
            // 如果消息处理成功,确认消息
            // channel.BasicAck(ea.DeliveryTag, false);
        };
        channel.BasicConsume(messageQueueName, false, consumer);
    }

    public async Task SendAsync(string exchange, string routingKey, object message)
    {
        using var channel = _connection.CreateChannel();
        var messageJson = JsonSerializer.Serialize(message);
        var body = Encoding.UTF8.GetBytes(messageJson);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;
        await channel.BasicPublishAsync(exchange, routingKey, false, properties, body);
    }
}

示例中:

  • dlx_exchange是死信交换机。
  • dlx_queue是死信队列,它绑定到死信交换机。
  • message_queue是消息队列,它设置了死信交换机和消息TTL。
  • 当消息在message_queue中无法被消费(比如因为TTL超时),它会被发送到dlx_exchange,然后路由到dlx_queue