Rocketmq 不同的topic要配不同的consumegroup

news/2024/10/1 7:30:27

Rocketmq 不同的topic要配不同的consumegroup

  使用Rocketmq一定要注意,如果项目中要订阅两个topic,一定要保证consumeGroup是两个不同的。

  这是因为,Consumer会定期发送心跳,默认是30s一次。心跳会像全部broker发送,心跳包内容包括groupname,topicname1。然后broker端会缓存这个信息,以groupname为key

  代码在 ClientManagerProcessor # heartBeat

复制代码
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {RemotingCommand response = RemotingCommand.createResponseCommand(null);HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),heartbeatData.getClientID(),request.getLanguage(),request.getVersion());for (ConsumerDatadata : heartbeatData.getConsumerDataSet()) {SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());boolean isNotifyConsumerIdsChangedEnable = true;if (null != subscriptionGroupConfig) {isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();int topicSysFlag = 0;if (data.isUnitMode()) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);}String newTopic = MixAll.getRetryTopic(data.getGroupName());this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);}boolean changed = this.brokerController.getConsumerManager().registerConsumer(@1data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);if (changed) {log.info("registerConsumer info changed {} {}",data.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}}for (ProducerData data : heartbeatData.getProducerDataSet()) {this.brokerController.getProducerManager().registerProducer(data.getGroupName(),clientChannelInfo);}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}
复制代码

  代码很长,就不一一分析了。重点是 心跳里带着数据 

  heartbeatData.getConsumerDataSet()。

复制代码
public class ConsumerData {private String groupName;private ConsumeType consumeType;private MessageModel messageModel;private ConsumeFromWhere consumeFromWhere;private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();private boolean unitMode;
复制代码

  SubscriptionData 是关于topic的相关信息,里面最重要的就是topic

  @1处的代码

复制代码
boolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);
复制代码
复制代码
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null == consumerGroupInfo) {ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);consumerGroupInfo = prev != null ? prev : tmp;}boolean r1 =consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);boolean r2 = consumerGroupInfo.updateSubscription(subList);
复制代码
复制代码
public boolean updateSubscription(final Set<SubscriptionData> subList) {boolean updated = false;for (SubscriptionData sub : subList) {SubscriptionData old = this.subscriptionTable.get(sub.getTopic());if (old == null) {SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);if (null == prev) {updated = true;log.info("subscription changed, add new topic, group: {} {}",this.groupName,sub.toString());}} else if (sub.getSubVersion() > old.getSubVersion()) {if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {log.info("subscription changed, group: {} OLD: {} NEW: {}",this.groupName,old.toString(),sub.toString());}this.subscriptionTable.put(sub.getTopic(), sub);}}
复制代码

  从上面的代码看得出来,如果两个不同的topic,consmer客户端不小心配成了相同的consumeGroup,在broker端的缓存里,每次心跳就有可能覆盖之前的订阅信息。导致某一个consume消费不到自己要订阅的topic了

 

  

 
标签: RocketMq
好文要顶 关注我 收藏该文 微信分享
MaXianZhe
粉丝 - 5关注 - 5
 
 
0
0
 
 
升级成为会员
 
« 上一篇: SpringBoot之使用外部的启动类
» 下一篇: Seata AT模式全局锁源码分析

posted on 2021-08-13 16:24  MaXianZhe  阅读(2148)  评论(0)  编辑  收藏  举报

 
 
刷新评论刷新页面返回顶部
升级成为园子VIP会员
编辑预览
 
88ee990f-bb7c-489a-46fb-08d6d3fea897

退出 订阅评论 我的博客

 

[Ctrl+Enter快捷键提交]

 
【推荐】融资做与众不同的众包平台,让开发能力成为一种服务
【推荐】园子周边第二季:更大的鼠标垫,没有logo的鼠标垫
【推荐】阿里云云市场联合博客园推出开发者商店,欢迎关注
【推荐】会员力量,点亮园子希望,期待您升级成为园子会员
 
编辑推荐:
· 压榨数据库的真实处理速度
· 深入剖析:如何使用 Pulsar 和 Arthas 高效排查消息队列延迟问题
· 「动画进阶」巧用 CSS/SVG 实现复杂线条光效动画
· 如何阅读 Paper
· 理解前端工程化
阅读排行:
· 压榨数据库的真实处理速度
· 使用 Docker 部署 TaleBook 私人书籍管理系统
· .NET有哪些好用的定时任务调度框架
· C#.Net筑基-基础知识
· 车牌识别控制台 可快速整合二次开发
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/29660.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

Fastjson反序列化漏洞

与原生的 Java 反序列化不同,FastJson 反序列化并未使用 readObject 方法,而是自定义了反序列化的过程Fastjson简介 Fastjson是一个Java库,可以实现json和对象之间的转换。 将数据与对象进行转化,这个操作涉及到了反序列化。 与原生的 Java 反序列化不同,FastJson 反序列化…

mysql事务

1. 事务事务是一组操作的集合,它是一个不可分割的工作单位,事务会把所有的操作作为一个整体一起向系统提交或撤销操作请求,即这些操作要么同时成功,要么同时失败。 2. 控制事务控制事务一查看/设置事务提交方式SELECT @@autocommit ;SET @@autocommit = 0 ;提交事务COMMIT…

2022年windows的Visual Studio常用插件及使用手册

前景提要Viusual Studio 是一款很好用的C/C++集成开发工具,具有强大的扩展功能,好用的插件,但是,很多人都是只写了有什么插件,但是,没写怎么使用这种插件,使得使用的时候很是不方便,所以,笔者最近本着自己的学习,在这里写下自己关于好用的插件的研究,希望对您的学习/工作有帮助…

JDK源码阅读-------自学笔记(二十六)(java.util.Map 自定义讲解)

一、简介Map就是用来存储“键(key)-值(value)”对的. 通过键寻找value,所以键不能重复. 数组的本质也是一种键值对,区别就是索引一般是数字,而Map的Key可以是任意对象(字符串,数字),相当于把数组的索引范围扩的更大,使用更方便. 实际开发中较为常用.二、Map的常用方法实例(1)存…

Oracle修改字段长度及属性

首发微信公众号:SQL数据库运维 原文链接:https://mp.weixin.qq.com/s?__biz=MzI1NTQyNzg3MQ==&mid=2247486117&idx=1&sn=02e2cd05e5db7eaa5758c70e81cf3972&chksm=ea375ed5dd40d7c367727562bdb00788f3bd139cbbda377f599586a47ce13ad9d04c56fd4d2d&token…

C#/.NET/.NET Core优秀项目和框架2024年4月简报

前言 公众号每月定期推广和分享的C#/.NET/.NET Core优秀项目和框架(每周至少会推荐两个优秀的项目和框架当然节假日除外),公众号推文中有项目和框架的介绍、功能特点、使用方式以及部分功能截图等(打不开或者打开GitHub很慢的同学可以优先查看公众号推文,文末一定会附带项…

localhost 重定向次数过多

在完成javaweb作业时出现了错误初始页面只有两个功能, 但是无论是点击登录还是注册,都会跳转到login.jsp页面从网上找到的答案是代码陷入死循环,因为总是跳转到login.jsp, 所以我查看了所有servlet类中跳转到login.jsp页面的代码,逻辑上并没有问题;然后我又查看了过滤器以…

Windows平台使用CMake+MinGW64编译OpenCV

Windows平台使用CMake+MinGW64编译OpenCV (注:2年前写的笔记, 可能有些地方过时了) 目录Windows平台使用CMake+MinGW64编译OpenCV1.安装及配置环境1.1 MinGW-w641.2 CMake1.3 OpenCV源码2.CMake配置及生成2.1 新建目录2.2 CMake-GUI2.3 编译配置2.4 生成2.5 Make编译和安装3.配…