根据源码,模拟实现 RabbitMQ - 从需求分析到实现核心类(1)

目录

一、需求分析

1.1、对 Message Queue 的认识

1.2、消息队列核心概念

1.3、Broker Server 内部关键概念

1.4、Broker Server 核心 API (重点实现)

1.5、交换机类型

Direct 直接交换机

Fanout 扇出交换机

Topic 主题交换机

1.6、持久化

1.7、网络通信

通信流程

远程调用设计思想

1.8、模块设计图

二、实现核心类

2.1、交换机和队列的属性及绑定关系

2.2、Message 消息


一、需求分析


1.1、对 Message Queue 的认识

消息队列就是把 阻塞队列 这样的数据结构,单独提取成了一个独立的程序进行部署,实现 “进程和进程之间 / 服务和服务之间” 的生产者消费者模型(分布式系统中则是一组服务器构成的集群).

生产者消费者模型的好处如下:

  1. 解耦合:在一个分布式系统中 服务器A 给 服务器B 发送请求,B 给 A 返回响应,这样 A 和 B 的耦合是比较大的(B 一旦挂了,A 这边也无法正常接收响应);引入消息队列后,A 把请求发送给消息队列,B 再从消息队列中获取请求,就降低了耦合度(哪怕 B 挂了,A 也不用管,继续发送请求给消息队列,队列反馈响应).
  2. 削峰填谷:假设这样一个常见, A 是入口服务器, A 再调用 B 完成一些具体的业务,如果是 A 和 B 直接通信,突然有一天 A 收到用户请求的峰值,此时 B 也会随之感受到峰值;引入消息队列之后, A 把请求发给队列,这时候 B 就可以按照自己原有的节奏从队列中取请求,不至于一下子收到太多的并发量.

1.2、消息队列核心概念

生产者(Producer):发布消息的客户端应用程序.

消费者(Consumer):订阅消息的客户端应用程序,用于处理生产者产生的消息.

中间人(Broker):消费者要拿到生产者的消息,需要经过中间人,用来削峰填谷.

发布(Publish):生产者向中间人这里投递消息的过程.

订阅(Subscribe):类似于订阅报纸,消费者要从中间人这里取到对应消息的前提是,先订阅消息.

消费(Consume):消费者从中间人这里取数据的操作.

1.3、Broker Server 内部关键概念

1.虚拟主机(Virtual Host):类似于 MySQL 中的 database,是一个 “逻辑” 上的数据集合.

实际的开发中,一个 Broker Server  也可以有多种不同类型的数据,可能会同时用来管理多组 业务线 上的数据,可以使用 Virtual Host 进行逻辑上的区分~

2.交换机(Exchange):实际上,生产者是先把消息给了 Broker Server 上的某一个交换机,再由交换机把消息转发给对应的队列.

交换机就类似于公司的前台小姐姐,有一天你来面试,你就告诉前台小姐姐,然后她就会把你带到对应的楼层(队列).

3.队列(Queue):在一个大的消息队列中,可以又很多哥具体的小队列,他们用来存储消息实体,后续消费者也是从对应的队列中取数据.

4.绑定(Binding):把交换机和队列之间,建立起关联关系.

可以把 交换机 和 队列 看作 数据库 中的 “多对多” 这样的关系.

5.消息(Message):服务器 A  给 B 发的请求(通过 MQ 转发),就是一个消息,服务器 B 给 A 返回的响应(通过 MQ 转发)也是一个消息.

一个消息,可以视为是一个 字符串(二进制数据)。消息中具体包含啥样的数据,都是程序员自定义的.

Ps:这些概念,既需要在内存中存储(方便,快),也需要在硬盘中存储(持久化).

1.4、Broker Server 核心 API (重点实现)

1. 创建队列(queueDeclare)

这里不使用 Create 创建 这样的术语,而使用 Declare ,是因为这里以 RabbitMQ 为蓝本, Declare 表示 不存在则创建,存在就什么也不干.

2.销毁队列(queueDelete)

3.创建交换机(exchangeDeclare)

4.销毁交换机(exchangeDelete)

5.创建绑定(queueBind)

6.解除绑定(queueUnbind)

7.发布消息(basicPublish)

8.订阅消息(basicConsume)

9.确定消息(basicAck)

类似于 TCP 的确认应答机制,确认消息这个 api 就是让消费者显式告诉 broker server ,这个消息,我已经处理完毕了,提高整个系统的可靠性.

客户端除了提供以上 9 中方法,还需要提供 4 个方法:

1.创建 Connection

2.关闭 Connection

3.创建 Channel

4.关闭 Channel

 一个 Connection 对象,就代表一个 TCP 连接,里面可以包含多个 Channel ,每隔 Channel 上面传输的数据都是互不相干的.

有了 Connection 了,为什么还要搞一个 Channel ?

TCP 中,建立 / 断开一个连接成本还是挺高的,因此,很多时候,不希望频繁的建立断开 TCP 连接,因此引入 Channel ,相比于 TCP,就要轻量很多. Connection 和 Channel 之间的关系,就类似于 “网线” 一样.

1.5、交换机类型

交换机在转发消息的时候,按照转发规则,提供了几种不同的 交换机类型(ExchangeType)来描述这里不同的转发规则.

RabbitMQ 主要实现了 四种 交换机类型(也是 AMQP 协议定义的)

  1. Direct 直接交换机
  2. Fanout 扇出交换机
  3. Topic 主题交换机
  4. Header 消息头交换机

Ps:这里主要实现前三种交换机类型,因为第四种交换机规则复杂,应用场景少,前三种就已经够用了.

Direct 直接交换机

有两个关键概念:

bindingKey:把队列和交换机绑定的时候,指定一个单词(类似于暗号).

routingKey:生产者发送消息的时候,也指定一个单词.

当 routingKey 和 BindingKey 对上暗号了,就可以把这个消息转发到对应的队列中了.

生产者发送消息的时候,会指定一个 “目标队列的名字”(routingKey),交换机收到之后,就看看绑定的队列里,有没有匹配的队列(BindingKey),如果有,就转发过去(把消息塞进对应的队列中),如果没有,消息直接丢弃.

Fanout 扇出交换机

会将接收到的消息广播到每一个跟其绑定的queue,最后交给对应的消费者,值得注意的是,exchange负责消息路由,而不是存储,路由失败则消息丢失。

Topic 主题交换机

有两个关键概念:

bindingKey:把队列和交换机绑定的时候,指定一个单词(类似于暗号).

routingKey:生产者发送消息的时候,也指定一个单词.

当 routingKey 和 BindingKey 对上暗号了,就可以把这个消息转发到对应的队列中了.

Ps:TopicExchange 与 DirectExchange 十分类似,区别在于routingKey必须是多个单词的列表,并且以 分割。

1.6、持久化

上述这些概念(交换机、队列、绑定、消息....)对应的数据,都需要存储和管理起来,此时内存和硬盘都会各自存储一份,内存为主,硬盘为辅.

在内存中存储的原因:对于 MQ 来说,能够高效的转发处理数据式非常关键的,因此在内存上组织数据比硬盘上要快的多.

在硬盘上存储的原因:防止内存中的数据随着 进程重启/主机重启 而丢失.

Ps:硬盘存储,这个持久化是相对于 内存 的,对于一个硬盘来说,存储的消息寿命一般为 几年~几十年(一直不通电的情况下).

1.7、网络通信

通信流程

⽣产者和消费者都是客⼾端程序, broker server 则是作为服务器. 通过⽹络进⾏通信.

例如如下过程:

  1. 客户端发送请求:生产者的代码中需要有一个方法 queueDeclare 来创建队列,这个方法内部要做的事情就是给服务器发送一个请求,告诉服务器,咱们要创建一个队列,以及队列长啥样子......
  2. 服务器处理请求,并返回响应:broker server 收到这个请求之后,再执行服务器这边的 queueDeclare 方法,真正取给 内存/硬盘 上写一些数据,把这个队列给真正创建出来,再把创建 成功/失败 结果包装成响应,返回给客户端.
  3. 客户端接收响应:当响应回来了,客户端的 queueDeclare 就会获取到这个响应,看到 创建队列成功,此时 queueDeclare 就算执行完毕了.

远程调用设计思想

此处,客户端调用了一个本地方法,结果这个方法的背后,又给服务器发送了一系列消息,由服务器完成了一系列工作,站在调用者的角度,只是看到了这个功能完成了,并不知道背后的实现细节.

虽然调用的是一个本地方法,但实际上好像调用另一个远端服务器的方法一样~

这里可以认为是编写客户端服务器程序,通信过程中的一种设计思想——远程调用(RPC)

举个例子

以前有个美国的老哥,再大厂干活,但是他特别想摸鱼,于是他就找了一个中国人帮他干(有偿). 同样级别的程序员,在美国工作的薪水是国内的 3-4 倍.

1.8、模块设计图

二、实现核心类


Ps:Spring boot、MyBatis

2.1、交换机和队列的属性及绑定关系

我们可以使用 一个枚举类 表示三种交换机.

public enum ExchangeType {

    DIRECT(0),
    FANOUT(1),
    TOPIC(2);

    private final int type;

    private ExchangeType(int type) {
        this.type = type;
    }

    public int getType() {
        return type;
    }

}

交换机属性如下

public class Exchange {

    //模仿 rabbitmq 使用 name 作为唯一身份标识
    private String name;
    //交换价类型,DIRECT,FANOUT,TOPIC
    private ExchangeType type = ExchangeType.DIRECT;
    //当前交换机是否要持久化存储,true 标识持久化.
    private boolean durable = false;
    //TODO: 若当前交换机没人使用了,就会自动删除
    private boolean autoDelete = false;
    //TODO: 表示创建交换机时可以指定一些额外的选项
    private Map<String, Object> arguments = new HashMap<>();

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public ExchangeType getType() {
        return type;
    }

    public void setType(ExchangeType type) {
        this.type = type;
    }

    public boolean isDurable() {
        return durable;
    }

    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    public boolean isAutoDelete() {
        return autoDelete;
    }

    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }

    public Map<String, Object> getArguments() {
        return arguments;
    }

    public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
}

队列属性如下

public class MSGQueue {
    //表示队列的身份标识
    private String name;
    //是否持久化,true 表示支持
    private boolean durable;
    //TODO: 这个属性为 true,表示队列只能被一个消费者使用, false表示大家都能用
    private boolean exclusive = false;
    //TODO: 自动删除,为 true 表示没有人使用以后自动删除
    private boolean autoDelete = false;
    //TODO: 扩展参数,自定义选项
    private Map<String, Object> arguments = new HashMap<>();

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isDurable() {
        return durable;
    }

    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    public boolean isExclusive() {
        return exclusive;
    }

    public void setExclusive(boolean exclusive) {
        this.exclusive = exclusive;
    }

    public boolean isAutoDelete() {
        return autoDelete;
    }

    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }

    public Map<String, Object> getArguments() {
        return arguments;
    }

    public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
}

交换机和队列绑定关系如下

public class Binding {
    //交换机身份标识
    private String exchangeName;
    //队列身份标识
    private String queueName;
    private String bindingKey;

    //绑定没必要设置持久化,因为没有意义,他存在的意义前提是 交换机和队列存在(持久化)

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getBindingKey() {
        return bindingKey;
    }

    public void setBindingKey(String bindingKey) {
        this.bindingKey = bindingKey;
    }
}

2.2、Message 消息

Message 消息主要分为:

  1.  属性部分 BasicProperties(网络):这是一个自定义类,里面承载着 Message 核心属性,如 消息的唯一身份标识、routingKey、是否需要持久化.......
  2.  正文部分 byte[]:承载着具体的消息内容.
  3. 辅助属性:通过 offsetBeg 和 offsetEnd 快速找到文件中某一个消息的具体位置(一个文件中会存储很多消息),这里约定的规则为 “前闭后开”(以字节为单位);isValid 标识消息要删除,如果删除采用逻辑删除(并不是真的删除,而是标记为无效数据),这里约定 0x1 为有效数据、0x0 表示无效数据.

Ps:前两个信息需要在网络上传输,并且写入文件,因此就需要通过 Serializable 序列化和反序列化(直接继承接口即可,不需要具体实现)~~ 

而第三条不需要被序列化保存到文件中,这个属性主要是为了内存中的 Message 对象快速找到 文件中的 Message  对象,因此使用 trasient 修饰即可防止序列化 

public class Message implements Serializable {
    //这两个属性是 Message 最核心的属性
    private BasicProperties basicProperties = new BasicProperties();
    private byte[] body;

    //以下是辅助类型的属性
    private transient long offsetBeg = 0; //消息数据的开头距离文件开头的位置偏移量(字节)
    private transient long offsetEnd = 0; //消息数据的结尾距离文件开头的位置偏移量(字节)
    //表示文件中的消息是否是有效消息(对文件中的消息,如果删除,使用逻辑删除)
    // 0x1 表示有效, 0x0 表示无效
    private byte isValid = 0x1;

    //创建一个工厂方法,让工厂方法创建 Message 对象
    //此方法中创建的 Message 对象,会自动生成唯一的 MessageId
    //万一 routingKey 和 basicProperties 里的 routingKey 冲突,以外面为主
    public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {
        Message message = new Message();
        if(basicProperties != null) {
            message.setBasicProperties(basicProperties);
        }
        //生成的 MessageId 以 M- 作为前缀
        message.setMessageId("M-" + UUID.randomUUID());
        message.setRoutingKey(routingKey);
        message.body = body;
        // offsetBeg, offsetEnd, isValid ,是消息持久化的时候才会用到,消息写入文件之前在进行设定
        return message;
    }

    public String getMessageId() {
        return basicProperties.getMessageId();
    }

    public void setMessageId(String messageId) {
        basicProperties.setMessageId(messageId);
    }

    public String getRoutingKey() {
        return basicProperties.getRoutingKey();
    }

    public void setRoutingKey(String routingKey) {
        basicProperties.setRoutingKey(routingKey);
    }

    public int getDeliverMode() {
        return basicProperties.getDeliverMode();
    }

    public void setDeliverMode(int mode) {
        basicProperties.setDeliverMode(mode);
    }

    public BasicProperties getBasicProperties() {
        return basicProperties;
    }

    public void setBasicProperties(BasicProperties basicProperties) {
        this.basicProperties = basicProperties;
    }

    public byte[] getBody() {
        return body;
    }

    public void setBody(byte[] body) {
        this.body = body;
    }

    public long getOffsetBeg() {
        return offsetBeg;
    }

    public void setOffsetBeg(long offsetBeg) {
        this.offsetBeg = offsetBeg;
    }

    public long getOffsetEnd() {
        return offsetEnd;
    }

    public void setOffsetEnd(long offsetEnd) {
        this.offsetEnd = offsetEnd;
    }

    public byte getIsValid() {
        return isValid;
    }

    public void setIsValid(byte isValid) {
        this.isValid = isValid;
    }
}

Message 中的核心属性如下

public class BasicProperties implements Serializable {

    //消息的唯一标识(使用 UUID 保证唯一性)
    private String messageId;
    //和 bindingKey 做匹配
    //如果当前交换机类型是 DIRECT ,此时 routingKey 就表示要转发的队列名
    //如果当前交换机类型是 FANOUT ,此时 routingKey 无意义(不使用)
    //如果当前交换机类型是 TOPIC ,此时 routingKey 就是要和 bindingKey 做匹配,匹配才进行转发
    private String routingKey;
    //标识消息是否要持久化,1 表示不持久化, 2 表示持久化(rabbitmq 是这么搞得)
    private int deliverMode = 1;

    //RabbitMQ 还有其他属性,这里就不考虑了


    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public int getDeliverMode() {
        return deliverMode;
    }

    public void setDeliverMode(int deliverMode) {
        this.deliverMode = deliverMode;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/75457.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

pytest自动化测试框架tep环境变量、fixtures、用例三者之间的关系

tep是一款测试工具&#xff0c;在pytest测试框架基础上集成了第三方包&#xff0c;提供项目脚手架&#xff0c;帮助以写Python代码方式&#xff0c;快速实现自动化项目落地。 在tep项目中&#xff0c;自动化测试用例都是放到tests目录下的&#xff0c;每个.py文件相互独立&…

分布式图数据库 NebulaGraph v3.6.0 正式发布,强化全文索引能力

本次 v3.6.0 版本&#xff0c;主要强化全文索引能力&#xff0c;以及优化部分场景下的 MATCH 性能。 强化 强化增强全文索引功能&#xff0c;具体 pr 参见&#xff1a;#5567、#5575、#5577、#5580、#5584、#5587 优化 支持使用 MATCH 子句检索 VID 或属性索引时使用变量&am…

【Windows系统编程】02.进程与线程(一)-笔记

进程&#xff0c;进程对象 虚拟内存 进程不能执行代码&#xff0c;数据结构&#xff0c;三环PEB&#xff0c;0怀EPROCESS对进程进行管理 线程列表 线程才是真正执行代码 主线程&#xff1a;主函数 线程依赖于cpu时间片切换 单核&#xff0c;多核 主线程消息&#xff0c…

ChatGPT等人工智能编写文章的内容今后将成为常态

BuzzFeed股价上涨200%可能标志着“转向人工智能”媒体趋势的开始。 周四&#xff0c;一份内部备忘录被华尔街日报透露BuzzFeed正计划使用ChatGPT聊天机器人-风格文本合成技术来自OpenAI&#xff0c;用于创建个性化盘问和将来可能的其他内容。消息传出后&#xff0c;BuzzFeed的…

【数据结构与算法】十大经典排序算法-归并排序

&#x1f31f;个人博客&#xff1a;www.hellocode.top &#x1f3f0;Java知识导航&#xff1a;Java-Navigate &#x1f525;CSDN&#xff1a;HelloCode. &#x1f31e;知乎&#xff1a;HelloCode &#x1f334;掘金&#xff1a;HelloCode ⚡如有问题&#xff0c;欢迎指正&#…

地理数据的双重呈现:GIS与数据可视化

前一篇文章带大家了解了GIS与三维GIS的关系&#xff0c;本文就GIS话题带大家一起探讨一下GIS和数据可视化之间的关系。 GIS&#xff08;地理信息系统&#xff09;和数据可视化在地理信息科学领域扮演着重要的角色&#xff0c;它们之间密切相关且相互增强。GIS是一种用于采集、…

unity新输入系统的简单使用(New InputSystem)

1、在包管理器 unity注册表中下载安装InputSystem 2、给玩家添加组件PlayerInput&#xff0c;点击CreatAction,创建一个InputAct InputAct,这是玩家的输入文件&#xff0c;在里面可以设置玩家输入 3、使用 例如玩家控制角色移动 在InputAct中&#xff0c;默认已经设置好了移…

誉天HCIP-Datacom课程简介

HCIP-Datacom课程介绍&#xff1a;HCIP-Datacom分为一个核心技术方向&#xff1a;HCIP-Datacom-Core Technology H12-821 &#xff08;核心技术&#xff09;六个可选子方向&#xff1a;HCIP-Datacom-Advanced Routing & Switching Technology H12-831 &#xff08;高级路…

用Node.js吭哧吭哧撸一个运动主页

简单唠唠 某乎问题&#xff1a;人这一生&#xff0c;应该养成哪些好习惯&#xff1f; 问题链接&#xff1a;https://www.zhihu.com/question/460674063 如果我来回答肯定会有定期运动的字眼。 平日里也有煅练的习惯&#xff0c;时间久了后一直想把运动数据公开&#xff0c;…

Ant Design Mobile是什么?

在当今的数字时代&#xff0c;移动应用程序和网页设计已经成为各行各业的重要组成部分。用户界面的设计直接影响到用户体验和产品的成功。为了帮助设计师在移动设计领域更好&#xff0c;Antdesignmobile应运而生。Antdesignmobile是蚂蚁金服的移动UI设计语言和框架&#xff0c;…

网络通信TCP/IP协议逐层分析数据链路层(第四十课)

Ethernet Ⅱ帧,也称为Ethernet V2帧,是如今局域网里最常见的以太帧,是以太网事实标准。如今大多数的TCP/IP应用(如HTTP、FTP、SMTP、POP3等)都是采用Ethernet II帧承载。 1、MAC地址概述 -MAC地址,即以太网地址,用来标识一个以太网上的某个单独设备或一组设备 -长度…

认识excel篇3之数据的有效性(数据验证)

数据有效性不仅能够对单元格的输入数据进行条件限制&#xff0c;还可以在单元格中创建下拉列表菜单方便用户选择输入。如果没有做数据验证&#xff0c;单元格内默认可以输入任意类型的数据。数据验证就是限制单元格输入数据&#xff08;必须输入符合要求的才能输入&#xff09;…

Spring Boot+Redis 实现消息队列实践示例

Spring BootRedis 实现一个轻量级的消息队列 文章目录 Spring BootRedis 实现一个轻量级的消息队列0.前言1.基础介绍2.步骤2.1. 引入依赖2.2. 配置文件2.3. 核心源码 4.总结答疑 5.参考文档6. Redis从入门到精通系列文章 0.前言 本文将介绍如何利用Spring Boot与Redis结合实现…

Docker容器与虚拟化技术:Docker资源控制、数据管理

目录 一、理论 1.资源控制 2.Docker数据管理 二、实验 1.Docker资源控制 2.Docker数据管理 三、问题 1.docker容器故障导致大量日志集满&#xff0c;造成磁盘空间满 2、当日志占满之后如何处理 四、总结 一、理论 1.资源控制 (1) CPU 资源控制 cgroups&#xff0…

数据结构:堆的实现

1.堆的概念 如果有一个关键码的集合 K { k1 &#xff0c;k2 &#xff0c;k3 &#xff0c;…&#xff0c;kn }&#xff0c;把它的所有元素按完全二叉树的顺序存储方式存储在一个一维数组中&#xff0c;并且 k(i) < k(i*21) 和 k(i) < k(i*22)&#xff0c; i 0 &#xff…

BUUCTF [安洵杯 2019]easy_serialize_php 1 详细讲解

题目来自buuctf&#xff0c;这是一题关于php序列化逃逸的题 1. 题目 题目给出的代码 <?php$function $_GET[f];function filter($img){$filter_arr array(php,flag,php5,php4,fl1g);$filter /.implode(|,$filter_arr)./i;return preg_replace($filter,,$img); }if($_S…

炬芯科技发布全新第二代智能手表芯片,引领腕上新趋势!

2023年7月&#xff0c;炬芯科技宣布全新第二代智能手表芯片正式发布。自2021年底炬芯科技推出第一代的智能手表芯片开始便快速获得了市场广泛认可和品牌客户的普遍好评。随着技术的不断创新和突破&#xff0c;为了更加精准地满足市场多元化的变幻和用户日益增长的体验需求&…

localhost:8080 is already in use

报错原因&#xff1a;本机的8080端口号已经被占用。因为机器的空闲端口号是随机分配的&#xff0c;而idea默认启动的端口号是8080,所以是存在这种情况。 对于这个问题&#xff0c;我们只需要重启idea或者修改项目的启动端口号即可。 更推荐第二种。对于修改项目启动端口号&…