Worker Queue
拓展阅读
工作队列
工作队列(又名:任务队列)背后的主要思想是避免立即执行占用大量资源的任务,并且必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的worker进程将弹出任务并最终执行作业。当您运行许多工作者时,任务将在他们之间共享。
这个概念在web应用程序中尤其有用,在web应用程序中,在短HTTP请求窗口中不可能处理复杂任务。
代码
- NewTask.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* <p> 任务 </p>
*/
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 模拟 5 次任务
for(int i = 0; i < 5; i++) {
String message = UUID.randomUUID().toString();
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
}
- Worker.java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* <p> 工人 </p>
*/
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// // accept only one unack-ed message at a time
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
}
}
Round-robin dispatching
使用任务队列的优点之一是能够轻松地并行工作。如果我们正在积累积压的工作,我们可以增加更多的工人,这样就可以很容易地扩大规模。
首先,让我们尝试同时运行两个worker实例。它们都将从队列中获取消息,但具体如何呢?让我们来看看。
你需要打开三个控制台。两个将运行工人程序。这些控制台将是我们的两个消费者——C1和C2。
默认情况下,RabbitMQ将依次向下一个使用者发送每条消息。平均而言,每个消费者都会收到相同数量的消息。这种消息分发的方式称为循环。在三个或更多的员工身上试试。
模拟测试
开启两个 worker 命令行窗口,然后执行 NewTask。
- NewTask 日志
[x] Sent 'b80c968a-0515-4c12-b067-d5b9ba87bc87'
[x] Sent '2925c8a1-6095-4d3e-a7ac-2e412fb4963e'
[x] Sent '180a72ed-dbe7-4632-9787-a64a7bdc8eaf'
[x] Sent 'ea6c55cc-8563-4654-85ee-231269fb74c0'
[x] Sent 'daeb218a-d247-4bfe-9e13-162e3d67183e'
- Worker 窗口1
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'b80c968a-0515-4c12-b067-d5b9ba87bc87'
[x] Done
[x] Received 'ea6c55cc-8563-4654-85ee-231269fb74c0'
[x] Done
- Worker 窗口2
[*] Waiting for messages. To exit press CTRL+C
[x] Received '2925c8a1-6095-4d3e-a7ac-2e412fb4963e'
[x] Done
[x] Received '180a72ed-dbe7-4632-9787-a64a7bdc8eaf'
[x] Done
[x] Received 'daeb218a-d247-4bfe-9e13-162e3d67183e'
[x] Done
Message acknowledgment
消息确认
完成一项任务可能需要几秒钟。您可能想知道,如果某个消费者开始了一项很长的任务,但只完成了部分任务,那么会发生什么情况呢?在我们当前的代码中,一旦RabbitMQ向客户发送一条消息,它就会立即标记为删除。在这种情况下,如果您杀死一个工人,我们将失去消息,它只是处理。我们还会丢失所有发送给这个工人但尚未处理的消息。
但我们不想失去任何任务。如果一个工人死了,我们希望把任务交给另一个工人。
为了确保消息不会丢失,RabbitMQ支持消息确认。
使用者返回ack(nowledgement),告诉RabbitMQ已经接收、处理了特定的消息,RabbitMQ可以随意删除它。
未发送 ack 死亡的 Worker
如果使用者在没有发送ack的情况下死亡(其通道关闭、连接关闭或TCP连接丢失),RabbitMQ将理解消息没有完全处理,并将重新排队。如果在同一时间有其他消费者在线,它会迅速将其重新发送给另一个消费者。这样你就可以确保没有信息丢失,即使工人偶尔会死去。
没有任何消息超时;当使用者死亡时,RabbitMQ将重新传递消息。即使处理一条消息需要非常非常长的时间,也没有问题。
手动消息确认在默认情况下是打开的。
在前面的示例中,我们通过autoAck=true标志显式地关闭了它们。当我们完成一项任务时,是时候将此标志设置为false并从工作人员发送适当的确认信息了。
处理方式
就像我们上面的代码一样。
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
使用这段代码,我们可以确保即使您在处理消息时使用CTRL+C杀死了一个工人,也不会丢失任何东西。在工人死后不久,所有未确认的信息将被重新发送。
确认必须通过相同的通道发送,这是为了在同一通道上接收。尝试承认使用不同的通道将导致通道级别的协议异常。
Forgotten acknowledgment
错过基础是一个常见的错误。这是一个容易犯的错误,但后果是严重的。当您的客户端退出时,消息将被重新传递(这可能看起来像随机的重新传递),但是RabbitMQ将占用越来越多的内存,因为它不能释放任何未被添加的消息。
为了调试这种错误,可以使用 rabbitmqctl 打印 messages_unrecognized
字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Message durability
我们已经学会了如何确保即使用户死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。需要做两件事情来确保消息不会丢失:我们需要将队列和消息都标记为持久的。
首先,我们需要确保RabbitMQ永远不会丢失队列。
为了做到这一点,我们需要声明它是可持久化的:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
此时,我们确信即使RabbitMQ重新启动,task_queue 队列也不会丢失。
现在我们需要将我们的消息标记为持久化——通过将 MessageProperties(它实现了BasicProperties)设置为 PERSISTENT_TEXT_PLAIN
的值。
注意
将消息标记为持久性并不能完全保证消息不会丢失。
虽然它告诉RabbitMQ将消息保存到磁盘,但是仍然有一个短时间窗口,当RabbitMQ接受了一条消息并且还没有保存它。
此外,RabbitMQ不会对每条消息执行 fsync(2)
操作——它可能只是被保存为缓存,而不是真正写入磁盘。
持久性保证并不强,但对于我们的简单任务队列来说已经足够了。
如果您需要更强的保证,那么您可以使用publisher confirmed。
Fair dispatch
您可能已经注意到,调度仍然不能完全按照我们的要求工作。
例如,在有两个工人的情况下,当所有奇怪的信息都很重,偶数信息都很轻时,一个工人会一直很忙,而另一个几乎不做任何工作。
好吧,RabbitMQ对此一无所知,它仍然会均匀地发送消息。
这是因为RabbitMQ只是在消息进入队列时发送消息。它不查看消费者未确认消息的数量。它只是盲目地将第n个消息发送给第n个消费者。
为了克服这个缺点,我们可以使用 prefetchCount = 1
的 basicQos 方法。
这告诉RabbitMQ不要一次给一个工人发送多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向工作人员发送新消息。相反,它会把它发送给下一个不太忙的工人。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意大小
如果所有的工人都很忙,你的队伍就会排满。
你会想要关注这个问题,也许会增加更多的员工,或者有其他的策略。
源码地址
以上源代码参见 rabbitmq-workerqueue
参考资料
- 官方
http://www.rabbitmq.com/getstarted.html
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/java