DefaultPullConsumer
类简介
-
DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer
-
DefaultMQPullConsumer
主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,也可以自定义与控制offset位置。 -
优势:consumer可以按需消费,不用担心自己处理能力,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适消息延迟与忙等。
-
缺点:由于主动权在消费方,消费方无法及时获取最新的消息。
比较适合不及时批处理场景。
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
public class MQPullConsumer {
private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
// 从指定topic中拉取所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
for(MessageQueue mq:mqs){
try {
// 获取消息的offset,指定从store中获取
long offset = consumer.fetchConsumeOffset(mq,true);
System.out.println("consumer from the queue:"+mq+":"+offset);
while(true){
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null,
getMessageQueueOffset(mq), 32);
putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
switch(pullResult.getPullStatus()){
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.shutdown();
}
// 保存上次消费的消息下标
private static void putMessageQueueOffset(MessageQueue mq,
long nextBeginOffset) {
OFFSE_TABLE.put(mq, nextBeginOffset);
}
// 获取上次消费的消息的下标
private static Long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if(offset != null){
return offset;
}
return 0l;
}
}
字段摘要
类型 | 字段名称 | 描述 |
---|---|---|
DefaultMQPullConsumerImpl | defaultMQPullConsumerImpl | DefaultMQPullConsumer的内部核心处理默认实现 |
String | consumerGroup | 消费的唯一分组 |
long | brokerSuspendMaxTimeMillis | consumer取连接broker的最大延迟时间,不建议修改 |
long | consumerTimeoutMillisWhenSuspend | pull取连接的最大超时时间,必须大于brokerSuspendMaxTimeMillis,不建议修改 |
long | consumerPullTimeoutMillis | socket连接的最大超时时间,不建议修改 |
String | messageModel | 默认cluster模式 |
int | messageQueueListener | 消息queue监听器,用来获取topic的queue变化 |
int | offsetStore | RemoteBrokerOffsetStore 远程与本地offset存储器 |
int | registerTopics | 注册到该consumer的topic集合 |
int | allocateMessageQueueStrategy | consumer的默认获取queue的负载分配策略算法 |
构造方法摘要
方法名称 | 方法描述 |
---|---|
DefaultMQPullConsumer() | 由默认参数值创建一个Pull消费者 |
DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) | 使用指定的分组名,hook创建一个消费者 |
DefaultMQPullConsumer(final String consumerGroup) | 使用指定的分组名消费者 |
DefaultMQPullConsumer(RPCHook rpcHook) | 使用指定的hook创建一个生产者 |
使用方法摘要
返回值 | 方法名称 | 方法描述 |
---|---|---|
MQAdmin接口method | ——- | ———— |
void | createTopic(String key, String newTopic, int queueNum) | 在broker上创建指定的topic |
void | createTopic(String key, String newTopic, int queueNum, int topicSysFlag) | 在broker上创建指定的topic |
long | earliestMsgStoreTime(MessageQueue mq) | 查询最早的消息存储时间 |
long | maxOffset(MessageQueue mq) | 查询给定消息队列的最大offset |
long | minOffset(MessageQueue mq) | 查询给定消息队列的最小offset |
QueryResult | queryMessage(String topic, String key, int maxNum, long begin, long end) | 按关键字查询消息 |
long | searchOffset(MessageQueue mq, long timestamp) | 查找指定时间的消息队列的物理offset |
MessageExt | viewMessage(String offsetMsgId) | 根据给定的msgId查询消息 |
MessageExt | public MessageExt viewMessage(String topic, String msgId) | 根据给定的msgId查询消息,并指定topic |
MQConsumer接口method | ——- | ———— |
Set<MessageQueue> |
fetchSubscribeMessageQueues(String topic) | 根据topic获取订阅的Queue |
void | sendMessageBack(final MessageExt msg, final int delayLevel) | 如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL |
void | sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) | 如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL |
MQPullConsumer接口method | ——- | ———— |
long | fetchConsumeOffset(MessageQueue mq, boolean fromStore) | 查询给定消息队列的最大offset |
PullResult | pull(final MessageQueue mq, final String subExpression, final long offset,final int maxNums) | 异步拉取制定匹配的消息 |
PullResult | pull(final MessageQueue mq, final String subExpression, final long offset,final int maxNums, final long timeout) | 异步拉取制定匹配的消息 |
PullResult | pull(final MessageQueue mq, final MessageSelector selector, final long offset,final int maxNums) | 异步拉取制定匹配的消息,通过MessageSelector器来过滤消息,参考org.apache.rocketmq.common.filter.ExpressionType |
PullResult | pullBlockIfNotFound(final MessageQueue mq, final String subExpression,final long offset, final int maxNums) | 异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis |
void | pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset,final int maxNums, final PullCallback pullCallback) | 异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis,通过回调pullCallback来消费 |
void | updateConsumeOffset(final MessageQueue mq, final long offset) | 更新指定mq的offset |
long | fetchMessageQueuesInBalance(String topic) | 根据topic获取订阅的Queue(是balance分配后的) |
void | void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) | 如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL,消息可能在同一个consumerGroup消费 |
void | shutdown() | 关闭当前消费者实例并释放相关资源 |
void | start() | 启动消费者 |
参考资料
https://github.com/apache/rocketmq/blob/master/docs/cn/client/java/API_Reference_DefaultMQProducer.md