基于DotNetty实现一个接口自动发布工具 - 通信实现

基于 DotNetty 实现通信

DotNetty : 是微软的 Azure 团队,使用 C#实现的 Netty 的版本发布。是.NET 平台的优秀网络库。

项目介绍

OpenDeploy.Communication 类库项目,是通信相关基础设施层

image

  • Codec 模块实现编码解码

  • Convention 模块定义约定,比如抽象的业务 Handler, 消息载体 NettyMessage, 消息上下文 'NettyContext' 等

自定义消息格式

消息类为 NettyMessage ,封装了消息头 NettyHeader 和消息体 Body

image

NettyMessage

封装了消息头 NettyHeader 和消息体 Body

NettyMessage 点击查看代码

/// <summary> Netty消息 </summary>
public class NettyMessage
{
    /// <summary> 消息头 </summary>
    public NettyHeader Header { get; init; } = default!;

    /// <summary> 消息体(可空,可根据具体业务而定) </summary>
    public byte[]? Body { get; init; }

    /// <summary> 消息头转为字节数组 </summary>
    public byte[] GetHeaderBytes()
    {
        var headerString = Header.ToString();
        return Encoding.UTF8.GetBytes(headerString);
    }

    /// <summary> 是否同步消息 </summary>
    public bool IsSync() => Header.Sync;

    /// <summary> 创建Netty消息工厂方法 </summary>
    public static NettyMessage Create(string endpoint, bool sync = false, byte[]? body = null)
    {
        return new NettyMessage
        {
            Header = new NettyHeader { EndPoint = endpoint, Sync = sync },
            Body = body
        };
    }

    /// <summary> 序列化为JSON字符串 </summary>
    public override string ToString() => Header.ToString();
}

NettyHeader

消息头,包含请求唯一标识,是否同步消息,终结点等, 在传输数据时会序列化为 JSON

NettyHeader 点击查看代码

/// <summary> Netty消息头 </summary>
public class NettyHeader
{
    /// <summary> 请求消息唯一标识 </summary>
    public Guid RequestId { get; init; } = Guid.NewGuid();

    /// <summary> 是否同步消息, 默认false是异步消息 </summary>
    public bool Sync { get; init; }

    /// <summary> 终结点 (借鉴MVC,约定为Control/Action模式) </summary>
    public string EndPoint { get; init; } = string.Empty;

    /// <summary> 序列化为JSON字符串 </summary>
    public override string ToString() => this.ToJsonString();
}


  • 请求消息唯一标识 RequestId , 用来唯一标识消息, 主要用于 发送同步请求, 因为默认的消息是异步的,只管发出去,不需要等待响应

  • 是否同步消息 Sync , 可以不需要,主要为了可视化,便于调试

  • 终结点 EndPoint , (借鉴 MVC,约定为 Control/Action 模式), 服务端直接解析出对应的处理器

编码器

DefaultEncoder 点击查看代码

public class DefaultEncoder : MessageToByteEncoder<NettyMessage>
{
    protected override void Encode(IChannelHandlerContext context, NettyMessage message, IByteBuffer output)
    {
        //消息头转为字节数组
        var headerBytes = message.GetHeaderBytes();

        //写入消息头长度
        output.WriteInt(headerBytes.Length);

        //写入消息头字节数组
        output.WriteBytes(headerBytes);

        //写入消息体字节数组
        if (message.Body != null && message.Body.Length > 0)
        {
            output.WriteBytes(message.Body);
        }
    }
}

解码器

DefaultDecoder 点击查看代码

public class DefaultDecoder : MessageToMessageDecoder<IByteBuffer>
{
    protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
    {
        //消息总长度
        var totalLength = input.ReadableBytes;

        //消息头长度
        var headerLength = input.GetInt(input.ReaderIndex);

        //消息体长度
        var bodyLength = totalLength - 4 - headerLength;

        //读取消息头字节数组
        var headerBytes = new byte[headerLength];
        input.GetBytes(input.ReaderIndex + 4, headerBytes, 0, headerLength);

        byte[]? bodyBytes = null;
        string? rawHeaderString = null;
        NettyHeader? header;

        try
        {
            //把消息头字节数组,反序列化为JSON
            rawHeaderString = Encoding.UTF8.GetString(headerBytes);
            header = JsonSerializer.Deserialize<NettyHeader>(rawHeaderString);
        }
        catch (Exception ex)
        {
            Logger.Error($"解码失败: {rawHeaderString}, {ex}");
            return;
        }

        if (header is null)
        {
            Logger.Error($"解码失败: {rawHeaderString}");
            return;
        }

        //读取消息体字节数组
        if (bodyLength > 0)
        {
            bodyBytes = new byte[bodyLength];
            input.GetBytes(input.ReaderIndex + 4 + headerLength, bodyBytes, 0, bodyLength);
        }

        //封装为NettyMessage对象
        var message = new NettyMessage
        {
            Header = header,
            Body = bodyBytes,
        };

        output.Add(message);
    }
}

NettyServer 实现

NettyServer 点击查看代码

public static class NettyServer
{
    /// <summary>
    /// 开启Netty服务
    /// </summary>
    public static async Task RunAsync(int port = 20007)
    {
        var bossGroup = new MultithreadEventLoopGroup(1);
        var workerGroup = new MultithreadEventLoopGroup();

        try
        {
            var bootstrap = new ServerBootstrap().Group(bossGroup, workerGroup);

            bootstrap
                .Channel<TcpServerSocketChannel>()
                .Option(ChannelOption.SoBacklog, 100)
                .Option(ChannelOption.SoReuseaddr, true)
                .Option(ChannelOption.SoReuseport, true)
                .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;
                    pipeline.AddLast("framing-enc", new LengthFieldPrepender(4));
                    pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
                    pipeline.AddLast("decoder", new DefaultDecoder());
                    pipeline.AddLast("encoder", new DefaultEncoder());
                    pipeline.AddLast("handler", new ServerMessageEntry());
                }));

            var boundChannel = await bootstrap.BindAsync(port);

            Logger.Info($"NettyServer启动成功...{boundChannel}");

            Console.ReadLine();

            await boundChannel.CloseAsync();

            Logger.Info($"NettyServer关闭监听了...{boundChannel}");
        }
        finally
        {
            await Task.WhenAll(
                bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
                workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))
            );

            Logger.Info($"NettyServer退出了...");
        }

    }
}

  • 服务端管道最后我们添加了 ServerMessageEntry ,作为消息处理的入口

ServerMessageEntry 点击查看代码

public class ServerMessageEntry : ChannelHandlerAdapter
{
    /// <summary> Netty处理器选择器 </summary>
    private readonly DefaultNettyHandlerSelector handlerSelector = new();

    public ServerMessageEntry()
    {
        //注册Netty处理器
        handlerSelector.RegisterHandlerTypes(typeof(EchoHandler).Assembly.GetTypes());
    }

    /// <summary> 通道激活 </summary>
    public override void ChannelActive(IChannelHandlerContext context)
    {
        Logger.Warn($"ChannelActive: {context.Channel}");
    }

    /// <summary> 通道关闭 </summary>
    public override void ChannelInactive(IChannelHandlerContext context)
    {
        Logger.Warn($"ChannelInactive: {context.Channel}");
    }

    /// <summary> 收到客户端的消息 </summary>
    public override async void ChannelRead(IChannelHandlerContext context, object message)
    {
        if (message is not NettyMessage nettyMessage)
        {
            Logger.Error("从客户端接收消息为空");
            return;
        }

        try
        {
            Logger.Info($"收到客户端的消息: {nettyMessage}");

            //封装请求
            var nettyContext = new NettyContext(context.Channel, nettyMessage);

            //选择处理器
            AbstractNettyHandler handler = handlerSelector.SelectHandler(nettyContext);

            //处理请求
            await handler.ProcessAsync();
        }
        catch(Exception ex)
        {
            Logger.Error($"ServerMessageEntry.ChannelRead: {ex}");
        }
    }
}

  • 按照约定, 把继承 AbstractNettyHandler 的类视为业务处理器

  • ServerMessageEntry 拿到消息后,首先把消息封装为 NettyContext, 类似与 MVC 中的 HttpContext, 封装了请求和响应对象, 内部解析请求的 EndPoint, 拆分为 HandlerNameActionName

  • DefaultNettyHandlerSelector 提供注册处理器的方法 RegisterHandlerTypes, 和选择处理器的方法 SelectHandler

  • SelectHandler, 默认规则是查找已注册的处理器中以 HandlerName 开头的类型

  • AbstractNettyHandler 的 ProcessAsync 方法,通过 ActionName, 反射拿到 MethodInfo, 调用终结点

NettyClient 实现

NettyClient 点击查看代码

public sealed class NettyClient(string serverHost, int serverPort) : IDisposable
{
    public EndPoint ServerEndPoint { get; } = new IPEndPoint(IPAddress.Parse(serverHost), serverPort);

    private static readonly Bootstrap bootstrap = new();
    private static readonly IEventLoopGroup eventLoopGroup = new SingleThreadEventLoop();

    private bool _disposed;
    private IChannel? _channel;
    public bool IsConnected => _channel != null && _channel.Open;
    public bool IsWritable => _channel != null && _channel.IsWritable;

    static NettyClient()
    {
        bootstrap
            .Group(eventLoopGroup)
            .Channel<TcpSocketChannel>()
            .Option(ChannelOption.SoReuseaddr, true)
            .Option(ChannelOption.SoReuseport, true)
            .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
            {
                IChannelPipeline pipeline = channel.Pipeline;
                //pipeline.AddLast("ping", new IdleStateHandler(0, 5, 0));
                pipeline.AddLast("framing-enc", new LengthFieldPrepender(4));
                pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
                pipeline.AddLast("decoder", new DefaultDecoder());
                pipeline.AddLast("encoder", new DefaultEncoder());
                pipeline.AddLast("handler", new ClientMessageEntry());
            }));
    }

    /// <summary> 连接服务器 </summary>
    private async Task TryConnectAsync()
    {
        try
        {
            if (IsConnected) { return; }
            _channel = await bootstrap.ConnectAsync(ServerEndPoint);
        }
        catch (Exception ex)
        {
            throw new Exception($"连接服务器失败 : {ServerEndPoint} {ex.Message}");
        }
    }

    /// <summary>
    /// 发送消息
    /// </summary>
    /// <param name="endpoint">终结点</param>
    /// <param name="sync">是否同步等待响应</param>
    /// <param name="body">正文</param>
    public async Task SendAsync(string endpoint, bool sync = false, byte[]? body = null)
    {
        var message = NettyMessage.Create(endpoint, sync, body);
        if (sync)
        {
            var task = ClientMessageSynchronizer.TryAdd(message);
            try
            {
                await SendAsync(message);
                await task;
            }
            catch
            {
                ClientMessageSynchronizer.TryRemove(message);
                throw;
            }
        }
        else
        {
            await SendAsync(message);
        }
    }

    /// <summary>
    /// 发送消息
    /// </summary>
    private async Task SendAsync(NettyMessage message)
    {
        await TryConnectAsync();
        await _channel!.WriteAndFlushAsync(message);
    }

    /// <summary> 释放连接(程序员手动释放, 一般在代码使用using语句,或在finally里面Dispose) </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary> 释放连接 </summary>
    private void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        //释放托管资源,比如嵌套的对象
        if (disposing)
        {

        }

        //释放非托管资源
        if (_channel != null)
        {
            _channel.CloseAsync();
            _channel = null;
        }

        _disposed = true;
    }

    ~NettyClient()
    {
        Dispose(true);
    }
}

  • NettyClient 封装了 Netty 客户端逻辑,提供发送异步请求(默认)和发布同步请求方法

  • DotNetty 默认不提供同步请求,但是有些情况我们需要同步等待服务器的响应,所有需要自行实现,实现也很简单,把消息 ID 缓存起来,收到服务器响应后激活就行了,具体实现在消息同步器 ClientMessageSynchronizer, 就不贴了

总结

至此,我们实现了基于 DotNetty 搭建通信模块, 实现了客户端和服务器的编解码,处理器选择,客户端实现了同步消息等,大家可以在 ConsoleHost 结尾的控制台项目中,测试下同步和异步的消息,实现的简单的 Echo 模式

代码仓库

项目暂且就叫 OpenDeploy 吧

  • OpenDeploy: https://gitee.com/broadm-dotnet/OpenDeploy

欢迎大家拍砖,Star

下一步

计划下一步,基于WPF的客户端, 实现接口项目的配置与发现

文章转载自:Broadm

原文链接:https://www.cnblogs.com/broadm/p/17875559.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/219408.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

华为手环 8 五款免费表盘已上线,请注意查收

华为手环 8&#xff0c;作为一款集时尚与实用于一体的智能手环&#xff0c;不仅具备强大的功能&#xff0c;还经常更新的表盘样式&#xff0c;让用户掌控时间与健康的同时&#xff0c;也能展现自己的时尚品味。这不&#xff0c;12 月官方免费表盘又上新了&#xff0c;推出了五款…

批量AI创作文案的工具,批量AI创作文章的软件

人工智能&#xff08;AI&#xff09;的应用不断拓展&#xff0c;其中批量AI创作逐渐成为许多文本创作者和企业编辑的热门选择。面对海量的文章需求&#xff0c;批量AI创作工具能够高效、快速地生成大量文本内容&#xff0c;从而减轻创作者的工作负担。本文将专心分享批量AI创作…

Linux last命令教程:如何查看用户的登录和注销历史(附案例详解和注意事项)

Linux last命令介绍 last命令在Linux中用于显示自文件/var/log/wtmp创建以来所有用户的登录和注销列表。可以给出一个或多个用户名作为参数&#xff0c;以显示他们的登录&#xff08;和注销&#xff09;时间和主机名。 Linux last命令适用的Linux版本 last命令在大多数Linux…

高温老化房稳定性、应用领域

高温老化房控制系统的稳定性主要有四点: 1. 温度控制:控制系统确保老化室内的温度在整个老化试验过程中保持稳定并在期望的范围内。这种稳定性对于准确可靠的测试结果至关重要。 2. 湿度控制:控制系统同时保持老化室内湿度水平稳定。这一点很重要&#xff0c;因为某些材料或产…

UE5 - 把ArchvizExplorer项目改造成自己的数字孪生项目 - 开发记要

参考&#xff1a; https://blog.csdn.net/qq_17523181/article/details/133853099 https://blog.csdn.net/qq_17523181/article/details/134455597 1. 安装项目 https://www.unrealengine.com/marketplace/zh-CN/product/archviz-explorer https://karldetroit.com/archviz-exp…

inBuilder低代码平台新特性推荐-第十三期

各位知乎的友友们&#xff0c;大家好~ 今天来给大家介绍一下inBuilder低代码平台社区版中特性推荐系列第十三期——登录配置&#xff01; inBuilder低代码平台内置了多种表单登录方式&#xff1a;用户名密码、AD域、数字证书。用户可以通过系统的登录页面进行登录。登录界面样…

提升--22---ReentrantReadWriteLock读写锁

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 ReadWriteLock----读写锁1.读写锁介绍线程进入读锁的前提条件&#xff1a;线程进入写锁的前提条件&#xff1a;而读写锁有以下三个重要的特性&#xff1a; Reentran…

从浅入深掌握进阶结构体(C语言)

前言 这一期我们将继续讲解结构体的知识&#xff0c;还没有看过上一期的小伙伴一定要赶紧去学习哦。 上一期&#xff0c;冲鸭&#xff01; 那么话不多说我们开始今天的学习吧&#xff01; 文章目录 1,结构体的自引用2,匿名结构体3,位段4,结构体的传参5,尾声 1,结构体的自引用 …

Docker篇之docker部署harbor仓库

一、首先需要安装docker step1&#xff1a;安装docker #1、安装yun源 yum install -y yum-utils #2、配置yum源 yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo # 如果上面源不稳定的话&#xff0c;更换为下列的aliyun源 yu…

Redis数据结构之压缩列表

压缩列表是Redis为节约内存而开发的&#xff0c;是由一系列特殊编码的连续内存块组成的顺序型数据结构。一个压缩列表可以包含任意多个节点&#xff0c;每个节点可以保存一个字节数组或者整数值。 压缩列表构成 zlbytes: 记录整个压缩列表占用的内存字节数&#xff0c;对压缩列…

预付费用电管理系统在商场及宿舍的应用

安科瑞电气股份有限公司 上海嘉定 201801 【摘要】本文主要讨论了预付费用电管理系统软、硬件的构建方法&#xff0c;软件系统的各个模块设计&#xff0c;以及软、硬件设计过程中解决的主要问题。1联5系8电2话171微3信5同2号2 【关键词】预付费电能表硬件设计软件设计 引言 …

linux基础五:linux 系统(进程状态+进程优先级+调度和切换+环境变量)

linux 系统 一.进程状态&#xff1a;1.睡眠状态(sleep)&#xff1a;2.磁盘休眠状态(disk sleep)&#xff1a;3.停止状态(stoped --- T)&#xff1a;4.死亡状态&#xff1a;5.控制状态&#xff08;t&#xff09; 二.僵尸进程和孤儿进程&#xff1a;1.僵尸状态&#xff1a;2.孤儿…

基于51单片机风扇控制器系统设计

**单片机设计介绍&#xff0c;基于51单片机风扇控制器系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于51单片机的风扇控制器系统是一种用于控制风扇转速和温度管理的电子设备。下面是一个简单的设计介绍&#xff1a;…

Python小案例:打印10以内的素数

解析 1、利用循环控制范围&#xff08;1,100&#xff09; 2、通过循环判断素数 3、利用标记位法进行打印素数 代码 #求1——100之间的素数 for i in range(2,101):# 设置标记位is_primeNumis_primeNum Truefor j in range(2,i):if i%j 0:# print(f"{i}不是素数"…

第三方支付原理

1.什么是第三方支付 所谓第三方支付&#xff0c;就是一些和各大银行签约、并具备一定实力和信誉保障的第三方独立机构提供的交易支持平台。在通过第三方支付平台的交易中&#xff0c;买方选购商品后&#xff0c;使用第三方平台提供的账户进行货款支付&#xff0c;由第三方通知卖…

04.里氏替换原则(Liskov Substitution Principle)

暴论&#xff1a;一般的&#xff0c;如果一个富二代不想着证明自己&#xff0c;那么他一辈子都会衣食无忧。 一言 里氏替换原则想告诉我们在继承过程中会遇到什么问题&#xff0c;以及继承有哪些注意事项。 概述 这是流传较广的一个段子&#xff1a; “一个坐拥万贯家财的富二…

博客文章SEO:提升博客排名和吸引更多读者的方法来啦!

互联网发展到现在&#xff0c;搜索引擎优化&#xff08;SEO&#xff09;一直发挥着不可替代的作用。搜索引擎的流量往往更加定向&#xff0c;来自搜索引擎的流量转化率更高&#xff0c;可以帮助企业更好地实现销售和推广目标。因此&#xff0c;通过合理的SEO策略&#xff0c;你…

Data Linked UI

DataLinkedUl是一个Unity框架,它允许您在为您的应用程序创建用户界面时实现专业的数据驱动方法。使用此资产,您可以创建灵活的基于瓦片的任意大小的复杂接口系统。 核心功能: 灵活性-允许适应和调整数据变化,允许各种结构和功能配置,而不需要对现有系统进行重大破坏。 可伸…

跨境独立站反向代购系统是什么?如何搭建?

淘宝代购是近年兴起的一种购物模式&#xff0c;是帮国外客户购买中国商品。主要是通过万邦 科技的外贸代购系统&#xff0c;把淘宝、天猫等电商平台的全站商品通过API 接入到你的网站 上&#xff0c;瞬间就可以架设一个有数亿产品的大型网上商城&#xff0c;而且可以把这些中文…

BFD多跳检测配置

定义 双向转发检测BFD&#xff08;Bidirectional Forwarding Detection&#xff09;是一种全网统一的检测机制&#xff0c;用于快速检测、监控网络中链路或者IP路由的转发连通状况。 目的 为了减小设备故障对业务的影响&#xff0c;提高网络的可靠性&#xff0c;网络设备需要…