RabbitMQ简单模式和工作模式

RabbitMQ 是一个消息队列中间件,用于在分布式系统中进行消息传递。在 RabbitMQ 中,有几种工作模式,其中简单模式和工作模式是其中两种基本的模式之一。

  1. 简单模式(Simple Mode):

    • 在简单模式中,有一个生产者(Producer)将消息发送到一个队列(Queue)中,然后有一个消费者(Consumer)从队列中接收并处理消息。
    • 这是最基本的消息队列模式,适用于单个生产者和单个消费者的场景。
    • 生产者将消息发送到队列,而消费者从队列中接收并处理消息,消息的传递是单向的。
  2. 工作模式(Work Queue Mode):

    • 工作模式也被称为竞争消费者模式。在这种模式下,有多个消费者监听同一个队列,但每条消息只能被其中一个消费者接收和处理。
    • 当消息被发送到队列时,它将被发送给下一个空闲的消费者,从而实现消息的分发和并发处理。
    • 这种模式对于处理大量工作的情况很有用,可以通过增加消费者的数量来提高消息处理的速度。

在 RabbitMQ 中,简单模式和工作模式的实现通常使用一些基本的概念,包括生产者、消费者、队列和消息。生产者负责发送消息到队列,而消费者则负责从队列中接收和处理消息。

下面是一个使用 RabbitMQ 和 Node.js(使用 amqplib 库)以及 TypeScript 实现工作模式的简单示例。在这个例子中,我们将使用 amqplib 库来连接 RabbitMQ 服务器,并使用 TypeScript 来编写代码。

首先,确保你已经安装了 amqplib 库。可以使用以下命令进行安装:

npm install amqplib

接下来,创建一个生产者和一个消费者的 TypeScript 文件。以下是示例代码:

producer.ts:

import * as amqp from 'amqplib';

async function produce() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'work_queue';

  await channel.assertQueue(queue, { durable: true });

  for (let i = 0; i < 10; i++) {
    const message = `Message ${i}`;
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    console.log(` [x] Sent '${message}'`);
  }

  setTimeout(() => {
    connection.close();
    process.exit(0);
  }, 500);
}

produce();

consumer.ts:

import * as amqp from 'amqplib';

async function consume() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'work_queue';

  await channel.assertQueue(queue, { durable: true });
   // 设置每次只处理一个消息[平均分配的概念,不会让一个work太忙和太闲]
    //这一行代码的作用是告诉 RabbitMQ 不要在消费者未确认(ack)之前向其发送新的消息
  await channel.prefetch(1);

  console.log(' [*] Waiting for messages. To exit press CTRL+C');

  await channel.consume(queue, async (msg) => {
    if (msg !== null) {
      const message = msg.content.toString();
      console.log(` [x] Received ${message}`);

      // Simulate some work
      await new Promise(resolve => setTimeout(resolve, 1000));

      console.log(' [x] Done');
      channel.ack(msg);
    }
  });
}

consume();

这个示例中,生产者将消息发送到名为 work_queue 的队列中,而消费者则监听该队列并处理消息。消费者使用 channel.prefetch(1) 来确保一次只接收一个消息,从而实现竞争消费者模式。

记得在运行前启动 RabbitMQ 服务器,并确保 TypeScript 文件已编译成 JavaScript。你可以使用以下命令进行编译:

tsc producer.ts
tsc consumer.ts

然后,分别运行 producer.jsconsumer.js。这样你就可以在 RabbitMQ 中看到消息的生产和消费过程。

RabbitMQ消息持久化和手动应答

在 RabbitMQ 中,消息持久化和手动应答是两个关键的概念,它们可以帮助确保消息的可靠传递和处理。下面简要介绍这两个概念:

  1. 消息持久化(Message Durability):

    • 默认情况下,RabbitMQ 中的消息是瞬时的,也就是说,如果 RabbitMQ 服务器停止或崩溃,所有未处理的消息都会丢失。
    • 通过将消息标记为持久化,你可以确保消息在 RabbitMQ 服务器重启后仍然可用。要实现消息持久化,需要在发送消息时设置消息的 deliveryMode 属性为 2persistent)。
    • 例如,在生产者端设置消息为持久化:
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    
    • 在消费者端,你需要确保队列和消息都被声明为持久化:
    channel.assertQueue(queue, { durable: true });
    

    这样,即使 RabbitMQ 服务器重启,持久化的消息也不会丢失。

  2. 手动应答(Manual Acknowledgment):

    • 默认情况下,RabbitMQ 使用自动应答(auto-acknowledgment)模式,即一旦消息被传递给消费者,RabbitMQ 就将其标记为已处理。
    • 在某些情况下,你可能需要更细粒度的控制,以确保消息在被消费者完全处理之后才被标记为已处理。这就是手动应答的用途。
    • 在消费者端,需要将 noAck 设置为 false,表示手动应答模式:
channel.consume(queueName, async (msg: Message | null) => {
  if (msg) {
    const data: EmailTask = JSON.parse(msg.content.toString());
    console.log('Processing mail task:', msg.content.toString());
    try {
      //模拟邮件发送
      await new Promise(resolve => setTimeout(resolve, 1000));
      console.log(' [x] Done');
      channel.ack(msg);
    } catch (error) {
      console.log('error:', data);
      // 处理消息失败,判断是否需要进行重试
      if (canRetry(msg)) {
        // 重新入队,进行下一次尝试
        channel.reject(msg, true);
      } else {
        // 不进行重试,将消息从队列中移除
        channel.ack(msg);
      }
    }
  }
});

  • 在这种情况下,消费者需要在处理完消息后显式调用 channel.ack(msg) 来确认消息已被处理。如果消费者崩溃或在处理消息时发生错误,消息将保持在队列中,直到被明确确认。
  • 在 RabbitMQ 中,channel.reject 方法用于拒绝一条消息。它的参数如下channel.reject(msg, requeue);
    msg: 要拒绝的消息对象。
    requeue: 如果设置为 true,则被拒绝的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true。

综合使用消息持久化和手动应答,可以确保在面对不同情况时,消息的可靠传递和处理。

重试间隔和次数

在这里插入图片描述
在这里插入图片描述

  1. 重新投递消息并设置头部信息:

    • 在处理消息失败时,将消息重新投递到队列,并设置一个头部信息,例如 x-redelivered-count,用来记录消息的重试次数。
    • 在消费者端,根据这个头部信息来判断是否达到重试次数的上限,如果是,则不再重新投递,可能将消息放入死信交换机。
  2. 使用外部存储记录重试次数:

    • 每次消息处理失败时,将消息的唯一标识(例如 UUID)和重试次数记录到外部存储中(例如 Redis、Memcache、MySQL)。
    • 在消费者端,在每次重新处理时,从外部存储中获取当前重试次数,并判断是否达到重试次数的上限。
  3. 自定义插件:

    • 编写一个 RabbitMQ 插件,实现自定义的消息重试逻辑,包括记录重试次数、判断是否重新投递等。
    • 这样可以更灵活地控制消息的处理流程。

需要注意的是,这些方法都是基于 RabbitMQ 不直接提供重试次数限制的情况下的一些自定义实践。在回答中也提到了关于 quorum queues 的更新,以及支持通过策略(policy)来限制重投递次数的可能性。因此,具体的实现方式可能会随着 RabbitMQ 版本的更新而有所变化。

await channel.consume(queueName, async (msg: Message | null) => {
  if (msg) {
    const data: EmailTask = JSON.parse(msg.content.toString());
    let retryCount = msg.properties.headers['x-retry-count'] || 0;
    console.log('Processing message:', data);
    console.log('Retry count:', retryCount);
    try {
      if (data.to.includes('recipient1@example.com')) {
        throw new Error('邮件发送失败...');
      }
      //发送邮件
      await new Promise(resolve => setTimeout(resolve, 1000));
      channel.ack(msg);
    } catch (error) {
      console.log('error:', data);
      // 增加重试次数
      retryCount++;
      // 判断是否达到最大重试次数
      if (retryCount < maxRetryAttempts) {
        // 重新发送消息到队列
        channel.sendToQueue(queueName, msg.content, {
          persistent: true,
          headers: {
            'x-retry-count': retryCount,
          },
        });
      } else {
        // 不进行重试,将消息从队列中移除
        channel.ack(msg);
      }
    }
  }
});

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/349232.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++大学教程(第九版)6.48掷骰子游戏的改进

文章目录 题目代码运行截图 题目 (掷骰子游戏的改进)请修改图6.11 中的双游戏序允许家下赌注。 把序中运行掷骰子游戏的部分打包为一个函数。 初始化变量 bankBalance 为 1000美元。 提示玩家输入赌注数&#xff1a;wager。 利用一个 while 循环来检查 wager 是否小于或等于 b…

直线导轨运行不顺畅时怎么办?

为了确保直线导轨正常工作&#xff0c;确保设备的精度和稳定性&#xff0c;避免因此带来的生产损失和质量问题&#xff0c;需要及时处理直线导轨运行不顺畅或产生噪音等问题&#xff0c;今天我们就来详说如何解决直线导轨运行不顺畅。 1、长时间使用后&#xff0c;直线导轨表面…

达美乐3年亏9亿,披萨下沉能否“救市”?

“达门”在西北首店“出圈”。 作为中国首家西式快餐连锁品牌&#xff0c;达美乐于2023年12月24日在西安开出西北首店&#xff0c;单日销售额创造全球纪录32万。 此前&#xff0c;达势股份&#xff08;下称“达美乐中国”&#xff0c;01405.HK&#xff09;在港交所挂牌上市&a…

【Linux系统编程应用层开发目录】介绍Linux应用层开发的知识点和文章

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f440;专栏地址&#x1f440;&#xff1a;&#x1f680;Linux C语言&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、…

详细介绍 Go 中如何实现 bitset

文章目录 bitset 结构元素位置代码实现构造函数BitSet 的方法基础方法containsclearadd 集合方法computeSize方法定义intersectuniondifference 遍历集合的元素总结 最近尝试在 B 站录些小视频&#xff0c;我的 B 站主页。录视频当是为了彻底搞懂某个知识点的最后一步吧&#x…

向量数据库(1)

一、向量数据库 1&#xff0c;什么是向量数据库 专门存储和查询向量数据的数据库系统&#xff0c;通过高翔的向量索引和查询功能&#xff0c;使得在大规模向量数据集上进行相似性搜索和分析变得更高效和容易。 存储向量数据&#xff1a;处理百万或者十亿的大规模数据集向量索…

多符号表达式的共同子表达式提取教程

生成的符号表达式&#xff0c;可能会存在过于冗长的问题&#xff0c;且多个符号表达式中&#xff0c;有可能存在相同的计算部分&#xff0c;如果不进行处理&#xff0c;计算过程中会导致某些算式计算多次&#xff0c;从而影响计算效率。 那么多个符号表达式生成函数时&#xf…

基于一款热门大屏可视化设计器使用教程

乐吾乐大屏可视化设计器是一个用于创建和定制大屏幕数据可视化展示的工具&#xff0c;支持零代码实现物联网、工业智能制造等领域的可视化大屏、触摸屏端UI以及工控可视化的解决方案。同时也是一个Web组态工具&#xff0c;支持2D、3D等多种形式&#xff0c;用于构建具有实时数据…

详解APQC流程分级分类框架PCF13个高阶分类和5级业务流程

一&#xff1a;什么是APQC 美国生产力与质量中心(American Productivity and Quality Center&#xff0c;简称为APQC)&#xff0c;创立于1977年是一个会员制的非营利机构&#xff0c;使命是“发现有效的改进方法&#xff0c;广泛地传播其发现成果&#xff0c;实现个人之间及其…

JavaScript高级:垃圾回收机制

1 引言 垃圾回收机制&#xff08;Garbage Collection&#xff09;简称 GC。js中的内存的分配和回收都是自动完成的&#xff0c;内存在不使用的时候会被垃圾回收器自动回收。 2 内存的生命周期 js环境中分配的内存&#xff0c;一般有如下的生命周期&#xff1a; 1. 内存分配&…

浅析HTTP协议

首先&#xff0c;前端请求后端数据&#xff0c;后端响应数据给前端&#xff0c;这是我们大家都知道的&#xff0c;那其中所涉及到的数据传输协议又是什么呢&#xff1f;这个传输规范就是我们大名鼎鼎的HTTP协议&#xff01; 什么是HTTP协议&#xff1f; HTTP&#xff08;超文本…

【医学图像隐私保护】PLAN方法:解决 GAN 生成医学图像 Latent 空间中的隐私保护

PLAN方法&#xff1a;解决 GAN 生成医学图像 Latent 空间中的隐私保护方法 PLAN 原理StyleGAN 生成视网膜图k-SALSA 生成视网膜图PLAN方法 生成视网膜图 总结 PLAN 原理 论文&#xff1a;https://arxiv.org/abs/2307.02984 代码&#xff1a;https://github.com/perceivelab/P…

第二证券:深夜突发,油价大涨!惊魂一夜,5700亿市值蒸发

当地时间1月25日&#xff0c;美股三大股指延续涨势&#xff0c;前一日大涨的抢手中概股走势分解。成绩低于预期的特斯拉单日大跌逾12%&#xff0c;总市值蒸腾超越5700亿元人民币&#xff0c;其后市目标价还遭多家组织下调。 从隔夜发布的重要经济及政策数据看&#xff0c;美国…

【RabbitMQ】死信(延迟队列)的使用

目录 一、介绍 1、什么是死信队列(延迟队列) 2、应用场景 3、死信队列(延迟队列)的使用 4、死信消息来源 二、案例实践 1、案例一 2、案例二&#xff08;消息接收确认 &#xff09; 3、总结 一、介绍 1、什么是死信队列(延迟队列) 死信&#xff0c;在官网中对应的单词…

【c语言】扫雷

前言&#xff1a; 扫雷是一款经典的单人益智游戏&#xff0c;它的目标是在一个方格矩阵中找出所有的地雷&#xff0c;而不触碰到任何一颗地雷。在计算机编程领域&#xff0c;扫雷也是一个非常受欢迎的项目&#xff0c;因为它涉及到许多重要的编程概念&#xff0c;如数组、循环…

基于卡尔曼滤波的平面轨迹优化

文章目录 概要卡尔曼滤波代码主函数代码CMakeLists.txt概要 在进行目标跟踪时,算法实时测量得到的目标平面位置,是具有误差的,连续观测,所形成的轨迹如下图所示,需要对其进行噪声滤除。这篇博客将使用卡尔曼滤波,对轨迹进行优化。 优化的结果为黄色线。 卡尔曼滤波代码…

带【科技感】的Echarts 图表

Echarts脚本在线地址 https://cdn.jsdelivr.net/npm/echarts5.4.3/dist/echarts.min.js 引入Echarts 脚本后粘贴代码 vue2 代码&#xff1a; <template><div><div ref"col-2-row-2" class"col-2-row-2"></div></div> <…

PHP - Yii2 异步队列

1. 前言使用场景 在 PHP Yii2 中&#xff0c;队列是一种特殊的数据结构&#xff0c;用于处理和管理后台任务。队列允许我们将耗时的任务&#xff08;如发送电子邮件、push通知等&#xff09;放入队列中&#xff0c;然后在后台异步执行。这样可以避免在处理大量请求时阻塞主应用…

sklearn 学习-混淆矩阵 Confusion matrix

混淆矩阵Confusion matrix&#xff1a;也称为误差矩阵&#xff0c;通过计算得出矩阵的结果用来表示分类器的精度。其每一列代表预测值&#xff0c;每一行代表的是实际的类别。 from sklearn.metrics import confusion_matrixy_true [2, 0, 2, 2, 0, 1] y_pred [0, 0, 2, 2, 0…

ICMP协议详解

ICMP&#xff08;Internet Control Message Protocol&#xff09;协议是一个网络层协议。 一个新搭建好的网络&#xff0c;往往需要先进行一个简单的测试&#xff0c;来验证网络是否畅通&#xff1b;但是IP协议并不提供可靠传输。如果丢包了&#xff0c;IP协议并不能通知传输层…
最新文章