RocketMQ模型和生产实践

news/2024/10/4 3:20:22
RocketMQ的客户端编程模型相对⽐较固定,基本都有⼀个固定的步骤。掌握这个固定步骤,对于学习其他复杂的消息模型也是很有帮助的。
消息⽣产者的固定步骤
1.创建消息⽣产者producer,并指定⽣产者组名
2.指定Nameserver地址,可以在代码中固定写IP,也可以通过配置项来写,最好是配置项,这样更灵活。
3.启动producer。 这个步骤⽐较容易忘记。可以认为这是消息⽣产者与服务端建⽴连接的过程。和broker多次请求交互,要进行多次通讯,而不是就进行一次通讯拿到broker地址就断开了连接。这是长连接。保存启动状态,可以和服务端进行多次通信的。
4.创建消息对象,指定主题Topic、Tag和消息体。tag可以进行分类。
5.发送消息。三种方式发送。sendonway(msg),无返回值,单向发送一次,效率最高;但是可能没有发送成功,但是生产者不知道,性能高,但是不安全。
6.关闭⽣产者producer,释放资源。
 
 
消费者组,默认每个消费者组的逻辑都是一样的,如果一个消费者组有多个消费者,那么从broker的messageQueue中推过来的消息只会被一个消费者来消费。
推模式中,消费者必须是要一直启动的,否则broker就无法感知到消费者,就无法保证消息的实时性。只有一直启动,才能进行推消息。broker也会记录消费者的消费进度。
以消费者组为单位进行分配记录。

代理者位点:表示消息存储到多少号了,消费者位点:表示消费者消费到哪条消息了,消费到哪个点了。差值就是剩余还没有消费的。如果差值越来越大,就可能会有丢消息的可能。

注意:上面三个值都是broker来维护的。

消费者一定要给broker返回一个值,让broker去决定这条消息是否还要继续进行发送。

ConsumeConcurrentlyStatus.CONSUME_SUCCESS;如果是successes,那么就是发送成功,无需再次发送。RECONSUME_LATER,证明是消费失败,需要再次重新发送。如果不返回,或者抛出异常,也会认为是失败,有重试机制。通过创建队列进行重试,重试次数超过了限度,那么就会进入死信队列。
Consumer的进度是以消费者组为概念进行记录的,与具体的每个消费者无关。不关注是哪个终端节点消费的。与topic也无关。订阅了多个topic,可能都进入这一个重试队列。
消费者组,是处理业务逻辑相同的生产者组成的。死信消息默认的权限是2,禁读的,需要手动处理或者是修改其权限为4或者6,然后才能进行消费。
 
集群模式下,broker的messageQueue的消息只会被消费者组中的一个消费者消费,不会每个消费者都消费一次。这里面offset的进度是由broker来维护的。服务端的offset是不能丢失的,要知道下一条消息推送的位置,从哪里开始继续推送。
广播模式下,每个消费者都能消费到。这里面也是有offset的,这个offset的值是由消费者本身来维护的。记录我消费到哪一条,如果没有消费成功,就重试,接收到消息后进行重试。
 
这种就是广播模式下的消费者位点,值都是0,offset由消费者本身去控制。没有了消费者组的概念了。本地的offset可能会出现丢失,broker是不知道这些信息的。广播模式下的offset是可以丢失的。
所以广播消息是不能保证消息不丢失的,有很大丢失消息的可能。这个一定要多注意。

 

顺序消息机制

RocketMQ的消费者接收消息的顺序可能会和生产者发送消息的顺序不一致,但是其保证的是局部的一组消息的一致性;保证的是局部的有序性,大部分情况下,我们说的保证有序性是保证的局部有序性。

有序,就是生产者要保证发送到同一个队列messageQueue中,可以通过取余等机制,保证相同的一组信息发送到同一个queue中;

又因为queue就能自己保证有序性,就能保证消息的顺序传递,先进先出。

然后还有一点就是消费者方也要进行处理,保证顺序消费;使用不同的监听器,使用MessageListenerOrderly这个监听器。又因为为了保证吞吐量,拉取消息的时候是多个线程去并发拉取?消费者拉取消息的时候是每次拉32条,从哪里拉的不确定,随意,这样就可能会导致无序。多线程情况下就无法保证有序性了。

为了保证有序性,先锁定一个队列,将这个队列的消息消费完毕之后,才会去切换到别的队列进行处理。这个是不同的模型。

如果失败了,就阻塞当前队列一会儿,为了保证顺序性,就会阻塞整个队列,等到处理成功了才会解除这个阻塞。顺序消息对吞吐量有影响的。

如果是并发处理模型,中间一个处理失败了,可能会去后面继续消费,只要保证本次拿到32条消息能够消费就可以了。

一般都是保证局部有序就行了。要全局有序的话,所有的消息都要放到同一个messageQueue,这样就没有分布式的优势了。

顺序消费,消费者拉取的时候锁定一个队列,也是可以使用多线程线程池去并发拉取这个队列中的消息,但是需要消费者本身进行控制,保证有序性。

 

延迟消息:

msg.setDelayTimeLevel(3);// 第3等级。
类似一个定时任务。需要重点设计的东西。延迟的时间,RocketMQ设置了一些默认级别,没有必要记录,一共18个级别,messageDelayLevel。可以在broker.conf中进行修改,但是不建议修改。
4.x版本,只有这些延迟机制。没有办法指定时间点去发送。七天确认收货就不行。
5.x版本可以指定时间戳,到毫秒级进行消息推送。这个版本,开源版和商业版没有太大区别了。
是系统中的SCHEDULE_TOPIC_XXXX这个topic下的18个不同messageQueue。

 

批量消息

可以减少IO的消耗,放到一个list中;减少了IO次数,如果网络异常,那么消息就丢失了;有限制,大小不超过1M,这个限制不是强制性的。消息进行分割,分成几个批次,进行请求。

批量发,减少IO,然后要考虑安全性,每次消息大小不能太大,可以对消息进行压缩成二进制,消费者再解压等等。RocketMQ已经进行了一些压缩。

批量发送就是尽量减少请求次数,可以使用spilt进行拆分一下。

针对边缘的优化。这才是RocketMQ的精髓。

 

过滤消息

消费者组中多个消费者进行协作,一般情况下是不可控的,为了保证可控;就可以用到消息过滤机制。发送消息的时候生产者可以指定tag,消费者同样也订阅这个tag。subscribe这个方法订阅指定的tag,多个tag使用 || 隔开。订阅的时候订阅指定的tag,过滤是在哪里?一个是broker端,这样发送到消费者的就少了;还有就是在消费者端进行过滤,全部接收,但是进行过滤的消费。

broker端处理:网络压力小了,但是其本身的压力就大了。这里面是要有取舍的。

RocketMQ优先保证的是网络的性能,也就是在broker端实现的。不建议定义很复杂的订阅条件。

如果是更复杂的过滤条件,生产者发送的时候还是正常发送带tag的消息;然后在消费者端定义MessageSelector.bySql。

默认情况下,服务端broker不支持bySql的方式。在集群中有一个配置可以进行配置,enablePropertyFilter,通过这个进行配置。默认值是false。修改为true。修改之后broker才支持bySql。为了防止broker端更繁忙。

典型的流式处理方式:先拿一个应用来过滤,然后通过topic来发送,保证每个messageQueue使用同一个方式去消费;这样的方式要明显高于broker端进行sql过滤。

 

事务消息:

重点,非常重点。

写本地事务,和发消息如果不控制,就会出现不一致;提供一个机制,保证这两个是同步的。完整的业务场景下,上游客户下单,下游商家发货,这才是完整的事务。但是在分布式情况下,这个链路太长了。RocketMQ进行了折中处理,只保证前面一半保证原子性。后面从broker到消费者的这个过程,事务可以由消费者来控制,自己确认,同步进行返回消息消费状态。

这并不是标准的分布式解决方案。是半事务。后面的一半由消费者自己去实现。

RocketMQ的事务消息和消费者无关,只是和生产者和broker相关。producer.setTransactionListener().

所谓半消息half消息,是生产者发送这个半消息,然后broker将其挪到内部的一个topic中,该topic是RMQ_SYS_TRANS_HALF_TOPIC。如果确认成功,broker将这个消息从这个topic中挪出来,发到消费者订阅的topic中。

返回unknown状态,broker就要进行回查;多久回查一次,回查多少次这些是要配置的。本地事务回查次数通过参数transactionCheckMax设定,默认15次。本地事务回查的间隔通过参数transactionCheckInterval设定,默认60秒。超过回查次数后,消息将会被丢弃。

定时任务,例如买票订单确认,15分钟未支付就取消订单,支付了就分配座位。如果使用定时任务,15分钟后去执行,那么如果是15分钟内支付了,但是不能马上去分配位置,必须要等定时任务启动才行,这样当然是不行的。也不符合业务逻辑。这个场景是broker级别配置的,配置完成之后这个broker中的所有message Queue都会生效。

messageQueue一般建议配置是broker数量的两倍。broker中一个topic的messageQueue数量默认是8个,也可以自动进行确认。

RocketMQ如果启动了长时间,那么从控制台可能会看到消息的最小位点就不是0了,因为日志文件默认是1G,超过之后就写一个新的文件;历史文件默认保存120个小时,凌晨4点会删除过期的文件,删除之后历史的消息就找不到了。清空了。

消费模型:消费者组中的消费者可以消费一个broker中的一个messageQueue,也可以消费多个messageQueue,也可以跨broker消费messageQueue;但是broker中的messageQueue只会被消费者组中的一个消费者去消费,不会出现一个messageQueue被一个消费者组中的多个消费者消费。

 

权限控制

非核心功能,一般来说MQ是内部使用的,根本就无序进行权限控制,但是RocketMQ提供了权限控制的功能。perm,6可读可写;4是可读不可写;2禁写禁读。太粗粒度了。

 

幂等

对消费者来说,不保证只推送一次,保证最少一次;可以向同一个消费者推送多次,或者可以向不同的消费者推送。

RocketMQ提供messageId,这个不靠谱,事务消息,或者多次转发的过程中,这个可能会变;建议自己去实现这个消息id。

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/29037.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

Spring SpringMVC——前端控制器初始化过程

创建完DispatcherServlet对象时,会执行类中的init方法如果不配置 load-on-startup,那么 DispatcherServlet 将在第一次收到请求时才会被实例化和初始化。这意味着 DispatcherServlet 不会在服务器启动时立即执行创建和初始化的操作。当第一个请求到达时,Servlet 容器(如 To…

【VMware vSphere】存储提供程序中I/O 筛选器状态显示为脱机以及证书已到期的解决办法。

存储提供程序是由 VMware 提供或由第三方通过 vSphere APIs for Storage Awareness (VASA) 开发的软件组件。存储提供程序也可以称为 VASA 提供程序。存储提供程序可与包含外部物理存储和存储抽象的各种存储实体(例如 vSAN 和 Virtual Volumes)集成。存储提供程序也可以支持软…

服务器分层拓扑架构图形化显示工具

目录服务器分层拓扑架构图形化显示工具 --- HWLOC下载依赖包安装源码编译安装执行命令示例显示 PCI 层次结构参考文档服务器分层拓扑架构图形化显示工具 --- HWLOC可移植硬件局部 (hwloc) 软件包提供了现代架构分层拓扑的可移植抽象(跨操作系统、版本、体系结构等),包括 N…

流畅的python学习笔记

示例1-1 一摞有序的纸牌知识点:collections.namedtuple 构建了一个简单的类,表示单张纸牌。from collections import namedtuple Card = namedtuple(Card, [rank, suit])class FrenchDeck:ranks = [str(n) for n in range(2,11)] + list(JQKA)suits = spades diamonds clubs …

+63+条消息++狂神+docker+学习笔记_GaleTeng+的博客+-+CSDN+博客

+63+条消息++狂神+docker+学习笔记_GaleTeng+的博客+-+CSDN+博客 文章目录前言Docker 概述1.Docker 为什么会出现?2.Docker 历史3.Docker 能干嘛Docker 安装1. Docker 的基本组成2. 安装 Docker3. 阿里云镜像加速4. 回顾 HelloWorld 流程5. 底层原理Docker 常用命令1. 帮助命令…

GeometryCollection 的类型映射器(TypeHandler)

GeometryCollection 是 GeoJSON 数据模型中的一个类型,用于表示一个几何对象的集合。MySQL8 中支持了 GeometryCollection 类型,在对数据库和实体类进行对象映射时需要我们自己编写类型映射器来完成映射。java 本身不支持 GeometryCollection 类型,我们需要引入第三方包来获…

把.nuget文件夹从C盘移到其它盘

C盘是系统盘,考虑到很多程序都会占用系统盘资源,所以500G的固态硬盘究竟,一开始C盘就划了300G的大小。但即便这样,不知不觉中,C盘的空间也快不够用了。 分析了一下C盘的空间占用情况,发现.nuget文件夹大概有40多G的大小。这个不能忍,直接上网搜了一下移到其它盘的方法。…

磁盘恢复,照片误删 解决策略-photorec

# 下载软件TestDisk Download - CGSecurity # 解压文件夹 testdisk-7.2 # 打开qphotorec_win 界面如下 # 筛选要恢复的文件格式 ## 恢复图片就选中jpg,png,jpeg 先点击一个词条,键盘按j,就可以定位到j开头的格式位置