DefaultPullConsumer

类简介

  1. DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer

  2. DefaultMQPullConsumer主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,也可以自定义与控制offset位置。

  3. 优势:consumer可以按需消费,不用担心自己处理能力,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适消息延迟与忙等。

  4. 缺点:由于主动权在消费方,消费方无法及时获取最新的消息。

比较适合不及时批处理场景。

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
 
public class MQPullConsumer {
 
	private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
	
	public static void main(String[] args) throws MQClientException {
		DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
		consumer.setNamesrvAddr("127.0.0.1:9876");
		consumer.start();
		// 从指定topic中拉取所有消息队列
		Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
		for(MessageQueue mq:mqs){
			try {
				// 获取消息的offset,指定从store中获取
				long offset = consumer.fetchConsumeOffset(mq,true);
				System.out.println("consumer from the queue:"+mq+":"+offset);
				while(true){
					PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, 
							getMessageQueueOffset(mq), 32);
					putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
					switch(pullResult.getPullStatus()){
					case FOUND:
						List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                        for (MessageExt m : messageExtList) {
                            System.out.println(new String(m.getBody()));
                        }
						break;
					case NO_MATCHED_MSG:
						break;
					case NO_NEW_MSG:
						break;
					case OFFSET_ILLEGAL:
						break;
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		consumer.shutdown();
	}
 
	// 保存上次消费的消息下标
	private static void putMessageQueueOffset(MessageQueue mq,
			long nextBeginOffset) {
		OFFSE_TABLE.put(mq, nextBeginOffset);
	}
	
	// 获取上次消费的消息的下标
	private static Long getMessageQueueOffset(MessageQueue mq) {
		Long offset = OFFSE_TABLE.get(mq);
		if(offset != null){
			return offset;
		}
		return 0l;
	}
}

字段摘要

类型 字段名称 描述
DefaultMQPullConsumerImpl defaultMQPullConsumerImpl DefaultMQPullConsumer的内部核心处理默认实现
String consumerGroup 消费的唯一分组
long brokerSuspendMaxTimeMillis consumer取连接broker的最大延迟时间,不建议修改
long consumerTimeoutMillisWhenSuspend pull取连接的最大超时时间,必须大于brokerSuspendMaxTimeMillis,不建议修改
long consumerPullTimeoutMillis socket连接的最大超时时间,不建议修改
String messageModel 默认cluster模式
int messageQueueListener 消息queue监听器,用来获取topic的queue变化
int offsetStore RemoteBrokerOffsetStore 远程与本地offset存储器
int registerTopics 注册到该consumer的topic集合
int allocateMessageQueueStrategy consumer的默认获取queue的负载分配策略算法

构造方法摘要

方法名称 方法描述
DefaultMQPullConsumer() 由默认参数值创建一个Pull消费者
DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) 使用指定的分组名,hook创建一个消费者
DefaultMQPullConsumer(final String consumerGroup) 使用指定的分组名消费者
DefaultMQPullConsumer(RPCHook rpcHook) 使用指定的hook创建一个生产者

使用方法摘要

返回值 方法名称 方法描述
MQAdmin接口method ——- ————
void createTopic(String key, String newTopic, int queueNum) 在broker上创建指定的topic
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) 在broker上创建指定的topic
long earliestMsgStoreTime(MessageQueue mq) 查询最早的消息存储时间
long maxOffset(MessageQueue mq) 查询给定消息队列的最大offset
long minOffset(MessageQueue mq) 查询给定消息队列的最小offset
QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) 按关键字查询消息
long searchOffset(MessageQueue mq, long timestamp) 查找指定时间的消息队列的物理offset
MessageExt viewMessage(String offsetMsgId) 根据给定的msgId查询消息
MessageExt public MessageExt viewMessage(String topic, String msgId) 根据给定的msgId查询消息,并指定topic
MQConsumer接口method ——- ————
Set<MessageQueue> fetchSubscribeMessageQueues(String topic) 根据topic获取订阅的Queue
void sendMessageBack(final MessageExt msg, final int delayLevel) 如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) 如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL
MQPullConsumer接口method ——- ————
long fetchConsumeOffset(MessageQueue mq, boolean fromStore) 查询给定消息队列的最大offset
PullResult pull(final MessageQueue mq, final String subExpression, final long offset,final int maxNums) 异步拉取制定匹配的消息
PullResult pull(final MessageQueue mq, final String subExpression, final long offset,final int maxNums, final long timeout) 异步拉取制定匹配的消息
PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,final int maxNums) 异步拉取制定匹配的消息,通过MessageSelector器来过滤消息,参考org.apache.rocketmq.common.filter.ExpressionType
PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression,final long offset, final int maxNums) 异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis
void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset,final int maxNums, final PullCallback pullCallback) 异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis,通过回调pullCallback来消费
void updateConsumeOffset(final MessageQueue mq, final long offset) 更新指定mq的offset
long fetchMessageQueuesInBalance(String topic) 根据topic获取订阅的Queue(是balance分配后的)
void void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) 如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL,消息可能在同一个consumerGroup消费
void shutdown() 关闭当前消费者实例并释放相关资源
void start() 启动消费者

参考资料

https://github.com/apache/rocketmq/blob/master/docs/cn/client/java/API_Reference_DefaultMQProducer.md