Java IO流之Netty实现聊天通信功能

文章目录

  • 1 Netty
    • 1.1 概要设计
      • 1.1.1 技术选型
      • 1.1.2 数据库设计
      • 1.1.3 通信设计
        • 1.1.3.1 报文协议格式
        • 1.1.3.2 报文交互场景
    • 1.2 Netty简单示例
      • 1.2.1 pom.xml
      • 1.2.2 发送和接收
      • 1.2.3 示例说明
        • 1.2.3.1 线程阻塞问题
        • 1.2.3.2 服务端和接收端 EventLoopGroup
    • 1.3 Netty中handler概述
    • 1.4 聊天服务端
      • 1.4.1 示例
      • 1.4.2 示例代码解释
      • 1.4.3 各种出入站handler详解
        • 1.4.3.1 IdleStateHandler
        • 1.4.3.2 HeartBeatHandler
        • 1.4.3.3 StringLengthFieldDecoder
        • 1.4.3.4 StringDecoder
        • 1.4.3.5 JsonDecoder
        • 1.4.3.6 BussMessageHandler
        • 1.4.3.7 JsonEncoder
        • 1.4.3.8 SessionManager
    • 1.5 聊天客户端
      • 1.5.1 示例
      • 1.5.2 ServerBootstrap和Bootstrap区别
      • 1.5.3 示例代码解释
      • 1.5.4 BussMessageHandler

1 Netty

学习此篇 可以先学习下 IO流之IO,NIO和AIO讲解

1.1 概要设计

1.1.1 技术选型

聊天服务端
聊天服务器与客户端通过TCP协议进行通信,使用长连接、全双工通信模式,基于经典通信框架Netty实现。

那么什么是长连接

顾名思义,客户端和服务器连上后,会在这条连接上面反复收发消息,连接不会断开。与长连接对应的当然就是短连接了,短连接每次发消息之前都需要先建立连接,然后发消息,最后断开连接。显然,即时聊天适合使用长连接。

那么什么又是全双工

当长连接建立起来后,在这条连接上既有上行的数据,又有下行的数据,这就叫全双工

Web管理控制台
Web管理端使用SpringBoot脚手架,前端使用Layuimini,后端使用SpringMVC+Jpa+Shiro

聊天客户端
使用SpringBoot+JavaFX,做了一个极其简陋的客户端,JavaFX是一个开发Java桌面程序的框架

1.1.2 数据库设计

我们只简单用到一张用户表:

CREATE TABLE `sys_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_name` varchar(64) DEFAULT NULL COMMENT '用户名:登陆账号',
  `pass_word` varchar(128) DEFAULT NULL COMMENT '密码',
  `name` varchar(16) DEFAULT NULL COMMENT '昵称',
  `sex` char(1) DEFAULT NULL COMMENT '性别:1-男,2女',
  `status` bit(1) DEFAULT NULL COMMENT '用户状态:1-有效,0-无效',
  `online` bit(1) DEFAULT NULL COMMENT '在线状态:1-在线,0-离线',
  `salt` varchar(128) DEFAULT NULL COMMENT '密码盐值',
  `admin` bit(1) DEFAULT NULL COMMENT '是否管理员(只有管理员才能登录Web端):1-是,0-否',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

这张表都在什么时候用到?

  • Web管理端登陆的时候;
  • 聊天客户端将登陆请求发送到聊天服务端时,聊天服务端进行用户认证;
  • 聊天客户端的好友列表加载。

1.1.3 通信设计

1.1.3.1 报文协议格式

在这里插入图片描述

报文格式设计目的:

  • 粘包问题TCP长连接中,粘包是第一个需要解决的问题。通俗的讲,粘包的意思是消息接收方往往收到的不是整个报文,有时候比整个多一点,有时候比整个少一点,这样就导致接收方无法解析这个报文。那么上图中的头8个字节就为了解决这个问题,接收方根据头8个字节标识的长度来获取到“整个”报文,从而进行正常的业务处理;
  • 2字节报文类型,为了方便解析报文而设计。根据这两个字节将后面的json转成相应的实体以便进行后续处理;
  • 变长报文体实际上就是json格式的串,当然,也可以自己设计报文格式
  • 还可以把报文设计的更复杂、更专业,比如加密、加签名等。
1.1.3.2 报文交互场景

登陆
图片
发送消息-成功
图片
发送消息-目标客户端不在线
图片
发送消息-目标客户端在线,但消息转发失败
图片

1.2 Netty简单示例

1.2.1 pom.xml

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.70.Final</version> <!-- 或者是最新版本 -->
</dependency>

1.2.2 发送和接收

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class TempNetty {
    private int port = 8011;
    public void server()  {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, port)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = bootstrap.bind(8011).sync();
            future.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    private class ServerHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println("Server received: " + msg);
            ctx.writeAndFlush("Server response: " + msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    public void client() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect("127.0.0.1", port).sync();
            future.channel().writeAndFlush("Hello, Server!");
            future.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    private class ClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println("Client received: " + msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("Client active, sending message...");
            super.channelActive(ctx);
        }
    }

    public static void main(String[] args) throws Exception{
        TempNetty temp = new TempNetty();
        new Thread(temp::server).start(); // 启动服务器
        new Thread(temp::client).start(); // 启动客户端
    }
}

1.2.3 示例说明

1.2.3.1 线程阻塞问题

由于 上面示例是 把 serverclient 放到一个 类中 会导致 接收不到信息会导致线程问题

线程阻塞:确保服务器和客户端的启动是在不同的线程中进行的,以避免线程阻塞。如果在同一个线程中启动服务器和客户端,可能会导致服务器在接收消息时被阻塞,无法同时发送消息。因此需要用多线程开启异步

1.2.3.2 服务端和接收端 EventLoopGroup

Netty 中,EventLoopGroup 是一个处理 I/O 操作的多线程事件循环,用于接收进来的连接、接受数据、写数据等。当使用 Netty 创建服务器时,通常会创建两个 EventLoopGroup,一个用于接收客户端的连接(通常称为 boss group),另一个用于处理已被接受的连接上的 I/O 操作(通常称为 worker group)。

  • bossGroup:主要负责接收客户端的连接
    当一个新的连接到达时,boss threadboss group 中的线程)会处理这个连接,完成 TCP 的三次握手,然后这个连接就被注册到 workerGroup 中的一个 EventLoop
    boss thread 之后不再参与这个连接上的任何 I/O 操作;它只负责接收新的连接。
  • workerGroup:负责处理所有已被 bossGroup 接受的新连接的 I/O 操作。
    一旦连接被接受并注册到 workerGroup 的某个 EventLoop,该 EventLoop 就负责该连接上的所有 I/O 操作,包括读取数据和写入数据。

ServerBootstrap,它是 Netty 中用于设置服务器端的引导类。使用 ServerBootstrap,可以配置服务器端的参数,比如 EventLoopGroup、Channel 类型、ChannelHandler 等。客户端通常使用的是 Bootstrap 类来配置客户端端的参数
客户端的 Bootstrap 用于初始化客户端连接。客户端通过它配置连接服务器的参数,如要连接的服务器地址和端口、使用的 EventLoopGroupChannel 类型以及 ChannelHandler 等。

在客户端,通常只需要一个 EventLoopGroup 来处理所有的 I/O 操作,包括连接远程服务器、发送和接收数据等。这个 EventLoopGroup 中的 EventLoop 负责处理所有的事件,包括连接、读取、写入等。客户端通常只需要单个 EventLoopGroup 来管理所有的连接和操作,因为客户端的连接数量通常较少,而且对连接的管理相对简单

1.3 Netty中handler概述

点击了解 IO流之IO,NIO和AIO讲解
Netty 是一个相当优秀的通信框架,大多数的顶级开源框架中都有Netty的身影。

应用过程中,它最核心的东西叫handler,我们可以简单理解它为消息处理器。收到的消息和出去的消息都会经过一系列的handler加工处理。收到的消息我们叫它入站消息,发出去的消息我们叫它出站消息,因此handler又分为出站handler入站handler。收到的消息只会被入站handler处理,发出去的消息只会被出站handler处理。

举个例子,我们从网络上收到的消息是二进制的字节码,我们的目标是将消息转换成java bean,这样方便我们程序处理,针对这个场景设计这么几个入handler:

  • 将字节转换成String的handler
  • String转成java beanhandler
  • java bean进行业务处理的handler

发出去的消息呢,设计这么几个出站handler:

  • java bean 转成String的handler;
  • String转成byte的handler。

接下来再说一下Netty的异步。异步的意思是当做完一个操作后,不会立马得到操作结果,而是有结果后Netty会通知你。通过下面的一段代码来说明:

channel.writeAndFlush(sendMsgRequest).addListener(new GenericFutureListener<Future<? super Void>>() {
           @Override
           public void operationComplete(Future<? super Void> future) throws Exception {
               if (future.isSuccess()){
                   logger.info("消息发送成功:{}",sendMsgRequest);
               }else {
                   logger.info("消息发送失败:{}",sendMsgRequest);
               }
           }
       });

上面的writeAndFlush操作无法立即返回结果,如果关注结果,那么添加一个listener,有结果后会在listener中响应。

1.4 聊天服务端

1.4.1 示例

首先看主入口的代码

public void start(){
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //心跳
                        ch.pipeline().addLast(new IdleStateHandler(25, 20, 0, TimeUnit.SECONDS));
                        //收整包
                        ch.pipeline().addLast(new StringLengthFieldDecoder());
                        //转字符串
                        ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                        //json转对象
                        ch.pipeline().addLast(new JsonDecoder());
                        //心跳
                        ch.pipeline().addLast(new HeartBeatHandler());
                        //实体转json
                        ch.pipeline().addLast(new JsonEncoder());
                        //消息处理
                        ch.pipeline().addLast(bussMessageHandler);
                    }
                });
        try {
            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        }catch (InterruptedException e) {
            logger.error("服务启动失败:{}", ExceptionUtils.getStackTrace(e));
        }finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }

1.4.2 示例代码解释

EventLoopGroupServerBootstrapNetty框架中的两个关键类,用于构建和管理网络应用程序的事件处理和服务启动。

  • EventLoopGroup
    EventLoopGroup是一个事件循环组,用于管理和调度事件循环(EventLoop)。
    Netty中,事件循环是一个处理I/O操作和触发事件的单线程执行器,它负责处理连接、读取、写入等事件。
    EventLoopGroup可以包含一个或多个事件循环,通常用于处理不同类型的事件,例如接受连接的事件循环、处理读写事件的事件循环等。
    通过EventLoopGroupNetty可以实现高效的事件处理和多路复用。
  • ServerBootstrap
    ServerBootstrapNetty中用于创建和配置服务器的启动类。
    它提供了一组方法来配置服务器的各种参数,例如指定EventLoopGroup、设置服务器通道类型、添加处理器(Handler)等。
    使用ServerBootstrap可以方便地创建一个服务器,并配置其各种属性,例如绑定端口、设置TCP参数、添加SSL支持等。
    通过ServerBootstrap,可以将各种网络组件(如ChannelEventLoopGroup等)组合在一起,从而构建出一个完整的网络应用程序服务器。
  • 如果 nio 与Spring Boot整合还需要 ServerBootstrap 吗
    如果使用Spring Boot来构建应用程序,Spring Boot通常会封装对Netty或其他服务器框架的集成,使得在创建和配置服务器时不需要直接使用ServerBootstrap类。
    通常情况下,在将Spring BootNetty进行整合时,不需要直接使用ServerBootstrap类。相反,可以利用Spring Boot提供的自动配置和集成功能来创建和配置服务器。Spring BootNetty提供了一些自定义的starter,例如spring-boot-starter-webflux,它使用了Netty作为默认的服务器引擎。

1.4.3 各种出入站handler详解

下面我们着重看initChannel方法里面的代码。这里面就是上面讲到的各种handler,我们下面挨个讲这些handler都是干啥的。

1.4.3.1 IdleStateHandler

IdleStateHandler:这个是Netty内置的一个handler,既是出站handler又是入站handler。它的作用一般是用来实现心跳监测。所谓心跳,就是客户端和服务端建立连接后,服务端要实时监控客户端的健康状态,如果客户端挂了,服务端及时释放相应的资源,以及做出其他处理比如通知运维。所以在我们的场景中,客户端需要定时上报自己的心跳,如果服务端检测到一段时间内没收到客户端上报的心跳,那么及时做出处理,我们这里就是简单的将其连接断开,并修改数据库中相应账户的在线状态。
第一个参数叫读超时时间,第二个参数叫写超时时间,第三个参数叫读写超时时间,第四个参数是时间单位秒
这个handler表达的意思是当25秒内没读到客户端的消息,或者20秒内没往客户端发消息,就会产生一个超时事件。那么这个超时事件我们该对他做什么处理呢,请看下一条。

1.4.3.2 HeartBeatHandler

HeartBeatHandler:当发生超时事件时,HeartBeatHandler会收到这个事件,并对它做出处理:第一将链接断开;第二讲数据库中相应的账户更新为不在线状态。

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    private static Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state() == IdleState.READER_IDLE) {
                //读超时,应将连接断掉
                InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
                String ip = socketAddress.getAddress().getHostAddress();
                ctx.channel().disconnect();
                logger.info("【{}】连接超时,断开",ip);
                String userName = SessionManager.removeSession(ctx.channel());
                SpringContextUtil.getBean(UserService.class).updateOnlineStatus(userName,Boolean.FALSE);
            }else {
                super.userEventTriggered(ctx, evt);
            }
        }else {
            super.userEventTriggered(ctx, evt);
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HeartBeat){
            //收到心跳包,不处理
            logger.info("server收到心跳包:{}",msg);
            return;
        }
        super.channelRead(ctx, msg);
    }
}
1.4.3.3 StringLengthFieldDecoder

StringLengthFieldDecoder:这是个入站handler,作用就是解决上面提到的粘包问题:

public class StringLengthFieldDecoder extends LengthFieldBasedFrameDecoder {
    public StringLengthFieldDecoder() {
        super(10*1024*1024,0,8,0,8);
    }
    @Override
    protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        buf = buf.order(order);
        byte[] lenByte = new byte[length];
        buf.getBytes(offset, lenByte);
        String lenStr = new String(lenByte);
        Long len =  Long.valueOf(lenStr);
        return len;
    }
}

只需要集成Netty提供的LengthFieldBasedFrameDecoder 类,并重写getUnadjustedFrameLength方法即可。
首先看构造方法中的5个参数。
第一个表示能处理的包的最大长度;第二三个参数应该结合起来理解,表示长度字段从第几位开始,长度的长度是多少,也就是上面报文格式协议中的头8个字节;
第四个参数表示长度是否需要校正,举例理解,比如头8个字节解析出来的长度=包体长度+头8个字节的长度,那么这里就需要校正8个字节,我们的协议中长度只包含报文体,因此这个参数填0;
最后一个参数,表示接收到的报文是否要跳过一些字节,本例中设置为8,表示跳过头8个字节,因此经过这个handler后,我们收到的数据就只有报文本身了,不再包含8个长度字节了。
再看getUnadjustedFrameLength方法,其实就是将头8个字符串型的长度为转换成long型。重写完这个方法后,Netty就知道如何收一个完整的数据包了。

1.4.3.4 StringDecoder

StringDecoder:这个是Netty自带的入站handler,会将字节流以指定的编码解析成String

1.4.3.5 JsonDecoder

JsonDecoder:是自定义的一个入站handler,目的是将json String转换成java bean,以方便后续处理:

public class JsonDecoder extends MessageToMessageDecoder<String> {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, String o, List<Object> list) throws Exception {
        Message msg = MessageEnDeCoder.decode(o);
        list.add(msg);
    }

}
这里会调用我们自定义的一个编解码帮助类进行转换:
public static Message decode(String message){
        if (StringUtils.isEmpty(message) || message.length() < 2){
            return null;
        }
        String type = message.substring(0,2);
        message = message.substring(2);
        if (type.equals(LoginRequest)){
            return JsonUtil.jsonToObject(message,LoginRequest.class);
        }else if (type.equals(LoginResponse)){
            return JsonUtil.jsonToObject(message,LoginResponse.class);
        }else if (type.equals(LogoutRequest)){
            return JsonUtil.jsonToObject(message,LogoutRequest.class);
        }else if (type.equals(LogoutResponse)){
            return JsonUtil.jsonToObject(message,LogoutResponse.class);
        }else if (type.equals(SendMsgRequest)){
            return JsonUtil.jsonToObject(message,SendMsgRequest.class);
        }else if (type.equals(SendMsgResponse)){
            return JsonUtil.jsonToObject(message,SendMsgResponse.class);
        }else if (type.equals(HeartBeat)){
            return JsonUtil.jsonToObject(message,HeartBeat.class);
        }
        return null;
    }
1.4.3.6 BussMessageHandler

BussMessageHandler:先看这个入站handler,是我们的一个业务处理主入口,主要工作就是将消息分发给线程池去处理,另外还负责一个小场景,当客户端主动断开时,需要将相应的账户数据库中状态更新为不在线。

public class BussMessageHandler extends ChannelInboundHandlerAdapter {
    private static Logger logger = LoggerFactory.getLogger(BussMessageHandler.class);

    @Autowired
    private TaskDispatcher taskDispatcher;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info("收到消息:{}",msg);
        if (msg instanceof Message){
            taskDispatcher.submit(ctx.channel(),(Message)msg);
        }
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //客户端连接断开
        InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
        ctx.channel().disconnect();
        String ip = socketAddress.getAddress().getHostAddress();        
        logger.info("客户端断开:{}",ip);
        String userName = SessionManager.removeSession(ctx.channel());
        SpringContextUtil.getBean(UserService.class).updateOnlineStatus(userName,Boolean.FALSE);
        super.channelInactive(ctx);
    }
}

接下来还差线程池的处理逻辑,也非常简单,就是将任务封装成executor然后交给线程池处理:

public class TaskDispatcher {
    private ThreadPoolExecutor threadPool;

    public TaskDispatcher(){
        int corePoolSize = 15;
        int maxPoolSize = 50;
        int keepAliveSeconds = 30;
        int queueCapacity = 1024;
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
        this.threadPool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
                queue);
    }

    public void submit(Channel channel, Message msg){
        ExecutorBase executor = null;
        String messageType = msg.getMessageType();
        if (messageType.equals(MessageEnDeCoder.LoginRequest)){
            executor = new LoginExecutor(channel,msg);
        }
        if (messageType.equalsIgnoreCase(MessageEnDeCoder.SendMsgRequest)){
            executor = new SendMsgExecutor(channel,msg);
        }
        if (executor != null){
            this.threadPool.submit(executor);
        }
    }
}

接下来看一下消息转发executor是怎么做的:

public class SendMsgExecutor extends ExecutorBase {
    private static Logger logger = LoggerFactory.getLogger(SendMsgExecutor.class);

    public SendMsgExecutor(Channel channel, Message message) {
        super(channel, message);
    }
    @Override
    public void run() {
        SendMsgResponse response = new SendMsgResponse();
        response.setMessageType(MessageEnDeCoder.SendMsgResponse);
        response.setTime(new Date());
        SendMsgRequest request = (SendMsgRequest)message;
        String recvUserName = request.getRecvUserName();
        String sendContent = request.getSendMessage();
        Channel recvChannel = SessionManager.getSession(recvUserName);
        if (recvChannel != null){
            SendMsgRequest sendMsgRequest = new SendMsgRequest();
            sendMsgRequest.setTime(new Date());
            sendMsgRequest.setMessageType(MessageEnDeCoder.SendMsgRequest);
            sendMsgRequest.setRecvUserName(recvUserName);
            sendMsgRequest.setSendMessage(sendContent);
            sendMsgRequest.setSendUserName(request.getSendUserName());
            recvChannel.writeAndFlush(sendMsgRequest).addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()){
                        logger.info("消息转发成功:{}",sendMsgRequest);
                        response.setResultCode("0000");
                        response.setResultMessage(String.format("发给用户[%s]消息成功",recvUserName));
                        channel.writeAndFlush(response);
                    }else {
                        logger.error(ExceptionUtils.getStackTrace(future.cause()));
                        logger.info("消息转发失败:{}",sendMsgRequest);
                        response.setResultCode("9999");
                        response.setResultMessage(String.format("发给用户[%s]消息失败",recvUserName));
                        channel.writeAndFlush(response);
                    }
                }
            });
        }else {
            logger.info("用户{}不在线,消息转发失败",recvUserName);
            response.setResultCode("9999");
            response.setResultMessage(String.format("用户[%s]不在线",recvUserName));
            channel.writeAndFlush(response);
        }
    }
}

整体逻辑:一获取要把消息发给那个账号;二获取该账号对应的连接;三在此连接上发送消息;四获取消息发送结果,将结果发给消息“发起者”。

下面是登陆处理的executor:

public class LoginExecutor extends ExecutorBase {
    private static Logger logger = LoggerFactory.getLogger(LoginExecutor.class);

    public LoginExecutor(Channel channel, Message message) {
        super(channel, message);
    }
    @Override
    public void run() {
        LoginRequest request = (LoginRequest)message;
        String userName = request.getUserName();
        String password = request.getPassword();
        UserService userService = SpringContextUtil.getBean(UserService.class);
        boolean check = userService.checkLogin(userName,password);
        LoginResponse response = new LoginResponse();
        response.setUserName(userName);
        response.setMessageType(MessageEnDeCoder.LoginResponse);
        response.setTime(new Date());
        response.setResultCode(check?"0000":"9999");
        response.setResultMessage(check?"登陆成功":"登陆失败,用户名或密码错");
        if (check){
            userService.updateOnlineStatus(userName,Boolean.TRUE);
            SessionManager.addSession(userName,channel);
        }
        channel.writeAndFlush(response).addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                //登陆失败,断开连接
                if (!check){
                    logger.info("用户{}登陆失败,断开连接",((LoginRequest) message).getUserName());
                    channel.disconnect();
                }
            }
        });
    }
}

登陆逻辑也不复杂,登陆成功则更新用户在线状态,并且无论登陆成功还是失败,都会返一个登陆应答。同时,如果登陆校验失败,在返回应答成功后,需要将链接断开。

1.4.3.7 JsonEncoder

JsonEncoder:最后看这个唯一的出站handler,服务端发出去的消息都会被出站handler处理,职责就是将java bean转成我们之前定义的报文协议格式:

public class JsonEncoder extends MessageToByteEncoder<Message> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
        String msgStr = MessageEnDeCoder.encode(message);
        int length = msgStr.getBytes(Charset.forName("UTF-8")).length;
        String str = String.valueOf(length);
        String lenStr = StringUtils.leftPad(str,8,'0');
        msgStr = lenStr + msgStr;
        byteBuf.writeBytes(msgStr.getBytes("UTF-8"));
    }
}
1.4.3.8 SessionManager

SessionManager:剩下最后一个东西没说,这个是用来保存每个登陆成功账户的链接的,底层是个map,key为用户账户,value为链接:

public class SessionManager {
    private static ConcurrentHashMap<String,Channel> sessionMap = new ConcurrentHashMap<>();

    public static void addSession(String userName,Channel channel){
        sessionMap.put(userName,channel);
    }
    public static String removeSession(String userName){
        sessionMap.remove(userName);
        return userName;
    }
    public static String removeSession(Channel channel){
        for (String key:sessionMap.keySet()){
            if (channel.id().asLongText().equalsIgnoreCase(sessionMap.get(key).id().asLongText())){
                sessionMap.remove(key);
                return key;
            }
        }
        return null;
    }

    public static Channel getSession(String userName){
        return sessionMap.get(userName);
    }
}

1.5 聊天客户端

客户端中界面相关的东西是基于JavaFX框架做的

1.5.1 示例

public void login(String userName,String password) throws Exception {
        Bootstrap clientBootstrap = new Bootstrap();
        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try {
            clientBootstrap.group(clientGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
            clientBootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new IdleStateHandler(20, 15, 0, TimeUnit.SECONDS));
                    ch.pipeline().addLast(new StringLengthFieldDecoder());
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new JsonDecoder());
                    ch.pipeline().addLast(new JsonEncoder());
                    ch.pipeline().addLast(bussMessageHandler);
                    ch.pipeline().addLast(new HeartBeatHandler());
                }
            });
            ChannelFuture future = clientBootstrap.connect(server,port).sync();
            if (future.isSuccess()){
                channel = (SocketChannel)future.channel();
                LoginRequest request = new LoginRequest();
                request.setTime(new Date());
                request.setUserName(userName);
                request.setPassword(password);
                request.setMessageType(MessageEnDeCoder.LoginRequest);
                channel.writeAndFlush(request).addListener(new GenericFutureListener<Future<? super Void>>() {
                    @Override
                    public void operationComplete(Future<? super Void> future) throws Exception {
                        if (future.isSuccess()){
                            logger.info("登陆消息发送成功");
                        }else {
                            logger.info("登陆消息发送失败:{}", ExceptionUtils.getStackTrace(future.cause()));
                            Platform.runLater(new Runnable() {
                                @Override
                                public void run() {
                                    LoginController.setLoginResult("网络错误,登陆消息发送失败");
                                }
                            });
                        }
                    }
                });
            }else {
                clientGroup.shutdownGracefully();
                throw new RuntimeException("网络错误");
            }
        }catch (Exception e){
            clientGroup.shutdownGracefully();
            throw new RuntimeException("网络错误");
        }
    }

1.5.2 ServerBootstrap和Bootstrap区别

在网络编程中,特别是在基于Netty等框架的应用中,ServerBootstrapBootstrap是两个关键类,它们分别用于服务端和客户端的引导启动。
下面是它们的区别:

  • ServerBootstrap
    ServerBootstrapNetty中用于创建和配置服务器的启动类。
    它专门用于服务端应用程序,在服务端启动时使用。
    ServerBootstrap负责创建并配置用于接受客户端连接的服务器。
    通过ServerBootstrap可以设置服务器的各种参数,例如绑定端口、设置TCP参数、添加SSL支持等。
  • Bootstrap
    BootstrapNetty中用于创建和配置客户端的启动类。
    它专门用于客户端应用程序,在客户端启动时使用。
    Bootstrap负责创建并配置用于连接到服务器的客户端。
    通过Bootstrap可以设置客户端的各种参数,例如远程主机地址、远程主机端口、连接超时等。

总之,ServerBootstrap用于创建和配置服务器端,而Bootstrap用于创建和配置客户端。它们分别针对服务端和客户端的启动进行了专门的设计和实现。

1.5.3 示例代码解释

对这段代码,我们主要关注这几点:一所有handler的初始化;二connect服务端。
所有handler中,除了bussMessageHandler是客户端特有的外,其他的handler在服务端章节已经讲过了

先看连接服务端的操作。
首先发起连接,连接成功后发送登陆报文。发起连接需要对成功和失败进行处理。发送登陆报文也需要对成功和失败进行处理。注意,这里的成功失败只是代表当前操作的网络层面的成功失败,这时候并不能获取服务端返回的应答中的业务层面的成功失败

1.5.4 BussMessageHandler

BussMessageHandler:整体流程还是跟服务端一样,将受到的消息扔给线程池处理,我们直接看处理消息的各个executor

先看客户端发出登陆请求后,收到登陆应答消息后是怎么处理的:

public class LoginRespExecutor extends ExecutorBase {
    private static Logger logger = LoggerFactory.getLogger(LoginRespExecutor.class);

    public LoginRespExecutor(Channel channel, Message message) {
        super(channel, message);
    }

    @Override
    public void run() {
        LoginResponse response = (LoginResponse)message;
        logger.info("登陆结果:{}->{}",response.getResultCode(),response.getResultMessage());
        if (!response.getResultCode().equals("0000")){
            Platform.runLater(new Runnable() {
                @Override
                public void run() {
                    LoginController.setLoginResult("登陆失败,用户名或密码错误");
                }
            });
        }else {
            LoginController.setCurUserName(response.getUserName());
            ClientApplication.getScene().setRoot(SpringContextUtil.getBean(MainView.class).getView());
        }
    }
}

接下来看客户端是怎么发聊天信息的:

public void sendMessage(Message message) {
        channel.writeAndFlush(message).addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                SendMsgRequest send = (SendMsgRequest)message;
                if (future.isSuccess()){
                    Platform.runLater(new Runnable() {
                        @Override
                        public void run() {
                            MainController.setMessageHistory(String.format("[我]在[%s]发给[%s]的消息[%s],发送成功",
                                    DateFormatUtils.format(send.getTime(),"yyyy-MM-dd HH:mm:ss"),send.getRecvUserName(),send.getSendMessage()));
                        }
                    });
                }else {
                    Platform.runLater(new Runnable() {
                        @Override
                        public void run() {
                            MainController.setMessageHistory(String.format("[我]在[%s]发给[%s]的消息[%s],发送失败",
                                    DateFormatUtils.format(send.getTime(),"yyyy-MM-dd HH:mm:ss"),send.getRecvUserName(),send.getSendMessage()));
                        }
                    });
                }
            }
        });
    }

参考连接:https://mp.weixin.qq.com/s/zlyexr2ix3PaK_Qm6yezHQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/472385.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python中字典相关知识点总结

1.字典的定义 字典&#xff1a;在Python中&#xff0c;字典是一系列键-值对。每个键都与一个值相关联&#xff0c;程序员可以通过键来访问与之相关联的值。 实际举例&#xff1a; student{name:xincun,age:18} 通过实例我们可以发现&#xff0c;键-值对是两个相关联的值。指…

Qualcomm AI Hub-示例(二)模型性能分析

文章介绍 模型性能分析&#xff08;Profiling&#xff09; 当模型尝试部署到设备时&#xff0c;会面临许多重要问题&#xff1a; 目标硬件的推理延迟是多少&#xff1f;该模型是否符合一定的内存预算&#xff1f;模型能够利用神经处理单元吗&#xff1f; 通过在云端的物理设…

邮件客户端 Thunderbird 简单配置

1. 基本情况介绍 原来使用的邮箱客户端是 Office 365 自带的 Outlook 365切换原因&#xff1a;新装电脑&#xff0c;发现原 Outlook 中的账号信息无法迁移&#xff0c;需要耗费大量时间手动配置邮箱使用的邮箱&#xff1a;微软 O365 邮箱、qq 邮箱、163 邮箱、公司私有邮箱 …

【计算机网络篇】计算机网络的定义和分类

文章目录 &#x1f354;什么是计算机网络&#x1f5c3;️计算机网络的分类⭐按交换方式分类⭐按使用者分类⭐按传输介质分类⭐按覆盖范围分类⭐按拓扑结构分类 &#x1f6f8;小结 &#x1f354;什么是计算机网络 计算机网络是指将多台计算机或其他网络设备通过通信链路连接起来…

55、服务攻防——数据库安全RedisHadoopMysql未授权访问RCE

文章目录 常见服务应用的安全测试&#xff1a; 配置不当——未授权访问安全机制——特定安全漏洞安全机制——弱口令爆破攻击 应用服务安全测试流程&#xff1a; 判断服务开放情况——端口扫描&组合猜解等 端口扫描&#xff1a;服务开放&#xff0c;绑定端口没开放&#…

关于继承是怎么样的?那当然是很好理解之

本文描述了关于继承的大部分知识&#xff0c;但是并不全&#xff0c;每篇博客之间的知识都有互串&#xff0c;所以需要把几篇文章合起来看&#xff0c;学会融会贯通&#xff01; 温馨提示&#xff1a;使用PC端观看&#xff0c;效果更佳&#xff01; 目录 1.继承是什么 2.什…

es 聚合操作(一)

前言 Elasticsearch除搜索以外&#xff0c;提供了针对ES 数据进行统计分析的功能。聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。例如&#xff1a; 衣服品牌的受欢迎程度这些衣服的平均价格、最高价格、最低价格这些衣服的每天、每月销量如何 使用…

Bito插件

此文档只作用于指导性工作&#xff0c;更多资料请自行探索。 1、插件安装与介绍 1.1 插件下载与安装 在idea中搜索&#xff1a;Bito Bito is also available for:​编辑VSCode​编辑JetBrains​编辑CLI 1.2 官方介绍 插件&#xff1a;ChatGPT GPT-4 - Bito AI Code Assista…

LTD267次升级 | 商城升级线下退款功能 • 内容URL生成高清二维码 • 官微名片展示产品视频

1、商城优化退款功能&#xff0c;支持手动退款&#xff1b; 2、内容生成二维码支持高清分辨率&#xff1b; 3、平台版名片小程序产品橱窗支持视频内容&#xff1b; 4、 其他已知问题修复与优化&#xff1b; 01 商城 在本次升级中&#xff0c;我们对商城的退款功能做了改进与…

首席财务官期刊投稿邮箱

《首席财务官》杂志是由国家新闻出版总署批准的金融类期刊。杂志围绕“打造CFO新定义”而展开&#xff0c;定位于“国内国内第一本公开发行的面向CFO人群提供服务的专业资讯媒体”&#xff0c;核心围绕“竞争、资本、运营”三大要点展开&#xff0c;以CFO视角解读“公司金融&am…

Python的内建比较函数cmp比较原理剖析

Python中的cmp()函数用于比较两个对象的大小。 cmp( x, y)&#xff1a;比较2个对象&#xff0c;前者小于后者返回-1&#xff0c;相等则返回0&#xff0c;大于后者返回1. Python的cmp比较函数比较原理 Python的cmp函数可以比较同类型之间&#xff0c;或者不同数据类型之间。然…

Nebula Graph-01-Nebula Graph简介和安装以及客户端连接

前言 NoSQL 数据库 图数据库并不是可以克服关系型数据库缺点的唯一替代方案。现在市面上还有很多非关系型数据库的产品&#xff0c;这些产品都可以叫做 NoSQL。NoSQL 一词最早于上世纪 90 年代末提出&#xff0c;可以解释为“非 SQL” 或“不仅是 SQL”&#xff0c;具体解释要…

初识HOOK框架frida

hook是什么 hook框架是一种技术&#xff0c;用于在运行时拦截和修改应用程序的行为&#xff0c;通过hook&#xff0c;可以劫持应用程序的方法调用、修改参数、篡改返回值等&#xff0c;以达到对应用程序的修改、增强或调试的目的。 常见的hook框架有哪些 Xposed Framework&am…

固态浸压计

Solid State Dip Meter(固态浸没仪/固态浸压计) 是真空管栅极浸入式仪表的固态半导体版本。它是一种用于测量 LC 电路谐振频率的仪器。LC 电路是由电感 (L) 和电容 (C) 组成的电路。当电感的感抗与电容的容抗相互抵消时&#xff0c;这些元件可以谐振于特定频率。 固态浸入式仪…

matlab中Signal Editor定义梯形信号输出矩形信号

matlab中Signal Editor定义梯形信号输出矩形信号&#xff0c;可以通过如下勾选差值数据实现梯形信号输出。

MySQL数据库介绍与部署

背景 MySQL 是一个开源的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;最初由瑞典公司 MySQL AB 开发&#xff0c;后被 Oracle 公司收购。MySQL 使用标准 SQL 进行查询和管理数据&#xff0c;并支持多种操作系统。它是最流行的开源数据库之一&#xff0c;被…

金属表面缺陷检测设备通常采用计算机视觉技术和机器学习算法

金属表面缺陷检测是在金属制造过程中非常重要的质量控制步骤。它涉及检测金属表面可能存在的各种缺陷&#xff0c;如裂纹、气泡、凹坑、氧化、斑点等。这些缺陷可能会影响金属制品的性能和质量&#xff0c;因此需要及早发现并进行处理。 目前&#xff0c;金属表面缺陷检测通常采…

C++:部分题目

1. 封装、继承、多态 封装&#xff1a;将所需的数据成员&#xff0c;以及对数据的操作方法&#xff08;成员函数&#xff09;&#xff0c;绑定在一起成为类&#xff08;类型&#xff09;&#xff0c;定义该类型的对象时&#xff0c;成员被自动隐藏在对象内部。通过封装可以限定…

模拟算法总述

模拟 1.模拟算法介绍 模拟算法通过模拟实际情况来解决问题&#xff0c;一般容易理解但是实现起来比较复杂&#xff0c;有很多需要注意的细节&#xff0c;或者是一些所谓很”麻烦”的东西。 模拟题一般不涉及太难的算法&#xff0c;一般就是由较多的简单但是不好处理的部分组成…

.net使用excel的cells对象没有value方法——学习.net的Excel工作表问题

$exception {"Public member Value on type Range not found."} System.MissingMemberException 代码准备运行问题解决1. 下载别的版本的.net框架2. 安装3. 运行 代码 Imports Excel Microsoft.office.Interop.Excel Public Class Form1Private Sub Button1_Click(…