Rocketmq 不同的topic要配不同的consumegroup
使用Rocketmq一定要注意,如果项目中要订阅两个topic,一定要保证consumeGroup是两个不同的。
这是因为,Consumer会定期发送心跳,默认是30s一次。心跳会像全部broker发送,心跳包内容包括groupname,topicname1。然后broker端会缓存这个信息,以groupname为key
代码在 ClientManagerProcessor # heartBeat
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {RemotingCommand response = RemotingCommand.createResponseCommand(null);HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),heartbeatData.getClientID(),request.getLanguage(),request.getVersion());for (ConsumerDatadata : heartbeatData.getConsumerDataSet()) {SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());boolean isNotifyConsumerIdsChangedEnable = true;if (null != subscriptionGroupConfig) {isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();int topicSysFlag = 0;if (data.isUnitMode()) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);}String newTopic = MixAll.getRetryTopic(data.getGroupName());this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);}boolean changed = this.brokerController.getConsumerManager().registerConsumer(@1data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);if (changed) {log.info("registerConsumer info changed {} {}",data.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}}for (ProducerData data : heartbeatData.getProducerDataSet()) {this.brokerController.getProducerManager().registerProducer(data.getGroupName(),clientChannelInfo);}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}
代码很长,就不一一分析了。重点是 心跳里带着数据
heartbeatData.getConsumerDataSet()。
public class ConsumerData {private String groupName;private ConsumeType consumeType;private MessageModel messageModel;private ConsumeFromWhere consumeFromWhere;private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();private boolean unitMode;
SubscriptionData 是关于topic的相关信息,里面最重要的就是topic
@1处的代码
boolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null == consumerGroupInfo) {ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);consumerGroupInfo = prev != null ? prev : tmp;}boolean r1 =consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);boolean r2 = consumerGroupInfo.updateSubscription(subList);
public boolean updateSubscription(final Set<SubscriptionData> subList) {boolean updated = false;for (SubscriptionData sub : subList) {SubscriptionData old = this.subscriptionTable.get(sub.getTopic());if (old == null) {SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);if (null == prev) {updated = true;log.info("subscription changed, add new topic, group: {} {}",this.groupName,sub.toString());}} else if (sub.getSubVersion() > old.getSubVersion()) {if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {log.info("subscription changed, group: {} OLD: {} NEW: {}",this.groupName,old.toString(),sub.toString());}this.subscriptionTable.put(sub.getTopic(), sub);}}
从上面的代码看得出来,如果两个不同的topic,consmer客户端不小心配成了相同的consumeGroup,在broker端的缓存里,每次心跳就有可能覆盖之前的订阅信息。导致某一个consume消费不到自己要订阅的topic了
标签: RocketMq
好文要顶 关注我 收藏该文 微信分享
0
0
升级成为会员
« 上一篇: SpringBoot之使用外部的启动类
» 下一篇: Seata AT模式全局锁源码分析
» 下一篇: Seata AT模式全局锁源码分析
posted on 2021-08-13 16:24 MaXianZhe 阅读(2148) 评论(0) 编辑 收藏 举报
不改了 退出 订阅评论 我的博客
[Ctrl+Enter快捷键提交]
【推荐】园子周边第二季:更大的鼠标垫,没有logo的鼠标垫
【推荐】阿里云云市场联合博客园推出开发者商店,欢迎关注
【推荐】会员力量,点亮园子希望,期待您升级成为园子会员
· 压榨数据库的真实处理速度
· 深入剖析:如何使用 Pulsar 和 Arthas 高效排查消息队列延迟问题
· 「动画进阶」巧用 CSS/SVG 实现复杂线条光效动画
· 如何阅读 Paper
· 理解前端工程化
· 压榨数据库的真实处理速度
· 使用 Docker 部署 TaleBook 私人书籍管理系统
· .NET有哪些好用的定时任务调度框架
· C#.Net筑基-基础知识
· 车牌识别控制台 可快速整合二次开发