RabbitMQ Java开发教程(二)—官方原版

一、通道和并发注意事项(线程安全)

应避免在线程之间共享通道实例。应用程序应该每个线程使用一个通道,而不是在多个线程之间共享同一个通道。

虽然通道上的一些操作可以安全地同时调用,但有些操作则不然,并且会导致线路上不正确的帧交织、双重确认等。

共享通道上的并发发布可能会导致连线上不正确的帧交错,从而触发连接级协议异常,并由代理立即关闭连接。因此,它需要在应用程序代码中进行显式同步(必须在关键部分调用Channel#basicPublish)。线程之间共享通道也会干扰Publisher Confirms。最好完全避免在共享通道上并发发布,例如每个线程使用一个通道。

可以使用通道池来避免在共享通道上并发发布:一旦一个线程处理完一个通道,它就会将其返回到池中,使该通道可供另一个线程使用。信道池可以被认为是一种特定的同步解决方案。建议使用现有的池库,而不是自行开发的解决方案。例如,Spring AMQP,它附带了一个可随时使用的通道池功能。

通道消耗资源,在大多数情况下,应用程序很少需要在同一JVM进程中打开数百个以上的通道。如果我们假设应用程序为每个通道都有一个线程(因为通道不应该同时使用),那么单个JVM的数千个线程已经是可以避免的相当大的开销。此外,一些快速发布者可以很容易地使网络接口和代理节点饱和:发布所需的工作比路由、存储和传递消息所需的工作量少。

要避免的一个经典反模式是为每个发布的消息打开一个通道。信道应该是相当长寿的,而打开一个新的信道是一个网络往返,这使得这种模式效率极低。

在一个线程中消费并在共享通道上的另一个线程上发布是安全的。

服务器推送的交付(请参阅下面的部分)是在保证保留每个通道的订购的同时进行调度的。调度机制使用java.util.concurrent.ExecutorService,每个连接一个。可以提供一个自定义执行器,该执行器将由单个ConnectionFactory使用ConnectionFactory#setSharedExecutor setter生成的所有连接共享。

当使用手动确认时,重要的是要考虑是哪个线程进行确认。如果它与接收传递的线程不同(例如,Consumer#handleDelivery将传递处理委托给不同的线程),将多个参数设置为true进行确认是不安全的,并且会导致双重确认,从而导致关闭通道的通道级协议异常。一次确认一条信息是安全的。

二、通过订阅接收消息(“PUSH API”)

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

接收消息的最有效方法是设置 使用使用者界面进行订阅。然后消息将被传递 在他们到达时自动,而不必 明确要求。

调用与使用者相关的 API 方法时,单个订阅是 总是由他们的消费者标签引用。消费者标签是消费者 标识符,可以是客户端生成的,也可以是服务器生成的。要让 RabbitMQ 生成节点范围的唯一标签,使用 Channel#basicConsumption 覆盖 不接受消费者标签参数或传递空字符串 ,并使用 Channel#basicConsumption 返回的值。 消费者标签用于取消消费者。

不同的使用者实例必须具有不同的 消费者标记。连接上的重复使用者标记是 强烈建议不要使用,并可能导致自动问题 连接恢复和混淆监控数据时 消费者受到监控。

实现消费者的最简单方法是 子类 便利类 DefaultConsumer。 可以通过 basicConsumption 调用传递此子类的对象以设置订阅:

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.getContentType();
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });

这里,由于我们指定了autoAck=false,因此有必要确认传递给Consumer的消息,这在handleDelivery方法中最方便,如图所示。

更复杂的消费者将需要推翻进一步的方法。特别是,当通道和连接关闭时,将调用handleShutdownSignal,并且在调用对该consumer的任何其他回调之前,将向handleConsumeOk传递consumer标记。

消费者还可以实现handleCancelOk和handleCancel方法,分别获得显式和隐式取消的通知。

您可以使用Channel.basicCancel明确取消特定消费者:

channel.basicCancel(consumerTag);

传递消费者标签。

就像发行商一样,考虑消费者的并发危害安全也是很重要的。

对消费者的回调是在一个线程池中调度的,该线程池与实例化其通道的线程分开。这意味着消费者可以安全地在Connection或Channel上调用阻塞方法,如Channel#queueDeclare或Channel#basicCancel。

每个通道将按照RabbitMQ发送的顺序,将所有交付发送到其上的Consumer处理程序方法。无法保证渠道之间的交货顺序:这些交货可以并行发送。

对于每个频道一个消费者的最常见用例,这意味着消费者不会阻碍其他消费者。由于每个频道有多个消费者,请注意,长时间运行的消费者可能会阻碍向该频道上的其他消费者发送回调。

三、检索单个消息(“pull API”)

也可以按需检索单个消息(“pull API”,即轮询)。 这种消费方法效率非常低,因为它实际上是轮询 并且应用程序必须反复要求结果,即使绝大多数请求 没有结果。因此,强烈建议不要使用此方法。

若要“拉取”消息,请使用 Channel.basicGet 方法。返回值为 GetResponse 实例,标头信息(属性)来自该实例 并且可以提取消息正文:

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
    // No message retrieved.
} else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    // ...

由于此示例使用手动确认(上面的 autoAck = false), 您还必须调用 Channel.basicAck 以确认您已成功收到消息:

channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message

四、处理不可路由的消息

如果发布消息时设置了“强制”标志, 但无法路由,经纪人会将其返回给 发送客户端(通过 AMQP。基本返回命令)。

要收到此类返回的通知,客户端可以实现 ReturnListener 接口并调用 Channel.addReturnListener。 如果客户端尚未为特定通道配置返回侦听器, 然后,关联的返回消息将被静默丢弃。

channel.addReturnListener(new ReturnListener() {
    publicvoidhandleReturn(int replyCode,
                                  String replyText,
                                  String exchange,
                                  String routingKey,
                                  AMQP.BasicProperties properties,
                                  byte[] body)throws IOException {
        ...
    }
});

将调用返回侦听器,例如,如果客户端发布消息 “强制”标志设置为未绑定到队列的“直接”类型的交换。

五、关断协议

1、客户端关闭过程概述

AMQP 0-9-1 连接和通道共享相同的常规 管理网络故障、内部故障的方法、 和显式本地关闭。

AMQP 0-9-1 连接和通道具有以下生命周期状态:

  • 打开:对象已准备就绪,可供使用

  • 关闭:对象已显式 通知本地关闭,已发出关闭 请求任何支持的下层对象,并且 等待其关闭程序完成

  • 已关闭:对象已收到全部 来自任何较低层的关机完成通知 对象,并因此自行关闭

这些对象总是以关闭状态结束, 无论导致关闭的原因是什么,例如 应用程序请求,内部客户端库 故障、远程网络请求或网络故障。

连接和通道对象具有 以下与关机相关的方法:

  • addShutdownListener(ShutdownListener listener) 和

  • removeShutdownListener(ShutdownListener listener)),以管理任何侦听器,这将 在对象转换到关闭状态时触发。请注意,添加 关闭已关闭对象的侦听器 会立即解雇侦听器

  • getCloseReason(), 以允许 调查物体的原因是什么 关闭

  • isOpen(),用于测试是否 对象处于打开状态

  • close(int closeCode, String closeMessage),用于显式通知对象 要关闭

侦听器的简单用法如下所示:

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause)
    {
        ...
    }
});

2、有关关闭情况的信息

可以检索 ShutdownSignalException,其中包含所有 有关关闭原因的可用信息,或者 通过显式调用 getCloseReason() 方法或使用 ShutdownListener 类的服务(ShutdownSignalException cause) 方法。

类提供 分析关机原因的方法。由 调用 isHardError() 方法我们得到 信息是连接还是通道 错误,getReason() 返回信息 关于原因,以AMQP方法的形式 - 要么AMQP。Channel.Close 或 AMQP。Connection.Close (如果原因为 null,则为 null 是库中的一些例外,例如网络 通信失败,在这种情况下,该异常可以 使用 getCause()) 检索。

public void shutdownCompleted(ShutdownSignalException cause)
{
  if (cause.isHardError())
  {
    Connection conn = (Connection)cause.getReference();
    if (!cause.isInitiatedByApplication())
    {
      Method reason = cause.getReason();
      ...
    }
    ...
  } else {
    Channel ch = (Channel)cause.getReference();
    ...
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/882.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

第三代api自动化测试框架使用教程(pytest+allure+sql+yaml)

使用教程一、配置1、环境配置2、框架配置3、启动入口二、用例编写1、用例模板2、参数依赖写法2、函数(方法插件)写法3、接口上传文件和表单参数4、接口上传json参数5、接口无数据填写6、code断言7、body断言7、json断言8、sql断言9、完整断言写法&#x…

TCP UDP详解

文章目录TCP UDP协议1. 概述2. 端口号 复用 分用3. TCP3.1 TCP首部格式3.2 建立连接-三次握手3.3 释放连接-四次挥手3.4 TCP流量控制3.5 TCP拥塞控制3.6 TCP可靠传输的实现3.7 TCP超时重传4. UDP5.TCP与UDP的区别TCP UDP协议 1. 概述 TCP、UDP协议是TCP/IP体系结构传输层中的…

手把手的教你安装PyCharm --Pycharm安装详细教程(一)(非常详细,非常实用)

简介 Jetbrains家族和Pycharm版本划分: pycharm是Jetbrains家族中的一个明星产品,Jetbrains开发了许多好用的编辑器,包括Java编辑器(IntelliJ IDEA)、JavaScript编辑器(WebStorm)、PHP编辑器&…

C/C++考试必考题目(含答案*仅供参考)

今天继续来分享几个C经常考试的几道题目,大家快快拿去,赶紧做一下 目录 (小事一桩)约瑟夫问题 discreb input output 效果展示: 1、 猜价格游戏 2、 计算 N 以内的所有素数 3、 袋中取球 4、 乘法口诀表 …

尚医通-(三十三)就诊人管理功能实现

目录: (1)前台用户系统-就诊人管理-需求说明 (2)就诊人管理-接口开发-列表接口 (3)就诊人管理-接口开发-其他接口 (4)前台用户系统-就诊人管理-前端整合 &#xff0…

react的基础使用

react中为什么使用jsxReact 认为渲染逻辑本质上与其他 UI 逻辑内在耦合,比如,在 UI 中需要绑定处理事件、在某些时刻状态发生变化时需要通知到 UI,以及需要在 UI 中展示准备好的数据。react认为将业务代码和数据以及事件等等 需要和UI高度耦合…

竞赛无人机搭积木式编程——以2022年TI电赛送货无人机一等奖复现为例学习(7月B题)

在学习本教程前,请确保已经学习了前4讲中无人机相关坐标系知识、基础飞行控制函数、激光雷达SLAM定位条件下的室内定点控制、自动飞行支持函数、导航控制函数等入门阶段的先导教程。 同时用户在做二次开发自定义的飞行任务时,可以参照第5讲中2021年国赛植…

【uniapp小程序实战】—— 使用腾讯地图获取定位

文章目录🍍前言🍋正文1、首先看官网uni.getLocation(OBJECT)#注意2、腾讯位置服务平台申请密钥和下载SDK2.1 申请开发者秘钥2.2 开通webserviceAPI服务2.3 下载微信小程序JavaScriptSDK2.4 安全域名设置3、配置manifest.json文件4、示例代码展示4.1 引用…

面试重难点问题(C++)

持续更新!!!!! 网络部分 1.问,四次挥手的过程,和双方状态变化? 挥手这前,两边都是established状态,客户端发起断开请求,向服务器发送fin请求&…

Docker6种网络配置详解,网络模式应该这么选

文章目录一、Bridge网络模式二、Host网络模式三、Overlay网络模式四、None网络模式五、Macvlan网络模式六、Ipvlan网络模式七、网络模式选择在Docker中,网络配置是一个重要的主题,因为容器需要与其他容器或外部网络进行通信。Docker提供了多种网络模式和…

注意下C语言整形提升

C语言整形提升 C语言整形提升是指在表达式中使用多种类型的数据时,编译器会自动将较小的类型转换为较大的类型,以便进行运算。在C语言中,整型提升规则如下: 如果表达式中存在short类型,则将其自动转换为int类型。 如…

【JavaEE】初识线程

一、简述进程认识线程之前我们应该去学习一下“进程" 的概念,我们可以把一个运行起来的程序称之为进程,进程的调度,进程的管理是由我们的操作系统来管理的,创建一个进程,操作系统会为每一个进程创建一个 PCB&…

C++之深浅拷贝

一、浅拷贝 我们看下以下代码 Test.h 文件 #pragma once #include<iostream> using namespace std; class Student { public:Student(){}~Student(){if (m_Id ! nullptr){delete m_Id;m_Id nullptr;}}Student(int id, string strName){m_Id new int[id];m_strName s…

字符函数和字符串函数(上)-C语言详解

CSDN的各位友友们你们好,今天千泽为大家带来的是C语言中字符函数和字符串函数的详解,掌握了这些内容能够让我们更加灵活的运用字符串,接下来让我们一起走进今天的内容吧!写这篇文章需要在cplusplus.com上大量截图,十分不易!如果对您有帮助的话希望能够得到您的支持和帮助,我会持…

信号处理-小波变换4-DWT离散小波变换概念及离散小波变换实现滤波

连续小波变换的适用场景&#xff1a;能够获取某一段信号的瞬时信息、时频信息 缺点&#xff1a;计算量大&#xff0c;无法进行数据压缩- 针对连续小波存在的缺点提出离散小波变换 离散小波变换 离散小波变换 分解过程&#xff1a;&#xff08;离散2进正交&#xff09; cD1: …

数据结构与算法——栈和队列<也不过如此>

&#x1f3c6;作者主页&#xff1a;king&南星 &#x1f384;专栏链接&#xff1a;数据结构 &#x1f3c5;文章目录一、&#x1f947;栈1、&#x1f948;概念理解2、&#x1f948;链表头插头删实现栈1、&#x1f949;预备准备2、&#x1f949;创建结点函数3、&#x1f949;遍…

SPI读写SD卡速度有多快?

SD卡是一个嵌入式中非常常用的外设&#xff0c;可以用于存储一些大容量的数据。但用单片机读写SD卡速度一般都有限&#xff08;对于高速SD卡&#xff0c;主要是受限于单片机本身的接口速度&#xff09;&#xff0c;在高速、实时数据存储时可能会有影响。但具体速度可以达到多少…

vue2+高德地图web端开发使用

创建vue2项目我们创建一个vue2项目&#xff0c;创建vue2项目就不用再多说了吧&#xff0c;使用“vue create 项目名 ”创建即可注册高德地图高德地图官网地址&#xff1a;https://lbs.amap.com/如果是第一次使用&#xff0c;点击注册然后进入我们的控制台注册完之后进入控制台&…

<Linux>计算机体系结构和操作系统

计算机体系结构(冯 • 诺依曼体系)和操作系统&#xff08;Operator System&#xff09; 文章目录计算机体系结构(冯 • 诺依曼体系)和操作系统&#xff08;Operator System&#xff09;一、冯 • 诺依曼体系结构1.存储器&#xff08;内存&#xff09;2.运算器和控制器&#xff…

系统重装漏洞

zzcms系统重装漏洞 一、配置zzcms环境 1. 使用小皮搭建zzcms框架 2. 安装zzcms 按照下面的操作进行,傻瓜式操作即可 3. 打开网站 二、漏洞利用 在访问install目录的默认文件后,会出现zzcms安装向导 http://www.zzcms.com/install/index.php 但是会显示 “安装向导…
最新文章