开发中经常用到发布订阅的功能,之前一直用的Redis,使用过程中也出现了一些问题,后来换了RabbitMQ,用上去更顺手,简单记录一下。
正文开始:
RabbitMQ是一个开源的,基于AMQP(Advanced Message Queuing Protocol)协议的完整的可复用的企业级消息队,RabbitMQ可以实现点对点,发布订阅等消息处理模式。
RabbitMQ有五种模式
- 简单工作模式(一对一):一个生产者,一个队列,一个消费者
- 工作模式(一对多):一个生产者,一个队列,多个消费者
- 发布订阅模式(Fanout):一个生产者,一个交换机,多个队列,多个消费者,一个消息可以被多个消费者消费
- 路由模式(Direct):一个生产者,一个交换机,多个队列,多个消费者,key,消息只发送给符合条件的消息队列
- 通配符模式(Topic):通配符和路由模式类似,路由是匹配Key,通配符是模糊匹配,主要符合模糊匹配条件,都可以收到消息
下面粘贴一下,项目中用到的一些RabbitMQ相关内容。
1、配置文件设置RabbitMQ的URL链接
{"Logging": {"LogLevel": {"Default": "Information","Microsoft.AspNetCore": "Warning"}},"AllowedHosts": "*","ConnectionStrings": {"MySql": "server=192.168.0.140;port=3306;userid=root;password=1qaz@WSX3edc;database=netsituation","RedisConnection": "127.0.0.1:6379,defaultDatabase=0"},"rabbit": {"uri": "amqp://guest:guest@127.0.0.1:5672/"},"Kestrel": {"EndPoints": {"Http": {"Url": "http://*:8001"}}} }
2、连接字符串类
/// <summary>/// 连接字符串类/// </summary>public class RabbitOption{public RabbitOption(IConfiguration config){if (config == null)throw new ArgumentException(nameof(config));var section = config.GetSection("rabbit");section.Bind(this);}public string Uri { get; set; }}
3、通过IOC创建单例的RabbitMQ的客户端
/// <summary>/// 通过IOC创建单例的RabbitMQ的客户端/// </summary>public static class ServiceCollectionExtensions{public static IServiceCollection AddeRabbitMQConnection(this IServiceCollection services, IConfiguration configuration){if (services == null)throw new ArgumentException(nameof(services));if (configuration == null)throw new ArgumentException(nameof(configuration));var rabbitOption = new RabbitOption(configuration);var factory = new ConnectionFactory { Uri = new Uri(rabbitOption.Uri), AutomaticRecoveryEnabled = true, NetworkRecoveryInterval = TimeSpan.FromSeconds(60) };services.AddSingleton<IRabbitMQPersistentConnection>(x =>{var logger = x.GetRequiredService<ILogger<RabbitMQPersistentConnection>>();return new RabbitMQPersistentConnection(factory, logger);});return services;}}
4、定义连接RabbitMQ的接口
public interface IRabbitMQPersistentConnection{/// <summary>/// 判断是否连接/// </summary>bool IsConnected { get; }/// <summary>/// 连接/// </summary>/// <returns></returns>bool TryConnect();/// <summary>/// 创建模型/// </summary>/// <returns></returns> IModel CreateModel();}
5、接口实现
public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection{private readonly IConnectionFactory connectionFactory;private readonly ILogger<RabbitMQPersistentConnection> logger;private IConnection connection;private const int RETTRYCOUNT = 6;private static readonly object lockObj = new object();public static bool IsBreak = false;public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQPersistentConnection> logger){this.connectionFactory = connectionFactory;this.logger = logger;}public bool IsConnected{get{return connection != null && connection.IsOpen;}}public void Cleanup(){try{connection.Dispose();connection = null;}catch (IOException ex){logger.LogCritical(ex.ToString());}}public IModel CreateModel(){if (!IsConnected){connection.Close();throw new InvalidOperationException("连接不到rabbitmq");}return connection.CreateModel();}public bool TryConnect(){logger.LogInformation("RabbitMQ客户端尝试连接");lock (lockObj){if (connection == null){var policy = Policy.Handle<SocketException>().Or<BrokerUnreachableException>().WaitAndRetry(RETTRYCOUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>{logger.LogWarning(ex.ToString());});policy.Execute(() =>{connection = connectionFactory.CreateConnection();});}if (IsConnected){connection.ConnectionShutdown += OnConnectionShutdown;connection.CallbackException += OnCallbackException;connection.ConnectionBlocked += OnConnectionBlocked;logger.LogInformation($"RabbitMQ{connection.Endpoint.HostName}获取了连接");return true;}else{logger.LogCritical("无法创建和打开RabbitMQ连接");return false;}}}private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e){logger.LogWarning("RabbitMQ连接异常,尝试重连...");Cleanup();TryConnect();}private void OnCallbackException(object sender, CallbackExceptionEventArgs e){logger.LogWarning("RabbitMQ连接异常,尝试重连...");Cleanup();TryConnect();}private void OnConnectionShutdown(object sender, ShutdownEventArgs reason){logger.LogWarning("RabbitMQ连接异常,尝试重连...");IsBreak = true;Cleanup();TryConnect();}}
6、定义发布消息接口
public interface IMessageService{/// <summary>/// 发送消息(工作队列模式)/// </summary>/// <param name="queueName"></param>/// <param name="body"></param>void SendMessage(string queueName, byte[] body);/// <summary>/// 发送消息(发布订阅模式)/// </summary>/// <param name="exchangeName"></param>/// <param name="body"></param>void SendFanoutMessage(string exchangeName, byte[] body);}
7、实现发布消息接口
/// <summary>/// RabbitMQ实现/// </summary>public class MessageService : IMessageService{private readonly IRabbitMQPersistentConnection rabbitMQPersistentConnection;public MessageService(IRabbitMQPersistentConnection rabbitMQPersistentConnection){this.rabbitMQPersistentConnection = rabbitMQPersistentConnection;}public void SendFanoutMessage(string exchangeName,byte[] body){if (!rabbitMQPersistentConnection.IsConnected){rabbitMQPersistentConnection.TryConnect();}using (var channel = rabbitMQPersistentConnection.CreateModel()){//把交换机设置成Fanout模式channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false);var properties = channel.CreateBasicProperties();properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());properties.Persistent = true;channel.BasicPublish(exchangeName, "", properties, body);}}public void SendMessage(string queueName, byte[] body){if (!rabbitMQPersistentConnection.IsConnected){rabbitMQPersistentConnection.TryConnect();}using (var channel = rabbitMQPersistentConnection.CreateModel()){channel.QueueDeclare(queueName, true, false, false, null);var properties = channel.CreateBasicProperties();properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());properties.Persistent = true;channel.BasicPublish("", queueName, properties, body);}}}
8、RabbitMQ连接参数
/// <summary>/// RabbitMQ连接参数/// </summary>public class RabbitMQModel{/// <summary>/// 交换机名称/// </summary>public string ExchangeName = "pvm";/// <summary>/// 消息队列名称/// </summary>public string QueueName { get; set; }/// <summary>/// 消息内容/// </summary>public byte[] Body { get; set; }}