.Net 6 下WorkerService+RabbitMq实现消息的异步发布订阅

        近期项目里有需要用到RabbitMq实现一些业务,学习整理之后在此记录一下,如有问题或者不对的地方,欢迎留言指正。

一、首先创建连接工厂

 public class RabbitMQProvider
    {
        private readonly string _ipAddress;
        private readonly int? _port;
        private readonly string _username;
        private readonly string _password;

        public RabbitMQProvider()
        {
            _ipAddress = ConfigurationHelper.GetKey("RabbitMQIPAddress") ?? throw new ArgumentException("IP地址未配置!");
            _username = ConfigurationHelper.GetKey("RabbitMQUserName") ?? throw new ArgumentException("用户名不能为空");
            _password = ConfigurationHelper.GetKey("RabbitMQPassword") ?? throw new ArgumentException("密码不能为空");

            var timeApan = new TimeSpan(0, 5, 0);
            if (ConnectionFactory == null)
            {
                ConnectionFactory = new ConnectionFactory//创建连接工厂对象
                {
                    HostName = _ipAddress,//IP地址
                    UserName = _username,//用户账号
                    Password = _password,//用户密码
                    //启用自动连接恢复
                    AutomaticRecoveryEnabled = true,
                    //VirtualHost = "/mqtest",//RabbitMQ中要请求的VirtualHost名称
                    ContinuationTimeout = timeApan,
                    HandshakeContinuationTimeout = timeApan,
                    RequestedConnectionTimeout = timeApan,
                    SocketReadTimeout = timeApan,
                    SocketWriteTimeout = timeApan,
                    //启用异步消费
                    DispatchConsumersAsync = true,
                    //RequestedChannelMax = 5000
                };

            }

        }

        public ConnectionFactory ConnectionFactory { get; }


        private static IConnection connection;
        /// <summary>
        /// 获取RabbitMQ连接对象方法(创建与RabbitMQ的连接)
        /// </summary>
        /// <returns></returns>
        public IConnection GetConnection()
        {
            if (connection == null || !connection.IsOpen)
            {
                //通过工厂创建连接对象
                connection = ConnectionFactory.CreateConnection();
            }
            return connection;
        }

        int times = 0;
        private static IModel Channel;
        public IModel GetChannel()
        {
            if (Channel != null)
                return Channel;
            else
            {
                //times++;
               // Console.WriteLine($"CreateModel{times}次");
                return GetConnection().CreateModel();
            }
        }


    }

二、消息发布

1、获取连接、交换机和队列

 public class RabbitMQPublisher : IPublisher
    {
        static int x_message_ttl;
        static RabbitMQPublisher()
        {
            int.TryParse(ConfigurationHelper.GetKey("RabbitMQ_x-message-ttl"), out x_message_ttl);
            x_message_ttl = x_message_ttl * 60 * 1000;
        }
        #region
        private readonly RabbitMQProvider _provider;
        private IConnection _connection;

        public RabbitMQPublisher(RabbitMQProvider provider)
        {
            try
            {
                _provider = provider;
                //if (_connection == null || !_connection.IsOpen)
                //{
                //    _connection = _provider.ConnectionFactory.CreateConnection();
                //}
                _connection = _provider.GetConnection();
                _channel = _provider.GetChannel();

            }
            catch (Exception ex)
            {
                //记录异常日志
                Util.LogError($"RabbitMQPublisher createConnection exception. Exception message:{ex.Message}");
            }
        }

        public IConnection Connection
        {
            get
            {
                
                if (_connection != null)
                    return _connection;
                return _connection = _provider.GetConnection(); ;
            }
            //get; set;
        }

        private IModel _channel;
        public IModel Channel
        {
            get
            {
                if (_channel != null)
                    return _channel;
                else
                {
                    //if (_connection == null || !_connection.IsOpen)
                    //{
                    //    _connection = _provider.GetConnection(); ;
                    //}

                    return _channel = _provider.GetChannel();
                }

            }
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            if (Channel != null)
            {
                if (Channel.IsOpen)
                    Channel.Close();
                Channel.Abort();
                Channel.Dispose();
            }

            if (Connection != null)
            {
                if (Connection.IsOpen)
                    Connection.Close();
            }
        }
        #endregion
}

2、同步消息发送

 /// <summary>
        /// 发布(生产)消息
        /// </summary>
        /// <param name="message">消息内容</param>
        /// <param name="exchangeName">交换机名称</param>
        /// <param name="queueName">队列名称</param>
        /// <param name="exchangeType">交换机类型</param>
        /// <param name="routingKey">路由键</param>
        /// <param name="durable">是否持久化</param>
        /// <param name="autoDelete">是否自动删除</param>
        /// <param name="arguments">用于插件和代理特定功能,如消息TTL、队列长度限制等</param>
        /// 1.x-message-ttl             发送到队列的消息在丢弃之前可以存活多长时间(毫秒)。
        /// 2.x-expires                 队列在被自动删除(毫秒)之前可以使用多长时间。
        /// 3.x-max-length              队列在开始从头部删除之前可以包含多少就绪消息。
        /// 4.x-max-length-bytes        队列在开始从头部删除之前可以包含的就绪消息的总体大小。
        /// 5.x-dead-letter-exchange    设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称。
        /// 6.x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,将使用消息的原始路由密钥。
        /// 7.x-max-priority            队列支持的最大优先级数; 如果未设置,队列将不支持消息优先级。
        /// 8.x-queue-mode              将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息。
        /// 9.x-queue-master-locator    将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。
        private Task Publish(string message, string exchangeName, string queueName, string exchangeType, string routingKey = null, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
        {
            if (x_message_ttl > 0)
            {
                arguments = new Dictionary<string, object>();
                arguments.Add("x-message-ttl", x_message_ttl);
            }
            //声明交换机
            Channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments);
            //声明队列
            Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments);
            Channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey == null ? string.Empty : routingKey);
            var msgByte = Encoding.UTF8.GetBytes(message);
            //设置消息持久化
            var props = Channel.CreateBasicProperties();
            props.Persistent = true;
            try
            {
                Channel.TxSelect();
                Channel.BasicPublish
                (
                    exchange: exchangeName,
                    routingKey: routingKey == null ? string.Empty : routingKey,
                    mandatory: false,
                    basicProperties: props,
                    body: msgByte
                );
                Channel.TxCommit();
            }
            catch (Exception ex)
            {
                Channel.TxRollback();
                //记录异常日志
                Util.LogError($"RabbitMQPublisher publish message exception. Exception message:{ex.Message}");
            }
            return Task.FromResult(0);
        }

3、批量发布

 /// <summary>
        /// 批量发布
        /// </summary>
        /// <param name="message"></param>
        /// <param name="exchangeName"></param>
        /// <param name="queueName"></param>
        /// <param name="exchangeType"></param>
        /// <param name="routingKey"></param>
        /// <param name="durable"></param>
        /// <param name="autoDelete"></param>
        /// <param name="arguments"></param>
        /// <returns></returns>
        private async Task PublishAsyncBatch(List<string> message, string exchangeName, string queueName, string exchangeType, string routingKey = null, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
        {
            //using (var conn = _provider.ConnectionFactory.CreateConnection())
            //{
            using (var channel = Connection.CreateModel())
            {
                ///Console.WriteLine(1);
                if (x_message_ttl > 0)
                {
                    arguments = new Dictionary<string, object>();
                    arguments.Add("x-message-ttl", x_message_ttl);
                }
                //声明交换机
                Channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments);
                //声明队列
                Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments);
                Channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey == null ? string.Empty : routingKey);

                //设置消息持久化
                var props = Channel.CreateBasicProperties();
                props.Persistent = true;
                try
                {
                    Channel.TxSelect();
                    var basicPublishBatch = Channel.CreateBasicPublishBatch();

                    byte[] msgByte;
                    ReadOnlyMemory<byte> memory;
                    foreach (var msg in message)
                    {
                        msgByte = Encoding.UTF8.GetBytes(msg);
                        memory = new ReadOnlyMemory<byte>(msgByte);
                        basicPublishBatch.Add
                        (
                            exchange: exchangeName,
                            routingKey: routingKey == null ? string.Empty : routingKey,
                            mandatory: false,
                            properties: props,
                            body: memory
                        );
                    }
                    basicPublishBatch.Publish();

                    Channel.TxCommit();

                    await Task.Yield();
                }
                catch (Exception ex)
                {

                    Channel.TxRollback();
                    channel.Close(); channel.Dispose();
                    //conn.Close(); conn.Dispose();
                    //记录异常日志
                    Util.LogError($"RabbitMQPublisher publish message exception. Exception message:{ex.Message}");

                    Console.WriteLine("消息订阅时错误:" + ex.Message);
                }

            }
            
        }

注意:多线程消息发布时,应避免多个线程使用同一个IModel实例,必须保证Imodel被一个线程独享,如果必须要多个线程访问呢一个实例的话,则可以通过加锁来处理,详见:.NET/C# Client API Guide — RabbitMQ

IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
  ch.BasicPublish(...);
}

三、消息订阅

1、获取连接、交换机和队列同上消息发布,不再赘述

 private void QueueInitialization(string queueName, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
        {
            try
            {
                if (x_message_ttl > 0)
                {
                    arguments = new Dictionary<string, object>();
                    arguments.Add("x-message-ttl", x_message_ttl);
                }
                Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments);
            }
            catch (Exception )
            {

            }
        }
/// <summary>
        /// 
        /// </summary>
        /// <param name="queueName"></param>
        /// <param name="callback"></param>
        /// <param name="autoAck"></param>
        /// <param name="consumPerTimes">每次消费的消息条数</param>
        /// <returns></returns>
        private async Task SubscribeAsync(string queueName, Func<string, bool> callback, bool autoAck, ushort consumPerTimes = 1)
        {
            try
            {
                QueueInitialization(queueName);
                //声明为手动确认,每次只消费1条消息。
                Channel.BasicQos(0, consumPerTimes, false);
                //定义消费者
                //var consumer = new EventingBasicConsumer(Channel);

                var consumer = new AsyncEventingBasicConsumer(Channel);

                //接收事件
                consumer.Received += async (eventSender, args) =>
                {
                    var message = args.Body.ToArray();//接收到的消息

                    var res = callback(Encoding.UTF8.GetString(message));
                    //返回消息确认
                    Channel.BasicAck(args.DeliveryTag, res);
                    await Task.Yield();
                };

                //开启监听 -- gai2023-11-1
                Channel.BasicConsume(queueName, autoAck, consumer);
                // await Task.Delay(1000);
            }
            catch (Exception e)
            {
                Console.WriteLine("消息订阅时错误:" + e.Message);
            }
        }

四、通过workerService订阅处理消息

internal class SubscribeWorker : BackgroundService
    {
        #region override
        public override Task StartAsync(CancellationToken cancellationToken)
        {
            try
            {
                //一些数据初始化
                _logger.LogInformation($"Settings 初始化完成");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
            }


            return base.StartAsync(cancellationToken);
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //这里注意,不能写在while里,否则会一直进行重复订阅,会导致连接数一直增长
            await MainSubscribe();
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(2000);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, ex.Message);
                }
                
            }
        }

        /// <summary>
        /// 服务停止
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public override Task StopAsync(CancellationToken cancellationToken)
        {
            Task.WaitAll();
            subscriber.Dispose();
            _logger.LogInformation("Worker stop at: {time}", DateTimeOffset.Now);
            return base.StopAsync(cancellationToken);
        }
        #endregion





    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/328478.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【性能优化】GSON解性能瓶颈分析

一、背景 GSON是Google提供的开源库&#xff0c;使用很便捷&#xff0c;但是在使用过程中也发现了其短板。在Bean类结构复杂时&#xff0c;进行反序列化耗时较长&#xff0c;尤其是很多在应用启动阶段需要反序列化一些内置的数据时&#xff0c;很让人头疼&#xff0c;通过抓Tra…

学生云服务器_学生云主机_学生云数据库_云+校园特惠套餐

腾讯云学生服务器优惠活动&#xff1a;轻量应用服务器2核2G学生价30元3个月、58元6个月、112元一年&#xff0c;轻量应用服务器4核8G配置112元3个月、352.8元6个月、646.8元一年&#xff0c;CVM云服务器2核4G3M公网带宽配置842.4元一年&#xff0c;腾讯云服务器网txyfwq.com分享…

力扣电话号码的组合

文章目录 题目说明做题思路代码实现代码解析 题目链接 题目说明 首先我们先分析一下这个题目题目中说呢先给出一个字符串这个字符串其实就是这个九键数字我们要按照要求将数字所代表的字符进行自由组合形成一个字符串并且这个字符串的长度和输入的数字字符串长度相同&#xff0…

element-ui tree树形结构全选、取消全选,展开收起

控制树形结构全选、取消全选&#xff0c;展开收起 <template><div><!-- 添加 ref"tree" 属性--><el-tree:data"data"show-checkboxdefault-expand-allnode-key"id"ref"tree"highlight-current:props"defa…

使用Python操纵Word自动编写离职报告

目录 一、背景介绍 二、技术原理 三、实现步骤 1、安装python-docx库 2、创建Word文档 3、添加标题和内容 4、添加表格和图片 5、设置样式和格式化文本 6、保存文档 四、注意事项与建议 总结 随着现代社会的发展&#xff0c;自动化和智能化已经成为各行各业追求的目…

Pytorch各种Dropout层应用于详解

目录 torch框架Dropout functions详解 dropout 用途 用法 使用技巧 参数 数学理论公式 代码示例 alpha_dropout 用途 用法 使用技巧 参数 数学理论公式 代码示例 feature_alpha_dropout 用途 用法 使用技巧 参数 数学理论 代码示例 dropout1d 用途 用…

每日一记:一个windows的bat脚本工具集

最近在工作上遇到要校验文件的问题&#xff0c;例如&#xff0c;下载了一个文件之后&#xff0c;通过查看文件的md5来校验文件是否完整&#xff0c;这个动作在linux上很简单&#xff0c;但在windows上也不难&#xff0c;可以通过 certutil 命令实现&#xff0c;该命令通常可用于…

vue流程图

效果图 组件 <template><div class="processBox" v-if="list.length"><div class="childs"><div class="child" v-for="(item,index) in list" :key="item.id +-child-+index"><div…

指定Top名校|管理学教师拜师香港理工大学院士麾下访学

X老师拟自费赴香港访学&#xff0c;并指定了香港Top5之内的高校。申请一个月后&#xff0c;我们落实了香港理工大学的访学职位&#xff0c;导师为香港工程科学院和国际系统与控制科学院的两院院士、讲座教授。 X老师背景&#xff1a; 申请类型&#xff1a;自费访问学者 工作背…

迷你洗衣机哪个牌子好用?家用小型洗衣机推荐

迷你洗衣机主要分为立式洗衣机、壁挂式洗衣机&#xff0c;在特定的情况下是能够为用户提供一定的方便的。就好比如说宝宝的衣物需要和大人的分开洗&#xff0c;或者我们日常都所要清洗内衣裤、袜子等等这些贴身的衣物&#xff0c;直接将这些小件的衣物放进到迷你洗衣机中分类单…

2024“华数杯”国际赛(B题ICM)|光伏发电|国际大学生数学建模竞赛建模解析,小鹿学长带队指引全代码文章与思路

我是小鹿学长&#xff0c;就读于上海交通大学&#xff0c;截至目前已经帮200人完成了建模与思路的构建的处理了&#xff5e; 完整内容可以在文章末尾领取&#xff01; 问题重述 这个问题涉及创建一个数学模型&#xff0c;以解决与光伏发电和中国电力供应相关的各个方面。 电…

计算机网络-甘晴void学习笔记

计算机网络 计科210X 甘晴void 202108010XXX 文章目录 计算机网络期中复习1计算机网络和因特网1.1 因特网1.2 网络边缘1.3 网络核心1.4 分组交换的时延/丢包和吞吐量1.5 协议层次与服务模型 2 应用层原理2.1 应用层协议原理2.2 Web和Http2.3 因特网中的电子邮件2.4 DNS&#x…

史星海先生入选 2024中英双语版《世界名人录》【综合22卷·文化卷】(中国)

史星海(中国) Shi Xinghai (China) 经海内外各界名人及世界相关权威文化机构的大力举荐&#xff0c;鉴于史星海先生在国内外文艺&#xff0c;经济&#xff0c;政治等领域的重大影响力&#xff0c;荣幸获得入编大型人物辞书2024中英双语版《世界名人录》&#xff08;综合卷文化…

Open CASCADE学习|显示模型

目录 1、编写代码 Viewer.h Viewer.cpp ViewerInteractor.h ViewerInteractor.cpp helloworld.cpp 2、配置 3、编译运行 1、编写代码 Viewer.h #pragma once ​ #ifdef _WIN32 #include <Windows.h> #endif ​ // Local includes #include "ViewerInteract…

各省快递量数据, shp+excel,2001-2021年,已实现数据可视化

基本信息. 数据名称: 各省快递量数据 数据格式: shpexcel 数据时间&#xff1a;2001-2021年 数据几何类型: 面 数据坐标系: WGS84 数据来源&#xff1a;网络公开数据 数据字段&#xff1a; 序号字段名称字段说明1a_2001快递量/万件_2001年2a_2002快递量/万件_2002年3…

热压机PLC数据采集远程监控物联网解决方案

热压机PLC数据采集远程监控物联网解决方案 随着工业4.0时代的到来&#xff0c;智能制造已经成为制造业发展的重要方向。在热压机领域&#xff0c;PLC数据采集远程监控物联网解决方案为提高生产效率、降低维护成本、优化生产工艺提供了有效的手段。 一、热压机PLC数据采集远程…

一文带你揭秘淘宝终端技术

作者&#xff1a;周杰&#xff08;寻弦&#xff09; 在这个数字化迅速发展的时代&#xff0c;技术的每一次飞跃都不仅仅意味着一个产品的升级&#xff0c;更是对未来世界的一次大胆想象。从 PC 到 iPhone&#xff0c;从 Model 3 到 ChatGPT&#xff0c;都引领了全新的一个行业。…

Linux miniGUI移植分析

框架介绍 常用GUI程序对比 https://www.cnblogs.com/zyly/p/17378659.html MiniGUI分为底层的GAL&#xff08;图形抽象层&#xff09;和IAL&#xff08;输入抽象层&#xff09;&#xff0c;向上为基于标准POSIX接口中pthread库的Mini-Thread架构和基于Server/Client的Mini-L…

护眼灯有没有护眼的效果啊?有效果的护眼灯推荐

近几年&#xff0c;青少年近视人数呈现猛增的势头&#xff0c;且低龄化趋势也越来越明显&#xff0c;社会各界开始重视对青少年视力问题的关注。伤害视力的原因有很多种&#xff0c;其中没有良好的光线条件是问题之一&#xff0c;当使用的台灯不合格&#xff0c;频闪与蓝光问题…

渗透测试(12)- WireShark 网络数据包分析

目录 1、WireShack 简介 2、WireShark 基本使用方法 3、 WireShack 抓包分析 3.1 Hypertext Transfer Protocol (应用层) 3.2 Transmission Control Protocol (传输层) 3.3 Internet Protocol Version 4(网络层) 3.4 Ethernet Il (链路层): 数据链路层以太网头部信息 …
最新文章