应答与发布
消息应答
自动应答 (Auto-Ack)
在自动应答模式下,消费者接收到消息后,RabbitMQ会自动发送一个应答给消息代理,表示消息已被接收。这种方式下,消费者不需要显式地发送应答确认。
优点:
- 简单易用,代码实现简洁。
- 在消息处理非常快速且可靠的情况下,可以提高吞吐量。
缺点:
- 如果消费者处理消息失败或者崩溃,消息将不会重新入队,因此可能会丢失消息。
- 没有重试机制,一旦消息被确认,就没有机会重新处理它。
手动应答 (Manual Ack)
在手动应答模式下,消费者在接收到消息后需要显式地发送一个应答给消息代理,告知消息已被成功处理。这通常通过调用BasicAck
方法来完成。如果消费者在处理消息时失败或崩溃,消息将不会得到应答,RabbitMQ会将消息重新入队,使其可以被其他消费者处理。
优点:
- 提供了更强的消息可靠性。即使消费者处理失败,消息也不会丢失。
- 允许实现复杂的错误处理和重试逻辑。
缺点:
- 增加了代码的复杂性。
- 可能会降低吞吐量,因为需要等待消费者的确认。
示例代码
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
// 处理消息...
};
channel.BasicConsume(queue: "hello",
autoAck: true, // 自动应答
consumer: consumer);
}
RabbitMQ持久化
当 RabbitMQ 服务停掉以后,消息生产者发送过来的消息不丢失要如何保障?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
队列持久化:首先,需要确保队列是持久化的。这可以通过在声明队列时将
durable
参数设置为true
来实现。这样,即使RabbitMQ服务重启,队列也不会被删除。队列的持久化可以通过以下代码实现:csharpchannel.QueueDeclare("myQueue", true, false, false, null);
消息持久化:其次,消息本身也需要被标记为持久化的。在发布消息时,需要将消息的
deliveryMode
设置为2
,这表示消息是持久化的。在C#中,可以通过设置IBasicProperties
来实现:csharpvar properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; // MessageDeliveryMode.Persistent channel.BasicPublish("", "myQueue", properties, messageBody);
WARNING
如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列
不然就会出现如下错误:
以下为控制台中持久化与非持久化队列的 UI 显示区
不公平分发
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮询分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮询分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,在消费者中消费消息之前,设置参数 channel.basicQos(1);
开启成功,会看到如下结果:
不公平分发思想:如果一个工作队列还没有处理完或者没有应答签收一个消息,则不拒绝 RabbitMQ 分配新的消息到该工作队列。此时 RabbitMQ 会优先分配给其他已经处理完消息或者空闲的工作队列。如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker (工作队列)或者改变其他存储任务的策略。
预取值分发
带权的消息分发
默认消息的发送是异步发送的,所以在任何时候,channel 上不止只有一个消息来自消费者的手动确认,所以本质上是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos
方法设置「预取计数」值来完成的。
该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。
通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。
预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境 中。对于大多数应用来说,稍微高一点的值将是最佳的。
INFO
不公平分发和预取值分发都用到 basic.qos
方法,如果取值为 1,代表不公平分发,取值不为1,代表预取值分发
发布确认
生产者发布消息到 RabbitMQ 后,需要 RabbitMQ 返回「ACK(已收到)」给生产者,这样生产者才知道自己生产的消息成功发布出去。
发布确认(Publisher Confirms)是一种机制,它允许消息的生产者在消息被成功处理(即被持久化)后得到通知。这提供了一种确保消息不会在传输过程中丢失的方法。发布确认是异步的,生产者可以在发送消息后立即继续发送下一条消息,而不必等待确认。
在生产者端启用发布确认: 使用
ConfirmSelect()
方法来启用发布确认模式。一旦启用,所有通过该通道发布的消息都会被跟踪,直到它们被确认或拒绝。发送消息: 像平常一样发送消息,但是每个消息都会被分配一个唯一的
deliveryTag
。等待确认: 使用
WaitForConfirms()
方法来等待消息的确认。如果消息被成功持久化,生产者将收到一个确认。如果消息无法被持久化(例如,因为磁盘满了),生产者将收到一个否定确认。处理确认和否定确认: 你需要设置一个事件处理程序来处理确认和否定确认。这样,你可以知道每条消息是否成功发送。
示例
using System;
using System.Text;
using RabbitMQ.Client;
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 启用发布确认
channel.ConfirmSelect();
// 声明一个持久化的队列
channel.QueueDeclare("myQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
// 创建消息
var message = "Hello, World!";
var body = Encoding.UTF8.GetBytes(message);
// 设置消息属性为持久化
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // 持久化消息
// 发布消息
channel.BasicPublish(exchange: "",
routingKey: "myQueue",
basicProperties: properties,
body: body);
// 等待确认
bool multiple = true; // 可以设置为false,如果只关心最后一条消息的确认
channel.WaitForConfirmsOrDie(); // 等待所有消息的确认,如果失败则抛出异常
Console.WriteLine("Message published and confirmed.");
}
}
}
我们首先启用了发布确认,然后声明了一个持久化的队列,并发送了一条消息。我们设置了消息的持久化属性,然后使用WaitForConfirmsOrDie()
方法等待确认。如果所有消息都被成功确认,程序将打印一条消息。如果确认失败,WaitForConfirmsOrDie()
方法将抛出一个异常。
发布确认逻辑
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认的策略
开启发布确认的方法:
发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法
//开启发布确认
channel.confirmSelect();
单条确认发布(Individual Confirms)
单条确认发布模式下,每发送一条消息,生产者就会等待一个确认 。这种方式是同步的,生产者在发送每条消息后都会暂停,直到收到确认为止。这种方式的优点是简单直观,可以确保每条消息都被确认,但缺点是吞吐量有限,因为生产者需要等待每条消息的确认 。
批量确认发布(Batch Confirms)
批量确认发布模式下,生产者可以一次性发送多条消息,然后等待一个批量确认 。这种方式允许生产者在发送一定数量的消息后,再等待确认,从而提高吞吐量。但是,如果在这个批次中有任何消息失败,生产者将不知道是哪条消息失败了,因为所有消息都是一起确认的 。这种方式的优点是提高了性能,但缺点是如果批次中的消息出现错误,无法确定是哪条消息出错 。
异步确认发布(Asynchronous Confirms)
异步确认发布模式下,生产者在发送消息后不需要等待确认,而是继续发送下一条消息 。确认过程是异步的,生产者可以在后台线程中处理确认。这种方式的优点是性能最佳,生产者不需要等待每个消息的确认,可以持续发送消息 。缺点是实现起来稍微复杂,需要处理异步逻辑 。
总结
- 单条确认发布:同步等待确认,简单但吞吐量有限 。
- 批量确认发布:批量同步等待确认,合理的吞吐量,但错误定位困难 。
- 异步确认发布:最佳性能和资源使用,在出现错误的情况下可以很好地控制 。
应答和发布区别
应答功能属于消费者,消费完消息告诉 RabbitMQ 已经消费成功。
发布功能属于生产者,生产消息到 RabbitMQ,RabbitMQ 需要告诉生产者已经收到消息。