08 消息消费 API 与版本变迁说明 从本篇开始我们将详细介绍 RockeMQ 的消息消费端的 API。

消息消费类图

RocketMQ 消费端的 API 如下图所示:

1

其核心类图如下所示。

MQAdmin

MQ 一些基本的管理功能,例如创建 Topic,这里稍微有点奇怪,消费端应该不需要继承该接口。该类在消息发送 API 章节已详细介绍,再次不再重复说明。

MQConsumer

MQ 消费者,这个接口定义得过于简单,如果该接口需要,可以将其子接口一些共同的方法提取到该接口中。 Set fetchSubscribeMessageQueues(final String topic)

获取分配该 Topic 所有的读队列。

MQPushConsumer

RocketMQ 支持推、拉两种模式,该接口是拉模式的接口定义。 void start()

启动消费者。

void shutdown()

关闭消费者。

void registerMessageQueueListener(String topic, MessageQueueListener listener)

注册消息队列变更回调时间,即消费端分配到的队列发生变化时触发的回调函数,其声明如下:

2

其参数说明如下:

  • String topic :主题。
  • Set mqAll :该 Topic 所有的队列集合。
  • Set mqDivided :分配给当前消费者的消费队列。 PullResult pull(MessageQueue mq, String subExpression, long offset,int maxNums, long timeout)

消息拉取,应用程序可以通过调用该方法从 RocketMQ 服务器拉取一篇消息,其参数含义说明如下:

  • MessageQueue mq :消息消费队列。
  • String subExpression :消息过滤表达式,基于 Tag、SQL92 的过滤表达式。
  • long offset :消息偏移量,消息在 ConsumeQueue 中的偏移量。
  • int maxNums :一次消息拉取返回的最大消息条数。
  • long timeout :本次拉取的超时时间。 PullResult pull(MessageQueue mq, MessageSelector selector, long offset,int maxNums, long timeout)

pull 重载方法,通过 MessageSelector 构建消息过滤对象,可以通过 MessageSelector 的 buildSql、buildTag 两个方法构建过滤表达式。

void pull(MessageQueue mq, String subExpression, long offset, int maxNums,PullCallback pullCallback)

异步拉取,调用其异步回调函数 PullCallback。

PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression,long offset, int maxNums)

拉取消息,如果服务端没有新消息待拉取,一直阻塞等待,直到有消息返回,同样该方法有一个重载放假支持异步拉取。

void updateConsumeOffset(MessageQueue mq, long offset)

更新消息消费处理进度。

long fetchConsumeOffset(MessageQueue mq, boolean fromStore)

获取指定消息消费队列的消费进度。其中参数 fromStore 如果为 true,表示从消息消费进度存储文件中获取消费进度。

Set fetchMessageQueuesInBalance(String topic)

获取当前正在处理的消息消费队列(通过消息队列负载机制分配的队列)。

void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)

消息消费失败后发送的 ACK。

MQPushConsumer

RocketMQ 推模式消费者接口。 void start()

启动消费者。

void shutdown()

关闭消费者。

void registerMessageListener(MessageListenerConcurrently messageListener)

注册并发消费模式监听器。

void registerMessageListener(MessageListenerOrderly messageListener)

注册顺序消费模式监听器。

void subscribe(String topic, String subExpression)

订阅主题。其参数说明如下:

  • String topic : 订阅的主题,RocketMQ 支持一个消费者订阅多个主题,操作方式是多次调用该方法。
  • String subExpression :消息过滤表达式,例如传入订阅的 tag,SQL92 表达式。 void subscribe(String topic, MessageSelector selector)

订阅主题,重载方法,MessageSelector 提供了 buildSQL、buildTag 的订阅方式。

void unsubscribe(String topic)

取消订阅。

void suspend()

挂起消费。

void resume()

恢复继续消费。

DefaultMQPushConsumer

RocketMQ 消息推模式默认实现类。

DefaultMQPullConsumer

RocketMQ 消息拉取模式默认实现类。

在 RocketMQ 的内部实现原理中,其实现机制为 PULL 模式,而 PUSH 模式是一种伪推送,是对 PULL 模式的封装,PUSH 模式的实现原理如下图所示:

3

即 PUSH 模式就是对 PULL 模式的封装,每拉去一批消息后,提交到消费端的线程池(异步),然后马上向 Broker 拉取消息,即实现类似“推”的效果。

从 PULL 模式来看,消息的消费主要包含如下几个方面:

  • 消息拉取,消息拉取模式通过 PULL 相关的 API 从 Broker 指定消息消费队列中拉取一批消息到消费消费客户端,多个消费者需要手动完成队列的分配。
  • 消息消费端处理完消费,需要向 Broker 端报告消息处理队列,然后继续拉取下一批消息。
  • 如果遇到消息消费失败,需要告知 Broker,该条消息消费失败,后续需要重试,通过手动调用 sendMessageBack 方法实现。

而 PUSH 模式就上述这些处理操作无需使用者考虑,只需告诉 RocketMQ 消费者在拉取消息后需要调用的事件监听器即可,消息消费进度的存储、消息消费的重试统一由 RocketMQ Client 来实现。

消息消费 API 简单使用示例

从上文基本可以得知,推模式 API 与拉模式 API 在使用层面的差别,可以简单理解为汽车领域的自动挡与手动挡。在实际业务类场景中,通常使用的是推送风格的 API,适合实时监控;但在大数据领域,通常是跑批处理即定时类任务,故大数据领域通常使用拉模式更多。

接下来编写几个示例代码对拉取、推送相关 API 进行一个使用方面的演示。

RocketMQ 拉模式核心 API 使用示例

使用场景:例如公司大数据团队需要对订单进行分析,为了提高计算效能,采取每 2 个小时调度一次,每批任务处理任务启动之前的所有消息。

首先我先给出一个基于 RocketMQ PULL 的 API 的编程示例代码,本示例接近生产实践,示例代码如下: import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class PullConsumerTest { public static void main(String[] args) throws Exception { Semaphore semaphore = new Semaphore(); Thread t = new Thread(new Task(semaphore)); t.start(); CountDownLatch cdh = new CountDownLatch(1); try { //程序运行 120s 后介绍 cdh.await(120 /* 1000, TimeUnit.MILLISECONDS); } finally { semaphore.running = false; } } /// /* 消息拉取核心实现逻辑 /*/ static class Task implements Runnable { Semaphore s = new Semaphore(); public Task(Semaphore s ) { this.s = s; } public void run() { try { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(“dw_pull_consumer”); consumer.setNamesrvAddr(“127.0.01:9876”); consumer.start(); Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); Set msgQueueList = consumer. fetchSubscribeMessageQueues("TOPIC_TEST"); // 获取该 Topic 的所有队列 if(msgQueueList != null && !msgQueueList.isEmpty()) { boolean noFoundFlag = false; while(this.s.running) { if(noFoundFlag) { // 没有找到消息,暂停一下消费 Thread.sleep(1000); } for( MessageQueue q : msgQueueList ) { PullResult pullResult = consumer.pull(q, "/*", decivedPulloffset(offsetTable , q, consumer) , 3000); System.out.println("pullStatus:" + pullResult.getPullStatus()); switch (pullResult.getPullStatus()) { case FOUND: doSomething(pullResult.getMsgFoundList()); break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: case OFFSET_ILLEGAL: noFoundFlag = true; break; default: continue ; } //提交位点 consumer.updateConsumeOffset(q, pullResult.getNextBeginOffset()); } System.out.println("balacne queue is empty: " + consumer. fetchMessageQueuesInBalance("TOPIC_TEST").isEmpty()); } } else { System.out.println("end,because queue is enmpty"); } consumer.shutdown(); System.out.println("consumer shutdown"); } catch (Throwable e) { e.printStackTrace(); } } } //*/* 拉取到消息后具体的处理逻辑 /*/ private static void doSomething(List msgs) { System.out.println("本次拉取到的消息条数:" + msgs.size()); } public static long decivedPulloffset(Map<MessageQueue, Long> offsetTable, MessageQueue queue, DefaultMQPullConsumer consumer) throws Exception { long offset = consumer.fetchConsumeOffset(queue, false); if(offset < 0 ) { offset = 0; } System.out.println("offset:" + offset); return offset; } static class Semaphore { public volatile boolean running = true; } }

关于上述代码,提供了优雅的线程拉取的方法,消息的拉取实现主要在任务 Task 的 run 方法中,主要的实现技巧如下:

  • 首先根据 MQConsumer 的 fetchSubscribeMessageQueues 的方法获取 Topic 的所有队列信息。
  • 然后遍历所有队列,依次通过 MQConsuemr 的 PULL 方法从 Broker 端拉取消息。
  • 对拉取的消息进行消费处理。
  • 通过调用 MQConsumer 的 updateConsumeOffset 方法更新位点,但需要注意的是这个方法并不是实时向 Broker 提交,而是客户端会启用以线程,默认每隔 5s 向 Broker 集中上报一次。

上面的示例演示的是一个消费组只有一个消费者,如果有多个消费组呢?这里就涉及到队列的重新分配,而每一个消费者是否只负责拉取分配的队列,是不是觉得这个直接使用 PULL 模式,是不是觉得有点复杂了。笔者也觉得是,接下来我们来看一下 PUSH 模式。

RocketMQ 推模式使用示例

在 RocketMQ 中绝大数场景中,通常会选择使用 PUSH 模式,因为 PUSH 模式是对 PULL 模式的封装,将消息的拉取、消息队列的自动负载、消息进度(位点)自动提交、消息消费重试都进行了封装,无需使用者关心,其示例代码如下: public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“dw_test_consumer_6”); consumer.setNamesrvAddr(“127.0.0.1:9876”); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe(“TOPIC_TEST”, “/*”); consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle()); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { try { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Throwable e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.printf("Consumer Started.%n"); }

上面的代码是不是非常简单。在后续的文章我们会重点对 PUSH 模式消费者的核心属性即工作原理做详细的介绍。使用 DefaultMQPushConsumer 开发一个消费者,其代码基本覆盖如下几个方面:

  • 首先 new DefaultMQPushConsumer 对象,并指定一个消费组名。
  • 然后设置相关参数,例如 nameSrvAdd、消费失败重试次数、线程数等(可以设置哪些参数将在下篇文章中详细介绍)。
  • 通过调用 setConsumeFromWhere 方法指定初次启动时从什么地方消费,默认是最新的消息开始消费。
  • 通过调用 setAllocateMessageQueueStrategy 指定队列负载机制,默认平均分配。
  • 通过调用 registerMessageListener 设置消息监听器,即消息处理逻辑,最终返回 CONSUME_SUCCESS(成功消费)或 RECONSUME_LATER(需要重试)。 温馨提示,关于消息消费的详细使用,将在后面的文章中进行详细介绍。

消息消费 API 版本演变说明

RocketMQ 在消费端的 API 相对来说还是比较稳定的,只是在 RocketMQ 4.6.0 版本引入了 DefaultLitePullConsumer,如果上述 PULL 示例代码引用的 Client 包版本为 4.6.0,细心的读者朋友们肯定会发现 DefaultMQPullConsumer 已过期,取代它的正是 DefaultLitePullConsumer。那这是什么原因呢?

我想只要大家使用过 DefaultMQPullConsumer 编写代码后,就会发现这个类的 API 太底层,使用者需要考虑的问题太多,例如队列负载、消费进度存储等等方面,可以毫不夸张地说,要用好 DefaultMQPullConsumer 还是不那么容易的。

RocketMQ 设计者也意识到了这样的问题,故引入了 DefaultLitePullConsumer,按照官方文档上的介绍,该类具备如下特性:

  • 支持以订阅方式进行消息消费,支持消费队列自动再平衡。
  • 支持手动分配队列的方式进行消息消费,此模式不支持队列自动再平衡。
  • 提供 seek/commit 方法来重置、提交消费位点。

温馨提示:由于 DefaultLitePullConsumer 的内容比较多,我们将在后续单独一篇文章对其参数、方法、使用示例进行详细介绍,故本篇只是告知其引入的目的。

小结

本篇详细介绍了 RocketMQ PUSH、PULl 两种消息消费模式的核心 API,并对相关 API 进行了演示,后续会详细结合实际场景,并梳理核心参数,给出使用建议,后面引出了 RocketMQ 消费者一个重大的版本变更。

参考资料

https://learn.lianglianglee.com/%e4%b8%93%e6%a0%8f/RocketMQ%20%e5%ae%9e%e6%88%98%e4%b8%8e%e8%bf%9b%e9%98%b6%ef%bc%88%e5%ae%8c%ef%bc%89/08%20%e6%b6%88%e6%81%af%e6%b6%88%e8%b4%b9%20API%20%e4%b8%8e%e7%89%88%e6%9c%ac%e5%8f%98%e8%bf%81%e8%af%b4%e6%98%8e.md