NIO Reactor模型(含代码)

概览

        我们知道NIO就是调用系统内核的的select/poll/epoll方法来实现,这些系统内核方法会扫描或监控IO,每次将所有的IO的状态返回给NIO线程。让NIO线程可以选择处理读取可读状态的IO流,也可以选择继续监控轮询监控IO的其它状态。

        reactor模型也叫做Dispatcher模型,即分发模型,NIO中分发线程和处理线程策略不同而衍生出了四种网络编程模型:单Reactor单线程、单Reactor多线程、多Reactor多线程。

单Reactor单线程

       这里无非就是整个NIO Server里都是单线程顺序执行,Reactor调用select()方法IO状态为ACCEPT就派发给Acceptor处理,IO状态为OP_READ或OP_WRITE则派发给handler进行处理。

上图Reactor获取到IO状态后,依旧是单线程按顺序进行处理。具体代码如下:

package com.longqi.boottest.io;

import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author LongQi
 * @projectName boot-integration
 * @description: TODO
 * @date 2023/3/29 20:06
 */

public class SingleReactorSingleThread {

    public static void main(String[] args) {
        int port = 8080;
        Server server = new Server(port);
        server.start();
        sendMessage(port);
    }

    static class Server{
        private int port;
        public Server(int port){
            this.port=port;
        }
        public void start(){
            try {
                ServerSocketChannel server = ServerSocketChannel.open();
                server.configureBlocking(false);
                server.socket().bind(new InetSocketAddress(port));
                Selector selector = Selector.open();
                // 设置select监听的事件
                server.register(selector, SelectionKey.OP_ACCEPT);
                new Thread(new Reactor(selector)).start();
            }catch (Exception e){
                System.out.println("服务器启动失败:"+e.getMessage());
                e.printStackTrace();
            }
        }
    }

    static class Reactor implements Runnable{
        private Selector selector;
        private Dispatch dispatch = new Dispatch();
        public Reactor(Selector selector){
            this.selector=selector;
        }

        @Override
        public void run() {
            try{
                while (true){
                    int n = selector.select();
                    if(n > 0){
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()){
                            SelectionKey key = iterator.next();
                            dispatch.dispatch(selector,key);
                            iterator.remove();
                        }
                    }
                }
            }catch (Exception e){
                System.out.println("服务器运行异常:"+e.getMessage());
                e.printStackTrace();
            }
        }
    }

    static class Dispatch{
        private ReadHandler handler = new ReadHandler();
        private Acceptor acceptor = new Acceptor();

        public void dispatch(Selector selector,SelectionKey key){
            if(key.isAcceptable()){
                acceptor.hand(selector,key);
            }
            if(key.isReadable()){
                handler.hand(key);
            }
        }
    }

    static class Acceptor{
        public void hand(Selector selector,SelectionKey key){
            try{
                ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                SocketChannel clientChannel = serverChannel.accept();
                clientChannel.configureBlocking(false);
                clientChannel.register(selector,SelectionKey.OP_READ);
            }catch (Exception e){
                System.out.println("处理请求异常:"+e.getMessage());
                e.printStackTrace();
            }
        }
    }

    static class ReadHandler{
        public void hand(SelectionKey key){
            try{
                SocketChannel channel = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                while (channel.read(buffer)!=-1){
                    //复位,转化为读模式
                    buffer.flip();
                    while (buffer.hasRemaining()){
                        System.out.println("收到客户端"+channel.socket().getPort()+"的信息:"+ StandardCharsets.UTF_8.decode(buffer).toString());
                    }
                    //清空缓存区,转化为写模式
                    buffer.clear();
                }
            }catch (Exception e){
                System.out.println("处理请求异常:"+e.getMessage());
                e.printStackTrace();
            }
        }
    }

    public static void sendMessage(int port) {
        try{
            Thread.sleep(2000);
            ThreadPoolExecutor clientPool = new ThreadPoolExecutor(200,300,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(800));
            for(int i=0;i<300;i++){
                clientPool.submit(new Runnable() {
                    @Override
                    public void run() {
                        try{
                            Socket client = new Socket();
                            client.connect(new InetSocketAddress("127.0.0.1",port));
                            client.getOutputStream().write(("hello world,send time:"+System.nanoTime()).getBytes());
                            // 注意TCP粘包,这里不调用close,服务器是收不到消息的
                            client.close();
                        }catch (Exception e){
                            System.out.println("客户端发送信息失败");
                        }
                    }
                });
            }
        }catch (Exception e){
            System.out.println("客户端发送信息失败");
        }
    }
}

 单Reactor多线程

       这里NIO Server里都是单个Reactor调用select()方法,然后派发给handler时使用线程池进行处理。大大提升处理速度。

上图Reactor获取到IO状态后,可读可写的channel由线程池开线程进行处理。具体代码如下:

package com.longqi.boottest.io;

import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author LongQi
 * @projectName boot-integration
 * @description: TODO
 * @date 2023/3/29 20:06
 */

public class SingleReactorMultiThread{

    public static void main(String[] args) {
        int port = 8080;
        Server server = new Server(port);
        server.start();
        sendMessage(port);
    }

    static class Server{
        private int port;
        public Server(int port){
            this.port=port;
        }
        public void start(){
            try {
                ServerSocketChannel server = ServerSocketChannel.open();
                server.configureBlocking(false);
                server.socket().bind(new InetSocketAddress(port));
                Selector selector = Selector.open();
                // 设置select监听的事件
                server.register(selector, SelectionKey.OP_ACCEPT);
                new Thread(new Reactor(selector)).start();
            }catch (Exception e){
                System.out.println("服务器启动失败:"+e.getMessage());
                e.printStackTrace();
            }
        }
    }

    static class Reactor implements Runnable{
        private Selector selector;
        private Dispatch dispatch = new Dispatch();
        public Reactor(Selector selector){
            this.selector=selector;
        }

        @Override
        public void run() {
            try{
                while (true){
                    int n = selector.select();
                    if(n > 0){
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()){
                            SelectionKey key = iterator.next();
                            dispatch.dispatch(selector,key);
                            iterator.remove();
                        }
                    }
                }
            }catch (Exception e){
                System.out.println("服务器运行异常:"+e.getMessage());
                e.printStackTrace();
            }
        }
    }

    static class Dispatch{
        private Acceptor acceptor = new Acceptor();
        private ThreadPoolExecutor handPool = new ThreadPoolExecutor(200,300,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(800));

        public void dispatch(Selector selector,SelectionKey key){
            if(key.isAcceptable()){
                acceptor.hand(selector,key);
            }
            if(key.isReadable()){
                handPool.submit(new ReadHandler(key));
            }
        }
    }

    static class Acceptor{
        public void hand(Selector selector,SelectionKey key){
            try{
                ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                SocketChannel clientChannel = serverChannel.accept();
                clientChannel.configureBlocking(false);
                clientChannel.register(selector,SelectionKey.OP_READ);
            }catch (Exception e){
                System.out.println("处理请求异常:"+e.getMessage());
                e.printStackTrace();
            }
        }
    }

    static class ReadHandler implements Runnable{
        private SelectionKey key;
        public ReadHandler(SelectionKey key){
            this.key = key;
        }

        @Override
        public void run() {
            try{
                SocketChannel channel = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                while (channel.read(buffer)!=-1){
                    //复位,转化为读模式
                    buffer.flip();
                    while (buffer.hasRemaining()){
                        System.out.println("收到客户端"+channel.socket().getPort()+"的信息:"+ StandardCharsets.UTF_8.decode(buffer).toString());
                    }
                    //清空缓存区,转化为写模式
                    buffer.clear();
                }
            }catch (Exception e){
                System.out.println("处理请求异常:"+e.getMessage());
                e.printStackTrace();
            }
        }
    }

    public static void sendMessage(int port) {
        try{
            Thread.sleep(2000);
            ThreadPoolExecutor clientPool = new ThreadPoolExecutor(200,300,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(800));
            for(int i=0;i<300;i++){
                clientPool.submit(new Runnable() {
                    @Override
                    public void run() {
                        try{
                            Socket client = new Socket();
                            client.connect(new InetSocketAddress("127.0.0.1",port));
                            client.getOutputStream().write(("hello world,send time:"+System.nanoTime()).getBytes());
                            // 注意TCP粘包,这里不调用close,服务器是收不到消息的
                            client.close();
                        }catch (Exception e){
                            System.out.println("客户端发送信息失败");
                        }
                    }
                });
            }
        }catch (Exception e){
            System.out.println("客户端发送信息失败");
        }
    }
}

 多Reactor多线程

       这里NIO Server里有两个Reactor,一个分发给acceptor,一个分发给处理线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/4481.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【web前端开发】超详细讲解CSS盒子模型

文章目录1.盒子模型介绍2.内容3.边框4.内边距5.⭐盒子大小计算6.⭐内减模式7.外边距外边距的合并外边距的塌陷行内元素的垂直外边距8.⭐清除默认样式9.⭐版心居中1.盒子模型介绍 所有HTML元素可以看作盒子,CSS盒模型本质上是一个盒子&#xff0c;封装周围的HTML元素&#xff0c…

C#多线程锁

背景&#xff1a;再一次测试中用户和我几乎同一时刻&#xff08;不知道谁先谁后&#xff0c;估计间隔在毫秒级&#xff09;操作了系统。 用户那边反馈显示的操作日志是我登录的信息。于是开始查找问题。首先排除了全局变量先后操作被覆盖的原因。首先A账户登录&#xff0c;然后…

基于stm32mp157 linux开发板ARM裸机开发教程3:Cortex-A7 架构与工作模式(连载中)

前言&#xff1a; 目前针对ARM Cortex-A7裸机开发文档及视频进行了二次升级持续更新中&#xff0c;使其内容更加丰富&#xff0c;讲解更加细致&#xff0c;全文所使用的开发平台均为华清远见FS-MP1A开发板&#xff08;STM32MP157开发板&#xff09; 针对对FS-MP1A开发板&…

用 ChatGPT 尝试 JavaScript 交互式学习体验,有用但不完美

很好&#xff0c;但还不能取代专家导师&#xff0c;有时还会犯错&#xff01;ChatGPT 教小狗编程&#xff08; Midjourney 创作&#xff09;GPT-4刚刚发布&#xff0c;相较于GPT-3.5&#xff0c;它有显著的增强功能。其中之一是它在更长时间的交互和更大的提示下&#xff0c;能…

Pytorch环境配置 完整流程 从CUDA和cuDNN到Torch安装

目录1. 安装CUDA2. 安装cuDNN3. 安装Pytorch1. 安装CUDA 确认需要的CUDA版本 nvidia-smi 下载CUDA.exe CUDA下载地址 结合自己电脑的情况下载对印度个版本 安装 双击后安装&#xff0c;可以修改安装路径&#xff0c;我安装在了D盘 安装方式选择自定义 全部勾选 这里如果电脑没…

nnAudio的简单介绍

官方实现 https://github.com/KinWaiCheuk/nnAudio&#xff1b; 论文实现&#xff1a; nnAudio: An on-the-Fly GPU Audio to Spectrogram Conversion Toolbox Using 1D Convolutional Neural Networks&#xff1b; 以下先对文章解读&#xff1a; abstract 在本文中&#x…

美国站针对磁铁产品新政策16 CFR 1262详解

近日&#xff0c;亚马逊美国站公布磁铁产品&#xff08;不包括玩具&#xff09;的新政策更新公告&#xff0c;公告如下&#xff1a; 公告显示&#xff0c;由于美国消费品安全委员会&#xff08;US Consumer Product Safety Commission&#xff09;出台了新的安全规定&#xff…

海王算法(看完不会变成海王)

&#x1f4a7;学了海王算法会变成海王吗&#xff0c;它又能解决什么样的问题呢&#xff1f;&#x1f4a7; &#x1f337; 仰望天空&#xff0c;妳我亦是行人.✨ &#x1f984; 个人主页——微风撞见云的博客&#x1f390; &#x1f433; 数据结构与算法专栏的文章图文…

内存池解释及线程池(Linux)实现

1.内存池1.什么是内存池内存池是一种内存分配方式。在真正使用内存之前&#xff0c;先申请分配一定数量的、大小相等的内存块留作备用。当有新的内存需求时&#xff0c;就从内存池中分出一部分内存块&#xff0c;若内存块不够再继续申请新的内存。使用内存池的优点有&#xff1…

Pyspark_SQL3

Pyspark 注&#xff1a;大家觉得博客好的话&#xff0c;别忘了点赞收藏呀&#xff0c;本人每周都会更新关于人工智能和大数据相关的内容&#xff0c;内容多为原创&#xff0c;Python Java Scala SQL 代码&#xff0c;CV NLP 推荐系统等&#xff0c;Spark Flink Kafka Hbase Hi…

会声会影2023新版本功能详情讲解

会声会影2023Corel VideoStudio一款功能丰富的视频编辑软件。会声会影2023简单易用&#xff0c;具有史无前例的强大功能&#xff0c;拖放式标题、转场、覆叠和滤镜&#xff0c;色彩分级、动态分屏视频和新增强的遮罩创建器&#xff0c;超越基本编辑&#xff0c;实现影院级效果。…

【Django 网页Web开发】12. 实战项目:分页组件的封装 面向接口编程(05)(保姆级图文)

目录1. 对象的方式使用分页组件2. 项目结构3. 编写pagination.py3.1 pagination.py3.2 view.py4. bug修改之&#xff1a;url中搜索关键词q和page4.1 构造url的一个雏形4.2 修改我们的分页组件4.3 搜索小bug5. 应用分页组件&#xff0c;几行代码实现用户管理分页5.1 批量创建用户…

『 MySQL篇 』:MySQL 索引相关问题

目录 一 . 认识索引 二. 索引的数据结构 1 . B Tree vs Hash 2 . B Tree vs 二叉树/红黑树 3 . B 树 vs B树 三. 索引的使用 1. 索引分类 2. 索引用法 一 . 认识索引 当我们在查询一本书中的内容时 , 你会选择翻页每一页去查询呢 ? 还是说按照书的目录去找 ? 答案是…

springmvc(一)

SpringMVC是隶属于Spring框架的一部分&#xff0c;主要是用来进行Web开发&#xff0c;是对Servlet进行了封装。 对于SpringMVC我们主要学习如下内容: SpringMVC简介 请求与响应 REST风格 SSM整合(注解版) 拦截器 SpringMVC是处于Web层的框架&#xff0c;所以其主要的作用就是用…

微信小程序开发:微信小程序生命周期总结

前言 在微信小程序开发中&#xff0c;关于微信小程序API的使用是必备技能&#xff0c;但是关于微信小程序的生命周期也是首先要了解和掌握的知识点。尤其是现在的前端开发领域&#xff0c;关于前端的各种框架和技术都要会&#xff0c;而且微信小程序的语法就是JS的翻版&#xf…

Java 线程安全

一、什么是线程安全 当多个线程访问共享资源时&#xff0c;每个线程都会各自对共享资源进程操作&#xff0c;导致数据不一致&#xff0c;造成程序不能正确的得到结果&#xff0c;此时需要让多个线程排队访问共享资源&#xff0c;让线程安全&#xff0c;才能保证数据安全的被访问…

Jdk动态代理和Cglib动态代理的区别

一&#xff1a; 前言&#xff1a; 代理模式分为 静态代理 和 动态代理&#xff0c;今天我要讲的是动态代理的两种常见、也是被广泛使用的实现方式-------jdk动态代理 和 Cglib动态代理 二&#xff1a;Jdk动态代理实现分析&#xff1a; 结构示意图如下&#xff0c;我定义了一…

FrIf-FrIf_Transmit发送流程【配置参数FrIfImmediate:立即传输还是解耦传输】和代码分析

总目录链接==>> AutoSAR入门和实战系列总目录 文章目录 1 FrIf_Transmit中的 PDU 的配置的传输模式2 代码分析1 FrIf_Transmit中的 PDU 的配置的传输模式 每当FrIf的上层模块想要请求特定 PDU 的传输时,它都会调用 FrIf_Transmit 函数。调用 FrIf_Transmit的时候传递…

C语言--文件操作

目录前言什么是文件程序文件数据文件文件指针FILE结构的维护文件的打开和关闭文件的打开方式文件的顺序读写fputcfgetcfputsfgetsfprintffscanf文件流 标准输入/输出流sscanf和sprintf前言 在讲文件操作之前&#xff0c;我们先来思考这个问题&#xff1a; 我们为什么要使用文件…

大数据技术之Spark(一)——Spark概述

大数据技术之Spark&#xff08;一&#xff09;——Spark概述 文章目录前言一、Spark基础1.1 Spark是什么1.2 Spark VS Hadoop1.3 Spark优势及特点1.3.1 优秀的数据模型和丰富计算抽象1.3.3 spark的特点1.4 Spark 运行环境1.5 Spark运行架构1.5.1 Driver1.5.2 Executor1.5.3 Mas…
最新文章