云计算百科
云计算领域专业知识百科平台

网络和Linux网络-15(IO多路转接)reactor编程-服务器

1. reactor的服务器

1.0 reactor服务器介绍

        Reactor是一种事件驱动的高性能网络模型,核心是通过单线程/少量线程管理大量并发连接。其核心组件包括:

  • I/O多路复用器(如epoll):监听多个文件描述符(fd)的事件(读、写、异常)。

  • 事件分发器:将就绪事件分发给对应的处理器。

  • 事件处理器:执行具体的I/O操作和业务逻辑

下面实现一个基于epoll的Reactor服务器,主要分为两部分:

  • Epoll类:封装epoll系统调用(创建、添加、修改、删除fd)。

  • TcpServer类:实现Reactor核心逻辑(事件循环、回调机制、连接管理)。

关键组件解析

1. Epoll类

  • 功能:封装Linux的epoll接口,提供对fd的增删改查。

  • 核心方法:

    • CreateEpoll():创建epoll实例。

    • AddSockToEpoll():添加fd到epoll监听(默认监听读事件)。

    • CtrlEpoll():修改fd的监听事件(如开启写事件)。

    • WaitEpoll():阻塞等待就绪事件(核心事件循环调用)。

2. Connection类

  • 功能:代表一个客户端连接,管理其状态和缓冲区。
  • 核心成员:
    • _sock:连接对应的文件描述符。

    • _inbuffer/_outbuffer:输入/输出缓冲区(存储未处理的数据或待发送的数据)。

    • 回调函数:

      • _recv_cb:读事件回调(如接收数据)。

      • _send_cb:写事件回调(如发送数据)。

      • _except_cb:异常回调(如连接断开)。

3. TcpServer类

  • 功能:Reactor的核心实现,管理监听套接字、连接和事件循环。

  • 核心流程:

  • 初始化:

  • 创建监听套接字(_listensock)并添加到epoll。

  • 设置Accepter为_listensock的读回调(处理新连接)。

  • 事件循环:

    • 调用WaitEpoll()获取就绪事件。

    • 根据事件类型(EPOLLIN/EPOLLOUT)触发对应的回调(如Recver或Sender)。

  • 回调函数:

    • Accepter:接受新连接,创建Connection对象并注册到epoll。

    • Recver:非阻塞读取数据到_inbuffer,解析后调用业务回调_cb。

    • Sender:非阻塞发送_outbuffer中的数据,按需开启/关闭写事件监听。

    • Excepter:处理连接异常(移除fd、释放资源)。

Reactor工作流程

  • 启动服务器:

    • 创建epoll实例,监听_listensock的读事件。

  • 接受新连接:

    • 当_listensock就绪时,Accepter被调用,通过accept获取新连接,注册到epoll。

  • 处理I/O事件:

    • 读事件:Recver读取数据到缓冲区,解析后交给业务逻辑(_cb)。

    • 写事件:Sender发送_outbuffer中的数据,发完后关闭写事件监听(避免忙等待)。

  • 异常处理:

    • 连接出错或关闭时,Excepter清理资源(移除fd、关闭socket)

  • Log.hpp和以前一样,因为下面要写ET模式所以Sock.hpp加了一个把sock设置成非阻塞的函数:(要#include <fcntl.h>)

    写到TcpServer.hpp的Accepter函数再改一下Sock.hpp的Accept:(加一个输出错误码的参数)

    1.1 Sock.hpp

    #pragma once

    #include <iostream>
    #include <string>
    #include <cstring>
    #include <cerrno>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <ctype.h>
    #include "Log.hpp"
    #include <fcntl.h>

    class Sock
    {
    private:
    const static int gbacklog = 20; // listen的第二个参数,现在先不管
    public:
    Sock()
    {}
    ~Sock()
    {}
    static int Socket()
    {
    int listensock = socket(AF_INET, SOCK_STREAM, 0); // 域 + 类型 + 0 // UDP第二个参数是SOCK_DGRAM
    if (listensock < 0)
    {
    logMessage(FATAL, "create socket error, %d:%s", errno, strerror(errno));
    exit(2);
    }
    int opt = 1;
    setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
    logMessage(NORMAL, "create socket success, listensock: %d", listensock);
    return listensock;
    }

    static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
    {
    struct sockaddr_in local;
    memset(&local, 0, sizeof local);
    local.sin_family = AF_INET;
    local.sin_port = htons(port);
    inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
    if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
    {
    logMessage(FATAL, "bind error, %d:%s", errno, strerror(errno));
    exit(3);
    }
    }

    static void Listen(int sock)
    {
    if (listen(sock, gbacklog) < 0)
    {
    logMessage(FATAL, "listen error, %d:%s", errno, strerror(errno));
    exit(4);
    }
    logMessage(NORMAL, "init server success");
    }

    // 一般情况下:
    // const std::string &: 输入型参数
    // std::string *: 输出型参数
    // std::string &: 输入输出型参数
    static int Accept(int listensock, std::string *ip, uint16_t *port, int *accept_errno = nullptr)
    {
    struct sockaddr_in src;
    socklen_t len = sizeof(src);
    *accept_errno = 0;
    int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
    if (servicesock < 0)
    {
    *accept_errno = errno;
    // logMessage(ERROR, "accept error, %d:%s", errno, strerror(errno));
    return -1;
    }
    if (port)
    *port = ntohs(src.sin_port);
    if (ip)
    *ip = inet_ntoa(src.sin_addr);
    return servicesock;
    }

    static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
    {
    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(server_port);
    server.sin_addr.s_addr = inet_addr(server_ip.c_str());

    if (connect(sock, (struct sockaddr *)&server, sizeof(server)) == 0)
    return true;
    else
    return false;
    }

    static bool SetNonBlock(int sock)
    {
    int fl = fcntl(sock, F_GETFL);
    if(fl < 0)
    return false;
    fcntl(sock, F_SETFL, fl | O_NONBLOCK);
    return true;
    }
    };

    下面直接放一部分TcpServer.hpp代码跟着注释看:(建议复制到VSCode里看)

    #pragma once

    #include <iostream>
    #include <functional>
    #include <string>
    #include <unordered_map>
    #include "Sock.hpp"
    #include "Log.hpp"
    #include "Epoll.hpp"

    class TcpServer;
    class Connection;

    using func_t = std::function<void(Connection *)>;
    // 为了能够正常工作,常规的sock必须要有独立的接收缓冲区和发送缓冲区(写入)
    class Connection // 一个链接类
    {
    public:
    Connection(int sock = -1)
    : _sock(sock), _tsvr(nullptr)
    {}
    void SetCallBack(func_t recv_cb, func_t send_cb, func_t except_cb)
    { // 设置三个回调方法
    _recv_cb = recv_cb;
    _send_cb = send_cb;
    _except_cb = except_cb;
    }
    ~Connection()
    {}
    public:
    int _sock; // 负责进行IO的文件描述符
    func_t _recv_cb; // 三个回调方法,是对_sock进行特定读写的对应方法
    func_t _send_cb;
    func_t _except_cb;

    std::string _inbuffer; // 接收缓冲区&&发送缓冲区
    std::string _outbuffer; // 这两string暂时没有办法处理二进制流,文本是可以的

    TcpServer *_tsvr; // 设置对TcpServer的回指指针,对写事件的关心是按需打开
    };

    class TcpServer
    {
    const static int gport = 8080;
    const static int gnum = 128;
    public:
    TcpServer(int port = gport)
    : _port(port), _revs_num(gnum)
    {
    // 1. 创建listensock
    _listensock = Sock::Socket();
    Sock::Bind(_listensock, _port);
    Sock::Listen(_listensock);

    // 2. 创建多路转接对象
    _poll.CreateEpoll();

    // 3. 添加listensock到服务器中 -> 三步(类的构造函数也能调用类的成员方法,走到函数体中对象已经存在了)
    // 后三个参数是函数对象,要bind绑定返回一个函数对象->类内函数有this指针,_1是预留的参数
    AddConnection(_listensock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);

    // 4. 构建一个获取就绪事件的缓冲区
    _revs = new struct epoll_event[_revs_num];
    }
    void AddConnection(int sock, func_t recv_cb, func_t send_cb, func_t except_cb) // 把任意sock进行添加到TcpServer
    {
    Sock::SetNonBlock(sock); // ET模式要把sock设置成非阻塞 -> 在Sock.hpp中写成函数

    // 除了_listensock,后面还会存在大量的socket,每一个sock都必须被封装成为一个Connection
    // 当服务器中存在大量的Connection时,TcpServer需要将所有Connection进行管理:上面描述了,组织 -> unordered_map
    // 3.1 构建conn对象,封装sock
    Connection *conn = new Connection(sock);
    conn->SetCallBack(recv_cb, send_cb, except_cb);
    conn->_tsvr = this;

    // 3.2 添加sock到epoll中(任务通知)->要知道sock和事件(任何多路转接的服务器,一般只会打开读取事件,写入事件按需打开)
    _poll.AddSockToEpoll(sock, EPOLLIN | EPOLLET);

    // 3.3 将对应的Connection*对象指针添加到Connections映射表中(业务处理)
    _connections.insert(std::make_pair(sock, conn));
    }

    void Accepter(Connection *conn)
    {
    logMessage(DEBUG, "Accepter been called");
    }

    void Dispather() // 根据就绪的事件,进行特定事件的派发
    {
    while (true)
    {
    LoopOnce();
    }
    }
    void LoopOnce()
    {
    int n = _poll.WaitEpoll(_revs, _revs_num);
    for (int i = 0; i < n; i++) // 获取事件
    {
    int sock = _revs[i].data.fd;
    uint32_t revents = _revs[i].events;
    if (revents & EPOLLIN) // 读就绪
    {
    // if(Connection是存在并且_connections[sock]->_recv_cb被设置过)
    if (IsConnectionExists(sock) && _connections[sock]->_recv_cb != nullptr)
    _connections[sock]->_recv_cb(_connections[sock]); // 调用读事件的回调
    }
    if (revents & EPOLLOUT) // 写就绪
    {
    // if(Connection是存在并且_connections[sock]->_send_cb被设置过)
    if (IsConnectionExists(sock) && _connections[sock]->_send_cb != nullptr)
    _connections[sock]->_send_cb(_connections[sock]); // 调用写事件的回调
    }
    }
    }
    bool IsConnectionExists(int sock) // 判定Connection是否存在
    {
    auto iter = _connections.find(sock);
    if (iter == _connections.end())
    return false;
    else
    return true;
    }
    ~TcpServer()
    {
    if (_listensock >= 0)
    close(_listensock);
    if (_revs)
    delete[] _revs;
    }
    private:
    int _listensock;
    int _port;
    Epoll _poll;
    std::unordered_map<int, Connection *> _connections; // 管理:sock映射到Connection
    struct epoll_event *_revs; // 就绪事件缓冲区,就绪的文件描述符投递到这里
    int _revs_num; // 就绪事件缓冲区大小
    };

    编译运行:

    此时成功调用了Accepter,因为是ET模式,所以是阻塞的,事件没处理也没有连续打印。

    写一下Accepter再测试一下:

    void Accepter(Connection *conn)
    {
    // logMessage(DEBUG, "Accepter been called");
    // 一定是listensock已经就绪了,此次读取不会阻塞,
    // 怎么保证,底层只有一个连接就绪呢 -> 循环,直到获取失败
    while (true)
    {
    std::string clientip;
    uint16_t clientport;
    int accept_errno = 0;
    // sock一定是常规的IO sock
    int sock = Sock::Accept(conn->_sock, &clientip, &clientport, &accept_errno);
    if (sock < 0) // 获取失败
    {
    if (accept_errno == EAGAIN || accept_errno == EWOULDBLOCK)
    break;
    else if (accept_errno == EINTR) // 概率非常低
    continue;
    else // accept失败
    {
    logMessage(WARNING, "accept error, %d : %s", accept_errno, strerror(accept_errno));
    break;
    }
    }
    else // (sock>=0)获取链接成功->将sock托管给TcpServer
    {
    AddConnection(sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1),
    std::bind(&TcpServer::Sender, this, std::placeholders::_1),
    std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
    logMessage(DEBUG, "accept client %s:%d success, add to epoll&&TcpServer success, sock: %d",\\
    clientip.c_str(), clientport, sock);
    }
    }
    }

    void Recver(Connection *conn) // 读取一个正常的sock
    {
    logMessage(DEBUG, "Recver event exists, Recver() been called");
    }

    void Sender(Connection *conn)
    {
    }

    void Excepter(Connection *conn)
    {
    }

    成功获取到读取事件,下面来处理一下:

    先写Recver的第一个版本:直接面向字节流,进行常规读取:

    void Recver(Connection *conn) // 读取一个正常的sock
    {
    // logMessage(DEBUG, "Recver event exists, Recver() been called");
    // v1: 直接面向字节流,先进行常规读取
    const int num = 1024;
    while (true)
    {
    char buffer[num];
    ssize_t n = recv(conn->_sock, buffer, sizeof(buffer) – 1, 0);
    if (n < 0)
    {
    if (errno == EAGAIN || errno == EWOULDBLOCK) // 读取完毕了,正常的
    break;
    else if (errno == EINTR) // 读取被中断了,重新开始读
    continue;
    else // 真正读取失败 -> 交给异常回调
    {
    logMessage(ERROR, "recv error, %d : %s", errno, strerror(errno));
    conn->_except_cb(conn);
    break;
    }
    }
    else if (n == 0)
    {
    logMessage(DEBUG, "client[%d] quit, server close [%d]", conn->_sock, conn->_sock);
    conn->_except_cb(conn);
    break;
    }
    else // 读取成功
    {
    buffer[n] = 0;
    conn->_inbuffer += buffer; // 读取到的数据全部拼接到接收缓冲区
    }
    }
    logMessage(DEBUG, "conn->_inbuffer[sock: %d]: %s", conn->_sock, conn->_inbuffer.c_str());
    }

    每个服务端都有自己的接收缓冲区,互不影响(这里回车也被输入进去了只是telnet的原因,这里不写客户端了就这么用了),但还是那句话:怎么保证你读到的是一个完整的报文呢?->就要定制协议了,写一个Protocol.hpp:

    1.2 加协议分割报文

    在前面加上这行:

    加个类内成员:

    Dispather:

    void Dispather(callback_t cb) // 根据就绪的事件,进行特定事件的派发
    {
    _cb = cb;
    while (true)
    {
    LoopOnce();
    }
    }

    main.cc:

    #include "TcpServer.hpp"
    #include <memory>

    void NetCal(Connection *conn, std::string &request)
    {
    logMessage(DEBUG, "NetCal been called, get request: %s", request.c_str());
    }

    int main()
    {
    std::unique_ptr<TcpServer> svr(new TcpServer());
    svr->Dispather(NetCal);

    return 0;
    }

    改进的Recver:

    void Recver(Connection *conn) // 读取一个正常的sock
    {
    const int num = 1024;
    bool err = false;
    // logMessage(DEBUG, "Recver event exists, Recver() been called");
    while (true)
    {
    char buffer[num];
    ssize_t n = recv(conn->_sock, buffer, sizeof(buffer) – 1, 0);
    if (n < 0)
    {
    if (errno == EAGAIN || errno == EWOULDBLOCK) // 读取完毕了,正常的
    break;
    else if (errno == EINTR) // 读取被中断了,重新开始读
    continue;
    else // 真正读取失败 -> 交给异常回调
    {
    logMessage(ERROR, "recv error, %d : %s", errno, strerror(errno));
    conn->_except_cb(conn);
    err = true;
    break;
    }
    }
    else if (n == 0)
    {
    logMessage(DEBUG, "client[%d] quit, server close [%d]", conn->_sock, conn->_sock);
    conn->_except_cb(conn);
    err = true;
    break;
    }
    else // 读取成功
    {
    buffer[n] = 0;
    conn->_inbuffer += buffer; // 读取到的数据全部拼接到接收缓冲区
    }
    }
    logMessage(DEBUG, "conn->_inbuffer[sock: %d]: %s", conn->_sock, conn->_inbuffer.c_str());
    if (!err) // 如果错误码还是false就是正常break的
    {
    std::vector<std::string> messages;
    SpliteMessage(conn->_inbuffer, &messages);
    // 保证走到这里,就是一个完整报文
    for (auto &msg : messages)
    { // 可以在这里将message封装成为task,然后push到任务队列,任务处理交给后端线程池,这里不处理
    _cb(conn, msg);
    }
    }
    }

    一部分Protocol.hpp:(这里用大写X作为切分)

    #pragma once

    #include <iostream>
    #include <cstring>
    #include <string>
    #include <vector>

    // 1. 报文和报文之间,我们采用特殊字符来进行解决粘包问题
    // 2. 获取一个一个独立完整的报文,序列和反序列化 — 自定义
    // 100+19X100+19X100+19
    #define SEP "X"
    #define SEP_LEN strlen(SEP)

    // 要把传入进来的缓冲区进行切分,要求:
    // 1. buffer被切走的,也同时要从buffer中移除
    // 2. 可能会存在多个报文,多个报文依次放入out
    void SpliteMessage(std::string &buffer, std::vector<std::string> *out) // 分割报文
    { // buffer: 输入输出型参数,out: 输出型参数
    while (true)
    {
    auto pos = buffer.find(SEP); // 在缓冲区里找分隔符
    if (std::string::npos == pos) // 找不到就break
    break;
    std::string message = buffer.substr(0, pos); // 提取子串:前闭后开区间
    buffer.erase(0, pos + SEP_LEN); // 移除子串和衡娥福
    out->push_back(message); // push_back完整的子串
    // std::cout << "debug: " << message << " : " << buffer << std::endl;
    // sleep(1);
    }
    }


    1.3 序列化和反序列化

    把以前自己写的序列化和反序列化复制到Protocol.hpp:

    Protocol.hpp

    #pragma once

    #include <iostream>
    #include <cstring>
    #include <string>
    #include <vector>

    // 1. 报文和报文之间,我们采用特殊字符来进行解决粘包问题
    // 2. 获取一个一个独立完整的报文,序列和反序列化 — 自定义
    // 100+19X100+19X100+19
    #define SEP "X"
    #define SEP_LEN strlen(SEP)

    // 要把传入进来的缓冲区进行切分,要求:
    // 1. buffer被切走的,也同时要从buffer中移除
    // 2. 可能会存在多个报文,多个报文依次放入out
    void SpliteMessage(std::string &buffer, std::vector<std::string> *out) // 分割报文
    { // buffer: 输入输出型参数,out: 输出型参数
    while (true)
    {
    auto pos = buffer.find(SEP); // 在缓冲区里找分隔符
    if (std::string::npos == pos) // 找不到就break
    break;
    std::string message = buffer.substr(0, pos); // 提取子串:前闭后开区间
    buffer.erase(0, pos + SEP_LEN); // 移除子串和衡娥福
    out->push_back(message); // push_back完整的子串
    // std::cout << "debug: " << message << " : " << buffer << std::endl;
    // sleep(1);
    }
    }

    // 自己手写序列反序列化
    #define SPACE " "
    #define SPACE_LEN strlen(SPACE)

    std::string Encode(std::string& s)
    {
    return s + SEP;
    }

    class Request
    {
    public:
    std::string Serialize()
    {
    std::string str;
    str = std::to_string(_x);
    str += SPACE;
    str += _op;
    str += SPACE;
    str += std::to_string(_y);
    return str;
    }
    bool Deserialized(const std::string& str) // 1 + 1
    {
    std::size_t left = str.find(SPACE);
    if (left == std::string::npos)
    return false;
    std::size_t right = str.rfind(SPACE);
    if (right == std::string::npos)
    return false;
    _x = atoi(str.substr(0, left).c_str());
    _y = atoi(str.substr(right + SPACE_LEN).c_str());
    if (left + SPACE_LEN > str.size())
    return false;
    else
    _op = str[left + SPACE_LEN];
    return true;
    }

    public:
    Request()
    {}
    Request(int x, int y, char op)
    : _x(x), _y(y), _op(op)
    {}
    ~Request()
    {}
    public:
    int _x;
    int _y;
    char _op; // '+' '-' '*' '/' '%'
    };

    class Response
    {
    public:
    std::string Serialize() // "code_ result_"
    {

    std::string s;
    s = std::to_string(_code);
    s += SPACE;
    s += std::to_string(_result);

    return s;
    }
    bool Deserialized(const std::string& s)
    {
    std::size_t pos = s.find(SPACE);
    if (pos == std::string::npos)
    return false;
    _code = atoi(s.substr(0, pos).c_str());
    _result = atoi(s.substr(pos + SPACE_LEN).c_str());
    return true;
    }
    public:
    Response()
    {}
    Response(int result, int code)
    : _result(result), _code(code)
    {}
    ~Response()
    {}
    public:
    int _result; // 计算结果
    int _code; // 计算结果的状态码
    };

    main.cc

    #include "TcpServer.hpp"
    #include <memory>

    static Response calculator(const Request &req)
    {
    Response resp(0, 0);
    switch (req.op_)
    {
    case '+':
    resp.result_ = req.x_ + req.y_;
    break;
    case '-':
    resp.result_ = req.x_ – req.y_;
    break;
    case '*':
    resp.result_ = req.x_ * req.y_;
    break;
    case '/':
    if (0 == req.y_)
    resp.code_ = 1;
    else
    resp.result_ = req.x_ / req.y_;
    break;
    case '%':
    if (0 == req.y_)
    resp.code_ = 2;
    else
    resp.result_ = req.x_ % req.y_;
    break;
    default:
    resp.code_ = 3;
    break;
    }
    return resp;
    }

    void NetCal(Connection *conn, std::string &request)
    {
    logMessage(DEBUG, "NetCal been called, get request: %s", request.c_str());
    Request req; // 1. 反序列化,1 + 1 2 + 3
    if(!req.Deserialized(request))
    return;

    Response resp = calculator(req); // 2. 业务处理

    std::string sendstr = resp.Serialize(); // 3. 序列化,构建应答
    sendstr = Encode(sendstr);

    conn->_outbuffer += sendstr; // 4. 交给服务器conn

    // 5. 想办法,让底层的TcpServer开始发送 -> 需要有完整的发送逻辑
    // 触发发送的动作,一旦开启EPOLLOUT,epoll会自动立马触发一次发送事件就绪,如果后续保持发送的开启,epoll会一直发送
    conn->_tsvr->EnableReadWrite(conn, true, true); // 写完EnableReadWrite才发现回指指针的作用
    }

    int main()
    {
    std::unique_ptr<TcpServer> svr(new TcpServer());
    svr->Dispather(NetCal);

    return 0;
    }

    Sender函数:

    void Sender(Connection *conn)
    {
    while(true)
    {
    ssize_t n = send(conn->_sock, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
    if(n > 0) // 发送成功 -> 移除
    {
    conn->_outbuffer.erase(0, n);
    if(conn->_outbuffer.empty()) // 发完了 -> break
    break;
    }
    else
    {
    if(errno == EAGAIN || errno == EWOULDBLOCK) // 缓冲区满了 -> break下次再发
    break;
    else if(errno == EINTR) // 发送被中断 -> 重新发送
    continue;
    else // 真正读取失败 -> 交给异常回调
    {
    logMessage(ERROR, "send error, %d : %s", errno, strerror(errno));
    conn->_except_cb(conn);
    break;
    }
    }
    }
    // 不确定有没有发完,但是可以保证,如果没有出错,一定是要么发完,要么发送条件不满足,下次再发
    if(conn->_outbuffer.empty())
    EnableReadWrite(conn, true, false);
    else
    EnableReadWrite(conn, true, true);
    }
    void EnableReadWrite(Connection *conn, bool readable, bool writeable) // 控制读写开关
    {
    uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0));
    bool res = _poll.CtrlEpoll(conn->_sock, events);
    assert(res);
    }

    Epoll.hpp

    加了打开和删除就完整了:

    #pragma once

    #include <iostream>
    #include <sys/epoll.h>

    class Epoll
    {
    const static int gnum = 128;
    const static int gtimeout = 5000;
    public:
    Epoll(int timeout = gtimeout)
    : _timeout(timeout)
    {}
    void CreateEpoll()
    {
    _epfd = epoll_create(gnum);
    if (_epfd < 0)
    exit(5);
    }
    bool DelFromEpoll(int sock) // 移除sock的所有事件
    {
    int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
    return n == 0;
    }
    bool CtrlEpoll(int sock, uint32_t events) // 打开sock的事件
    {
    events |= EPOLLET;
    struct epoll_event ev;
    ev.events = events;
    ev.data.fd = sock;
    int n = epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev);
    return n == 0;
    }
    bool AddSockToEpoll(int sock, uint32_t events)
    { // 添加sock到epoll中(任务通知)->要知道sock和事件(任何多路转接的服务器,一般只会打开读取事件,写入事件按需打开)
    struct epoll_event ev;
    ev.events = events;
    ev.data.fd = sock;
    int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
    return n == 0;
    }
    int WaitEpoll(struct epoll_event revs[], int num)
    {
    return epoll_wait(_epfd, revs, num, _timeout);
    }
    ~Epoll()
    {}

    private:
    int _epfd;
    int _timeout;
    };

    Excepter函数:

    void Excepter(Connection *conn)
    {
    if(!IsConnectionExists(conn->_sock)) // _sock不存在就返回
    return;
    bool res = _poll.DelFromEpoll(conn->_sock); // 1. 从epoll中移除
    assert(res);

    _connections.erase(conn->_sock); // 2. 从unorder_map中移除

    close(conn->_sock); // 3. 关闭sock

    delete conn; // 4. 释放 conn;
    logMessage(DEBUG, "Excepter 回收完毕所有的异常情况");
    }

    编译运行:

    此时代码就结束了,可以自己拓展一下。下面放一下完整的TcpServer.hpp:

    TcpServer.hpp

    #pragma once

    #include <iostream>
    #include <functional>
    #include <string>
    #include <cassert>
    #include <unordered_map>
    #include <vector>
    #include "Sock.hpp"
    #include "Log.hpp"
    #include "Epoll.hpp"
    #include "Protocol.hpp"

    class TcpServer;
    class Connection;

    using func_t = std::function<void(Connection *)>;
    using callback_t = std::function<void (Connection*, std::string &request)>; // 上层业务处理的方法

    // 为了能够正常工作,常规的sock必须要有独立的接收缓冲区和发送缓冲区(写入)
    class Connection // 一个链接类
    {
    public:
    Connection(int sock = -1)
    : _sock(sock), _tsvr(nullptr)
    {
    }
    void SetCallBack(func_t recv_cb, func_t send_cb, func_t except_cb)
    { // 设置三个回调方法
    _recv_cb = recv_cb;
    _send_cb = send_cb;
    _except_cb = except_cb;
    }
    ~Connection()
    {
    }

    public:
    int _sock; // 负责进行IO的文件描述符
    func_t _recv_cb; // 三个回调方法,是对_sock进行特定读写的对应方法
    func_t _send_cb;
    func_t _except_cb;

    std::string _inbuffer; // 接收缓冲区&&发送缓冲区
    std::string _outbuffer; // 这两string暂时没有办法处理二进制流,文本是可以的

    TcpServer *_tsvr; // 设置对TcpServer的回指指针,对写事件的关心是按需打开
    };

    class TcpServer
    {
    const static int gport = 8080;
    const static int gnum = 128;

    public:
    TcpServer(int port = gport)
    : _port(port), _revs_num(gnum)
    {
    // 1. 创建listensock
    _listensock = Sock::Socket();
    Sock::Bind(_listensock, _port);
    Sock::Listen(_listensock);

    // 2. 创建多路转接对象
    _poll.CreateEpoll();

    // 3. 添加listensock到服务器中 -> 三步(类的构造函数也能调用类的成员方法,走到函数体中对象已经存在了)
    // 后三个参数是函数对象,要bind绑定返回一个函数对象->类内函数有this指针,_1是预留的参数
    AddConnection(_listensock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);

    // 4. 构建一个获取就绪事件的缓冲区
    _revs = new struct epoll_event[_revs_num];
    }
    void AddConnection(int sock, func_t recv_cb, func_t send_cb, func_t except_cb) // 把任意sock进行添加到TcpServer
    {
    Sock::SetNonBlock(sock); // ET模式要把sock设置成非阻塞 -> 在Sock.hpp中写成函数

    // 除了_listensock,后面还会存在大量的socket,每一个sock都必须被封装成为一个Connection
    // 当服务器中存在大量的Connection时,TcpServer需要将所有Connection进行管理:上面描述了,组织 -> unordered_map
    // 3.1 构建conn对象,封装sock
    Connection *conn = new Connection(sock);
    conn->SetCallBack(recv_cb, send_cb, except_cb);
    conn->_tsvr = this;

    // 3.2 添加sock到epoll中(任务通知)->要知道sock和事件(任何多路转接的服务器,一般只会打开读取事件,写入事件按需打开)
    _poll.AddSockToEpoll(sock, EPOLLIN | EPOLLET);

    // 3.3 将对应的Connection*对象指针添加到Connections映射表中(业务处理)
    _connections.insert(std::make_pair(sock, conn));
    }

    void Accepter(Connection *conn)
    {
    // logMessage(DEBUG, "Accepter been called");
    // 一定是listensock已经就绪了,此次读取不会阻塞,
    // 怎么保证,底层只有一个连接就绪呢 -> 循环,直到获取失败
    while (true)
    {
    std::string clientip;
    uint16_t clientport;
    int accept_errno = 0;
    // sock一定是常规的IO sock
    int sock = Sock::Accept(conn->_sock, &clientip, &clientport, &accept_errno);
    if (sock < 0) // 获取失败
    {
    if (accept_errno == EAGAIN || accept_errno == EWOULDBLOCK)
    break;
    else if (accept_errno == EINTR) // 概率非常低
    continue;
    else // accept失败
    {
    logMessage(WARNING, "accept error, %d : %s", accept_errno, strerror(accept_errno));
    break;
    }
    }
    else // (sock>=0)获取链接成功->将sock托管给TcpServer
    {
    AddConnection(sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1),
    std::bind(&TcpServer::Sender, this, std::placeholders::_1),
    std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
    logMessage(DEBUG, "accept client %s:%d success, add to epoll&&TcpServer success, sock: %d",
    clientip.c_str(), clientport, sock);
    }
    }
    }

    void Recver(Connection *conn) // 读取一个正常的sock
    {
    const int num = 1024;
    bool err = false;
    // logMessage(DEBUG, "Recver event exists, Recver() been called");
    while (true)
    {
    char buffer[num];
    ssize_t n = recv(conn->_sock, buffer, sizeof(buffer) – 1, 0);
    if (n < 0)
    {
    if (errno == EAGAIN || errno == EWOULDBLOCK) // 读取完毕了,正常的
    break;
    else if (errno == EINTR) // 读取被中断了,重新开始读
    continue;
    else // 真正读取失败 -> 交给异常回调
    {
    logMessage(ERROR, "recv error, %d : %s", errno, strerror(errno));
    conn->_except_cb(conn);
    err = true;
    break;
    }
    }
    else if (n == 0)
    {
    logMessage(DEBUG, "client[%d] quit, server close [%d]", conn->_sock, conn->_sock);
    conn->_except_cb(conn);
    err = true;
    break;
    }
    else // 读取成功
    {
    buffer[n] = 0;
    conn->_inbuffer += buffer; // 读取到的数据全部拼接到接收缓冲区
    }
    }
    logMessage(DEBUG, "conn->_inbuffer[sock: %d]: %s", conn->_sock, conn->_inbuffer.c_str());
    if (!err) // 如果错误码还是false就是正常break的
    {
    std::vector<std::string> messages;
    SpliteMessage(conn->_inbuffer, &messages);
    // 保证走到这里,就是一个完整报文
    for (auto &msg : messages)
    { // 可以在这里将message封装成为task,然后push到任务队列,任务处理交给后端线程池,这里不处理
    _cb(conn, msg);
    }
    }
    }

    void Sender(Connection *conn)
    {
    while(true)
    {
    ssize_t n = send(conn->_sock, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
    if(n > 0) // 发送成功 -> 移除
    {
    conn->_outbuffer.erase(0, n);
    if(conn->_outbuffer.empty()) // 发完了 -> break
    break;
    }
    else
    {
    if(errno == EAGAIN || errno == EWOULDBLOCK) // 缓冲区满了 -> break下次再发
    break;
    else if(errno == EINTR) // 发送被中断 -> 重新发送
    continue;
    else // 真正读取失败 -> 交给异常回调
    {
    logMessage(ERROR, "send error, %d : %s", errno, strerror(errno));
    conn->_except_cb(conn);
    break;
    }
    }
    }
    // 不确定有没有发完,但是可以保证,如果没有出错,一定是发完了,或者发送条件不满足,下次再发
    if(conn->_outbuffer.empty()) // 发完了->不用关心写
    EnableReadWrite(conn, true, false);
    else // 发送条件不满足,下次再发
    EnableReadWrite(conn, true, true);
    }
    void EnableReadWrite(Connection *conn, bool readable, bool writeable) // 控制读写开关
    { // 下面的三目:如readable为真就关心读事件,否则为0,writeable就关心写事件
    uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0));
    bool res = _poll.CtrlEpoll(conn->_sock, events);
    assert(res);
    }

    void Excepter(Connection *conn)
    {
    if(!IsConnectionExists(conn->_sock)) // _sock不存在就返回
    return;
    bool res = _poll.DelFromEpoll(conn->_sock); // 1. 从epoll中移除
    assert(res);

    _connections.erase(conn->_sock); // 2. 从unorder_map中移除

    close(conn->_sock); // 3. 关闭sock

    delete conn; // 4. 释放 conn;
    logMessage(DEBUG, "Excepter 回收完毕所有的异常情况");
    }

    void Dispather(callback_t cb) // 根据就绪的事件,进行特定事件的派发
    {
    _cb = cb;
    while (true)
    {
    LoopOnce();
    }
    }
    void LoopOnce()
    {
    int n = _poll.WaitEpoll(_revs, _revs_num);
    for (int i = 0; i < n; i++) // 获取事件
    {
    int sock = _revs[i].data.fd;
    uint32_t revents = _revs[i].events;
    if (revents & EPOLLIN) // 读就绪
    {
    // if(Connection是存在并且_connections[sock]->_recv_cb被设置过)
    if (IsConnectionExists(sock) && _connections[sock]->_recv_cb != nullptr)
    _connections[sock]->_recv_cb(_connections[sock]); // 调用读事件的回调
    }
    if (revents & EPOLLOUT) // 写就绪
    {
    // if(Connection是存在并且_connections[sock]->_send_cb被设置过)
    if (IsConnectionExists(sock) && _connections[sock]->_send_cb != nullptr)
    _connections[sock]->_send_cb(_connections[sock]); // 调用写事件的回调
    }
    }
    }
    bool IsConnectionExists(int sock) // 判定Connection是否存在
    {
    auto iter = _connections.find(sock);
    if (iter == _connections.end())
    return false;
    else
    return true;
    }
    ~TcpServer()
    {
    if (_listensock >= 0)
    close(_listensock);
    if (_revs)
    delete[] _revs;
    }

    private:
    int _listensock;
    int _port;
    Epoll _poll;
    std::unordered_map<int, Connection *> _connections; // 管理:sock映射到Connection
    struct epoll_event *_revs; // 就绪事件缓冲区,就绪的文件描述符投递到这里
    int _revs_num; // 就绪事件缓冲区大小

    callback_t _cb; // 处理上层的业务的回调函数
    };


    本篇完。

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 网络和Linux网络-15(IO多路转接)reactor编程-服务器
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!