消息队列(Message Queue)
1.什么是消息
两台设备(例如服务与服务)之间传递的数据就可以称之为消息
2.什么是消息队列
消息队列是在消息的传输过程中保存消息的容器。
为什么使用消息队列
异步
例如发送验证码
解耦
例如服务与服务之间的调用
削峰限流
例如秒杀等高并发场景下,请求先写入消息队列,再由消费者按自身处理能力逐步消费
一个消息队列的基本组成
生产者producer
根据主题发送消息
消费者consumer
根据主题接收消息
代理者broker
存储消息,自动分发消息
主题topic
区分消息
消费者监听Listener
监听消息的到来
图示
1.定义Topic(主题)/Channel(频道)
使用接口定义Topic常量
2.创建订阅者类,接收发布者发布的消息
实现MessageListener,重写onMessage方法
3.添加订阅者类对象到容器中
注入redis连接工厂RedisConnectionFactory
获取redis连接getConnection
调用订阅方法
subscribe
4.测试发布订阅
convertAndSend
基本使用
1.在service模块下新建listener包
创建RedisQueueListener类,实现MessageListener接口
package com.zzyl.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RedisQueueListener implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {String s = new String(message.getBody());String topic = new String(message.getChannel());log.info("监听到消息{},来自{}",s,topic);}
}
2.创建topic
package com.zzyl.constant;

/**
 * Names of the topics (channels) shared by message publishers and subscribers.
 */
public interface RedisQueueTopic {

    /** Name of the first test topic. */
    String TEST_TOPIC = "test_topic";

    /** Name of the second test topic. */
    String TEST_TOPIC2 = "test_topic2";
}
3.创建配置类
package com.zzyl.config;import com.zzyl.constant.RedisQueueTopic;
import com.zzyl.listener.RedisQueueListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;import javax.annotation.Resource;@Configuration
public class RedisQueueConfig {

    /** Listener that receives the messages published to the subscribed topics. */
    @Resource
    private RedisQueueListener redisQueueListener;

    /**
     * Subscribes {@link RedisQueueListener} to {@link RedisQueueTopic#TEST_TOPIC} and
     * {@link RedisQueueTopic#TEST_TOPIC2}, then returns the connection factory it was given.
     *
     * @param redisConnectionFactory factory used to obtain the connection that carries the subscription
     * @return the same {@code redisConnectionFactory} instance that was passed in
     */
    @Bean
    public RedisConnectionFactory redisQueueConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
        // The connection is not closed here.
        // NOTE(review): presumably it has to stay open for the subscription to remain active — confirm.
        RedisConnection connection = redisConnectionFactory.getConnection();

        // Encode channel names with an explicit charset instead of the platform default,
        // so the subscribed channel bytes do not depend on the host configuration.
        connection.subscribe(
                redisQueueListener,
                RedisQueueTopic.TEST_TOPIC.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                RedisQueueTopic.TEST_TOPIC2.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        // NOTE(review): returning the injected factory exposes it under a second bean name;
        // a RedisMessageListenerContainer bean may be a cleaner way to register listeners — confirm.
        return redisConnectionFactory;
    }
}
4.测试使用
package com.zzyl;import com.zzyl.constant.RedisQueueTopic;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;import javax.annotation.Resource;@SpringBootTest
public class TestRedisQueue {

    /** Template used to publish the test messages. */
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /** Publishes "hello" to {@link RedisQueueTopic#TEST_TOPIC}. */
    @Test
    public void test() {
        String channel = RedisQueueTopic.TEST_TOPIC;
        Object payload = "hello";
        redisTemplate.convertAndSend(channel, payload);
    }

    /** Publishes "hello2" to {@link RedisQueueTopic#TEST_TOPIC2}. */
    @Test
    public void tes1t() {
        String channel = RedisQueueTopic.TEST_TOPIC2;
        Object payload = "hello2";
        redisTemplate.convertAndSend(channel, payload);
    }
}
5.输出结果
2024-09-26 10:38:33.724 [lettuce-nioEventLoop-4-2] INFO com.zzyl.listener.RedisQueueListener-监听到消息"hello2",来自test_topic2
接收IoT平台消息服务
1.导入jar包
<!-- AMQP 1.0 client (Apache Qpid JMS) -->
<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-jms-client</artifactId>
    <version>0.57.0</version>
</dependency>
<!-- Utility library for Base64 -->
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.10</version>
</dependency>
2.新建服务
3.新服务的pom.xml(依赖配置)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.lu</groupId><artifactId>zzyl-transfer-server</artifactId><version>0.0.1-SNAPSHOT</version><name>zzyl-transfer-server</name><description>zzyl-transfer-server</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--阿里云iot平台对接服务端订阅依赖--><!-- amqp 1.0 qpid client --><dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.57.0</version></dependency><!-- util for 
base64--><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude>