Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(二)

接上文

二、注册OP_WRITE写数据

服务端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 基于NIO实现服务端,通过Selector基于事件驱动客户端的读取
 * 服务端接收到数据后,缓存,注册OP_WRITE事件,收到状态转发数据
 *
 */
class NIOSelectorOpWriteServer1 {
    Selector selector;

    public static void main(String[] args) throws IOException {
        NIOSelectorOpWriteServer1 server = new NIOSelectorOpWriteServer1();
        server.start(); // 开启监听和事件处理
    }

    public void start() {
        initServer();
        // selector非阻塞轮询有哪些感兴趣的事件到了
        doService();
    }

    private void doService() {
        if (selector == null) {
            System.out.println("server init failed, without doing read/write");
            return;
        }
        try {
            while (true) {
                while (selector.select() > 0) {
                    Set<SelectionKey> keys = selector.selectedKeys(); // 感兴趣且准备好的事件
                    Iterator<SelectionKey> iterator = keys.iterator(); // 迭代器遍历处理,后面要删除集合元素
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove(); // 删除当前元素,防止重复处理
                        // 下面根据事件进行分别处理
                        if (key.isAcceptable()) {
                            // 客户端连接事件
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            // 读取客户端数据
                            readHandler(key);
                        } else if (key.isWritable()) {
                            // 为了避免重复写,需要先去除OP_WRITE注册状态
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                            writeHandler(key);
                        }
                    }
                }
            }
        } catch (IOException exception) {
            exception.printStackTrace();
        }
    }

    private void initServer() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(9090));

            // 此时在selector上注册感兴趣的事件
            // 这里先注册OP_ACCEPT: 客户端连接事件
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("server init success");
        } catch (IOException exception) {
            exception.printStackTrace();
            System.out.println("server init failied");
        }
    }

    public void acceptHandler(SelectionKey key) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获取客户端的channel
        try {
            SocketChannel client = server.accept();
            client.configureBlocking(false); // 设置client非阻塞
            System.out.println("server receive a client :" + client);
            // 注册OP_READ事件,用于从客户端读取数据
            // 给Client分配一个buffer,用于读取数据,注意buffer的线程安全
            ByteBuffer buffer = ByteBuffer.allocate(1024); // buffer这个参数注册的时候也可以不用
            client.register(key.selector(), SelectionKey.OP_READ, buffer);
        } catch (IOException exception) {
            exception.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        System.out.println("read handler");
        SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
        ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer
        buffer.clear(); // 使用前clear

        // 防止数据分包,需要while循环读取
        try {
            while (true) {
                int readLen = client.read(buffer);
                if (readLen > 0) {
                    // 读取到数据了
                    buffer.flip();
                    byte[] data = new byte[buffer.limit()];
                    buffer.get(data);
                    System.out.println("server read data from " + client + ", data is :" + new String(data));
                    // 给其他客户端注册OP_WRITE;
                    registerWrite(client, data);
                } else if (readLen == 0) {
                    // 没读到数据
                    System.out.println(client + " : no data");
                    break;
                } else if (readLen < 0) {
                    // client 关闭连接
                    // 当客户端主动连接断开时,为了让服务器知道断开了连接,会产生OP_READ事件。所以需要判断读取长度,当读到-1时,关闭channel。
                    System.out.println(client + " close");
                    client.close();
                    break;
                }
            }
        } catch (IOException exception) {
            exception.printStackTrace();
            // client 关闭连接
            System.out.println(client + " disconnect");
            // todo:disconnect 导致一直有read事件,怎么办?
            try {
                client.close();
            } catch (IOException ex) {
                System.out.println("close ex");
            }
        }
    }

    public void writeHandler(SelectionKey key) {
        System.out.println("write handler");
        SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
        ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer,此时处于读模式

        try {
            while (buffer.hasRemaining()) {
                client.write(buffer);
            }
        } catch (IOException exception) {
            System.out.println("write error");
            exception.printStackTrace();
        }
    }

    private void registerWrite(SocketChannel myself, byte[] data) throws IOException {
        Set<SelectionKey> keys = selector.keys();
        // read/write 对应同一个key,同一个client不会发送两遍
        for (SelectionKey key : keys) {
            SelectableChannel channel = key.channel();
            if (channel instanceof SocketChannel && channel != myself) {
                key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);
                ByteBuffer attachment = (ByteBuffer) key.attachment();
                attachment.clear();
                attachment.put(data);
                attachment.flip();
            }
        }
    }
}

这里有几个注意项:

1.在注册OP_WRITE时,需要给所有其他客户端注册;

2.注册OP_WRITE时:是使用key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);避免对原来的OP_READ事件进行覆盖;在OP_WRITE事件来的时候,要把先把OP_WRITE事件去掉,key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); 防止重复写事件状态发生;

3.注册OP_WRITE时,要写的数据,直接给到了原来channel对应的attachment里了;在OP_WRITE事件来的时候,可以直接用;

到此,我们一定有个疑问:既然服务端关不关注OP_WRITE事件,都可以给客户端发送数据,意义何在?

那我们就要看下OP_WRITE事件的具备条件:send-queue是否有空间

而服务端要写数据要关注:服务端数据是否准备好了 + send-queue是否有空间

服务端一般都是在自己数据准备好了后,再注册对客户端的OP_WRITE事件。

但是上面的代码中,在给客户端写数据时,是一直写,直到数据写完,但是 send-queue空间有限,当 send-queue写满后,写操作就会阻塞,导致单线程下业务阻塞。。。

此时,OP_WRITE的优势就体现出来了

我们可以对OP_WRITE的使用方式稍微调整,就可以解决上面的问题:

当我们收到OP_WRITE事件,开始给客户端写数据后,当我们发现该客户端对应的send-queue写满,SocketChannel.write(buffer)会返回已经写出去的字节数,此时为0;我们根据此标志知道,此时send-queue满,不能再写了,此时我们记录下没有写的数据,再次给该客户端注册一个OP_WRITE事件,结束本次写过程;让业务线程继续处理其他事件,等到该客户端对应的send-queue有空闲的时候,会再次收到收到OP_WRITE事件,我们就可以继续写数据了;这样就是解决了写数据满导致业务阻塞的问题了。

关于上面的观点可以参考:

java nio selectedKey,关于 NIO 你不得不知道的一些“地雷”-CSDN博客文章浏览阅读302次。本文是笔者在学习NIO过程中发现的一些比较容易让人忽略的知识的一个总结,而这些让人忽略的小细节恰恰是NIO网络编程中必不可少。虽然现在我们不会直接编写NIO来完成我们的网络层通讯,而是使用成熟的基于NIO的网络框架来实现我们的网络层。如,netty、mina。但对NIO网络编程过程的了解,非常有助于我们更深入的理解netty、mina等网络框架,以至于能更好的使用它们。因此,本文并不对NIO的一些..._java nio selectionkey中的事件多次变化都能每监听到吗https://blog.csdn.net/weixin_39850920/article/details/115994629?spm=1001.2014.3001.5506

Java网络编程——NIO处理写事件(SelectionKey.OP_WRITE)-CSDN博客文章浏览阅读2.1k次,点赞5次,收藏23次。selectionKey.interestOps()就是已经注册的事件,SelectionKey中可以只用1个整形数字来表示多个注册的事件(interestOps变量),SelectionKey.OP_READ=1(二进制为 00000001),SelectionKey.OP_WRITE=4(二进制为 00000100),SelectionKey.OP_CONNECT=8(二进制为 00001000),SelectionKey.OP_ACCEPT=16(二进制为 00010000)。..._selectionkey.op_writehttps://blog.csdn.net/huyuyang6688/article/details/126106949?spm=1001.2014.3001.5506

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/580080.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【cf】Codeforces Round 941(Div.2)题解 A - D

前三题出的最快的一次&#xff0c;但是d没出 A. Card Exchange 只要有一种颜色大于等于 k&#xff0c;那就是 k-1&#xff0c;否则就是 n #include <bits/stdc.h>using namespace std;#define int long long using i64 long long;typedef pair<int, int> PII;…

CONSOB 又下令封锁5个未经授权的投资网站,总数达1065

FX110讯&#xff1a;意大利金融市场监管局 CONSOB 已下令关闭 5 个非法提供金融服务/金融产品的网站。自2019年7月CONSOB有权下令封锁欺诈性金融网站以来&#xff0c;被封禁的网站数量已升至1065个。 以下是 CONSOB 下令新屏蔽的 5个网站&#xff1a; “Luno Invest” Vantage …

C#基础:WPF中常见控件的布局基础

一、用ViewBox实现放缩控件不变 二、布局代码 <Window x:Class"WpfApp1.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"Title"MainWindow"…

将静态资源解析成组件使用的库

vite版本的vite-plugin-svgr vite-plugin-svgr - npm 使用

排序试题解析(二)

8.4.3 01.在以下排序算法中&#xff0c;每次从未排序的记录中选取最小关键字的记录&#xff0c;加入已排序记录的 末尾&#xff0c;该排序算法是( A ). A.简单选择排序 B.冒泡排序 C.堆排序 D.直接插入排序 02&#xff0e;简单选择排序算法的比较次数和移动次数分别为( C )。…

MongoDB的安装(Linux环境)

登录到Linux服务器执行 lsb_release -a &#xff0c;即可得知服务器的版本信息为&#xff1a;CentOS 7。 # CentOS安装lsb_release包 [rootlinux100 ~]# sudo yum install redhat-lsb# 查看Linux版本 [rootlinux100 ~]# lsb_release -a LSB Version: :core-4.1-amd64:core-…

Signed的本质和作用

前言 Verilog中的signed是一个很多人用不好&#xff0c;或者说不太愿意用的一个语法。因为不熟悉它的机制&#xff0c;所以经常会导致运算结果莫名奇妙地出错。其实了解了signed以后&#xff0c;很多时候用起来还是挺方便的。 signed的使用方法主要有两种&#xff0c;其中一种…

【C语言】动态内存分配(一)

目录 1.为什么要有动态内存分配 2.malloc和free 2.1malloc 2.2free 1.为什么要有动态内存分配 我们已经掌握的内存开辟方式有: 但是上述的开辟空间的方式有两个特点: ⭐空间开辟大小是固定的。 ⭐数组在申明的时候&#xff0c;必须指定数组的长度&#xff0c;数组空间一旦…

网络安全与密码学

一、密码学概述 一、 密码学是一门研究信息安全保密的学科&#xff0c;主要涉及对信息进行加密、解密以及相关的安全技术和理论。 它通过使用各种加密算法和技术&#xff0c;将明文信息转换为密文&#xff0c;以确保信息在传输和存储过程中的保密性、完整性和真实性。密码学在…

【Day1】【React学习笔记二】JSX基础

1 概念和本质 Babel官网 2 高频使用场景 2.1 JSX中使用JS表达式 2.2 JSX中实现列表渲染

LeetCode 面试题 17.08 —— 马戏团人塔

阅读目录 1. 题目2. 解题思路3. 代码实现 1. 题目 2. 解题思路 首先&#xff0c;我们对人的身高按照从小到大排序&#xff0c;特别注意&#xff0c;对于身高相等的人&#xff0c;要按照体重从高到低排序。这时候&#xff0c;序列已经满足了在上面的人要比下面的人矮一点&#…

输电线路的“天眼”:双目协同图像视频监测装置

在广袤的天地之间&#xff0c;纵横交错的输电线路如同血脉一般&#xff0c;为我们的生活输送着源源不断的电力。然而&#xff0c;这些“血脉”也常常面临着各种挑战&#xff0c;如外力破坏、恶劣天气等。为了守护这些重要的“生命线”&#xff0c;鼎信智慧研发了一款智能监控设…

类和对象【下】

本节博客主要围绕构造函数、static成员、友元、内部类、匿名对象等待关于“类和对象”这些细节性知识进行收尾&#xff0c;有需要借鉴即可 类和对象_下目录 1.再谈构造函数1.1初始化列表1.2意义 2.static成员2.1概念2.2特性2.3习题 3.友元3.1友元函数概念3.2友元函数的特性 4.内…

Blender笔记之基本操作

code review! —— 2024-04-27 杭州 Blender笔记…

pytest教程-27-分布式执行用例插件-pytest-xdist

上一小节我们学习了pytest随机执行用例插件-pytest-random-order&#xff0c;本小节我们讲解一下pytest分布式执行用例插件pytest-xdist。 前言 平常我们手工测试用例非常多时&#xff0c;比如有1千条用例&#xff0c;假设每个用例执行需要1分钟。如果一个测试人员执行需要10…

选择汽车制造业数据外发解决方案,核心在这三点

汽车制造业是我国国民经济发展的支柱产业之一&#xff0c;汽车制造行业景气度与宏观经济、居民收入水平和固定资产投资密切相关。汽车制造业产业链长&#xff0c;关联度高&#xff0c;汽车制造上游行业主要为钢铁、化工等行业&#xff0c;下游主要为个人消 费、基建、客运和军事…

Linux 常用命令分类

一、帮助命令 命令功能语法man求助man [命令]info求助info [命令]help求助[命令] --help 1.1、man 命令 按键功能空格向下翻页pagedown也就是fn ↓ \downarrow ↓向下翻页pageup向上翻页/string向下查找string这个字符串?string向上查找string这个字符串n,Nn表示继续, N表示…

PotatoPie 4.0 实验教程(26) —— FPGA实现摄像头图像拉普拉斯锐化

为什么要对图像进行拉普拉斯锐化 对图像进行拉普拉斯锐化的目的是增强图像的边缘和细节&#xff0c;使图像看起来更加清晰和锐利。这种技术常用于图像处理中&#xff0c;具体原因如下&#xff1a; 增强图像的边缘信息&#xff1a;拉普拉斯锐化可以突出图像中的边缘特征&#x…

Spring AOP(1)

AOP概述 AOP是Spring框架的第二大核心(第一大核心是IoC). 什么是AOP? 即Aspect Oriented Programming(面向切面编程) 什么是面向切面编程呢? 切面就是指某一类特定的问题, 所以AOP也可以叫做面向特定方法编程. 什么是面向特定方法编程呢?比如上一篇中讲到的拦截器, 就是…

windows无法启动Remote Desktop Services服务(位于本地计算机上) 错误2:系统找不到指定文件

在使用远程计算机时出现的错误&#xff0c;计算机在后台能正常打开&#xff0c;而无法使用远程连接&#xff0c;初步判定为远程服务问题&#xff0c;检查步骤如下&#xff1a; 一、检查计算机Remote Desktop Services服务 该服务是开启计算机远程时必要的服务&#xff0c;若该…