基于Reactor模式下的epoll多路复用服务器

文章目录

  • 一、认识Reactor模式
    • 1.1 Reactor 模式的概念
    • 1.2 Reactor 模式的组件
    • 1.3 Reactor 模式的流程
    • 1.4 Reactor 模式的优点
  • 二、Reactor模式下的 epoll ET服务器
    • 2.1 总体设计思路
    • 2.2 Connection 类结构
    • 2.3 封装 socket 实现 Sock 类
    • 2.4 封装 epoll 实现 Epoller 类
      • 2.4.1 CreateEpoller函数
      • 2.4.2 AddEvent函数
      • 2.4.3 DelEvent函数
      • 2.4.4 ModEvent函数
      • 2.4.5 Loop函数
    • 2.5 SetNonBlock函数
    • 2.6 基于Reactor模式设计TcpServer类
      • 2.6.1 TcpServer类的结构
      • 2.6.2 回调函数
        • 2.6.2.1 Accepter函数
        • 2.6.2.2 TcpRecver函数
        • 2.6.2.3 TcpSender函数
        • 2.6.2.4 TcpExcepter函数
      • 2.6.3 AddConnection函数
      • 2.6.4 Dispatcher事件派发函数
      • 2.6.5 EnableReadWrite函数
  • 三、简单的业务处理
    • 3.1 简单协议定制
    • 3.2 业务处理函数
    • 3.4 运行服务器
  • 四、总结


一、认识Reactor模式

1.1 Reactor 模式的概念

Reactor模式称为反应器模式或应答者模式,是基于事件驱动的设计模式。Reactor模式是一种常用于网络编程的设计模式,它旨在提供一种高效且可扩展的方式来处理并发请求。该模式的核心思想是将请求的处理逻辑与输入输出分离开来,通过异步I/O和事件驱动的方式来处理请求。

1.2 Reactor 模式的组件

Reactor模式包含以下主要组件:

  1. Reactor:该组件负责处理事件循环并分发事件给对应的处理器。它使用一个或多个I/O线程来监听事件,并根据不同的事件类型将请求路由到不同的处理器。
  2. Handlers:处理器负责处理特定类型的请求。例如,HTTP请求可以由HTTP处理器处理,TCP请求可以由TCP处理器处理。每个处理器都包含了处理特定请求类型的逻辑,并且在事件触发时调用对应的处理函数来完成请求处理。
  3. Synchronous Event Demultiplexer:该组件用于等待并监视输入事件,例如来自客户端的连接请求。当事件发生时,它会将事件通知给Reactor组件。
  4. Asynchronous Event Demultiplexer:该组件用于等待并监视异步I/O操作的完成事件。当I/O操作完成时,它会将事件通知给Reactor组件。

1.3 Reactor 模式的流程

Reactor模式的基本流程如下:

  1. Reactor组件启动并开始监听输入事件。
  2. 当有输入事件发生时,Reactor组件会将事件通知给对应的处理器。
  3. 处理器使用异步I/O进行请求处理,当请求处理完成时,它会将响应数据写入输出缓冲区。
  4. Reactor组件检查输出缓冲区是否有数据需要写入客户端,如果有则进行输出操作。
  5. 重复步骤2-4,直到连接关闭或出现错误。

1.4 Reactor 模式的优点

Reactor模式具有以下优点:

  1. 高并发:Reactor模式采用异步I/O和事件驱动的方式,可以处理大量并发请求。

  2. 高性能:Reactor模式避免了每个请求都创建一个新的线程或进程的开销,从而提高了性能。

  3. 可扩展性:Reactor模式支持添加新的处理器来处理不同类型的请求,并且可以通过添加更多的I/O线程来处理更多的请求。

  4. 可维护性:Reactor模式将请求处理逻辑与输入输出分离开来,降低了系统的耦合度,从而使得系统更易于维护和扩展。

二、Reactor模式下的 epoll ET服务器

2.1 总体设计思路

在epoll ET服务器中,主要处理三个事件,那就是读、写、异常就绪。在服务器中设置了关于处理这些事件的回调函数。

  • 读就绪:如果是监听套接字的读就绪则调用Accepter函数获取连接,如果其他套接字的读就绪则调用TcpRecver读取客户端发来的消息。
  • 写就绪:如果写事件就绪就调用 TcpSender函数将待发送的数据写入到发生缓冲区中。
  • 异常就绪:如果程序运行中出现了异常,就调用TcpExcepter函数来关闭相应的套接字及释放其资源。

在该服务器中,将所有的文件描述符都交给epoll进行监管,如果发生了事件就绪,就通过Dispatcher事件派发函数派发事件给相应的回调函数进行处理。

2.2 Connection 类结构

因为每一个服务端与客户端的连接都有一个套接字、输入输出缓冲区、读写异常回调函数,因此将这个连接封装成一个Connection类。
Connection 类的结构如下:

class Connection
{
public:
    int _sock;
    // 当前sock对应的TcpServer对象
    TcpServer *_R;
    // 输入缓冲区
    std::string _inbuffer;
    // 输出缓冲区
    std::string _outbuffer;

    // 回调方法
    // 读取
    func_t _recver;
    // 发送
    func_t _sender;
    // 异常
    func_t _excepter;

public:
    Connection(int sock, TcpServer *r) : _sock(sock), _R(r) {}
    ~Connection() {}
    void SetRecver(func_t recver) { _recver = recver; }
    void SetSender(func_t sender) { _sender = sender; }
    void SetExcepter(func_t excepter) { _excepter = excepter; }
};
  • 该类中包含了一个_sock的成员变量,即当前连接的文件描述符。
  • 该类中包含了一个指向Reactor服务器的回指指针_R,便于在外部使用Connection对象调用Reactor中的成员函数。
  • 该类中包含了对应文件描述符的读写缓冲区_inbuffer_outbuffer,用于暂时缓存数据。
  • 该类中包含了_recver_sender_excepter三个回调函数对象,分别用于处理读、写、异常事件。
  • 在该类中还提供了SetRecverSetSenderSetExcepter三个设置读写异常的回调方法。其中func_t 函数对象的定义如下:
using func_t = std::function<int(Connection *)>;

该函数对象的参数是一个Connection类型的指针,后续在进行回调的时候可以通过该指针访问Connection中的元素,返回值为整型。

2.3 封装 socket 实现 Sock 类

这里对socket进行了简单的封装,并且其中的方法全部设置为静态,便于使用。其结构如下:

#pragma once

#include <iostream>
#include <cstring>
#include <cstdlib>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "Log.hpp"

class Sock
{
public:
    static const int gbacklog = 3;

    static int Socket()
    {
        int listenSock = socket(PF_INET, SOCK_STREAM, 0);
        if (listenSock < 0)
        {
            logMsg(FATAL, "socket create: %d:%s", errno, strerror(errno));
            exit(-1);
        }
        // 运行服务器快速重启
        int opt = 1;
        setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listenSock;
    }

    static void Bind(int sock, uint16_t port)
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof local);
        local.sin_family = PF_INET;
        local.sin_addr.s_addr = INADDR_ANY;
        local.sin_port = htons(port);

        if (bind(sock, (const sockaddr *)&local, sizeof local) < 0)
        {
            logMsg(FATAL, "socket bind: %d:%s", errno, strerror(errno));
            exit(-1);
        }
    }

    static void Listen(int sock)
    {
        if (listen(sock, gbacklog) < 0)
        {
            logMsg(FATAL, "socket listen: %d:%s", errno, strerror(errno));
            exit(-1);
        }
    }

    static int Accept(int sock, std::string *clientIp, uint16_t *clientPort)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int sock_fd = accept(sock, (sockaddr *)&peer, &len);
        if (sock_fd < 0)
        {
            // logMsg(FATAL, "socket accept: %d:%s", errno, strerror(errno));
            return -1;
        }
        if (clientIp)
            *clientIp = inet_ntoa(peer.sin_addr);
        if (clientPort)
            *clientPort = ntohs(peer.sin_port);

        return sock_fd;
    }
};

  • 其中Socket函数用于创建监听套接字,并且其中设置了setsocketopt函数使得服务器进程在退出后能够快速重启。
  • Bind函数用于绑定监听套接字对应的网络信息。
  • Listen函数用于监听客户端的连接。
  • Accept函数用于获取客户端的连接请求。

2.4 封装 epoll 实现 Epoller 类

这里同样也对epoll进行简单的封装,同样将其成员函数设置为静态方便后续调用。关于epoll可见博主的另一篇文章:IO多路复用(select、poll、epoll网络编程)。

2.4.1 CreateEpoller函数

CreateEpoller函数用于创建epoll实例:

class Epoller{
	static int CreateEpoller()
	{
	    int epfd = epoll_create(SIZE);
	    if (epfd == -1)
	    {
	        logMsg(FATAL, "epoll_create error: %d: %s", errno, strerror(errno));
	        exit(-1);
	    }
	    return epfd;
	}
}

2.4.2 AddEvent函数

AddEvent函数用于添加文件描述sockepoll实例中对其进行监视:

class AddEvent{
    static bool AddEvent(int epfd, int sock, uint32_t event)
    {
        epoll_event ev;
        ev.data.fd = sock;
        ev.events = event;
        int res = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
        return res == 0;
    }
}

2.4.3 DelEvent函数

DelEvent函数函数用于将指定的文件描述符从epoll实例中删除:

class DelEvent{
    static bool DelEvent(int epfd, int sock)
    {
        int res = epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);
        return res == 0;
    }
}

2.4.4 ModEvent函数

ModEvent函数用于修改epoll实例监视的文件描述符的事件:

class ModEvent{
    static bool ModEvent(int epfd, int sock, uint32_t event)
    {
        epoll_event ev;
        ev.data.fd = sock;
        ev.events = event;
        int res = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev);
        return res == 0;
    }
}

2.4.5 Loop函数

Loop函数用于从epoll中获取事件已经就绪的文件描述符:

class Loop
{
    static int Loop(int epfd, epoll_event revs[], int num)
    {
        int n = epoll_wait(epfd, revs, num, -1);
        if(n == -1)
        {
            logMsg(FATAL, "epoll_wait error: %d: %s", errno, strerror(errno));
        }
        return n;
    }
}

2.5 SetNonBlock函数

因为在epoll服务器中基本采用的是ET模式,而该模式要求程序使用非阻塞的IO操作方式,因此要将文件描述符设置为非阻塞,因此定义了SetNonBlock函数,将其封装在工具类Util中:

class Util
{
public:
    // 设置为非阻塞
    static void SetNonBlock(int fd)
    {
        int _fd = fcntl(fd, F_GETFL);
        fcntl(fd, F_SETFL, _fd | O_NONBLOCK);
    }
};

2.6 基于Reactor模式设计TcpServer类

2.6.1 TcpServer类的结构

TcpServer类就是基于Reactor模式所设计的,其基本框架如下:

class TcpServer
{
public:
    TcpServer(callback_t cb, int port) : _listenSock(-1), _port(port), _epfd(-1), _cb(cb)
    {
        _revs = new epoll_event[revs_num];
        // 网络
        _listenSock = Sock::Socket();
        Sock::Bind(_listenSock, _port);
        Sock::Listen(_listenSock);

        // 创建epoll实例
        _epfd = Epoller::CreateEpoller();

        // 添加listenSock到epoll, 并且建立与Connection之间的映射
        AddConnection(_listenSock, EPOLLIN | EPOLLET, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
    }
    ~TcpServer()
    {
        if (_listenSock != -1)
            close(_listenSock);
        if (_epfd != -1)
            close(_epfd);

        delete[] _revs;
    }

public:
	//添加文件描述符到epoll, 创建对应的Connection对象
    void AddConnection(int sockfd, uint32_t event, func_t recver, func_t sender, func_t excepter);

    // 用于处理连接请求就绪
    int Accepter(Connection *conn);

    // 普通IO的读、写、异常就绪处理函数
    int TcpRecver(Connection *conn);

    int TcpSender(Connection *conn);

    int TcpExcepter(Connection *conn);

    // 修改epoll对sock所关心的事件
    void EnableReadWrite(int sock, bool readable, bool writeable);

    // 派发就绪事件
    void Dispatcher();

	//判断unordered_map释放存在该sock
    bool IsExists(int sock);
	
	//用于运行服务器
    void Run();

private:
    // 就绪事件列表的上限
    static const int revs_num = 64;
    // 监听的socket
    int _listenSock;
    // 监听的端口号
    int _port;
    // epoll实例
    int _epfd;
    // 就绪事件列表
    epoll_event *_revs;
    // sock 与 Connection之间的映射关系
    std::unordered_map<int, Connection *> _connections;

    // 处理业务请求的回调方法
    callback_t _cb;
};

该类中设置的成员变量有:

  • _listenSock:服务器监听的socket套接字。
  • _port:监听的端口号。
  • _epfd:epoll实例的文件描述符。
  • _revs:就绪事件列表,用于存储就绪的文件描述符epoll_event 的节点。
  • revs_num:就绪事件列表的最大数量。
  • _connections:所有文件描述符与其对应的Connection对象的映射关系集合。
  • _cb:处理业务请求的回调方法。

其中处理业务请求的回调方法_cb的类型是自定义的callback_t 回调函数,其定义如下:

using callback_t = std::function<int(Connection *, std::string &)>;

其中参数分别为Connection类型的指针和string类型的字符串,传入的Connection对象指针便于对相关文件描述符进行操作,而string字符串用于存储处理后的结果。

构造函数说明:
TcpServer类的构造函数中对成员变量进行初始化,包括socket套接字的创建、绑定和监听,创建epoll实例,以及将listenSock添加到epoll实例中进行监听和建立其与Connection对象之间的映射关系。由于listenSock关心的事件只有获取连接,因此在AddConnection函数中2传入的回调函数只有Accepter,其他两个不关心则传入nullptr即可。

2.6.2 回调函数

2.6.2.1 Accepter函数

Accepter函数用于2处理连接请求就绪,由于可能存在同时有大量的连接请求,因此在该函数的实现中使用了while循环:

int TcpServer::Accepter(Connection *conn)
{
    // 如果同时有大量的连接请求到了,就要使用循环进行处理
    while (true)
    {
        std::string clientIp;
        uint16_t clientPort = 0;
        int sockfd = Sock::Accept(conn->_sock, &clientIp, &clientPort);
        if (sockfd < 0)
        {
            if (errno == EINTR) // 信号中断
                continue;
            else if (errno == EAGAIN || errno == EWOULDBLOCK) // 发生阻塞
                break;
            else
            {
                logMsg(FATAL, "accpet error: %d: %s", errno, strerror(errno));
                return -1;
            }
        }

        logMsg(DEBUG, "get a new link: %d", sockfd);
        // 默认设置epoll只关心读事件,因为再最开始,写事件是就绪的,最后续代码运行过程中条件发生了改变,再修改epoll要关心的事件
        AddConnection(sockfd, EPOLLIN | EPOLLET, std::bind(&TcpServer::TcpRecver, this, std::placeholders::_1),
                      std::bind(&TcpServer::TcpSender, this, std::placeholders::_1),
                      std::bind(&TcpServer::TcpExcepter, this, std::placeholders::_1));
    }

在调用AddConnection函数传入回调函数参数时使用了C++11中的bind函数绑定相应的回调函数。

2.6.2.2 TcpRecver函数

TcpRecver函数用于处理普通文件描述符的读事件就绪,函数实现如下:

int TCPServer::TcpRecver(Connection *conn)
    {
        // 由于ET模式下采取非阻塞,因此需要循环读取
        while (true)
        {
            char buffer[1024];
            // 123\n -> 实际读取4个字节,因此要减 1
            ssize_t s = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0);
            if (s > 0)
            {
                buffer[s] = 0; // 去掉 \n
                conn->_inbuffer += buffer;
            }
            else if (s == 0)
            {
                logMsg(DEBUG, "client quit");
                conn->_excepter(conn);
                break;
            }
            else
            {
                if (errno == EINTR)
                    continue;
                else if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else
                {
                    logMsg(WARNING, "recv error: %d: %s", errno, strerror(errno));
                    conn->_excepter(conn);
                    break;
                }
            }
        }
        // 本轮读取完成
        std::vector<std::string> result;
        PackageSplit(conn->_inbuffer, &result);
        for (auto &message : result)
        {
            std::cout << "message: " << message << " inbuffer: " << conn->_inbuffer << std::endl;
            _cb(conn, message);
        }
        return 0;
    }

由于采用了非阻塞的操作方法,为了保证读取报文的完整性,因此要进行循环式的不断读取,直到该轮读取完毕才退出,而且每次循环读取都要将读取的数据拼接的对应的Connection对象的输入缓冲区_inbuffer中。

其中:

  • EINTR:指操作被中断唤醒,需要重新读/写。
  • EAGAINEWOULDBLOCK:表明读取完毕,输入缓冲为空。
  • 如果在函数的调用过程中出现了错误,就调用相应的异常处理函数。

EAGAINEWOULDBLOCK的说明:

非阻塞模式(比如epollET模式下进行recv,对应的fd文件描述符设置为非阻塞)下调用了阻塞操作(可理解为已经将输入缓冲区的数据读取完毕),在该操作没有完成就返回这个错误,这个错误不会破坏socket的同步,因此不用管它,下次循环接着recv读取就可以。所以对非阻塞socket而言,EAGAIN不是一种错误。在VxWorksWindows上,EAGAIN的名字也叫做EWOULDBLOCK

2.6.2.3 TcpSender函数

TcpSender函数用于处理普通文件描述符的写事件就绪:

int TcpServer::TcpSender(Connection *conn)
{
    while (true)
    {
        ssize_t n = send(conn->_sock, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
        if (n > 0)
        {
            conn->_outbuffer.erase(0, n);
        }
        else
        {
            if (errno == EINTR)
                continue;
            else if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;
            else
            {
                conn->_excepter(conn);
                logMsg(WARNING, "send error: %d: %s", errno, strerror(errno));
                break;
            }
        }
    }
    return 0;
}

该函数的设计思路基本上和TcpRecver一样。

2.6.2.4 TcpExcepter函数

TcpExcepter函数用于处理普通文件描述符的异常事件就绪:

int TcpServer::TcpExcepter(Connection *conn)
{
    if (!IsExists(conn->_sock))
        return -1;
    // 1. 从epoll中移除sock
    Epoller::DelEvent(_epfd, conn->_sock);
    logMsg(DEBUG, "remove sock from epoll!");
    // 2. 关闭sock
    close(conn->_sock);
    logMsg(DEBUG, "close sock: %d!", conn->_sock);

    // 3. delete conn
    delete _connections[conn->_sock];
    logMsg(DEBUG, "delete connection!");
    
    // 4. map中移除sock
    _connections.erase(conn->_sock);
    logMsg(DEBUG, "rease connection from connections");

    return 0;

}

在本服务器的实现中就是当某一个连接出现异常时,处理方式就是断开该连接。在异常处理中包含四个步骤:

  1. epoll实例中移除对该文件描述符的监管。
  2. 调用close函数关闭文件描述符。
  3. 使用delete释放 Connection 对象的资源。
  4. 把该文件描述符与其Connection对象的映射关系从unordered_map集合中移除。

2.6.3 AddConnection函数

该函数用于添加连接关系:

void TcpServer::AddConnection(int sockfd, uint32_t event, func_t recver, func_t sender, func_t excepter)
{
    // 1. 添加sock到epoll
    // 如果是ET模式,设置为非阻塞
    if (event & EPOLLET)
        Util::SetNonBlock(sockfd);

    Epoller::AddEvent(_epfd, sockfd, event);

    Connection *conn = new Connection(sockfd, this);
    conn->SetRecver(recver);
    conn->SetSender(sender);
    conn->SetExcepter(excepter);

    _connections.insert(std::make_pair(sockfd, conn));
    logMsg(DEBUG, "添加新连接到connections成功: %d", sockfd);
}

该函数的实现包括了:将文件描述符交给epoll实例进行监管,创建对应的Connection对象,建立该文件描述符与Connection之间的映射关系并将其添加到unordered_map集合中。

2.6.4 Dispatcher事件派发函数

该函数用于对就绪的文件描述符进行事件的派发:

void TcpServer::Dispatcher()
{
    int n = Epoller::Loop(_epfd, _revs, revs_num);
    for (int i = 0; i < n; ++i)
    {
        int sock = _revs[i].data.fd;
        uint32_t revent = _revs[i].events;

        // 如果出现错误,将错误统一交给读写事件
        if (revent & EPOLLERR)
            revent |= (EPOLLIN | EPOLLOUT);
        if (revent & EPOLLHUP)
            revent |= (EPOLLIN | EPOLLOUT);

        // 读事件就绪
        if (revent & EPOLLIN)
        {
            // 先判断map中是否有sock的映射关系,再判断回调函数是否被设置
            if (IsExists(sock) && _connections[sock]->_recver)
            {
                _connections[sock]->_recver(_connections[sock]);
            }
        }
        // 写事件就绪
        if (revent & EPOLLOUT)
        {
            // 先判断map中是否有sock的映射关系,再判断回调函数是否被设置
            if (IsExists(sock) && _connections[sock]->_sender)
            {
                _connections[sock]->_sender(_connections[sock]);
            }
        }
    }
}

在该函数的实现中,首先调用Epoller中的Loop成员函数获取事件就绪的文件描述符,然后在遍历就绪的文件描述符进行事件的派发,如果其中的文件描述符出现了异常错误,则将其统一交给读写事件,因为在读写事件处理的函数中也调用了对异常的处理函数。

2.6.5 EnableReadWrite函数

该函数用于修改指定文件描述符中epoll实例中被关系的事件:

 void TcpServer::EnableReadWrite(int sock, bool readable, bool writeable)
 {
     uint32_t event = 0;
     event |= (readable ? EPOLLIN : 0);
     event |= (writeable ? EPOLLOUT : 0);
     Epoller::ModEvent(_epfd, sock, event);
 }

因为在最初添加文件描述符到epoll中时只设置了关心读事件,原因是在刚建立连接时,写事件是就绪的,最后续代码运行过程中条件发生了改变,再对epoll要关心的事件进行修改。

三、简单的业务处理

这里实现一个简单的网络版本计算器,客户端连接服务器,以特定的格式向服务器发送运算请求,然后服务器响应运算结果给客户端。

3.1 简单协议定制


struct Request
{
    int x;
    int y;
    char op;
};

struct Response
{
    int code;
    int result;
};

bool Deserialize(std::string &in, Request *req)
{
    // 1 + 1, 2 * 4, 5 * 9, 6 *1
    std::size_t spaceOne = in.find(SPACE);
    if (std::string::npos == spaceOne)
        return false;
    std::size_t spaceTwo = in.rfind(SPACE);
    if (std::string::npos == spaceTwo)
        return false;

    std::string dataOne = in.substr(0, spaceOne);
    std::string dataTwo = in.substr(spaceTwo + SPACE_LEN);
    std::string oper = in.substr(spaceOne + SPACE_LEN, spaceTwo - (spaceOne + SPACE_LEN));
    if (oper.size() != 1)
        return false;

    // 转成内部成员
    req->x = atoi(dataOne.c_str());
    req->y = atoi(dataTwo.c_str());
    req->op = oper[0];

    return true;
}

void Serialize(const Response &resp, std::string *out)
{
    // "exitCode_ result_"
    std::string ec = std::to_string(resp.code);
    std::string res = std::to_string(resp.result);

    *out = ec;
    *out += SPACE;
    *out += res;
    *out += CRLF;
}

3.2 业务处理函数

calculator函数用于简单的计算逻辑:

using service_t = std::function<Response (const Request &req)>;

static Response calculator(const Request &req)
{
    Response resp = {0, 0};
    switch (req.op)
    {
    case '+':
        resp.result = req.x + req.y;
        break;
    case '-':
        resp.result = req.x - req.y;
        break;
    case '*':
        resp.result = req.x * req.y;
        break;
    case '/':
    { // x_ / y_
        if (req.y == 0)
            resp.code = -1; // -1. 除0
        else
            resp.result = req.x / req.y;
    }
    break;
    case '%':
    { // x_ / y_
        if (req.y == 0)
            resp.code = -2; // -2. 模0
        else
            resp.result = req.x % req.y;
    }
    break;
    default:
        resp.code = -3; // -3: 非法操作符
        break;
    }

    return resp;
}

main函数代码:

#include "TcpServer.hpp"
#include "Service.hpp"
#include <memory>

using namespace std;

static void usage(std::string process)
{
    cerr << "\nUsage: " << process << " port\n"
         << endl;
}
int BeginHandler(Connection *conn, std::string &message, service_t service)
{
    // 我们能保证,message一定是一个完整的报文,因为我们已经对它进行了解码
    Request req;
    // 反序列化,进行处理的问题
    if (!Deserialize(message, &req))
    {
        // 写回错误消息
        return -1;
        // 可以直接关闭连接
        // conn->excepter_(conn);
    }
    // 业务逻辑
    Response resp = service(req);

    std::cout << req.x << " " << req.op << " " << req.y << std::endl;
    std::cout << resp.code << " " << resp.result << std::endl;

    // 序列化
    std::string sendstr;
    Serialize(resp, &sendstr);

    // 处理完毕的结果,发送回给client
    conn->_outbuffer += sendstr;

    //std::cout << conn->_outbuffer << std::endl;

    // conn->_sender(conn);
    // if(conn->_outbuffer.empty()) conn->_R->EnableReadWrite(conn->_sock, true, false);
    // else conn->_R->EnableReadWrite(conn->_sock, true, true);

    // conn->R_->EnableReadWrite(conn->sock_, true, true);
    // conn->_sender()


    std::cout << "这里就是上次的业务逻辑啦 --- end" << std::endl;

    return 0;
}

// 1 + 1X2 + 3X5 + 6X8 -> 1 + 1
int HandlerRequest(Connection *conn, std::string &message)
{
    std::cout << "HandlerRequest" << std::endl;
    return BeginHandler(conn, message, calculator);
}

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        usage(argv[0]);
        exit(0);
    }

    unique_ptr<TcpServer> server(new TcpServer(HandlerRequest, atoi(argv[1])));
    server->Run();

    return 0;
}

3.4 运行服务器

首先启动服务器,可以看到3号文件描述符被添加到connections集合中,3号描述符就是监听的套接字描述符:

由于没有实现客户端程序,就使用telnet工具充当客户端:

此时可以发现新增了一个5号文件描述符的连接,因为4号描述符就是epoll实例的文件描述符,所以第一个客户端连接的文件描述符就是5号。

此时客户端向服务端发起指定格式的计算请求:

注意这里定制的协议就是每个计算请求以“X”作为分隔符,每个请求中的数字与运算符之间以空格隔开。

因为这里采用的是epoll多路复用的方式,虽然该服务器是单进程的,但是却可以同时为多个客户端提供服务:

当一个客户端退出时,其对应的文件描述符已经相关资源也会被释放,下一次客户端再进行连接的时候,就会使用空出来的这个文件描述符:

四、总结

当前服务器存在的弊端:
虽然当前服务器已经实现了多路复用的功能了,但是处理连接请求和业务逻辑等所有的工作都是由当前的服务器来完成的。况且当前的业务逻辑只是进行简单的运算,处理的连接数量也很少,因此对应服务器而言并没有什么压力。如果需要处理更加复杂的业务逻辑或者是同时面临大量的连接,那么此时服务器就会显得有些吃力了。

解决思路:
可再当前服务器的基础之上引入线程池,当读事件回调函数读取完相关的请求之后,就可以将响应的业务逻辑交给线程池进行处理,当线程池处理完毕之后,再将处理结果返回给服务器,由服务器组织响应给客户端程序。此时服务器就可以只关心连接请求,读写异常事件的处理,而将业务逻辑的处理交给线程池,就大大降低了服务器的负担,从而也能够处理更多的连接请求了。

线程池代码:

#include <iostream>
#include <queue>
#include <cassert>
#include <memory>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include "Lock.hpp"
#include "Log.hpp"

using namespace std;

const uint32_t gDefaultNum = 5;

//改造为单例模式:

template <class T>
class ThreadPool
{
private:
    ThreadPool(uint32_t threadNum = gDefaultNum)
        : _isStart(false),
          _threadNum(threadNum)
    {
        assert(threadNum > 0);
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ThreadPool(const ThreadPool<T> &) = delete;
    ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

public:
    static ThreadPool<T> *getInstance()
    {
        static Mutex mutex;
        if (nullptr == instance) //限定LockGuard的生命周期
        {
            LockGuard lockGurand(&mutex); //RAII
            if (nullptr == instance)
            {
                instance = new ThreadPool<T>();
            }
        }
        return instance;
    }

    static void *threadRoutine(void *args)
    {
        ThreadPool<T> *ptp = static_cast<ThreadPool<T> *>(args);

        while (true)
        {
            ptp->lockQueue();
            //判断当前任务队列有没有任务
            while (!ptp->hasTask())
            {
                //没有任务,当前线程等待
                ptp->waitTask();
            }

            //当前线程处理任务
            T t = ptp->pop();

            ptp->unlockQueue();

            Log() << "新线程完成任务:" << t.run() << endl;
        }
    }

    void start()
    {
        assert(!_isStart); //判断线程是否已经启动
        for (int i = 0; i < _threadNum; ++i)
        {
            pthread_t tid;
            pthread_create(&tid, nullptr, threadRoutine, this);
        }
        _isStart = true;
    }

    //放任务
    void push(const T &in)
    {
        lockQueue();
        _taskQueue.push(in);
        choiceThreadHandle();
        unlockQueue();
    }

    //消费任务
    T pop()
    {
        T tmp = _taskQueue.front();
        _taskQueue.pop();
        return tmp;
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    void lockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }

    void unlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void waitTask()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    void choiceThreadHandle()
    {
        pthread_cond_signal(&_cond);
    }

    bool hasTask()
    {
        return !_taskQueue.empty();
    }

private:
    bool _isStart;          //判断线程池是否开启
    uint32_t _threadNum;    //线程池中线程数量
    queue<T> _taskQueue;    //任务队列
    pthread_mutex_t _mutex; //保护任务队列的锁
    pthread_cond_t _cond;   //线程的条件变量

    static ThreadPool<T> *instance;
};

template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;

创建并启动线程池:

unique_ptr<ThreadPool<Task>> tp(ThreadPool<Task>::getInstance());

tp->start();

实现一个任务类Task:

#include <iostream>
#include <string>
#include "Protocol.hpp"
#include "TcpServer.hpp"

class Task
{
public:
    Task(Request &req, Connection *conn) : _req(req), _conn(conn)
    {
    }

    int operator()()
    {
        return run();
    }
    int run()
    {
        Response resp = {0, 0};
        switch (_req.op)
        {
        case '+':
            resp.result = _req.x + _req.y;
            break;
        case '-':
            resp.result = _req.x - _req.y;
            break;
        case '*':
            resp.result = _req.x * _req.y;
            break;
        case '/':
        {
            if (_req.y == 0)
            {
                resp.code = -1;
            }
            else
            {
                resp.result = _req.x / _req.y;
            }
            break;
        }
        case '%':
        {
            if (_req.y == 0)
            {
                resp.code = -2;
            }
            else
            {
                resp.result = _req.x % _req.y;
            }
            break;
        }
        default:
            resp.code = -3; // -3: 非法操作符
            break;
        }

        std::cout << resp.code << ":" << resp.result << std::endl;
        std::string sendstr;
        Serialize(resp, &sendstr);

        _conn->_outbuffer += sendstr;
        if (_conn->_outbuffer.empty())
            _conn->_R->EnableReadWrite(_conn->_sock, true, false);
        else
            _conn->_R->EnableReadWrite(_conn->_sock, true, true);

        return resp.result;
    }

private:
    Request _req;
    Connection *_conn;
};

运行展示:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/263.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2023年BeijngCrypt勒索病毒家族最新变种之.halo勒索病毒

目录 前言&#xff1a;简介 一、什么是.halo勒索病毒&#xff1f; 二、.halo勒索病毒是如何传播感染的&#xff1f; 三、感染.halo后缀勒索病毒建议立即做以下几件事情 四、中了.halo后缀的勒索病毒文件怎么恢复&#xff1f; 五、加密数据恢复情况 六、系统安全防护措施建…

宣布推出 .NET 社区工具包 8.1!

我们很高兴地宣布 .NET Community Toolkit 8.1 版正式发布&#xff01;这个新版本包括呼声很高的新功能、bug 修复和对 MVVM 工具包源代码生成器的大量性能改进&#xff0c;使开发人员在使用它们时的用户体验比以往更好&#xff01; 就像在我们之前的版本中一样&#xff0c;我…

STM32F1硬件SPI驱动nRF24L01通过按键控制数据收发带状态反馈

STM32F1硬件SPI驱动nRF24L01通过按键控制数据收发带状态反馈&#x1f4cc;相关篇《STM32F1基于STM32CubeMX配置硬件SPI驱动nRF24L01数据收发》 &#x1f3ac;功能演示 &#x1f33f;工程默认配置的是STM32F103VC单片机&#xff0c;其他型号的修改需要修改启动文件startup_st…

python+django+vue图书个性化推荐系统

整个系统是由多个功能模块组合而成的&#xff0c;要将所有的功能模块都一一列举出来&#xff0c;然后进行逐个的功能设计&#xff0c;使得每一个模块都有相对应的功能设计&#xff0c;然后进行系统整体的设计。 本图书个性化推荐系统结构图如图python manage.py runserver 开…

宇宙最强-GPT-4 横空出世:最先进、更安全、更有用

文章目录前言一、准确性提升1.创造力2.视觉输入3.更长的上下文二、相比于ChatGPT有哪些提升1.GPT-4 的高级推理能力超越了 ChatGPT2.GPT-4 在多种测试考试中均优于 ChatGPT。三、研究团队在GPT-4模型都做了哪些改善1.遵循 GPT、GPT-2 和 GPT-3 的研究路径2.我们花了 6 个月的时…

分享10个不错的C语言开源项目

今天跟大家分享10个重量级的C语言开源项目&#xff0c;C语言确实经得住考验&#xff1a; Redis&#xff1a;Redis是一个开源的高性能的键值对数据库。它以C语言编写&#xff0c;具有极高的性能和可靠性。 Nginx&#xff1a;Nginx是一个高性能的HTTP和反向代理服务器&#xff0…

刚工作3天就被裁了....

前言 还有谁&#xff1f;刚上三天班就被公司公司的工作不适合我&#xff0c;叫我先提升一下。 后面我也向公司那边讨要了一个说法&#xff0c;我只能说他们那边的说辞让我有些不服气。 现在之所以把这件事上记录一下&#xff0c;一是记录一下自己的成长轨迹&#xff0c;二是…

改进YOLO系列 | CVPR2023最新Backbone | FasterNet 远超 ShuffleNet、MobileNet、MobileViT 等模型

论文地址:https://export.arxiv.org/pdf/2303.03667v1.pdf 为了设计快速神经网络,许多工作都集中在减少浮点运算(FLOPs)的数量上。然而,作者观察到FLOPs的这种减少不一定会带来延迟的类似程度的减少。这主要源于每秒低浮点运算(FLOPS)效率低下。并且,如此低的FLOPS主要…

javaSE系列之类与对象

javaSE系列之类与方法什么是类类的定义书写事项什么是实例化this引用this的注意事项对象的初始化构造方法封装的概念访问限定符封装扩展之包static成员static的特性static的初始化代码块注意事项内部类1.实例内部类&#x1f497; &#x1f497; 博客:小怡同学&#x1f497; &am…

【LeetCode】1171. 从链表中删去总和值为零的连续节点、面试题 02.05. 链表求和

作者&#xff1a;小卢 专栏&#xff1a;《Leetcode》 喜欢的话&#xff1a;世间因为少年的挺身而出&#xff0c;而更加瑰丽。 ——《人民日报》 目录 1171. 从链表中删去总和值为零的连续节点 面试题 02.05. 链表求和 1171. 从链表中删去总和…

【面试题】面试官:如果后端给你 1w 条数据,你如何做展示?

最近一位朋友参加阿b的面试&#xff0c;然后面试官问了她这个问题&#xff0c;我问她咋写的&#xff0c;她一脸淡定的说&#xff1a;“虚拟列表。”大厂面试题分享 面试题库前后端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★地址&#xff1a;前端面…

tp6实现邮件发送

tp6实现邮件发送 phpMailer 是一个非常强大的 ph p发送邮件类,可以设定发送邮件地址、回复地址、邮件主题、html网页,上传附件,并且使用起来非常方便。 phpMailer 的特点&#xff1a; 1、在邮件中包含多个 TO、CC、BCC 和 REPLY-TO。2、平台应用广泛&#xff0c;支持的 SMTP…

阿里p8测试总监,让我们用这份《测试用例规范》,再也没加班过

经常看到无论是刚入职场的新人&#xff0c;还是工作了一段时间的老人&#xff0c;都会对编写测试用例感到困扰&#xff1f;例如&#xff1a; 固然&#xff0c;编写一份好的测试用例需要&#xff1a;充分的需求分析能力 理论及经验加持&#xff0c;作为测试职场摸爬打滚的老人&…

ElasticSearch 8 学习笔记总结(六)

文章目录一. ES8 的Java API 环境准备二. ES8 的Java API 索引操作三. ES8 的Java API 文档操作1. 文档的 插入 批量插入 删除等操作2. 文档的查询四、异步客户端操作一. ES8 的Java API 环境准备 ES8 废除了Type的概念。为了适应这种数据结构的改变&#xff0c;ES官方从1.7版…

学习 Python 之 Pygame 开发魂斗罗(十一)

学习 Python 之 Pygame 开发魂斗罗&#xff08;十一&#xff09;继续编写魂斗罗1. 改写主类函数中的代码顺序2. 修改玩家初始化3. 显示玩家生命值4. 设置玩家碰到敌人死亡5. 设置敌人子弹击中玩家6. 修改updatePlayerPosition()函数逻辑继续编写魂斗罗 在上次的博客学习 Pytho…

(四)HDFS双HA高可用机制

目录 概述 原理 主备切换 小结: 概述 进入到了hadoop 2.x的时代&#xff0c;为了保证namenode上的元数据不会丢失&#xff0c;而且是高可用的&#xff0c;出现了双实例HA的机制 原理 集群里启动两个namenode&#xff0c;一个是active状态(主)&#xff0c;一个是standby(备…

HDFS黑名单退役服务器

黑名单&#xff1a;表示在黑名单的主机IP地址不可以&#xff0c;用来存储数据。 企业中&#xff1a;配置黑名单&#xff0c;用来退役服务器。 黑名单配置步骤如下&#xff1a; 1&#xff09;编辑/opt/module/hadoop-3.1.3/etc/hadoop目录下的blacklist文件 添加如下主机名称&…

Spring —— 初学 Spring, 理解控制反转

JavaEE传送门JavaEE Servlet —— Smart Tomcat,以及一些访问出错可能的原因 Servlet —— Servlet API 目录SpringIoC (理解控制反转)传统代码ioc代码DISpring Spring 通常指的是 Spring Farmework (Spring 框架), 它是一个开源框架 用一句话来概括: Spring 是一个包含了众多…

【linux】Linux基本指令(上)

前言&#xff1a; 在之前我们已经简单了介绍了一下【Linux】&#xff0c;包括它的概念&#xff0c;由来啊等进行了讲解&#xff0c;接下来我们就将正式的踏入对其的学习&#xff01;&#xff01;&#xff01; 本文目录&#x1f449;操作系统的概念1.命令的语法1.1命令介绍1.2选…

ChatGPT告诉你:项目管理能干到60岁吗?

早上好&#xff0c;我是老原。这段时间最火的莫过于ChatGPT&#xff0c;从文章创作到论文写作&#xff0c;甚至编程序&#xff0c;简直厉害的不要不要的。本以为过几天热度就自然消退了&#xff0c;结果是愈演愈烈&#xff0c;热度未减……大家也从一开始得玩乐心态&#xff0c…
最新文章