.Net Core/.net 6/.Net 8 实现Mqtt服务器

.Net Core/.net 6/.Net 8 实现Mqtt服务端

  • Mqtt服务端代码
  • `IMqttServer` 接口
  • 业务类,实现 `IMqttServer` 接口
  • `Program.cs`

直接上代码
nuget 引用
MQTTnet

Mqtt服务端代码



using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace Code.Mqtt
{
    /// <summary>
    /// mqtt服务端
    /// </summary>
    public class MqttServerBase
    {


        public MqttServer _server;

        readonly IMqttServer _mqttServer;


        /// <summary>
        /// 向指定主题发送消息
        /// </summary>
        public Action<string, string> ToTopic;


        /// <summary>
        /// 主题/客户端列表
        /// </summary>
        public Dictionary<string,List<string>> Topic_Client=new Dictionary<string, List<string>>();



        public MqttServerBase(IMqttServer mqttServer)
        {
            _mqttServer = mqttServer;

            if(mqttServer == null)
            {
                throw new Exception("MqttServer配置错误");
            }

            var optionbuilder = new MqttServerOptionsBuilder()
           .WithDefaultEndpoint()//设置默认地址127.0.0.1
           .WithDefaultEndpointPort(_mqttServer.Port);//1883

            _server = new MqttFactory().CreateMqttServer(optionbuilder.Build());


            ToTopic = (topic, msg) => {
                _server.InjectApplicationMessage(
                new InjectedMqttApplicationMessage(
                    new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(msg)
                    .Build()
                    )
                );
            };


            _server.ClientConnectedAsync += (e) =>
            {
                _mqttServer.ClientConnectedAsync(e.ClientId, e);
                return Task.CompletedTask;
            };


            _server.ClientDisconnectedAsync += (e) => {
                _mqttServer.ClientDisconnectedAsync(e.ClientId, e);
                return Task.CompletedTask;
            };

            _server.InterceptingPublishAsync += (e)=> {
                var msg = e.ApplicationMessage?.PayloadSegment.Array?.BToString();
                var Topic = e.ApplicationMessage.Topic;

                //判断主题是否存在
                if (Topic_Client.ContainsKey(Topic))
                {
                    _mqttServer.InterceptingPublishAsync(e.ClientId, Topic, msg, e, ToTopic);
                }

                return Task.CompletedTask;
            };


            _server.ApplicationMessageNotConsumedAsync += (e) => {
                var Topic = e.ApplicationMessage.Topic;
                var msg = e.ApplicationMessage.PayloadSegment.Array.BToString();

                //判断主题是否存在,否则会进入死循环
                if (Topic_Client.ContainsKey(Topic))
                {
                    _mqttServer.ApplicationMessageNotConsumedAsync(Topic, msg, e);
                }

                return Task.CompletedTask;
            };


            _server.ValidatingConnectionAsync += (e) => {
                if (_mqttServer.ValidatingConnectionAsync(e.UserName, e.Password,e.ClientId, e))
                {
                    e.ReasonCode = MqttConnectReasonCode.Success;//验证通过
                }
                else
                {
                    e.ReasonCode = MqttConnectReasonCode.Banned;//验证不通过
                }
                return Task.CompletedTask;
            };

            //订阅主题
            _server.ClientSubscribedTopicAsync += (e) =>
            {
                var _topic = e.TopicFilter.Topic;
                //保存主题
                if (!Topic_Client.ContainsKey(_topic))
                {
                    Topic_Client.Add(_topic, new List<string>());
                }

                //添加订阅主题的客户端
                if (!Topic_Client[_topic].Any(x=>x== e.ClientId))
                {
                    Topic_Client[_topic].Add(e.ClientId);
                }

                _mqttServer.ClientSubscribedTopicAsync(e.ClientId, _topic, e);
                return Task.CompletedTask;
            };


            //取消订阅
            _server.ClientUnsubscribedTopicAsync += (e) =>
            {

                var _topic = e.TopicFilter;
                //移除客户端
                if (!Topic_Client.ContainsKey(_topic))
                {
                    Topic_Client[_topic].Remove(e.ClientId);
                    if (Topic_Client[_topic].Count == 0)
                    {
                        // 移除没有客户端订阅的主题
                        Topic_Client.Remove(_topic);
                    }

                    _mqttServer.ClientUnsubscribedTopicAsync(e.ClientId, e.TopicFilter, e);
                }

                return Task.CompletedTask;
            };

            //服务启动事件
            _server.StartedAsync += _mqttServer.StartedAsync;

            //服务停止事件
            _server.StoppedAsync += _mqttServer.StoppedAsync;

            Start();
        }

        public async Task Start()
        {
            Console.WriteLine("正在启动Mqtt服务");
            await _server.StartAsync();
            Console.WriteLine("Mqtt服务启动成功,端口:" + _mqttServer.Port);
        }


        public async Task Stop()
        {
            Console.WriteLine("正在停止Mqtt服务");
            await _server.StopAsync();
            Console.WriteLine("Mqtt服务停止");
        }

        /// <summary>
        /// 重启服务
        /// </summary>
        /// <returns></returns>
        public async Task ReStart()
        {
            await Stop();
            await Start();
        }


        


    }
}



IMqttServer 接口


using MQTTnet.Server;

namespace Code.Mqtt
{
    public interface IMqttServer
    {

        /// <summary>
        /// 服务端口
        /// </summary>
        int Port { get;}


        /// <summary>
        /// 服务启动事件
        /// </summary>
        /// <param name="args"></param>
        /// <returns></returns>
        public Task StartedAsync(EventArgs args);

        /// <summary>
        /// 服务停止事件
        /// </summary>
        /// <param name="args"></param>
        /// <returns></returns>
        public Task StoppedAsync(EventArgs args);


        /// <summary>
        /// 客户端上线
        /// </summary>
        /// <param name="args"></param>
        /// <returns></returns>
        public Task ClientConnectedAsync(string clientId,ClientConnectedEventArgs args);

        /// <summary>
        /// 客户端下线
        /// </summary>
        /// <param name="args"></param>
        /// <returns></returns>
        public Task ClientDisconnectedAsync(string clientId,ClientDisconnectedEventArgs args);

        /// <summary>
        /// 消息事件
        /// </summary>
        /// <param name="args"></param>
        /// <param name="ToTopic">发送消息</param>
        /// <returns></returns>
        public Task InterceptingPublishAsync(string clientId,string Topic,string msg,InterceptingPublishEventArgs args, Action<string, string> ToTopic);

        /// <summary>
        /// 验证
        /// </summary>
        /// <param name="username">账号</param>
        /// <param name="password">密码</param>
        /// <param name="args"></param>
        /// <returns></returns>
        public bool ValidatingConnectionAsync(string username,string password,string clientId,ValidatingConnectionEventArgs args);

        /// <summary>
        /// 消息未消费事件
        /// </summary>
        /// <param name="args"></param>
        /// <returns></returns>
        public Task ApplicationMessageNotConsumedAsync(string Topic,string msg,ApplicationMessageNotConsumedEventArgs args);

        /// <summary>
        /// 订阅主题事件
        /// </summary>
        /// <param name="args"></param>
        /// <returns></returns>
        public Task ClientSubscribedTopicAsync(string clientId,string Topic,ClientSubscribedTopicEventArgs args);

        /// <summary>
        /// 取消订阅主题事件
        /// </summary>
        /// <param name="args"></param>
        /// <returns></returns>
        public Task ClientUnsubscribedTopicAsync(string clientId,string Topic,ClientUnsubscribedTopicEventArgs args);
    }
}

业务类,实现 IMqttServer 接口


    public class MqttApp : IMqttServer
    {

        /// <summary>
        /// 服务端口
        /// </summary>
        int IMqttServer.Port { get => 10883; }

        public MqttApp()
        {

        }

        /// <summary>
        /// 消息未消费
        /// </summary>
        /// <param name="Topic">主题</param>
        /// <param name="msg">消息内容</param>
        /// <param name="args">事件原始参数</param>
        /// <returns></returns>
        async Task IMqttServer.ApplicationMessageNotConsumedAsync(string Topic,string msg,ApplicationMessageNotConsumedEventArgs args)
        {
            Console.WriteLine($"消息未消费{Topic}:");
            Console.WriteLine(msg);
        }

        /// <summary>
        /// 客户端上线
        /// </summary>
        /// <param name="clientId">客户端id</param>
        /// <param name="args">事件原始参数</param>
        /// <returns></returns>
        async Task IMqttServer.ClientConnectedAsync(string clientId, ClientConnectedEventArgs args)
        {
            Console.WriteLine($"客户端上线 id:{clientId}");
        }

        /// <summary>
        /// 客户端下线
        /// </summary>
        /// <param name="clientId">客户端id</param>
        /// <param name="args">事件原始参数</param>
        /// <returns></returns>
        async Task IMqttServer.ClientDisconnectedAsync(string clientId, ClientDisconnectedEventArgs args)
        {
            Console.WriteLine($"客户端下线 id:{clientId}");
        }

        /// <summary>
        /// 订阅主题
        /// </summary>
        /// <param name="clientId">客户端id</param>
        /// <param name="Topic">主题</param>
        /// <param name="args">事件原始参数</param>
        /// <returns></returns>
        async Task IMqttServer.ClientSubscribedTopicAsync(string clientId, string Topic, ClientSubscribedTopicEventArgs args)
        {
            Console.WriteLine($"客户端{clientId}订阅主题:{Topic}");
        }

        /// <summary>
        /// 取消主题订阅
        /// </summary>
        /// <param name="clientId">客户端id</param>
        /// <param name="Topic">主题</param>
        /// <param name="args">事件原始参数</param>
        /// <returns></returns>
        async Task IMqttServer.ClientUnsubscribedTopicAsync(string clientId, string Topic,ClientUnsubscribedTopicEventArgs args)
        {
            Console.WriteLine($"客户端{clientId} 取消主题订阅:{Topic}");
        }


        /// <summary>
        /// 收到客户端消息
        /// </summary>
        /// <param name="clientId">客户端id</param>
        /// <param name="Topic">主题</param>
        /// <param name="msg">消息内容</param>
        /// <param name="args">事件原始参数</param>
        /// <param name="ToTopic">推送消息到指定主题 ("主题","内容")</param>
        /// <returns></returns>
        async Task IMqttServer.InterceptingPublishAsync(string clientId, string Topic, string msg, InterceptingPublishEventArgs args, Action<string, string> ToTopic)
        {
            Console.WriteLine($"客户端{clientId} 主题{Topic} 发送消息 内容:");
            Console.WriteLine(msg);

            //推送消息到指定主题
            ToTopic("主题","内容");
        }


        /// <summary>
        /// 服务启动事件
        /// </summary>
        /// <param name="args">事件原始参数</param>
        /// <returns></returns>
        async Task IMqttServer.StartedAsync(EventArgs args)
        {
        }


        /// <summary>
        /// 服务停止事件
        /// </summary>
        /// <param name="args">事件原始参数</param>
        /// <returns></returns>
        async Task IMqttServer.StoppedAsync(EventArgs args)
        {
        }


        /// <summary>
        /// 验证账号密码
        /// </summary>
        /// <param name="username">账号</param>
        /// <param name="password">密码</param>
        /// <param name="clientId">客户端id</param>
        /// <param name="args">事件原始参数</param>
        /// <returns></returns>
        bool IMqttServer.ValidatingConnectionAsync(string username, string password, string clientId, ValidatingConnectionEventArgs args)
        {
            Console.WriteLine($"验证客户端{clientId}信息:{args.UserName} {args.Password}");

            return true;//验证通过
            //return false;//验证不通过

        }
    }



Program.cs

// 注入
builder.Services.AddSingleton<IMqttServer, MqttApp>();
builder.Services.AddSingleton<MqttServerBase>();

/* 
如果没有下面这段代码,那么程序启动后不会立即启动mqtt服务,需要在控制器注入来初始化实列,
app.Services.GetService 相当于访问了一次对象
*/
//立即启动Mqtt服务
//app.Services.GetService<MqttServerBase>();

//延时启动Mqtt服务
Task.Run(async () => {
    await Task.Delay(3000);
    app.Services.GetService<MqttServerBase>();
});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/439834.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python学习笔记-Flask实现简单的抽奖程序

1.导入flask包和randint包 from flask import Flask,render_template from random import randint 2.初始化 Flask 应用: app Flask(__name__) 3. 定义英雄列表 hero [黑暗之女,狂战士,正义巨像,卡牌大师,德邦总管,无畏战车,诡术妖姬,猩红收割者,远古恐惧,正义天使,无极剑…

Clickhouse表引擎介绍

作者&#xff1a;俊达 1 引擎分类 ClickHouse表引擎一共分为四个系列&#xff0c;分别是Log、MergeTree、Integration、Special。其中包含了两种特殊的表引擎Replicated、Distributed&#xff0c;功能上与其他表引擎正交&#xff0c;根据场景组合使用。 2 Log系列 Log系列…

高阶提示词工程、幻觉综述

提示词工程技术 类比提示 “类比提示”利用类比推理的概念&#xff0c;鼓励模型生成自己的例子和知识&#xff0c;从而实现更灵活和高效的解决问题。 后退提示 “后退提示”专注于抽象&#xff0c;引导模型推导出高级概念和原理&#xff0c;进而提高其推理能力。 使用一个…

mysql学习笔记8——常用5个内置方法

1count 对查询内容进行计数&#xff0c;并返回结果 2as as可以将查询出来结果赋予新名字 3sum sum可以查询某字段特定条件下的和值 4concat concat可以将多列数据合并成一列&#xff0c;只要concat&#xff08;&#xff09;即可 5group_concat group_concat可以把多列…

SpringBoot+Ajax+redis实现隐藏重要接口地址

&#x1f3e1;浩泽学编程&#xff1a;个人主页 &#x1f525; 推荐专栏&#xff1a;《深入浅出SpringBoot》《java对AI的调用开发》 《RabbitMQ》《Spring》《SpringMVC》《项目实战》 &#x1f6f8;学无止境&#xff0c;不骄不躁&#xff0c;知行合一 文章目录 …

预付费电表的应用和预付费平台的操作方式

*、智能预付费电能表的应用分析 1应用功能的分析 这里主要讲的是与远程抄表系统的结合&#xff0e;如图2所示&#xff0e;为系统工作的程序&#xff0e;在远程抄表中&#xff0c;通信方式多种多样&#xff0e;主要有互联网、电话线通信、有线电视通信、光纤通信、GPRS、卫星通…

关于esp8266的一些经验汇总,新手必看

说实话&#xff0c;esp8266的nodemcu 已经使用了2年多了&#xff0c;各种问题遇到过&#xff0c;就尝试各种解决&#xff0c;而现在回头来看真的是稀里糊涂的在用&#xff0c;当然这个问题也同样涉及到esp32. 因为最近打算自己打一块esp8266的板&#xff0c;之前打的比较多的是…

Hi3516DV500+SC2210 AIISP 黑光相机

1. Hi3516DV500 Hi3516DV500是一颗面向行业市场推出的高清智能网络摄像头SoC。该芯片最高支持2路sensor输入&#xff0c;支持最高5M30fps的ISP图像处理能力&#xff0c;支持2F WDR、多级降噪、六轴防抖、多光谱融合等多种传统图像增强和处理算法&#xff0c;支持通过AI算法对输…

ABAP 内表排序总结

目录 ABAP 内表排序总结需求的场景二分法查找SAP 二分法查找SAP SORT排序 ABAP 内表排序总结 ABAP 内表排序SORT总结&#xff1a; 在创建完内表之后&#xff0c;最好使用sort去排序一下使用read读取内表&#xff0c;如果没有排序的话&#xff0c;可能会读取失败read内表只能读…

Fortran语法介绍(一)

个人专栏—ABAQUS专栏 Abaqus2023的用法教程——与VS2022、oneAPI 2024子程序的关联方法 Abaqus2023的用法教程——与VS2022、oneAPI 2024子程序的关联方法Abaqus有限元分析——有限元网格划分基本原则 Abaqus有限元分析——有限元网格划分基本原则各向同性线弹性材料本构模型…

创维汽车SKYHOME获德国设计奖,中国红设计闪耀世界

祝贺&#xff01;创维汽车SKYHOME以卓越的国潮设计理念和突破性的设计语言强势出圈&#xff0c;荣获被誉为设计界“奥斯卡”德国iF设计奖&#xff01; 创维汽车SYHOME是一款集完美设计理念、出色用户体验及创新实用功能为一体的优秀设计产品。SKYHOME的设计灵感来源于中式亭台楼…

【掌握数学公式的魔法】LatexEasy:让你的数学写作不再是难题!

内容摘要&#xff1a;在学术和研究领域&#xff0c;数学公式的准确表达至关重要。然而&#xff0c;传统的LaTeX编辑过程往往复杂且耗时。幸运的是&#xff0c;有了LatexEasy&#xff0c;一切都变得简单起来。这款工具不仅简化了数学公式的编辑流程&#xff0c;还大大提高了工作…

【梳理】k8s使用Operator搭建Flink集群

文章目录 架构图安装cert-manager依赖helm 安装operator运行集群实例k8s上的两种模式&#xff1a;Native和Standalone两种CRDemo1&#xff1a;Application 单任务Demo2&#xff1a;Session 多任务创建ingress 总结 架构图 参考&#xff1a;部署验证demo 安装cert-manager依赖 …

面试高频 牛群的位置排序---搜索插入位置

题目描述 农场里有一群牛&#xff0c;每头牛都有一个标签值&#xff0c;这些标签值组成一个升序排列的数组 labels。现在农场主想知道&#xff0c;给定一个目标标签值 target&#xff0c;如果在牛群中存在这个标签&#xff0c;返回它的位置&#xff0c;如果不存在&#xff0c;…

NSSCTF Round#13 WEB

1.flask?jwt? 在忘记密码下面有提示secretkey,那么就可以jwt伪造 自己注册个账号然后登录 点击拿flag提示你不是admin&#xff0c;并且cookie里面有个session,用工具解密一下 python flask_session_cookie_manager3.py decode -s th3f1askisfunny -c .eJwlzjsOAyEMANG7UK…

JavaScript实现小球移动(二)

这次采用了封装函数的方法&#xff0c;将小球向左向右移动封装在同一个函数内。 代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-wi…

高效办公-浏览器基本操作

日常我们使用电脑&#xff0c;其实很大部分是用于网络功能&#xff0c;这里面除了客户端程序剩余的就是通过我们的浏览器获取信息或者使用业务系统了&#xff0c;这里就简单学习下浏览器基本常识与操作。 一、浏览器是什么&#xff1f; 白话讲浏览器就是一个软件&#xff0c;我…

springboot3.x集成nacos踩坑,并实现多环境配置

一、nacos安装部署 springboot3.x集成Nacos首先需要将Nacos从1.x升级到2.x&#xff0c;建议直接安装2.x版本&#xff0c;手动将1.x的配置信息迁移到2.x中&#xff0c;先并行一段时间&#xff0c;待全部迁移完成稳定运行之后再停掉1.x&#xff0c;升级和安装、操作请查看官方文…

了解开源可视化表单的主要优势

为什么可视化表单深受大家喜爱&#xff1f;这就需要了解开源可视化表单的优势和特点了。在流程化办公深入人心的今天&#xff0c;提高办公协作效率早已成为大家的发展目标&#xff0c;低代码技术平台、开源可视化表单是提升办公协作效率的得力助手&#xff0c;一起来看看它的优…

2024新版微信小程序登录注册功能的实现,授权登录,退出,缓存讲解,小程序个人中心的实现,修改头像 图片上传功能的实现 新版登陆注册,头像上传,修改昵称

新版小程序授权登录注册获取头像昵称文档 一&#xff0c;无法获取用户的微信头像和昵称 最近好多同学在学习石头哥小程序课程的时候&#xff0c;遇到了下面这样的问题&#xff0c;在小程序授权获取用户头像和昵称时&#xff0c;获取到的是下面这样的。 到底是什么原因导致的…