交换机
Exchanges
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
Exchanges的类型
直接(direct):处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 abc ,则只有被标记为 abc 的消息才被转发,不会转发 abc.def,也不会转发 dog.ghi,只会转发 abc。
主题(topic):将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号「#」匹配一个或多个词,符号 匹配不多不少一个词。因此 abc.# 能够匹配到 abc.def.ghi,但是 abc. 只会匹配到 abc.def。
标题(headers):不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的。
匹配规则 x-match 有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息
扇出(fanout):不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 交换机转发消息是最快的。
默认exchange
通过空字符串("")进行标识的交换机是默认交换
// 发布消息到队列,使用默认交换机(空字符串表示默认交换机)
channel.BasicPublish(exchange: "",
routingKey: "hello", // 将消息发送到 "hello" 队列
basicProperties: null, // 不设置额外的属性
body: body);
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey) 绑定指定的 key
临时队列
之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。队列的名称我们来说至关重要,我们需要指定我们的消费者去消费哪个队列的消息。
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
创建临时队列的方式如下:
// 创建一个临时队列
string queueName = channel.QueueDeclare(queue: "",
durable: false, // 不持久化
exclusive: true, // 连接私有
autoDelete: true, // 连接关闭时自动删除
arguments: null);
QueueDeclare方法创建了一个临时队列,exclusive: true参数确保了队列是临时的,并且只在创建它的连接生命周期内可见。当连接关闭时,这个队列将自动被删除
绑定bindings
什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
Fanout exchange
Fanout exchange(广播交换器)是RabbitMQ中的一种交换器类型,它不进行消息路由,而是将消息发送到所有绑定到该交换器的队列中。Fanout交换器的主要特点是它不处理路由键(routing key),它只是简单地将消息广播到所有绑定到它的队列上。
特点
不处理路由键:Fanout交换器在消息传递时不会查看消息的路由键,因此,绑定到Fanout交换器的队列不需要指定路由键。
广播消息:Fanout交换器将接收到的消息发送到所有绑定到它的队列,无论这些队列的路由键是什么。
无顺序保证:Fanout交换器不保证消息的顺序,消息可能会以任意顺序到达队列。
适用于广播场景:Fanout交换器适用于需要将消息广播给多个消费者的场景,例如,发布-订阅模式中,当一个事件需要被多个订阅者接收时。
使用场景
- 多消费者监听同一消息:当一个消息需要被多个消费者处理时,可以使用Fanout交换器,确保所有消费者都能接收到消息。
- 负载均衡:在需要将任务分配给多个工作节点时,可以使用Fanout交换器将任务广播给所有工作节点,实现负载均衡。
- 日志收集:当需要将日志信息发送到多个日志收集器时,可以使用Fanout交换器。
创建Fanout交换器的示例代码(C#)
using RabbitMQ.Client;
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// 创建Fanout交换器
string exchangeName = "fanout_exchange";
channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
// 创建队列并绑定到Fanout交换器
string queueName = channel.QueueDeclare(queue: "", durable: false, exclusive: true, autoDelete: true, arguments: null);
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "");
// 发送消息到Fanout交换器
string message = "Hello, Fanout Exchange!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
在这个示例中,创建了一个名为fanout_exchange
的Fanout交换器,并创建了一个队列绑定到这个交换器。然后,我们发送了一条消息到Fanout交换器,这条消息会被广播到所有绑定到该交换器的队列中。
Direct exchange
Direct exchange(直连交换器)是RabbitMQ中的一种交换器类型,它用于将消息路由到一个或多个特定的队列。在Direct交换器中,消息的路由是基于路由键(routing key)的匹配。如果消息的路由键与队列的绑定键完全匹配,那么消息就会被发送到该队列。
特点
路由键匹配:Direct交换器根据消息的路由键和队列的绑定键进行匹配,只有当两者完全相同时,消息才会被路由到对应的队列。
点对点消息传递:Direct交换器通常用于点对点消息传递,即一个消息只被一个消费者处理。
顺序保证:Direct交换器可以保证消息按照发送顺序到达队列。
灵活的路由:通过设置不同的路由键,可以将消息路由到不同的队列,实现灵活的消息路由策略。
使用场景
- 任务分配:当需要将不同类型的任务分配给不同的处理者时,可以使用Direct交换器,通过设置不同的路由键来实现。
- 订单处理:在电子商务系统中,可以根据订单类型(如电子商品、实体商品)将订单消息路由到不同的处理队列。
- 日志处理:根据不同的日志级别(如INFO、WARNING、ERROR)将日志消息路由到不同的处理队列。
创建Direct交换器的示例代码(C#)
using RabbitMQ.Client;
using System;
using System.Text;
class Program
{
static void Main(string[] args)
{
// 创建与RabbitMQ的连接
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// 声明Direct交换器
string exchangeName = "direct_exchange";
channel.ExchangeDeclare(exchange: exchangeName, type: "direct", durable: false);
// 创建队列并绑定到Direct交换器
string queueName = channel.QueueDeclare(queue: "", durable: false, exclusive: true, autoDelete: true, arguments: null);
string routingKey = "info";
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);
// 设置消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Received {message}");
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
// 发送消息
string message = "Hello, Direct Exchange!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
代码解释
- 连接到RabbitMQ:使用
ConnectionFactory
创建到RabbitMQ服务器的连接。 - 声明Direct交换器:使用
ExchangeDeclare
方法声明一个Direct类型的交换器。 - 创建队列并绑定:创建一个队列,并使用
QueueBind
方法将其绑定到Direct交换器上,指定一个路由键。 - 设置消费者:为队列设置一个消费者,当消息到达时,消费者会接收并打印消息。
- 发送消息:使用
BasicPublish
方法向Direct交换器发送消息,并指定一个路由键。只有当消息的路由键与队列的绑定键相匹配时,消息才会被路由到该队列。
运行这段代码后,你将看到消息被正确地路由到指定的队列,并由消费者接收。
Topics exchange
Topic exchange(主题交换器)是RabbitMQ中的一种交换器类型,它允许你根据消息的路由键将消息路由到一个或多个队列,支持模式匹配。这种模式类似于Direct exchange(直连交换器),但是它使用通配符来匹配路由键,而不是要求完全一致。
特点
模式匹配:主题交换器使用特殊的通配符来匹配路由键。
*
(星号):匹配任意一个单词(即路由键中的一个点“.”分隔的部分)。#
(井号):匹配零个或多个单词。
灵活性:通过使用通配符,主题交换器可以非常灵活地定义路由规则,将消息路由到一个或多个队列。
使用场景:适用于需要根据消息类型或级别将消息路由到不同处理逻辑的场景,例如日志级别(info, warning, error)或不同的业务事件(order.created, order.cancelled)。
工作原理
- 生产者将消息发送到主题交换器时,会指定一个路由键。
- 交换器根据绑定到队列上的模式(binding key),决定将消息路由到哪些队列。
- 如果路由键与绑定键匹配(根据通配符规则),消息就会被发送到对应的队列。
示例
假设有一个路由键 "stock.us.aa"
,并且有以下队列绑定到主题交换器:
- 队列A绑定键为
"stock.#"
:匹配所有以“stock”开头的消息。 - 队列B绑定键为
"stock.us.*"
:匹配所有以“stock.us”开头的消息。
在这种情况下,消息 "stock.us.aa"
会被路由到队列A和队列B,因为它们各自的绑定键都与消息的路由键匹配。 当然,以下是一个使用C#和RabbitMQ客户端库来创建Topic交换器并发送消息的完整示例。这个示例将展示如何创建一个Topic交换器,创建几个队列并将它们绑定到这个交换器上,使用不同的绑定键(routing keys)和通配符,然后发布消息到交换器,消息会被广播到所有匹配的队列。
代码
using RabbitMQ.Client;
using System;
using System.Text;
class Program
{
static void Main(string[] args)
{
// 创建与RabbitMQ的连接
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// 声明Topic交换器
string exchangeName = "topic_exchange";
channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: false);
// 创建队列并绑定到Topic交换器
string[] queueNames = { "logs.info", "logs.warning", "logs.error" };
foreach (var queueName in queueNames)
{
channel.QueueDeclare(queue: queueName, durable: false, exclusive: true, autoDelete: true, arguments: null);
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: queueName.Split('.')[1]); // 使用队列名的第二部分作为路由键
}
// 设置消费者
foreach (var queueName in queueNames)
{
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Received in {queueName}: {message}");
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}
// 发送消息
string[] messages = {
"info: This is an info message",
"warning: This is a warning message",
"error: This is an error message"
};
foreach (var message in messages)
{
var routingKey = message.Split(':')[0]; // 从消息中提取路由键
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
Console.WriteLine($" [x] Sent {message}");
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
代码解释
- 连接到RabbitMQ:使用
ConnectionFactory
创建到RabbitMQ服务器的连接。 - 声明Topic交换器:使用
ExchangeDeclare
方法声明一个Topic类型的交换器。 - 创建队列并绑定:创建三个队列,并将它们绑定到Topic交换器上,使用不同的路由键。这里使用队列名的第二部分作为路由键,例如,对于队列
logs.info
,路由键是info
。 - 设置消费者:为每个队列设置一个消费者,当消息到达时,消费者会接收并打印消息。
- 发送消息:使用
BasicPublish
方法向Topic交换器发送消息,并指定一个路由键。消息将根据路由键和通配符规则被路由到相应的队列。
运行这段代码后,你将看到消息被正确地路由到指定的队列,并由消费者接收。这