Publish/Subscribe
在前面的教程中,我们创建了一个工作队列。工作队列后面的假设是,每个任务只交付给一个工作者。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递消息。这种模式称为“发布/订阅”。
为了说明这个模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序将接收并打印它们。
在我们的日志系统中,接收程序的每个运行副本都会收到消息。这样我们就可以运行一个接收器,并将日志指向磁盘;同时,我们可以运行另一个接收器在屏幕上看到日志。
实际上,已发布的日志消息将被广播到所有接收方。
Exchange
在本教程的前几部分中,我们向队列发送和接收消息。
现在是时候介绍Rabbit中的完整消息传递模型了。
让我们快速回顾一下我们在前面教程中介绍的内容:
-
生产者是发送消息的用户应用程序。
-
队列是存储消息的缓冲区。
-
使用者是接收消息的用户应用程序。
RabbitMQ消息传递模型的核心思想是,生产者从不直接向队列发送任何消息。实际上,生产者常常根本不知道消息是否会传递到任何队列。
相反,生产者只能向交换器发送消息。交换是一件非常简单的事情。一边接收来自生产者的消息,另一边将消息推送到队列中。
交换器必须确切地知道如何处理接收到的消息。它应该附加到特定的队列吗?它应该附加到许多队列中吗?或者它应该被丢弃。
这些规则由exchange类型定义。
交换类型
direct, topic, headers and fanout.
有几种交换类型可用: direct topic, headers 和 fanout。
我们将关注最后一个——fanout。
- 创建
让我们创建这种类型的交换,并将其称为日志:
channel.exchangeDeclare("logs", "fanout");
扇出交换(fanout exchange)非常简单。
顾名思义,它只是将接收到的所有消息广播给它所知道的所有队列。这正是我们需要的记录器。
- 发布
channel.basicPublish( "logs", "", null, message.getBytes());
- 列出所有的交换类型
sudo rabbitmqctl list_exchanges
- 无名交换
channel.basicPublish("", "hello", null, message.getBytes());
空字符串表示默认的或无名的交换: 如果存在的话,消息将以routingKey指定的名称路由到队列中。
Temporary queues
您可能还记得以前我们使用的队列有一个指定的名称(还记得hello和task_queue吗?)能够命名一个队列对我们来说至关重要——我们需要将工人指向同一个队列。
当您希望在生产者和消费者之间共享队列时,为队列提供一个名称非常重要。
但我们的记录员却不是这样。我们希望听到所有日志消息,而不仅仅是其中的一个子集。我们也只对当前流动的消息感兴趣,而不是旧的消息。要解决这个问题,我们需要两件事。
首先,当我们连接到Rabbit时,我们需要一个新的空队列。为此,我们可以创建一个随机名称的队列,或者更好的做法是让服务器为我们选择一个随机队列名称。
其次,一旦我们断开用户的连接,队列就会被自动删除。
在Java客户机中,当我们不向queueDeclare()提供任何参数时,我们将创建一个具有生成名称的非持久性、互斥性、自动删除队列:
String queueName = channel.queueDeclare().getQueue();
属性配置
Bindings
我们已经创建了一个扇出交换和一个队列。现在我们需要告诉exchange将消息发送到我们的队列。exchange和队列之间的关系称为绑定。
channel.queueBind(queueName, "logs", "");
- Listing bindings
罗列出所有的队列绑定
rabbitmqctl list_bindings
示例代码
生产者
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 模拟 5 次
for (int i = 0; i < 5; i++) {
String message = UUID.randomUUID().toString();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
}
消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
测试方式
窗口 1:ReceiveLogs - 01
窗口 2:ReceiveLogs - 01
启动 EmitLog
- EmitLog 日志
[x] Sent 'e9fc8989-a72a-4626-a578-a8fa931b462d'
[x] Sent 'afe3a804-93f2-45a0-b340-b12db0638de2'
[x] Sent 'c42a6ef1-4676-434d-8de5-3105736858db'
[x] Sent 'ee432f13-ae0d-43db-be3b-b7527000a3be'
[x] Sent '0931bf27-468a-4695-ad2f-f038d9723d0c'
- ReceiveLogs - 01 日志
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'e9fc8989-a72a-4626-a578-a8fa931b462d'
[x] Received 'afe3a804-93f2-45a0-b340-b12db0638de2'
[x] Received 'c42a6ef1-4676-434d-8de5-3105736858db'
[x] Received 'ee432f13-ae0d-43db-be3b-b7527000a3be'
[x] Received '0931bf27-468a-4695-ad2f-f038d9723d0c'
- ReceiveLogs - 02 日志
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'e9fc8989-a72a-4626-a578-a8fa931b462d'
[x] Received 'afe3a804-93f2-45a0-b340-b12db0638de2'
[x] Received 'c42a6ef1-4676-434d-8de5-3105736858db'
[x] Received 'ee432f13-ae0d-43db-be3b-b7527000a3be'
[x] Received '0931bf27-468a-4695-ad2f-f038d9723d0c'
就像广播一样,每个订阅者都受到相同的内容。
源码地址
以上源代码参见 rabbitmq-pubsub
参考资料
- 官方
http://www.rabbitmq.com/tutorials/tutorial-three-java.html
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/java