Routing
在前面的教程中,我们构建了一个简单的日志系统。我们能够向许多接收器广播日志消息。
在本教程中,我们将为它添加一个特性——我们将使订阅消息的一个子集成为可能。
例如,我们将能够仅将关键错误消息直接指向日志文件(以节省磁盘空间),同时仍然能够在控制台打印所有日志消息。
Bindings
前面的例子绑定如下:
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定是交换器和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。
绑定可以接受额外的路由关键参数。为了避免与 basic_publish
参数混淆,我们将它称为绑定键。
这就是我们如何创建一个键绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
绑定键的含义取决于交换类型。我们以前使用过的扇出交换,只是忽略了它的价值。
Direct exchange
我们的日志系统从以前的教程广播所有消息到所有消费者。我们希望扩展此功能,以允许根据消息的严重程度对其进行过滤。
例如,我们可能希望一个将日志消息写入磁盘的程序只接收关键错误,而不会在警告或信息日志消息上浪费磁盘空间。
我们使用的是扇出交换器,它没有给我们太多的灵活性——它只能进行无目的的广播。
我们将使用直接交换。直接交换背后的路由算法很简单——消息转到队列,其绑定键与消息的路由键完全匹配。
为了说明这一点,考虑以下设置:
在这个设置中,我们可以看到直接exchange X,它绑定了两个队列。第一个队列用绑定键橙色绑定,第二个队列有两个绑定,一个用绑定键黑色绑定,另一个用绿色绑定。
在这样的设置中,发布到交换器的带有路由关键字橙色的消息将被路由到队列Q1。带有黑色或绿色路由键的消息将转到Q2。所有其他消息将被丢弃。
Multiple bindings
使用相同的绑定键绑定多个队列是完全合法的。
在我们的示例中,我们可以使用绑定键black在X和Q1之间添加绑定。
在这种情况下,直接交换将表现得像扇出,并将消息广播到所有匹配的队列。带有路由密钥黑色的消息将同时发送到Q1和Q2。
Emitting logs
我们将在日志系统中使用这个模型。我们将发送消息到一个直接交换器,而不是扇出。我们将提供日志严重性作为路由键。
这样,接收程序就能够选择它想要接收的严重性。让我们首先关注发出日志。
和往常一样,我们需要首先创建一个exchange:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
发送消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
Subscribing
接收消息的工作方式将与上一教程一样,只有一个例外——我们将为感兴趣的每个错误级别创建一个新的绑定。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
代码实现
ps: 有种按照不同的 topic 分类的感觉。
消息的生产者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
for(int i = 0; i < 8; i++) {
String severity = getSeverity();
String message = UUID.randomUUID().toString();
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "': '" + message + "'");
}
channel.close();
connection.close();
}
/**
* 随机一下一种日志级别
* @return 日志级别
*/
private static String getSeverity() {
Random random = new Random();
String[] strings = "trace,debug,info,warn,error".split(",");
int randomIndex = random.nextInt(strings.length);
return strings[randomIndex];
}
}
消息的接收者
为了方便,分成两大类。错误的和其他级别的。
- ReceiveLogsDirectError.java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class ReceiveLogsDirectError {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
// 支持错误级别
System.out.println("【队列级别】" + "error");
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
- ReceiveLogsDirectCommon.java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class ReceiveLogsDirectCommon {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
List<String> severities = Arrays.asList("trace", "info", "debug", "warn");
System.out.println("【队列级别】" + severities);
for(String severity : severities){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
测试
分别打开接收者两个命令行,然后启动生产者。
- 生产者日志
[x] Sent 'error': '1233e7e8-d408-4d82-a2ed-411c5cf80e48'
[x] Sent 'info': '7ac26e2d-c1c7-45b1-8200-911d9837af51'
[x] Sent 'info': '50239cf6-ea98-47ef-862b-000af00a1d9d'
[x] Sent 'trace': 'a199b276-90e4-4534-b002-bf594577cf9e'
[x] Sent 'trace': 'ca50ed6e-1b98-470a-8883-a8e9dabbe533'
[x] Sent 'error': '61a90a12-a0bf-4129-baab-ed2537d311f7'
[x] Sent 'debug': '672a513e-f230-4a98-ad9f-59a008daf401'
[x] Sent 'trace': 'b3b86bf4-27cb-411e-84b7-f73700420c28'
- 错误级别日志接收者
【队列级别】error
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'error':'1233e7e8-d408-4d82-a2ed-411c5cf80e48'
[x] Received 'error':'61a90a12-a0bf-4129-baab-ed2537d311f7'
- 普通级别日志接收者
【队列级别】[trace, info, debug, warn]
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'info':'7ac26e2d-c1c7-45b1-8200-911d9837af51'
[x] Received 'info':'50239cf6-ea98-47ef-862b-000af00a1d9d'
[x] Received 'trace':'a199b276-90e4-4534-b002-bf594577cf9e'
[x] Received 'trace':'ca50ed6e-1b98-470a-8883-a8e9dabbe533'
[x] Received 'debug':'672a513e-f230-4a98-ad9f-59a008daf401'
[x] Received 'trace':'b3b86bf4-27cb-411e-84b7-f73700420c28'
源码地址
以上源代码参见 rabbitmq-routing
参考资料
- 官方
http://www.rabbitmq.com/tutorials/tutorial-four-java.html
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/java