C# 基于MQTT创建客户端的可靠数据传输

C# 基于MQTT创建客户端的可靠数据传输

  • 引言
  • MQTT简介
  • C# MQTT库引用
  • 代码和描述
    • 1、 代码
    • 2、 描述

引言

MQTT是tcpip的应用层协议,这里我们简单介绍一下MQTT的基本概念,并用C# 描述客户端的订阅和发布。

MQTT简介

  • MQTT(Message Queuing Telemetry Transport)即 消息队列遥测传输,是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它也是工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。

  • MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

  • 通过MQTT协议,目前已经扩展出了数十个MQTT服务器端程序,可以通过PHP,JAVA,Python,C,C#等系统语言来向MQTT发送相关消息。

  • MQTT 服务器是发布-订阅架构的转发中心,它可以非常简单地在Internet 服务器上实现。服务器分发消息,因此是推送者,客户端可以发布消息(发送方)、订阅消息(接收方)或两者兼而有之。客户端(也称为节点)是一种智能设备,如微控制器或具有 TCP/IP 堆栈和实现 MQTT 协议的软件的计算机。
    在这里插入图片描述

  • QoS(Quality of Service levels)服务质量是 MQTT 的一个重要特性。当我们使用时,连接已经在一定程度上是不是面向连接的。MQTT 在这里帮助避免信息丢失及其服务质量水平。 服务质量水平包括三个等级。

    1. AtMostOnce ——最多一次
    2. AtLeastOnce ——至少一次
    3. ExactlyOnce ——恰好一次,这个服务质量可以确保数据准确的发送到订阅端。

C# MQTT库引用

这里我使用是VS2022,在工具中的netGet包管理器中打开“管理解决方案包“,搜索MQTTnet,找到对应的包。这里注意要选择自己合适的版本!注意要选择自己合适的版本!注意要选择自己合适的版本!重要的话说3遍。我在win10环境下使用的VS2022,验证了多次才发现4.1.2.350是合适的。如下图所示。
在这里插入图片描述

代码和描述

1、 代码

using System;
using System.Collections.Generic; 
using System.Threading.Tasks; 
using MQTTnet;
using MQTTnet.Client; 
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace WriteSN
{
  internal class Mqtt
  {
      public static byte[] Mqtt_Message_Received_Str = new byte[] { };
  	//构造函数
      public Mqtt()
      {
         //初始化全局变量
          Mqtt_Message_Received_Str = new byte[] { };
      }

      /// <summary>
      ///  发布和订阅MQTT专用主题授权获取数据
      /// </summary>
      /// <param name="Data">发布传输字符串</param>
      /// <returns></returns>
      public async Task MQTT_Client_Handle(string Data)
      {
  		//服务端地址
  		string broker = "broker.emqx.io";
  		//端口·
  		int port = 8090;
  		//客户端ID
  		string clientId = Guid.NewGuid().ToString();
  		//订阅和发布的主题
  		string  req_topic = "trace_id_req";//发
  		string  rsp_topic = "trace_id_rsp";//收 
  		//用户和密码
  		string username = "";//"admin";
  		string password = "";//public";
  		
  		// Create a MQTT client factory 创建客户端代理
  		var factory = new MqttFactory();
  		
  		// Create a MQTT client instance 创建客户端实例
  		var mqttClient = factory.CreateMqttClient();
  		
          // Create MQTT client options 为连接服务器创建客户端选项
          var options = new MqttClientOptionsBuilder().
              .WithTcpServer(broker, port) // MQTT broker address and port
              .WithCredentials(username, password) // Set username and password
              .WithClientId(clientId).WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)//QOS=2
              .WithCleanSession()
              .Build();
          // Connect to MQTT broker //连接代理服务器
          var connectResult = await mqttClient.ConnectAsync(options);
      
          if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
          {   
              //连接成功后打印
              Console.WriteLine("\r\nConnected to MQTT broker successfully.\r\n");

              // Subscribe to a topic //订阅一个要接收的主题
              await mqttClient.SubscribeAsync(rsp_topic, MqttQualityOfServiceLevel.ExactlyOnce);//QOS=2
              // Callback function when a message is received //添加接收回调的消息
              mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;

              //=======================================================================================       
              //开始发布消息        
              var message = new MqttApplicationMessageBuilder()
                  .WithTopic(req_topic)
                  .WithPayload(Data)
                  .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)//QOS=2
                  .WithRetainFlag(false)
                  .Build();
              Console.WriteLine($"Send topic:{message.Topic}\r\nSend QoS:{message.QualityOfServiceLevel}\r\nSend RetainFlag: {message.Retain}\r\n");
              //发送一个主题消息
              await mqttClient.PublishAsync(message);
              // Wait for 1 second //自定义等待一段时间后关闭
              await Task.Delay(1000); 
              // Unsubscribe and disconnect//关闭接收主题
              await mqttClient.UnsubscribeAsync(rsp_topic);
              await mqttClient.DisconnectAsync();
              Console.WriteLine($"Unsubscribe and disconnect.\r\n");
          }
          else
          {
              Console.WriteLine($"Failed to connect to MQTT broker: {connectResult.ResultCode}");
          }
      }

     //回调函数,打印和传递订阅接收到数据
      private static Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
      {
          Mqtt_Message_Received_Str = arg.ApplicationMessage.Payload;

          Console.WriteLine($"Received message:\r\n{Utils.bytesToString(Mqtt_Message_Received_Str)}\r\n[{arg.ApplicationMessage.Payload.Length}]\r\n");
          Console.WriteLine("Received topic:" + $"{arg.ApplicationMessage.Topic}\t\nReceived QoS:{arg.ApplicationMessage.QualityOfServiceLevel}\r\nReceived RetainFlag: {arg.ApplicationMessage.Retain}\r\n");
          return Task.CompletedTask;
      }

  }
}

2、 描述

  • 小结以上代码可以理解为3部分功能,创建连接、回调接收和发布消息。且3部分中均可以设置服务质量QoS(Quality of Service levels),确保可靠性传输,当然,服务器端也必须对标QOS设置:
    1、创建连接

    var connectResult = await mqttClient.ConnectAsync(options);

    2、回调接收

    mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync

    3、发布消息

    await mqttClient.PublishAsync(message);

    4、Qos设置
    参看代码备注 “//QOS=2”

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/265970.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AcWing125. 耍杂技的牛(贪心+推公式)

题目链接AcWing125. 耍杂技的牛 分析: 这是一道贪心问题&#xff0c;我们假设牛最终的摆放顺序(从上大小)为1,2,3,...i,i1,...,n&#xff0c;当存在相邻的两头牛i,i1如果 w i s i > w i 1 s j 1 w_is_i> w_{i1}s_{j1} wi​si​>wi1​sj1​ 那么交换两头牛i,i1的…

ansibe的脚本---playbook剧本(1)

playbook剧本组成部分&#xff1a; 1、task 任务&#xff1a; 主要是包含要在目标主机上的操作&#xff0c;使用模块定义操作。每个任务都是模块的调用。 2、variables变量&#xff1a;存储和传递数据。变量可自定义&#xff0c;可以在playbook中定义为全局变量&#xff0c;可…

centos7服务器安装 mysql

centos7服务器安装 mysql 一、下载 官网&#xff1a;https://dev.mysql.com/downloads/mysql/ 二、安装 1.查看有没有需要卸载的SQL包&#xff0c;一般系统的mariadb是自带的&#xff0c;需要卸载 rpm -qa | grep mysql rpm -qa | grep mariadb例如&#xff1a;安装过MySQL…

ubuntu保存分辨率失效解决办法

在VM虚拟机中&#xff0c;遇到修改ubuntu分辨率后&#xff0c;重启后又重置的解决办法。 目前我的ubuntu版本是&#xff1a;ubuntu 18.04.6 版本。 1.首先&#xff0c;在你喜欢的目录建立一个.sh 脚本文件。 终端执行命令&#xff1a;sudo vim xrandr.sh 2.按 i 进入编辑状…

开放原子线下训练营---STM32H7搭载TobudOS开发心得

导语&#xff1a; 本次线下活动是以STM32H7为核心的一个功能强大的开发板&#xff0c;不仅支持Audio&#xff0c;HDMI&#xff0c;还支持4G或者WiFi模块&#xff0c;也可以外接屏幕&#xff0c;本次线下训练营是以4G模块进行开发。 线下的实物开发板如下所示&#xff1a; 注意…

更改WiseAlign软件界面图标方法

更改WiseAlign软件界面图标方法 未替换时 首先将图片转换为BMP格式&#xff0c;在搜索栏处输入画图&#xff0c;点击打开画图工具 按住图标拖动到画布内&#xff0c;或是直接CtrlV将图标复制到画布内 点击文件&#xff0c;再点击另存为 保存类型选择“24位位图&#xff08;*.bm…

超分辨数据集:Set5 Set14 BSD100 Urban100 Manga109

DIV2K数据集官网上很好找到&#xff0c;但是网上流传的Set5 14 BSD100,Urban100 Manga109都是私人进行处理过的版本&#xff0c;各个处理方式都不同&#xff0c;为了统一方式写了这篇文章。 官方的DIV2K x2、x3、x4的LR图片使用下面matlab代码生成&#xff08;已经经过测试最后…

Hadoop入门学习笔记——五、在虚拟机中部署Hive

视频课程地址&#xff1a;https://www.bilibili.com/video/BV1WY4y197g7 课程资料链接&#xff1a;https://pan.baidu.com/s/15KpnWeKpvExpKmOC8xjmtQ?pwd5ay8 Hadoop入门学习笔记&#xff08;汇总&#xff09; 目录 五、在虚拟机中部署Hive5.1. 在node1虚拟机安装MySQL5.2.…

drawio绘制组织架构图和树形图

drawio绘制组织架构图和树形图 drawio是一款强大的图表绘制软件&#xff0c;支持在线云端版本以及windows, macOS, linux安装版。 如果想在线直接使用&#xff0c;则直接输入网址draw.io或者使用drawon(桌案), drawon.cn内部完整的集成了drawio的所有功能&#xff0c;并实现了云…

VSCode SSH 连接提示: spawn UNKNOWN

随笔记录 目录 1. 背景介绍 2. 确认问题 : ssh -V 3. 解决问题 3.1 确认本地 ssh.exe 路径 3.2 修改vscode Remote.ssh:Path 3.2.1 设置 Reomte.ssh:Path - 方法一 3.2.2 设置 Reomte.ssh:Path - 方法二 1. 背景介绍 windows 系统vscode ssh remote CentOS7&#xff…

【Nacos】—客户端与服务端源码解析

Nacos系列 Nacos—简述、注册中心、配置中心 Nacos安装教程 SpringBoot项目与Nacos配置 一、背景介绍 Nacos&#xff08;Naming and Configuration Service&#xff09;是阿里巴巴开源的服务发现和配置管理工具&#xff0c;它是一个全面的微服务基础设施组件&#xff0c;提供…

Linux命令-查看内存、GC情况及jmap 用法

查看进程占用内存、CPU使用情况 1、查看进程 #jps 查看所有java进程 #top 查看cpu占用高进程 输入m &#xff1a;根据内存排序 topMem: 16333644k total, 9472968k used, 6860676k free, 165616k buffers Swap: 0k total, 0k used, 0k free, 6…

【vue】前端页面点击按钮弹窗播放m3u8格式视频

最终效果&#xff1a; 1.表格操作列 <el-table ref"tables" v-loading"loading" :data"list" :default-sort"defaultSort" sort-change"handleSortChange" border><el-table-column label"id" align&qu…

【面向对象】对比JavaScript、Go、Ada、Python、C++、Java、PHP的访问限制。

在不同编程语言中&#xff0c;控制成员&#xff08;变量、方法、类等&#xff09;可见性的机制不尽相同。以下是对比JavaScript、Go、Ada、Python、C、Java、PHP所使用的访问限制关键字和约定&#xff1a; 一、JavaScript ### JavaScript访问限制 早期的JavaScript并没有类似…

vue 使用 html2canvas 截取图片保存

vue 使用 html2canvas 截取图片保存 好久没有写博文了&#xff0c;写够了&#xff0c;没啥想写的了&#xff0c;这个号算是废了&#xff0c;哎&#xff0c;气人啊&#xff01;越来越胖&#xff0c;越来越懒了。 html2canvas 简介 html2canvas是一个JavaScript库&#xff0c;它…

自定义类型:结构体,枚举,联合(2)

2. 位段 2.1 什么是位段 位段的声明和结构是类似的&#xff0c;有两个不同&#xff1a; 1.位段的成员必须是 int、unsigned int 或signed int 。 2.位段的成员名后边有一个冒号和一个数字。 比如&#xff1a; struct A {int _a:2;int _b:5;int _c:10;int _d:30; };A就是一…

Spark Shell的简单使用

简介 Spark shell是一个特别适合快速开发Spark原型程序的工具&#xff0c;可以帮助我们熟悉Scala语言。即使你对Scala不熟悉&#xff0c;仍然可以使用这个工具。Spark shell使得用户可以和Spark集群交互&#xff0c;提交查询&#xff0c;这便于调试&#xff0c;也便于初学者使用…

Apache Flink 进阶教程(七):网络流控及反压剖析

目录 前言 网络流控的概念与背景 为什么需要网络流控 网络流控的实现&#xff1a;静态限速 网络流控的实现&#xff1a;动态反馈/自动反压 案例一&#xff1a;Storm 反压实现 案例二&#xff1a;Spark Streaming 反压实现 疑问&#xff1a;为什么 Flink&#xff08;bef…

15-Echarts简化系列之:geo 地理坐标系,地图资源基本绘制和配置项使用

Echarts版本&#xff1a;5.4.3 geo&#xff1a;地理坐标系组件用于地图的绘制&#xff0c;支持在地理坐标系上绘制散点图&#xff0c;线集。绘制地图的数据源可支持 geojson和 svg 格式。 本文章中提供 实例代码 和地图 静态资源 &#xff0c;项目以 react ts 为主&#xff0…

html旋转相册

一、实验题目 做一个旋转的3d相册&#xff0c;当鼠标停留在相册时&#xff0c;相册向四面散开 二、实验代码 <!DOCTYPE html> <html lang"zh"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" con…