ActiveMQ主从架构和集群架构的介绍及搭建

目录

一、主从和集群架构的特点

1.1 主从架构的-Master/slave模式特点

1.2 集群架构-Cluster模式特点

二、ActiveMQ的主从架构

2.1 架构图

2.2 特点

2.3 实现方式(3种)

2.4 实现 (基于LevelDB复制)

 2.4.1 准备环境

2.4.2 启动

2.4.3 宕机测试

三、ActiveMq的集群架构

3.1 架构图

3.2 集群的配置实现

3.2.1 静态的发现

3.2.3 动态的发现

3.3 集群的具体实现(动态的发现)

  3.4.1 准备环境 (同2.4.1)

3.4.2 修改配置文件  config/activemq.xml (三台服务器activemq相同配置)

3.4.3 启动三台服务

3.4.4 集群成功,查看页面

 3.4.5 代码测试


一、主从和集群架构的特点

1.1 主从架构的-Master/slave模式特点

读写分离,纵向扩展,所有的写操作一般在master上完成,slave只提供一个热备

1.2 集群架构-Cluster模式特点

分布式的一种存储,水平的扩展,消息的分布式共享

二、ActiveMQ的主从架构

2.1 架构图

2.2 特点

  •  只有master对外提供服务,也就是说,producer和consumer智能连接master
  •  一个master下面可以有一个或者多个slave,slave不对外提供服务
  •  一个slave只能属于一个master
  •  整个主从架构中只有一个master,否则容易造成数据复制到混乱
  •  master、slave之间的数据是同步的(共享方式有3种)是一个同步的复制

2.3 实现方式(3种)

  1. 基于文件(Shared File System),需要创建一个共享的持久化文件
        <persistenceAdapter>
            <kahaDB directory="/data/kahadb"/> # 自定义的地址
        </persistenceAdapter>

        主要是通过共享目录存储目录来实现master和slave的热备,谁先启动,谁就可以最终取得共享目录的控制权成为master,其它的应用就只能作为slave 

  1. 基于数据库(JDBC Master Slave),需要创建一个共享的数据库
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destory-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="root" />
    <property name="poolPreparedStatements" value="true"/>
</bean>
<persistenceAdapter>
    <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceAdapter>

        与shared filesystem方式类似,知识共享的存储介质由文件系统改成数据库而已

  1. 基于LevelDB复制(Replicated LevelDB Store),需要zookeeper的支持,例如使用activemq本身就支持的LevelDB持久化。

2.4 实现 (基于LevelDB复制)

 2.4.1 准备环境

这里准备3台服务器:192.168.190.200、192.168.190.201、192.168.190.202

我这里三台主机名分别设置了 master node1 node2

三台服务器均下载安装activemq

[root@master /]# mkdir myactivemq
[root@master /]# cd myactivemq
[root@master opt]# wget https://archive.apache.org/dist/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
[root@master opt]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz 

2.4.2 修改配置文件  config/activemq.xml (三台服务器activemq相同配置)

zkAddress 为zookeeper的地址 

        <!-- 
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>
        -->
        <persistenceAdapter>
            <replicatedLevelDB
            	directory="${activemq.data}/leveldb"
            	replicas="3"
            	bind="tcp://0.0.0.0:0"
            	zkAddress="192.168.190.200:2181"
            	hostname="192.168.190.201"  <!-- 集群中任意一台服务器ip或通过host文件配置的主机名 -->
            	sync="local_disk"
            	zkPath="/activemq/leveldb-stores"
            />
        </persistenceAdapter>

如果是单机需要修改下边配置,61616端口修改,其它全部注掉,防止端口冲突,我们这里是多服务器配置,可以不用更改,本配置忽略下边配置 

        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <!--
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            -->
        </transportConnectors>

2.4.2 启动

先启动zookeeper,然后分别启动三台服务器的activemq

[root@master bin]# pwd
/myactivemq/apache-activemq-5.15.9/bin
[root@master bin]# ./activemq start 

我这里先启动了241的节点,所以241为主master节点

因此我们只能访问241的控制界面

2.4.3 宕机测试

选择master节点, ./activemq stop

我们发现其中一台从节点被选举为主节点

重新启动原来master,其会作为slave服务器继续提供服务

三、ActiveMq的集群架构

3.1 架构图

3.2 集群的配置实现

3.2.1 静态的发现

需要在节点的配置文件中,显示的配置其他节点的IP地址和服务端口号,例如:

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.190.200:61616,tcp://192.168.190.201:61616,tcp://192.168.190.202:61616)" />
</networkConnectors>

3.2.3 动态的发现

通过广播的方式,动态的发现其它节点。例如:

<networkConnectors>
    <networkConnector uri="multicast://default" />  <!--  这里defalut广播名可以是随意取的  -->
</networkConnectors>
<transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default"
/>
            <!--
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            -->
        </transportConnectors>

在每一个节点中activemq.xml配置文件中配置上述代码。其中defalut是我们自定义的名称,在discoveryUri属性中进行引用即可。有利于集群节点的动态变更。

注意,如单机部署多个activemq示例,jetty.xml文件中的8161端口也需要更改

3.3 集群的具体实现(动态的发现)

  3.4.1 准备环境 (同2.4.1)

这里准备3台服务器:192.168.190.200、192.168.190.201、192.168.190.202

三台服务器均下载安装activemq

[root@master /]# mkdir myactivemq
[root@master /]# cd myactivemq
[root@master opt]# wget https://archive.apache.org/dist/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
[root@master opt]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz 

3.4.2 修改配置文件  config/activemq.xml (三台服务器activemq相同配置)

<networkConnectors>
    <networkConnector uri="multicast://default" />  <!--  这里defalut广播名可以是随意取的  -->
</networkConnectors>
<transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default"
/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

 每台brokerName不能重名,名字随意

 

3.4.3 启动三台服务

我这里启动2台的能成功加入集群,启动三台失败,...当启动第三台的时候,查看日志报错如下

2024-04-15 12:06:18,843 | WARN  | Failed to add Connection id=localhost->localhost-36779-1713153927901-42:2, clientId=NC_localhost_outbound due to {} | org.apache.activemq.broker.TransportConnection | ActiveMQ Transport: tcp:///192.168.190.202:50616@61616
javax.jms.InvalidClientIDException: Broker: localhost - Client: NC_localhost_outbound already connected from tcp://192.168.190.200:47904
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:247)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:119)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)[activemq-client-5.15.9.jar:5.15.9]
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)[activemq-broker-5.15.9.jar:5.15.9]
        at org.apache.activemq.transport.MutexTransport.onComm

 我们打开2台集群中的其中一台可以看到,连接的名字都是NC:localhost:outbound,

其中localhost是brokerName,所以这里需要修改每一台的brokerName不能重名

启动第一台服务器activemq 

这里查看的都是一台服务器的日志

 加入第二台服务器的日志

加入第三台服务器的日志

3.4.4 集群成功,查看页面

随便查看一台服务器的地址的可视页面,可以看到集群成功

 3.4.5 代码测试

生产者连接其中一个activemq实例生产消息

package com.dolphin;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.core.MessagePostProcessor;

import javax.jms.*;

public class JmsProduce {
    public static final String ACTIVEMQ_URL = "tcp://192.168.190.200:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        //1 创建连接工厂,按照规定的url地址,采用默认用户名和密码 admin/admin
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2 通过连接工厂,获得链接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        //3、创建会话session
        //两个参数,第一个叫事务/第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        //6、 通过使用messageProducer生产3条消息发送到MQ的队列里面
        for (int i = 1;i<=6;i++) {
            //7 创建消息
            TextMessage textMessage = session.createTextMessage("message---" + i);//理解为一个字符串
            textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //8 通过messageProducer发送给mq
            messageProducer.send(textMessage);
        }
        //9 关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("*****消息发布完成");
    }
}

消费者连接另一台服务实例

package com.dolphin;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

public class JmsConsumer1 {
    public static final String ACTIVEMQ_URL = "tcp://192.168.190.201:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException, IOException {
        System.out.println("1号消费者");
        //1 创建连接工厂,按照规定的url地址,采用默认用户名和密码 admin/admin
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2 通过连接工厂,获得链接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        //3、创建会话session
        //两个参数,第一个叫事务/第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4、创建目的地(具体是队列还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5、创建消息的消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //6、 通过使用messageProducer生产3条消息发送到MQ的队列里面

        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (null != message && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("*****消费者接受到消息"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }

                }
            }
        });
        System.in.read();  //进程处于运行状态
        messageConsumer.close();
        session.close();
        connection.close();
        System.out.println("*****消息消费完成");
    }
}

运行生产者生产了6条消息,消息发送到指定的服务实例上,其它集群实例中看不到消息

运行连接了另一台服务的消费者 

运行结果,这里消费了6条上述生产者的消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/556503.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2024化工制造企业数字化白皮书

来源&#xff1a;蓝凌研究院 中国石油和化学工业联合会发布2023年中国石油和化工行业经济运行情况。数据显示&#xff0c;2023年&#xff0c;我国石化行业实现营业收入15.95万亿元&#xff0c; 同比下降1.1%&#xff0c;利润总额8733.6亿元&#xff0c;行业经济运行总体呈现低…

vscode搭建C/C++环境

文章目录 一、安装vscode 二、下载安装g 三、安装VSCode插件 四、配置运行环境 一、安装vscode 直接官网免费下载&#xff1a;下载完成后进行安装&#xff0c;记得更换安装路径Visual Studio Code - Code Editing. RedefinedVisual Studio Code is a code editor redefine…

大数据------额外插件及技术------Git(完整知识点汇总)

Git 定义 它是分布式版本控制工具&#xff0c;主要用于管理开发过程中的源代码文件&#xff08;如&#xff1a;Java类、xml文件、html页面等&#xff09;&#xff0c;在软件开发过程中被广泛应用 作用 代码回溯&#xff1a;快速回到某一代码历史版本版本切换&#xff1a;同一个…

Python文件处理--进阶

Python标准库文件操作相关模块&#xff1a; 1.使用pickle序列化 序列化指的是&#xff1a;将对象转化成“串行化”数据形式&#xff0c;存储到硬盘或通过网络传输到其他地方。反序列化是指相反的过程&#xff0c;将读取到的“串行化数据”转化成对象。我们可以使用pickle模块…

算法部署 | 使用TensorRT部署AlphaPose姿态估计算法

项目应用场景 面向 AlphaPose 姿态估计算法的推理加速场景&#xff0c;项目采用 TensorRT 进行 GPU 算法加速推理。 项目效果 项目细节 > 具体参见项目 README.md (1) 模型转换 python pytorch2onnx.py --cfg ./configs/coco/resnet/256x192_res50_lr1e-3_1x.yaml --chec…

基于Springboot的简历系统

基于SpringbootVue的简历系统的设计与实现 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringbootMybatis工具&#xff1a;IDEA、Maven、Navicat 系统展示 用户登录 首页 简历模板 招聘会 求职论坛 系统公告 后台登录 后台首页 用户管理 简历模板 模板…

面试八股——JVM★

类加载 类加载器的定义 类加载器的类别 类装载的执行过程 类的装载过程&#xff1a; 加载&#xff1a; 验证&#xff1a; 准备&#xff1a; 这里设置初始值并不是传统意义的设置初始值&#xff08;那个过程在初始化阶段&#xff09;。 解析&#xff1a; 初始化&#xff1a; …

YOLOv8改进 | Conv篇 | CVPR2024最新DynamicConv替换下采样(包含C2f创新改进,解决低FLOPs陷阱)

一、本文介绍 本文给大家带来的改进机制是CVPR2024的最新改进机制DynamicConv其是CVPR2024的最新改进机制&#xff0c;这个论文中介绍了一个名为ParameterNet的新型设计原则&#xff0c;它旨在在大规模视觉预训练模型中增加参数数量&#xff0c;同时尽量不增加浮点运算&#x…

servlet的三个重要的类(httpServlet 、httpServletRequst、 httpServletResponse)

一、httpServlet 写一个servlet代码一般都是要继承httpServlet 这个类&#xff0c;然后重写里面的方法 但是它有一个特点&#xff0c;根据之前写的代码&#xff0c;我们发现好像没有写main方法也能正常执行。 原因是&#xff1a;这个代码不是直接运行的&#xff0c;而是放到…

气象观测站点数据下载与处理

一、下载途径 全国400多个气象站气候数据&#xff08;1942-2022&#xff09; 王晓磊&#xff1a;中国空气质量/气象历史数据 | 北京市空气质量历史数据 气象数据免费下载网站整理 中国气象站观测的气象数据怎么下载 二、R语言处理 2.1 提取站点文件 library(dplyr) library(…

(助力国赛)数学建模可视化!!!含代码1(折线图、地图(点)、地图(线)、地图(多边形)、地图(密度)、环形图、环形柱状图、局部放大图)

众所周知&#xff0c;数学建模的过程中&#xff0c;将复杂的数据和模型结果通过可视化图形呈现出来&#xff0c;不仅能够帮助我们更深入地理解问题&#xff0c;还能够有效地向评委展示我们的研究成果。   今天&#xff0c;作者将与大家分享8种强大的数学建模可视化图形及其在…

解决VirtualBox虚拟机启动失败的问题

一.出现的问题&#xff08;未能启动虚拟电脑&#xff0c;由于物理网卡未找到&#xff09; 一、错误信息分析 “未能启动虚拟电脑&#xff0c;由于物理网卡未找到”&#xff1a;这个错误通常是由于VirtualBox无法识别或连接到物理网卡造成的。可能是由于驱动程序问题、网络设置错…

2024年阿里云最便宜的轻量应用服务器与云服务器价格及优惠购买入口

2024年&#xff0c;阿里云推出了几款价格最便宜的轻量应用服务器和云服务器&#xff0c;其中轻量应用服务器2核2G3M公网带宽50GB 高效云盘活动价格61元1年&#xff0c;经济型e实例4核16G10M公网带宽100G ESSD Entry云盘活动价格最低只要30元/1个月&#xff0c;通用算力型u1实例…

pyskl手势/动作识别的实现与pytorch cuda环境部署保姆教程

恭喜你&#xff0c;找到这篇不需要翻墙就能够成功部署的方法。在国内布置这个挺麻烦的&#xff0c;其他帖子会出现各种问题不能完全贯通。便宜你了。。 实话5年前我用1080训练过一个基于卷积和ltsm的手势识别&#xff0c;实话实说感觉比现在效果好。是因为现在的注意力都在tra…

【Spring】Spring MVC入门

Spring MVC入门 一、什么是Spring Web MVC&#xff1f; 1.1 MVC定义 MVC是Model View Controller的缩写&#xff0c;是一种软件架构的设计模式&#xff0c;将软件系统分为模型、视图、控制器三个部分。 示意图如下: 可以看到&#xff0c;Controller作为一个“粘合剂”处于M…

Hadoop——Yarn基础架构

Hadoop——Yarn基础架构 Hadoop YARN&#xff08;Yet Another Resource Negotiator&#xff09;是Apache Hadoop生态系统中的一个子项目&#xff0c;它是用于集群资源管理的框架&#xff0c;负责为运算程序提供服务器运算资源&#xff0c;相当于一个分布式的操作系统平台&…

Mac 利用Homebrew安装JDK

一、安装JDK17 1.安装openjdk17 2.把homebrew安装的openjdk17软链接到系统目录&#xff1a; brew install openjdk17 sudo ln -sfn $(brew --prefix)/opt/openjdk17/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk-17.jdk 一、检查是否安装成功 在Termina…

基于SSM的列车订票管理系统(含源码+sql+视频导入教程+文档+PPT)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1 、功能描述 基于SSM的列车订票管理系统3拥有两种角色&#xff1b;管理员、用户 管理员&#xff1a;用户管理、车票管理、购票指南管理、系统管理等 用户&#xff1a;发布帖子、登录注册、购票等 1.…

Spring Boot 整合 Mockito:提升Java单元测试的高效实践

引言 在Java开发领域&#xff0c;Spring Boot因其便捷的配置和强大的功能而受到广泛欢迎&#xff0c;而Mockito作为一款成熟的单元测试模拟框架&#xff0c;则在提高测试质量、确保代码模块间解耦方面扮演着至关重要的角色。本文将详细介绍如何在Spring Boot项目中整合Mockito&…

千锤百炼算法系列之动态规划

题外话 这段时间,我必须把算法弄明白 这篇直接讲解动态规划所有细节! 前面那篇 千锤百炼之每日算法(一)-CSDN博客 也有关于动态规划的讲解,也非常详细 很简单,我成尊不就是了?!!! 正题 动态规划 这里我们主要是让大家明白什么是动态规划,怎么用动态规划解题 我就不用…
最新文章