EventLoop整合与TimerWheel联合调试(整合二)

目录

概要

tcp_cli.cc

tcp_srv.cc

server.hpp

测试结果


第二次整合

概要

本主要是将以下模块进行整合测试

Poller模块与Channel模块整合-CSDN博客

时间轮设计-CSDN博客

timerfd的认识与基本使用-CSDN博客

整合基于的理念

tcp_cli.cc

#include "../source/server.hpp"

int main()
{
    Socket cli_sock;
    cli_sock.CreateClient(8500, "127.0.0.1");
    for(int i = 0; i < 5; i++)
    {
        std::string str = "hello qingfengyuge!";
        cli_sock.Send(str.c_str(), str.size());
        char buf[1024] = {0};
        cli_sock.Recv(buf, 1023);
        DBG_LOG("%s", buf);
        sleep(1);
    }
    while (1) sleep(1);
    return 0;
}

tcp_srv.cc

#include "../source/server.hpp"

void HandleClose(Channel *channel)
{
    DBG_LOG("close fd:%d", channel->Fd());
    channel->Remove(); // 移除监控
    delete channel;
}
void HandleRead(Channel *channel)
{
    int fd = channel->Fd();
    char buf[1024] = {0};
    int ret = recv(fd, buf, 1023, 0);
    if (ret <= 0)
    {
        return HandleClose(channel); // 关闭释放
    }
    DBG_LOG("%s", buf);
    channel->EnableWrite(); // 启动可写事件
}
void HandleWrite(Channel *channel)
{
    int fd = channel->Fd();
    const char *data = "天气还不错!!";
    int ret = send(fd, data, strlen(data), 0);
    if (ret < 0)
    {
        return HandleClose(channel); // 关闭释放
    }
    channel->DisableWrite(); // 关闭写监控
}
void HandleError(Channel *channel)
{
    return HandleClose(channel); // 关闭释放
}
void HandleEvent(EventLoop *loop, Channel *channel, uint64_t timerid)
{
    loop->TimerRefresh(timerid);
}
void Acceptor(EventLoop *loop, Channel *lst_channel)
{
    int fd = lst_channel->Fd();
    int newfd = accept(fd, NULL, NULL);
    if (newfd < 0)
    {
        return;
    }

    uint64_t timerid = rand() % 10000;
    Channel *channel = new Channel(loop, newfd);
    channel->SetReadCallback(std::bind(HandleRead, channel));                  // 为通信套接字设置可读事件的回调函数
    channel->SetWriteCallback(std::bind(HandleWrite, channel));                // 可写事件的回调函数
    channel->SetCloseCallback(std::bind(HandleClose, channel));                // 关闭事件的回调函数
    channel->SetErrorCallback(std::bind(HandleError, channel));                // 错误事件的回调函数
    channel->SetEventCallback(std::bind(HandleEvent, loop, channel, timerid)); // 任意事件的回调函数
    channel->EnableRead();
    // 活跃连接的超时释放操作,10s后关闭连接
    // 主要定时销毁任务,必须在启动读事件之前,因为有可能启动了事件监控后,就立即有了事件,但是这时候还没有任务
    loop->TimerAdd(timerid, 10, std::bind(HandleClose, channel));
}

int main()
{
    srand(time(NULL));
    EventLoop loop;
    Socket lst_sock;
    bool ret = lst_sock.CreateServer(8500);
    // 为监听套接字,创建一个Channel进行事件的管理,以及事件的处理
    Channel channel(&loop, lst_sock.Fd());
    // 回调中,获取新连接,为新连接创建Channel并且添加监控
    channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));
    channel.EnableRead(); // 启动可读事件监控
    while (1)
    {
        loop.Start();
    }
    lst_sock.Close();
    return 0;
}

 

server.hpp

#include <iostream>
#include <vector>
#include <cstdint>
#include <cassert>
#include <string>
#include <cstring>
#include <ctime>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <functional>
#include <sys/epoll.h>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <sys/eventfd.h>
#include <memory>
#include <sys/timerfd.h>

#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG

#define LOG(level, format, ...)                                                             \
    do                                                                                      \
    {                                                                                       \
        if (level < LOG_LEVEL)                                                              \
            break;                                                                          \
        time_t t = time(NULL);                                                              \
        struct tm *ltm = localtime(&t);                                                     \
        char tmp[32] = {0};                                                                 \
        strftime(tmp, 31, "%H:%M:%S", ltm);                                                 \
        fprintf(stdout, "[%s %s:%d] " format "\n", tmp, __FILE__, __LINE__, ##__VA_ARGS__); \
    } while (0)

#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)

// 缓冲区类
#define BUFFER_DEFAULT_SIZE 1024 // Buffer 默认起始大小
class Buffer
{
private:
    std::vector<char> _buffer; // 使用vector进行内存空间管理
    uint64_t _reader_idx;      // 读偏移
    uint64_t _writer_idx;      // 写偏移
public:
    Buffer() : _reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}
    char *Begin() { return &*_buffer.begin(); }
    // 获取当前写入起始地址
    char *WirtePosition() { return Begin() + _writer_idx; }
    // 获取当前读取起始地址
    char *ReadPosition() { return Begin() + _reader_idx; }
    // 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移
    uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }
    // 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间
    uint64_t HeadIdleSize() { return _reader_idx; }
    // 获取可读数据大小 = 写偏移 - 读偏移
    uint16_t ReadAbleSize() { return _writer_idx - _reader_idx; };
    // 将读偏移向后移动
    void MoveReadOffset(uint64_t len)
    {
        if (len == 0)
            return;
        // 向后移动的大小, 必须小于可读数据大小
        assert(len <= ReadAbleSize());
        _reader_idx += len;
    }
    // 将写偏移向后移动
    void MoveWriteOffset(uint64_t len)
    {
        // 向后移动的大小,必须小于当前后边的空闲空间大小
        assert(len <= TailIdleSize());
        _writer_idx += len;
    }
    // 确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容)
    void EnsureWriteSpace(uint64_t len)
    {
        // 如果末尾空闲空间大小足够,直接返回
        if (TailIdleSize() >= len)
        {
            return;
        }
        // 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够,够了就将数据移动到起始位置
        if (len <= TailIdleSize() + HeadIdleSize())
        {
            // 将数据移动到起始位置
            uint64_t rsz = ReadAbleSize();                            // 把当前数据大小先保存起来
            std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); // 把可读数据拷贝到起始位置
            _reader_idx = 0;                                          // 将读偏移归0
            _writer_idx = rsz;                                        // 将写位置置为可读数据大小, 因为当前的可读数据大小就是写偏移量
        }
        else
        {
            // 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可
            _buffer.resize(_writer_idx + len);
        }
    }
    // 写入数据
    void Write(const void *data, uint64_t len)
    {
        // 1.保证有足够空间, 2.拷贝数据进去
        EnsureWriteSpace(len);
        const char *d = (const char *)data;
        std::copy(d, d + len, WirtePosition());
    }
    void WirteAndPush(const void *data, uint64_t len)
    {
        Write(data, len);
        MoveWriteOffset(len);
    }
    void WriteString(const std::string &data)
    {
        return Write(data.c_str(), data.size());
    }
    void WriteStringAndPush(const std::string &data)
    {
        WriteString(data);
        MoveWriteOffset(data.size());
    }
    void WriteBuffer(Buffer &data)
    {
        return Write(data.ReadPosition(), data.ReadAbleSize());
    }
    void WirteBufferAndPush(Buffer &data)
    {
        WriteBuffer(data);
        MoveWriteOffset(data.ReadAbleSize());
    }
    // 读取数据
    void Read(void *buf, uint64_t len)
    {
        // 要求获取的数据大小必须小于可读数据大小
        assert(len <= ReadAbleSize());
        std::copy(ReadPosition(), ReadPosition() + len, (char *)buf);
    }
    void ReadAndPop(void *buf, uint64_t len)
    {
        Read(buf, len);
        MoveReadOffset(len);
    }
    std::string ReadAsString(uint64_t len)
    {
        // 要求获取的数据大小必须小于可读数据大小
        assert(len <= ReadAbleSize());
        std::string str;
        str.resize(len);
        Read(&str[0], len); // 这里不直接用str.c_str()的原因是,这个的返回值是const类型
        return str;
    }
    std::string ReadAsStringAndPop(uint64_t len)
    {
        assert(len <= ReadAbleSize());
        std::string str = ReadAsString(len);
        MoveReadOffset(len);
        return str;
    }
    char *FindCRLF()
    {
        char *res = (char *)memchr(ReadPosition(), '\n', ReadAbleSize());
        return res;
    }
    // 这种情况针对的是,通常获取一行数据
    std::string GetLine()
    {
        char *pos = FindCRLF();
        if (pos == NULL)
            return "";
        // +1 是为了把换行字符也取出来
        return ReadAsString(pos - ReadPosition() + 1);
    }
    std::string GetLineAndPop()
    {
        std::string str = GetLine();
        MoveReadOffset(str.size());
        return str;
    }

    // 清空缓冲区
    void Clear()
    {
        // 只需要将偏移量归0即可
        _reader_idx = 0;
        _writer_idx = 0;
    }
};

// 套接字类
#define MAX_LISTEN 1024
class Socket
{
private:
    int _sockfd;

public:
    Socket() : _sockfd(-1) {}
    Socket(int fd) : _sockfd(fd) {}
    ~Socket() { Close(); };
    int Fd() { return _sockfd; }
    // 创建套接字
    bool Create()
    {
        // int socket(int domain, int type, int protocol)
        _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (_sockfd < 0)
        {
            ERR_LOG("CREATE SOCKET FAILED!");
            return false;
        }
        return true;
    }
    // 绑定地址信息
    bool Bind(const std::string &ip, uint64_t port)
    {
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        socklen_t len = sizeof(struct sockaddr_in);
        // int bind(int sockfd, struct sockaddr* addr, socklen_t len)
        int ret = bind(_sockfd, (struct sockaddr *)&addr, len);
        if (ret < 0)
        {
            ERR_LOG("BIND ADDRESS FAILED!");
            return false;
        }
        return true;
    }
    // 开始监听
    bool Listen(int backlog = MAX_LISTEN)
    {
        // int listen(int backlog)
        int ret = listen(_sockfd, backlog);
        if (ret < 0)
        {
            ERR_LOG("SOCKET LISTEN FAILED!");
            return false;
        }
        return true;
    }
    // 向服务器发起连接
    bool Connect(const std::string &ip, uint16_t port)
    {
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        socklen_t len = sizeof(struct sockaddr_in);
        // int connect(int sockfd, struct sockaddr* addr, socklen_t len)
        int ret = connect(_sockfd, (struct sockaddr *)&addr, len);
        if (ret < 0)
        {
            ERR_LOG("CONNECT SERVER FAILED!");
            return false;
        }
        return true;
    }
    // 获取新连接
    int Accept()
    {
        // int accept(int sockfd, struct sockaddr *addr, socklen_t *len);
        int newfd = accept(_sockfd, NULL, NULL);
        if (newfd < 0)
        {
            ERR_LOG("SOCKET ACCEPT FAILED!");
            return -1;
        }
        return newfd;
    }
    // 接收数据
    ssize_t Recv(void *buf, size_t len, int flag = 0) // 0 阻塞
    {
        // ssize_t recv(int sockfd, void *buf, size_t len, int flag)
        ssize_t ret = recv(_sockfd, buf, len, flag);
        if (ret <= 0)
        {
            // EAGAIN 当前的接收缓冲区中没用数据了,在非阻塞的情况下才有这个错误
            // EINTR 表示当前socket的阻塞等待,被信号打断了
            if (errno == EAGAIN || errno == EINTR)
            {
                return 0; // 表示这次没用接收到数据
            }
            ERR_LOG("SOCKET RECV FAILED!");
            return -1;
        }
        return ret; // 实际接收的数据长度
    }
    ssize_t NonBlockRecv(void *buf, size_t len)
    {
        return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
    }
    // 发送数据
    ssize_t Send(const void *buf, size_t len, int flag = 0)
    {
        // ssize_t send(int sockfd, void *data, size_t len, int flag)
        ssize_t ret = send(_sockfd, buf, len, flag);
        if (ret < 0)
        {
            ERR_LOG("SOCKET SEND FAILED!");
            return -1;
        }
        return ret; // 实际发送的数据长度
    }
    ssize_t NonBlockSend(void *buf, size_t len)
    {
        return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
    }
    // 关闭套接字
    void Close()
    {
        if (_sockfd != -1)
        {
            close(_sockfd);
            _sockfd = -1;
        }
    }
    // 创建一个服务器连接
    bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) // 接收全部
    {
        // 1.创建套接字 2.绑定地址 3.开始监听 4.设置非阻塞 5.启动地址重用
        if (Create() == false)
            return false;
        if (block_flag) // 默认阻塞
            NonBlock();
        if (Bind(ip, port) == false)
            return false;
        if (Listen() == false)
            return false;
        ReuseAddress();
        return true;
    }
    // 创建一个客户端连接
    bool CreateClient(uint16_t port, const std::string &ip)
    {
        // 1.创建套接字 2.指向连接服务器
        if (Create() == false)
            return false;
        if (Connect(ip, port) == false)
            return false;
        return true;
    }
    // 设置套接字选项 -- 开启地址端口重用
    void ReuseAddress()
    {
        // int setsockopt(int fd, int level, int optname, void *val, int vallen)
        int val = 1;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int)); // 地址
        val = 1;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int)); // 端口号
    }
    // 设置套接字阻塞属性 -- 设置为非阻塞
    void NonBlock()
    {
        // int fcntl(int fd, int cmd, .../*arg*/)
        int flag = fcntl(_sockfd, F_GETFL, 0);
        fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
    }
};

class Poller; // 整合测试1:声明
class EventLoop;
// Channel类
class Channel
{
private:
    int _fd;
    EventLoop *_loop;
    uint32_t _events;  // 当前需要监控的事件
    uint32_t _revents; // 当前连接触发的事件
    using EventCallback = std::function<void()>;
    EventCallback _read_callback;  // 可读事件被触发的回调函数
    EventCallback _write_callback; // 可写事件被触发的回调函数
    EventCallback _error_callback; // 错误事件被触发的回调函数
    EventCallback _close_callback; // 连接断开事件被触发的回调函数
    EventCallback _event_callback; // 任意事件被触发的回调函数
public:
    Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {}
    int Fd() { return _fd; }
    uint32_t Events() { return _events; } // 获取想要监控的事件
    void SetREvents(uint32_t events) { _revents = events; }
    void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } // 设置实际就绪的事件
    void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
    void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
    void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
    void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }
    // 当前是否监控了可读
    bool ReadAble() { return (_events & EPOLLIN); }
    // 当前是否监控了可写
    bool WriteAble() { return (_events & EPOLLOUT); }
    // 启动读事件监控
    void EnableRead()
    {
        _events |= EPOLLIN;
        Update();
    }
    // 启动写事件监控
    void EnableWrite()
    {
        _events |= EPOLLOUT;
        Update();
    }
    // 关闭读事件监控
    void DisableRead()
    {
        _events &= ~EPOLLIN;
        Update();
    }
    // 关闭写事件监控
    void DisableWrite()
    {
        _events &= ~EPOLLOUT;
        Update();
    }
    // 关闭所有事件监控
    void DisableAll()
    {
        _events = 0;
        Update();
    }
    // 移除监控
    void Remove(); // 声明和实现要分离,因为实现的时候是不知道里面有什么函数成员的
    void Update(); // 这两个特殊,所以把实现放在Poller类的下面进行实现
    // 事件处理,一旦触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定
    void HandleEvent()
    {
        // 第二参数,对方关闭连接,第三参数,带外数据
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            if (_event_callback) // 不管任何事件,都调用的回调函数
                _event_callback();
            if (_read_callback)
                _read_callback();
        }
        /*有可能会释放连接的操作事件,一次只处理一个*/
        if (_revents & EPOLLOUT)
        {
            if (_event_callback)
                _event_callback(); // 放到事件处理完毕后调用,刷新活跃度
            if (_write_callback)
                _write_callback();
        }
        else if (_revents & EPOLLERR)
        {
            if (_event_callback)
                _event_callback();
            if (_error_callback)
                _error_callback();
        }
        else if (_revents & EPOLLHUP)
        {
            if (_event_callback)
                _event_callback();
            if (_close_callback)
                _close_callback();
        }
    }
};

// Poller描述符监控类
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
    int _epfd;
    struct epoll_event _evs[MAX_EPOLLEVENTS];
    std::unordered_map<int, Channel *> _channels;

private:
    // 对epoll的直接操作
    void Update(Channel *channel, int op)
    {
        // int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev)
        int fd = channel->Fd();
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = channel->Events();
        int ret = epoll_ctl(_epfd, op, fd, &ev);
        if (ret < 0)
        {
            ERR_LOG("EPOLLCTL FAILED!");
        }
        return;
    }
    // 判断一个Channel 是否已经添加了事件监控
    bool HasChannel(Channel *channel)
    {
        auto it = _channels.find(channel->Fd());
        if (it == _channels.end())
        {
            return false;
        }
        return true;
    }

public:
    Poller()
    {
        _epfd = epoll_create(MAX_EPOLLEVENTS); // 这个值大于0就行了,无用处
        if (_epfd < 0)
        {
            ERR_LOG("EPOLL CREATE FAILED!");
            abort(); // 退出程序
        }
    }
    // 添加或修改监控事件
    void UpdateEvent(Channel *channel)
    {
        bool ret = HasChannel(channel);
        if (ret == false)
        {
            // 不存在则添加
            _channels.insert(std::make_pair(channel->Fd(), channel));
            return Update(channel, EPOLL_CTL_ADD);
        }
        return Update(channel, EPOLL_CTL_MOD);
    }
    // 移除监控
    void RemoveEvent(Channel *channel)
    {
        auto it = _channels.find(channel->Fd());
        if (it != _channels.end())
        {
            _channels.erase(it);
        }
        Update(channel, EPOLL_CTL_DEL);
    }
    // 开始监控, 返回活跃连接
    void Poll(std::vector<Channel *> *active)
    {
        // int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);
        int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); // -1阻塞监控
        if (nfds < 0)
        {
            if (errno == EINTR) // 信号打断
            {
                return;
            }
            ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));
            abort();
        }
        for (int i = 0; i < nfds; i++) // 添加活跃信息
        {
            auto it = _channels.find(_evs[i].data.fd); // 没找到就说明不在我们的管理之下,这是不正常的
            assert(it != _channels.end());
            it->second->SetREvents(_evs[i].events); // 设置实际就绪的事件
            active->push_back(it->second);
        }
        return;
    }
};

// timerwheel时间轮定时器类
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
    uint64_t _id;         // 定时器任务对象
    uint32_t _timeout;    // 定时任务的超时时间
    bool _canceled;       // false-表示没有被取消,true-表示被取消
    TaskFunc _task_cb;    // 定时器要执行的定时任务
    ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
public:
    TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) : _id(id), _timeout(delay), _task_cb(cb), _canceled(false) {}
    ~TimerTask()
    {
        if (_canceled == false)
            _task_cb();
        _release();
    }
    void Cancel() { _canceled = true; }
    void SetRelease(const ReleaseFunc &cb) { _release = cb; }
    uint32_t DelayTime() { return _timeout; } // 返回时间
};

class TimerWheel
{
private:
    using WeakTask = std::weak_ptr<TimerTask>;
    using PtrTask = std::shared_ptr<TimerTask>;
    int _tick;     // 当前的的秒针,走到哪里哪里就释放执行
    int _capacity; // 表盘最大数量 -- 其实就是最大延迟时间
    std::vector<std::vector<PtrTask>> _wheel;
    // 用weak_ptr来构造出新的shared_ptr用来计数,不过后续要记得释放
    std::unordered_map<uint64_t, WeakTask> _timers;

    EventLoop *_loop;
    int _timerfd; // 定时器描述符 -- 可读事件回调就是读取计数器,执行定时任务
    std::unique_ptr<Channel> _timer_channel;

private:
    void RemoveTimer(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it != _timers.end())
        {
            _timers.erase(it);
        }
    }
    static int CreateTimerfd()
    {
        // int timerfd_create(int clockid, int flags);
        int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
        if (timerfd < 0)
        {
            ERR_LOG("TIMERFD CREATE FAILED!");
            abort();
        }
        // int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec);
        struct itimerspec itime;
        itime.it_value.tv_sec = 1;                 // 设置 秒钟
        itime.it_value.tv_nsec = 0;                // 设置 纳秒 第一次超时时间为1s后
        itime.it_interval.tv_sec = 1;              // 同上
        itime.it_interval.tv_nsec = 0;             // 第一次超时后,每隔超时的间隔时
        timerfd_settime(timerfd, 0, &itime, NULL); // 0代表阻塞式
        return timerfd;
    }
    void ReadTimefd()
    {
        uint64_t times;
        int ret = read(_timerfd, &times, 8);
        if (ret < 0)
        {
            perror("READ TIMERFD FAILED!");
            abort();
        }
        return;
    }
    // 这个函数应该每秒钟被执行一次,相当于秒钟向后走了一步
    void RunTimerTask()
    {
        _tick = (_tick + 1) % _capacity;
        _wheel[_tick].clear(); // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉.从而执行函数
    }
    void OnTime()
    {
        ReadTimefd();
        RunTimerTask();
    }
    void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) // 添加定时任务
    {
        PtrTask pt(new TimerTask(id, delay, cb));                      // 实例化定时任务对象
        pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); // 第0个位置是隐藏的this指针。再把任务id绑定进去
        int pos = (_tick + delay) % _capacity;
        _wheel[pos].push_back(pt);
        _timers[id] = WeakTask(pt);
    }
    // 刷新/延迟定时任务
    void TimerRefreshInLoop(uint64_t id)
    {
        // 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来, 添加到轮子中
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return; // 没找到定时任务, 没法刷新,没法延迟
        }
        PtrTask pt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr
        int delay = pt->DelayTime();    // 获取到了初始的延迟时间
        int pos = (_tick + delay) % _capacity;
        _wheel[pos].push_back(pt);
    }
    void TimerCancelInLoop(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return; // 没找到定时任务, 没法刷新,没法延迟
        }
        PtrTask pt = it->second.lock(); // 当还没有过期才进行取消
        if (pt)
            pt->Cancel();
    }

public:
    TimerWheel(EventLoop *loop) : _capacity(60), _tick(0), _wheel(_capacity), _loop(loop),
                                  _timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd))
    {
        _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
        _timer_channel->EnableRead(); // 启动读事件监控
    }
    /*定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题*/
    /*如果不想加锁,那就把对定期的所有操作,都放在一个线程中进行*/
    void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);
    // 刷新/延迟定时任务
    void TimerRefresh(uint64_t id);
    void TimerCancel(uint64_t id);
    /*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,对应的EventLoop线程内执行*/
    bool HasTimer(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return false; // 没找到定时任务, 没法刷新,没法延迟
        }
        return true;
    }
};

// EventLoop事件监控处理类
class EventLoop
{
private:
    using Functor = std::function<void()>;
    std::thread::id _thread_id;              // 线程ID
    int _event_fd;                           // eventfd唤醒IO事件监控有可能导致的阻塞
    std::unique_ptr<Channel> _event_channel; // 智能指针
    Poller _poller;                          // 进行所有描述符的事件监控
    std::vector<Functor> _tasks;             // 任务池
    std::mutex _mutex;                       // 实现任务池操作的线程安全
    TimerWheel _timer_wheel;                 // 定时器模块
public:
    //  执行任务池中的所有任务
    void RunAllTask()
    {
        std::vector<Functor> functor;
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.swap(functor);
        }
        for (auto &f : functor)
        {
            f();
        }
        return;
    }
    static int CreateEventFd()
    {
        int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
        if (efd < 0)
        {
            ERR_LOG("CREATE EVENTFD FAILED!!");
            abort(); // 让程序异常退出
        }
        return efd;
    }
    void ReadEventfd()
    {
        uint64_t res = 0;
        int ret = read(_event_fd, &res, sizeof(res));
        if (ret < 0)
        {
            // EINTR -- 被信号打断, EAGAIN -- 表示无数据可读
            if (errno == EINTR || EAGAIN)
            {
                return;
            }
            ERR_LOG("READ EVENTFD FAILED!");
            abort();
        }
        return;
    }
    void WeakUpEventFd()
    {
        uint64_t val = 1;
        int ret = write(_event_fd, &val, sizeof(val));
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            ERR_LOG("READ EVENTFD FAILED!");
            abort();
        }
        return;
    }

public:
    EventLoop() : _thread_id(std::this_thread::get_id()),
                  _event_fd(CreateEventFd()),
                  _event_channel(new Channel(this, _event_fd)),
                  _timer_wheel(this)
    {
        // 给eventfd添加可读事件回调函数,读取eventfd事件通知次数
        _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
        // 启动eventfd的读事件监控
        _event_channel->EnableRead();
    }
    // 三步走--事件监控-》就绪事件处理-》执行任务
    void Start()
    {
        // 1.事件监控
        std::vector<Channel *> actives;
        _poller.Poll(&actives);
        // 2.事件处理
        for (auto &channel : actives)
        {
            channel->HandleEvent();
        }
        // 3.执行任务
        RunAllTask();
    }
    // 用于判断当前线程是否是EventLoop对应的线程
    bool IsInLoop()
    {
        return (_thread_id == std::this_thread::get_id());
    }
    // 判断将要执行的任务是否处于当前线程中,如果是则执行,否则压入队列
    void RunInLoop(const Functor &cb)
    {
        if (IsInLoop())
        {
            return cb();
        }
        return QueueInLoop(cb);
    }
    // 将操作压入任务池
    void QueueInLoop(const Functor &cb)
    {
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.push_back(cb);
        }
        // 唤醒有可能因为没有事件就绪,而导致的epoll阻塞
        // 其实就是给eventfd写入一个数据,eventfd就会触发可读事件
        WeakUpEventFd();
    }
    // 添加/修改描述符的事件监控
    void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
    // 移除描述符的监控
    void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }
    void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }
    void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }
    void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }
    bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};

// 移除监控
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
    void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
    {
        _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
    }
    void TimerWheel::TimerRefresh(uint64_t id)
    {
        _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
    }
    void TimerWheel::TimerCancel(uint64_t id)
    {
        _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
    }

测试结果

符合预期

后记:贴必须得贴上去,代码多拷贝就要多贴几份,不管不管

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/399158.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ThreadLocal内存泄漏?

ThreadLocal内存泄漏&#xff1f; 在分析ThreadLocal导致的内存泄露前&#xff0c;需要普及了解一下内存泄露、强引用与弱引用以及GC回收机制&#xff0c;这样才能更好的分析为什么ThreadLocal会导致内存泄露呢&#xff1f;更重要的是知道该如何避免这样情况发生&#xff0c;增…

Facebook与数字创新:引领社交媒体的数字化革命

在当今数字化时代&#xff0c;社交媒体已经成为了人们日常生活中不可或缺的一部分。而在众多社交媒体平台中&#xff0c;Facebook作为领头羊&#xff0c;一直致力于推动数字创新&#xff0c;引领着社交媒体的数字化革命。本文将探讨Facebook在数字创新方面的表现&#xff0c;以…

第100讲:MHA+Atlas实现MySQL主从复制读写分离分布式集群

文章目录 1.Atlas读写分离简介2.搭建MHA高可用MySQL主从复制集群3.部署配置Atlas读写分离中间件3.1.安装Atlas读写分离中间件3.2.配置读写分离3.3.启动Atlas读写分离 4.读写分离集群测试5.生产环境中创建一个用户通过Atlas使用6.Atlas通过管理接口实现在线管理7.Atlas自动分表 …

ETW Bypass

1.NET 程序集 像 Java 是由 JVM 托管的&#xff0c;.NET 程序集(比如C_Sharp.exe) 都是由 CLR 托管的 硬盘加载 从硬盘中读取加载到内存 通过三个接口可以启动 CLR 来对 .NET 程序集 进行硬盘加载 Program.cs&#xff1a; x64.cpp&#xff1a; Copy #include <iostream…

【已解决】PPT无法复制内容怎么办?

想要复制PPT文件里的内容&#xff0c;却发现复制不了&#xff0c;怎么办&#xff1f; 这种情况&#xff0c;一般是PPT文件被设置了以“只读方式”打开&#xff0c;“只读方式”下的PPT无法进行编辑更改&#xff0c;也无法进行复制粘贴的操作。 想要解决这个问题&#xff0c;我…

c语言---函数指针变量

目录 一、函数指针变量的概念二、函数指针变量的创建三、函数指针变量的使用 一、函数指针变量的概念 函数指针变量应该是用来存放函数地址的&#xff0c;未来通过地址能够调用函数的。 二、函数指针变量的创建 那么函数是否有地址呢&#xff1f; 接下来看看&#xff1a; #…

imazing软件安全吗?2024中文永久免费许可证

以下是iMazing更多的使用场景描述&#xff1a; iMazing3Mac-最新绿色安装包下载如下&#xff1a; https://wm.makeding.com/iclk/?zoneid49816 iMazing3Win-最新绿色安装包下载如下&#xff1a; https://wm.makeding.com/iclk/?zoneid49817 1. 数据迁移 当你换新的iOS设…

Outlook 中关闭特殊字符显示

故障现象&#xff1a; 不知道什么原因&#xff0c;Outlook发mail时候&#xff0c;突然显示如下的特殊字符&#xff0c;看起来非常不方便&#xff0c;需要关闭; 解决办法&#xff1a; 在Outlook正文里面&#xff0c;输入CtrlShift*&#xff08; 其中*是键盘数字8旁边的那个*&a…

KEIL软件新建工程示例

很感谢这位博主的分享&#xff0c;参考链接&#xff1a;KEIL软件安装教程 在实践的过程中&#xff0c;会有些不一样&#xff0c;把我遇到的问题也贴出来&#xff0c;谨做记录与参考。 一 在环境中配置相关文件&#xff08;第一种&#xff09; 1、新建一个文件夹demo&#xff…

IDEA的版本控制Local Changes和settings按钮显示问题

经常用idea的小伙伴应该对标题的这两个功能不陌生&#xff0c;特别是Local Changes 周日刚开工&#xff0c;我的idea就过期了&#xff0c;索性就下载了一个2023.3.3版本的&#xff0c;安装好打开一看&#xff0c;发现Local Changes 和 settings的按钮消失了&#xff0c;虽然说…

我的NPI项目之Android USB 系列(一) - 遥望和USB的相识

和USB应该是老朋友了&#xff0c;从2011年接触Android开发开始&#xff0c;就天天和USB打交道了。那时候还有不 对称扁头的usb/方口的usb&#xff0c;直到如今使用广泛的防反插USB3.0 type-C。 但是&#xff0c;一直有一个不是很清楚的问题萦绕在心头&#xff0c;那就是。先有…

韶音、南卡、Cleer怎么样?2024年主流产品硬核测评对比

​作为一位资深的开放式耳机使用者&#xff0c;我要强调&#xff0c;在选购耳机时应避免仅仅因为某款产品的流行度或是品牌名气而做出选择。适合自己需求的耳机才是最佳之选。我和我的团队经过多次测试&#xff0c;对比了几种市场上热门的开放式耳机&#xff0c;从中总结了我们…

【Algorithms 4】算法(第4版)学习笔记 09 - 3.2 二叉查找树

文章目录 前言参考目录学习笔记1&#xff1a;二叉树与二叉搜索树定义1.1&#xff1a;二叉树定义1.2&#xff1a;二叉搜索树定义1.3&#xff1a;Java定义1.4&#xff1a;BST基本实现1.5&#xff1a;BST demo 演示1.5.1&#xff1a;节点搜索成功命中演示1.5.2&#xff1a;节点搜索…

ElementUI table表格组件实现双击编辑单元格失去焦点还原,支持多单元格

在使用ElementUI table表格组件时有时需要双击单元格显示编辑状态&#xff0c;失去焦点时还原表格显示。 实现思路&#xff1a; 在数据中增加isFocus:false.控制是否显示在table中用cell-dblclick双击方法 先看效果&#xff1a; 上源码&#xff1a;在表格模板中用scope.row…

Nginx缓存相关配置解析

文章目录 前言配置示例proxy_cacheproxy_cache_pathproxy_cache_keyproxy_cache_validproxy_cache_lockproxy_cache_methodsproxy_cache_bypassproxy_no_cacheproxy_cache_min_usesadd_header 可选项 使用示例通过响应头判断是否走缓存 缓存手动删除原博客 前言 客户端需要访问…

10、内网安全-横向移动域控提权NetLogonADCSPACKDC永恒之蓝

用途&#xff1a;个人学习笔记&#xff0c;有所借鉴&#xff0c;欢迎指正&#xff01; 背景&#xff1a; 主要针对内网主机中的 域控提权漏洞&#xff0c;包含漏洞探针和漏洞复现利用。 1、横向移动-系统漏洞-CVE-2017-0146&#xff08;ms17-010&#xff0c;永恒之蓝&#xff0…

如何添加或编辑自定义WordPress侧边栏

WordPress侧边栏是许多WordPress网站上的固定装置。它为您的内容提供了一个垂直空间&#xff0c;您可以在其中帮助读者导航、增加电子邮件列表或社交关注、展示广告等。 因为它是许多WordPress网站不可或缺的一部分&#xff0c;所以我们认为侧边栏值得拥有自己的大型指南。在这…

SpringBootWeb学习笔记——12万字详细总结!

0. 写在前面 注:这套笔记是根据黑马程序员B站2023-3-21的视频学习的成果,其中省略了前端基础部分、Maven部分和数据库基础部分,详情可见目录。 注注:目前文章内结尾处多幅图片加载不出来,因为图片还存在本地没被传上来,过段时间再改~ 所有的Spring项目都基于Spring Fra…

3、windows环境下vscode开发c/c++环境配置(二)

前言&#xff1a;上一篇文章写了windows环境下&#xff0c;配置vscode的c/c开发环境&#xff0c;这一篇讲vscode开发c/c的配置文件&#xff0c;包括c_cpp_propertues.json&#xff0c;task.json及launch.json。 一、总体流程 通过c/c插件我们就可以来编写c/c程序了&#xff0c…

网贷大数据查询多了对征信有影响吗?

网贷大数据在日常的金融借贷中起到很重要的风控作用&#xff0c;不少银行已经将大数据检测作为重要的风控环节。很多人在申贷之前都会提前了解自己的大数据信用情况&#xff0c;那网贷大数据查询多了对征信有影响吗?本文带你一起去看看。 首先要说结论&#xff1a;那就是查询网…
最新文章