RabbitMQ 发布订阅(Publish Subscribe)模式示例

news/2024/10/19 10:36:27

总结自:BV15k4y1k7Ep

交换机

订阅模式示例图:

1556014499573

在简单模式和工作队列模式中,只有 3 个角色:

  • P:生产者,也就是要发送消息的程序。

  • C:消费者,消息的接受者,会一直等待消息到来。

  • Queue:消息队列,图中红色部分。

而在订阅模型中,多了一个 Exchange 角色,而且工作过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X(交换机)。

  • C:消费者,消息的接受者,会一直等待消息到来。

  • Queue:消息队列,接收消息、缓存消息。

  • Exchange:交换机,图中的 X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。

    Exchange 有常见以下 3 种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列。

    • Direct:定向,把消息交给符合指定 routing key 的队列。

    • Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列。

Exchange(交换机)只负责转发消息,不具备存储消息的能力。因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

模式说明

1556010329032

发布订阅模式:

1、每个消费者监听自己的队列。

2、生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

代码

1)生产者

和简单模式和工作队列模式相比,发布订阅模式会声明交换机,声明多个队列,并将队列和交换机绑定:

/** 声明交换机* 参数 1:交换机名称* 参数 2:交换机类型,fanout、topic、direct、headers*/
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列
/** 参数 1:队列名称* 参数 2:是否定义持久化队列* 参数 3:是否独占本次连接* 参数 4:是否在不使用的时候自动删除队列* 参数 5:队列其它参数*/
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);// 队列绑定交换机
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");

发送消息时,会发送到指定交换机:

/** 参数 1:交换机名称,如果没有指定则使用默认 Default Exchange* 参数 2:路由 key,简单模式可以传递队列名称* 参数 3:消息其它属性* 参数 4:消息内容*/
channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes());

以下为完整代码:

package com.zhangmingge.rabbitmq.ps;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhangmingge.rabbitmq.ConnectionUtil;/*** 发布与订阅使用的交换机类型为:fanout*/
public class Producer {// 交换机名称static final String FANOUT_EXCHANGE = "fanout_exchange";// 队列名称static final String FANOUT_QUEUE_1 = "fanout_queue_1";// 队列名称static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {// 创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();/** 声明交换机* 参数 1:交换机名称* 参数 2:交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/** 参数 1:队列名称* 参数 2:是否定义持久化队列* 参数 3:是否独占本次连接* 参数 4:是否在不使用的时候自动删除队列* 参数 5:队列其它参数*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);// 队列绑定交换机channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");for (int i = 1; i <= 10; i++) {// 发送信息String message = "你好:小兔子!发布订阅模式--" + i;/** 参数 1:交换机名称,如果没有指定则使用默认 Default Exchange* 参数 2:路由 key,简单模式可以传递队列名称* 参数 3:消息其它属性* 参数 4:消息内容*/channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes());System.out.println("已发送消息:" + message);}// 关闭资源channel.close();connection.close();}
}

2)消费者 1

和简单模式和工作队列模式相比,发布订阅模式的消费者多了声明交换机和将队列绑定到交换机的过程:

// 声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列
/** 参数 1:队列名称* 参数 2:是否定义持久化队列* 参数 3:是否独占本次连接* 参数 4:是否在不使用的时候自动删除队列* 参数 5:队列其它参数*/
channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);// 队列绑定交换机
channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHANGE, "");

下面为完整代码:

package com.zhangmingge.rabbitmq.ps;import com.rabbitmq.client.*;
import com.zhangmingge.rabbitmq.ConnectionUtil;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/** 参数 1:队列名称* 参数 2:是否定义持久化队列* 参数 3:是否独占本次连接* 参数 4:是否在不使用的时候自动删除队列* 参数 5:队列其它参数*/channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);// 队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHANGE, "");// 创建消费者,并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {/** consumerTag 消息者标签,在 channel.basicConsume 时候可以指定* envelope 消息包的内容,可从中获取消息 id,消息 routingKey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {// 路由 keySystem.out.println("路由 key 为:" + envelope.getRoutingKey());// 交换机System.out.println("交换机为:" + envelope.getExchange());// 消息 idSystem.out.println("消息 id 为:" + envelope.getDeliveryTag());// 收到的消息System.out.println("消费者 1-接收到的消息为:" + new String(body, "utf-8"));}};// 监听消息/** 参数 1:队列名称* 参数 2:是否自动确认,设置为 true 为表示消息接收到自动向 mq 回复接收到了,mq 接收到回复会删除消息,设置为 false 则需要手动确认* 参数 3:消息接收到后回调*/channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);}
}

3)消费者 2

消费者 2 和消费者 1 只有日志内容和绑定的队列不同,其他没有区别。

package com.zhangmingge.rabbitmq.ps;import com.rabbitmq.client.*;
import com.zhangmingge.rabbitmq.ConnectionUtil;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {...channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE, "");// 创建消费者,并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {...System.out.println("消费者 2-接收到的消息为:" + new String(body, "utf-8"));}};// 监听消息...channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);}
}

测试

启动所有消费者,然后使用生产者发送消息,在每个消费者对应的控制台可以查看到生产者发送的所有消息,达到广播的效果。

在执行完测试代码后,其实到 RabbitMQ 的管理后台找到Exchanges选项卡,点击fanout_exchange的交换机,可以查看到如下的绑定:

1556015006220

小结

交换机需要与队列进行绑定,绑定之后,一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/73362.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

Leetcode 1135. 最低成本连通所有城市

1.题目基本信息 1.1.题目描述 想象一下你是个城市基建规划者,地图上有 n 座城市,它们按以 1 到 n 的次序编号。 给你整数 n 和一个数组 conections,其中 connections[i] = [x_i, y_i, cost_i] 表示将城市 x_i 和城市 y_i 连接所要的cost_i(连接是双向的)。 返回连接所有城…

Leetcode 1129. 颜色交替的最短路径

1.题目基本信息 1.1.题目描述 给定一个整数 n,即有向图中的节点数,其中节点标记为 0 到 n – 1。图中的每条边为红色或者蓝色,并且可能存在自环或平行边。 给定两个数组 redEdges 和 blueEdges,其中:redEdges[i] = [a_i, b_i] 表示图中存在一条从节点 a_i 到节点 b_i 的红…

Linux系统命令3

1、df 查看磁盘使用情况Filesystem:代表该文件系统时哪个分区,所以列出的是设备名称。 1K-blocks:说明下面的数字单位是1KB,可利用-h或-m来改变单位大小,也可以用-B来设置。 Used:已经使用的空间大小。Available:剩余的空间大小。 Use%:磁盘使用率。如果使用率在90%以上…

线性表学习1

线性结构 若结构是非空有限集,则有且仅有一个开始结点和一个终端结点,并且除了首尾节点外所有结点都最多只有一个直接前趋和一个直接后继。 可表示为:(a1,a2,a3,...) 特点:只有一个首结点和尾结点 本质特征:除首尾结点外,其他结点只有一个直接前驱和一个直 接后继。 简言…

学习 gradle 基础

简介 Gradle 的优势一款最新的,功能最强大的构建工具,用它逼格更高 使用 Groovy 或者 Kotlin 代替 XML,使用程序代替传统的 XML 配置,项目构建更灵活 丰富的第三方插件,让你随心所欲使用 完善 Android,Java 开发技术体系下载和安装 官网地址 https://services.gradle.org…

AutoResetEvent双向信号(生产者和消费者)例子

AutoResetEvent是一个非常有用的线程同步机制,尤其是在处理生产者和消费者问题的时候,尤其适用。本随笔记录下生产者和消费者一对一问题的两种写法并进行代码执行逻辑的分析,来加深对AutoResetEvent的理解。 写法一:internal class Program {public static AutoResetEvent …

数据采集和融合技术作业1

作业① 1)用requests和BeautifulSoup库方法定向爬取给定网址的数据,屏幕打印爬取的大学排名信息。 a、主要代码解析 该函数从获取的JSON数据中提取前 num 名大学的信息,并将这些信息存储到 ulist 列表中,同时格式化输出这些大学的排名信息 def printUnivList(ulist, html, …

沃顿商学院商业人工智能笔记-六-

沃顿商学院商业人工智能笔记(六) P46:12_简介.zh_en - GPT中英字幕课程资源 - BV1Ju4y157dK 嗨,我是迈克尔罗伯茨。我是威廉H罗伯茨教授。 我是宾夕法尼亚大学沃顿商学院的金融学劳伦斯教授。 在这一系列视频中,我们将讨论金融、机器学习。以及人工智能。因此,当我想到金…