Skip to content

高级发布确认

在RabbitMQ中,高级发布确认(Publisher Confirms)是一种机制,它允许发布者(Producer)在消息被正确地投递到所有匹配的队列之后得到通知。如果没有正确投递,发布者也会收到一个负面的确认,这样发布者可以决定如何处理这种情况(比如重试发送消息或者记录错误)。

以下是如何在RabbitMQ中使用高级发布确认的步骤:

  1. 启用Publisher Confirms

    • 你需要在IModel(在RabbitMQ.Client 7.0.0-alpha2版本之前称为IChannel)上启用Publisher Confirms功能。
  2. 发送消息

    • 发送消息时,你会为每个消息分配一个唯一的ID。
  3. 等待确认

    • 发送消息后,你会等待RabbitMQ的确认。如果消息被成功处理,RabbitMQ会发送一个确认响应,包含之前分配的消息ID。
  4. 处理确认

    • 如果消息没有被成功处理,RabbitMQ会发送一个负面确认,你可以据此执行相应的错误处理逻辑。

以下是.NET客户端使用高级发布确认的代码示例:

csharp
using RabbitMQ.Client;
using System;
using System.Text;

public class RabbitMQPublisher
{
    private readonly IModel _channel;

    public RabbitMQPublisher(IModel channel)
    {
        _channel = channel ?? throw new ArgumentNullException(nameof(channel));
    }

    public void EnablePublisherConfirms()
    {
        _channel.ConfirmSelect(); // 启用Publisher Confirms
    }

    public void PublishMessage(string exchange, string routingKey, byte[] messageBody)
    {
        uint messageId = _channel.NextSequenceId(); // 获取一个唯一的消息ID
        _channel.BasicPublish(exchange, routingKey, true, null, messageBody); // 发布消息

        // 等待消息确认
        var ackEvent = new AutoResetEvent(false);
        var consumerTag = _channel.CallbackException + "Ack";
        _channel.BasicAck += (sender, ea) =>
        {
            if (ea.DeliveryTag == messageId)
            {
                // 处理消息确认
                Console.WriteLine($"Message with ID {messageId} was confirmed.");
                ackEvent.Set();
            }
        };
        _channel.BasicNack += (sender, ea) =>
        {
            if (ea.DeliveryTag == messageId)
            {
                // 处理消息未确认
                Console.WriteLine($"Message with ID {messageId} was nack.");
                ackEvent.Set();
            }
        };

        ackEvent.WaitOne(); // 等待确认事件
    }
}

示例中:

  • EnablePublisherConfirms方法用于在IModel上启用Publisher Confirms功能。
  • PublishMessage方法用于发送消息,并等待对应的确认或未确认事件。
  • 使用AutoResetEvent来等待确认事件,这是一个简单的同步机制。

实际应用中可能需要更复杂的错误处理和异步处理机制。此外,BasicAckBasicNack事件处理器需要正确地处理多线程情况,因为它们可能会从不同的线程触发