云计算百科
云计算领域专业知识百科平台

【C++仿Muduo库#3】Server 服务器模块实现上

在这里插入图片描述

📃个人主页:island1314

⛺️ 欢迎关注:👍点赞 👂🏽留言 😍收藏 💞 💞 💞

  • 生活总是不会一帆风顺,前进的道路也不会永远一马平川,如何面对挫折影响人生走向 – 《人民日报》

🔥 目录

    • 一、Buffer 模块
    • 二、日志模块
    • 三、套接字 Socket 设计
      • 1. 代码实现
      • 2. 代码检测
      • 3. 细节处理
        • 细节1:处理 Recv 函数时, errno 的来源以及 为啥不用 `EWOULDBLOCK`
        • 细节2:MSG_DONWAIT 的概述
        • 细节3:关于 ReuseAddr()
          • 📌 为什么默认不允许端口复用?
          • 🧠 举个例子:服务重启时的 `TIME_WAIT` 问题
          • 🧾小结
        • 细节4:宏污染
    • 四、Channel 类设计
      • 1. 代码实现
      • 2. 细节处理
        • 细节1:在 `HandleEvent` 函数中使用 `if-else if` 结构而非多个独立的 `if`
    • 五、Poller 模块实现
      • 1. 代码实现
      • 2. 细节处理
        • 细节1:epoll 是水平触发(LT)还是边缘触发(ET)?区别是什么?
        • 细节2:Poll 方法返回的活跃事件是如何处理的?
        • 细节3:Poller 是否支持多线程同时调用 Poll 方法?
        • 细节3:epoll_wait 的超时时间为何设置为 -1?是否合理?
      • 3. 与 Channel 的整合测试
    • 六、EventLoop 模块实现
      • 1. 关于 evenfd 函数
        • 1.1 函数概述
        • 1.2 代码示例
        • 1.3 使用场景及注意事项
      • 2. Eventloop 模块概述
      • 3. 与 TimeWheel 模块整合
        • 代码整合示例
      • 4. 代码测试
      • 5. 细节分析
        • 细节1:定时器任务中异步执行回调
        • 细节2:服务器端关闭再启动的文件描述符(fd)不变
        • 细节3:`Channel` 类中的 `Remove` 和 `Update` 方法为何调用 `EventLoop` 的接口?
        • 细节4:如何避免定时器任务的重复添加?

一、Buffer 模块

本质:缓冲区模板

功能:存储数据,取出数据

实现思想:

  • 实现缓冲区得有一块内存空间,采用 vector<char>

    vector 底层其实使用的是一个线性的内存空间

  • 要素:

  • 默认的空间大小
  • 当前的读取数据位置
  • 当前的写入数据位置
  • 操作:

  • 写入数据:当前写入位置指向哪里,就从哪里开始写入,如果后续剩余空闲空间不够了,这个就考虑征途缓冲区空闲空间是否足够(因为 读位置也会向后偏移,也就是说前面也可能有空闲空间)

    • 足够:将数据移动到起始位置即可

    • 不够:扩容,从当前写位置开始扩容足够大小

    • 数据一旦写入成功,当前写位置,就要往后偏移

  • 读取数据:当前读取位置指向哪里,就从哪里开始读取,前提是有数据可读

    • 可读数据大小:当前写入位置 – 当前读取位置
  • 设计如下:

    class Buffer{
    public:
    // 1. 获取当前写位置地址
    // 2. 确保可写空间足够
    // 3. 获取前沿空间大小
    // 4. 获取后沿空间大小
    // 5. 将写位置向后移动指定长度
    // 6. 获取当前读位置地址
    // 7. 获取可读数据大小
    // 8. 将读位置向后移动指定长度
    // 9. 清理功能
    private:
    std::vector<char> _buffer;
    // 位置, 是一个相对偏移量, 而不是绝对地址
    uint64_t _read_idx; // 相对读偏移
    uint64_t _write_idx; // 相对写偏移
    };

  • 代码如下:

    class Buffer{
    private:
    // 注意: 这里的起始地址:_buffer.data() 或者 &*_buffer.begin() 都可以
    char *Begin(){return &*_buffer.begin();}

    // 读写数据
    void ReadData(void *data, uint64_t len) {
    assert(len <= ReadableSize());
    std::copy(GetReadPos(), GetReadPos() + len, (char*)data);
    // MoveReadOffset(len);
    }
    void WriteData(const void *data, uint64_t len) {
    // 1. 确保可写空间足够 2. 拷贝数据
    if(len <= 0) return ;
    EnsureWriteSpace(len);
    const char *d = static_cast<const char *>(data);
    std::copy(d, d + len, GetWritePos()); // 把 data 复制到 缓冲区
    // MoveWriteOffset(len);
    }

    void WriteBuffer(Buffer &buf) {
    return WriteData(buf.GetReadPos(), buf.ReadableSize());
    }

    void WriteString(const std::string &str) {
    return WriteData(str.c_str(), str.size());
    }

    public:
    Buffer(uint64_t size = 1024): _reader_idx(0), _writer_idx(0){
    _buffer.resize(size);
    }

    // 获取当前读写位置地址
    char *GetWritePos() {return Begin() + _writer_idx;}
    char *GetReadPos(){return Begin() + _reader_idx;}

    // 将读写位置向后移动指定长度
    void MoveReadOffset(uint64_t len){
    assert(len <= ReadableSize());
    _reader_idx += len;
    }
    void MoveWriteOffset(uint64_t len){
    assert(len <= BufferHeadSize() + BufferTailSize());
    _writer_idx += len;
    }

    // 获取一行数据
    std::string GetLine(){
    char *pos = FindCRLF();
    if (pos == nullptr) {
    return "";
    }
    return ReadAsString(pos – ReadPos() + 1);
    }

    // 获取缓冲区末尾空闲空间大小 — 写偏移之后的空闲空间
    uint64_t BufferTailSize() {return _buffer.size() – _writer_idx;}
    // 获取缓冲区起始空闲空间大小 — 读偏移之前的空闲空间
    uint64_t BufferHeadSize(){return _reader_idx;}

    // 获取可读数据大小
    uint64_t ReadableSize() {return _writer_idx – _reader_idx;}

    // 确保可写空间足够 (移动 / 扩容)
    void EnsureWriteSpace(uint64_t len) {
    // 1. 末尾空闲空间大小足够, 直接返回
    if(BufferTailSize() >= len) return;
    // 2. 先移动读偏移
    if (len <= BufferHeadSize() + BufferTailSize()) {
    // 3. 空闲空间足够, 数据移动到起始位置
    uint64_t readable_size = ReadableSize(); // 保存当前数据大小
    // std::memmove(_buffer.data(), _buffer.data() + _reader_idx, ReadableSize());
    std::copy(GetReadPos(), GetReadPos() + readable_size, Begin()); // 将可读数据保存到起始位置

    // 更新读写偏移
    _writer_idx = readable_size;
    _reader_idx = 0;
    } else {
    // 4. 扩容
    uint64_t new_size = _buffer.size() * 2; // 避免持续扩容
    while (new_size < len) {
    new_size *= 2;
    }
    _buffer.resize(new_size);
    }
    }
    void WriteAndPush(const void *data, uint64_t len) {
    WriteData(data, len);
    MoveWriteOffset(len);
    }

    void WriteStringAndPush(const std::string &str) {
    WriteString(str);
    MoveWriteOffset(str.size());
    }

    void WriteBufferAndPush(Buffer &buf) {
    WriteBuffer(buf);
    MoveWriteOffset(buf.ReadableSize());
    }

    void ReadAndPop(void *buf, uint64_t len) {
    ReadData(buf, len);
    MoveReadOffset(len);
    }

    std::string ReadAsString(uint64_t len){
    assert(len <= ReadableSize());
    std::string str;
    str.resize(len);
    ReadData(&str[0], len);
    return str;
    }

    std::string ReadAsStringAndPop(uint64_t len){
    std::string str = ReadAsString(len);
    MoveReadOffset(len);
    return str;
    }

    char *FindCRLF(){
    char *res = (char*)std::memchr(GetReadPos(), '\\n', ReadableSize());
    return res;
    }

    std::string GetLineAndPop(){
    std::string str = GetLine();
    MoveReadOffset(str.size());
    return str;
    }

    // 9. 清空缓冲区
    void clear(){
    _buffer.clear();
    _reader_idx = 0;
    _writer_idx = 0;
    }

    private:
    std::vector<char> _buffer; // 使用 vector 进行内存空间管理
    // 位置, 是一个相对偏移量, 而不是绝对地址
    uint64_t _reader_idx; // 相对读偏移
    uint64_t _writer_idx; // 相对写偏移
    };

    测试代码如下

    int main() {
    Buffer buffer;

    for(int i = 0; i < 300; i++){
    std::string str = "hello world" + std::to_string(i) + "\\n";
    buffer.WriteStringAndPush(str);
    }

    while(buffer.GetReadableSize() > 0){
    std::string line = buffer.GetLineAndPop();
    std::cout << "Line: " << line << std::endl;
    }

    // // std::string str = "hello world";
    // // buffer.WriteStringAndPush(str);
    // std::cout << "Buffer size: " << buffer.GetReadableSize() << std::endl;
    // std::cout << "Buffer content: " << buffer.ReadAsStringAndPop(buffer.GetReadableSize()) << std::endl;

    // buffer.WriteStringAndPush("hello world\\n");
    // std::string tmp = buffer.ReadAsStringAndPop(buffer.GetReadableSize());
    // std::cout << "tmp: " << tmp << std::endl;
    // std::cout << "Buffer size: " << buffer.GetReadableSize() << std::endl;

    return 0;
    }

    二、日志模块

    详情可以参考我的这篇文章:

    #define INF 0
    #define DBG 1
    #define ERR 2
    #define LOG_LEVEL 1

    #define LOG(level, format, ...) do{\\
    if(level < LOG_LEVEL) break;\\
    time_t t = time(nullptr);\\
    struct tm *tm = localtime(&t);\\
    char buf[64];\\
    strftime(buf, sizeof(buf) 1, "%Y-%m-%d %H:%M:%S", tm);\\
    printf("%s [%s:%d] " format "\\n", buf, __FILE__, __LINE__, ##__VA_ARGS__);\\
    }while(0)

    #define LOG_INFO(format, ...) LOG(INF, format, ##__VA_ARGS__)
    #define LOG_DEBUG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
    #define LOG_ERROR(format, ...) LOG(ERR, format, ##__VA_ARGS__)

    三、套接字 Socket 设计

    这个可以参考我之前的这篇文章:

    1. 代码实现

    // Socket 类
    #define MAXLISTEN 1024
    class Socket{
    private:
    // 1.创建套接字
    bool Create(){
    _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if(_sockfd < 0){
    LOG_ERROR("CREATE SOCKET ERROR");
    return false;
    }
    return true;
    }
    // 2.绑定地址信息
    bool Bind(const std::string &ip, uint16_t port){
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(ip.c_str());

    socklen_t len = sizeof(struct sockaddr_in);
    int ret = bind(_sockfd, (struct sockaddr*)&addr, len);
    if(ret < 0){
    LOG_ERROR("BIND SOCKET ERROR");
    return false;
    }
    return true;
    }
    // 3.监听
    bool Listen(int backlog = MAXLISTEN){
    int ret = listen(_sockfd, backlog);
    if(ret < 0){
    LOG_ERROR("LISTEN SOCKET ERROR");
    return false;
    }
    return true;
    }
    // 4.向服务器发起连接
    bool Connect(const std::string &ip, uint16_t port){
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(ip.c_str());

    socklen_t len = sizeof(struct sockaddr_in);
    int ret = connect(_sockfd, (struct sockaddr*)&addr, len);
    if(ret < 0){
    LOG_ERROR("CONNECT SOCKET ERROR");
    return false;
    }
    return true;
    }

    public:
    Socket():_sockfd(1){}
    ~Socket(){Close();}
    Socket(int sockfd): _sockfd(sockfd){}
    // 避免拷贝问题
    Socket(const Socket&) = delete;
    Socket& operator=(const Socket&) = delete;

    int Fd() const {return _sockfd;}

    // 5.获取新连接
    int Accept(){
    int newfd = accept(_sockfd, nullptr, nullptr);
    if(newfd < 0){
    LOG_ERROR("ACCEPT SOCKET ERROR");
    return 1;
    }
    return newfd; // 返回新连接的套接字
    }

    // 6.接收数据
    ssize_t Recv(void *buf, size_t len, int flag = 0){
    ssize_t ret = recv(_sockfd, buf, len, flag);
    // < 0 出错
    // = 0 连接断开
    // > 0 接收成功
    if(ret <= 0){
    // EAGAIN | EWOULDBLOCK: 当前 socket 的非阻塞缓冲区没有数据了
    // EINTR: 当前 socket 的阻塞等待被信号中断
    // ECONNRESET: 连接重置
    // ENOTCONN: 套接字未连接
    // ETIMEDOUT: 接收超时
    if(errno == EAGAIN || errno == EINTR){
    return 0; // 表示: 这次发送没有发送成功, 需重试
    }
    LOG_ERROR("Recv SOCKET %s" , strerror(errno));
    return 1; // 其他错误
    }
    return ret; // 实际接收长度
    }
    ssize_t NonBlockRecv(void *buf, size_t len){
    return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。
    }

    // 7.发送数据
    ssize_t Send(const void* buf, size_t len, int flag = 0){
    ssize_t ret = send(_sockfd, buf, len, flag);
    if(ret < 0){
    if(errno == EAGAIN || errno == EINTR){
    return 0; // 表示: 这次发送没有发送成功, 需重试
    }
    LOG_ERROR("SEND SOCKET %s" , strerror(errno));
    return 1; // 其他错误
    }
    return ret;
    }
    ssize_t NonBlockSend(const void* buf, size_t len){
    if(len == 0) return 0;
    return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞
    }

    // 8.关闭套接字
    void Close(){
    if(_sockfd != 1){
    close(_sockfd);
    _sockfd = 1;
    }
    }

    // 9.设置套接字选项 — 开启地址端口重用
    void ReuseAddr() {
    int opt = 1;
    // SO_REUSEADDR: 允许重用本地地址和端口
    // SO_REUSEPORT: 允许重用本地端口
    setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(opt));
    opt = 1;
    setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&opt, sizeof(opt));
    }

    // 10.设置套接字阻塞属性 — 设置为非阻塞
    void NonBlock(){
    int flag = fcntl(_sockfd, F_GETFL, 0);
    if(flag == 1){
    LOG_ERROR("GET SOCKET FLAG ERROR");
    return;
    }
    int ret = fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
    if(ret < 0){
    LOG_ERROR("SET SOCKET NONBLOCK ERROR");
    return;
    }
    }

    // 9. 创建一个服务器连接
    bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool nonblock_flag = false){
    if(!Create()) return false;
    if(nonblock_flag) NonBlock(); // 设置非阻塞
    ReuseAddr(); // 设置地址端口重用

    if(!Bind(ip, port)) return false;
    if(!Listen()) return false;
    return true;
    }
    // 10. 创建一个客户端连接
    bool CreateClient(uint16_t port, const std::string &ip){
    // 1. 创建套接字 2. 连接服务器
    if(!Create()) return false;
    if(!Connect(ip, port)) return false;
    return true;
    }

    private:
    int _sockfd;
    };

    2. 代码检测

    server.cpp

    int main()
    {
    Socket lst_sock;
    lst_sock.CreateServer(8080);

    while(1){
    int newfd = lst_sock.Accept();
    if(newfd < 0){
    continue;
    }
    Socket cli_sock(newfd);
    char buf[1024] = {0};
    int ret = cli_sock.Recv(buf, 1023);
    if(ret < 0){
    cli_sock.Close();
    continue;
    }
    cli_sock.Send(buf, ret);
    cli_sock.Close();
    }
    lst_sock.Close();
    return 0;
    }

    client.cpp

    int main()
    {
    Socket cli_sock;
    cli_sock.CreateClient(8080, "127.0.0.1");

    cli_sock.Send("Hello IsLand", strlen("Hello IsLand"));
    char buf[1024] = {0};
    cli_sock.Recv(buf, 1023);
    LOG_DEBUG("recv data: %s", buf);
    cli_sock.Close();
    return 0;
    }

    结果如下:

    lighthouse@VM-8-10-ubuntu:Test1$ ./client
    2025-05-02 10:48:41 [tcp_cli.cc:11] recv data: Hello IsLand

    3. 细节处理

    细节1:处理 Recv 函数时, errno 的来源以及 为啥不用 EWOULDBLOCK

    ① errno 的来源

    • errno 是 C标准库 中定义的全局变量(线程安全环境下由 __errno_location() 实现),用于存储系统调用或库函数失败时的错误码。

    • 当 recv 返回 -1 时,表示发生错误,具体的错误原因会通过 errno 变量传递(如 EAGAIN, EINTR 等)

      if (ret <= 0) {
      if (errno == EAGAIN || errno == EINTR) {
      return 0; // 表示非致命错误,继续尝试接收
      }
      }

    • 注意:recv 返回 -1 时,错误码需通过 errno 获取,而非直接从 ret 的值推断

    ② EAGAIN 和 EWOULDBLOCK 的关系

    • 在 Linux 系统 中,EAGAIN 和 EWOULDBLOCK 的值是相同的(均为 11),定义在 /usr/include/asm-generic/errno-base.h 中:

      #define EAGAIN 11 /* Try again */
      #define EWOULDBLOCK EAGAIN /* Operation would block */

    细节2:MSG_DONWAIT 的概述

    在 NonBlockRecv 和 NonBlockSend函数中,使用了 MSG_DONTWAIT 标志来实现 非阻塞接收 (Non-blocking Receive),这是网络编程中一种常见的异步通信机制。以下是对其工作原理和用途的详细解析:

    • MSG_DONTWAIT 是 recv 系统调用的一个标志位,用于 临时启用非阻塞模式 。
    • 它的作用是:即使当前套接字(socket)本身是阻塞模式(默认行为),也会让本次 recv 调用立即返回,而不是等待数据到达。
    • 如果此时接收缓冲区中没有数据,recv 会返回 -1,并将 errno 设置为 EAGAIN 或 EWOULDBLOCK(两者等价),表示“暂时无数据,稍后再试”
    细节3:关于 ReuseAddr()
    • SO_REUSEADDR:安全复用,允许同一地址和端口被多个套接字绑定(常用于快速重启服务)
    • SO_REUSEPORT:多进程共享端口,允许多个套接字绑定到完全相同的地址和端口(需所有套接字均设置此选项),用于负载均衡
    • 建议 :若无需多进程/线程共享端口,仅保留 SO_REUSEADDR

    那么之前 我们说过如下:

    • 在 TCP/IP 协议中,一个 IP 地址和端口组合(即 socket 地址)默认情况下只能被一个 socket 绑定 ,这是为了防止多个进程同时监听同一个端口,造成数据混乱。但在某些特殊场景下,我们确实需要“复用”端口,这就引入了 SO_REUSEADDR 和 SO_REUSEPORT 选项。

    📌 为什么默认不允许端口复用?
    • 每个 TCP/UDP 连接由五元组唯一标识:{协议, 源IP, 源端口, 目的IP, 目的端口}
    • 其中,服务器监听的 socket 地址(即 bind() 的地址)决定了它能接收哪些连接。如果多个 socket 绑定到相同的地址和端口,系统将无法判断哪个 socket 应该处理新连接,从而导致冲突。

    因此,默认情况下,系统禁止两个 socket 绑定到相同的地址和端口(之前在这篇【Linux网络#2】: Socket 编程 就提过一个端口号只能被一个进程占用,但是一个进程能够绑定多个端口)

    ① SO_REUSEADDR:允许“安全”的端口复用

    ✅ 用途

    • 快速重启服务 :当服务意外崩溃或正常关闭后,TCP 连接可能仍处于 TIME_WAIT 状态(通常持续 2MSL,约 60 秒),此时端口仍被占用。
    • 避免“Address already in use”错误 :通过设置 SO_REUSEADDR,可以让服务在 TIME_WAIT 状态期间重新绑定到端口。

    🧠 原理

    • SO_REUSEADDR 允许绑定到已被其他 socket 使用的地址,但前提是:
      • 该 socket 已关闭(即不再有活跃连接)。
      • 或者该 socket 也设置了 SO_REUSEADDR。
    • 内核会检查当前是否有活跃连接,如果没有,则允许复用。

    ② SO_REUSEPORT:允许多个 socket 同时绑定到相同地址和端口

    ✅ 用途

    • 负载均衡 :多个进程或线程可以同时监听相同的地址和端口,内核负责将连接均匀分配给它们。
    • 高并发场景 :适用于需要并行处理大量连接的服务(如 Web 服务器)。

    🧠 原理

    • 多个 socket 可以绑定到相同的地址和端口,但所有 socket 必须都设置 SO_REUSEPORT 。
    • 内核会使用一种机制(如哈希算法)将连接请求分发给各个 socket。

    ⚠️ 注意事项

    • SO_REUSEPORT 是 Linux 3.9+ 引入的特性,旧版本系统不支持。
    • 不同 socket 之间的负载均衡策略依赖内核实现,不同系统行为可能不同。

    🧩 为什么 SO_REUSEADDR 能绕过“一个端口只能被一个进程占用”?

    虽然 TCP/IP 协议规定一个 socket 地址只能被一个 socket 绑定,但 SO_REUSEADDR 是一个“例外规则”,它允许在特定条件下复用地址。

    ✅ 条件如下:

  • 原 socket 已关闭 :即没有活跃连接。
  • 新 socket 设置了 SO_REUSEADDR 。
  • 原 socket 也设置了 SO_REUSEADDR (可选)。
  • 在这种情况下,内核认为复用是“安全”的,不会导致连接混乱,因此允许绑定。


    🧠 举个例子:服务重启时的 TIME_WAIT 问题

    这个情况之前在 【Linux网络#11】: 传输层协议 TCP 四次挥手的内容下也提过

    假设你写了一个 TCP 服务器,监听在 0.0.0.0:8080。当你关闭服务器后立即重启,可能会遇到如下错误:

    bind: Address already in use

    这是因为:

    • 关闭连接后,TCP 连接进入 TIME_WAIT 状态(持续 2MSL,约 60 秒),以确保所有残留数据包都被丢弃。
    • 在此期间,端口仍被占用,系统不允许新 socket 绑定。

    解决办法 :在 bind() 之前设置 SO_REUSEADDR,这样即使端口仍处于 TIME_WAIT 状态,也可以绑定成功。


    🧾小结
    特性SO_REUSEADDRSO_REUSEPORT
    是否允许多个 socket 否(默认)
    是否需要所有 socket 设置
    用途 快速重启服务 多进程/线程负载均衡
    是否破坏唯一性 否(仅在安全条件下复用) 是(允许多个 socket 监听相同地址)
    是否跨平台 高度支持 Linux 3.9+
    是否影响连接分发 是(内核分发连接)

    ✅ 推荐使用方式

    • 普通服务重启 :使用 SO_REUSEADDR,避免 TIME_WAIT 导致的绑定失败。
    • 高并发负载均衡 :使用 SO_REUSEPORT,多个进程/线程共享端口,提升性能。
    • 避免滥用 :除非明确需要,否则不要同时设置两者,防止行为不可预测。

    虽然 TCP/IP 协议规定一个端口只能被一个 socket 绑定,但 SO_REUSEADDR 和 SO_REUSEPORT 提供了“例外”机制,分别用于 安全复用 和多进程共享端口


    细节4:宏污染

    由于最开始的时候,我的日志实现代码 和 测试代码都用了相同的 局部变量 char buf

    • 日志定义了一个局部变量 char buf[64],与 server.cpp 和 client.cpp 中的 char buf[1024] 同名但作用域不同
    • 虽然从语法上看,宏中的 buf 是局部变量,不会影响外部的 buf,但在某些编译器或特定优化条件下,栈内存的布局可能会导致 buf 被意外覆盖 。

    // LOG_DEBUG 宏定义
    char buf[64]; // 与 server.cpp 和 client.cpp 中的 char buf[1024] 同名

    当客户端调用 LOG_DEBUG("recv data: %s", buf) 时,宏展开后会生成一个 char buf[64],用于存储时间戳字符串。

    如果编译器在栈上分配内存时,将宏内的 buf 和外部的 buf 紧邻存放,printf 的格式化输出可能会溢出到外部的 buf 中 ,导致接收到的数据被覆盖为时间戳字符串。

    这样就导致我输出了如下的数据结果:

    lighthouse@VM810ubuntu:Test1$ ./client
    20250502 10:45:38 [tcp_cli.cc:11] recv data: 20250502 10:45:38

    四、Channel 类设计

    目的:对描述符的监控事件管理

    功能:

  • 事件管理:描述符是否可读写,对描述符的监控可读可写,解除 事件 监控
  • 事件触发后处理的管理
  • 需要处理的事件:可读,可写,挂断,错误,任意
  • 事件处理的回调函数
  • 成员:

    • epoll 进行事件监控
      • EPoollIN:可读
      • EPoollOUT:可写
      • EPoollRDHUP:连接断开
      • EPoollPRI:优先数据
      • EPoollERR:出错
      • EPoollHUP:挂断

    1. 代码实现

    class Channel{
    public:
    using EventCallback = std::function<void()>; // 注意: 这里不能放在 private 中, 否则会报错

    explicit Channel(int fd):_fd(fd),_events(0),_revents(0){} // 显式调用

    ~Channel(){
    if (_fd != 1) {
    close(_fd);
    _fd = 1; // 避免重复关闭
    }
    }

    int Fd() const {return _fd;}
    uint32_t Events() const {return _events;}

    // 判断当前事件是否可读写
    bool ReadAble() const { return (_events & EPOLLIN); }
    bool WriteAble() const { return (_events & EPOLLOUT);}

    // 设置回调函数
    void SetReadCallback(const EventCallback& cb){ _read_callback = cb;}
    void SetWriteCallback(const EventCallback& cb){_write_callback = cb;}
    void SetCloseCallback(const EventCallback& cb){_close_callback = cb;}
    void SetErrorCallback(const EventCallback& cb){_error_callback = cb;}
    void SetEventCallback(const EventCallback& cb){_event_callback = cb;}
    /* 监控事件开关 — 进行事件监控连接后, 描述符就绪事件, 设置实际就绪事件*/
    void SetREvents(uint32_t events){ _revents = events;}

    /* 开启事件监控 */
    void EnableRead(){ _events |= EPOLLIN;}
    void EnableWrite(){ _events |= EPOLLOUT;}

    /* 关闭事件监控 */
    void DisableRead(){ _events &= ~EPOLLIN;}
    void DisableWrite(){ _events &= ~EPOLLOUT;}
    void DisableAll(){_events = 0;} // 关闭所有事件

    /* 后面调用 Poller 和 EventLoop 接口来移除事件监控 */
    void Remove(){} // 移除事件
    void Update(){} // 更新事件

    void HandleEvent(){ // 处理事件, 判断连接触发了什么事件
    // 实际项目中, 连接断开并不会直接先断开, 还是看到是否有数据可读, 先读完数据或者发送出错再断开
    if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){
    // 不管任何事件, 都会调用的回调函数
    if(_event_callback) _event_callback();
    if(_read_callback) _read_callback();
    }
    else if(_revents & EPOLLOUT){
    // 不管任何事件, 都会调用的回调函数
    if(_event_callback) _event_callback();
    if(_write_callback) _write_callback();
    }
    else if(_revents & EPOLLERR){
    if(_error_callback) _error_callback();
    }
    else if(_revents & EPOLLHUP){
    if(_close_callback) _close_callback();
    }
    }
    private:
    int _fd; // 事件对应的文件描述符
    uint32_t _events; // 当前需要监控的事件
    uint32_t _revents; // 当前连接触发的事件

    EventCallback _read_callback; // 读事件回调
    EventCallback _write_callback; // 写事件回调
    EventCallback _close_callback; // 关闭事件回调
    EventCallback _error_callback; // 错误事件回调
    EventCallback _event_callback; // 任意事件回调
    };

    2. 细节处理

    细节1:在 HandleEvent 函数中使用 if-else if 结构而非多个独立的 if

    使用 if-else if 是为了保障资源安全、明确事件优先级 ,并避免因同时处理多个事件导致的未定义行为

    ① 事件优先级和互斥性

    • 错误和挂起事件的优先级更高 EPOLLERR(错误)和 EPOLLHUP(挂起)通常表示连接出现严重问题(如对端关闭、网络中断),需要立即处理 如果这些事件与读写事件同时发生,应优先处理错误/挂起事件,避免在无效连接上继续执行读写操作
    • 互斥处理:某些事件的处理逻辑可能互斥。例如:
      • 写事件(EPOLLOUT)处理可能触发连接关闭,导致后续的错误/挂起事件处理访问已释放的对象
      • 错误/挂起事件处理通常直接终止连接,无需再处理读写

    ② 资源安全:避免访问已释放对象

    • 写事件处理可能释放连接 在注释中明确指出:“有可能会释放连接的操作事件,一次只处理一个” 若写事件的回调(如 _write_callback)中关闭了连接(如 close(fd) 或删除 Channel 对象),后续的错误/挂起事件处理若继续执行,可能导致访问已释放资源 (如空指针、无效文件描述符)
    • 使用 else if 阻断后续逻辑 通过 else if 结构,确保一旦处理了写事件,错误/挂起事件将不再被处理,从而避免潜在的资源管理问题

    ③ 逻辑清晰和可维护

    • 明确事件处理顺序:if-else if 结构清晰表达了事件处理的优先级顺序 ,使代码更易理解和维护
    • 避免冗余判断:若多个事件同时发生(如 EPOLLOUT | EPOLLERR),使用 else if 可避免重复判断和执行无关逻辑,提升效率。

    那么什么时候可以使用独立的 if 呢 ???

    若多个事件的处理逻辑相互独立且不会导致对象销毁或状态变化 ,可以使用独立的 if 语句,例如

    if (_revents & EPOLLIN) { /* 读事件 */ }
    if (_revents & EPOLLOUT) { /* 写事件 */ }

    此时,即使同时触发读和写事件,两者都会被处理,且不会互相干扰

    五、Poller 模块实现

    意义:通过 epoll 实现对描述符的 IO 事件监控

    功能:

  • 添加 / 修改描述符的事件监控(不存在则添加,存在则修改)
  • 移除描述符的事件监控
  • 封装思想

  • 必须拥有一个 epoll 的操作句柄
  • 拥有一个 struct epoll_event 的结构数组,监控时保存所有的活跃事件
  • 使用 hash 表管理描述符与描述符对应的事件管理 Channel 对象
  • 逻辑流程
    • 对描述符进行监控,通过 Channel 才能知道描述符需要监控的事件
    • 当描述符就绪了,通过描述符在 hash 表中找到对应的 Channel(得到了 Channel 才能什么事件如何处理)
    • 当描述符就绪了,返回就绪描述符对应的 Channel
  • 1. 代码实现

    #define MAX_EPOLLEREVENTS 1024
    class Poller{
    private:
    // 对 epoll 的之间操作
    void Update(Channel* channel, int op){
    // int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    int fd = channel->Fd();
    struct epoll_event event;
    event.data.fd = fd; // 事件对应的文件描述符
    event.events = channel->Events(); // 需要监控的事件
    int ret = epoll_ctl(_epfd, op, fd, &event);
    if(ret < 0){
    LOG_ERROR("EPOLL_CTL ERROR");
    }
    return ;
    }

    // 判断一个 Channel 是否已经添加了 事件监控
    bool HasChannel(Channel* channel){
    return _channels.find(channel->Fd()) != _channels.end();
    }

    public:
    Poller(){
    // _epfd = epoll_create(MAX_EPOLLEREVENTS); // 这个已经过时
    _epfd = epoll_create1(EPOLL_CLOEXEC); // 创建 epoll 文件描述符
    if(_epfd < 0){
    LOG_ERROR("EPOLL_CREATE ERROR");
    abort(); // 退出程序
    }
    }

    Poller(const Poller&) = delete;
    Poller& operator=(const Poller&) = delete;

    // 添加 / 修改 监控事件
    void UpdateEvent(Channel* channel){
    bool ret = HasChannel(channel);
    if(!ret){
    // 不存在, 添加事件
    _channels.insert(std::make_pair(channel->Fd(), channel)); // 添加到映射表
    return Update(channel, EPOLL_CTL_ADD);
    }
    return Update(channel, EPOLL_CTL_MOD); // 已经存在, 修改事件
    }

    void RemoveEvent(Channel* channel){
    auto it = _channels.find(channel->Fd());
    if(it != _channels.end()){
    _channels.erase(it); // 从映射表中删除
    }
    Update(channel, EPOLL_CTL_DEL); // 删除事件
    }

    // 开始监控, 返回活跃连接
    void Poll(std::vector<Channel*> *active){
    // int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
    int nfds = epoll_wait(_epfd, _events, MAX_EPOLLEREVENTS, 1);
    if(nfds < 0){
    if(errno == EINTR){
    return; // 被信号中断
    }
    LOG_ERROR("EPOLL_WAIT ERROR: %s\\n", strerror(errno));
    abort(); // 退出程序
    }
    // 遍历活跃连接
    for(int i = 0; i < nfds; ++i){
    int fd = _events[i].data.fd;
    auto it = _channels.find(fd);
    assert(it != _channels.end()); // 断言: 事件一定存在

    Channel* channel = it->second;
    channel->SetREvents(_events[i].events); // 设置当前事件
    active->push_back(channel); // 添加到活跃连接列表
    }
    }

    private:
    int _epfd; // epoll 文件描述符
    struct epoll_event _events[MAX_EPOLLEREVENTS]; // epoll 事件数组
    std::unordered_map<int, Channel*> _channels; // fd -> Channel 映射表
    };

    2. 细节处理

    细节1:epoll 是水平触发(LT)还是边缘触发(ET)?区别是什么?
    • 答案 :默认是 LT,LT 在数据未处理完时会持续通知;ET 仅在状态变化时通知一次,需配合非阻塞 I/O 使用
    细节2:Poll 方法返回的活跃事件是如何处理的?
    • 答案 :遍历 epoll_wait 返回的事件,填充到 active 列表中,并设置 Channel 的 _revents
    细节3:Poller 是否支持多线程同时调用 Poll 方法?
    • 答案 :不支持,需通过锁或每个线程使用独立的 epoll 实例(如 Reactor 模式)
    细节3:epoll_wait 的超时时间为何设置为 -1?是否合理?
    • 答案 :-1 表示无限等待,适合服务器模型;但需根据业务需求调整,如设置超时处理定时任务

    3. 与 Channel 的整合测试

    由于我们有些东西,需要 Poller 来结合测试,所以对 Channel 模块也需要做一些修改,如下:

    class Poller;

    class Channel{
    public:
    using EventCallback = std::function<void()>; // 注意: 这里不能放在 private 中, 否则会报错
    explicit Channel(Poller*poller, int fd):_fd(fd),_events(0),_revents(0),_poller(poller){} // 显式调用

    ~Channel(){
    if (_fd != 1) {
    close(_fd);
    _fd = 1; // 避免重复关闭
    }
    }

    int Fd() const {return _fd;}
    uint32_t Events() const {return _events;}

    // 判断当前事件是否可读写
    bool ReadAble() const { return (_events & EPOLLIN); }
    bool WriteAble() const { return (_events & EPOLLOUT);}

    // 设置回调函数
    void SetReadCallback(const EventCallback& cb) {_read_callback = cb;}
    void SetWriteCallback(const EventCallback& cb){_write_callback = cb;}
    void SetCloseCallback(const EventCallback& cb){_close_callback = cb;}
    void SetErrorCallback(const EventCallback& cb){_error_callback = cb;}
    void SetEventCallback(const EventCallback& cb){_event_callback = cb;}
    /* 监控事件开关 — 进行事件监控连接后, 描述符就绪事件, 设置实际就绪事件*/
    void SetREvents(uint32_t events){ _revents = events;}

    /* 开启事件监控 */
    void EnableRead(){ _events |= EPOLLIN; Update();}
    void EnableWrite(){ _events |= EPOLLOUT; Update();}

    /* 关闭事件监控 */
    void DisableRead(){ _events &= ~EPOLLIN; Update();}
    void DisableWrite(){ _events &= ~EPOLLOUT; Update();}
    void DisableAll(){_events = 0; Update();} // 关闭所有事件

    /* 后面调用 EventLoop 接口来移除事件监控 */
    void Remove(); // 移除事件
    void Update(); // 更新事件

    void HandleEvent(){ // 处理事件, 判断连接触发了什么事件
    // 实际项目中, 连接断开并不会直接先断开, 还是看到是否有数据可读, 先读完数据或者发送出错再断开
    if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){
    // 不管任何事件, 都会调用的回调函数
    if(_event_callback) _event_callback();
    if(_read_callback) _read_callback();
    }
    else if(_revents & EPOLLOUT){
    // 不管任何事件, 都会调用的回调函数
    if(_event_callback) _event_callback();
    if(_write_callback) _write_callback();
    }
    else if(_revents & EPOLLERR){
    if(_error_callback) _error_callback();
    }
    else if(_revents & EPOLLHUP){
    if(_close_callback) _close_callback();
    }
    }
    private:
    int _fd; // 事件对应的文件描述符
    uint32_t _events; // 当前需要监控的事件
    uint32_t _revents; // 当前连接触发的事件
    Poller* _poller; // 事件循环

    EventCallback _read_callback; // 读事件回调
    EventCallback _write_callback; // 写事件回调
    EventCallback _close_callback; // 关闭事件回调
    EventCallback _error_callback; // 错误事件回调
    EventCallback _event_callback; // 任意事件回调
    };

    void Channel::Remove(){return _poller->RemoveEvent(this); } // 移除事件
    void Channel::Update(){return _poller->UpdateEvent(this);} // 更新事件

    测试代码如下:

    server.cpp

    #include "../../source/server.hpp"

    void HandleClose(Channel *channel){
    std::cout << "HandleClose: " << channel->Fd() << std::endl;
    channel->Remove(); // 移除事件
    delete channel; // 释放内存
    }

    void HandleRead(Channel *channel){
    int fd = channel->Fd();
    char buf[1024] = {0};
    ssize_t ret = recv(fd, buf, 1023, 0);
    if(ret <= 0){
    return HandleClose(channel); // 关闭事件
    }
    std::cout << buf << std::endl;
    channel->EnableWrite(); // 启动可写事件
    }

    void HandleWrite(Channel *channel){
    int fd = channel->Fd();
    const char *data = "I miss You";
    ssize_t ret = send(fd, data, strlen(data), 0);
    if(ret < 0){
    return HandleClose(channel); // 关闭事件
    }
    channel->DisableWrite(); // 关闭可写事件
    }

    void HandleError(Channel *channel){
    return HandleClose(channel);
    }
    void HandlEvent(Channel *channel){
    std::cout << "有了一个事件" << std::endl;
    }

    void Acceptor(Poller* poller, Channel *lst_channel)
    {
    int fd = lst_channel->Fd();
    int newfd = accept(fd, nullptr, nullptr);
    if(newfd < 0) return;

    Channel *channel = new Channel(poller, newfd);
    channel->SetReadCallback(std::bind(HandleRead, channel)); // 为通信套接字设置可读事件回调函数
    channel->SetWriteCallback(std::bind(HandleWrite, channel)); // 可写事件的回调函数
    channel->SetCloseCallback(std::bind(HandleClose, channel)); // 关闭事件的回调函数
    channel->SetErrorCallback(std::bind(HandleError, channel)); // 错误事件的回调函数
    channel->SetEventCallback(std::bind(HandlEvent, channel)); // 任意事件的回调函数

    channel->EnableRead(); // 监听读事件
    }

    int main()
    {
    Poller poller;
    Socket lst_sock;
    lst_sock.CreateServer(8080);

    // 为监听套接字, 创建一个 Channel 进行事件的管理及处理
    Channel channel(&poller, lst_sock.Fd());
    channel.SetReadCallback(std::bind(Acceptor, &poller, &channel)); // 设置监听套接字的可读事件回调函数
    channel.EnableRead();

    while(1){
    std::vector<Channel*> actives;
    poller.Poll(&actives); // 开始监控, 返回活跃连接
    for(auto& a: actives){
    a->HandleEvent(); // 处理事件
    }
    }
    lst_sock.Close();
    return 0;
    }

    client.cpp

    #include "../../source/server.hpp"

    int main()
    {
    Socket cli_sock;
    cli_sock.CreateClient(8080, "127.0.0.1");

    while(1){
    std::string str = "Hello IsLand";
    cli_sock.Send(str.c_str(), str.size());
    char buf[1024] = {0};
    cli_sock.Recv(buf, 1023);
    LOG_DEBUG("%s", buf);
    sleep(1);
    }
    return 0;
    }

    结果如下:

    lighthouse@VM-8-10-ubuntu:Test2$ ./client
    2025-05-04 21:54:10 [tcp_cli.cc:13] I miss You
    2025-05-04 21:54:11 [tcp_cli.cc:13] I miss You
    ^C

    lighthouse@VM-8-10-ubuntu:Test2$ ./server
    有了一个事件
    Hello IsLand
    有了一个事件
    有了一个事件
    Hello IsLand
    有了一个事件
    有了一个事件
    HandleClose: 5

    六、EventLoop 模块实现

    1. 关于 evenfd 函数

    eventfd 是 Linux 提供的一种轻量级的进程间通信(IPC)机制,用于在进程或线程之间传递事件通知。它通过一个文件描述符来实现计数器的功能,支持读写操作,适合用于事件通知或信号量的实现

    1.1 函数概述

    #include <sys/eventfd.h>

    int eventfd(unsigned int initval, int flags);

    • initval: 初始化计数器的值(uint64_t 类型)。这是 eventfd 的初始计数值。

    • flags: 用于设置文件描述符的行为,常见的标志包括:

      • EFD_CLOEXEC: 设置 close-on-exec 标志,表示在调用 exec 系列函数时自动关闭该文件描述符,禁止进程复制
      • EFD_NONBLOCK: 设置非阻塞模式,读写操作不会阻塞。
      • EFD_SEMAPHORE: 启用信号量语义(每次读取时计数器减 1,而不是清零)

    返回值

    • 成功时返回一个文件描述符(efd),可以通过 read 和 write 操作与 eventfd 交互(注意:read&write 进行 IO 的时候数据只能是 一个 8 字节数据 )

    • 失败时返回 -1,并设置 errno 以指示错误原因。

    功能

  • 写入计数器:

    • 使用 write 向 eventfd 写入一个 uint64_t 值,计数器会累加该值。
    • 如果计数器的值超过 UINT64_MAX,会返回错误 EOVERFLOW。
  • 读取计数器:

    • 使用 read 从 eventfd 读取一个 uint64_t 值:
      • 如果未设置 EFD_SEMAPHORE,读取操作会返回计数器的当前值,并将计数器清零。
      • 如果设置了 EFD_SEMAPHORE,每次读取会返回 1,并将计数器减 1。
    • 如果计数器为 0 且未设置 EFD_NONBLOCK,read 会阻塞;如果设置了 EFD_NONBLOCK,则返回 -1 并设置 errno 为 EAGAIN。

  • 1.2 代码示例

    #include <stdio.h>
    #include <stdint.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <sys/eventfd.h>

    int main()
    {
    int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
    if(efd < 0){
    perror("eventfd");
    return 1;
    }

    uint64_t val = 1;
    write(efd, &val, sizeof(val));
    write(efd, &val, sizeof(val));
    write(efd, &val, sizeof(val));

    uint64_t res = 0;
    read(efd, &res, sizeof(res));
    printf("res = %lu\\n", res);

    close(efd);
    return 0;
    }

    // 结果如下:
    lighthouse@VM810ubuntu:eventfd$ ./ev
    res = 3

    分析如下:

    int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

    • 创建一个 eventfd,初始计数器值为 0。
    • 设置了 EFD_CLOEXEC 和 EFD_NONBLOCK:
      • EFD_CLOEXEC: 在调用 exec 系列函数时自动关闭文件描述符。
      • EFD_NONBLOCK: 使读写操作非阻塞。

    uint64_t val = 1;
    write(efd, &val, sizeof(val));
    write(efd, &val, sizeof(val));
    write(efd, &val, sizeof(val));

    • 向 eventfd 写入 1 三次,计数器的值累加为 3。

    uint64_t res = 0;
    read(efd, &res, sizeof(res));
    printf("res = %lu\\n", res);

    • 从 eventfd 读取计数器的值,res 被设置为 3,同时计数器清零。

    • 输出结果为:

      res = 3


    close(efd);

    • 关闭 eventfd 文件描述符,释放资源。
    1.3 使用场景及注意事项

    常见用途

  • 线程间同步:

    • 使用 eventfd 实现生产者-消费者模型或信号量机制。
  • 事件通知:

    • 在多线程或多进程环境中,用于通知某些事件的发生。
  • 与 epoll 配合:

    • 将 eventfd 文件描述符加入 epoll,用于事件驱动的程序中。
  • 注意事项

  • 计数器溢出:

    • 如果计数器的值超过 UINT64_MAX,write 会返回错误 EOVERFLOW
  • 非阻塞模式:

    • 如果设置了 EFD_NONBLOCK,在计数器为 0 时调用 read 会返回 -1 并设置 errno 为 EAGAIN
  • 信号量模式:

    • 如果设置了 EFD_SEMAPHORE,每次读取会返回 1,并将计数器减 1,而不是清零
  • 2. Eventloop 模块概述

    Eventloop:进行事件监控,以及事件处理的模块(关键点:这个模块和线程是一一对应的)

    • 监控了一个连接,而这个连接一旦就绪,就要进行事件处理。但是如果这个描述符,在多个线程中都触发了事件,进行处理,就会存在线程安全问题
    • 因此我们需要将一个连接的事件监控,以及连接事件处理,以及其他操作都放在同一个线程中进行

    如何保证一个连接的所有操作都在 eventloop 对应的线程中

    • 解决方案:给 eventloop 模块中,添加一个任务队列,对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当作任务添加到任务队列中

    eventloop 处理流程:

  • 在线程中对描述符进行事件监控
  • 有描述符就绪则对描述符进行事件处理(如何保证处理回调函数中的操作都在线程中)
  • 所有的就绪事件处理完了,这时候再去将任务队列中的所有任务–执行
  • 事件监控

  • 事件监控:使用 Poller 模块,有事件就绪则进行事件处理
  • 执行任务队列中的任务:一个线程安全的任务队列
  • 注意:由于有可能因为等待描述符 IO 事件就绪,导致执行流流程阻塞,这时候任务队列中的任务得不到指向

    • 因此需要需要有一个事件通知的东西,能够唤醒事件监控的阻塞

    代码实现

    class EventLoop{
    public:
    void RunAllTask(){
    // 在加锁期间取出所有任务, 给锁限定作用域
    std::vector<Functor> tasks;
    {
    std::lock_guard<std::mutex> lock(_mutex); // 加锁
    tasks.swap(_tasks); // 交换任务池, 取出所有任务
    }
    for(auto &t: tasks){
    t(); // 执行任务
    }
    return ;
    }

    static int CreateEventfd(){
    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if(efd < 0){
    LOG_ERROR("CREATE EVENTFD ERROR");
    abort(); // 退出程序
    }
    return efd;
    }

    void ReadEventFd(){
    uint64_t data = 0;
    ssize_t ret = read(_event_fd, &data, sizeof(data));
    if(ret < 0){
    if(errno == EAGAIN || errno == EINTR){
    return; // 没有数据可读
    }
    LOG_ERROR("READ EVENTFD ERROR");
    abort(); // 退出程序
    }
    return ;
    }
    // 唤醒事件循环
    void WakeupEventFd(){
    uint64_t data = 1;
    ssize_t ret = write(_event_fd, &data, sizeof(data));
    if(ret < 0){
    if(errno == EAGAIN || errno == EINTR){
    return; // 没有数据可读
    }
    LOG_ERROR("WRITE EVENTFD ERROR");
    abort(); // 退出程序
    }
    return ;
    }
    public:
    using Functor = std::function<void()>;

    EventLoop()
    : _thread_id(std::this_thread::get_id()), // 获取当前线程 ID
    _event_fd(CreateEventfd()), // 创建 eventfd 唤醒 IO 事件监控
    _event_channel(new Channel(this, _event_fd)) // 创建事件循环的 Channel
    {
    //给eventfd添加可读事件回调函数,读取eventfd事件通知次数
    _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));
    _event_channel->EnableRead(); // 设置可读事件
    }

    // 判断当前线程是否是 EventLoop 中对应线程
    bool IsInLoop() {return _thread_id == std::this_thread::get_id();}

    // 修改/添加 描述符的事件监控
    void UpdateEvent(Channel* channel){
    assert(IsInLoop()); // 断言: 当前线程是事件循环线程
    _poller.UpdateEvent(channel); // 修改/添加事件监控
    }

    void RemoveEvent(Channel* channel){
    assert(IsInLoop());
    _poller.RemoveEvent(channel); // 移除事件监控
    }

    // 事件监控->就绪事件处理->执行任务
    void Start(){
    // 1, 事件监控
    std::vector<Channel*> actives; // 活跃连接
    _poller.Poll(&actives); // 进行事件监控
    // 2, 事件处理
    for(auto &channel: actives){
    channel->HandleEvent(); // 处理事件
    }
    // 3, 执行任务
    RunAllTask(); // 执行任务
    }

    // 压入任务队列
    void QueueInLoop(const Functor& cb){
    {
    std::lock_guard<std::mutex> lock(_mutex); // 加锁
    _tasks.emplace_back(cb); // 压入任务
    }
    // 唤醒事件循环 — 由于没有事件就绪 导致的 epoll 阻塞
    // 其实就是给 eventfd 写入一个数据, 使得 epoll 事件就绪
    WakeupEventFd();
    }

    // 判断要执行任务是否处于当前线程, 如果是则执行, 不是则压入队列
    void RunInLoop(const Functor& cb){
    if(IsInLoop()){
    cb();
    }else{
    QueueInLoop(cb);
    }
    }

    private:
    std::thread::id _thread_id; // 事件循环线程 ID
    int _event_fd; // eventfd 唤醒 IO 事件监控可能导致的阻塞
    // 注意: 这里的 Channel用智能指针进行管理, Poller 使用的对象
    std::unique_ptr<Channel> _event_channel; // 事件循环的 Channel
    Poller _poller; // 进行所有描述符的事件监控

    std::vector<Functor> _tasks; // 任务池
    std::mutex _mutex; // 互斥锁
    };

    // 注意: 这里的 Channel 类也要做一些改变, 类似于 Poller 模块的处理改变
    void Channel::Remove(){return _loop->RemoveEvent(this); } // 移除事件
    void Channel::Update(){return _loop->UpdateEvent(this);} // 更新事件

    3. 与 TimeWheel 模块整合

    由于我们需要用到我们之前所说的 TimeWheel 模块,并且对其做一些改变

    • 将定时器任务与事件循环绑定 :确保定时器回调在 EventLoop 线程中执行,避免线程安全问题
    • 利用事件驱动机制:通过 timerfd 触发定时任务,与 epoll 事件监控无缝结合
    • 支持任务的添加、刷新、取消和周期性执行

    ① TimerWheel 与 EventLoop 的绑定

    • TimerWheel 构造函数 :

      TimerWheel(EventLoop *loop)
      : _capacity(60), _tick(0), _loop(loop),
      _timerfd(CreateTimerfd()),
      _timer_channel(new Channel(_loop, _timerfd)) {
      _wheel.resize(_capacity);
      _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
      _timer_channel->EnableRead(); // 启动读事件监控
      }

      • 绑定关系 :TimerWheel 依赖于一个 EventLoop 实例,所有定时任务的执行都通过该 EventLoop 的线程完成
      • 事件驱动 :通过 timerfd 的可读事件触发定时任务处理(OnTime)

    ② 定时器任务的执行流程

    • OnTime 函数

      void RunTimerTask() {
      _tick = (_tick + 1) % _capacity;
      auto& tasks = _wheel[_tick];
      for (auto& task : tasks) {
      if (!task->_canceled) {
      task->_cb(); // 执行回调
      }
      task->_release(); // 释放资源
      }
      tasks.clear(); // 清空当前 tick 的任务
      }

      • 触发机制 :当 timerfd 被触发时,OnTime 会被调用,读取超时次数并依次处理每个 tick 的任务。

      • 线程安全 :OnTime 是 Channel 的读事件回调,由 EventLoop 的线程调用,确保所有定时任务在事件循环线程中执行。

    • RunTimerTask 函数 :

      void RunTimerTask() {
      _tick = (_tick + 1) % _capacity;
      auto& tasks = _wheel[_tick];
      for (auto& task : tasks) {
      if (!task->_canceled) {
      task->_cb(); // 执行回调
      }
      task->_release(); // 释放资源
      }
      tasks.clear(); // 清空当前 tick 的任务
      }

      • 任务执行 :遍历当前 tick 的所有任务,执行回调函数。
      • 资源管理 :通过 _release 删除 TimerWheel 中保存的任务映射,避免内存泄漏。

    ③ 定时器任务的添加与刷新

    • 添加任务 :

      void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc& cb) {
      PtrTask pt(new TimerTask(id, delay, cb));
      pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
      int pos = (_tick + delay) % _capacity;
      _wheel[pos].push_back(pt); // 将任务添加到轮子中
      _timers[id] = WeakTask(pt); // 保存任务映射
      }

      • 任务封装 :使用 shared_ptr 管理任务生命周期,weak_ptr 避免循环引用。
      • 位置计算 :根据当前 tick 和延迟时间 delay 计算任务在轮子中的位置。
    • 刷新任务 :

      void TimerRefreshInLoop(uint64_t id) {
      auto it = _timers.find(id);
      if (it == _timers.end()) return;
      PtrTask pt = it->second.lock();
      if (!pt) return;
      int remaining = pt->DelayTime();
      int pos = (_tick + remaining) % _capacity;
      _wheel[pos].push_back(pt); // 重新插入任务
      }

    • 重新插入 :将任务移动到新的 tick 位置,实现延迟效果。

    ④ 线程安全保证

    • 任务队列机制 :

      • QueueInLoop 方法 :所有对 Channel 或定时器的操作都通过 EventLoop::QueueInLoop 提交到任务队列。
      • RunInLoop 方法 :确保操作在事件循环线程中执行。
      • WakeupEventFd :通过写入 eventfd 唤醒阻塞的 epoll_wait,及时处理任务队列。
    • 定时器回调的线程一致性 :

      • TimerTask 析构函数 :

        ~TimerTask() {
        if (!_canceled) _cb(); // 直接执行回调
        _release();
        }

        关键点 :_cb() 的执行必须在 EventLoop 线程中,通过 TimerWheel::OnTime 触发,无需额外线程同步

    代码整合示例

    class EventLoop {
    public:
    // 添加定时器任务
    void AddTimer(uint64_t id, uint32_t delay, const TaskFunc &cb) {
    return _timer_wheel.AddTimer(id, delay, cb);
    }

    // 刷新定时器
    void RefreshTimer(uint64_t id) {
    return _timer_wheel.RefreshTimer(id);
    }

    // 取消定时器
    void CancelTimer(uint64_t id) {
    return _timer_wheel.CancelTimer(id);
    }

    private:
    TimerWheel _timer_wheel; // 定时器轮
    };

    TimerWheel 类关键函数

    class TimerWheel {
    public:
    // 添加定时任务(供 EventLoop 调用)
    void AddTimer(uint64_t id, uint32_t delay, const TaskFunc& cb) {
    _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
    }

    // 刷新定时任务
    void RefreshTimer(uint64_t id) {
    _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
    }

    // 取消定时任务
    void CancelTimer(uint64_t id) {
    _loop->RunInLoop(std::bind(&TimerWheel::TimerCanceInLoop, this, id));
    }

    private:
    // 实际添加任务的逻辑
    void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc& cb) {
    PtrTask pt(new TimerTask(id, delay, cb));
    pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
    int pos = (_tick + delay) % _capacity;
    _wheel[pos].push_back(pt);
    _timers[id] = WeakTask(pt);
    }

    // 实际刷新任务的逻辑
    void TimerRefreshInLoop(uint64_t id) {
    auto it = _timers.find(id);
    if (it == _timers.end()) return;
    PtrTask pt = it->second.lock();
    if (!pt) return;
    int remaining = pt->DelayTime();
    int pos = (_tick + remaining) % _capacity;
    _wheel[pos].push_back(pt);
    }

    // 实际取消任务的逻辑
    void TimerCanceInLoop(uint64_t id) {
    auto it = _timers.find(id);
    if (it == _timers.end()) return;
    PtrTask pt = it->second.lock();
    if (pt) pt->Cancel();
    }

    // 移除任务
    void RemoveTimer(uint64_t id) {
    auto it = _timers.find(id);
    if (it != _timers.end()) _timers.erase(it);
    }

    // 执行定时任务
    void RunTimerTask() {
    _tick = (_tick + 1) % _capacity;
    auto& tasks = _wheel[_tick];
    for (auto& task : tasks) {
    if (!task->_canceled) task->_cb(); // 执行回调
    task->_release(); // 释放资源
    }
    tasks.clear(); // 清空当前 tick 的任务
    }

    // 定时器事件回调
    void OnTime() {
    int times = ReadTimerfd();
    for (int i = 0; i < times; ++i) {
    RunTimerTask(); // 执行定时任务
    }
    }

    // 创建 timerfd
    static int CreateTimerfd() {
    int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (timerfd < 0) {
    LOG_ERROR("Create timerfd error");
    abort();
    }
    struct itimerspec itime;
    itime.it_value.tv_sec = 1;
    itime.it_value.tv_nsec = 0;
    itime.it_interval.tv_sec = 1;
    itime.it_interval.tv_nsec = 0;
    timerfd_settime(timerfd, 0, &itime, nullptr);
    return timerfd;
    }

    // 读取 timerfd
    int ReadTimerfd() {
    uint64_t times = 0;
    ssize_t ret = read(_timerfd, &times, sizeof(times));
    if (ret < 0) {
    LOG_ERROR("READ TIMERFD ERROR");
    abort();
    }
    return times;
    }

    private:
    using WeakTask = std::weak_ptr<TimerTask>;
    using PtrTask = std::shared_ptr<TimerTask>;
    const int _capacity;
    int _tick;
    int _timerfd;
    std::unique_ptr<Channel> _timer_channel;
    EventLoop* _loop;
    std::vector<std::vector<PtrTask>> _wheel; // 定时器轮
    std::unordered_map<uint64_t, WeakTask> _timers; // ID 映射
    };

    4. 代码测试

    server.cpp

    #include "../../source/server.hpp"

    void HandleClose(Channel *channel){
    LOG_DEBUG("close fd: %d", channel->Fd());
    channel->Remove(); // 移除事件
    delete channel; // 释放内存
    }

    void HandleRead(Channel *channel){
    int fd = channel->Fd();
    char buf[1024] = {0};
    ssize_t ret = recv(fd, buf, 1023, 0);
    if(ret <= 0){
    return HandleClose(channel); // 关闭事件
    }
    LOG_DEBUG("Read: %s", buf);
    channel->EnableWrite(); // 启动可写事件
    }

    void HandleWrite(Channel *channel){
    int fd = channel->Fd();
    const char *data = "I miss You";
    ssize_t ret = send(fd, data, strlen(data), 0);
    if(ret < 0){
    return HandleClose(channel); // 关闭事件
    }
    channel->DisableWrite(); // 关闭可写事件
    }

    void HandleError(Channel *channel){
    return HandleClose(channel);
    }
    void HandlEvent(EventLoop* loop, Channel *channel, uint64_t timerid){
    loop->RefreshTimer(timerid); // 刷新定时器
    }

    void Acceptor(EventLoop* loop, Channel *lst_channel)
    {
    int fd = lst_channel->Fd();
    int newfd = accept(fd, nullptr, nullptr);
    if(newfd < 0) return;

    uint64_t timerid = rand() % 1000;
    Channel *channel = new Channel(loop, newfd);
    channel->SetReadCallback(std::bind(HandleRead, channel)); // 为通信套接字设置可读事件回调函数
    channel->SetWriteCallback(std::bind(HandleWrite, channel)); // 可写事件的回调函数
    channel->SetCloseCallback(std::bind(HandleClose, channel)); // 关闭事件的回调函数
    channel->SetErrorCallback(std::bind(HandleError, channel)); // 错误事件的回调函数
    channel->SetEventCallback(std::bind(HandlEvent, loop, channel, timerid)); // 任意事件的回调函数

    // 非活跃连接的超时释放操作 — 5s 后关闭
    // 注意: 定时销毁任务必须在启动读事件之前, 因为读事件会启动可写事件, 但这个时候还没有任务
    loop->AddTimer(timerid, 5, std::bind(HandleClose, channel));
    channel->EnableRead(); // 监听读事件

    }

    int main()
    {
    srand(time(nullptr)); // 随机数种子
    EventLoop loop;
    Socket lst_sock;
    lst_sock.CreateServer(8080);

    // 为监听套接字, 创建一个 Channel 进行事件的管理及处理
    Channel channel(&loop, lst_sock.Fd());
    channel.SetReadCallback(std::bind(Acceptor, &loop, &channel)); // 设置监听套接字的可读事件回调函数
    channel.EnableRead();

    while(1){
    loop.Start(); // 事件循环
    }
    lst_sock.Close();
    return 0;
    }

    client.cpp

    #include "../../source/server.hpp"

    int main()
    {
    Socket cli_sock;
    cli_sock.CreateClient(8080, "127.0.0.1");

    for(int i = 0; i < 3; ++i){
    std::string str = "Hello IsLand";
    cli_sock.Send(str.c_str(), str.size());
    char buf[1024] = {0};
    cli_sock.Recv(buf, 1023);
    LOG_DEBUG("%s", buf);
    sleep(1);
    }
    while(1) sleep(1);
    return 0;
    }

    结果如下:

    lighthouse@VM810ubuntu:Test4$ ./client
    20250505 22:53:49 [tcp_cli.cc:13] I miss You
    20250505 22:53:50 [tcp_cli.cc:13] I miss You
    20250505 22:53:51 [tcp_cli.cc:13] I miss You
    ^C
    lighthouse@VM810ubuntu:Test4$ ./client
    20250505 22:54:00 [tcp_cli.cc:13] I miss You
    20250505 22:54:01 [tcp_cli.cc:13] I miss You
    20250505 22:54:02 [tcp_cli.cc:13] I miss You
    ^C

    lighthouse@VM810ubuntu:Test4$ ./server
    20250505 22:53:49 [tcp_srv.cc:16] Read: Hello IsLand
    20250505 22:53:50 [tcp_srv.cc:16] Read: Hello IsLand
    20250505 22:53:51 [tcp_srv.cc:16] Read: Hello IsLand
    20250505 22:53:55 [tcp_srv.cc:4] close fd: 7
    20250505 22:54:00 [tcp_srv.cc:16] Read: Hello IsLand
    20250505 22:54:01 [tcp_srv.cc:16] Read: Hello IsLand
    20250505 22:54:02 [tcp_srv.cc:16] Read: Hello IsLand
    20250505 22:54:04 [tcp_srv.cc:4] close fd: 7
    ^C

    5. 细节分析

    细节1:定时器任务中异步执行回调

    由于我之前在实现 TimeWheel 代码是这样写的,如下:

    ~TimerTask() {
    if (!_canceled) {
    std::thread(_cb).detach(); // ❌ 异步执行回调
    }
    _release();
    }

    这个写法会导致定时器回调(如 HandleClose)在子线程中执行 ,而不是在 EventLoop 所属的线程中执行,然后就出现了如下的问题:

    Assertion `IsInLoop()' failed.
    Aborted (core dumped)

    根本原因 是:在非事件循环线程中调用了 RemoveEvent 或 Channel::Remove() ,而 EventLoop 的所有操作都要求必须在事件循环线程中执行(通过 assert(IsInLoop()) 检查)

    分析

  • HandleClose 函数内部调用了:channel->Remove(); // 会调用 EventLoop::RemoveEvent
  • 此时 RemoveEvent 中有断言:assert(IsInLoop()); // 检查当前线程是否是事件循环线程
  • 由于回调在子线程中执行,断言失败,程序崩溃
  • 细节2:服务器端关闭再启动的文件描述符(fd)不变

    上面我们演示的时候,可以发现,当我们服务器端关闭再启动之后 fd 并没有发生改变,如下:

    2025-05-05 22:53:55 [tcp_srv.cc:4] close fd: 7
    2025-05-05 22:54:04 [tcp_srv.cc:4] close fd: 7

    Linux 系统中,文件描述符的分配遵循 “最小可用原则” ,即:

    • 总是分配当前进程中最小的未被占用的整数 fd
    • 当一个 fd 被关闭后,它会被标记为“可重用”,下次分配新文件或 socket 时会优先使用这些被释放的 fd

    分析

  • 客户端连接流程
    • 每次运行 client,都会创建一个新的 socket,系统返回一个可用的 fd。
    • 由于服务器端在连接关闭时 主动关闭了 socket 并释放了 fd ,因此下一次客户端连接时,系统会优先使用刚刚释放的 fd(例如 7)。
  • 服务器端处理连接的方式
    • 在 Acceptor 函数中,每当有新连接到来时:

      int newfd = accept(fd, nullptr, nullptr);
      Channel *channel = new Channel(loop, newfd);

    • newfd 是系统分配的文件描述符

    • 如果前一个连接的 newfd 刚好是 7,并且已经被关闭(close(7)),那么下一个新连接就会再次分配 7

  • Channel 的析构行为
    • 你定义的 Channel 类析构函数会 主动关闭 fd :

      ~Channel(){
      if (_fd != 1) {
      close(_fd);
      _fd = 1;
      }
      }

    • 这意味着每次连接关闭时,Channel 对象被销毁时会调用 close(fd),从而释放该 fd

    • 释放后,系统可以再次分配该 fd 给新的连接

    补充(关于这种做法的意义)

    • 无需担心 fd 复用问题 :只要每次连接关闭时正确调用 close(fd),系统会安全地回收和复用 fd
    • 避免 fd 泄漏 :确保所有连接关闭时都正确删除 Channel 对象,防止 fd 被占用不释放
    细节3:Channel 类中的 Remove 和 Update 方法为何调用 EventLoop 的接口?

    问题本质:模块职责分离 回答要点:

    • 职责分离 :Channel 仅负责事件注册,Poller 负责底层 I/O 事件监控。

    • 统一管理 :通过 EventLoop 统一管理事件增删改,确保事件状态一致性。

    • 示例代码 :

      void Channel::Remove() { return _loop->RemoveEvent(this); }
      void Channel::Update() { return _loop->UpdateEvent(this); }

    细节4:如何避免定时器任务的重复添加?

    问题本质:资源泄漏与逻辑错误 回答要点:

    • HasTimer 检查 :在添加定时任务前调用 HasTimer(id) 避免重复。
    • 刷新替代新增 :若定时任务已存在,调用 RefreshTimer(id) 延迟销毁时间。
    • 线程安全 :所有定时器操作通过 EventLoop 串行化执行。

    在这里插入图片描述

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 【C++仿Muduo库#3】Server 服务器模块实现上
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!