【Linux】Reactor模式

Reactor模式

Reactor模式的定义

Reactor反应器模式,也叫做分发者模式或通知者模式,是一种将就绪事件派发给对应服务处理程序的事件设计模式。

Reactor模式的角色构成

Reactor主要由以下五个角色构成:

reactor模式的角色
角色解释
Handle(句柄)用于标识不同的事件,本质就是一个文件描述符
Sychronous Event Demultiplexer(同步事件分离器)本质就是一个系统调用,用于等待时间发声。对于Linux来说,同步事件分离器指的就是IO/多路复用,比如select、poll、epoll等
Event Handler(事件处理器)由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的处理反馈
Concrete Event Handler(具体事件处理器)事件处理器中各种回调方法的具体实现
Initiation Dispatcher(初始分发器)初始分发器实际上就是reactor角色,初始分发器会通过同步事件分离器来等待事件的发生,当对应事件就绪就调用事件处理器,最后调用对应的回调方法来处理这个事件

Reactor模式的工作流程

Reactor模式的工作流程如下:

  1. 当应用向初始分发器注册具体事件处理器时,应用会标识出该事件处理器希望初始分发器在某个事件发生时向其通知,该事件于Handle关联。
  2. 初始分发器会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器。
  3. 当所有的事件处理器注册完毕后,应用会启动初始分发器的事件循环,这时初始分发器会将每个事件处理器的Handle合并起来,并使用同步事件分离器等待这些事件的发生。
  4. 当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初始分发器。
  5. 初始分发器会将Ready状态的Handle作为key,来寻找其对应的事件处理器。
  6. 初始分发器会调用其对应事件处理器当中的回调方法来响应该事件。

epoll ET服务器(Reactor模式)

如果在此之前没有了解过Reactor模式,相信在看了Reactor模式的工作流程后一定是一头雾水,下面我们实现一个Reactor模式下的epoll ET服务器,来感受一下Reactor模式。

设计思路

功能完善于问题提出

之前echo版的EpollServer,只能做了检测读取事件的就绪,现在需要添加以下功能:

  1. 读取数据保存的问题:
    1. 如何保证读取上来的数据就是一条完整的报文?一次读取不能保证,需要边读取边检测。
    2. 既然一次的读取数据不能保证读完,那读上来的数据如何保存呢?建议一个堆的缓存区。套接字会有很多,那又如何确保套接字和缓冲区一 一对应呢?
    3. 在ET的工作模式下,怎么能确保通知一次,就把内核中缓冲区的数据全部读取完呢?循环读取,非阻塞的方式。那什么时候停止读取操作呢?读取上来的数据小于预期的值。
  2. 如果得到了一条完整的报文,如何提取提取有效载荷呢?定制协议+Json的序列化和反序列化!
  3. 如果读、写和异常事件发生了,如何知道?Epoll接口。怎么样执行对应的方法呢?
  4. 写事件的就绪条件是缓冲区没有满就是就绪的,所以,如果一直监视写事件的话,就需要一直调用写事件所对应的方法,而大部分时候是没有数据可以发送的,这样调用方法就直接返回了,浪费CPU资源 —— 结论&细节:写事件是按需进行监视的!也就是说有数据要发送的时候才开启对写事件的监视。
  5. 在ET的工作模式下,要循环写入,确保一次通知就把写的工作干完。当输出缓冲区的数据为空的时候结束写的事件!

设计思路

epoll ET服务器

在epoll ET服务器中,我们需要处理如下几种事件:

  • 读事件:如果是监听套接字的读事件就绪则调用accept函数获取底层的连接,如果是其他套接字的读事件就绪则调用recv函数读取客户端发来的数据。
  • 写事件:写事件就绪则将待发送的数据写入到发送缓冲区当中。
  • 异常事件:当某个套接字的异常事件就绪时我们不做过多处理,直接关闭该套接字。

当epoll ET服务器监测到某一事件就绪后,就会将该事件交给对应的服务处理程序进行处理。

Reactor模式的五个角色

在这个epoll ET服务器中,Reactor模式中的五个角色对应如下:

  • 句柄:文件描述符
  • 同步事件分离器:IO/多路复用epoll。
  • 事件处理器:包括读回调、写回调和异常回调。
  • 具体事件处理器:读回调、写回调和异常回调的具体实现。
  • 初始分发器:Reactor类当中的Dispatcher函数

Dispatcher函数要做的就是调用epoll_wait函数等待事件发生,当有事件发生后就将就绪的事件派发给对应的服务处理程序即可。

EventItem类

  • 在Reactor的工作流程中说道,在注册事件处理器时需要将其与Handle关联,本质上就是将读回调、写回调和异常回调与某个文件描述符关联起来。
  • 这样做的目的就是为了当某个文件描述符上的事件就绪就可以找到其对应的各种回调函数,进而执行对应的回调方法来处理该事件。

所以我们可以设计一个Eventtem类,该类中的成员就包括一个文件描述符,以及该文件描述符对应的各种回调函数。

Reactor类

  • 在Reactor的工作流程中说道,当所有事件处理器注册完毕后,会使用同步事件分离器等待这些事件发生,当某个事件处理的Handle变成为Ready状态时,同步事件分离器会通知初始分发器,然后初始分发器会将Ready状态的Handle作为key来寻找其对应的事件处理器,并调用该事件处理器中对应的回调方法来响应该事件。
  • 本质就是当事件注册完毕后,会调用epoll_wait函数来等待这些事件发生,当某个事件就绪时epoll_wait函数就会告诉调用方,然后调用方就根据就绪的文件描述符来找到其对应的各种回调函数,并调用对应的回调函数进行事件处理。

对此我们可以设计一个Reactor类。

  • 该类当中有一个成员函数叫做Dispatcher,这个函数就是所谓的初始分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件。
  • 该类当中有一个成员函数叫做Dispatcher,这个函数就是所谓的初始分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件。
  • 我们可以使用C++ STL当中的unordered_map,来建立各个文件描述符与其对应的EventItem结构之间的映射,这个unordered_map可以作为Reactor类的一个成员变量,当需要找某个文件描述符的EventItem结构时就可以通过该成员变量找到。
  • 当然,Reactor类当中还需要提供成员函数AddEvent和DelEvent,用于向Dispatcher当中注册和删除事件。

此外,在Reactor类当中还有一些其他成员,后面实现的时候再做详细论述。

epoll ET服务器的工作流程

这个epoll ET服务器在Reactor模式下的工作流程如下:

  • 首先epoll ET服务器需要进行套接字的创建、绑定和监听。
  • 然后定义一个Reactor对象并初始化,初始化时要做的就是创建epoll模型。
  • 紧接着需要为监听套接字创建对应的EventItem结构,并调用Reactor类中提供的AddEvent函数将监听套接字添加到epoll模型中,并建立监听套接字与其对应的EventItem结构之间的映射关系。
  • 之后就可以不断调用Reactor类中的Dispatcher函数进行事件派发。

在事件处理过程中,会不断向Dispatcher当中新增或删除事件,而每个事件就绪时都会自动调用其对应的回调函数进行处理,所以我们要做的就是不断调用Dispatcher函数进行事件派发即可。

EventItem结构

EventItem结构中除了包含文件描述符和其对应的读回调、写回调和异常回调之外,还包含一个输入缓冲区inbuffer、一个输出缓冲区outbuffer以及一个回指指针R。

  • 当某个文件描述符的读事件就绪时,我们会调用recv函数读取客户端发来的数据,但我们并不能保证我们读取到了一个完整的报文,因此需要将读取到的数据暂时存放到该文件描述符对应的inbuffer当中,当inbuffer当中可以分离出一个完整的报文后再将其分离出来进行数据处理,这里的inbuffer本质就是用来解决粘包问题的。
  • 当处理完一个报文请求后,需要将响应数据发送给客户端,但我们并不能保证底层TCP的发送缓冲区中有足够的空间供我们写入,因此需要将要发送的数据暂时存放到该文件描述符对应的outbuffer当中,当底层TCP的发送缓冲区中有空间,即写事件就绪时,再依次发送outbuffer当中的数据。
  • EventItem结构当中设置回指指针R,便于快速找到我们定义的Reactor对象,因为后续我们需要根据EventItem结构找到这个Reactor对象。比如当连接事件就绪时,需要调用Reactor类当中的AddEvent函数将其添加到Dispatcher当中。

此外,EventItem结构当中需要提供一个管理回调的成员函数,便于外部对EventItem结构当中的各种回调进行设置。

代码如下:

typedef int(*callback_t)(EventItem*);

class EventItem{
public:
	int _sock; //文件描述符
	Reactor* _R; //回指指针

	callback_t _recv_handler; //读回调
	callback_t _send_handler; //写回调
	callback_t _error_handler; //异常回调

	std::string _inbuffer; //输入缓冲区
	std::string _outbuffer; //输出缓冲区
public:
	EventItem()
		: _sock(-1)
		, _R(nullptr)
		, _recv_handler(nullptr)
		, _send_handler(nullptr)
		, _error_handler(nullptr)
	{}
	//管理回调
	void ManageCallbacks(callback_t recv_handler, callback_t send_handler, callback_t error_handler)
	{
		_recv_handler = recv_handler;
		_send_handler = send_handler;
		_error_handler = error_handler;
	}
	~EventItem()
	{}
};

Reactor类

在Reactor类当中有一个unordered_map成员,用于建立文件描述符和与其对应的EventItem结构之间的映射,还有一个epfd成员,该成员是epoll模型对应的文件描述符。

  • 在初始化Reactor对象的时候就可以调用epoll_create函数创建epoll模型,并将该epoll模型对应的文件描述符用epfd成员记录下来,便于后续使用。
  • Reactor对象在析构的时候,需要调用close函数将该epoll模型进行关闭。

代码如下:

#define SIZE 256

class Reactor{
private:
	int _epfd; //epoll模型
	std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
	Reactor()
		: _epfd(-1)
	{}
	void InitReactor()
	{
		//创建epoll模型
		_epfd = epoll_create(SIZE);
		if (_epfd < 0){
			std::cerr << "epoll_create error" << std::endl;
			exit(5);
		}
	}
	~Reactor()
	{
		if (_epfd >= 0){
			close(_epfd);
		}
	}
};

Dispatcher函数(事件分派器)

Reactor类当中的Dispatcher函数就是之前所说的初始分发器,这里我们更形象的将其称之为事件分派器。

  • 事件分派器要做的就是调用epoll_wait函数等待事件发生。
  • 当某个文件描述符上的事件发生后,先通过unordered_map找到该文件描述符对应的EventItem结构,然后调用EventItem结构当中对应的回调函数对该事件进行处理即可。

代码如下:

#define MAX_NUM 64

class Reactor{
private:
	int _epfd; //epoll模型
	std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
	//事件分派器
	void Dispatcher(int timeout)
	{
		struct epoll_event revs[MAX_NUM];
		int num = epoll_wait(_epfd, revs, MAX_NUM, timeout);
		for (int i = 0; i < num; i++){
			int sock = revs[i].data.fd; //就绪的文件描述符
			if ((revs[i].events&EPOLLERR) || (revs[i].events&EPOLLHUP)){ //异常事件就绪(优先处理)
				if (_event_items[sock]._error_handler)
					_event_items[sock]._error_handler(&_event_items[sock]); //调用异常回调
			}
			if (revs[i].events&EPOLLIN){ //读事件就绪
				if (_event_items[sock]._recv_handler)
					_event_items[sock]._recv_handler(&_event_items[sock]); //调用读回调
			}
			if (revs[i].events&EPOLLOUT){ //写事件就绪
				if (_event_items[sock]._send_handler)
					_event_items[sock]._send_handler(&_event_items[sock]); //调用写回调
			}
		}
	}
};

说明一下:

  • 这里没有用switch或if语句对epoll_wait函数的返回值进行判断,而是借用for循环对其返回值进行了判断。
  • 如果epoll_wait的返回值为-1则说明epoll_wait函数调用失败,此时不会进入到for循环内部进行事件处理。
  • 如果epoll_wait的返回值为0则说明epoll_wait函数超时返回,此时也不会进入到for循环内部进行事件处理。
  • 如果epoll_wait的返回值大于0则说明epoll_wait函数调用成功,此时才会进入到for循环内部调用对应的回调函数对事件进行处理。
  • 事件处理时最好先对异常事件进行处理,因此代码中将异常事件的判断放在了最前面。

AddEvent函数

Reactor类当中的AddEvent函数是用于进行事件注册的。   

  • 在注册事件时需要传入一个文件描述符和一个事件集合,表示需要监视哪个文件描述符上的哪些事件。
  • 还需要传入该文件描述符对应的EventItem结构,表示当该文件描述符上的事件就绪后应该执行的回调方法。
  • 在AddEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符及其对应的事件集合注册到epoll模型当中,然后建立该文件描述符与其对应的EventItem结构的映射关系。

代码如下:

class Reactor{
private:
	int _epfd; //epoll模型
	std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
	void AddEvent(int sock, uint32_t event, const EventItem& item)
	{
		struct epoll_event ev;
		ev.data.fd = sock;
		ev.events = event;
		
		if (epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev) < 0){ //将该文件描述符添加到epoll模型当中
			std::cerr << "epoll_ctl add error, fd: " << sock << std::endl;
		}
		else{
			//建立sock与EventItem结构的映射关系
			_event_items.insert({ sock, item });
			std::cout << "添加: " << sock << " 到epoll模型中,成功" << std::endl;
		}
	}
};

DelEvent函数 

Reactor类当中的DelEvent函数是用于进行事件删除的。

  • 在删除事件时只需要传入一个文件描述符即可。
  • 在DelEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符从epoll模型中删除,并取消该文件描述符与其对应的EventItem结构的映射关系。

代码如下:

class Reactor{
private:
	int _epfd; //epoll模型
	std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
	void DelEvent(int sock)
	{
		if (epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr) < 0){ //将该文件描述符从epoll模型中删除
			std::cerr << "epoll_ctl del error, fd: " << sock << std::endl;
		}
		else{
			//取消sock与EventItem结构的映射关系
			_event_items.erase(sock);
			std::cout << "从epoll模型中删除: " << sock << ",成功" << std::endl;
		}
	}
};

EnableReadWrite函数

Reactor类当中的EnableReadWrite函数,用于使能或使能某个文件描述符的读写事件。

  • 调用EnableReadWrite函数时需要传入一个文件描述符,表示需要设置的是哪个文件描述符对应的事件。
  • 还需要传入两个bool值,分别表示需要使能还是使能读写事件。
  • EnableReadWrite函数内部会调用epoll_ctl函数修改将该文件描述符的监听事件。

代码如下:

class Reactor{
private:
	int _epfd; //epoll模型
	std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
	void EnableReadWrite(int sock, bool read, bool write){
		struct epoll_event ev;
		ev.data.fd = sock;
		ev.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
		if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0){ //修改该文件描述符所需要监视的事件
			std::cerr << "epoll_ctl mod error, fd: " << sock << std::endl;
		}
	}
};

回调函数

下面我们就可以实现一些回调函数,这里主要实现四个回调函数。

  • accepter:当连接事件到来时可以调用该回调函数获取底层建立好的连接。
  • recver:当读事件就绪时可以调用该回调函数读取客户端发来的数据并进行处理。
  • sender:当写事件就绪时可以调用该回调函数向客户端发送响应数据。
  • errorer:当异常事件就绪时可以调用该函数将对应的文件描述符进行关闭。

当我们为某个文件描述符创建EventItem结构时,就可以调用EventItem类提供的ManageCallbacks函数,将这些回调函数到EventItem结构当中。

  • 我们会将监听套接字对应的EventItem结构当中的recv_handler设置为accepter,因为监听套接字的读事件就绪就意味着连接事件就绪了,而监听套接字一般只关心读事件,因此监听套接字对应的send_handler和error_handler可以设置为nullptr。
  • 当Dispatcher监测到监听套接字的读事件就绪时,会调用监听套接字对应的EventItem结构当中的recv_handler回调,此时就会调用accepter回调获取底层建立好的连接。
  • 而对于与客户端建立连接的套接字,我们会将其对应的EventItem结构当中的recv_handler、send_handler和error_handler分别设置为这里的recver、sender和error。
  • 当Dispatcher监测到这些套接字的事件就绪时,就会调用其对应的EventItem结构当中对应的回调函数,也就是这里的recver、sender和error。

accepter回调

accepter回调用于处理连接事件,其工作流程如下:

  1. 调用accept函数获取底层建立好的连接。
  2. 将获取到的套接字设置为非阻塞,并为其创建EventItem结构,填充EventItem结构当中的各个字段,并注册该套接字相关的回调方法。
  3. 将该套接字及其对应需要关心的事件注册到Dispatcher当中。

下一次Dispatcher在进行事件派发时就会帮我们关注该套接字对应的事件,当事件就绪时就会执行该套接字对应的EventItem结构中对应的回调方法。

代码如下:

int accepter(EventItem* item)
{
	while (true){
		struct sockaddr_in peer;
		memset(&peer, 0, sizeof(peer));
		socklen_t len = sizeof(peer);
		int sock = accept(item->_sock, (struct sockaddr*)&peer, &len);
		if (sock < 0){
			if (errno == EAGAIN || errno == EWOULDBLOCK){ //并没有读取出错,只是底层没有连接了
				return 0;
			}
			else if (errno == EINTR){ //读取的过程被信号中断了
				continue;
			}
			else{ //获取连接失败
				std::cerr << "accept error" << std::endl;
				return -1;
			}
		}
		SetNonBlock(sock); //将该套接字设置为非阻塞
		//构建EventItem结构
		EventItem sock_item;
		sock_item._sock = sock;
		sock_item._R = item->_R;
		sock_item.ManageCallbacks(recver, sender, errorer); //注册回调方法
		
		Reactor* R = item->_R;
		R->AddEvent(sock, EPOLLIN | EPOLLET, sock_item); //将该套接字及其对应的事件注册到Dispatcher中
	}
	return 0;
}

需要注意的是,因为这里实现的ET模式下的epoll服务器,因此在获取底层连接时需要循环调用accept函数进行读取,并且监听套接字必须设置为非阻塞。

  • 因为ET模式下只有当底层建立的连接从无到有或是从有到多时才会通知上层,如果没有一次性将底层建立好的连接全部获取,并且此后再也没有建立好的连接,那么底层没有读取完的连接就相当于丢失了,所以需要循环多次调用accept函数获取底层建立好的连接。
  • 循环调用accept函数也就意味着,当底层连接全部被获取后再调用accept函数,此时就会因为底层已经没有连接了而被阻塞住,因此需要将监听套接字设置为非阻塞,这样当底层没有连接时accept就会返回,而不会被阻塞住。

accept获取到的新的套接字也需要设置为非阻塞,就是为了避免将来循环调用recv、send等函数时被阻塞。

设置文件描述符为非阻塞

设置文件描述符为非阻塞时,需要先调用fcntl函数获取该文件描述符对应的文件状态标记,然后在该文件状态标记的基础上添加非阻塞标记O_NONBLOCK,最后调用fcntl函数对该文件描述符的状态标记进行设置即可。

代码如下:

//设置文件描述符为非阻塞
bool SetNonBlock(int sock)
{
	int fl = fcntl(sock, F_GETFL);
	if (fl < 0){
		std::cerr << "fcntl error" << std::endl;
		return false;
	}
	fcntl(sock, F_SETFL, fl | O_NONBLOCK);
	return true;
}

监听套接字设置为非阻塞后,当底层连接不就绪时,accept函数会以出错的形式返回,因此当调用accept函数的返回值小于0时,需要继续判断错误码。

  • 如果错误码为EAGAINEWOULDBLOCK,说明本次出错返回是因为底层已经没有可获取的连接了,此时底层连接全部获取完毕,这时我们可以返回0,表示本次accepter调用成功。
  • 如果错误码为EINTR,说明本次调用accept函数获取底层连接时被信号中断了,这时还应该继续调用accept函数进行获取。
  • 除此之外,才说明accept函数是真正调用失败了,这时我们可以返回-1,表示本次accepter调用失败。

accept、recv和send等IO系统调用为什么会被信号中断?

IO系统调用函数出错返回并且将错误码设置为EINTR,表明本次在进行数据读取或数据写入之前就被信号中断了,也就是说IO系统调用在陷入内核,但并没有返回用户态的时候内核跑去处理其他信号了。

  • 在内核态返回用户态之前检查信号的pending位图,也就是未决信号集,如果pending位图中有未处理的信号,那么内核就会对该信号进行处理。
  • 但IO系统调用函数在进行IO操作之前就被信号中断了,这实际上就是一个特例,因为IO过程分为“等”和“拷贝”两个步骤,而一般“等”的过程比较漫长,而这个过程中我们的执行流其实是处于闲置状态的,因此在“等”的过程中如果有信号产生,内核就会立即进行信号的处理。

写事件是按需打开的

这里调用accept获取上来的套接字在添加到Dispatcher中时,只添加了EOPLLINEPOLLET事件,也就是说只让epoll帮我们关心该套接字的读事件。

  • 这里之所以没有添加写事件,是因为当前我们并没有要发送的数据,因此没有必要让epoll帮我们关心写事件。
  • 一般读事件是经常会被设置的,而写事件则是按需打开的,只有当我们有数据要发送时才会将写事件打开,并且在数据全部写入完毕后又会立即将写事件关闭。

recver回调

recver回调用于处理读事件,其工作流程如下:

  1. 循环调用recv函数读取数据,并将读取到的数据添加到该套接字对应EventItem结构的inbuffer当中。
  2. 对inbuffer当中的数据进行切割,将完整的报文切割出来,剩余的留在inbuffer当中。
  3. 对切割出来的完整报文进行反序列化。
  4. 业务处理。
  5. 业务处理后形成响应报文。
  6. 将响应报头添加到对应EventItem结构的outbuffer当中,并打开写事件。

下一次Dispatcher在进行事件派发时就会帮我们关注该套接字的写事件,当写事件就绪时就会执行该套接字对应的EventItem结构中写回调方法,进而将outbuffer中的响应数据发送给客户端。

代码如下:

int recver(EventItem* item)
{
	if (item->_sock < 0) //该文件描述符已经被关闭
		return -1;
	//1、数据读取
	if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){ //读取失败
		item->_error_handler(item);
		return -1;
	}

	//2、报文切割
	std::vector<std::string> datagrams;
	StringUtil::Split(item->_inbuffer, &datagrams, "X");

	for (auto s : datagrams){
		//3、反序列化
		struct data d;
		StringUtil::Deserialize(s, &d._x, &d._y, &d._op);

		//4、业务处理
		int result = 0;
		switch (d._op)
		{
			case '+':
				result = d._x + d._y;
				break;
			case '-':
				result = d._x - d._y;
				break;
			case '*':
				result = d._x * d._y;
				break;
			case '/':
				if (d._y == 0){
					std::cerr << "Error: div zero!" << std::endl;
					continue; //继续处理下一个报文
				}
				else{
					result = d._x / d._y;
				}
				break;
			case '%':
				if (d._y == 0){
					std::cerr << "Error: mod zero!" << std::endl;
					continue; //继续处理下一个报文
				}
				else{
					result = d._x % d._y;
				}
				break;
			default:
				std::cerr << "operation error!" << std::endl;
				continue; //继续处理下一个报文
		}

		//5、形成响应报文
		std::string response;
		response += std::to_string(d._x);
		response += d._op;
		response += std::to_string(d._y);
		response += "=";
		response += std::to_string(result);
		response += "X"; //报文与报文之间的分隔符
		
		//6、将响应报文添加到outbuffer中
		item->_outbuffer += response;
		if (!item->_outbuffer.empty())
			item->_R->EnableReadWrite(item->_sock, true, true); //打开写事件
	}
	return 0;
}

一、数据读取

我们可以将循环调用recv函数读取数据的过程封装成一个recver_helper函数。

  • recver_helper函数要做的就是循环调用recv函数将读取到的数据添加到inbuffer当中。
  • 当recv函数的返回值小于0时同样需要进一步判断错误码,如果错误码为EAGAINEWOULDBLOCK则说明底层数据读取完毕了,如果错误码为EINTR则说明读取过程被信号中断了,此时还需要继续调用recv函数进行读取,否则就是读取出错了。
  • 当读取出错时直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。

代码如下:

int recver_helper(int sock, std::string* out)
{
	while (true){
		char buffer[128];
		ssize_t size = recv(sock, buffer, sizeof(buffer)-1, 0);
		if (size < 0){
			if (errno == EAGAIN || errno == EWOULDBLOCK){ //数据读取完毕
				return 0;
			}
			else if (errno == EINTR){ //被信号中断,继续尝试读取
				continue;
			}
			else{ //读取出错
				return -1;
			}
		}
		else if (size == 0){ //对端连接关闭
			return -1;
		}
		//读取成功
		buffer[size] = '\0';
		*out += buffer; //将读取到的数据添加到该套接字对应EventItem结构的inbuffer中
	}
}

二、报文切割

报文切割本质就是为了防止粘包问题,而粘包问题实际是涉及到协议定制的。

  • 因为我们需要根据协议知道如何将各个报文进行分离,比如UDP分离报文采用的就是定长报头+自描述字段。
  • 我们的目的是演示整个数据处理的过程,为了简单起见就不进行过于复杂的协议定制了,这里我们就以“X”作为各个报文之间的分隔符,每个报文的最后都会以一个“X”作为报文结束的标志。
  • 因此现在要做的就是以“X”作为分隔符对inbuffer当中的字符串进行切割,这里将这个过程封装成一个Split函数并放到一个StringUtil工具类当中。
  • Split函数要做的就是对inbuffer当中的字符串进行切割,将切割出来的一个个报文放到vector当中,对于最后无法切出完整报文的数据就留在inbuffer当中即可。

代码如下:

class StringUtil{
public:
	static void Split(std::string& in, std::vector<std::string>* out, std::string sep)
	{
		int start = 0;
		size_t pos = in.find(sep, start);
		while (pos != std::string::npos){
			out->push_back(in.substr(start, pos - start));
			start = pos + sep.size();
			pos = in.find(sep, start);
		}
		in = in.substr(start);
	}
};

三、反序列化

在数据发送之前需要进行序列化encode,接收到数据之后需要对数据进行反序列化decode。

  • 序列化就是将对象的状态信息转换为可以存储或传输的形式(字节序列)的过程。
  • 反序列化就是把字节序列恢复为原对象的过程。

实际反序列化也是与协议定制相关的,假设这里的epoll服务器向客户端提供的就是计算服务,客户端向服务器发来的都是需要服务器计算的计算表达式,因此可以用一个结构体来描述这样一个计算表达式,结构体当中包含两个操作数x和y,以及一个操作符op。

struct data{
	int _x;
	int _y;
	char _op;
};

此时这里所谓的反序列化就是将一个计算表达式转换成这样一个结构体,

  • 因此现在要做的就是将形如“1+2”这样的计算表达式转换成一个结构体,该结构体当中的x成员的值就是1,y的值就是2,op的值就是‘+’,这里将这个过程封装成一个Deserialize函数并放到StringUtil工具类当中。
  • Deserialize函数要做的工作其实也很简单,就是在传入的字符串当中找到操作符op,此时操作符左边的就是操作数x,右边的就是操作数y。

代码如下:

class StringUtil{
public:
	static void Deserialize(std::string& in, int* x, int* y, char* op)
	{
		size_t pos = 0;
		for (pos = 0; pos < in.size(); pos++){
			if (in[pos] == '+' || in[pos] == '-' || in[pos] == '*' || in[pos] == '/' || in[pos] == '%')
				break;
		}
		if (pos < in.size()){
			std::string left = in.substr(0, pos);
			std::string right = in.substr(pos + 1);

			*x = atoi(left.c_str());
			*y = atoi(right.c_str());
			*op = in[pos];
		}
		else{
			*op = -1;
		}
	}
};

说明一下: 实际在做项目时不需要我们自己进行序列化和反序列化,我们一般会直接用JSON或XML这样的序列化反序列化工具。

四、业务处理

业务处理就是服务器拿到客户端发来的数据后,对数据进行数据分析,最终拿到客户端想要的资源。

  • 我们这里要做的业务处理非常简单,就是用反序列化后的数据进行数据计算,此时得到的计算结果就是客户端想要的。

五、形成响应报文

在业务处理后我们已经拿到了客户端想要的数据,现在我们要做的就是形成响应报文,由于我们这里规定每个报文都以“X”作为报文结束的标志,因此在形成响应报文的时候,就需要在每一个计算结果后面都添加上一个“X”,表示这是之前某一个请求报文的响应报文,因为协议制定后就需要双方遵守。

六、将响应报文添加到outbuffer中

响应报文构建完后需要将其添加到该套接字对应的outbuffer中,并打开该套接字的写事件,此后当写事件就绪时就会将outbuffer当中的数据发送出去。

sender回调

sender回调用于处理写事件,其工作流程如下:

  1. 循环调用send函数发送数据,并将发送出去的数据从该套接字对应EventItem结构的outbuffer中删除。
  2. 如果循环调用send函数后该套接字对应的outbuffer当中的数据被全部发送,此时就需要将该套接字对应的写事件关闭,因为已经没有要发送的数据了,如果outbuffer当中的数据还有剩余,那么该套接字对应的写事件就应该继续打开。

代码如下:

int sender(EventItem* item)
{
	if (item->_sock < 0) //该文件描述符已经被关闭
		return -1;

	int ret = sender_helper(item->_sock, item->_outbuffer);
	if (ret == 0){ //全部发送成功,不再关心写事件
		item->_R->EnableReadWrite(item->_sock, true, false);
	}
	else if (ret == 1){ //没有发送完毕,还需要继续关心写事件
		item->_R->EnableReadWrite(item->_sock, true, true);
	}
	else{ //写入出错
		item->_error_handler(item);
	}
	return 0;
}

我们可以将循环调用send函数发送数据的过程封装成一个sender_helper函数。

  • sender_helper函数要做的就是循环调用send函数将outbuffer中的数据发送出去。
  • 当send函数的返回值小于0时也需要进一步判断错误码,如果错误码为EAGAINEWOULDBLOCK则说明底层TCP发送缓冲区已经被写满了,这时需要将已经发送的数据从outbuffer中移除。
  • 如果错误码为EINTR则说明发送过程被信号中断了,此时还需要继续调用send函数进行发送,否则就是发送出错了。
  • 当发送出错时也直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。
  • 如果最终outbuffer当中的数据全部发送成功,则将outbuffer清空即可。

代码如下:

int sender_helper(int sock, std::string& in)
{
	size_t total = 0; //累加已经发送的字节数
	while (true){
		ssize_t size = send(sock, in.c_str() + total, in.size() - total, 0);
		if (size < 0){
			if (errno == EAGAIN || errno == EWOULDBLOCK){ //底层发送缓冲区已经没有空间了
				in.erase(0, total); //将已经发送的数据移出outbuffer
				return 1; //缓冲区写满,没写完
			}
			else if (errno == EINTR){ //被信号中断,继续尝试写入
				continue;
			}
			else{ //写入出错
				return -1;
			}
		}
		total += size;
		if (total >= in.size()){
			in.clear(); //清空outbuffer
			return 0; //全部写入完毕
		}
	}
}

errorer回调

errorer回调用于处理异常事件。

  • 对于异常事件就绪的套接字我们这里不做其他过多的处理,简单的调用close函数将该套接字关闭即可。
  • 但是在关闭该套接字之前,需要先调用DelEvent函数将该套接字从epoll模型中删除,并取消该套接字与其对应的EventItem结构的映射关系。
  • 由于在Dispatcher当中是先处理的异常事件,为了避免该套接字被关闭后继续进行读写操作,然后因为读写操作失败再次调用errorer回调重复关闭该文件描述符,因此在关闭该套接字后将其EventItem当中的文件描述符值设置为-1。
  • 在调用recver和sender回调执行读写操作之前,都会判断该EventItem结构当中的文件描述符值是否有效,如果无效则不会进行后续操作。

代码如下:

int errorer(EventItem* item)
{
	item->_R->DelEvent(item->_sock); //将该文件描述符从epoll模型中删除,并取消该文件描述符与其EventItem结构的映射关系
	close(item->_sock); //关闭该文件描述符

	item->_sock = -1; //防止关闭后继续执行读写回调
	return 0;
}

套接字相关

这里可以编写一个Socket类,对套接字相关的接口进行一定程度的封装,为了让外部能够直接调用Socket类当中封装的函数,于是将这些函数定义成了静态成员函数。

代码如下:

class Socket{
public:
	//创建套接字
	static int SocketCreate()
	{
		int sock = socket(AF_INET, SOCK_STREAM, 0);
		if (sock < 0){
			std::cerr << "socket error" << std::endl;
			exit(2);
		}
		//设置端口复用
		int opt = 1;
		setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
		return sock;
	}
	//绑定
	static void SocketBind(int sock, int port)
	{
		struct sockaddr_in local;
		memset(&local, 0, sizeof(local));
		local.sin_family = AF_INET;
		local.sin_port = htons(port);
		local.sin_addr.s_addr = INADDR_ANY;
		
		socklen_t len = sizeof(local);

		if (bind(sock, (struct sockaddr*)&local, len) < 0){
			std::cerr << "bind error" << std::endl;
			exit(3);
		}
	}
	//监听
	static void SocketListen(int sock, int backlog)
	{
		if (listen(sock, backlog) < 0){
			std::cerr << "listen error" << std::endl;
			exit(4);
		}
	}
};

运行epoll ET服务器

运行我们的epoll ET服务器的步骤如下:

  • 首先需要进行的就是套接字的创建、绑定和监听,因为是ET模式下的epoll服务器,因此监听套接字创建出来后需要将其设置为非阻塞。
  • 然后就可以实例化一个Reactor对象,并对其进行初始化,也就是创建epoll模型。
  • 紧接着需要为监听套接字定义一个EventItem结构,填充EventItem结构当中的各个字段,并将accepter回调设置为监听套接字的读回调方法。
  • 然后调用AddEvent函数将监听套接字及其需要关系的事件添加到Dispatcher当中,该过程包括将监听套接字注册到epoll模型中,以及建立监听套接字与其对应EventItem结构的映射。
  • 最后就可以循环调用Reactor类当中的Dispatcher函数进行事件派发了。

代码如下:

#include "app_interface.hpp"
#include "reactor.hpp"
#include "socket.hpp"
#include "util.hpp"
#include <string>

#define BACK_LOG 5

static void Usage(std::string proc)
{
	std::cout << "Usage: " << proc << " port" << std::endl;
}
int main(int argc, char* argv[])
{
	if (argc != 2){
		Usage(argv[0]);
		exit(1);
	}
	int port = atoi(argv[1]);

	//服务器监听套接字的创建、绑定和监听
	int listen_sock = Socket::SocketCreate();
	SetNonBlock(listen_sock); //将监听套接字设置为非阻塞
	Socket::SocketBind(listen_sock, port);
	Socket::SocketListen(listen_sock, BACK_LOG);
	
	//创建Reactor,并初始化
	Reactor R;
	R.InitReactor();

	//创建监听套接字对应的EventItem结构            
	EventItem item;
	item._sock = listen_sock;
	item._R = &R;
	item.ManageCallbacks(accepter, nullptr, nullptr); //监听套接字只需要关心读事件
	
	//将监听套接字托管给Dispatcher
	R.AddEvent(listen_sock, EPOLLIN | EPOLLET, item);
	
	//循环进行事件派发
	int timeout = 1000;
	while (true){
		R.Dispatcher(timeout);
	}
	return 0;
}

参考文献:

http://t.csdn.cn/pN4A9

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/76965.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

EVE-NG MPLS L2VPN BGP pw -- static route,static mpls lsp

1 拓扑 2 配置步骤 2.1 配置接口IP 和静态路由 PE1 interface LoopBack 0ip address 1.1.1.9 32 quitinterface GigabitEthernet 1/0ip address 10.1.1.1 255.255.255.0quitip route-static 2.2.2.9 32 10.1.1.2 ip route-static 3.3.3.9 32 10.1.1.2 P interface LoopBac…

代码审计-ASP.NET项目-未授权访问漏洞

代码审计必备知识点&#xff1a; 1、代码审计开始前准备&#xff1a; 环境搭建使用&#xff0c;工具插件安装使用&#xff0c;掌握各种漏洞原理及利用,代码开发类知识点。 2、代码审计前信息收集&#xff1a; 审计目标的程序名&#xff0c;版本&#xff0c;当前环境(系统,中间件…

2022年电赛C题——小车跟随行驶系统——做题记录以及经验分享

前言 自己打算将做过的电赛真题&#xff0c;主要包含控制组的&#xff0c;近几年出现的小车控制题目&#xff0c;自己做过的真题以及在准备电赛期间刷真题出现的问题以及经验分享给大家 这次带来的是22年电赛C题——小车跟随行驶系统&#xff0c;这道题目指定使用的是TI的单片…

leetcode做题笔记83删除排序链表中的重复元素

给定一个已排序的链表的头 head &#xff0c; 删除所有重复的元素&#xff0c;使每个元素只出现一次 。返回 已排序的链表 。 输入&#xff1a;head [1,1,2] 输出&#xff1a;[1,2] 思路一&#xff1a;模拟题意 struct ListNode* deleteDuplicates(struct ListNode* head){i…

如何使用Python编写小游戏?

大家好&#xff0c;我是沐尘而生&#xff0c;如果你是一个热爱编程的小伙伴&#xff0c;又想尝试游戏开发&#xff0c;那么这篇文章一定能满足你的好奇心。不废话&#xff0c;让我们马上进入Python游戏开发的精彩世界吧&#xff01; Python游戏开发的魅力 编写小游戏不仅仅是锻…

力扣:63. 不同路径 II(Python3)

题目&#xff1a; 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish”&#xff09;。 现在考虑网格中有障碍物。那么从…

基于深度学习的指针式仪表倾斜校正方法——论文解读

中文论文题目:基于深度学习的指针式仪表倾斜校正方法 英文论文题目&#xff1a;Tilt Correction Method of Pointer Meter Based on Deep Learning 周登科、杨颖、朱杰、王库.基于深度学习的指针式仪表倾斜校正方法[J].计算机辅助设计与图形学学报, 2020, 32(12):9.DOI:10.3724…

[自学记录06|*百人计划]Gamma矫正与线性工作流

一、前言 Gamma矫正其实也属于我前面落下的一块内容&#xff0c;打算把它补上&#xff0c;其它的没补是因为我之前写的GAMES101笔记里已经涵盖了&#xff0c;而Gamma矫正在101里面确实没提到&#xff0c;于是打算把它补上&#xff0c;这块内容并不难&#xff0c;但是想通透的理…

使用 wxPython和ECharts生成和保存HTML图表

使用wxPython和ECharts库来生成和保存HTML图表。wxPython是一个基于wxWidgets的Python GUI库&#xff0c;而ECharts是一个用于数据可视化的JavaScript库。 C:\pythoncode\blog\echartshow.py 参考网址&#xff1a;https://echarts.apache.org/v4/examples/zh/index.html 安装…

2023最新水果编曲软件FL Studio 21.1.0.3267音频工作站电脑参考配置单及系统配置要求

音乐在人们心中的地位日益增高&#xff0c;近几年音乐选秀的节目更是层出不穷&#xff0c;喜爱音乐&#xff0c;创作音乐的朋友们也是越来越多&#xff0c;音乐的类型有很多&#xff0c;好比古典&#xff0c;流行&#xff0c;摇滚等等。对新手友好程度基本上在首位&#xff0c;…

区间预测 | MATLAB实现QRLSTM长短期记忆神经网络分位数回归时间序列区间预测

区间预测 | MATLAB实现QRLSTM长短期记忆神经网络分位数回归时间序列区间预测 目录 区间预测 | MATLAB实现QRLSTM长短期记忆神经网络分位数回归时间序列区间预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍 MATLAB实现QRLSTM长短期记忆神经网络分位数回归时间序…

解决内网GitLab 社区版 15.11.13项目拉取失败

问题描述 GitLab 社区版 发布不久&#xff0c;搭建在内网拉取项目报错&#xff0c;可能提示 unable to access https://github.comxxxxxxxxxxx: Failed to connect to xxxxxxxxxxxxxGit clone error - Invalid argument error:14077438:SSL routines:SSL23_GET_S 15.11.13ht…

ATF(TF-A)安全通告 TFV-5 (CVE-2017-15031)

安全之安全(security)博客目录导读 ATF(TF-A)安全通告汇总 目录 一、ATF(TF-A)安全通告 TFV-5 (CVE-2017-15031) 二、CVE-2017-15031 一、ATF(TF-A)安全通告 TFV-5 (CVE-2017-15031) Title 未初始化或保存/恢复PMCR_EL0可能会泄露安全世界的时间信息 CVE ID CVE-2017-1503…

支持https访问

文章目录 1. 打开自己的云服务器的 80 和 443 端口2. 安装 nginx3. 安装 snapd4. 安装 certbot5. 生成证书6. 拷贝生成的证书到项目工作目录7. 修改 main.go 程序如下8. 编译程序9. 启动程序10. 使用 https 和端口 8081 访问页面成功11. 下面修改程序&#xff0c;支持 https 和…

Rust软件外包开发语言的特点

Rust 是一种系统级编程语言&#xff0c;强调性能、安全性和并发性的编程语言&#xff0c;适用于广泛的应用领域&#xff0c;特别是那些需要高度可靠性和高性能的场景。下面和大家分享 Rust 语言的一些主要特点以及适用的场合&#xff0c;希望对大家有所帮助。北京木奇移动技术有…

分布式 - 消息队列Kafka:Kafka 消费者的消费位移

文章目录 01. Kafka 分区位移02. Kafka 消费位移03. kafka 消费位移的作用04. Kafka 消费位移的提交05. kafka 消费位移的存储位置06. Kafka 消费位移与消费者提交的位移07. kafka 消费位移的提交时机08. Kafka 维护消费状态跟踪的方法 01. Kafka 分区位移 对于Kafka中的分区而…

Linux系统安装Google Chrome

1.进入谷歌浏览器官网 Google Chrome - Download the Fast, Secure Browser from GoogleGet more done with the new Google Chrome. A more simple, secure, and faster web browser than ever, with Google’s smarts built-in. Download now.http://www.google.cn/intl/en_…

rust入门系列之Rust介绍及开发环境搭建

Rust教程 Rust基本介绍 网站: https://www.rust-lang.org/ rust是什么 开发rust语言的初衷是&#xff1a; 在软件发展速度跟不上硬件发展速度&#xff0c;无法在语言层面充分的利用硬件多核cpu不断提升的性能和 在系统界别软件开发上&#xff0c;C出生比较早&#xff0c;内…

零售行业供应链管理核心KPI指标(三)

完美订单满足率和退货率 完美订单满足率有三个方面的因素影响&#xff1a;订单按时、足量、无损交货。通常情况下零售企业追求线上订单履行周期慢慢达到行业平均水平&#xff0c;就是交付的速度变快了&#xff0c;这个肯定是一件好事情&#xff0c;趋势越来越好。 同时&#…

周期 角频率 频率 振幅 初相角

周期 角频率 频率 振幅 初相角 当我们谈论傅里叶级数或波形分析时&#xff0c;以下术语经常出现&#xff1a; 周期 T T T: 函数在其图形上重复的时间或空间的长度。周期的倒数是频率。 频率 f f f: 周期的倒数&#xff0c;即一秒内波形重复的次数。单位通常为赫兹&#xff…
最新文章