rocketmq源码解析之管理请求查询topic被哪些消费者消费
说在前面
管理请求 QUERY_TOPIC_CONSUME_BY_WHO 查询topic被哪些消费者消费
源码解析
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryTopicConsumeByWho查询topic被哪些消费者消费
private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryTopicConsumeByWhoRequestHeader requestHeader = (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class); // 根据topic查找消费者的消费组信息=》 HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic()); Set<String> groupInOffset = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(requestHeader.getTopic()); if (groupInOffset != null && !groupInOffset.isEmpty()) { groups.addAll(groupInOffset); } GroupList groupList = new GroupList(); groupList.setGroupList(groups); byte[] body = groupList.encode(); response.setBody(body); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
进入这个方法org.apache.rocketmq.broker.client.ConsumerManager#queryTopicConsumeByWho根据topic从消费组信息中查询消费组
public HashSet<String> queryTopicConsumeByWho(final String topic) { HashSet<String> groups = new HashSet<>(); // 遍历换粗中消费组信息 Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConsumerGroupInfo> entry = it.next(); // 获取组中的缓存订阅信息 ConcurrentMap<String, SubscriptionData> subscriptionTable = entry.getValue().getSubscriptionTable(); if (subscriptionTable.containsKey(topic)) { groups.add(entry.getKey()); } } return groups; }
往上返回到这个方法org.apache.rocketmq.broker.offset.ConsumerOffsetManager#whichGroupByTopic按topic从offset信息中查询消费组
public Set<String> whichGroupByTopic(final String topic) { Set<String> groups = new HashSet<String>(); // 遍历topic和组的offset信息 Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConcurrentMap<Integer, Long>> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { if (topic.equals(arrays[0])) { groups.add(arrays[1]); } } } return groups; }
往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryTopicConsumeByWho结束
说在最后
本次解析仅代表个人观点,仅供参考。
加入技术微信群
钉钉技术群
转载于:https://my.oschina.net/u/3775437/blog/3094961