Redis高级(消息队列)

news/2024/10/15 19:44:07

 消息队列(Message Queue)

1.什么是消息

两台设备(例如服务与服务)之间传递的数据就可以称之为消息

2.什么是消息队列

消息队列是在消息的传输过程中保存消息的容器。

为什么使用消息队列

  • 异步

    • 例如发送验证码

  • 解耦

    • 例如服务与服务之间的调用

  • 削峰限流

一个消息队列的基本组成 

  • 生产者producer

    • 根据主题发送消息

  • 消费者consumer

    • 根据主题接收消息

  • 代理者broker

    • 存储消息,自动分发消息

  • 主题topic

    • 区分消息

  • 消费者监听Listener

    • 监听消息的到来

图示

编辑 Springboot整合redis订阅发布基本流程

  • 1.定义Topic(主题)/Channel(频道)

    • 使用接口定义Topic常量

  • 2.创建订阅者类,接收发布者发布的消息

    • 实现MessageListener,重写onMessage方法

  • 3.添加订阅者类对象到容器中

    • 注入redis连接工厂RedisConnectionFactory

    • 获取redis连接getConnection

    • 调用订阅方法

      • subscribe

  • 4.测试发布订阅

    • convertAndSend 

 基本使用

1.在service模块下新建listener包

创建

package com.zzyl.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RedisQueueListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {String s = new String(message.getBody());String topic = new String(message.getChannel());log.info("监听到消息{},来自{}",s,topic);}
}

 2.创建topic

package com.zzyl.constant;public interface RedisQueueTopic {String TEST_TOPIC = "test_topic";String TEST_TOPIC2 = "test_topic2";}

 3.创建配置类

package com.zzyl.config;import com.zzyl.constant.RedisQueueTopic;
import com.zzyl.listener.RedisQueueListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;import javax.annotation.Resource;@Configuration
public class RedisQueueConfig {@Resourceprivate RedisQueueListener redisQueueListener;@Beanpublic RedisConnectionFactory redisQueueConnectionFactory(RedisConnectionFactory redisConnectionFactory){RedisConnection connection = redisConnectionFactory.getConnection();connection.subscribe(redisQueueListener, RedisQueueTopic.TEST_TOPIC.getBytes(),RedisQueueTopic.TEST_TOPIC2.getBytes());return redisConnectionFactory;}
}

 4.测试使用

package com.zzyl;import com.zzyl.constant.RedisQueueTopic;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;import javax.annotation.Resource;@SpringBootTest
public class TestRedisQueue {@Resourceprivate RedisTemplate<String,Object> redisTemplate;@Testpublic void test(){redisTemplate.convertAndSend(RedisQueueTopic.TEST_TOPIC,"hello");}@Testpublic void tes1t(){redisTemplate.convertAndSend(RedisQueueTopic.TEST_TOPIC2,"hello2");}
}

 5.输出结果

2024-09-26 10:38:33.724 [lettuce-nioEventLoop-4-2] INFO  com.zzyl.listener.RedisQueueListener-监听到消息"hello2",来自test_topic2

接收Iot平台消息服务 

编辑

 1.导入jar包

<!-- amqp 1.0 qpid client -->
<dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.57.0</version>
</dependency>
<!-- util for base64-->
<dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version>
</dependency>

 2.新建服务

编辑

 3.新服务jar包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.lu</groupId><artifactId>zzyl-transfer-server</artifactId><version>0.0.1-SNAPSHOT</version><name>zzyl-transfer-server</name><description>zzyl-transfer-server</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--阿里云iot平台对接服务端订阅依赖--><!-- amqp 1.0 qpid client --><dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.57.0</version></dependency><!-- util for base64--><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

4.redis配置类 

 主要用于连接redis操作redis

package com.lu.zzyltransferserver.config;import com.alibaba.fastjson.support.spring.GenericFastJsonRedisSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;/*** @author L*/
@Configuration
public class RedisConfig {@Value("${spring.redis.host}")private String host;@Value("${spring.redis.port}")private int port;@Value("${spring.redis.password}")private String password;@Value("${spring.redis.database}")private int database;@Value("${spring.redis.username}")private String username;@Beanpublic LettuceConnectionFactory redisConnectionFactory(){//配置redis数据库连接池信息RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();configuration.setHostName(host);configuration.setPort(port);configuration.setDatabase(database);configuration.setPassword(password);configuration.setUsername(username);LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(configuration);return lettuceConnectionFactory;}@Beanpublic RedisTemplate<?,?> redisTemplate(){//创建redis操作Bean实例对象RedisTemplate<?,?> stringRedisTemplate = new StringRedisTemplate();stringRedisTemplate.setConnectionFactory(redisConnectionFactory());return stringRedisTemplate ;}//中转服务使用@Scheduled注解,写定时任务//每五分钟往redis队列当中生产一条信息,上报老人健康手表设备信息
}

 5.配置主题常量类

RedisQueueTopic接口:用于定义Redis队列的主题名称,它是一个常量接口。将主题名称定义为接口中的静态常量,方便在项目中引用。

package com.lu.zzyltransferserver.constants;/*** @author L*/
public interface RedisQueueTopic {String TEST_TOPIC = "test_topic";String TEST_TOPIC2 = "test_topic2";
}

 6.监听转发

监听阿里云消息,转发消息

package com.lu.zzyltransferserver.listener;import com.lu.zzyltransferserver.constants.RedisQueueTopic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @author L*/
@Slf4j
@Component
public class IotListener implements MessageListener {@Resourceprivate RedisTemplate<String,Object> redisTemplate;//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,new LinkedBlockingQueue(50000));@Overridepublic void onMessage(Message message) {try {//1.收到消息之后一定要ACK。// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(new Runnable() {@Overridepublic void run() {processMessage(message);}});} catch (Exception e) {log.error("submit task occurs exception ", e);}}/*** 在这里处理您收到消息后的具体业务逻辑。*/private void processMessage(Message message) {try {byte[] body = message.getBody(byte[].class);String content = new String(body);String topic = message.getStringProperty("topic");String messageId = message.getStringProperty("messageId");log.info("接收消息"+ ",\n 来自于 = " + topic+ ",\n 消息Id = " + messageId+ ",\n content = " + content);//作为iot消费者获取iot平台订阅的消息//作为redis生产者将消息发送到redis的队列中redisTemplate.convertAndSend(RedisQueueTopic.TEST_TOPIC,content);} catch (Exception e) {log.error("processMessage occurs error ", e);}}
}

 7.与阿里云 IoT 平台进行通信的客户端

这个代码片段实现了基于 AMQP 协议与阿里云 IoT 平台进行通信的客户端,并且它在 Spring Boot 启动时自动运行,用于与 IoT 平台进行消息的消费。消息消费完成后,会通过 Spring 的 IotListener 来处理消息,接下来通过 Redis 发布订阅将消息传递到其他服务。

package com.lu.zzyltransferserver.runner;import com.lu.zzyltransferserver.listener.IotListener;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;@Component
public class AmqpClientRunner implements CommandLineRunner {private final static Logger logger = LoggerFactory.getLogger(AmqpClientRunner.class);/*** 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考*/@Value(value = "${zzyl.aliyun.accessKeyId}")String accessKey;@Value(value = "${zzyl.aliyun.accessKeySecret}")String accessSecret;@Value(value = "${zzyl.aliyun.consumerGroupId}")String consumerGroupId;//iotInstanceId:实例ID。若是2021年07月30日之前(不含当日)开通的公共实例,请填空字符串。@Value(value = "${zzyl.aliyun.iotInstanceId}")String iotInstanceId;@ResourceIotListener iotListener;//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。private static String clientId;static {try {clientId = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {e.printStackTrace();}}//${YourHost}为接入域名,请参见AMQP客户端接入说明文档。private static String host = "iot-06z00ix0kvl4g9z.amqp.iothub.aliyuncs.com";// 指定单个进程启动的连接数// 单个连接消费速率有限,请参考使用限制,最大64个连接// 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接private static int connectionCount = 4;private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {/*** 连接成功建立。*/@Overridepublic void onConnectionEstablished(URI remoteURI) {logger.info("连接成功建立, 远程uri:{}", remoteURI);}/*** 尝试过最大重试次数之后,最终连接失败。*/@Overridepublic void onConnectionFailure(Throwable error) {logger.error("连接失败: {}", error.getMessage());}/*** 连接中断。*/@Overridepublic void onConnectionInterrupted(URI remoteURI) {logger.info("连接中断, 远程uri:{}", remoteURI);}/*** 连接中断后又自动重连上。*/@Overridepublic void onConnectionRestored(URI remoteURI) {logger.info("连接中断后又自动重连上, 远程uri:{}", remoteURI);}@Overridepublic void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Overridepublic void onSessionClosed(Session session, Throwable cause) {}@Overridepublic void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Overridepublic void onProducerClosed(MessageProducer producer, Throwable cause) {}};/*** 计算签名,password组装方法,请参见AMQP客户端接入说明文档。*/private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);Mac mac = Mac.getInstance(signMethod);mac.init(signingKey);byte[] rawHmac = mac.doFinal(toSignString.getBytes());return Base64.encodeBase64String(rawHmac);}@Overridepublic void run(String... args) throws Exception {List<Connection> connections = new ArrayList<>();//参数说明,请参见AMQP客户端接入说明文档。for (int i = 0; i < connectionCount; i++) {long timeStamp = System.currentTimeMillis();//签名方法:支持hmacmd5、hmacsha1和hmacsha256。String signMethod = "hmacsha1";//userName组装方法,请参见AMQP客户端接入说明文档。String userName = clientId + "-" + i + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + accessKey+ ",iotInstanceId=" + iotInstanceId+ ",consumerGroupId=" + consumerGroupId+ "|";//计算签名,password组装方法,请参见AMQP客户端接入说明文档。String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;String password = doSign(signContent, accessSecret, signMethod);String connectionUrl = "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)"+ "?failover.reconnectDelay=30";Hashtable<String, String> hashtable = new Hashtable<>();hashtable.put("connectionfactory.SBCF", connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");Destination queue = (Destination) context.lookup("QUEUE");// 创建连接。Connection connection = cf.createConnection(userName, password);connections.add(connection);((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);// 创建会话。// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// 创建Receiver连接。MessageConsumer consumer = session.createConsumer(queue);//添加iot平台服务端消费者监听consumer.setMessageListener(iotListener);}logger.info("连接成功!");// 结束程序运行Thread.sleep(6000 * 1000);logger.info("run shutdown");connections.forEach(c -> {try {c.close();} catch (JMSException e) {logger.error("failed to close connection", e);}});}
}

 8.配置yml

spring.application.name=zzyl-transfer-server
spring.profiles.active=devserver.port=9996
zzyl:aliyun:accessKeyId: LTAI5tMA5wSWRmeuyYPei9VkaccessKeySecret: GSNxohmhEcFa4g7p61RRTzRdABMzcHconsumerGroupId: DEFAULT_GROUPregionId: cn-shanghaiiotInstanceId: iot-06z00cxspzjxue9host: iot-06z00cxspzjxue9.amqp.iothub.aliyuncs.com
spring:redis:host: 192.168.200.146port: 6379password: kK6/fG8&pN7< #指定密码database: 1 #指定使用数据库username: default # 用户名

 9.业务服务消息监听

需要要导入redis包

package com.zzyl.listener;import com.alibaba.fastjson.JSONObject;
import com.zzyl.dto.ContentDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RedisQueueListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {String s = new String(message.getBody());String topic = new String(message.getChannel());log.info("监听到消息{},来自{}",s,topic);ContentDTO contentDTO = JSONObject.parseObject(s, ContentDTO.class);log.info("设备json字符串转对象:{}",contentDTO);}
}

 10.测试

测试流程

1.设备上报信息

这里没有设备用MQTTX模拟设备

编辑

2.中间件服务收到上报消息 

编辑 3.业务服务器收到上报消息

编辑

11.业务服务消息订阅者配置

package com.zzyl.config;import com.zzyl.constant.RedisQueueTopic;
import com.zzyl.listener.RedisQueueListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;import javax.annotation.Resource;@Configuration
public class RedisQueueConfig {@Resourceprivate RedisQueueListener redisQueueListener;@Beanpublic RedisConnectionFactory redisQueueConnectionFactory(RedisConnectionFactory redisConnectionFactory){RedisConnection connection = redisConnectionFactory.getConnection();connection.subscribe(redisQueueListener, RedisQueueTopic.TEST_TOPIC.getBytes(),RedisQueueTopic.TEST_TOPIC2.getBytes());return redisConnectionFactory;}
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/71947.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

代码随想录刷题记录 - 字符串

代码随想录刷题记录 - 字符串 1. 反转字符串 编写一个函数,其作用是将输入的字符串反转过来。输入字符串以字符数组 char[] 的形式给出。 不要给另外的数组分配额外的空间,你必须原地修改输入数组、使用 O(1) 的额外空间解决这一问题。 你可以假设数组中的所有字符都是 ASCII…

WPF中为Popup和ToolTip使用WindowMaterial特效 win10/win11

先看效果图:大致思路是:通过反射获取Popup内部的原生窗口句柄,然后通过前文已经实现的WindowMaterial类来应用窗口特效;对于ToolTip,为了保持其易用性,我使用了附加属性+全局样式的方式来实现,ToolTip也是一个特殊的Popup.前文链接:WPF 模拟UWP原生窗口样式——亚克力|…

数据采集与融合技术第一次作业

作业1 1)用requests和BeautifulSoup库方法定向爬取给定网址(http://www.shanghairanking.cn/rankings/bcur/2020 )的数据,屏幕打印爬取的大学排名信息。 import urllib.request from bs4 import BeautifulSoup# 定义 URL url = http://www.shanghairanking.cn/rankings/bcu…

『模拟赛』多校A层冲刺NOIP2024模拟赛07

『模拟赛记录』多校A层冲刺NOIP2024模拟赛07Rank 一般,挂大分场。A. 限速(speed) 签。 直接跑一棵最小生成树出来,然后 dfs 一遍,如果有边权不小于 \(k\) 的就给答案加上绝对值的差,若没有则再遍历一遍所有边找到与 \(k\) 之差绝对值最小的边插进去就行,答案就是这个绝对…

数据采集第二次作业

数据采集实践第二次作业 目录点击展开/收起作业①:定向爬取7日天气预报 作业②:定向爬取股票相关信息 作业③:定向爬取中国大学2021主榜信息 总结● 码云链接 作业1 xh102202145/crawl_project作业①:定向爬取7日天气预报 1.1 实验要求 在中国气象网(http://www.weather.…

全链路营销|基于事件驱动的流程编排系统 策略中心系统

全链路营销|基于事件驱动的流程编排系统 https://mp.weixin.qq.com/s/RHXyGaGyp_CK7FJPDqS3Cg 全链路营销|基于事件驱动的流程编排系统 原创 西赞 阿里云开发者 2024年10月14日 08:30 浙江 阿里妹导读本文主要介绍了 AE 策略中心的技术方案选型与落地实战。项目背景 全链路营…

去除 iPhone 设置右上角强制升级红色数字方法

iPhone 强制用户升级在设置右上角有红色数字提示,即使关闭了自动升级也清不掉。可以用快捷指令伪造一个设置的快捷方式来替代原生设置图标打开快捷指令,点击 + 新建快捷指令选择“打开App”点击 App标签,选取“设置”然后点击当前正在创建的快捷指令顶端的 “打开App”,自定…

免费又强大!这五款报表工具你一定要试试

1. 山海鲸可视化报表 简介:山海鲸报表是一款完全免费的专业报表工具,旨在帮助企业和个人用户轻松创建、管理、分享各类数据报表。该工具提供了免费一站式数据处理和展示平台,具备灵活的定制化能力,能够满足各种行业的报表需求,不仅能够处理各式复杂报表,而且提供了非常丰…