(本教程是使用Net客户端,也就是针对微软技术平台的)
在前一个教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务会被交付给一个【工人】。在这一部分我们将做一些完全不同的事情--我们将向多个【消费者】传递信息。这种模式被称为“发布/订阅”。 为了说明这种模式,我们将构建一个简单的日志系统。它将包括两个程序,第一个将发出日志消息,第二个将接收并打印它们。 在我们的日志系统中每个接收程序的运行副本都会得到消息。这样我们就可以运行一个接收者程序,将日志记录到磁盘;同时我们可以运行另一个接收者程序,并在屏幕上看到打印出来的日志。 从本质上讲,已发布的日志消息将被广播到所有的接收者程序。1、消息交换机【Exchange】 在教程的前面部分,我们从队列中发送和接收消息。在RabbitMQ中,现在是时候引入全消息模型。 让我们快速看看我们以前的教程讲了什么: 【生产者】:就是一个用于发送消息的用户程序 【消费者】:就是一个用于接收和使用消息的用户程序 【队列】:是一个暂存消息的缓存区 RabbitMQ消息传递模型的核心思想是,【生产者】不直接发送任何信息到队列。事实上,【生产者】根本就不知道消息是否会被传送到任何队列。 相反,【生产者】只能发送消息到【消息交换机】。交换是件很简单的事。一方面它接收来自【生产者】的消息,另一方面是将接收到消息推送到队列中。【消息交换机】必须知道它如何处理接收消息的确切方法。是否应该发送到特定队列?它应该被发送到多个队列呢?或者它应该被丢弃。该规则由【消息交换机】的类型来定义。 这里有一些可用的【消息交换机】的类型:【Direct】直接,【Topic】主题,【Headers】标题和【Fanout】扇出。我们将集中关注最后一个-【Fanout】扇出。让我们创建一个这种类型的【消息交换机】,并给它命名为Logs:channel.ExchangeDeclare("logs", "fanout");
sudo rabbitmqctl list_exchanges
在这个列表中会有一些amq.*【消息交换机】和默认(未命名)消息交换机。这些都是默认创建的,但现在不太可能需要使用它们。
默认的消息交换机 在教程前面的部分我们队【消息交换机】是一无所知,但是我依然可以发送消息去想去的队列,那是因为我们使用了默认的【消息交换机】,这些默认的消息 交换机我用使用两个双引号“”来标识。 我们回忆一下以前是如何发送消息的:var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
第一个参数是【消息交换机】的名称。空字符串表示默认或未命名的消息交换机:消息会被路由到指定的routingkey名称的队列,如果它存在的话。
现在,我们可以发布到我们命名的【消息交换机】:var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
var queueName = channel.QueueDeclare().QueueName;
在这点上,QueueName包含随机队列名称。例如,它可能看起来像amq.gen-jzty20brgko-hjmujj0wlg。
3、绑定【Binding】我们已经创建了一个【Fanout】类型的【消息交换机】和队列。现在我们需要告诉【消息交换机】向我们的队列发送消息。【消息交换机】和【队列】之间的关系称为绑定。
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
rabbitmqctl list_bindings
4、把所有的代码整合到一起
【生产者】的程序,它发出的日志消息,看起来并没有和以前的教程有很大的不同。最重要的变化是,我们现在想发送的消息是到达我们指定名称的日志【消息交换机】,而不是无名的。我们在发送消息的时候需要提供一个routingkey表示的名称,但【Fanout】类型的【消息交换机】会容忽视该routingKey的值的。这里有EmitLog.cs文件代码:1 using System; 2 using RabbitMQ.Client; 3 using System.Text; 4 5 class EmitLog 6 { 7 public static void Main(string[] args) 8 { 9 var factory = new ConnectionFactory() { HostName = "localhost" };10 using(var connection = factory.CreateConnection())11 using(var channel = connection.CreateModel())12 {13 channel.ExchangeDeclare(exchange: "logs", type: "fanout");14 15 var message = GetMessage(args);16 var body = Encoding.UTF8.GetBytes(message);17 channel.BasicPublish(exchange: "logs",18 routingKey: "",19 basicProperties: null,20 body: body);21 Console.WriteLine(" [x] Sent {0}", message);22 }23 24 Console.WriteLine(" Press [enter] to exit.");25 Console.ReadLine();26 }27 28 private static string GetMessage(string[] args)29 {30 return ((args.Length > 0)31 ? string.Join(" ", args)32 : "info: Hello World!");33 }34 }
(EmitLog.cs 的源码)
如你所见,在建立连接后,我们声明了【消息交换机】。此步骤是必要的,因为对非存在【消息交换机】的发送是被禁止的。如果没有队列绑定到【消息交换机】,消息将会丢失,但对我们来说没有问题;如果没有【消费者】正在侦听,我们可以安全地丢弃消息。以下是ReceiveLogs.cs的代码:1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 6 class ReceiveLogs 7 { 8 public static void Main() 9 {10 var factory = new ConnectionFactory() { HostName = "localhost" };11 using(var connection = factory.CreateConnection())12 using(var channel = connection.CreateModel())13 {14 channel.ExchangeDeclare(exchange: "logs", type: "fanout");15 16 var queueName = channel.QueueDeclare().QueueName;17 channel.QueueBind(queue: queueName,18 exchange: "logs",19 routingKey: "");20 21 Console.WriteLine(" [*] Waiting for logs.");22 23 var consumer = new EventingBasicConsumer(channel);24 consumer.Received += (model, ea) =>25 {26 var body = ea.Body;27 var message = Encoding.UTF8.GetString(body);28 Console.WriteLine(" [x] {0}", message);29 };30 channel.BasicConsume(queue: queueName,31 noAck: true,32 consumer: consumer);33 34 Console.WriteLine(" Press [enter] to exit.");35 Console.ReadLine();36 }37 }38 }
cd ReceiveLogsdotnet run > logs_from_rabbit.log
如果你希望看到你的屏幕上的日志,生成一个新的终端和运行:
cd ReceiveLogsdotnet run
当然,要发送日志类型:
cd EmitLogdotnet run
使用rabbitmqctl list_bindings可以验证代码确实创建了我们想要的【绑定】和【队列】。运行两个ReceiveLogs.cs程序时,您应该看到如下所示:
rabbitmqctl list_bindings# => Listing bindings ...# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []# => ...done.
好了,终于翻译了第三篇教程了,翻译的不好,请见谅。如有大家英文比较好可以查看原文地址: