Netty应用(三) 之 NIO开发使用 网络编程 多路复用

目录

重要:logback日志的引入以及整合步骤

5.NIO的开发使用

5.1 文件操作

5.1.1 读取文件内容

5.1.2 写入文件内容

5.1.3 文件的复制

5.2 网络编程

5.2.1 accept,read阻塞的NIO编程

5.2.2 把accept,read设置成非阻塞的NIO编程

5.2.3 引入Selector监管者 【IO多路复用】

5.2.4 补充几个仍然存在的问题

5.2.5 引入服务器端的写操作

5.2.6 Selector多路复用的核心含义


重要:logback日志的引入以及整合步骤

sl4j是一个日志门面,它有两种具体的实现:

1.log4j2(年代久远,时间长,用的少)

2.logback(用的多,支持好)

而且最重要的一点就是:logback可以输出线程名,在多线程场景下这一点很重要。

  • 引入依赖
<dependency>
     <groupId>ch.qos.logback</groupId>
     <artifactId>logback-classic</artifactId>
     <version>1.2.12</version>
</dependency>
  • 引入logback.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- 控制台输出 -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="DEBUG">
        <appender-ref ref="STDOUT"/>
    </root>

</configuration>
  • 按照idea插件:log support

安装插件后,进行相关的配置:

  • 测试相关的快捷键

引入idea插件后,就可以使用快捷键并且插件会帮你自动生成相关的类!

运行:

  • 总结

  • 补充

为了让控制台对不同日志级别进行不同颜色的甄别输出,再安装一个插件:Grep Console!

重启idea后,再一次进行测试展示具体效果:哈哈哈哈,颜色不一样了哦!

5.NIO的开发使用

5.1 文件操作

5.1.1 读取文件内容

第一个程序,就是读取文件内容的程序

public class TestNIO1 {

    public static void main(String[] args) throws Exception{
        //1.创建Channel
        FileChannel channel = new FileInputStream("D:\\Daily_Code\\Netty_NEW\\data.txt").getChannel();

        //2.创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(10) ;

        //因为可能缓冲区一次无法读取完文件的所有字节,所以要while循环读取,可能多次反复读取到申请的缓冲区中
        while (true) {
            //在创建完缓冲区后,默认是写模式
            //3.把通道内获取的文件数据,写入缓冲区
            int read = channel.read(buffer) ;

            //此时说明文件已读取完,循环应该退出
            if(read == -1) {
                break;
            }

            //4.程序读取buffer的内容数据,后续的操作,设置buffer为读模式
            buffer.flip();

            //5.循环读取缓冲区的数据
            while(buffer.hasRemaining()) {
                byte b = buffer.get();
                //把字节转换成可视化的字符
                //UTF-8编码中:一个字符占用1~4的字节大小。汉字,日文,韩文占用3个字节,ASCII表的字符占用1个字节,xxx字符占用2个字节,一些少数语言字符占用4个字节
                System.out.println("(char)b = " + (char) b);
            }

            //6.设置buffer为写模式,因为循环上去后需要写入缓冲区,所以需要重新设置为写模式
            buffer.clear();
        }
    }

}
5.1.2 写入文件内容
public class TestNIO11 {

    public static void main(String[] args) throws IOException {
        //1.获取Channel ,方法:1.FileOutputStream.getChannel() 2.RandomAccessFile
        FileChannel channel = new FileOutputStream("D:\\Daily_Code\\Netty_NEW\\data1").getChannel();

        //2.获取ByteBuffer
        ByteBuffer buffer = Charset.forName("UTF-8").encode("梅西");

        //3.write : 把ByteBuffer写入到data1这个文件
        channel.write(buffer);

    }

}
5.1.3 文件的复制

对于文件的复制来说:Channel管道对象的transferTo方法拷贝的性能要高于IO流的拷贝性能以及IOUtils.copy的性能。因为transferTo方法底层采用的是零拷贝技术,后面会详细总结。

public class TestNIO12 {
    
    public static void main(String[] args) throws Exception{
        //把data的数据拷贝到data2

        //方法1:使用IO流方式实现拷贝 效率较低
//        FileInputStream inputStream = new FileInputStream("D:\\Daily_Code\\Netty_NEW\\data.txt");
//        FileOutputStream fileOutputStream = new FileOutputStream("D:\\Daily_Code\\Netty_NEW\\data2.txt");
//        byte[] byteBuffer = new byte[1024];
//        while(true) {
//            //把data文件的数据流读取到byteBuffer这个缓冲区
//            int read = inputStream.read(byteBuffer);
//            //读取完毕,退出
//            if(read == -1) {
//                break;
//            }
//            //把data文件的数据拷贝到fileOutputStream流对应的data1文件
//            fileOutputStream.write(byteBuffer,0,read);
//        }

        //方法2:使用commons-io这一工具类
//        FileInputStream inputStream = new FileInputStream("D:\\Daily_Code\\Netty_NEW\\data.txt");
//        FileOutputStream fileOutputStream = new FileOutputStream("D:\\Daily_Code\\Netty_NEW\\data2.txt");
//        IOUtils.copy(inputStream,fileOutputStream) ;

        //方法3:使用NIO方式拷贝,底层是零拷贝,效率较高。但是一次最多拷贝2GB大小的数据,一旦超过2GB,则需要分段拷贝
        FileChannel from = new FileInputStream("D:\\Daily_Code\\Netty_NEW\\data.txt").getChannel();
        FileChannel to = new FileOutputStream("D:\\Daily_Code\\Netty_NEW\\data2.txt").getChannel();

        long left = from.size();
        while (left > 0) {
            //一旦拷贝数据超过2GB,则需要分段拷贝!
            left = left - from.transferTo(from.size()-left,left,to) ;
        }
    }


}

transferTo方法:

5.2 网络编程

在日常开发中,通常都是多个客户端去连接并请求一个服务器端的,所以我们要把更多的关注点放在服务端如何处理这么多个客户端的请求压力(即:连接操作+读写操作的处理)

  • 网络编程NIO的核心结构

1.Channel管道

服务端---》ServerSocketChannel (接受请求)

客户端---》SocketChannel (进行实际的通信)

2.Buffer缓冲区

无论什么类型,最终都要转换成字节类型,所以缓冲区使用的是ByteBuffer

  • 端口号的意义

端口号:

端口号就是进行区分相同ip下的相同协议的不同程序的一种标识

5.2.1 accept,read阻塞的NIO编程
public class MyClient {

    public static void main(String[] args) throws Exception{
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8000)) ;

        System.out.println("-------------------------------");
    }

}
public class MyServer {

    public static void main(String[] args) throws Exception {

        //1.创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //2.设置服务端的监听端口
        serverSocketChannel.bind(new InetSocketAddress(8000)) ;
        //3.把建立连接成功的SocketChannel放入一个集合中
        List<SocketChannel> channelList = new ArrayList<>() ;
        //4.创建ByteBuffer缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(20) ;
        while (true) {
            //5.阻塞时监听客户端的连接
            System.out.println("等待连接服务器...");
            SocketChannel socketChannel = serverSocketChannel.accept();
            System.out.println("服务器已经连接..."+socketChannel);
            //6.加入集合
            if(socketChannel != null) {
                channelList.add(socketChannel);
            }
            for(SocketChannel channel:channelList) {
                System.out.println("开始实际的数据通信....");
                //7.把客户端发送的数据写入到ByteBuffer缓冲区
                channel.read(buffer);
                //把buffer转换成读模式
                buffer.flip();
                //8.显示输出到控制台
                CharBuffer decode = Charset.forName("UTF-8").decode(buffer);
                System.out.println("decode.toString() = " + decode.toString());
                //把buffer转换成写模式
                buffer.clear();
                System.out.println("通信已经结束....");
            }
        }
    }

}
5.2.2 把accept,read设置成非阻塞的NIO编程
public class MyClient {

    public static void main(String[] args) throws Exception{
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8000)) ;

        System.out.println("-------------------------------");
    }

}
public class MyServer1 {

    public static void main(String[] args) throws Exception{
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8000));

        List<SocketChannel> channelList = new ArrayList<>() ;

        ByteBuffer buffer = ByteBuffer.allocate(20);

        while (true) {
//            System.out.println("等待连接服务器.....");
            SocketChannel socketChannel = serverSocketChannel.accept();
//            System.out.println("服务器已经连接...." + socketChannel) ;
            if(socketChannel != null) {
                socketChannel.configureBlocking(false);
                channelList.add(socketChannel) ;
            }
            for (SocketChannel channel : channelList) {
                int read = channel.read(buffer);
                if(read > 0) {
                    System.out.println("开始实际的数据通信....");
                    buffer.flip();
                    CharBuffer decode = Charset.forName("UTF-8").decode(buffer);
                    System.out.println("decode.toString() = " + decode.toString());
                    buffer.clear();
                    System.out.println("通信已经结束.......");
                }
            }

        }

    }

}
5.2.3 引入Selector监管者 【IO多路复用】
  • 为什么要引入监管者Selector?

由于是非阻塞,所以会一直while循环。不断的while一直循环着十分影响性能问题,所以需要一个监管者来监管着事件状态的发生,而不要以whie死循环进行监管查看事件状态的发生

使用Selector充当监管者进行监管ServerSocketChannel和SocketChannel,但是Selector做了更加细粒度和高效的监管,Selector并不是像while循环一样无时无刻的监管着,而是Selector只在ServerSocketChannel触发ACCEPT状态事件的时候才会监管监控到,只在SocketChannel触发READ,WRITE状态事件的时候才会监管监控到。Selector就像一个警察,他不会无时无刻的监管人,只有当人触发了某些状态事件(如:抢劫,杀人等)时才会进行监控监管到人。Selector同理。这样可以极大的提高Selector监管监控的能力!也解决了while一直循环的问题。

  • Selector的核心结构

Selector有两个核心结构:一个keys,一个SelectionKeys。都是HashSet。

当serverSocketChannel.register(selector, 0, null) 或 socketChannel.register(selector, 0, null) 调用时,是把serverSocketChannel或socketChannel注册存储到keys这个HashSet中

当SSC实际触发了Accept状态事件或SC实际触发了Read或Write状态事件后,会把触发事件的Channel注册存储到SelectionKeys这个HashSet中,但一旦处理完事件,就立马把该事件对应的Channel从SelectionKeys中删除。但是keys中一直存储着register注册的Channel。

  • 为什么设置成非阻塞后,需要一直while循环?为什么引入Selector监管者后,性能就优化了?

深度分析一下吧。

首先明白一个点:调用read或write或accept这些方法,实际上需要从用户态陷入内核态,在内核态中去调用系统函数才能真正的执行对应方法,然后再调相关的硬件驱动,最终才调用到具体的硬件。

可以简单的这样理解:用户态就是我们编写的java代码,内核态就是操作系统内核的代码。我们在用户态做while循环,就是不断的轮询陷入内核去询问内核的缓冲区 "你有read或write或accept事件触发了吗?",如果没有,那么也不阻塞。如果有,那么会拷贝内核缓冲区数据到用户态进行处理。

这就是同步非组塞,我们需要时刻轮询陷入内核询问是否处理完结果了,我们还需要去管理这个过程。

不断的while循环,实际上是不断的陷入内核,开销是极大的。

未引入Selector前,我们的性能消耗为:n次从用户态陷入内核态的切换消耗性能 + n次内核轮询询问是否有accept或read或write事件触发的性能消耗

所以要引入Selector监管者:就是selector.select()方法的调用监管

把所有的Channel,无论是ServerSocketChannel,还是SocketChannel都加入到Selector中。当调用selector.select()时,表示一次就把所有注册的SC或SSC都陷入内核,在内核中会不断的轮询监听SC或SSC,在内核中会不断的监听”注册到SSC或SC是否有accept或read或write事件的触发呢?“

如果没有事件发生,selector.select()会一直阻塞!但是selector.select()只是在用户态阻塞等待着的,而它在内核中还是会不断的去做轮询,不断的去询问内核缓存区 "注册在Selector上的SC或SSC是否有accept等事件的触发呢?",如果有,那么会把内核缓冲区的数据拷贝到用户态,然后进行用户态进行处理该拷贝的数据(插一嘴,如果一次拷贝不完,那么会再一次调用selector.select()去拷贝内核缓冲区的数据)。如果没有,那么继续轮询。

引入Selector前,我们的性能消耗为:1次从用户态陷入内核态的切换消耗性能 + n次内核轮询询问是否有accept或read或write事件触发的性能消耗

总结:

引入Selector这个监管者后,我们降低了(n-1)次从用户态陷入内核态的切换消耗性能

  • 引入Selector后,Server服务器端代码的具体步骤如下:

具体步骤如下:

第一步:当SSC,SC注册到Selector对象后,SSC,SC对象都会被保存到keys中。

第二步:当selector.select()监控到SSC实际触发了Accept状态事件或SC实际触发了Read或Write状态事件后,会把这些实际触发了Accept状态事件的SSC对象 或 实际触发了Read或Write状态事件的SC对象保存到SelectionKeys中。为什么可以这样?因为SC或SSC事先已经在keys这个HashSet中完成了register存储!!selector.select()监控到事件发生后,只是把SSC或SC对象引用做了一个迁移存储。

第三步:后续我们迭代器遍历的就是SelectionKeys这个触发事件Channel集合。一旦事件处理完毕后,就应该把该事件对应的SSC或SC对象从SelectionKeys中删除掉

第四步:当再一次发生相同或不同类型事件时,selector.select()方法会再一次监控到事件的发生,再重新把SC或SSC的对象引用从keys这个HashSet迁移存储到SelectionKeys这个HashSet中!这样才是良性的循环。

补充说明:

1.等于说,无论是keys还是SelectionKeys,存储的都是SSC对象或SC对象。

2.还有一点需要注意:在keys中第一次存储SSC,SC对象时,是真正new创建出SSC,SC对象。但在SelectionKeys再进行存储时,其实就是把SSC,SC对象的引用复制到SelectionKeys中!

3.理论上:keys中存储的SSC或SC对象的数量是要大于等于selectionKeys中存储的SSC或SC的数量。因为有的SSC,SC没有触发事件状态,那么只会在keys中存储着,而不会移动迁移到selectionKeys中。

4.虽然客户端越来越多,注册的SocketChannel就会越来越多,但是注册的ServerSocketChannel一直只有一个

  • 具体代码如下:
public class MyClient {

    public static void main(String[] args) throws Exception{
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8000)) ;

        System.out.println("-------------------------------");
    }

}
public class MyServer2 {

    public static void main(String[] args) throws Exception{
        //获取ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //监听绑定端口8000
        serverSocketChannel.bind(new InetSocketAddress(8000));
        //设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        //获取Selector监管者
        Selector selector = Selector.open();
        //把ServerSocketChannel注册到Selector监管者上。实际上register注册存储到的是Selector的keys这个HashSet中
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
        //添加感兴趣的事件:ACCEPT
        selectionKey.interestOps(SelectionKey.OP_ACCEPT) ;

        System.out.println("MyServer2.main");

        while (true) {
            //应用层阻塞等待监听Selector注册的所有Channel对应的所有事件状态,但是在内核层会不断的轮询询问是否事件触发开启

            //当内核层监听到有事件触发,那么应用层会停止等待,并且会把该触发事件对应的Channel存储到Selector对应的SelectionKeys这个HashSet中
            //如何存储到SelectionKeys中的?是从keys中拷贝对象引用到SelectionKeys中
            selector.select();

            System.out.println("-----------------MyServer2---------------------");

            //获取的是触发事件的所有Channel对象,即SelectionKeys这个HashSet中的所有Channel对象
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            //使用迭代器进行遍历,为了使用完毕后就把Channel从SelectionKeys中删除
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //把使用完毕的触发事件Channel从SelectionKeys中删除,避免空指针异常
                //举个例子:如果不删除,第二次进入channel.accept()返回的是null,那么自然就会空指针异常!所以处理完事件后要把Channel从SelectionKeys中及时删除,但keys中一直保留着register注册的Channel
                iterator.remove();

                if(key.isAcceptable()) {

                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    //获取到连接的客户端Channel
                    SocketChannel sc = channel.accept();
                    //设为非阻塞
                    sc.configureBlocking(false);

                    //实际上register注册存储到的是Selector的keys这个HashSet中
                    SelectionKey scKey = sc.register(selector, 0, null);
                    //添加感兴趣的事件
                    scKey.interestOps(SelectionKey.OP_READ);

                } else if(key.isReadable()) {
                    try {
                        //此时为SocketChannel的读事件

                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(5);
                        //把Channel管道中的数据读取到Buffer缓冲区
                        int read = sc.read(buffer);
                        if(read == -1) {
                            //当客户端调用socketChannel.close()请求与服务器断开时,会触发一个READ事件给服务器端的。但是这个READ事件比较特殊,服务器端处理后sc.read(buffer)==-1,此时必须cancel()一下
                            //cancel()方法的调用表示服务器已经完成对客户端socketChannel.close()断开的处理。
                            //如果不调用cancel,那么服务端会一直认为没有处理完该特殊的READ事件,会一直反复调用selector.select()方法
                            key.cancel();
                        } else {
                            //转换成读模式
                            buffer.flip();
                            //解码buffer缓冲区中的数据成字符类型
                            CharBuffer decode = Charset.forName("UTF-8").decode(buffer);
                            //打印输出
                            System.out.println("decode.toString() = " + decode.toString());
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        //这里是做健壮性的处理
                        //当出现异常时,客户端与服务器也是断开,此时服务器会接受到一个断开的READ事件。所以需要调用cancel方法处理该READ事件
                        key.cancel();
                    }
                }

            }

        }

    }

}
  • debug测试

1.debug启动Server端

2.debug启动Client端

3.测试客户端连接服务端,服务端selector.select()监听到

客户端连接上服务器后,selector.select()方法就会监听到该连接事件,后续就会把该连接事件对应的Channel对象存储到SelectionKeys

4.测试客户端write写,服务端selector.select()监听到READ事件的产生,后续会把该READ事件对应的Channel存储到SelectionKeys中,使用channel.read(buffer),一次处理不完的话会多次调用selector.select()

Server端:

5.测试客户端socketChannel.close(),服务端selector.select()监听到该特殊的READ事件的产生,后续会把该特殊READ事件对应的Channel存储到SelectionKeys中,使用key.cancel()处理。如果不使用key.cancel()进行处理的话,会一直调用selector.select()

Server端:

  • 细节分析1:

客户端一次发送数据"leomessi",如果缓冲区ByteBuffer大小为5,那么一次读取不完客户端发送的数据,会分批处理客户端发送的数据,直到处理完毕,那么会while true循环2次selector.select(),服务端第一次调用selector.select()处理的是leome,第二次调用select执行的是ssi。

结论:

1.当client客户端连接服务器发起操作后,服务器必须全部处理完成后整个交互才算结束,如果没有全部处理完成,select方法就会被一直调用

上述的案例中,由于leomessi这个数据一次没有处理完,所以才会被调用多次了

2.在某些特殊的操作下,服务器端无法处理,那么select方法就会被频繁的调用 (如:socketChannel.close()的调用,服务端不可以通过channel.read(buffer)简单的进行处理,需要调用cancle()处理该断开的READ事件)

  • 细节分析2:针对于细节分析1中结论第二点,会需要cancle方法的引入

为什么引入cancle方法?

客户端与服务器端建立连接后,过一段时间后,SocketChannel调用close方法与服务端断开。此时相当于客户端发送了一个READ事件给到服务端,只不过这个READ事件十分特殊,它是一个告知服务端我要断开的READ事件,服务端需要进行特殊处理,而不是简单的调用read(buffer)!针对于这种特殊的READ事件,在处理时需要调用cancle方法。如果不这样处理,会给服务端一种错觉,总感觉还有数据没有处理完成,会导致selector.select()一直轮询访问内核,性能消耗极大。

  • 解决半包粘包 并且 引入attatchment附件机制

先说半包粘包问题:

当发送数据"leomessi",如果对方ByteBuffer缓冲区大小只有5,那么第一次接收到的为 leome,第二次接收到的为ssi。其实这就是半包粘包的现象,原本服务端想以完整leomessi进行接收的

为解决该问题,引入相关方法,但是又出现了一个新问题:

如果发送的数据为"leo\nmessi"在解决半包粘包过程中,由于ByteBuffer缓冲区为7,那么第一次读取到的是leo\nmes,通过解决半包粘包方法可知,compact会让mes迁移到最前面等待下一次,但是由于每一次我们都创建的是新ByteBuffer缓冲区,所以mes丢失!所以最终输出的是leo 和 si。

为解决该问题,引入attatchment机制,把Channel与ByteBuffer绑定!具体操作就是在Channel.register的时候,把ByteBuffer作为附件注册给该Channel,之后直接通过Channel获取附件即可拿到对应唯一的ByteBuffer缓冲区,只要缓冲区保持唯一,那么就不会出现compact后数据丢失的情况

首先演示半包粘包的问题:

public class MyServer3 {

    public static void main(String[] args) throws Exception{
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        
        serverSocketChannel.bind(new InetSocketAddress(8000));
        serverSocketChannel.configureBlocking(false);
        
        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
        
        while (true) {
            selector.select();
            System.out.println("MyServer3.main");

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if(key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = channel.accept();
                    socketChannel.configureBlocking(false);
                    SelectionKey scKey = socketChannel.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    try {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(5);
                        int read = channel.read(buffer);
                        if (read == -1) {
                            key.cancel();
                        } else {
                            buffer.flip();
                            System.out.println("Charset.forName(\"UTF-8\").decode(buffer).toString() = " + Charset.forName("UTF-8").decode(buffer).toString());
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();
                    }
                }
            }

        }
        
    }

}

服务端:启动

客户端:

向服务端发送数据leomessi,由于服务端ByteBuffer过小,导致半包粘包问题

解决方法:

再测试:解决半包粘包问题

但是出现新问题!通过上面解决问题步骤可知,我们必须要把ByteBuffer缓冲区设为"一次可以读取完客户端发送的数据的大小",如果不这样设置,会出现问题!演示如下:

为什么会出现这种问题?

解决方法,把Channel与Buffer进行绑定,使用附件机制进行绑定!!!

测试:

解决半包粘包过程中,由于ByteBuffer不断重新创建导致数据丢失的问题:

public class MyServer3 {

    private static void doLineSplit(ByteBuffer buffer) {
        buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            if (buffer.get(i) == '\n') {//hellosu
                int length = i + 1 - buffer.position();
                ByteBuffer target = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    target.put(buffer.get());
                }

                //截取工作完成
                target.flip();
                System.out.println("StandardCharsets.UTF_8.decode(target).toString() = " + StandardCharsets.UTF_8.decode(target).toString());
            }
        }
        buffer.compact();
    }

    public static void main(String[] args) throws Exception{
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        serverSocketChannel.bind(new InetSocketAddress(8000));
        serverSocketChannel.configureBlocking(false);

        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();
            System.out.println("MyServer3.main");

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if(key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = channel.accept();
                    socketChannel.configureBlocking(false);
                    //使用附件机制,把当前SocketChannel与ByteBuffer缓冲区进行绑定
                    ByteBuffer buffer = ByteBuffer.allocate(10);
                    SelectionKey scKey = socketChannel.register(selector, 0, buffer);
                    scKey.interestOps(SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    try {
                        SocketChannel channel = (SocketChannel) key.channel();
//                        ByteBuffer buffer = ByteBuffer.allocate(10);
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        int read = channel.read(buffer);
                        if (read == -1) {
                            key.cancel();
                        } else {
//                            buffer.flip();
//                            System.out.println("Charset.forName(\"UTF-8\").decode(buffer).toString() = " + Charset.forName("UTF-8").decode(buffer).toString());
                            //解决半包粘包问题
                            doLineSplit(buffer);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();
                    }
                }
            }

        }

    }

}

补充:

假如说:我们不仅想要把ByteBuffer缓冲区作为附件,还想要其他的数据作为附件,该咋怎么办???

方法1:封装多个attatchment元素成一个集合然后register。优点:不存在僵化,后续如果有新的attatchment元素增加,只需要add到集合中即可。缺点:不具有业务含义

方法2:封装多个attatchment元素成一个类然后register。优点:具有业务含义。缺点:存在僵化,如果后续有新的attatchment元素增加,那就无法再增加。当然:类里面可以添加一个集合属性!这样就完美解决了类的缺点

为什么封装成一个类后才能具有业务含义?一个属性name="小明",如果单看name这个属性,你知道name代表的是一个狗的名字还是人的名字,只有把name封装到一个类中,比如 public class User {private String name="小明"}后才可以体现该name代表的是一个人名

  • 解决缓冲区大小不够导致频繁循环。需要扩容解决该问题

客户端发送数据'leomessi\nleo'给服务器,但是Buffer缓冲区大小只有5字节,所以第一次只能接收到leome,由于在接收的数据中查询不到\n,所以不会读取任何字节,所以buffer.compact()调用时,会把未读取的leome迁移到最前面,如下:

演示问题如下:

分析:

解决方法:

扩容,每次按照2倍扩容。

演示:

5.2.4 补充几个仍然存在的问题

1.

除了扩容需要我们清楚,我们依然要考虑缩容的问题:

为什么要缩容?

在第一段时间操作时,由于ByteBuffer缓冲区太小(也可能是客户端发送的数据过大导致后续不断扩容),我们不断的进行扩容操作。但是过了一段时间后,客户端发送的消息数据相对当前缓冲区太小,那么我们应该不断的缩小缓冲区大小,也就是缩容。如果不考虑缩容的话,那么在多线程并发的场景下,每过来一个客户端,我们都需要建立一个SocketChannel,都需要与SocketChannel对应建立一个ByteBuffer缓冲区。假设有10万个客户端与服务端连接,需要建立10万个ByteBuffer。如果ByteBuffer不在客户端发送相对小的消息数据进行合理缩容,由于ByteBuffer是占用内存的!!所以会造成极大的内存浪费!

无论是是扩容还是缩容,后续netty帮我们做的很好了。。。

2.

我们无论是进行扩容还是缩容时,需要把老ByteBuffer缓冲区的数据拷贝到新ByteBuffer缓冲区。ByteBuffer底层就是数组,需要一个个的去拷贝,效率特别低,为了优化,需要引入零拷贝机制。

3.

在网络通信过程中,如何处理解决半包粘包的问题?其实就两种方式,如下:

方法1.使用 \n 区分不同的信息数据,保证了消息的完整性。但是这种挨个字节进行检索\n的方式,效率较低。(使用\n+compact方式进行解决半包粘包问题)

方法2.使用头体分离的方式。在'头'中记录元数据,所谓元数据就是真实数据的各种信息,但是不包括真实数据本身。如:记录真实数据的大小,真实数据的类型。在'体'中记录的才是真实数据本身。

使用头体分离的话,我们就可以通过'头'中元数据中记录的真实数据的大小,然后按这个大小去读取'体'中真实数据。这样也解决了半包粘包的问题。这种方式在当前是流行的。效率较高。

如:HTTP协议,在该协议中响应头中有一个Content-Length,根据这个大小我们再去响应体中去读取Content-Length大小的真实数据!

  • 补充网络知识:分包与流量控制

分包:

要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生分包。

待发送数据大于MSS(最大报文长度),TCP在传输前将进行分包。

流量控制:

5.2.5 引入服务器端的写操作
  • 第一版代码
public class MyServer5 {

    public static void main(String[] args) throws Exception{
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8000));

        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector,0,null);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();
            System.out.println("MyServer5.main");
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = channel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector,SelectionKey.OP_READ);
                    StringBuffer stringBuffer = new StringBuffer();
                    for (int i = 0; i < 200000000; i++) {
                        stringBuffer.append("l");
                    }
                    ByteBuffer buffer = Charset.forName("UTF-8").encode(stringBuffer.toString());
                    while (buffer.hasRemaining()) {
                        int write = socketChannel.write(buffer);
                        System.out.println("每次写给客户端的长度为: " + write);
                    }
                }
            }
        }

    }
}
public class MyClient1 {

    public static void main(String[] args) throws Exception{
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8000));
        int read = 0;
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            read += socketChannel.read(buffer);
            System.out.println("本次读取的大小为: " + read);
            buffer.clear();
        }
    }

}

测试:

Server:

Client:

  • 分析第一版存在的问题

  • 解决第一版存在的问题,引出第二版代码

代码思路如图所示:

public class MyServer6 {

    public static void main(String[] args) throws Exception{
        //
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        //服务端监听端口8000
        serverSocketChannel.bind(new InetSocketAddress(8000));
        //创建监管者
        Selector selector = Selector.open();
        //注册ServerSocketChannel到Selector监管者上。具体表现为:把该Channel存储到keys这一HashSet中
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            //监管者调用select()在操作系统内核轮询询问文件描述符是否就绪(是否触发事件?)【应用层等待阻塞,但是在内核层一直轮询询问是否有事件触发!】
            // 一旦触发某一事件,则把该触发事件的Channel的引用从keys中复制copy到SelectionKeys这一HashSet中
            selector.select();
            System.out.println("MyServer6.main");
            //获取注册在Selector中的SelectionKeys集合
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            //遍历
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //一旦处理完该事件,那么就要把该事件对应的Channel从SelectionKeys中删除 避免重复处理该事件
                iterator.remove();

                //连接事件
                if (key.isAcceptable()) {
                    //拿到ServerSocketChannel
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    //accept监听获取到对应连接上的SocketChannel
                    SocketChannel socketChannel = channel.accept();
                    //设为非阻塞
                    socketChannel.configureBlocking(false);
                    //注册READ事件
                    SelectionKey scKey = socketChannel.register(selector, SelectionKey.OP_READ);

                    //拼接
                    StringBuffer sb = new StringBuffer();
                    for (int i = 0; i < 200000000; i++) {
                        sb.append("s");
                    }
                    //创建缓冲区,默认为写模式
                    ByteBuffer buffer = Charset.forName("UTF-8").encode(sb.toString());
                    //把该Buffer缓冲区的数据通过SocketChannel写给client客户端
                    int write = socketChannel.write(buffer);
                    System.out.println("第一次写出的数据大小:" + write);
                    //如果Buffer一次没有写完,那么需要增加WRITE事件,并且把Buffer缓冲区传递下去(共享但要保证线程安全,所以采取附件的方式)
                    //为什么不能把Buffer设为全局变量进行共享?因为在多线程情况下,共享全局变量,产生并发安全问题。局部变量一般都是线程安全的,因为每一个线程都会在自己的栈帧中保持自己独立的局部变量
                    //为什么要增加WRITE事件?触发WRITE事件的条件:客户端此时处理的及,允许你服务端发数据。
                    //啥时候不能触发WRITE事件?客户端处理不过来,压力大,不允许服务端再发。如果不使用WRITE事件方式,则服务端在不能发数据的情况下,也会发空数据包给客户端。这样是不是导致浪费了服务端有限的线程资源?所以需要WRITE事件!
                    if (buffer.hasRemaining()) {
                        scKey.interestOps(scKey.interestOps()+SelectionKey.OP_WRITE);
                        scKey.attach(buffer);
                    }

                } else if (key.isWritable()) {//写事件

                    //获取发送数据的Channel
                    SocketChannel channel = (SocketChannel) key.channel();

                    //获取附件,目的是接着第一次写之后继续写,保证Buffer唯一传递
                    ByteBuffer attachment = (ByteBuffer) key.attachment();

                    //写!但是不循环写,本次写完后,如果客户端还让继续写,那么Selector.select()会再一次监听到WRITABLE事件,那么可以再一次到这里写!
                    int write = channel.write(attachment);
                    System.out.println("本次写出的数据大小为: " + write);

                    //如果Buffer写完了,那么需要把附件置空,并且清空WRITE事件
                    if (!attachment.hasRemaining()) {
                        key.attach(null);
                        key.interestOps(key.interestOps()-SelectionKey.OP_WRITE);
                    }
                }
            }
        }
    }

}
public class MyClient1 {

    public static void main(String[] args) throws Exception{
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8000));
        int read = 0;
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            read += socketChannel.read(buffer);
            System.out.println("本次读取的大小为: " + read);
            buffer.clear();
        }
    }

}

测试:

Server端

Client端:

5.2.6 Selector多路复用的核心含义

把多个客户端(SocketChannel)和服务端(ServerSocketChannel)都注册到一个Selector上,Selector监听内核缓冲区,当允许触发READ或WRITE的时候,才会让当前线程去执行该就绪事件的操作。这样就更加高效的利用了当前线程的资源性能。也使得单线程的资源利用率发挥的很好。


linux操作系统 默认可以允许客户端连接的socket数量不够多,我们需要在合适的事件增加修改这个socket数量。啥时候修改呢?当并发上来后,CPU和内存相比之前没什么太大变化,说明此时socket连接数量不够了,客户端再来请求连接时,服务端(linux)直接给拒绝了,所以CPU和内存不受这次并发流量大的影响。此时我们应该提升linux服务器对应的socket数量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/384278.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

WPF中值转换器的使用

什么是值转换器 在WPF&#xff08;Windows Presentation Foundation&#xff09;中&#xff0c;值转换器&#xff08;Value Converter&#xff09;是一种机制&#xff0c;允许你在绑定时转换绑定源和绑定目标之间的值。值转换器实现了 IValueConverter 接口&#xff0c;该接口…

SSM实现支付宝沙盒支付

文章目录 沙盒支付准备配置测试 沙盒支付 这里用的支付宝的一个沙盒环境&#xff0c;是支付宝提供给开发者测试用的。 下面主要梳理一下&#xff0c;支付功能的实现&#xff0c;其实还是很简单的&#xff0c;因为支付宝都提供好了&#xff0c;我们只要调用接口去传入参数即可…

【后端高频面试题--Linux篇】

&#x1f680; 作者 &#xff1a;“码上有前” &#x1f680; 文章简介 &#xff1a;后端高频面试题 &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; 后端高频面试题--Linux篇 Windows和Linux的区别&#xff1f;Unix和Linux有什么区别&#xff1f…

【原创 附源码】Flutter安卓及iOS海外登录--Google登录最详细流程

最近接触了几个海外登录的平台&#xff0c;踩了很多坑&#xff0c;也总结了很多东西&#xff0c;决定记录下来给路过的兄弟坐个参考&#xff0c;也留着以后留着回顾。更新时间为2024年2月8日&#xff0c;后续集成方式可能会有变动&#xff0c;所以目前的集成流程仅供参考&#…

代码随想录算法训练营第42天(动态规划04 ● 01背包问题,你该了解这些! ● 01背包问题,你该了解这些! 滚动数组 ● 416. 分割等和子集

动态规划part04 01背包问题 二维动态规划五部曲 01背包问题 一维 (没理解动态规划五部曲 416. 分割等和子集解题思路 对于面试的话&#xff0c;其实掌握01背包&#xff0c;和完全背包&#xff0c;就够用了&#xff0c;最多可以再来一个多重背包。 每一件物品其实只有两个状态&a…

使用耳机壳UV树脂制作私模定制耳塞有哪些选择呢?

私模定制耳塞人士的选择可以从以下几个方面考虑&#xff1a; 专业经验&#xff1a;选择有丰富经验的私模定制耳塞人士&#xff0c;能够更好地理解用户需求&#xff0c;提供更专业的建议和服务。可以通过查看其作品和客户评价来了解其经验和口碑。材料质量&#xff1a;选择使用…

模型 AISAS(注意、兴趣、搜索、行动、分享)

系列文章 主要是 分享 思维模型&#xff0c;涉及各个领域&#xff0c;重在提升认知。消费者行为模型。 1 模型AISAS(注意、兴趣、搜索、行动、分享)的应用 1.1 AISAS用于社交媒体营销 假设我们有一家健身中心&#xff0c;想要通过社交媒体营销来吸引新客户。 A&#xff08;A…

使用耳机壳UV树脂制作私模定制耳塞有什么优点和缺点呢?

使用耳机壳UV树脂制作私模定制耳塞具有以下优点&#xff1a; 个性化定制&#xff1a;UV树脂可以根据用户的耳型进行个性化定制&#xff0c;使耳塞与用户的耳朵形状完美契合&#xff0c;提高舒适度和佩戴稳定性。高强度和耐磨性&#xff1a;UV树脂具有高强度和耐磨性&#xff0…

宿舍报修|宿舍报修小程序|基于微信小程序的宿舍报修系统的设计与实现(源码+数据库+文档)

宿舍报修小程序目录 目录 基于微信小程序的宿舍报修系统的设计与实现 一、前言 二、系统功能设计 三、系统实现 1、用户小程序功能模块 2、学生信息管理 3、维修人员管理 4、故障上报管理 5、论坛信息管理 四、数据库设计 1、实体ER图 2、具体的表设计如下所示&…

C++重新入门-C++ 函数

函数是一组一起执行一个任务的语句。每个 C 程序都至少有一个函数&#xff0c;即主函数 main() &#xff0c;所有简单的程序都可以定义其他额外的函数。 您可以把代码划分到不同的函数中。如何划分代码到不同的函数中是由您来决定的&#xff0c;但在逻辑上&#xff0c;划分通常…

python+flask+django医院预约挂号系统6nrhh

医院预约挂号系统主要有管理员、用户和医生三个功能模块。以下将对这三个功能的作用进行详细的剖析。 技术栈 后端&#xff1a;python 前端&#xff1a;vue.jselementui 框架&#xff1a;django/flask Python版本&#xff1a;python3.7 数据库&#xff1a;mysql5.7 数据库工具…

基于无线传感器网络的LC-DANSE波束形成算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 4.1LC-DANSE算法原理 4.2 LCMV算法原理 5.完整程序 1.程序功能描述 在无线传感器网络中&#xff0c;通过MATLAB对比LC-DANSE波束形成算法和LCMV波束形成算法。对比SNR&#xff0c;mse等指标…

QT学习文件操作类 QFile

&#xff08;一&#xff09;QFile QFile 是 Qt 框架中用于文件处理的一个类。它提供了读取和写入文件的功能&#xff0c;支持文本和二进制文件。QFile 继承自 QIODevice &#xff0c;因此它可以像其他 IO 设备一样使用。 &#xff08;1&#xff09;主要功能 1. 文件读写…

深度优先搜索(DFS)与广度优先搜索(BFS):探索图与树的算法

一、引言 在图论和树形结构中&#xff0c;搜索算法是寻找从起点到终点的路径的关键。其中&#xff0c;深度优先搜索&#xff08;DFS&#xff09;和广度优先搜索&#xff08;BFS&#xff09;是最常用且最基础的两种搜索算法。本文将详细介绍广度优先搜索&#xff08;BFS&#xf…

CVE-2022-25487 漏洞复现

漏洞描述&#xff1a;Atom CMS 2.0版本存在远程代码执行漏洞&#xff0c;该漏洞源于/admin/uploads.php 未能正确过滤构造代码段的特殊元素。攻击者可利用该漏洞导致任意代码执行。 其实这就是一个文件上传漏洞罢了。。。。 打开之后&#xff0c;/home路由是个空白 信息搜集&…

分享88个鼠标特效,总有一款适合您

分享88个鼠标特效&#xff0c;总有一款适合您 88个鼠标特效下载链接&#xff1a;https://pan.baidu.com/s/1ljcxwgXGpw7baiufUGJjZA?pwd8888 提取码&#xff1a;8888 Python采集代码下载链接&#xff1a;采集代码.zip - 蓝奏云 学习知识费力气&#xff0c;收集整理更不…

C++之继承

一&#xff0c;概念及用法 1&#xff09;概念 首先我们来了解一下官方的概念&#xff1a;继承(inheritance)机制是面向对象程序设计使代码可以复用的最重要的手段&#xff0c;它允许程序员在保持原有类特性的基础上进行扩展&#xff0c;增加功能&#xff0c;这样产生新的类&a…

【Linux学习】线程详解

目录 十八.多线程 18.1 线程与进程 18.2 内核视角看待创建线程与进程 18.3 线程优缺点总结 线程的优点&#xff1a; 线程的缺点&#xff1a; 线程的用途&#xff1a; 18.4 线程与进程的联系 十九.线程控制 19.1 POSIX线程库 19.2 线程创建 19.3 线程等待 19.4 线程终止 19.5 线…

C# OCR识别图片中的文字

1、从NuGet里面安装Spire.OCR 2、安装之后&#xff0c;找到安装路径下&#xff0c;默认生成的packages文件夹&#xff0c;复制该文件夹路径下的 6 个dll文件到程序的根目录 3、调用读取方法 OcrScanner scanner new OcrScanner(); string path "C:\1.png"; scann…

原来你也可以DDOS?

首先祝大家新年快乐 这次内容就主要是纸上谈兵&#xff08;因为无法未经试验无法保证成功&#xff09;&#xff0c;而且有关这方面的科普视频也已经有很多了。 原文地址&#xff1a;原来你也可以DDOS&#xff1f; - Pleasure的博客 下面是正文内容&#xff1a; 前言 DDOS是一…