RabbitMQ入门实战

文章目录

  • RabbitMQ入门实战
    • 基本概念
    • 安装
    • 快速入门
      • 单向发送
      • 多消费者

RabbitMQ入门实战

官方:https://www.rabbitmq.com

基本概念

AMQP协议:https://www.rabbitmq.com/tutorials/amqp-concepts.html
定义:高级信息队列协议(Advanced Message Queue Protocol)
生产者:发消息到某个交换机
消费者:从某个队列中取信息
交换机(Exchange):负责把消息转发到对应的队列
队列(Queue):存储消息
路由(Routes):将消息从一个地方转发到另一个地方
AMQP模型
image.png

安装

windows 安装:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.12.0image.png
安装 erlang 25.3.2(因为 RabbitMQ 依赖 erlang,不安装这个安装RabbitMQ会报错),这个语言的性能非常高。
erlang 下载:https://www.erlang.org/patches/otp-25.3.2
image.png
安装完 erlang 后,安装 rabbitmq 即可。
win + r 打开 services.msc(服务菜单),查看 rabbitmq 服务是否已启动:


安装 rabbitmq 监控面板:
在 rabbitmq 安装目录的 sbin 中执行下述脚本:
D:\software\rabbitmq\rabbitmq_server-3.12.0\sbin

rabbitmq-plugins.bat enable rabbitmq_management

image.png
访问:http://localhost:15672,用户名密码都是 guest:
image.png
如果想要在远程服务器安装访问 rabbitmq 管理面板,你要自己创建一个管理员账号,不能用默认的 guest,否则会被拦截(官方出于安全考虑)。

如果被拦截,可以自己创建管理员用户:
参考文档的 Adding a User:https://www.rabbitmq.com/access-control.html

rabbitmq 端口占用:
5672:程序连接的端口
15672:webUI

快速入门

单向发送

一个生产者给一个队列发消息,一个消费者从这个队列取消息,一对一

引入消息队列 Java 客户端:

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.17.0</version>
</dependency>

生产者代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

Channel频道:理解为操作消息队列的client,提供了和消息队列server建立通信的方法(为了复用连接,提高传输效率)。程序通过channel操作rabbitmq
创建消息队列:
参数:
queueName:消息队列名称(注意,同名称的消息队列,只能用同样的参数创建一次)
durabale:消息队列重启后,消息是否丢失
exclusive:是否只允许当前这个创建消息队列的连接操作消息队列
autoDelete:没有人用队列后,是否要删除队列
执行程序后,可以看到有 1 条消息:

消费者代码:

public class SingleConsumer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 定义了如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // 消费消息,会持续阻塞
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

启动消费者后,可以看到消息被消费了

多消费者

官方教程:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
场景:多个机器同时去接受并处理任务(尤其是每个机器的处理能力有限)
一个生产者给一个队列发消息,多个消费者从这个队列中取消息

1)队列持久化
durable参数设置为true,服务器重启后队列不丢失:

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

2)消息持久化
指定MessageProperties.PERSISTENT_TEXT_PLAIN 参数:

channel.basicPublish("", TASK_QUEUE_NAME,
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes("UTF-8"));

生产者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.util.Scanner;

public class MultiProducer {
    //队列名字
  private static final String TASK_QUEUE_NAME = "multi_queue";

  public static void main(String[] argv) throws Exception {
      //建立连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        //创建队列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.nextLine();
            //在指定的交换机上发布消息到TASK_QUEUE_NAME的队列中,使用 MessageProperties.PERSISTENT_TEXT_PLAIN将消息标为持久化,最后将消息体转换为字节数组
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
  }

}

消费者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class MultiConsumer {

    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        // 建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        for (int i = 0; i < 2; i++) {
            final Channel channel = connection.createChannel();

            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            //消费者每次只能处理一个消息
            channel.basicQos(1);

            // 定义了如何处理消息
            int finalI = i;
            // 创建消息接收回调函数,以便接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                try {
                    // 处理工作
                    System.out.println(" [x] Received '" + "编号:" + finalI + ":" + message + "'");
                    // 停 20 秒,模拟机器处理能力有限
                    Thread.sleep(20000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                    // 手动发送应答,告诉RabbitMQ消息已经被处理
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            // 开始消费消息,传入队列名称,是否自动确认,投递回调和消费者取消回调
            channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        }
    }
}

这样你就可以跑通你的第一个RabbitMQ并且了解了单向发送和多消费者两种方式,下期分享RabbitMQ一个重要的概念—交换机

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/557598.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ORA-600 ktsiseginfo1故障---惜分飞

oracle 9i的库在运行途中突然报ORA-600 kcbnew_3错误 Sun Mar 31 14:25:11 2024 Undo Segment 69 Onlined Sun Mar 31 14:25:11 2024 Created Undo Segment _SYSSMU69$ Sun Mar 31 14:25:11 2024 Created Undo Segment _SYSSMU70$ Undo Segment 70 Onlined Sun Mar 31 14:28:41…

开启Three.js之旅(会持续完善)

文章目录 Three.js必备构建项目场景Scene相机CameraPerspectiveCamera 渲染器WebGLRendererCSS3DRenderer 灯光LightAmbientLightDirectionalLight 平行光PointLight 加载器CacheFileLoaderLoaderGLTFLoaderRGBELoaderTextureLoader 材质MetarialMeshBasicMaterialMeshLambertM…

【C++程序员的自我修炼】拷贝构造函数

心存希冀 追光而遇目有繁星 沐光而行 目录 拷贝构造函数概念 拷贝构造的特征 无穷递归的解释 浅拷贝 总结&#xff1a; 深拷贝 拷贝构造函数典型调用场景 总结 契子✨ 在生活中总有很多琐事&#xff0c;不做不行做了又怕麻烦&#xff0c;有时候想要是有个和自己一模一样的人就…

机器学习和深度学习-- 李宏毅(笔记于个人理解)Day 21

Day 21 Self- Attention 选修部分 ​ 学完自适应 再回来看看 Sequence Labling 假如我们现在有一个需要读完全部句子才能解的问题&#xff0c; 那么red window 就需要变得是最大的&#xff08;最长的句子&#xff09;&#xff1b; 其实这里大家有没有想过&#xff0c;这个玩意…

【机器学习】数据变换---小波变换特征提取及应用案列介绍

引言 在机器学习领域&#xff0c;数据变换是一种常见且重要的预处理步骤。通过对原始数据进行变换&#xff0c;我们可以提取出更有意义的特征&#xff0c;提高模型的性能。在众多数据变换方法中&#xff0c;小波变换是一种非常有效的方法&#xff0c;尤其适用于处理非平稳信号和…

maridb双数据源联查解决方案:联合存储引擎(Federated Storage Engine)

本地MySQL数据库要访问远程MySQL数据库的表中的数据, 必须通过FEDERATED存储引擎来实现. 有点类似Oracle中的数据库链接(DBLINK)。使用FEDERATED存储引擎的表,本地只存储表的结构信息,数据都存放在远程数据库上,查询时通过建表时指定的连接符去获取远程库的数据返回到本地。操作…

爬虫机试题-爬取新闻网站

之前投简历时遇到了这样的一个笔试。本以为会是数据结构算法之类的没想到直接发了一个word直接提需求&#xff0c;感觉挺有意思就写了这篇文章&#xff0c;感兴趣的朋友可以看看。 拿到urllist 通过分析页面结构我们得以知道&#xff0c;这个页面本身没有新闻信息&#xff0c;是…

计算机软考流程介绍

笔者来介绍一下软考流程 1、考试简介 计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试&#xff1a;简称 计算机软考 认证&#xff1a; 国家人力资源和社会保障部 国家工业和信息化部 目的&#xff1a; 科学、公正地对全国计算机与软件专业技术人员进行职业资格…

Hotcoin 热门资产上新速报:以太坊互操作性基础设施Omni Network(OMNI)

Hotcoin持续为全球600万用户发掘优质潜力资产&#xff0c;热门币种交易上热币。一文快速了解今日上新资产:Omni Network&#xff08;OMNI&#xff09; 推荐指数 8.4 交易对 OMNI/USDT 交易时间 4月17日 GMT8 20&#xff1a;30 资产赛道 Layer1 项目简介 Omni 是以太坊…

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台级联时,下级平台未发流是什么原因?

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…

黑洞路由、 DDoS 攻击 、 环路

黑洞路由 DDoS 攻击 DDoS 攻击是一种针对服务器、服务或网络的恶意行为。DDoS 攻击通过向目标发送大量流量&#xff0c;使其不堪重负&#xff0c;导致资源和带宽被耗尽。因此&#xff0c;目标可能会变慢或崩溃&#xff0c;无法正常处理合法的流量。DDoS 攻击通常是由僵尸网络…

Jmeter 性能-内存溢出问题定位分析

1、堆内存溢出 ①稳定性压测一段时间后&#xff0c;Jmeter报错&#xff0c;日志报&#xff1a; java.lang.OutOfMemoryError.Java heap space ②用jmap -histo pid命令dump堆内存使用情况&#xff0c;查看堆内存排名前20个对象。 看是否有自己应用程序的方法&#xff0c;从…

CentOS7下安装mysql8或者mysql5.7

mysql8 1、下载 访问mysql官网下载mysql8软件包 https://dev.mysql.com/downloads/mysql/ 选择相应的版本如&#xff1a;RPM Bundle mysql-8.0.33-1.el7.x86_64.rpm-bundle.tar RPM Bundle 8.0.33 下载地址&#xff1a;https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.…

电脑桌面便签软件哪个好?好用的电脑桌面便签

电脑作为我们日常工作的重要工具&#xff0c;承载着大量的任务和项目。当工作任务繁重时&#xff0c;如何在电脑桌面上高效管理这些任务就显得尤为重要。这时&#xff0c;选择一款优秀的桌面便签软件&#xff0c;无疑会给我们带来极大的便利。 一款好的桌面便签软件&#xff0…

【React】Ant Design自定义主题风格及主题切换

Ant Design 的自定义主题&#xff0c;对于刚入手的时候感觉真是一脸蒙圈&#xff0c;那今天给它梳理倒腾下&#xff1b; 1、自定义主题要点 整体样式变化&#xff0c;主要两个部分&#xff1a; 1.1、Design Token https://ant.design/docs/react/customize-theme-cn#theme 官…

ffmpeg入门

ffmpeg入——安装 Fmpeg地址 FFmpeg源码地址&#xff1a;https://github.com/FFmpeg/FFmpeg FFmpeg可执行文件地址&#xff1a;https://ffmpeg.org/download.html Windows平台 Windows平台下载解压后如图所示&#xff08;文件名称以-share结尾的是开发库&#xff09; FFmpeg…

Eagle for Mac v1.9.13注册版:强大的图片管理工具

Eagle for Mac是一款专为Mac用户设计的图片管理工具&#xff0c;旨在帮助用户更高效、有序地管理和查找图片资源。 Eagle for Mac v1.9.13注册版下载 Eagle支持多种图片格式&#xff0c;包括JPG、PNG、GIF、SVG、PSD、AI等&#xff0c;无论是矢量图还是位图&#xff0c;都能以清…

AndroidStudio AGP 7+, 编译aar并输出到本地仓库

1 编写构建gradle脚本代码 1.1 配置publication和repository 在指定moudle目录下新建名为"maven-publish.gradle"文件&#xff0c;其声明的publication和repository如下所示&#xff1a; apply plugin: maven-publish// This creates a task called publishReleas…

《星光对话》系列直播:带你入门数据要素

2020年12月9日&#xff0c;财政部提出企业数据资源可作为资产列入财务报表&#xff0c;打响数据要素“1N”的第一枪&#xff1b; 2022年12月2日&#xff0c;《关于构建数据基础制度更好发挥数据要素作用的意见》“数据二十条”通过提出构建数据产权、流通交易、收益分配、安全治…

维护SQLite的私有分支(二十六)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;SQLite、MySQL 和 PostgreSQL 数据库速度比较&#xff08;本文阐述时间很早比较&#xff0c;不具有最新参考性&#xff09;&#xff08;二十五&#xff09; 下一篇&#xff1a;SQLite数据库中JSON 函数和运算符 1…