.Net Core中使用RabbitMQ

news/2024/10/4 19:24:20

开发中经常用到发布订阅的功能,之前一直用的Redis,使用过程中也出现了一些问题,后来换了RabbitMQ,用上去更顺手,简单记录一下。

正文开始:

RabbitMQ是一个开源的,基于AMQP(Advanced Message Queuing Protocol)协议的完整的可复用的企业级消息队,RabbitMQ可以实现点对点,发布订阅等消息处理模式。

RabbitMQ有五种模式

  1. 简单工作模式(一对一):一个生产者,一个队列,一个消费者
  2. 工作模式(一对多):一个生产者,一个队列,多个消费者
  3. 发布订阅模式(Fanout):一个生产者,一个交换机,多个队列,多个消费者,一个消息可以被多个消费者消费
  4. 路由模式(Direct):一个生产者,一个交换机,多个队列,多个消费者,key,消息只发送给符合条件的消息队列
  5. 通配符模式(Topic):通配符和路由模式类似,路由是匹配Key,通配符是模糊匹配,主要符合模糊匹配条件,都可以收到消息

下面粘贴一下,项目中用到的一些RabbitMQ相关内容。

1、配置文件设置RabbitMQ的URL链接

{"Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}},"AllowedHosts": "*","ConnectionStrings": {"MySql": "server=192.168.0.140;port=3306;userid=root;password=1qaz@WSX3edc;database=netsituation","RedisConnection": "127.0.0.1:6379,defaultDatabase=0"},"rabbit": {"uri": "amqp://guest:guest@127.0.0.1:5672/"},"Kestrel": {"EndPoints": {"Http": {"Url": "http://*:8001"}}}
}
View Code

2、连接字符串类

    /// <summary>/// 连接字符串类/// </summary>public class RabbitOption{public RabbitOption(IConfiguration config){if (config == null)throw new ArgumentException(nameof(config));var section = config.GetSection("rabbit");section.Bind(this);}public string Uri { get; set; }}
View Code

3、通过IOC创建单例的RabbitMQ的客户端

    /// <summary>/// 通过IOC创建单例的RabbitMQ的客户端/// </summary>public static class ServiceCollectionExtensions{public static IServiceCollection AddeRabbitMQConnection(this IServiceCollection services, IConfiguration configuration){if (services == null)throw new ArgumentException(nameof(services));if (configuration == null)throw new ArgumentException(nameof(configuration));var rabbitOption = new RabbitOption(configuration);var factory = new ConnectionFactory { Uri = new Uri(rabbitOption.Uri), AutomaticRecoveryEnabled = true, NetworkRecoveryInterval = TimeSpan.FromSeconds(60) };services.AddSingleton<IRabbitMQPersistentConnection>(x =>{var logger = x.GetRequiredService<ILogger<RabbitMQPersistentConnection>>();return new RabbitMQPersistentConnection(factory, logger);});return services;}}
View Code

4、定义连接RabbitMQ的接口

    public interface IRabbitMQPersistentConnection{/// <summary>/// 判断是否连接/// </summary>bool IsConnected { get; }/// <summary>/// 连接/// </summary>/// <returns></returns>bool TryConnect();/// <summary>/// 创建模型/// </summary>/// <returns></returns>
        IModel CreateModel();}
View Code

5、接口实现

    public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection{private readonly IConnectionFactory connectionFactory;private readonly ILogger<RabbitMQPersistentConnection> logger;private IConnection connection;private const int RETTRYCOUNT = 6;private static readonly object lockObj = new object();public static bool IsBreak = false;public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQPersistentConnection> logger){this.connectionFactory = connectionFactory;this.logger = logger;}public bool IsConnected{get{return connection != null && connection.IsOpen;}}public void Cleanup(){try{connection.Dispose();connection = null;}catch (IOException ex){logger.LogCritical(ex.ToString());}}public IModel CreateModel(){if (!IsConnected){connection.Close();throw new InvalidOperationException("连接不到rabbitmq");}return connection.CreateModel();}public bool TryConnect(){logger.LogInformation("RabbitMQ客户端尝试连接");lock (lockObj){if (connection == null){var policy = Policy.Handle<SocketException>().Or<BrokerUnreachableException>().WaitAndRetry(RETTRYCOUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>{logger.LogWarning(ex.ToString());});policy.Execute(() =>{connection = connectionFactory.CreateConnection();});}if (IsConnected){connection.ConnectionShutdown += OnConnectionShutdown;connection.CallbackException += OnCallbackException;connection.ConnectionBlocked += OnConnectionBlocked;logger.LogInformation($"RabbitMQ{connection.Endpoint.HostName}获取了连接");return true;}else{logger.LogCritical("无法创建和打开RabbitMQ连接");return false;}}}private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e){logger.LogWarning("RabbitMQ连接异常,尝试重连...");Cleanup();TryConnect();}private void OnCallbackException(object sender, CallbackExceptionEventArgs e){logger.LogWarning("RabbitMQ连接异常,尝试重连...");Cleanup();TryConnect();}private void OnConnectionShutdown(object sender, ShutdownEventArgs reason){logger.LogWarning("RabbitMQ连接异常,尝试重连...");IsBreak = true;Cleanup();TryConnect();}}
View Code

6、定义发布消息接口

    public interface IMessageService{/// <summary>/// 发送消息(工作队列模式)/// </summary>/// <param name="queueName"></param>/// <param name="body"></param>void SendMessage(string queueName, byte[] body);/// <summary>/// 发送消息(发布订阅模式)/// </summary>/// <param name="exchangeName"></param>/// <param name="body"></param>void SendFanoutMessage(string exchangeName, byte[] body);}
View Code

7、实现发布消息接口

    /// <summary>///  RabbitMQ实现/// </summary>public class MessageService : IMessageService{private readonly IRabbitMQPersistentConnection rabbitMQPersistentConnection;public MessageService(IRabbitMQPersistentConnection rabbitMQPersistentConnection){this.rabbitMQPersistentConnection = rabbitMQPersistentConnection;}public void SendFanoutMessage(string exchangeName,byte[] body){if (!rabbitMQPersistentConnection.IsConnected){rabbitMQPersistentConnection.TryConnect();}using (var channel = rabbitMQPersistentConnection.CreateModel()){//把交换机设置成Fanout模式channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false);var properties = channel.CreateBasicProperties();properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());properties.Persistent = true;channel.BasicPublish(exchangeName, "", properties, body);}}public void SendMessage(string queueName, byte[] body){if (!rabbitMQPersistentConnection.IsConnected){rabbitMQPersistentConnection.TryConnect();}using (var channel = rabbitMQPersistentConnection.CreateModel()){channel.QueueDeclare(queueName, true, false, false, null);var properties = channel.CreateBasicProperties();properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());properties.Persistent = true;channel.BasicPublish("", queueName, properties, body);}}}
View Code

8、RabbitMQ连接参数

    /// <summary>/// RabbitMQ连接参数/// </summary>public class RabbitMQModel{/// <summary>/// 交换机名称/// </summary>public string ExchangeName = "pvm";/// <summary>/// 消息队列名称/// </summary>public string QueueName { get; set; }/// <summary>/// 消息内容/// </summary>public byte[] Body { get; set; }}
View Code

 

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/28878.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

一行SQL语句实现统计未来7天、按月统计数据,无数据填充0

未来7天、按月统计数据,无数据填充0 help_topic1 背景由于业务需求,在项目的报表中心中需要未来7天、按月统计数据,且要求按天补全数据,补数据填为0。  附实测SQL语句,请大家指正。 2 举例 2.1未来7天,按天补全数据,无数据填充0 sql语句:select t1.lastDays as x, IF…

【译】使用(滥用)LLM 压缩文本

来源:o565.com/llm-text-compression/介绍 大型语言模型是在大型文本数据集上进行训练的,以学习更大文档中单词的关系和上下文。这些关系是模型生成文本的基础。 最近,我读到了关于 LLMs 被训练在受版权保护的文本上并将其复制的担忧。这让我想:可以从 LLM 中提取训练文本吗…

NodeJS路径遍历:示例及预防

让我们来看看什么是路径遍历攻击,以及在Node.js中可以采用哪些方法来阻止这种攻击。构建一个安全而健壮的应用程序需要考虑的因素很多,并非一件容易的事情。要确保覆盖所有潜在的漏洞是一项十分艰巨的任务,这需要大量的经验和指导。在这些漏洞中,有一个和系统目录访问安全相…

k8s搭建集群

1.单master集群 模式缺点:如果master宕机了,就整个集群也没有办法访问了。2.多master集群 模式 3.快速搭建k8s集群--Kubeadm: 4.快速搭建k8s集群--二进制方式:

在Windows运行Gitlab Runner打包基于.NET Framework 4.6.1的项目

摘要 本文详细描述了运行在Windows商的Gitlab Runner,如何自动集成.NET Framework的项目。 Gitlab中的变量 变量1:NUPKG_OUTPUT_ROOT 这个目录是在git获取的解决方案根目录之外,因为stages变了以后,当前Gitlab Runner工作的当前解决方案根目录下会被清空。我们希望build了以…

何小鹏:活过淘汰赛,要做多边形战士下的规模第一

在北京车展上,小鹏汽车董事长 CEO 何小鹏接受媒体采访,就价格战、未来趋势、小米汽车等行业热点话题回答了媒体的提问。作为造车新势力的重要厂商,小鹏接下来也面临着市场淘汰赛的考验。 一方面是越来越激烈的价格战,另一方面还有小米、华为这样的科技大厂的强势入局,何小…