【Netty专题】用Netty手写一个远程长连接通信框架

目录

  • 前言
  • 阅读对象
  • 阅读导航
  • 前置知识
  • 课程内容
    • 一、使用Netty实现一个通信框架需要考虑什么问题
    • 二、通信框架功能设计
      • 2.1 功能描述
      • 2.2 通信模型
      • 2.3 消息体定义
      • 2.4 心跳机制
      • 2.5 重连机制
      • *2.6 Handler的组织顺序
      • 2.7 交互式调试
    • 三、代码实现:非必要。感兴趣的自行查看
      • 3.1 最外层的通信入口
        • 3.1.1 NettyRpcServer:服务端通信入口
        • 3.3 NettyRpcClient:客户端通信入口
        • 3.3 NettyRpcClient:交互式调试
      • 3.2 server包下
        • 3.2.1 ServerInitializer:服务端的Handler链化
        • 3.2.2 handler包下所有的handler
        • 3.2.3 helper包:工具包
        • 3.2.4 async包:异步处理类
      • 3.3 client包下
        • 3.3.1 ClientInitializer:客户端的Handler链化
        • 3.3.2 handler包下所有的handler
      • 3.4 common包下:一些公用的定义
        • 3.4.1 NettyConstant:一些公用常量
        • 3.4.2 helper包:工具包
        • 3.4.3 codec包:编解码(反)序列化工具
      • 3.4 biz包下:业务模拟
    • 四、业务流程图
  • 学习总结
  • 感谢

前言

阅读对象

  1. 有一定网络编程基础
  2. 了解Netty常用API

阅读导航

系列上一篇文章:《【Netty专题】Netty实战与核心组件详解》

前置知识

长连接:
长连接,也叫持久连接,是指在TCP层握手成功后,不立即断开连接,并在此连接的基础上进行多次消息(包括心跳)交互,直至连接的任意一方(客户端OR服务端)主动断开连接,此过程称为一次完整的长连接。
SOCKET 连接后不管是否使用都保持连接的一种连接

短连接:
短连接,顾名思义,与长连接的区别就是,客户端收到服务端的响应后,立刻发送FIN消息,主动释放连接。也有服务端主动断连的情况,凡是在一次消息交互(发请求-收响应)之后立刻断开连接的情况都称为短连接。短连接是建立在TCP协议上的,有完整的握手挥手流程,区别于UDP协议。(SOCKET 连接发送数据接收完数据后马上断开连接的一种连接

课程内容

一、使用Netty实现一个通信框架需要考虑什么问题

这里已经假设,我们确定使用Netty作为我们的网络编程框架了。但是我想,跟楼主一样的初学者,还是会有疑惑:那我想使用Netty开发一个简单的通信交互程序,该如何做到呢?
讲真的, 这个问题确实难倒了我。因为不在那个层次,我甚至连【需要关心什么问题】都搞不懂。思来想去,我只能以【没见过猪跑,但吃过猪肉】的视角出发, 大概说一下自己的想法了。
在这里插入图片描述

  1. 序列化和反序列化问题

Q1:什么是序列化、反序列化?
答:我们知道,数据在网络中传输不可能是原文传输的,人家机器设备只认得二进制01串。所以,把原文转换为字节流这个过程就是序列化;反之,则叫做反序列化。

Q2:序列化有什么作用?
答:主要目的有:【网络传输】及【对象持久化保存】。持久化保存知道啥意思吧,就是存到各种数据库中。

  1. 编解码问题

Q3:什么是编解码问题?
答:编解码就是将一个格式转变为另一个格式的过程。比如:MP3格式转为MP4,JSON转为XML。在我们这里,就是将字节流反序列化后的数据,如何转变为我们Java应用程序识别的数据结构

  1. 心跳保活机制

摘抄自【百度:文心一言】。
心跳保活机制是一种维持网络连接长连接的机制,它通过定时发送心跳包来检测双方是否存活。如果没有特意的设置某些选项或者实现应用层心跳包,TCP空闲的时候是不会发送任何数据包。也就是说,当一个TCP的socket,客户端与服务端谁也不发送数据,会一直保持着连接。这其中如果有一方异常掉线(例如死机、路由被破坏、防火墙切断连接等),另一端如果没有发送数据,永远也不可能知道。这对于一些服务型的程序来说,是灾难性的后果,将会导致服务端socket资源耗尽。

因此,需要心跳保活机制来维持连接的有效性,及时有效地检测到一方的非正常断开,保证连接的资源被有效的利用。心跳保活机制可以应用在TCP协议层实现(例如使用TCP Keepalive),也可以在应用层实现(例如使用心跳包)。在应用层实现心跳保活机制时,通常由客户端向服务端发送自定义消息命令,服务端收到消息后回复自定义的消息给客户端。如果服务端未收到消息,则表示连接失败,如果失败的次数达到指定上限后,则重新发起连接。

  1. 公共消息体定义

这个无需多说,哪怕是我们在做web开发期间也会定义一个统一口径

特别提醒
个人在学习的时候发现网上对于【持久化】和【编解码】这两个概念多少有点混淆,或者说边界模糊。确实,他们是有些关联的,但区别也有。以下是摘抄自【百度:文心一言】

编码器和解码器常用于处理数据在不同格式之间的转换;
而序列化是将数据结构或对象状态转换为可以存储或传输的格式的过程。
因此,编码器和解码器主要关注的是数据的表示和转换,而序列化和反序列化主要关注的是对象状态的转换。

二、通信框架功能设计

2.1 功能描述

我在后面的编码中,将围绕以下功能来实现一个简单的长连接通信框架。功能如下:

  1. 基于 Netty 的 NIO 通信框架
  2. 提供消息的编解码框架,可以实现 POJO 的序列化和反序列化(【编解码】与【序列化】一块)
  3. 消息内容防篡改机制(就跟我们web开发的鉴权一样,在处理之前先校验一下内容合法性)
  4. 提供基于 IP 地址的白名单接入认证机制
  5. 断线重连机制

2.2 通信模型

在这里插入图片描述
模型解读如下:
1)客户端发送应用握手请求消息,携带节点 ID 等有效身份认证信息
2)服务端对应用握手请求消息进行合法性校验,包括节点 ID 有效性校验、节点重复登录校验和 IP 地址合法性校验,校验通过后,返回登录成功的应用握手应答消息
3)链路建立成功之后,客户端发送业务消息
4)链路成功之后,服务端发送心跳消息
5)链路建立成功之后,客户端发送心跳消息
6)链路建立成功之后,服务端发送业务消息
7)服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接

PS:协议通信双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息给对方,所以通信方式有如下两种:

  • TWO-WAY:即需要响应的请求。如请求登录
  • ONE-WAY:即无需响应的请求。如日志记录

双方之间的心跳采用 Ping-Pong 机制,当链路处于空闲状态时,客户端主动发送Ping 消息给服务端,服务端接收到 Ping 消息后发送应答消息 Pong 给客户端,如果客户端连续发送 N 条 Ping 消息都没有接收到服务端返回的 Pong 消息,说明链路已经挂死或者对方处于异常状态,客户端主动关闭连接,间隔周期 T 后发起重连操作,直到重连成功。

2.3 消息体定义

在我的设计中,把消息定义分为了两个部分:消息头、消息体。代码如下:

@Getter
@Setter
@ToString
public class CommonMessage {

    /**
     * 消息头
     */
    private CommonMessageHeader header;

    /**
     * 0-失败;1-成功
     */
    private Byte result;

    /**
     * 消息体
     */
    private Object body;
}

@Getter
@Setter
@ToString
public final class CommonMessageHeader {

    /**
     * 消息体的MD5摘要,用来做简单校验
     */
    private String md5;

    /**
     * 服务标识
     */
    private int severId;

    /**
     * 消息id
     */
    private long msgID;

    /**
     * 消息类型,枚举值。见MessageType
     */
    private byte type;
}

然后是消息类型:

public enum MessageType {

    /**
     * 业务请求消息
     */
    SERVICE_REQ((byte) 0),

    /**
     * 业务应答消息
     */
    SERVICE_RESP((byte) 1),

    /**
     * 无需应答的业务请求消息
     */
    SERVICE_REQ_ONE_WAY((byte) 2),

    /**
     * 心跳请求消息
     */
    HEARTBEAT_REQ((byte) 99),

    /**
     * 心跳应答消息
     */
    HEARTBEAT_RESP((byte) 100),


    ;

    private byte value;

    MessageType(byte value) {
        this.value = value;
    }

    public byte value() {
        return this.value;
    }
}

2.4 心跳机制

心跳机制我估计大家多少能理解,这个名字就起的很形象。当读或者写心跳消息发生 I/O 异常的时候,说明已经中断,此时需要立即关闭连接,如果是客户端,需要重新发起连接;如果是服务端,需要清空缓存的半包信息,等到客户端重连。

是的,两边都需要心跳检测,毕竟是【全双工】

但是心跳机制的设计,也是有点说法的。比如,什么时候需要传心跳包过去;发什么包过去。

先说发什么包过去。这个就比较简单了,正常来说发一个空包就行了,除非你有什么特别的要求。比如我就在消息定义中新增了一个类型简单标记一下而已。
在这里插入图片描述
再说,什么时候发。

方案一:最粗暴
最粗暴的当然是,TCP握手完成之后开始启动一个心跳任务,然后以固定的频率发送,不管三次二十一,我就要在存续期间一直发。这当然可以实现目标,但是,这【合李】吗?

方案二:小改进
很简单的道理啊,如果我们互相之间本身就正在进行业务上的通信,咱俩都正在【说话】呢,你还发个心跳过来问我【你死没死】啊,你礼貌吗这?所以,我们可以使用Netty提供的一个【写空闲检测】机制来完成。直接上源码给你们看:

// IdleStateHandler。一个实现了InBound和OutBound的Handler
public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {

        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
             TimeUnit.SECONDS);
    }

参数解读:

  • readerIdleTimeSeconds:当在指定的时间段内没有执行读操作时,将触发 IdleState.READER_IDLE。0表示禁用
  • writerIdleTimeSeconds:当在指定的时间段内没有执行写操作时,将触发IdleState.WRITER_IDLE。0表示禁用
  • allIdleTimeSeconds:当在指定的时间段内没有进行读写操作时,将触发IdleState.ALL_IDLE。0表示禁用

PS:检测空闲连接以及超时对于及时释放资源来说是至关重要的。这就是心跳机制做的事情。因为很重要,所以Netty也给我们预提供了这些Handler,就是上面说的IdleStateHandler

2.5 重连机制

如果链路中断,等到INTERVAL时间后,由客户端发起重连操作,如果重连失败,间隔周期INTERVAL后再次发起重连,直到重连成功。
为了保持服务端能够有充足的时间释放句柄资源,在首次断连时客户端需要等待INTERVAL时间之后再发起重连,而不是失败后立即重连。
为了保证句柄资源能够及时释放,无论什么场景下重连失败,客户端必须保证自身的资源被及时释放,包括但不限于SocketChannel、Socket 等。
重连失败后,可以打印异常堆栈信息,方便后续的问题定位。

*2.6 Handler的组织顺序

大家还记得吗?Handler出入境可以是无序的,但是,同是入境、出境的Handler之间是局部有序的。这不难理解,就跟JDK8的Stream一样,前面对流的操作会影响后面的结果。所以,顺序很重要。这边大概的模型如下:
在这里插入图片描述
我想,大家应该能理解为什么我的顺序是这样组织的吧…

  1. 写空闲监控或者性能监控放前面没毛病,正常来说这个业务不会去操作原始报文
  2. 粘包半包处理。开始对包数据做拆分了,这一步肯定要在所有需要操作【业务报文】的前面做。为啥?我都没拆包给你呢,你咋知道这个就是你要的
  3. 序列化反序列化。正常【2】之后拿到的就是【字节流业务报文】,这个时候需要先【序列化/反序列化】再【编解码】(这两步我合在一起做了)
  4. 【读空闲】即:心跳机制。 【心跳】跟【认证申请/检查】需要分客户端跟服务端。客户端都没有登录呢,没必要开启【心跳】对吧,所以客户端的【心跳】在【认证申请】之前;服务端我就不多解释了吧
  5. 【心跳请求/应答】也是心跳机制里面的东西
  6. 业务处理

大家好好思考下,理论上5/6是可以随意调换顺序的,毕竟【心跳包】是一种特殊的业务

我估计有很多人不理解【读空闲】跟【心跳请求/应答】的关系,大家可以再看看【2.4 心跳机制】的【方案二】。我说他们都是心跳机制里面的,怎么理解?

  • 读空闲:其实就是一种事件监听机制。监听Channel上的【读事件】。当事件发生的时候触发对应事件,并且往管道中传输(说到这里了,写空闲也知道了吧)
  • 心跳请求/应答:对上面说的事件的响应

2.7 交互式调试

写了通信的客户端、服务端怎么调试呢?咱也没有可视化界面,所以我就搞了一个最原始的,通过Scanner输入命令的交互式调试方案。像这样:
在这里插入图片描述
你懂我意思吧在这里插入图片描述

三、代码实现:非必要。感兴趣的自行查看

头疼。代码貌似也挺多的,本来想打包压缩包上来,然后只贴核心代码。但是我的电脑有加密软件,打包上来的代码可能会有问题。代码包结构如下:
在这里插入图片描述
红框内的pojo已经给过了,咱就不重复上代码了。如果大家要本地运行的话,可以跟我一样先创建好对应的包,然后一个一个复制上去吧。真不是我不想给源码压缩包,而是加密问题。给了你们打开也是乱码

3.1 最外层的通信入口

在这里插入图片描述
这三个类。

3.1.1 NettyRpcServer:服务端通信入口
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.server.ServerInitializer;

/**
 * RPC Server服务端
 *
 * @author zhangshen
 * @date 2023/10/25 9:16
 * @slogan 编码即学习,注释断语义
 **/
@Slf4j
public class NettyRpcServer {

    public static void main(String[] args) {
        try {
            start();
        } catch (InterruptedException e) {
            e.printStackTrace();
            log.error("【Netty服务器】启动失败");
        }
    }

    private static void start() throws InterruptedException {
        EventLoopGroup acceptLoopGroup = new NioEventLoopGroup();
        EventLoopGroup reactorLoopGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(acceptLoopGroup, reactorLoopGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(NettyConstant.PORT)
                .childHandler(new ServerInitializer());

        serverBootstrap.bind().sync();
        log.info("【Netty服务器】启动成功");
    }
}

上面这个很简单啦,跟上一篇文章的使用示例如出一辙。不同的是,在Pipeline中链化Handler的逻辑我单独抽出来写成一个ServerInitializer。后面会讲

3.3 NettyRpcClient:客户端通信入口
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.client.ClientInitializer;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.MessageType;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * RPC Client客户端
 *
 * @author zhangshen
 * @date 2023/10/25 9:16
 * @slogan 编码即学习,注释断语义
 **/
@Slf4j
public class NettyRpcClient {

    private Channel channel;
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    private volatile boolean userClose = false;

    /**
     * 定时线程池,用于断线重连
     */
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    public void connect() throws InterruptedException {
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientInitializer());
            ChannelFuture sync = bootstrap.connect(new InetSocketAddress(NettyConstant.SERVER_IP, NettyConstant.PORT)).sync();
            log.info("【Netty客户端】启动成功");
            channel = sync.channel();
            channel.closeFuture().sync();
        } finally {
            if (userClose) {
                channel = null;
                eventLoopGroup.shutdownGracefully().sync();
            } else {
            	
            	// 断线重连
                reconect();
            }
        }
    }

    /**
     * 断线重连
     */
    private void reconect() {
        log.info("【Netty客户端】开始断线重连");
        executor.execute(() -> {
            try {

                // 给操作系统足够的时间,去释放相关的资源
                TimeUnit.SECONDS.sleep(1);
                connect();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    public void sendMessage(Object msg) {
        if (channel == null || !channel.isActive()) {
            throw new IllegalStateException("和服务器还未未建立起有效连接,请稍后再试!!");
        }

        CommonMessage message = MessageGenerateHelper.requestWithMsgId(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_REQ.value(), msg);
        log.info("【Netty客户端】发送消息。CommonMessage={}", message);
        channel.writeAndFlush(message);
    }

    public void sendOneWay(Object msg) {
        if (channel == null || !channel.isActive()) {
            throw new IllegalStateException("和服务器还未未建立起有效连接,请稍后再试!!");
        }
        CommonMessage message = MessageGenerateHelper.requestWithMsgId(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_REQ_ONE_WAY.value(), msg);
        log.info("【Netty客户端】发送消息。CommonMessage={}", message);
        channel.writeAndFlush(message);
    }

    public void close() {
        userClose = true;
        channel.close();
    }
}

在这里插入图片描述
这个相对于服务端,以及前面的使用示例的客户端确实稍微复杂一点。主要的变化是【断线重连】机制引起的:

  1. 引入了【定时线程池】,定时调用reconnect方法
  2. 新增了reconnect()方法,处理断线重连
  3. 【断线重连】还得看是不是自己主动发起的【关闭】,如果是自己主动发起的关闭肯定不能重连啊
3.3 NettyRpcClient:交互式调试
import java.util.Scanner;

/**
 * @author zhangshen
 * @date 2023/10/25 12:31
 * @slogan 编码即学习,注释断语义
 **/
public class ScannerCmdClient {
    public static void main(String[] args) throws InterruptedException {

        // 新建客户端
        NettyRpcClient client = new NettyRpcClient();

        // 显示菜单栏
        showMenu();

        Scanner scanner = new Scanner(System.in);

        while (true) {
            int cmd = scanner.nextInt();
            switch (cmd) {
                case 1:
                    client.connect();
                    Thread.sleep(3000);
                    break;
                case 2:
                    client.sendMessage("客户端发送双端信息");
                    break;
                case 3:
                    client.sendOneWay("客户端发送ONE-WAY信息");
                    break;
                case 4:
                    client.close();
                case 5:
                    showMenu();
                default:
                    client.close();
            }
        }
    }

    /**
     * 展示菜单
     */
    private static void showMenu() {
        System.out.println("请选择以下功能:");
        System.out.println("【1】与服务端建立连接");
        System.out.println("【2】发送一个有响应的消息");
        System.out.println("【3】发送一个无响应的消息");
        System.out.println("【4】关闭连接");
        System.out.println("【5】显示菜单栏");
    }
}

前面介绍过作用了,不多说了

3.2 server包下

3.2.1 ServerInitializer:服务端的Handler链化
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.codec.KryoDecodeHandler;
import org.tuling.io.rpc.common.codec.KryoEncodeHandler;
import org.tuling.io.rpc.server.handler.ServerHeartBeatHandler;
import org.tuling.io.rpc.server.handler.ServerLoginHandler;
import org.tuling.io.rpc.server.handler.ServerOrderHandler;

/**
 * 服务端,通道初始化器
 *
 * @author zhangshen
 * @date 2023/10/25 9:44
 * @slogan 编码即学习,注释断语义
 **/
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();

        // 添加【粘包/分包】处理器。 由Netty预备提供的
        pipeline.addLast(new LengthFieldBasedFrameDecoder(
                65535, 0, 2, 0,2
        ));
        pipeline.addLast(new LengthFieldPrepender(2));

        // 添加【序列化/反序列化】处理器,开源序列化工具
        pipeline.addLast(new KryoDecodeHandler());
        pipeline.addLast(new KryoEncodeHandler());

        // 添加【心跳】处理器,Netty预备提供的
        pipeline.addLast(new ReadTimeoutHandler(NettyConstant.HEARBEAT_FREQUENCY));

        // 添加业务处理器
        pipeline.addLast(new ServerLoginHandler());
        pipeline.addLast(new ServerHeartBeatHandler());
        pipeline.addLast(new ServerOrderHandler());
    }
}
3.2.2 handler包下所有的handler

在这里插入图片描述
我就不一一说代码了,只贴。没啥难度的,最重要的还是顺序,在【2.6 Handler的顺序组织】已经解释了一波了。

ServerHeartBeatHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;
import org.tuling.io.rpc.server.helper.SecurityCenterHelper;

/**
 * 服务器心跳处理
 *
 * @author zhangshen
 * @date 2023/10/25 10:33
 * @slogan 编码即学习,注释断语义
 **/
@Slf4j
public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		CommonMessage message = (CommonMessage) msg;
		CommonMessageHeader header = message.getHeader();
		if (header == null) {
			log.error("【Netty服务器】非法消息");
			ctx.writeAndFlush("非法消息");
			ctx.close();
			ReferenceCountUtil.release(msg);
			return;
		}

		if (header.getType() != MessageType.HEARTBEAT_REQ.value()) {
			ctx.fireChannelRead(msg);
			return;
		}

		// 处理心跳业务
		log.info("【Netty服务器】心跳应答");
		CommonMessage heartBeatResponse = MessageGenerateHelper.success(-1, MessageType.HEARTBEAT_RESP.value(), null);
		ctx.writeAndFlush(heartBeatResponse);
		ReferenceCountUtil.release(msg);
    }


	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		if(cause instanceof ReadTimeoutException){
			log.debug("【Netty服务器】客户端长时间未通信,可能已经宕机,关闭链路");
			SecurityCenterHelper.removeLoginUser(ctx.channel().remoteAddress().toString());
			ctx.close();
		}
		super.exceptionCaught(ctx, cause);
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		log.debug("【Netty服务器】客户端已关闭连接");
		super.channelInactive(ctx);
	}
}

ServerLoginHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;
import org.tuling.io.rpc.server.helper.SecurityCenterHelper;

import java.net.InetSocketAddress;

/**
 * 登录服务器处理器
 *
 * @author zhangshen
 * @date 2023/10/25 10:05
 * @slogan 编码即学习,注释断语义
 **/
@Slf4j
public class ServerLoginHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CommonMessage message = (CommonMessage) msg;
        CommonMessageHeader header = message.getHeader();
        if (header == null) {
            log.error("【Netty服务器】非法消息");
            ctx.writeAndFlush("非法消息");
            ctx.close();
            ReferenceCountUtil.release(msg);
            return;
        }

        if (header.getSeverId() != NettyConstant.LOGIN_SERVER_ID) {
            ctx.fireChannelRead(msg);
            return;
        }

        // 处理登录业务
        this.checkLogin(ctx, msg);
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        // 删除缓存
        SecurityCenterHelper.removeLoginUser(ctx.channel().remoteAddress().toString());
        ctx.close();
    }

    private void checkLogin(ChannelHandlerContext ctx, Object msg) {
        log.info("【Netty服务器】登录消息CommonMessage={}", msg);

        InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String userLoginIP = socketAddress.getAddress().getHostAddress();

        // 白名单校验
        boolean whiteIP = SecurityCenterHelper.isWhiteIP(userLoginIP);
        if (!whiteIP) {
            String errorMessage = "不在白名单内";
            log.error("【Netty服务器】{}", errorMessage);
            CommonMessage message = MessageGenerateHelper.fail(
                    NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), errorMessage);
            ctx.writeAndFlush(message);
            ctx.close();
            ReferenceCountUtil.release(msg);
            return;
        }

        // 重复登录校验
        String userInfo = ctx.channel().remoteAddress().toString();
        boolean repeatLogin = SecurityCenterHelper.isRepeatLogin(userInfo);
        if (repeatLogin) {
            String errorMessage = "重复登录";
            log.error("【Netty服务器】{}", errorMessage);
            CommonMessage message = MessageGenerateHelper.fail(
                    NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), errorMessage);
            ctx.writeAndFlush(message);
            ctx.close();
            ReferenceCountUtil.release(msg);
            return;
        }

        // 通过校验,记录
        SecurityCenterHelper.addLoginUser(userInfo);
        String successMessage = "登录成功";
        log.info("【Netty服务器】{}", successMessage);
        CommonMessage message = MessageGenerateHelper.success(
                NettyConstant.LOGIN_SERVER_ID, MessageType.SERVICE_RESP.value(), successMessage
        );
        ctx.writeAndFlush(message);
    }
}

ServerOrderHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.biz.OrderInfo;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.EncryptHelper;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;
import org.tuling.io.rpc.server.async.AsyncBusiProcessor;

import java.math.BigDecimal;

/**
 * 订单业务处理类
 *
 * @author zhangshen
 * @date 2023/10/25 10:39
 * @slogan 编码即学习,注释断语义
 **/
@Slf4j
public class ServerOrderHandler extends SimpleChannelInboundHandler<CommonMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CommonMessage msg) throws Exception {

        // 检查MD5
        final CommonMessageHeader header = msg.getHeader();
        if (header == null) {
            log.error("【Netty服务器】非法消息");
            ctx.writeAndFlush("非法消息");
            ctx.close();
            ReferenceCountUtil.release(msg);
            return;
        }

        if (header.getSeverId() != NettyConstant.ORDER_SERVER_ID) {
            ctx.fireChannelRead(msg);
            return;
        }

        log.info("【Netty服务器】CommonMessage={}", msg);
        String headMd5 = header.getMd5();
        String calcMd5 = EncryptHelper.encryptObj(msg.getBody());
        if (!headMd5.equals(calcMd5)) {
            log.error("【Netty服务器】报文md5检查不通过:" + headMd5 + " vs " + calcMd5 + ",关闭连接");
            CommonMessage message = MessageGenerateHelper.fail(
                    NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(),"报文md5检查不通过,关闭连接"
            );
            ctx.writeAndFlush(message);
            ctx.close();
        }

        log.info(msg.toString());
        if (header.getType() == MessageType.SERVICE_REQ_ONE_WAY.value()) {
            log.debug("【Netty服务器】ONE_WAY类型消息,异步处理");
            AsyncBusiProcessor.submitTask(() -> {
                log.info("【Netty服务器】模仿异步,ONE_WEY业务处理");
            });
        } else {
            log.debug("【Netty服务器】TWO_WAY类型消息,应答");

            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setOrderId("123456");
            orderInfo.setProductCount(2);
            orderInfo.setAmount(BigDecimal.valueOf(1499.99));
            CommonMessage message = MessageGenerateHelper.success(
                    NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), orderInfo
            );

            ctx.writeAndFlush(message);
        }
    }
}

3.2.3 helper包:工具包

helper包就是utils工具包。我喜欢叫做helper而已。里面只有一个类SecurityCenterHelper。用来实现【白名单】,还有【重复登录校验】机制。

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * 登录安全校验助手
 *
 * @author zhangshen
 * @date 2023/10/25 10:14
 * @slogan 编码即学习,注释断语义
 **/
public class SecurityCenterHelper {

    /**
     * 用以检查用户是否重复登录的缓存
     */
    private static Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();

    /**
     * 用户登录的白名单
     */
    private static Set<String> whiteList = new CopyOnWriteArraySet<>();

    static {
        whiteList.add("127.0.0.1");
    }

    /**
     * 是否白名单内
     */
    public static boolean isWhiteIP(String ip) {
        return whiteList.contains(ip);
    }

    /**
     * 给定用户信息是否重复登录
     */
    public static boolean isRepeatLogin(String usrInfo) {
        return nodeCheck.containsKey(usrInfo);
    }

    /**
     * 添加登录用户信息
     */
    public static void addLoginUser(String usrInfo) {
        nodeCheck.put(usrInfo, true);
    }

    /**
     * 移除登录用户信息
     */
    public static void removeLoginUser(String usrInfo) {
        nodeCheck.remove(usrInfo, true);
    }

}
3.2.4 async包:异步处理类

里面只有一个类AsyncBusiProcessor ,用来处理需要【异步】的任务。

import io.netty.util.NettyRuntime;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * 异步业务处理器。某些消息可以异步处理,比如ONE_WAY类型消息
 *
 * @author zhangshen
 * @date 2023/10/25 10:56
 * @slogan 编码即学习,注释断语义
 **/
@Slf4j
public class AsyncBusiProcessor {


    private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(3000);
    private static final ExecutorService executorService;


    static {
        int cores = NettyRuntime.availableProcessors();
        executorService = new ThreadPoolExecutor(
                1,
                cores,
                60,
                TimeUnit.SECONDS,
                taskQueue
        );
    }


    /**
     * 提交异步执行的任务
     *
     * @param task 任务
     */
    public static void submitTask(Runnable task) {
        executorService.execute(task);
    }
}

3.3 client包下

Client包下跟Server包下的东西其实差不多,大家自行理解

3.3.1 ClientInitializer:客户端的Handler链化
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.tuling.io.rpc.client.handler.CheckWriteIdleHandler;
import org.tuling.io.rpc.client.handler.ClientHeartBeatHandler;
import org.tuling.io.rpc.client.handler.ClientLoginHandler;
import org.tuling.io.rpc.client.handler.ClientOrderHandler;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.codec.KryoDecodeHandler;
import org.tuling.io.rpc.common.codec.KryoEncodeHandler;

/**
 * 客户端,通道初始化器
 *
 * @author zhangshen
 * @date 2023/10/25 9:44
 * @slogan 编码即学习,注释断语义
 **/
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
    
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();

        // 写空闲自己检测
        pipeline.addLast(new CheckWriteIdleHandler());

        // 添加【粘包/分包】处理器。 由Netty预备提供的
        pipeline.addLast(new LengthFieldBasedFrameDecoder(
                65535, 0, 2, 0,2
        ));
        pipeline.addLast(new LengthFieldPrepender(2));

        // 添加【序列化/反序列化】处理器,开源序列化工具
        pipeline.addLast(new KryoDecodeHandler());
        pipeline.addLast(new KryoEncodeHandler());


        // 添加登录处理器
        // 登录处理器需放在心跳前面
        pipeline.addLast(new ClientLoginHandler());

        // 添加【心跳】处理器,Netty预备提供的
        pipeline.addLast(new ReadTimeoutHandler(NettyConstant.HEARBEAT_FREQUENCY));
        pipeline.addLast(new ClientHeartBeatHandler());
        pipeline.addLast(new ClientOrderHandler());
    }
}
3.3.2 handler包下所有的handler

CheckWriteIdleHandler:客户端写空闲检测

import io.netty.handler.timeout.IdleStateHandler;

/**
 * 客户端检测自己的写空闲
 *
 * @author zhangshen
 * @date 2023/10/25 11:08
 * @slogan 编码即学习,注释断语义
 **/
public class CheckWriteIdleHandler extends IdleStateHandler {

    public CheckWriteIdleHandler() {
        super(0, 8, 0);
    }
}

ClientHeartBeatHandler:客户端心跳处理

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;

/**
 * 客户端在长久未向服务器业务请求时,发出心跳请求报文
 *
 * @author zhangshen
 * @date 2023/10/25 11:33
 * @slogan 编码即学习,注释断语义
 **/
@Slf4j
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
            CommonMessage request = MessageGenerateHelper.request(-1, MessageType.HEARTBEAT_REQ.value(), null);
            log.debug("【Netty客户端】写空闲,发出心跳报文维持连接: " + request);
            ctx.writeAndFlush(request);
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CommonMessage message = (CommonMessage) msg;
        CommonMessageHeader header = message.getHeader();
        if (header != null
                && header.getType() == MessageType.HEARTBEAT_RESP.value()
        ) {
            log.debug("【Netty客户端】收到服务器心跳应答,服务器正常");
            ReferenceCountUtil.release(msg);
        }  else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ReadTimeoutException) {
            log.debug("【Netty客户端】服务器长时间未应答,关闭链路");
        }
        super.exceptionCaught(ctx, cause);
    }
}

ClientLoginHandler:客户端登录请求。TCP三次握手成功之后就请求

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;

/**
 * 客户端,发起登录请求
 *
 * @author zhangshen
 * @date 2023/10/25 11:11
 * @slogan 编码即学习,注释断语义
 **/
@Slf4j
public class ClientLoginHandler extends ChannelInboundHandlerAdapter {

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        // TCP三次握手完成,发出认证请求
        CommonMessage message = MessageGenerateHelper.request(NettyConstant.LOGIN_SERVER_ID, MessageType.SERVICE_REQ.value(), null);
        log.info("【Netty客户端】请求服务器认证 : " + message);
        ctx.writeAndFlush(message);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CommonMessage message = (CommonMessage) msg;
        CommonMessageHeader header = message.getHeader();
        if (header != null
                && header.getType() == MessageType.SERVICE_RESP.value()
                && header.getSeverId() == NettyConstant.LOGIN_SERVER_ID
        ) {
            log.info("【Netty客户端】收到认证应答报文,服务器是否验证通过?");
            byte loginResult = message.getResult();
            if (loginResult != 1) {

                // 握手失败,关闭连接
                log.debug("【Netty客户端】未通过认证,关闭连接: " + message);
                ctx.close();
            } else {
                log.info("【Netty客户端】通过认证,移除本处理器,进入业务通信 : " + message);
                ctx.pipeline().remove(this);
                ReferenceCountUtil.release(msg);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
}

ClientOrderHandler:瞎写的一个业务拓展类,目前只是打印。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.pojo.CommonMessage;

/**
 * @author Mark老师
 * 类说明:接收业务应答消息并处理
 */
@Slf4j
public class ClientOrderHandler extends SimpleChannelInboundHandler<CommonMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CommonMessage msg) throws Exception {
        log.info("【Netty客户端】业务应答消息:" + msg.toString());
        ReferenceCountUtil.release(msg);
    }
}

3.4 common包下:一些公用的定义

pojo的我就不贴了,看【2.3 消息体定义】

3.4.1 NettyConstant:一些公用常量
/**
 * 常量定义
 *
 * @author zhangshen
 * @date 2023/10/25 9:41
 * @slogan 编码即学习,注释断语义
 **/
public interface NettyConstant {

    /**
     * 程序绑定端口
     */
    int PORT = 8585;

    /**
     * 程序ip地址
     */
    String SERVER_IP = "127.0.0.1";

    /**
     * 成功
     */
    byte SUCCESS = 1;

    /**
     * 失败
     */
    byte FAIL = 0;

    /**
     * 心跳检测频率
     * 单位:秒
     */
    int HEARBEAT_FREQUENCY = 15;

    /**
     * 登录服务器标识
     */
    int LOGIN_SERVER_ID = 1;

    /**
     * 订单服务器标识
     */
    int ORDER_SERVER_ID = 2;
}
3.4.2 helper包:工具包

EncryptHelper:防篡改的加密摘要

import org.tuling.io.rpc.common.codec.KryoSerializer;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;


/**
 * 摘要的工具类
 *
 * @author zhangshen
 * @date 2023/10/25 10:47
 * @slogan 编码即学习,注释断语义
 **/
public class EncryptHelper {

    /**
     * 加密信息
     *
     * @param strSrc  需要被摘要的字符串
     * @param encName 摘要方式,有 MD5、SHA-1和SHA-256 这三种,缺省为MD5
     * @return 返回摘要字符串
     */
    private static String EncryptStr(String strSrc, String encName) {
        MessageDigest md = null;
        String strDes = null;

        byte[] bt = strSrc.getBytes();
        try {
            if (encName == null || encName.equals("")) {
                encName = "MD5";
            }
            md = MessageDigest.getInstance(encName);
            md.update(bt);
            strDes = bytes2Hex(md.digest()); // to HexString
        } catch (NoSuchAlgorithmException e) {
            System.out.println("Invalid algorithm.");
            return null;
        }
        return strDes;
    }

    /**
     * MD5摘要
     *
     * @param str 需要被摘要的字符串
     * @return 对字符串str进行MD5摘要后,将摘要字符串返回
     */
    public static String EncryptByMD5(String str) {
        return EncryptStr(str, "MD5");
    }

    /**
     * SHA1摘要
     *
     * @param str 需要被摘要的字符串
     * @return 对字符串str进行SHA-1摘要后,将摘要字符串返回
     */
    public static String EncryptBySHA1(String str) {
        return EncryptStr(str, "SHA-1");
    }

    /**
     * SHA256摘要
     *
     * @param str 需要被摘要的字符串
     * @return 对字符串str进行SHA-256摘要后,将摘要字符串返回
     */
    public static String EncryptBySHA256(String str) {
        return EncryptStr(str, "SHA-256");
    }

    /**
     * 字节转十六进制,结果以字符串形式呈现
     */
    private static String bytes2Hex(byte[] bts) {
        String des = "";
        String tmp = null;
        for (int i = 0; i < bts.length; i++) {
            tmp = (Integer.toHexString(bts[i] & 0xFF));
            if (tmp.length() == 1) {
                des += "0";
            }
            des += tmp;
        }
        return des;
    }

    /**
     * 对字符串进行MD5加盐摘要
     * 先将str进行一次MD5摘要,摘要后再取摘要后的字符串的第1、3、5个字符追加到摘要串,
     * 再拿这个摘要串再次进行摘要
     */
    public static String encrypt(String str) {
        String encryptStr = EncryptByMD5(str);
        if (encryptStr != null) {
            encryptStr = encryptStr + encryptStr.charAt(0) + encryptStr.charAt(2) + encryptStr.charAt(4);
            encryptStr = EncryptByMD5(encryptStr);
        }
        return encryptStr;
    }

    /**
     * 对对象进行MD5摘要,先对对象进行序列化,转为byte数组,
     * 再将byte数组转为字符串,然后进行MD5加盐摘要
     */
    public static String encryptObj(Object o) {
        return encrypt(bytes2Hex(KryoSerializer.obj2Bytes(o)));
    }
}

MessageGenerateHelper:消息生成工具,消除代码重复用的

import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;

import java.util.concurrent.atomic.AtomicLong;

/**
 * 消息生成助手
 *
 * @author zhangshen
 * @date 2023/10/25 11:21
 * @slogan 编码即学习,注释断语义
 **/
public class MessageGenerateHelper {


    private static AtomicLong msgId = new AtomicLong(1);

    public static long getID() {
        return msgId.getAndIncrement();
    }

    /**
     * 构建成功的业务消息
     */
    public static CommonMessage success(int serverId, byte type, Object msg) {
        CommonMessage message = new CommonMessage();
        CommonMessageHeader header = getHeader(serverId, type);
        message.setHeader(header);
        message.setResult(NettyConstant.SUCCESS);
        message.setBody(msg);
        return message;
    }

    /**
     * 构建失败的业务消息
     */
    public static CommonMessage fail(int serverId, byte type, Object msg) {
        CommonMessage message = new CommonMessage();
        CommonMessageHeader header = getHeader(serverId, type);
        message.setHeader(header);
        message.setResult(NettyConstant.FAIL);
        message.setBody(msg);
        return message;
    }

    /**
     * 构建请求业务消息
     */
    public static CommonMessage request(int serverId, byte type, Object msg) {
        CommonMessage message = new CommonMessage();
        CommonMessageHeader header = getHeader(serverId, type);
        message.setHeader(header);
        message.setBody(msg);
        return message;
    }


    /**
     * 构建请求业务消息
     */
    public static CommonMessage requestWithMsgId(int serverId, byte type, Object msg) {
        CommonMessage message = new CommonMessage();
        CommonMessageHeader header = getHeader(serverId, type);
        header.setMsgID(getID());
        header.setMd5(EncryptHelper.encryptObj(msg));
        message.setHeader(header);
        message.setBody(msg);
        return message;
    }

    private static CommonMessageHeader getHeader(int serverId, byte type) {
        CommonMessageHeader header = new CommonMessageHeader();
        header.setSeverId(serverId);
        header.setType(type);
        return header;
    }
}
3.4.3 codec包:编解码(反)序列化工具

这里面的是一个基于Kryo编解码序列化API实现的。pom.xml如下:

        <dependency>
            <groupId>de.javakaffee</groupId>
            <artifactId>kryo-serializers</artifactId>
            <version>0.42</version>
        </dependency>

当然大家可以使用其他的API,我这里是抄的。

KryoFactory:Kryo实例,API要求的

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import de.javakaffee.kryoserializers.*;

import java.lang.reflect.InvocationHandler;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

/**
 * Kryo的工厂,拿到Kryo的实例
 *
 * @author zhanghuitong
 * @date 2023/10/25 20:12
 * @slogan 编码即学习,注释断语义
 **/
public class KryoFactory {

    public static Kryo createKryo() {

        Kryo kryo = new Kryo();
        kryo.setRegistrationRequired(false);
        kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
        kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());
        kryo.register(InvocationHandler.class, new JdkProxySerializer());
        kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());
        kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
        kryo.register(Pattern.class, new RegexSerializer());
        kryo.register(BitSet.class, new BitSetSerializer());
        kryo.register(URI.class, new URISerializer());
        kryo.register(UUID.class, new UUIDSerializer());
        UnmodifiableCollectionsSerializer.registerSerializers(kryo);
        SynchronizedCollectionsSerializer.registerSerializers(kryo);

        kryo.register(HashMap.class);
        kryo.register(ArrayList.class);
        kryo.register(LinkedList.class);
        kryo.register(HashSet.class);
        kryo.register(TreeSet.class);
        kryo.register(Hashtable.class);
        kryo.register(Date.class);
        kryo.register(Calendar.class);
        kryo.register(ConcurrentHashMap.class);
        kryo.register(SimpleDateFormat.class);
        kryo.register(GregorianCalendar.class);
        kryo.register(Vector.class);
        kryo.register(BitSet.class);
        kryo.register(StringBuffer.class);
        kryo.register(StringBuilder.class);
        kryo.register(Object.class);
        kryo.register(Object[].class);
        kryo.register(String[].class);
        kryo.register(byte[].class);
        kryo.register(char[].class);
        kryo.register(int[].class);
        kryo.register(float[].class);
        kryo.register(double[].class);

        return kryo;
    }
}

KryoSerializer序列化工具:

/**
 * Kryo的序列化器,负责序列化和反序列化
 *
 * @author zhanghuitong
 * @date 2023/10/25 20:12
 * @slogan 编码即学习,注释断语义
 **/
public class KryoSerializer {
    private static Kryo kryo = KryoFactory.createKryo();

    /*序列化*/
    public static void serialize(Object object, ByteBuf out) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Output output = new Output(baos);
        kryo.writeClassAndObject(output, object);
        output.flush();
        output.close();

        byte[] b = baos.toByteArray();
        try {
            baos.flush();
            baos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        out.writeBytes(b);
    }

    /*序列化为一个字节数组,主要用在消息摘要上*/
    public static byte[] obj2Bytes(Object object) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Output output = new Output(baos);
        kryo.writeClassAndObject(output, object);
        output.flush();
        output.close();

        byte[] b = baos.toByteArray();
        try {
            baos.flush();
            baos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return b;
    }

    /*反序列化*/
    public static Object deserialize(ByteBuf out) {
        if (out == null) {
            return null;
        }
        Input input = new Input(new ByteBufInputStream(out));
        return kryo.readClassAndObject(input);
    }
}

KryoEncodeHandler:编码处理器,实现了NettyMessageToByteEncoder接口的Handler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.tuling.io.rpc.common.pojo.CommonMessage;

/**
 * 序列化的Handler
 *
 * @author zhangshen
 * @date 2023/10/25 9:54
 * @slogan 编码即学习,注释断语义
 **/
public class KryoEncodeHandler extends MessageToByteEncoder<CommonMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CommonMessage message, ByteBuf out) throws Exception {
        KryoSerializer.serialize(message, out);
        ctx.flush();
    }
}

KryoDecodeHandler:解码处理器,实现了NettyMessageToByteEncoder接口的Handler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 反序列化的Handler
 *
 * @author zhangshen
 * @date 2023/10/25 9:54
 * @slogan 编码即学习,注释断语义
 **/
public class KryoDecodeHandler extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object obj = KryoSerializer.deserialize(in);
        out.add(obj);
    }
}

3.4 biz包下:业务模拟

下面只有一个类,OrderInfo

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.math.BigDecimal;

/**
 * 订单信息
 *
 * @author zhangshen
 * @date 2023/10/25 10:44
 * @slogan 编码即学习,注释断语义
 **/
@Getter
@Setter
@ToString
public class OrderInfo {

    private String orderId;
    private Integer productCount;
    private BigDecimal amount;
}

四、业务流程图

在这里插入图片描述

学习总结

  1. 使用Netty写了一个简单的通信示例

感谢

感谢【百度:文心一言】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/104125.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

适用于 Windows 10 和 Windows 11 设备的笔记本电脑管理软件

便携式计算机管理软件使 IT 管理员能够简化企业中使用的便携式计算机的部署和管理&#xff0c;当今大多数员工使用Windows 笔记本电脑作为他们的主要工作机器&#xff0c;他们确实已成为几乎每个组织不可或缺的一部分。由于与台式机相比&#xff0c;笔记本电脑足够便携&#xf…

ModbusTCP 转 Profinet 主站网关控制汇川伺服驱动器配置案例

ModbusTCP Client 通过 ModbusTCP 控制 Profinet 接口设备&#xff0c;Profinet 接口设备接入 DCS/工控机等 兴达易控ModbusTCP转Profinet主站网关&#xff08;XD-ETHPNM20&#xff09;采用数据映射方式进行工作。 使用设备&#xff1a;兴达易控ModbusTCP 转 Profinet 主站网关…

图论05-【无权无向】-图的广度优先BFS遍历-路径问题/检测环/二分图/最短路径问题

文章目录 1. 代码仓库2. 单源路径2.1 思路2.2 主要代码 3. 所有点对路径3.1 思路3.2 主要代码 4. 联通分量5. 环检测5.1 思路5.2 主要代码 6. 二分图检测6.1 思路6.2 主要代码6.2.1 遍历每个联通分量6.2.2 判断相邻两点的颜色是否一致 7. 最短路径问题7.1 思路7.2 代码 1. 代码…

kafka3.X集群安装(不使用zookeeper)

参考: 【kafka专栏】不用zookeeper怎么安装kafka集群-最新kafka3.0版本 一、kafka集群实例角色规划 在本专栏的之前的一篇文章《kafka3种zk的替代方案》已经为大家介绍过在kafka3.0种已经可以将zookeeper去掉。 上图中黑色代表broker&#xff08;消息代理服务&#xff09;&…

IMU预积分的过程详解

一、IMU和相机数据融合保证位姿的有效性&#xff1a; 当运动过快时&#xff0c;相机会出现运动模糊&#xff0c;或者两帧之间重叠区域太少以至于无法进行特征匹配&#xff0c;所以纯视觉SLAM对快速的运动很敏感。而有了IMU&#xff0c;即使在相机数据无效的那段时间内&#xff…

分类预测 | MATLAB实现SSA-CNN-BiGRU-Attention数据分类预测(SE注意力机制)

分类预测 | MATLAB实现SSA-CNN-BiGRU-Attention数据分类预测&#xff08;SE注意力机制&#xff09; 目录 分类预测 | MATLAB实现SSA-CNN-BiGRU-Attention数据分类预测&#xff08;SE注意力机制&#xff09;分类效果基本描述模型描述程序设计参考资料 分类效果 基本描述 1.MATLA…

Android Studio错误修复Connect to repo.maven.apache.org:443

环境 名称版本操作系统Windows10(64位)AndroidStudio2022.3.1 Patch 2 前言 最近更新了AndroidStudio编写程序的时候发现gradle时老是报read time out错误提示 分析 当出现这个警告时&#xff0c;你应该猜到这是一个连接不上的问题(Connect to repo.maven.apache.org:443)&…

kafka3.X基本概念和使用

参考: 【kafka专栏】不用zookeeper怎么安装kafka集群-最新kafka3.0版本 一、kafka集群实例角色规划 在本专栏的之前的一篇文章《kafka3种zk的替代方案》已经为大家介绍过在kafka3.0种已经可以将zookeeper去掉。 上图中黑色代表broker&#xff08;消息代理服务&#xff09;&…

ubuntu 中使用Qt连接MMSQl,报错libqsqlodbc.so: undefined symbol: SQLAllocHandle

Qt4.8.7的源码编译出来的libqsqlodbc.so&#xff0c;在使用时报错libqsqlodbc.so: undefined symbol: SQLAllocHandle&#xff0c;需要在编译libqsqlodbc.so 的项目pro文件加上LIBS -L/usr/local/lib -lodbc。 这里的路径根据自己的实际情况填写。 编辑&#xff1a; 使用uni…

CAP定理下:Zookeeper、Eureka、Nacos简单分析

CAP定理下&#xff1a;Zookeeper、Eureka、Nacos简单分析 CAP定理 C: 一致性&#xff08;Consistency&#xff09;&#xff1a;写操作之后的读操作也需要读到之前的 A: 可用性&#xff08;Availability&#xff09;&#xff1a;收到用户请求&#xff0c;服务器就必须给出响应 P…

SQL sever中函数(2)

目录 一、函数分类及应用 1.1标量函数&#xff08;Scalar Functions&#xff09;&#xff1a; 1.1.1格式 1.1.2示例 1.1.3作用 1.2表值函数&#xff08;Table-Valued Functions&#xff09;&#xff1a; 1.2.1内联表值函数&#xff08;Inline Table-Valued Functions&am…

Spark_SQL函数定义(定义UDF函数、使用窗口函数)

一、UDF函数定义 &#xff08;1&#xff09;函数定义 &#xff08;2&#xff09;Spark支持定义函数 &#xff08;3&#xff09;定义UDF函数 &#xff08;4&#xff09;定义返回Array类型的UDF &#xff08;5&#xff09;定义返回字典类型的UDF 二、窗口函数 &#xff08;1&…

基于数字电路交通灯信号灯控制系统设计-单片机设计

**单片机设计介绍&#xff0c;1617基于数字电路交通灯信号灯控制系统设计&#xff08;仿真电路&#xff0c;论文报告 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序文档 六、 文章目录 一 概要 交通灯控制系统在城市交通控制中发挥着重要的作用&#xf…

Unsatisfied dependency expressed through bean property ‘sqlSessionTemplate‘;

代码没有问题&#xff0c;但是启动运行报错 2023-10-25 16:59:38.165 INFO 228964 --- [ main] c.h.h.HailiaowenanApplication : Starting HailiaowenanApplication on ganluhua with PID 228964 (D:\ganluhua\code\java\hailiao-java\target\classes …

【CSS】伪类和伪元素

伪类 :hover&#xff1a;悬停active&#xff1a;激活focus&#xff1a;获取焦点:link&#xff1a;未访问&#xff08;链接&#xff09;:checked&#xff1a;勾选&#xff08;表单&#xff09;first-child&#xff1a;第一个子元素nth-child()&#xff1a;指定索引的子元素&…

电脑软件:推荐一款非常强大的pdf阅读编辑软件

目录 一、软件简介 二、功能介绍 1、界面美观&#xff0c;打开速度快 2、可直接编辑pdf 3、非常强大好用的注释功能 4、很好用的页面组织和提取功能 5、PDF转word效果非常棒 6、强大的OCR功能 三、软件特色 四、软件下载 pdf是日常办公非常常见的文档格式&#xff0c;…

MOS管特性及其几种常用驱动电路详解,电子工程师手把手教你

在电子工程中&#xff0c;MOS管&#xff08;金属氧化物半导体场效应管&#xff09;是一种非常重要的半导体元件。 在这篇文章中&#xff0c;我们将深入探讨MOS管的特性&#xff0c;以及几种常用的驱动电路的工作原理和设计方法。无论你是初学者还是经验丰富的电子工程师&#…

Unity - 导出的FBX模型,无法将 vector4 保存在 uv 中(使用 Unity Mesh 保存即可)

文章目录 目的问题解决方案验证保存为 Unity Mesh 结果 - OK保存为 *.obj 文件结果 - not OK&#xff0c;但是可以 DIY importer注意References 目的 备忘&#xff0c;便于日后自己索引 问题 为了学习了解大厂项目的效果&#xff1a; 上周为了将 王者荣耀的 杨玉环 的某个皮肤…

音频类型识别方案-audioset_tagging

audioset_tagging github上开源的音频识别模型&#xff0c;可以识别音频文件的类型并打分给出标签占比&#xff0c;如图 echo off set CHECKPOINT_PATH"module/Cnn14_mAP0.431.pth" set MODEL_TYPE"Cnn14" set CUDA_VISIBLE_DEVICES0 python pytorch\in…

ROS笔记之visualization_msgs-Marker学习

ROS笔记之visualization_msgs-Marker学习 code review! 文章目录 ROS笔记之visualization_msgs-Marker学习一.line_strip例程二.line_list例程一二.line_list例程二二.TEXT_VIEW_FACING例程三.附CMakeLists.txt和package.xml五.关于odom、base_link和map坐标系六.关于visualiz…
最新文章