云计算百科
云计算领域专业知识百科平台

【linux】高级IO,以ET模式运行的epoll版本的TCP服务器实现reactor反应堆

小编个人主页详情<—请点击 小编个人gitee代码仓库<—请点击 linux系统编程专栏<—请点击 linux网络编程专栏<—请点击 倘若命中无此运,孤身亦可登昆仑,送给屏幕面前的读者朋友们和小编自己! 在这里插入图片描述


目录

    • 前言
    • 一、前置知识
    • 二、第一阶段,基本框架的实现
      • Connection
      • Main.cc
      • TcpServer
        • 测试
    • 三、第二阶段,引入业务协议
      • TcpServer
      • Main.cc
      • TcpServer
        • 测试
    • 四、拓展
    • 五、写博客一年的总结
    • 六、源代码
      • ClientCal.cc
      • Comm.hpp
      • Epoller.hpp
      • Log.hpp
      • Main.cc
      • makefile
      • nocopy.hpp
      • Protocol.hpp
      • ServerCal.hpp
      • Socket.hpp
      • TcpServer.hpp
    • 总结

前言

【linux】高级IO,I/O多路转接之epoll的两种工作模式:LT水平触发模式和ET边缘触发模式——书接上文 详情请点击<——,本文会在上文的基础上进行讲解,所以对上文不了解的读者友友请点击前方的蓝字链接进行学习 本文由小编为大家介绍——【linux】高级IO,以ET模式运行的epoll版本的TCP服务器实现reactor反应堆


一、前置知识

  • 上一篇文章,详情请点击<——,那么在上一篇文章中,我们讲解了epoll的默认工作模式是LT水平触发模式,而epoll的工作模式分为两种,分别是LT水平触发模式和ET边缘触发模式,所以也就意味着我们可以将epoll的工作模式设置为ET边缘触发模式,那么我们该如何设置呢?
  • 那么我们只需要通过epoll_ctl将要设置的fd关心的事件按位与上EPOLLET然后传入即可,关于fd可以关心的事件在后面蓝字链接文章第二点的epoll的接口中的epoll_wait的第三小点进行讲解,其中就有EPOLLET这个事件,epoll_ctl在后面蓝字链接文章第二点的epoll的接口中的epoll_ctl进行的讲解,详情请点击<—— 在这里插入图片描述
  • 以ET模式运行的epoll实现reactor反应堆,所以也就意味着我们要实现的还是一个epoll版本的TCP服务器,只不过这个epoll是以ET模式运行的,如上目前以ET模式运行的epoll版本的TCP服务器需要建立以及包含的源文件和头文件概况如上,所以目前我们需要引入4个.hpp文件 (一)Epoller.hpp,由于我们要实现的是epoll版本的TCP服务器,所以就要创建,等待,控制epoll模型,那么使用到epoll的三个接口,那么关于epoll的三个接口以及文件描述符epfd我们期望进行一定的封装,所以我们就要实现一个Epoller类放在Epoller.hpp文件中,Epoller.hpp在第三点Epoller中进行讲解,详情请点击<—— (二)Log.hpp是日志,其中定义了Log lg,所以我们可以直接使用lg打印日志,关于Log.hpp日志的讲解,详情请点击<—— (三)Main.cc是主函数,包含调用服务器的逻辑 (四)makefile用于编译构建可执行程序 (五)nocopy.hpp用于方式nocopy类,作为一款服务器,这个服务器只能有一份,所以我们并不期望服务器被拷贝,所以我们让服务器EpollServer继承nocopy即可,同样的Epoller这个类是用于创建,等待,控制epoll模型,那么我们期望创建的epoll模型不可被拷贝,所以我们也让Epoller继承nocopy,nocopy.hpp在第二点nocopy中进行讲解,详情请点击<—— (六)Socket.hpp用于封装套接字的原生接口,关于Socket.hpp套接字的讲解,在第三点的TCP服务器的Socket.hpp进行的讲解,详情请点击<—— (七)TcpServer.hpp,用于放置用来描述连接Connection类,以及以ET模式运行的epoll版本的TCP服务器对应的TcpServer类
  • 并且在学习本文以ET模式运行的epoll版本的TCP服务器去实现reactor反应堆之前,建议读者友友学习一下epoll以LT模式运行实现TCP服务器这一篇文章,因为前期编写中,大部分内容都相同详情请点击<——
  • 由于reactor反应堆的实现较为复杂,不要畏惧,小编会带领大家由简入深学习,小编在讲解过程中还会引入小编的其它文章的模块的代码直接进行接入使用,进行逐步的完善讲解reactor反应堆的实现
  • 二、第一阶段,基本框架的实现

  • 在第一阶段中,小编将以ET模式运行的epoll版本的TCP服务器实现reactor反应堆的基本框架进行引入,确保TcpServer服务器可以正常获取来自客户端的连接
  • Connection

  • epoll版本的TCP服务器的实现,详情请点击<——,我们回看一下之前讲解的epoll版本的TCP服务器的实现,其中我们只处理了读事件,fd的读事件就绪分为listensock的读事件就绪和普通sock的读事件就绪,对于listensock的读事件就绪表示连接的到来,需要使用连接管理器Accepter来处理
  • 那么对于普通sock的读事件就绪表示文件描述符sock底层的tcp的接收缓冲区中有数据了,需要我们读取上来,此时就需要使用如下的数据读取器Recver来进行处理,所以此时我们就可以调用read将底层的数据读取上来,但是由于是ET模式运行的epoll,所以文件描述符sock需要被设置为非阻塞,然后以循环读取,直到出错的方式,将底层的tcp的接收缓冲区的数据全部读取上来,上一篇文章中讲解了ET模式,详情请点击<——
  • void Recver(int fd)
    {
    char buffer[1024];
    ssize_t n = read(fd, buffer, sizeof(buffer) 1);
    if(n > 0)
    {
    buffer[n] = 0;
    std::cout << "get a message: " << buffer << std::endl;

    std::string echo_str = "server echo$ ";
    echo_str += buffer;
    write(fd, echo_str.c_str(), echo_str.size());
    }
    else if(n == 0)
    {
    lg(Info, "client quit, me to, close fd: %d", fd);
    _epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
    close(fd);
    }
    else
    {
    lg(Warning, "recv error, fd: %d", fd);
    _epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
    close(fd);
    }
    }

  • 那么此时我们思考一下,此时我们就可以将读取上来的数据进行处理吗?不一定可以处理,只有报文是完整的,我们才能进行处理,此时读取上来的报文一定完整吗?不一定,因为客户端将请求报文写给客户端的tcp,那么报文什么时候发送,发送多少,出错了怎么办,完全由对方客户端的tcp自主决定
  • 换句话来说,tcp是面向字节流的,所以有可能由于一些其它原因,例如网路状况不好或者请求报文很大等,需要将一个完整的请求报文进行拆分,拆分为多次进行发送,假设客户端第一次发送的请求报文只发送了1/2,那么此时服务器收到了这个1/2请求报文,尽管服务端这边是以ET模式运行的epoll版本的TCP服务器,连接对应的普通sock被设置为了非阻塞,sock可以支持循环读取,直到出错的方式,将底层的tcp的接收缓冲区的数据全部读取上来
  • 可是此时尽管读取上来了,读取到了用户级缓冲区中,但是这个请求报文只有1/2,协议规定只有报文是完整的才可以进行解码,反序列化,进行处理,所以此时上层并不能处理这个1/2的请求报文,只有对方客户端再次发送余下的1/2的请求报文,服务器这边连接对应的文件描述符sock的读事件再次就绪,服务器调用read循环读取,直到出错将余下的1/2的请求报文也读取上来的时候,此时可以凑成一个完成的请求报文,服务器才可以使用协议对这个完整的请求报文进行解码,反序列化,进行处理,然后才能对处理后的结果进行序列化,转码,然后将相应报文发送回给客户端,客户端才可以收到来自服务器的响应报文
  • 所以说我们需要将读取上来的数据进行保存,那么此时上面数据读取器Recver的buffer还能满足我们的需求吗?不能,因为这个buffer是固定大小的,有可能对方发来的请求报文的大小很大的,如果固定大小那么有可能会溢出,所以我们期望用户级缓冲区可以支持扩容,并且我们仔细看一下数据读取器Recver的这个buffer是否单单属于这一个连接对应的文件描述符sock呢?
  • 不单单只属于这一个连接的文件描述符sock,你连接对应的sock的读事件就绪了,需要调用数据读取器Recver进行处理,可是此时的情景是服务器,服务器要服务多个客户端,所以也就意味着服务器中的连接不单单只有这一个和客户端连接对应的sock,服务器中还有和其它的客户端对应的sock,那么当其它客户端向服务器发来请求报文的时候,那么和其它客户端对应的其它的文件描述符sock的读事件也会就绪的
  • 那么其它的文件描述符sock的读事件就绪之后,自然而然的要调用数据读取器Recver进行处理,所以自然的其它的文件描述符sock底层的tcp的接收缓冲区的数据经过循环读取,直到出错,将数据全部读取上来之后,有可能buffer的空间不够,这还不是最难受的,最难受的是此时用户层的缓冲区buffer之前存储的客户端连接对应的sock的1/2的请求报文就会被覆盖
  • 所以此时尽管之前的客户端将余下的1/2的请求报文发过来了,此时尽管可以将余下的1/2请求报文读取上来,但是最初的1/2的请求报文已经被覆盖了,即这个请求报文无法被拼凑为一个完整的请求报文,所以自然的这个请求报文也无法被处理,所以服务器就无法构建响应报文,自然的客户端也接收不到响应报文,不是说好了你服务器可以为客户端提供服务,接收到请求报文可以进行处理返回响应报文吗,所以此时的服务器就有问题了,那么我们该如何处理呢?
  • 所以针对sock上的读事件就绪,为了每一个文件描述符sock的请求报文不被覆盖,我们需要给每一个文件描述符sock维护一个用户层的输入缓冲区inbuffer,我们思考一下,sock就绪的事件除了读事件还有什么事件?还有写事件,所以我们也需要在用户层给sock维护一个用户层的输出缓冲区outbuffer
  • 所以此时我们就使用Connection类将客户端和服务器进行连接对应的文件描述符sock需要使用的一切相关的变量,方法,数据,结构等都描述起来,于是在TcpServer.hpp这个文件中,我们开始编写Connection这个类
  • 首先我们来看Connection类中要包含的私有成员变量,那么要包含文件描述符_sock,自然也要包含对应的输入缓冲区_inbuffer,输出缓冲区_outbuffer,那么对于缓冲区的类型,我们这里设置为string类型即可,string的底层就是指向堆区空间的指针,并且支持对空间进行扩容,符合我们的需求
  • 但是别忘了这个类可是Connection,即要对一个连接进行操作,也就意味着未来无论谁拿到了这个Connection类,都可以向缓冲区中写入,或者拿到缓冲区的内容进行修改,所以此时对于输入缓冲区_inbuffer,我们就可以使用成员函数AppendInBuffer,向将读取上来的输出放到输入缓冲区中,同理对于输出缓冲区_outbuffer,我们使用AppendOutBuffer,将要输出的数据放到输出缓冲区中即可
  • 那么如果我们想要拿到输入缓冲区的数据进行上层的处理请求报文,如果输入缓冲区的数据足够一个完整的请求报文了,那么就可以根据上层协议对请求报文进行处理,然后由于此时请求报文已经被上层处理了,所以需要将这个请求报文在输入缓冲区_inbuffer删除掉,所以我们需要以引用的形式返回输入缓冲区_inbuffer
  • 所以此时我们就使用成员函数InBuffer返回输入缓冲区,让上层删除对应已经处理的位于输入缓冲区_inbuffer的一个请求报文,同样的道理对于输出缓冲区来讲,上层也可能需要获取输出缓冲区,所以我们就使用成员函数OutBuffer以引用的形式返回输出缓冲区_outbuffer
  • 别忘了这个Connection管理的是一个连接对应的文件描述符sock,那么获得Connection想要对文件描述符进行操作的是否知道该如何对文件描述符sock进行操作?不知道,例如,这个sock的读事件就绪可能有两种情况,那么第一种就是文件描述符是listensock的读事件就绪了,意味着此时有客户端进行连接,需要调用accept将连接获取上来,并且让epoll关心对应的fd上的事件等操作
  • 那么第二种文件描述符是普通sock的读事件就绪,意味着此时客户端发来了数据,需要调用read或recv等系统调用将数据从普通sock底层的tcp接收缓冲区中拿上来拷贝到输入缓冲区_inbuffer中等操作,所以调用者并不知道,那么我们作为这个文件描述符sock的管理者,应该想要对文件描述符sock进行操作的提供对应的方法,那么什么时候才需要对文件描述符sock进行操作呢?当然文件描述符sock上的事件就绪的时候
  • 而事件就绪,分为读事件就绪,写事件就绪,还有异常事件就绪,所以对应的我们就要提供三个调用,那么这些事假的类型都应该是参数为Connection对象,返回值为void,因为获得了文件描述符sock对应的Connection对象之后就可以获得缓冲区,以及对缓冲区进行操作等,所以此时我们就可以使用using定义一个类型func_t,然后用实际类型使用包装器包装为std::function<void(std::shared_ptr<Connection>)>即可,有了包装器,我们就可以定义出类型func_t,然后在Connection类中我们就可以使用func_t类型定义方法了
  • 所以此时对于sock的读事件就绪,我们使用func_t类型定义_recv_cb,同理对于sock的写事件就绪,我们使用func_t类型定义_send_cb,对于sock的异常事件就绪,我们使用func_t类型定义_except_cb,那么这些回调函数的方法都要可以被外界直接调用,所以此时我们将这些回调的函数方法都设置为公有的成员变量即可,那么问题来了谁知道这些回调函数该被设置成什么?
  • 那么此时当sock对应的Connection对象创建后,由创建者进行传入设置这些回调函数,因为你要关心sock上的事件,所以你肯定知道该如何对这些事件进行操作,所以需要传入对应的回调函数,那么此时我们给外部提供一个成员函数SetHandler,让外部将这些回调函数进行传入,然后我们在成员函数SetHandler内进行赋值到Connection内部对应的回调函数即可
  • 那么此时外界想要获取这个Connection内部究竟是哪一个文件描述符sock还无法获取,所以此时我们给外界提供一个函数SockFd,将Connection对象内部的文件描述符_sock进行返回即可,在一些场景中,仅仅获得这些成员或者回调方法还不够,获得Connection对象在一些场景中不仅仅想要调用Connection对象内部的回调方法,同样的还想要调用TcpServer的成员方法,所以此时我们给Connection设置一个公有的成员变量_tcp_server_ptr
  • 那么对于这个公有的成员变量_tcp_server_ptr的类型我们期望可以自动管理这个_tcp_server_ptr的生命周期,所以此时我们需要使用智能指针,关于智能指针的讲解,详情请点击<——,那么智能指针有4个,分别是auto_ptr, unique_ptr, shared_ptr, weak_ptr,那么读者友友请思考一下,这里使用什么类型的智能指针最好?
  • 首先我们排除auto_ptr,因为auto_ptr由指针悬空的问题,所以在实际使用中我们并不期望使用auto_ptr,其次我们排除unique_ptr,因为unique_ptr顾名思义,只允许一个智能指针管理TcpServer,而首先在main函数中我们就要使用智能指针管理TcpServer,并且在其它的Connection对象中也需要有管理TcpServer的指针_tcp_server_ptr,所以unique_ptr不符合我们的需求,那么有的读者友友又想了,我们使用shared_ptr呢?
  • 如果使用shared_ptr这里更严重,因为会造成shared_ptr循环引用的问题,最终会导致内存泄露,关于shared_ptr的循环引用问题在第六点shared_ptr的循环引用中进行讲解,详情请点击<——,下面基于前方文章讲解的循环引用,这里小编讲解一下为什么不能使用shared_ptr?
  • 首先要知道TcpServer是一个服务器,服务器要服务多个客户端,一个客户端对应一个文件描述符sock,所以也就意味着要有多个sock对应的Connection类对象,那么我们管理这些类对象期望可以让这些Connection类对象自动管理生命周期,所以我们同样要使用智能指针,那么我们选择的是shared_ptr
  • 因为未来在Connection类对象进行调用回调函数的时候传入的参数就是Connection类对象的shared_ptr智能指针,所以会存在多个智能指针管理同一个对象的情况,所以我们选择shared_ptr,而由于是函数的参数是智能指针,所以这个智能指针shared_ptr<Connection>是一个临时变量,当函数调用结束这个临时对象的生命周期也就结束了
  • 所以对回调函数进行传参引用计数加1,那么在函数调用结束之后,其中的引用计数就会减1,变回原来的值,并且在TcpServer中服务器是要管理Connection类对象的生命周期的,所以此时我们使用shared_ptr管理Connection类对象并没有问题,但是问题就出现在了Connection类中
  • 但是如果Connection类中的回指指针指向TcpServer的智能指针的类型是shared_ptr,并且在main函数中,我们当初创建服务器的时候不得不使用shared_ptr智能指针管理服务器TcpServer(小编后面讲解为什么不得不是shared_ptr),所以此时就造成了循环引用的场景 在这里插入图片描述
  • 类似于上面的场景,那么小编将模型提炼一下,存在两个对象都被智能指针shared_ptr管理,即对象1自身被智能指针shared_ptr管理,对象2自身同样被智能指针shared_ptr管理,并且在这两个对象内部互相使用智能指针shared_ptr管理对方这个对象,即对象1内部也使用智能指针shared_ptr管理对象2,同理,对象2内部使用智能指针shared_ptr管理对象1
  • 所以上面这个模型此时就会造成循环引用的问题,进而引用计数内部的计数只能减到1无法减到0,进而造成对象及其资源无法正常释放,进而造成内存泄露的问题,那么小编,你的意思是这里如果Connection类内部如果也使用shared_ptr管理TcpServer对象也会造成内存泄露的问题,如何体现呢?
  • 首先,main函数中我们由于一些原因不得不使用shared_ptr管理服务器TcpServer类对象,所以此时对象1就是main函数中使用shared_ptr管理TcpServer类对象,那么当服务器运行后,有客户端来连接服务器,所以此时就会创建出Connection对象,TcpServer类对象内部需要使用shared_ptr去管理Connection类对象,所以此时被shared_ptr管理的Connection类对象是对应上面的对象2,所以无论是对象1还是对象2都被shared_ptr进行管理
  • 那么接下来,我们继续看,TcpServer类对象内部有智能指针shared_ptr管理Connection类对象,并且Connection类对象内部也有智能指针shared_ptr管理TcpServer类对象,所以本身被智能指针shared_ptr管理对象1内部使用智能指针shared_ptr管理对象2,本身被智能指针shared_ptr管理对象2内部使用智能指针shared_ptr管理对象1
  • 所以此时类比上面小编抽象出来的模式,此时就造成了智能指针shared_ptr的循环引用问题,进而导致对象计数无法减为0,进而导致对象无法释放导致的内存泄露问题,那么我们该如何解决这个智能指针shared_ptr的循环引用问题呢?
  • 很简单使用智能指针weak_ptr打破它就好了,智能指针weak_ptr被设计出来是为了解决智能指针shared_ptr的循环引用问题,weak_ptr的特性是管理对象不增加对象的计数,所以如果Connection类内部的TcpServer使用weak_ptr这个智能指针进行管理,那么就不会增加shared_ptr的引用计数,所以最终引用计数就能减为0,进而对象就能被正常释放,也就不会造成内存泄露了
  • 并且weak_ptr不管理对象的生命周期,在Connection类对象我们只是期望使用TcpServer的成员函数,所以并不期望管理TcpServer的生命周期,TcpServer的生命周期由main函数中shared_ptr进行管理,所以这里使用weak_ptr在语法上也说得通,所以Connection类对象内部我们要使用weak_ptr管理TcpServer即可
  • 关于weak_ptr的讲解第七点进行的讲解,详情请点击<——,所以此时在Connection类的构造函数这里我们接收一下文件描述符sock,以及使用weak_ptr<TcpServer>接收TcpServer即可,但是这里不仅仅是要简单的接收TcpServer,而是要想要创建Connection类对象的需要进行传参shared_ptr<TcpServer>,因为最终管理TcpServer还是要交给shared_ptr来做的,所以接收的类型是weak_ptr<TcpServer>,创建者传参是shared_ptr<TcpServer>类型
  • 而TcpServer类的定义是位于Connection类的后面,所以这里如果想要使用weak_ptr<TcpServer>这个类型需要提前使用class TcpServer;声明类TcpServer,并且using定义的类型func_t也需要使用Connection类,但是Connection类的位置是在using的后面,所以这里我们需要使用class Connection;声明一下Connection类
  • 还有两点预备工作需要准备,第一点,要知道我们未来要以ET模式运行epoll,所以我们关心的读事件除了设置EPOLLIN之外需要添加EPOLLET,同理关心的写事件除了设置EPOLLOUT之外还需要添加EPOLLET,所以这里我们提前定义一下uint32_t类型的EVENT_IN为(EPOLLIN | EPOLLET);,定义uint32_t类型的EVENT_OUT为(EPOLLOUT | EPOLLET);
  • 第二点将来读取普通sock的tcp的接收缓冲区的时候,我们要循环读取,读取到出错,才能将底层tcp的接收缓冲区的数据全部读取上来,那么我们将来是还需要定义一个临时话冲去buffer的,用于传参给recv或者read,那么这个临时缓冲区的大小也就决定了,诸如使用recv或者read进行一次读取读取多少字节数,所以这里我们设置一下临时缓冲区的大小为g_buffer_size为128,所以一次读取读取128字节,循环读取,直到读取出错,那么有的读者友友可能会问,为什么这里不直接使用Connection类对象内部的_inbuffer?
  • 因为如果将_inbuffer当做参数,那么我们是要进行循环读取的,所以也就注定了会覆盖数据,所以这里我们使用一个临时缓冲区buffer即可,然后将循环读取,进行读取前每次对临时缓冲区的数据进行重置为’\\0’,然后每次进行读取上来的数据放到临时缓冲区buffer中,然后再调用Connection类对象中的AppendInBuffer将临时缓冲区的数据尾插到_inbuffer中即可,所以这样进行循环读取,读取到出错上来的数据才可以进行拼凑为一个又一个的完整的请求报文
  • 那么Connection类对象的中管理的sock有可能是listensock用于获取和客户端的连接的,也有可能是普通sock,用于和客户端进行数据通信的,所以此时有可能我们需要保存一下这个连接对应客户端的一些信息便于打印日志,客户端的信息比如IP地址和端口号port,所以此时我们在Connection类添加公有的成员变量IP地址_ip以及端口号_port即可
  • 那么Connection对象是负责管理文件描述符sock的,所以自然当Connection对象析构的时候,代表着连接已经不需要我们维护了,所以此时在析构函数中,如果文件描述符sock大于0,那么我们调用close将文件描述符sock关闭,释放文件打开对象,释放连接对象,防止内存泄漏
  • #include <iostream>
    #include <memory>
    #include <functional>
    #include <string>
    #include <unordered_map>
    #include <sys/epoll.h>
    #include "Log.hpp"
    #include "Socket.hpp"
    #include "nocopy.hpp"
    #include "Epoller.hpp"

    class Connection;
    class TcpServer;

    uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
    uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
    static const int g_buffer_size = 128;

    using func_t = std::function<void(std::shared_ptr<Connection>)>;

    class Connection
    {
    public:
    Connection(int sock, std::weak_ptr<TcpServer> tcp_server_ptr)
    : _sock(sock), _tcp_server_ptr(tcp_server_ptr)
    {}

    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
    _recv_cb = recv_cb;
    _send_cb = send_cb;
    _except_cb = except_cb;
    }

    void AppendInBuffer(const std::string& info)
    {
    _inbuffer += info;
    }

    void AppendOutBuffer(const std::string& info)
    {
    _outbuffer += info;
    }

    std::string& InBuffer()
    {
    return _inbuffer;
    }

    std::string& OutBuffer()
    {
    return _outbuffer;
    }

    int SockFd()
    {
    return _sock;
    }

    ~Connection()
    {
    if(_sock > 0)
    {
    close(_sock);
    }
    }

    private:
    int _sock;
    std::string _inbuffer;
    std::string _outbuffer;

    public:
    func_t _recv_cb;
    func_t _send_cb;
    func_t _except_cb;

    std::weak_ptr<TcpServer> _tcp_server_ptr;

    std::string _ip;
    uint16_t _port;
    };

    Main.cc

  • 那么我们在Mian.cc中的main函数中使用shared_ptr管理TcpServer,进行new类对象TcpServer的时候要进行传参,告知服务器要绑定的端口号是8080
  • 接下来就是调用Init初始化服务器
  • 然后调用Loop启动服务器,细心的读者友友可能会发现,小编这里启动服务器使用的函数名在命名上发生了改变,从Start变成了Loop,这是什么原因呢?
  • 因为这里命名上使用Loop更为标准,从英文上翻译Loop的汉语在计算机世界中对应的是是一套重复的指令,什么意思呢?即一旦服务器运行,那么服务器就在不断的执行重复的逻辑,什么逻辑呢?
  • 那么服务器一旦运行那么就是一个基于死循环的去执行Dispatcher派发器,即检测事件就绪,派发事件,事件处理完成,继续检测事件,以此重复这个逻辑,所以此时我们运行服务器在命名上这个函数名使用Loop更好
  • #include <iostream>
    #include <memory>
    #include "TcpServer.hpp"

    int main()
    {
    std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080));

    epoll_svr->Init();
    epoll_svr->Loop();

    return 0;
    }

    TcpServer

  • Epoller.hpp在第三点Epoller中进行讲解,详情请点击<——,那么在正式讲解TcpServer之前,小编这里还需要我们对于Epoller.hpp中的Epoller这个类进行一定的调整,其实关于epoller类的私有成员变量中的_timeout是用于epoll_wait的参数,那么为了便于外部直接传参更灵活的控制epoll_wait所以我们将第三个参数设置为timeout,然后使用参数timeout传参epoll_wait,这样我们将Epoller类的私有成员变量_timeout注释掉即可
  • class Epoller : public nocopy
    {
    public:
    int EpollerWait(struct epoll_event revents[], int num, int timeout)
    {
    // int n = epoll_wait(_epfd, revents, num, _timeout);
    // int n = epoll_wait(_epfd, revents, num, 0);
    // int n = epoll_wait(_epfd, revents, num, -1);
    int n = epoll_wait(_epfd, revents, num, timeout);

    return n;
    }

    private:
    int _epfd;
    // int _timeout{3000};
    };

  • 接下来我们就可以正式开始编写TcpServer类了,那么我们知道将来这个TcpServer服务器是要连接多个客户端的,所以也就意味着这个TcpServer服务器需要为来自客户端的连接对应的文件描述符sock创建对应的Connection对象,而要创建Connection对象,那么必不可少的就要传参,那么在上面我们已经讲解过了,需要传参sock以及shared_ptr<TcpServer>,那么此时是位于TcpServer内部,我们该如何传参shared_ptr<TcpServer>?
  • 那么此时有的读者友友心中可能会想,既然是位于TcpServer内部,类内不是有this指针吗,恰好shared_ptr智能指针恰好不就是通过指针的形式指向要管理的资源进行管理,所以是否我们可以将this指针通过shared_ptr的构造函数,即通过shared_ptr<TcpServer>(this)构造出shared_ptr<TcpServer>然后进行传参呢?
  • 不行,一定不行,为什么呢?首先在Main.cc的main函数中使用的是shared_ptr管理的TcpServer其中的shared_ptr的引用计数是为1,那么这里采用即通过shared_ptr<TcpServer>(this)构造的匿名对象中的引用计数同样是是为1的,因为这里本身是采用了匿名对象构造了一个新的智能指针shared_ptr,所以和Main.cc的main函数中的智能指针shared_ptr的引用计数不是同一个
  • 即引用计数不共享,发生了使用了多个引用计数不共享的shared_ptr管理一个TcpServer,那么这里通过shared_ptr<TcpServer>(this)构造的匿名对象是一个临时对象,一旦传参结束,那么这个临时对象的生命周期就到了,别忘了Connection中的是采用的weak_ptr所以不会增加引用计数,所以此时引用计数减1,从1减为0,那么就要调用析构函数,然后再依次调用TcpServer的成员变量的析构函数进行析构释放资源,所以此时TcpServer已经被析构了
  • 别忘了将来这个TcpServer服务器是要连接多个客户端的,所以也就意味着这个TcpServer服务器需要为来自客户端的连接对应的文件描述符sock创建对应的Connection对象,那么前面TcpServer服务器已经析构过一次了,如果此时再有客户端的连接到来,那么服务器TcpServer就要再次创建Connection,那么还要进行传参
  • 那么同样的道理这里通过shared_ptr<TcpServer>(this)构造的匿名对象是一个临时对象,传参结束,临时对象的生命周期到了,那么就要调用析构函数,然后再依次调用TcpServer的成员变量的析构函数进行析构释放资源,就会造成TcpServer再次被析构,所以此时就会出现double free的问题,别忘了我们是不允许同一块空间被连续析构两次的,所以此时进程就会直接异常终止
  • 别忘了此时这个进程是一个服务器,服务器为2个客户端提供服务就直接异常终止了,所以此时服务器是有问题的,并且这个其中的最大问题就是TcpServer服务器为和客户端的连接对应的文件描述符sock创建几个Connection对象,那么TcpServer服务器就要被析构几次
  • 并且如果TcpServer服务器退出,在Main.cc中的main函数中也是使用的shared_ptr管理的TcpServer服务器,所以当main函数要结束的时候,shared_ptr的生命周期结束,还会再次析构释放TcpServer服务器,这个TcpServer服务器被反复的析构释放进行鞭尸,这可就太严重了,所以问题的核心出在哪里呢?
  • 问题的核心出在几乎所有的智能指针shared_ptr的引用计数是不共享的,即我们要创建Connection对象,那么就要给第二个参数传参shared_ptr<TcpServer>,要获取shared_ptr<TcpServer>我们上面的方式是基于this指针构建匿名对象shared_ptr然后进行传参,这种方式对于引用计数不是共享的
  • 我们实际想要获取的shared_ptr<TcpServer>是main函数中的shared_ptr,一旦从main函数获取了shared_ptr<TcpServer>,那么也就意味着引用计数是共享的,所以此时,所以c++中是否提供了接口供我们在TcpServer类内获取main函数中的用于管理TcpServer的shared_ptr呢? 在这里插入图片描述
  • 有的,那么我们来看enable_shared_from_this这个类,使用这个类要包含#include <memory>,这个类是要作为基类进行继承使用的,所以如下,我们对于TcpServer这个类继承一下enable_shared_from_this这个类,由于是类模版,那么我们这里实例化为enable_shared_from_this<TcpServer>即可,并且我们也看上面的第三段simply returning shared_ptr(this) would be problematic,仅仅使用shared_ptr的构造函数转换this指针获得当前TcpServer的shared_ptr的方式是有问题的,原因就在于引用计数不共享,所以会造成重复析构的问题
  • 并且enable_shared_from_this这个类要求在Main.cc的main函数中管理TcpServer的智能指针必须是shared_ptr,即需要在main函数中有一个现有的智能指针shared_ptr对象去管理TcpServer,只有这样才可以共享计数 在这里插入图片描述
  • 那么当继承了enable_shared_from_this这个类之后,如果想要在类内获取当前对象的shared_ptr,那么只需要调用shared_from_this即可,这个函数会返回一个shared_ptr对象,并且与main函数现有的shared_ptr共享所有权,所以在main函数中,当初小编使用智能指针管理TcpServer的时候才会使用shared_ptr
  • 那么同样的对于TcpServer服务器,我们并不期望这个服务器被拷贝,所以我们让TcpServer继承nocopy即可,nocopy.hpp在第二点nocopy中进行讲解,详情请点击<——,接下来我们来看TcpServer类的私有成员变量
  • 作为一个服务器,要绑定端口号,所以私有成员变量中需要有一个端口号_port,那么私有成员变量中还要有一个标志位_quit,如果_quit为false那么服务器启动,如果_quit为true,那么服务器不启动
  • 并且我们要实现的服务器是以ET模式运行的epoll版本的TCP服务器,所以要进行socket编程,那么对于socket相关的接口,我们已经封装在了Socket.hpp中的类Sock中了,所以这里我们要实例化一个Sock的类对象,那么我们期望这个类对象可以自动管理生命周期,所以这里我们在私有成员变量中使用shared_ptr对应的_listensock_ptr管理Sock的类对象,关于Socket.hpp套接字的讲解,在第三点的TCP服务器的Socket.hpp进行的讲解,详情请点击<——
  • 同样的既然是以ET模式运行的epoll版本的TCP服务器,所以这里就要使用epoll的相关接口,小编在前面已经将epoll的相关接口封装在了Epoller.hpp中的Epoller类中了,所以这里我们就要实例化一个Epoller的类对象,那么同样的我们期望这个类对象可以自动管理生命周期,所以这里我们同样在私有成员变量中使用shared_ptr对应的_epoller_ptr管理Epoller的类对象,Epoller.hpp在第三点Epoller中进行讲解,详情请点击<——
  • 那么私有成员变量我们还要定义一个static和const修饰的int类型的变量num,初始值我们给64,用于给Epoller中的EpollerWait的第二个参数传参,即num表示将来我们获取就绪的fd的最大个数,那么要调用EpollerWait就需要传入一个struct epoll_event类型的数组_revs,所以此时我们此时将这个数组放到类的私有成员变量中,便于直接拿来使用,那么_revs中可以放置的对象个数我们设置为num即可
  • 那么我们知道将来这个TcpServer服务器是要连接多个客户端的,所以也就意味着这个TcpServer服务器需要为来自客户端的连接对应的文件描述符sock创建对应的Connection对象,客户端有多个,所以也就意味着Connection对象会存在多个,那么TcpServer服务器要不要将多个Connection对象管理起来?要,如何管理?先描述,再组织
  • 所以已经使用了Connection将连接描述出来了,那么如何组织?那么就是采用特定的数据结构将多个Connection对象组织起来,那么这里我们就可以思考了,一个Connection对象对应一个连接,一个和客户端的连接对应一个文件描述符sock,文件描述符sock的本质是文件描述符表的数组下标,数组下标是唯一的,那么将来我们要fd上处理就绪的事件,是通过epoll_wait直接从内核的epoll模型的就绪队列中拿的 在这里插入图片描述
  • 所以也就意味着从epoll_wait返回的数组_revs中的fd及其事件是连续的,我们可以连续的处理fd对应的事件,这效率就很高了,并且_revs中的对象的类型是struct epoll_event,我们可以从其中的第一个成员变量events获悉是什么事件就绪了,从第二个成员变量data的类型中struct epoll_data_t的fd字段获悉是哪一个文件描述符,所以我们通过了struct epoll_event的结构就可以获悉要对哪个fd上的哪个就绪事件进行处理
  • 所以此时如果我们想要对fd上的就绪的事件进行处理,所以首先就要找到fd对应的Connection对象,并且文件描述符fd又是数组下标是唯一的,所以如果想要快速通过fd定义Connection对象,此时我们想到了一个数据结构哈希表,关于哈希表的讲解,详情请点击<——
  • 所以此时我们就可以使用文件描述符fd当做键值key,使用Connection对象当做value,当然我们想让Connection对象自动管理生命周期,所以这里我们使用shared_ptr管理Connection对象,所以哈希表的value应该是shared_ptr<Connection>,所以此时我们就可以给TcpServer创建类型为unordered_map<int, std::shared_ptr<Connection>>的私有成员变量_connections去帮助我们管理多个Connection对象,以及快速根据文件描述符fd定义Connection对象,至此我们就介绍完成了TcpServer的私有成员变量
  • 那么接下来,我们在构造函数中对私有成员变量进行初始化,我们期望服务器所绑定的端口号是外部传入的,所以这里在构造函数中我们接收端口号port,然后对于标志位_quit我们初始化为true表示默认服务器是不运行的,然后使用接收到的端口号port初始化成员变量_port,然后给_listensock_ptr这个成员变量new一个Sock对象,给_epoller_ptr这个成员变量new一个Epoller对象,于是我们就完成了构造函数
  • 接下来我们来看运行服务器Loop的编写,那么关于Loop就是执行一段重复的逻辑,那么服务器启动,我们将标志位_quit设置为false,表示停止的为假即启动服务器,那么在接下来的while循环的判断条件采用!_quit执行死循环即可,当服务器停止的时候,我们在末尾将_quit设置为true即可,其实设置这个_quit标志位,我们可以当服务器想要停止的时候,那么我们只需要采用某种手段修改_quit为true经过while循环判断即可停止服务器
  • 一旦进入while死循环那么就表示此时服务器启动运行了,服务器一旦运行那么就是一个基于死循环的去执行Dispatcher派发器,给Dispatcher传入超时时间,为了便于演示现象这里我们传入-1表示阻塞式等待文件描述符sock上的事件就绪,执行Dispatcher派发器即执行检测事件就绪,派发事件,事件处理完成,继续检测事件,以此重复这个逻辑
  • 那么每次执行完事件派发器Dispatcher后,在Loop的while循环中,接下来我们都使用PrintConnection打印一下_connections中的文件描述符sock,以及用户级的输入缓冲区_inbuffer的数据,只需要使用迭代器遍历一下_connections,然后键值key就是文件描述符sock我们打印一下即可,然后value是Connection对象,那么其中就有Inbuffer可以获取用户级的输入缓冲区_inbuffer,此时我们打印一下_inbuffer即可
  • class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
    {
    static const int num = 64;
    public:
    TcpServer(uint16_t port)
    : _quit(true)
    , _port(port)
    , _listensock_ptr(new Sock())
    , _epoller_ptr(new Epoller())
    {}

    void Init()
    {}

    void Dispatcher(int timeout)
    {

    }

    void Loop()
    {
    _quit = false;

    while(!_quit)
    {
    Dispatcher(1);
    PrintConnection();
    }

    _quit = true;
    }

    void PrintConnection()
    {
    std::cout << "_connections fd list: " << std::endl;
    for(auto& connection : _connections)
    {
    std::cout << connection.first << ", ";
    std::cout << "inbuffer: " << connection.second->InBuffer() << std::endl;
    }
    std::cout << std::endl;
    }

    ~TcpServer()
    {}

    private:
    bool _quit;
    uint16_t _port;
    std::shared_ptr<Sock> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
    struct epoll_event _revs[num];
    };

  • 接下来我们就要开始编写事件派发器Dispatcher,那么就是检测事件就绪,派发事件,所以首先就要调用epoll_wait去将已经就绪的fd及其事件获取上来,那么对于epoll_wait我们已经提前封装在了Epoller类中了,Epoller.hpp在第三点Epoller中进行讲解,详情请点击<——
  • 所以此时我们就可以调用_epoller_ptr中的EpollerWait方法,传入struct epoll_event数组那么对应就是我们之前早就封装的私有成员变量_revs,传入num我们要一次获取的最大就绪fd的个数,以及传入超时时间tiemout,注意这个的超时时间我们当初为了便于演示设置成了-1,即阻塞等待fd上的事件就绪
  • 那么当EpollerWait的返回值n大于0的时候,表示有n个fd上的事件就绪了,此时要进行事件处理逻辑,那么当n等于0的时候表示超时,但是我们这里并不会出现超时,而是一直阻塞直到fd上的事件就绪,所以对于n等于0的情况我们就不再判断
  • 如果n小于0表示等待出错,一般等待出错的情况很少,所以这里小编也就不再判断n小于0的情况了,那么我们只需要关心的情况就是n大于0的情况,由于n等于0以及n大于0我们都不考虑,只剩下n大于0表示有n个fd上的事件就绪了,所以这里小编就不再判断n了,而是直接处理n个就绪的fd上的事件即可 在这里插入图片描述
  • 别忘了EpollerWait的返回值表示有n个fd上的事件就绪了,所以我们可以连续的处理就绪的fd上的事件,所以此时我们就for循环,循环n次,那么就绪的fd及其事件都在struct epoll_event数组_revs中放着,所以此时我们就可以取出文件描述符sock,取出对应就绪的事件events (一)EPOLLIN,表示对应的文件描述符可以读,或者对端的调用close正常关闭连接 (二)EPOLLOUT,表示对应的文件描述符可以写 (三)EPOLLPRI,表示对应的文件描述符上有紧急的数据可读 (四)EPOLLERR,表示对应的文件描述符发生错误,此时我们需要关闭这个fd (五)EPOLLHUP,表示对应的文件描述符被挂断,此时我们需要关闭这个fd (六)EPOLLET,将epoll的工作模式由默认的LT(Level Triggered)水平触发模式,设置为ET(Edge Triggered)边缘触发模式
  • 所以此时我们就可以对事件events进行判断了,如果events中有EPOLLERR或者EPOLLHUP表示此时事件已经出现异常了,那么这里我们将events按位与上(EPOLLIN | EPOLLOUT),所以此时异常问题就会转换成读写问题,读写问题中有进行异常处理的逻辑
  • 所以接下来我们就对读事件进行判断,如果读事件就绪,那么我们就调用文件描述符sock对应的Connection对象中的_recv_cb方法,但是我们不确定是否这个文件描述符sock在_connections中存在,所以我们就执行IsConnectionSafe去检测fd是否存在,那么调用哈希表的find方法去进行查找,如果不存在那么就返回false,如果存在那么就返回true
  • 所以在正式调用回调_recv_cb方法之前,我们需要调用IsConnectionSafe判断fd是否存在,如果存在我们才调用回调去处理读事件,但是此时还有问题,是否回调_recv_cb被Connection对象的建者设置了呢?我们也不知道,所以此时我们还要判断回调_recv_cb是否存在,如果存在我们才真正的调用回调_recv_cb去处理读事件,那么同样的道理写事件也是如此的逻辑去调用回调_send_cb
  • bool IsConnectionSafe(int fd)
    {
    // std::unordered_map<int, std::shared_ptr<Connection>>::iterator iter = _connections.find(fd);
    auto iter = _connections.find(fd);
    if(iter == _connections.end())
    return false;

    return true;
    }

    void Dispatcher(int timeout)
    {
    int n = _epoller_ptr->EpollerWait(_revs, num, timeout);
    for(int i = 0; i < n; i++)
    {
    int sock = _revs[i].data.fd;
    uint32_t events = _revs[i].events;
    // 统一将事件异常转化为读写问题
    if((events & EPOLLERR) | (events & EPOLLHUP))
    events |= (EPOLLIN | EPOLLOUT);
    // 这样可以简化逻辑,只需要处理读写问题
    if((events & EPOLLIN) && IsConnectionSafe(sock))
    {
    if(_connections[sock]->_recv_cb)
    _connections[sock]->_recv_cb(_connections[sock]);
    }
    if((events & EPOLLOUT) && IsConnectionSafe(sock))
    {
    if(_connections[sock]->_send_cb)
    _connections[sock]->_send_cb(_connections[sock]);
    }
    }
    }

  • 那么我们光进行Loop,进行Dispatcher事件派发了,我们给服务器初始化创建套接字绑定端口号设置监听状态了吗?没有,可是此时我们给epoll设置关心的fd及其事件了吗?没有,所以接下来我们来编写Init初始化服务器,但是在正式编写Init初始化服务器之前,我们还需要一些前置性的铺垫
  • 那么我们要实现的可是以ET模式运行的epoll版本的TCP服务器实现reactor反应堆,而要以ET模式运行,就要确保一次读取将底层的数据或者连接全部读取上来,所以也就意味着我们要进行循环读取,直到出错,这样才可以全部读取上来,而要进行循环读取,直到出错的前提就是文件描述符sock要被设置为非阻塞,所以epoll想要以ET模式运行的前提就是文件描述符sock要被设置为非阻塞,所以我们需要实现一下非阻塞,那么关于非阻塞的实现其实小编已经讲解过了,第四点非阻塞的实现,详情请点击<—— 在这里插入图片描述
  • 那么为了规范,小编新建一个.hpp文件命名为Comm.hpp然后在这个文件中实现将文件描述符sock设置为非阻塞,所以如下,我们包好对应的头文件,fl是文件状态标记,然后先使用fcntl的F_GETFL将文件描述符之前底层已经存在被设置的标志位全部获取上来fl,然后如果获取失败即fl小于0,那么此时问题就很大了,如何理解呢?
  • 之前我们讲解epoll想要以ET模式运行的前提就是文件描述符sock要被设置为非阻塞,此时文件描述符sock设置为非阻塞失败,所以自然epoll也就无法以ET模式运行,所以自然的也就无法以ET模式运行的epoll版本的TCP服务器实现reactor反应堆,所以此时我们的服务器也就不符合我们的设计初衷了,所以此时我们直接就exit终止服务器,设置错误码为NON_BLOCK_ERR表示非阻塞设置失败 在这里插入图片描述
  • 那么走到下面fl一定大于等于0,表示获取之前设置好的标志位fl成功,所以此时我们使用fcntl的第二个参数cmd对应的命令F_SETFL设置文件状态标记,那么应该是在文件描述符对应的文件状态标记fl的基础上进行新增非阻塞O_NONBLOCK这个状态,所以应该是 fl 按位或上O_NONBLOCK即可实现
  • #pragma once

    #include <cstdlib>
    #include <unistd.h>
    #include <fcntl.h>
    #include "Socket.hpp"

    void SetNonBlockOrDie(int sock)
    {
    int fl = fcntl(sock, F_GETFL);
    if(fl < 0)
    exit(NON_BLOCK_ERR);

    fcntl(sock, F_SETFL, fl | O_NONBLOCK);
    }

  • 我们仔细一看,上面小编包了一个#include "Socket.hpp"的头文件,这究竟是什么作用呢?对于这个错误码NON_BLOCK_ERR我们并没有定义,我们将其定义了在#include "Socket.hpp"中enum中添加错误码NON_BLOCK_ERR,关于Socket.hpp套接字的讲解,在第三点的TCP服务器的Socket.hpp进行的讲解,详情请点击<——
  • enum{
    SocketErr = 1,
    BindErr,
    ListenErr,
    NON_BLOCK_ERR
    };

  • Init是要创建listensock的作用,所以listensock也需要被创建Connection,添加到内核等一系列动作,所以此时我们封装一个AddConnection函数帮我们实现这些作用,并且将来不仅仅是listensock需要使用AddConnection,将来新的连接到来也需要进行这些工作,所以这里我们将这个工作封装成一个函数AddConnection便于调用,由于以ET模式运行的epoll其中的文件描述符sock需要被设置为非阻塞,而在AddConnection中我们并不进行设置,那么这个设置文件描述符sock为非阻塞的工作AddConnection默认在调用之前已经完成,即AddConnection认为自己得到的是一个已经被设置为非阻塞的文件描述符sock
  • 那么此时要讲解Init还需要一个前置性的函数AddConnection需要实现,参数比较多,下面我们逐一进行讲解,AddConnection有创建文件描述符sock对应的Connection对象,并且进行其中的设置字段,并且添加连接到TcpServer到_connections,以及将文件描述符sock以及要关心的事件设置进内核epoll模型的红黑树rb_tree,即让内核帮我们关心的sock及其事件作用,关于epoll模型中的rb_tree在第三点中进行的讲解,详情请点击<—— 在这里插入图片描述
  • 那么接下来,首先我们给文件描述符sock创建一个Connection对象,传参sock以及使用shared_from_thsi传参当前this指针在Main.cc中的main函数中的被shared_ptr管理的TcpServer,并且这个Connection对象需要被shared_ptr管理,因为这个Connection对象将来要放到_connections中,所以必须是shared_ptr<Connection>,因为_connections的类型是unordered_map<int, std::shared_ptr<Connection>>
  • 接下来我们就需要给文件描述符sock对应的Connection对象设置方法和字段了,那么调用SetHandler传入接收的三个回调方法,然后对Connection对象中的公有成员变量_ip以及_port进行对应的设置即可,所以此时文件描述符sock对应的Connection对象就初始化以及设置完成了
  • 接下来我们就要将文件描述符sock以及对应的Connection对象添加到TcpServer的unordered_map对象_connections中,那么这里使用make_pair构造出pair对象,然后调用unordered_map的insert方法插入pair对象即可,当然这里也可以不使用make_pair而是使用{sock, new_connection}去构造对象进行传参插入到unordered_map对象_connections中
  • 那么接下来我们还要将文件描述符sock以及对应的event事件添加到内核的epoll模型的红黑树rb_tree中,让内核帮我们关心文件描述符sock上的event事件,那么这里使用EpollerUpdata即可,最后打印日志添加一个新连接成功,然后打印新连接对应的文件描述符sock即可
  • void AddConnection(int sock, uint32_t events, func_t recv_cb, func_t send_cb, func_t except_cb, \\
    const std::string& ip = "0.0.0.0", uint16_t port = 0)
    {
    std::shared_ptr<Connection> new_connection(new Connection(sock, shared_from_this()));
    new_connection->SetHandler(recv_cb, send_cb, except_cb);
    new_connection->_ip = ip;
    new_connection->_port = port;

    _connections.insert(std::make_pair(sock, new_connection));

    _epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, events);
    lg(Debug, "add a new connection success, sockfd: %d", sock);
    }

  • 那么经过前面那么多的前置性讲解,我们终于可以来编写一下Init初始化服务器了,那么首先创建套接字,所以此时文件描述符listensock已经被创建出来了,我们知道文件描述符默认都是阻塞方式,而以ET模式运行的epoll其中的文件描述符sock需要被设置为非阻塞,所以我们也需要调用SetNonBlockOrDie将listensock设置为非阻塞
  • 那么接下来我们就给服务器绑定套接字_port,然后将listensock设置为监听状态,接下来打印日志,创建listensock套接字成功,将listensock的值打印出来,别忘了在调用AddConnection之前我们已经将listensock设置为非阻塞了,然后我们就可以调用AddConnection给文件描述符listensock创建Connection对象,然后将Connection对象添加进_connections中,让内核帮我们关心listensock上的EVENT_IN事件
  • 其中关于这个EVENT_IN我们在文章最开始已经定义过了是uint32_t EVENT_IN = (EPOLLIN | EPOLLET),即以ET模式帮我们关心listensock上的读事件,那么在传入listensock的读回调,写回调,异常回调的时候,我们要使用对于listensock我们只关心listensock的读事件,所以这里我们只需要传入读回调,对于写回调和异常回调我们不需要传入,所以对于写回调和异常回调我们传入nullptr表示listensock的连接对象Connection不需要该回调方法
  • 那么当listensock上的读事件就绪的时候,表示此时客户端来连接服务器,有了Connection对象之后,我们可以对连接的任意进行操作,而_connection中的Connection对象是被shared_ptr所管理的,所以Accepter的参数是shared_ptr<Connection>,我们需要调用连接管理器Accepter将连接从底层tcp的全连接队列中获取上来,得到连接对应的文件描述符sock
  • 但是这个Accepter的TcpServer类内的成员函数,成员函数不能直接作为函数方法进行传参调用,这里需要使用包装器bind,第二点中关于bind,成员函数的讲解,详情请点击<——,所以这里我们使用bind包装器进行传参类内的成员函数Accepter即可,取地址声明是TcpServer的成员函数Accepter,然后传入调用的指针,这里使用this指针充当即可,最后绑定第一个参数即可
  • void Init()
    {
    _listensock_ptr->Socket();
    SetNonBlockOrDie(_listensock_ptr->Fd());
    _listensock_ptr->Bind(_port);
    _listensock_ptr->Listen();
    lg(Info, "create listen socket success, listensock: %d", _listensock_ptr->Fd());

    AddConnection(_listensock_ptr->Fd(), EVENT_IN, \\
    std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
    }

    void Accepter(std::shared_ptr<Connection> connection)
    {}

  • 所以接下来我们就开始编写Accepter,Accepter是连接管理器,所以当Accepter被调用的时候,也就意味着有连接到来了,需要我们使用accept将连接获取上来,那么有可能同一时间有很多的客户端连接服务器,所以也就同时有很多的连接到来
  • 所以也就意味着底层的tcp的全连接队列中有很多连接对象需要我们调用accept将其获取上来关联到文件打开对象中,并且向上返回连接对应的文件描述符sock,既然有很多连接,别忘了我们是在ET模式下运行的epoll,并且listensock之前也被我们设置为非阻塞了,所以我们要循环读取,直到出错才可以将底层的连接全部获取上来
  • 所以此时我们就可以调用accept将连接获取上来,小编调用的accept前面加了::是指定采用系统调用的语法,这里的::并不是错误,那么对于accept的使用其实我们之前已经封装在了Socket.hpp文件的Sock类的Accepter对象中,关于Socket.hpp套接字的讲解,在第三点的TCP服务器的Socket.hpp进行的讲解,详情请点击<——
  • 那么这里我们就直接拿来使用了,所以我们此时调用accept将连接获取上来,accept如果获取连接成功,那么accept的返回值就是连接对应的文件描述符sock,文件描述符sock的本质是文件描述符表的数组下标,数组下标是从0开始的,所以也就意味着如果accept的返回值如果是一个大于等于0的数,那么就表示此时accept获取连接成功
  • 既然获取连接成功,那么我们就可以从peer中得到客户端的端口号port以及ip地址,然后此时我们就可以将客户端的信息通过日志的形式打印出来了,接下来既然经过判断成功的获取到了一个文件描述符sock,那么别忘了我们要实现的是以ET模式运行的epoll版本的TCP服务器实现reactor反应堆,而epoll想要以ET模式运行的前提就是文件描述符sock要被设置为非阻塞,所以这里我们就要调用SetNonBlockOrDie将文件描述符sock设置为非阻塞了
  • 接下来自然的我们就可以调用AddConnection,创建并添加文件描述符sock对应的Connection对象至_connections中,并且让内核帮我们关心文件描述符sock及其事件,那么这里我们是要内核epoll以ET模式关心的是连接上的读事件,所以我们传入EVENT_IN,其中关于这个EVENT_IN我们在文章最开始已经定义过了是uint32_t EVENT_IN = (EPOLLIN | EPOLLET),即以ET模式帮我们关心文件描述sock上的读事件
  • 接下来我们传入文件描述符sock的读回调Recver,写回调Sender,异常回调Excepter,并且由于这三个回调都是TcpServer的成员函数,所以这里同样需要使用bind绑定之后才能作为参数进行传参以及后续的调用,那么最后传入客户端的ip地址以及端口号port即可,但是我们目前讨论是仍然是当accept成功获取到连接并且返回文件描述符sock的情况,如果accept获取失败了,那么就会返回-1表示获取出错,即进入else的逻辑
  • 所以在else中,首先我们可以肯定,本次accept中没有获取到连接对应的文件描述符sock,那么有很多的情况,别忘了以ET模式运行的epoll当文件描述符就绪的时候,需要一直读取,直到出错,并且出错的时候错误码被设置为EWOULDBLOCK,如果错误码是EWOULDBLOCK那么表示此时底层所有的连接已经被我们正常获取完成了,所以此时我们break退出while循环即可
  • 那么如果错误码是EINTR表示正在获取连接的时候被信号等打断,所以底层仍然有连接没有获取完成,所以此时我们continue继续调用accept获取连接即可,那么如果错误码是其它的情况,那么表示此时accept真的获取连接失败了,即accept获取连接真的出错了,所以此时我们同样break退出while循环
  • void Accepter(std::shared_ptr<Connection> connection)
    {
    while(true)
    {
    struct sockaddr_in peer;
    socklen_t len = sizeof(peer);
    int sock = ::accept(connection->SockFd(), (struct sockaddr*)&peer, &len);
    if(sock >= 0)
    {
    uint16_t port = ntohs(peer.sin_port);
    char ip[128];
    inet_ntop(AF_INET, &(peer.sin_addr), ip, sizeof(ip));
    lg(Debug, "get a new client, get info -> [%s:%d], sockfd: %d", ip, port, sock);

    SetNonBlockOrDie(sock);

    AddConnection(sock, EVENT_IN, \\
    std::bind(&TcpServer::Recver, this, std::placeholders::_1), \\
    std::bind(&TcpServer::Sender, this, std::placeholders::_1), \\
    std::bind(&TcpServer::Excepter, this, std::placeholders::_1), \\
    ip, port);
    }
    else
    {
    if(errno == EWOULDBLOCK)
    break;
    else if(errno == EINTR)
    continue;
    else
    break;
    }
    }
    }

    void Recver(std::shared_ptr<Connection> connection)
    {}
    void Sender(std::shared_ptr<Connection> connection)
    {}
    void Excepter(std::shared_ptr<Connection> connection)
    {}

    测试
  • 所以至此我们第一阶段完成,已经将以ET模式运行的epoll版本的TCP服务器实现reactor反应堆的基本框架搭建出来了,并且我们在代码逻辑中使用了连接管理器Accepter作为listensock,那么listensock的读事件就绪的时候,意味着有客户端来进行连接
  • 所以此时我们的代码究竟是否可以运行,当listensock就绪的时候,并且调用读事件的回调去执行Accepter将来自客户端的连接获取上来,在Accepter内部调用AddConnection,创建并添加文件描述符sock对应的Connection对象至_connections中,并且让内核帮我们关心文件描述符sock及其事件
  • 我们期望在PrintConnection中打印的用户层维护的_connections中确实添加了对应的文件描述符,所以此时我们在下图左侧运行服务器,并且在下图右侧使用telnet充当客户端来连接服务器,观察现象 在这里插入图片描述
  • 所以此时最开始我们运行服务器,服务器成功的创建了epoll模型,listen套接字,并且将listensock对应的4号文件描述符及其Connection对象添加到了_connections中,那么接下来小编让右侧的会话一使用telnet充当客户端连接服务器,此时成功的将客户端的连接获取上来,并且将5号文件描述符及其Connection对象添加到添加到了_connections中
  • 同理右侧会话二使用telnet充当客户端也是如此,将6号文件描述符及其Connection对象添加到添加到了_connections中,那么3号文件描述符是什么?是epoll模型对应的文件描述符epfd为3,因为epoll模型比listensock创建的要早,所以epoll模型对应的文件描述符是3号
  • 所以运行结果成功,我们第一阶段完成,已经将以ET模式运行的epoll版本的TCP服务器实现reactor反应堆的基本框架搭建出来了,确保了TcpServer服务器可以正常获取来自客户端的连接
  • 三、第二阶段,引入业务协议

  • 那么在第二阶段,小编会完成TcpServer类的读回调Recver,写回调Sender,异常回调Excepter,然后引入业务协议,那么这个业务,我们选择网络版本计算器的协议,关于网络版本计算器的讲解,详情请点击<——
  • TcpServer

  • 下面我们要开始编写Recver了,其中Recver也叫做事件管理器,当调用Recver的时候意味着文件描述符sock的读事件就绪了,那么在Recver中需要将处于文件描述符sock的tcp的接收缓冲区中将数据读取上来放到文件描述符sock对应的Connection对象中的输入缓冲区_inbuffer中,那么问题来了,那么我们在TcpServer中是否应该关心数据的格式,即读取上来放在_inbuffer中的数据,即数据的各个协议字段,数据是否可以构成一个完整的报文呢?
  • 不应该,因为TcpServer.hpp是用于处理IO的,IO仅仅是负责将数据接收过来放到_inbuffer中,或者将数据发送出去,即TcpServer仅仅是用于处理IO的,对于数据的格式,_inbuffer中的数据是否可以构成一个完整的报文这些不是TcpServer要关心的,那么谁来关心呢?
  • 所以此时我们就要引入用户层的业务了,业务是网络版本计算器,那么在TcpServer内部我们也期望有一个私有成员变量_OnMessage可以直接回调上层的业务处理方法,即在TcpServer内部调用_OnMessage的时候,意味着告诉上层,你要我接收到数据我已经接受完了,我的工作已经完成了,下面的工作就该让你上层去处理信息了,检测_inbuffer是否可以构成一个或多个完整的报文,如果可以,那么就直接处理,如果不能构成,那么就直接返回 在这里插入图片描述
  • 那么既然涉及到了协议的定义,所以这里我们就引入当初在网络版本计算器那里定制的协议,关于网络版本计算器的讲解,详情请点击<——,序列化,反序列化,解码,转码的协议在Protocol.hpp中,客户端的协议在ClientCal.cc中,以及服务器的协议在Servercal.hpp,那么我们统统拷贝到当前目录下即可,所以此时我们就可以引入业务协议了,以及给TcpServer引入一个业务处理的私有成员变量的回调函数_OnMessage了
  • 所以此时我们给TcpServer新增一个类型为func_t类型的私有成员变量_OnMessage,可是这里为什么_OnMessage的类型是func_t呢?因为这个类型为using func_t = std::function<void(std::shared_ptr<Connection>)>,即可以传递Connection对象,那么有了Connection对象,其中的无论是想要在_inbuffer中获取或者新增还是删除数据都可以做到,并且同样对_outbuffer也是如此,所以这里的回调_OnMessage的类型一定要是func_t,那么为什么要对_inbuffer或者_outbuffer操作呢?
  • 以_inbuffer为例,因为上层协议要的数据就绪了,TcpServer已经帮上层将数据接收到了文件描述符sock对应的Connection对象的_inbuffer中了,那么上层已经定制了协议,知道如何数据是否可以构成一个或者多个完整的报文,知道如何对报文进行处理,解码,反序列化,序列化,转码等操作,所以上层我们这里必须要对Connection对象中的_inbuffer或者_outbuffer进行操作
  • TcpServer此时已经新增了类型为func_t的私有成员变量_OnMessage,并且期望上层Main.cc的main函数在创建TcpServer对象的时候就传入一个回调函数可以供TcpServer在下层回调使用,所以此时我们就在TcpServer的构造函数新增一个func_t的参数OnMessage用于接收上层的回调,并且接收完成使用OnMessage赋值初始化TcpServer的私有成员变量_OnMessage
  • class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
    {
    static const int num = 64;
    public:
    TcpServer(uint16_t port, func_t OnMessage)
    : _quit(true)
    , _port(port)
    , _listensock_ptr(new Sock())
    , _epoller_ptr(new Epoller())
    , _OnMessage(OnMessage)
    {}
    private:
    bool _quit;
    uint16_t _port;
    std::shared_ptr<Sock> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
    struct epoll_event _revs[num];
    // 让上层处理信息
    func_t _OnMessage;
    };

    Main.cc

  • 那么接下来在Main.cc文件中我们包好服务器端的上层协议的头文件#include "ServerCal.hpp"用于处理业务,接下来对于处理业务的类ServerCal我们定义一个全局对象calculator,那么接下来我们就开始编写业务处理的函数DefaultOnMessage,那么参数为shared_ptr<Connection>类型的connection
  • 那么接下来我们打印一下Connection对象中的_inbuffer的内容,然后传参connection->InBuffer(),那么InBuffer的返回值是_inbuffer的引用,即传参inbuffer调用calculator中的方法Calculator去处理_inbuffer,由于是传入的_inbuffer的引用,所以在Calculator中是可以对这个_inbuffer的内容进行获取处理的
  • 如果在Calculator经过判断发现_inbuffer的数据不能构成一个完整报文此时无法进行处理,那么就返回""给response_str ,即返回空字符串所以此时我们对response_str进行判断,如果response_str为空,那么代表此时_inbuffer的数据不能构成一个完整的报文,所以此时无法进行处理报文,所以此时直接返回即可
  • 那么如果Calculator经过判断发现此时_inbuffer的数据可以构成一个或者多个完整的报文,那么此时就可以直接进行处理,即进行解码,反序列化,然后进行业务的处理,开始构建响应报文,进行序列化,转码,将转码后的响应报文返回给上层,所以此时response_str就可以拿到响应报文,那么此时我们使用日志进行打印一下即可
  • 接下来我们将响应报文放到Connection对象的_outbuffer中,这时候,我们之前在Connection类中封装的类型为weak_ptr<TcpServer>的对象_tcp_server_ptr就可以派上用场了,别忘了管理这个TcpServer的智能指针是weak_ptr,那么weak_ptr是不支持直接使用它所管理的对象中的方法的,我们应该先调用weak_ptr的lock方法,将weak_ptr暂时强提升为shared_ptr暂时增加计数,当生命周期结束的时候会减少计数
  • 所以这里调用lock将类型转化为shared_ptr<TcpServer>,即使用shared_ptr进行管理TcpServer,那么shared_ptr是可以支持使用所管理对象的方法的,所以此时我们就可以传入Connection对象调用TcpServer的Sender方法将Connection对象中_outbuffer的响应报文对应的数据发送出去,那么此时转化后的shared_ptr<TcpServer>对象的生命周期结束了,所以此时会减少计数,不会出现问题
  • 接下来我们只需要在main函数创建TcpServer对象的时候,除了传入端口号port,再额外传入业务处理的默认回调函数DefaultOnMessage,所以底层的TcpServer构造结束之后,就可以获得这个上层的业务处理的回调函数,那么就可以在Recver接收数据结束后调用业务处理回调函数DefaultOnMessage通知数据已经接收好了,那么你上层进行业务的处理吧
  • #include <iostream>
    #include <memory>
    #include "Log.hpp"
    #include "TcpServer.hpp"
    #include "ServerCal.hpp"

    ServerCal calculator;

    void DefaultOnMessage(std::shared_ptr<Connection> connection)
    {
    std::cout << "上层得到了数据: " << connection->InBuffer() << std::endl;
    std::string response_str = calculator.Calculator(connection->InBuffer());
    if(response_str.empty())
    return;
    lg(Debug, "%s", response_str.c_str());

    connection->AppendOutBuffer(response_str);

    auto tcp_server_ptr = connection->_tcp_server_ptr.lock();
    tcp_server_ptr->Sender(connection);
    }

    int main()
    {
    std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080, DefaultOnMessage));

    epoll_svr->Init();
    epoll_svr->Loop();

    return 0;
    }

    TcpServer

  • 那么经过上面将业务协议的引入,TcpServer中有了通知上层处理业务的回调函数_OnMessage,所以此时我们终于可以来编写Recver事件处理器了,Recver不关心数据的格式,只负责将数据读取上来,并且通知上层即可,所以首先我们来保存一下文件描述符sock,接下来我们定义一个缓冲区buffer,关于缓冲区的大小为g_buffer_size即128,我们已经在Connection部分进行讲解了static const int g_buffer_size = 128,接下来就是循环读取,直到出错
  • 所以此时我们每次读取前,先使用memset将缓冲区的数据全部清零,接下来使用recv读取文件描述符sock底层tcp的接收缓冲区的数据,观察,虽然这里小编给recv设置的标志位是0表示阻塞读取,但是别忘了早在Accepter中的文件描述符sock获取上来的时候,我们就已经使用SetNonBlockOrDie将文件描述符sock设置为非阻塞读取了,所以此时recv的读取方式是非阻塞读取数据
  • 那么recv的返回值n表示读取到的字节数,如果n大于0表示此时成功的读取到了数据,那么此时按照我们以前的习惯就是将缓冲区buffer的n位置的值置为0,但是这里不需要了,因为memset已经帮我们做了这个事情, 就算缓冲区buffer读满了,别忘了当初我们给定recv的buffer的大小是-1的,即预留了最后一个位置用于放置’\\0’,由于memset的作用,那么最后一个位置为0,0即对应’\\0’符合字符串以’\\0’结尾,所以此时我们就可以大胆的将buffer放到Connection对象的输入缓冲区_inbuffer中
  • 那么如果recv的返回值等于0,表示客户端已经将连接关闭了,那么我服务器也要关闭连接退,所以此时我们打印日志,然后调用Connection对象中的异常处理_except_cb即可,这里特别注意异常处理返回之后,此时连接已经被关闭了,我们不能对连接在进行其它的任何操作了,即意味着我们不能让执行流继续向后执行了,所以此时我们return返回即可
  • 那么如果recv的返回值小于0,那么表示此时返回值为-1,已经出错了,所以此时的情况恰好符合我们的预期,循环读取,直到出错,并且出错的时候错误码被设置为EWOULDBLOCK,所以此时我们需要对错误码errno进行判断,如果错误码是EWOULDBLOCK那么表示此时底层所有的数据已经被我们正常读取完成了,所以此时我们break退出while循环即可
  • 那么如果错误码是EINTR表示正在获取连接的时候被信号等打断,所以底层仍然有数据没有读取完成,所以此时我们continue继续调用recv读取数据即可,那么如果错误码是其它的情况,那么表示此时recv真的读取数据失败了,所以此时也就意味着出现了错误,那么我们就调用Connection对象中的异常处理_except_cb即可,这里特别注意异常处理返回之后,此时连接已经被关闭了,我们不能对连接在进行其它的任何操作了,即意味着我们不能让执行流继续向后执行了,所以此时我们return返回即可
  • 所以走到while循环外面,表示此时文件描述符sock底层的tcp的接收缓冲区的数据已经全部被我们读取上来了,读取到了Connection对象的_inbuffer中,即此时Connection对象的_inbuffer中已经有数据了,那么我们此时就可以调用处理业务的回调函数_OnMessage,通知上层数据已经有了,告诉上层快来处理业务吧
  • 所以此时上层就开始执行业务处理函数,首先进行检测,如果Connection对象的_inbuffer数据如果不能构成一个完整的报文,那么就直接返回,如果可以构成一个或者多个完整的报文,那么就进行处理,并且将处理的结果对应的响应报文通过TcpServer的Sender发向客户端
  • void Recver(std::shared_ptr<Connection> connection)
    {
    int sock = connection->SockFd();
    char buffer[g_buffer_size];
    while(true)
    {
    memset(buffer, 0, sizeof(buffer));
    ssize_t n = recv(sock, buffer, sizeof(buffer) 1, 0);
    if(n > 0)
    {
    // buffer[n] = 0; 由于memset已经置为了0,所以这里不需要这个操作了
    connection->AppendInBuffer(buffer);
    }
    else if(n == 0)
    {
    lg(Info, "sockfd: %d, client info -> %s:%d quit…", sock, \\
    connection->_ip.c_str(), connection->_port);

    connection->_except_cb(connection);
    return;
    }
    else
    {
    if(errno == EWOULDBLOCK)
    break;
    else if(errno == EINTR)
    continue;
    else
    {
    lg(Warning, "sockfd: %d, client info -> %s:%d recv error…", sock, \\
    connection->_ip.c_str(), connection->_port);

    connection->_except_cb(connection);
    return;
    }
    }
    }
    // 调用回调函数将数据交付上层处理
    _OnMessage(connection);
    }

  • 所以接下来我们就开始进入Sender的编写,但是我们还需要学习一点前置的知识之后才能进行Sender的编写,所以前置知识是什么呢?如下
  • 诸如epoll/poll/select,因为文件描述符sock上的写事件是经常就绪的,即判断写事件是否就绪那么就要看底层tcp的发送缓冲区是否有空间,事实是底层tcp的发送缓冲区是经常是有空间可供我们发送的,所以文件描述符sock上的写事件也是经常就绪的
  • 所以如果我们设置对文件描述符sock的event事件中的EPOLLOUT读事件关心,那么EPOLLOUT几乎每次都会就绪,所以也就会导致epoll_wait会经常会返回,而我们真正要让epoll关心的是读事件的就绪状态,你写事件由于常常就绪而经常返回,所以此时就会浪费CPU资源
  • 所以此时我们就可以得出结论了,对于读事件,我们需要设置常关心,那么对于写事件,我们需要按需设置,那么如何理解按需设置,我们又如何处理写事件呢?如下
  • 直接将进行写入tcp的发送缓冲区,如果写入完成那么就不需要我们操心了,至于数据如何发,什么时候发,出错了怎么办,有内核tcp自主决定,那么当我们写入完成,此时Connection对象的_outbuffer缓冲区中没有数据了,那么就结束,但是如果写入完成,但是还有数据没有写完,即Connection对象的_outbuffer中还有数据,所以此时我们就需要设置对写事件关心了,如果数据写完了,那么就去除对写事件的关心,那么如果要写的_outbuffer的数据很多,那么我们是否害怕呢?
  • 不怕,因为即使数据很多,那么当写事件就绪的时候,就会继续调用Sender尝试写入数据,此时底层有空间又被写满,那么别忘了当将数据写入底层的时候,此时Connetcion对象的_outbuffer中的数据又减少了一点,随着写事件再次就绪又会继续发送,Connetcion对象的_outbuffer中的数据又会减少了一点,所以积少成多,我们总可以将Connetcion对象的_outbuffer中的数据全部发送出去
  • 所以此时我们就可以来编写Sender了,但是还有一个函数需要我们编写,我们需要在Sender中开启对写事件的关心,关闭对写事件的关心,所以我们需要有一个函数EnableEvent对于文件描述符sock开启或者关闭对事件的关心,所以此时我们来编写一下EnableEvent
  • 那么对于EnableEvent其实有三个参数,那么第一个参数是哪个文件描述符sock,第二个参数是是否对读事件关心readable,如果要对读事件关心,那么设置为true,否则为false,第三个参数是是否对写事件关心writeable,如果要对写事件关心,那么设置为true,否则为false
  • 接下来我们定义一个事件events默认为0,即表示什么事件都不关心,如果想要关心某个事件那么只需要将要关心的事件和events进行按位与即可,所以此时我们就可以依次进行判断了,如果readable为true那么就表示要对读事件关心那么我们就按位与EPOLLIN,否则就是不关心,那么按位与0
  • 如果writeable为true那么就表示要对写事件关心那么我们就按位与EPOLLOUT,否则就是不关心,那么按位与0,我们要实现的是以ET模式运行的epoll版本的TCP服务器实现reactor反应堆,所以无论什么文件描述符sock都要设置ET模式,所以我们再按位与EPOLLET即可
  • 接下来我们有了要对什么文件描述符sock进行设置,有了要设置对什么事件关心的events,所以接下来我们就可以调用Epoller中封装的EpollerUpdate然后使用EPOLL_CTL_MOD修改设置对文件描述符sock中的事件events关心即可
  • void EnableEvent(int sock, bool readable, bool writeable)
    {
    uint32_t events = 0;
    events |= ((readable == true ? EPOLLIN : 0) | \\
    (writeable == true ? EPOLLOUT : 0) | EPOLLET);

    _epoller_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);
    }

  • 接下来我们终于可以开始编写Sender了,所以首先我们来保存一下文件描述符sock,然后使用outbuffer引用Connection对象中的_outbuffer,然后发送的时候也要一直发送(这里的发送本质还是调用send将数据拷贝到sock底层的tcp的发送缓冲区中,由内核决定什么时候发,发多少,出错了怎么办),即循环发送,直到出错
  • 所以此时我们就可以调用send期望将缓冲区的数据全部发送出去,虽然这里的参数为0阻塞式发送,但是要知道当初文件描述符sock之前在Accetper中一获取上来就被我们设置为非阻塞了,所以这里虽然参数是0阻塞式发送,但是实际上send发送的时候是非阻塞发送的
  • 那么返回值n表示发送的字节数,那么当send的返回值n大于0的时候,表示此时成功的将数据拷贝到sock底层的tcp的发送缓冲区中,那么此时拷贝了n个字节的数据,所以对于Sender来讲,此时outbuffer的n个字节Sender认为已经发送出去交给内核了,所以此时我们就可以将outbufer的前n个字节的数据erase删除掉,但是outbuffer中的数据是否被拷贝完呢?
  • 即我们实际想发送的数据的大小是1024字节,但是sock底层的tcp的发送缓冲区中只有512字节的空间了,所以这种情况下有可能outbuffer的数据没有发完,所以此时我们要对outbuffer的数据进行判断,如果outbufer为空,那么就直接break退出while循环,如果outbuffer不为空,那么就继续发送
  • 然后继续发送,但是这里继续发送之后,那么由于底层的tcp的发送缓冲区的空间已经被我们写满了,所以就会造成出错返回值为-1,同时错误码被设置为EWOULDBLOCK,那么我们也break退出循环即可
  • 那么如果send的返回值n等于0,首先返回值n为0不小于0表示没有出错,但是send的返回值n等于0,表示本次写入了0个数据,那么如果底层tcp的接收缓冲区没有空间了,那么就会出错,并且返回值为-1,错误码被设置为EWOULDBLOCK,但是此时的返回值不是-1,返回值是0,那么只有一种情况,那么就是outbuffer由于一些情况,里面并没有任何数据就调用了Sender,那么outbuffer没有任何数据,所以此时表示没有数据需要发送,所以此时我们return返回即可
  • 那么如果send的返回值n小于0,那么表示此时n为-1已经出错了,已经出错了,所以此时的情况恰好符合我们的预期,循环发送,直到出错,并且出错的时候错误码被设置为EWOULDBLOCK,所以此时我们需要对错误码errno进行判断,如果错误码是EWOULDBLOCK那么表示此时sock底层tcp的发送缓冲区已经没有空间了,此时即使outbuffer有数据但是我们已经无法进行写入了,所以此时我们break退出while循环即可
  • 那么如果错误码是EINTR表示正在send发送写入的时候被信号等打断,所以此时outbuffer仍然有数据要发送,并且底层的tcp的发送缓冲区中仍然有空间可以进行写入,所以此时我们continue继续调用send发送数据即可
  • 那么如果错误码是其它的情况,那么表示此时send真的发送数据失败了,所以此时也就意味着出现了错误,那么我们就调用Connection对象中的异常处理_except_cb即可,这里特别注意异常处理返回之后,此时连接已经被关闭了,我们不能对连接在进行其它的任何操作了,即意味着我们不能让执行流继续向后执行了,所以此时我们return返回即可
  • 那么走到下面一定是break跳出循环了,所以此时我们就可以进行判断了,如果outbuffer不为空,说明是n小于0,错误码被设置为EWOULDBLCK,文件描述符sock底层的tcp的发送缓冲区中没有空间了,所以此时我们应该使用EnableEvent设置对写事件的关心,当然读事件我们要常关心的
  • 那么如果outbuffer不为空不成立,即进入else的情况表示outbuffer为空,所以此时是n大于0,并且成功的写入完成了outbuffer的所有数据然后break跳出循环的,所以此时我们应该使用EnableEvent设置取消对写事件的关心,当然读事件我们要常关心的
  • void Sender(std::shared_ptr<Connection> connection)
    {
    int sock = connection->SockFd();
    std::string& outbuffer = connection->OutBuffer();
    while(true)
    {
    ssize_t n = send(sock, outbuffer.c_str(), outbuffer.size(), 0);
    if(n > 0)
    {
    outbuffer.erase(0, n);
    if(outbuffer.empty())
    break;
    }
    else if(n == 0)
    {
    return;
    }
    else
    {
    if(errno == EWOULDBLOCK)
    break;
    else if(errno == EINTR)
    continue;
    else
    {
    lg(Warning, "sockfd: %d, client info -> %s:%d send error…", sock, \\
    connection->_ip.c_str(), connection->_port);

    connection->_except_cb(connection);
    return;
    }
    }
    }

    if(!outbuffer.empty())
    {
    // 开始对写事件的关心
    EnableEvent(sock, true, true);
    }
    else
    {
    // 关闭对写事件的关心
    EnableEvent(sock, true, false);
    }
    }

  • 接下来我们编写Excepter,那么首先我们保存一下要处理的异常文件描述符fd,然后打印日志要处理哪一个文件描述符,对应的客户端的信息也一并打印,那么首先我们就是要在内核epoll模型的红黑树rb_tree中移除对特定fd的关心,所以这里我们就调用Epoller中封装的EpollerUpdate即可,所以此时进行移除操作EPOLL_CTL_DEL,然后要操作的文件描述符是fd,那么由于是移除,所以events事件不需要填写,默认为0即可
  • 接下来使用close关闭异常的文件描述符,所以此时也打印日志,关闭了对应的文件描述符,接下来从unordered_map中移除文件描述符sock及其Connection对象,所以此时调用unordered_map的erase方法,将文件描述符fd对应的Connection对象在unordered_map对应的对象_connections中移除即可,接下来我们打印日志移除了对应的文件描述符及其对象
  • void Excepter(std::shared_ptr<Connection> connection)
    {
    int fd = connection->SockFd();

    lg(Warning, "Excepter handler socket: %d, client info -> %s:%d, excepter handler", \\
    fd, connection->_ip.c_str(), connection->_port);
    // 1. 在内核epoll模型的红黑树rb_tree中移除对特定fd的关心
    _epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
    // 2. close关闭异常的文件描述符
    lg(Debug, "close %d done…", fd);
    close(fd);
    // 3. 从unordered_map中移除文件描述符sock及其Connection对象
    lg(Debug, "remove %d from _connections…", fd);
    _connections.erase(fd);
    }

  • 所以貌似我们已经编写完成了第二阶段,即然后引入业务协议,完成TcpServer类的读回调Recver,写回调Sender,异常回调Excepter,下面我们来使用如下的makefile尝试编译一下代码
  • reactor_server:Main.cc
    g++ -o $@ $^ -std=c++11
    .PHONY:clean
    clean:
    rm -f reactor_server

    运行结果如下 在这里插入图片描述

  • 所以此时编译报错,我们一看上面的报错,Json,当初我们在上层的业务协议中针对序列化和反序列化使用了Json,所以对于Json我们还需要学习一下Json的使用,所以我们来看小编之前文章的讲解关于Json的使用的讲解在第二点,详情请点击<——
  • 所以经过上面的讲解,我们知道了发生了连接错误,jsoncpp是一个第三方库,编译器并不知道要使用Json需要要哪一个库,所以这里我们需要在编译的时候需要使用-l选项告诉g++编译器,要去链接jsoncpp这个库文件,即编译的时候添加 -ljsoncpp
  • 还没有完成,上述我们不仅仅引入了服务端,连并客户端我们也引入了,所以对于客户端ClientCal.cc我们也要进行编译形成可执行,所以我们最终编写出来的makefile如下
  • .PHONY:all
    all:reactor_client reactor_server

    reactor_client:ClientCal.cc
    g++ -o $@ $^ -std=c++11 -ljsoncpp
    reactor_server:Main.cc
    g++ -o $@ $^ -std=c++11 -ljsoncpp

    .PHONY:clean
    clean:
    rm -f reactor_client reactor_server

    所以此时我们重新进行编译,成功 在这里插入图片描述

    测试
  • 所以成功编译好了服务器和客户端,那么此时我们就可以尝试运行一下了,那么下图左侧会话充当服务器,右侧会话充当客户端
  • 运行结果如下 在这里插入图片描述

  • 网络版本计算器讲解,详情请点击<——,所以运行结果如上,客户端构建请求,进行序列化,转码构建请求报文,发送给服务器,服务器接收到请求报文,然后解码,反序列化得到计算任务,然后服务器构建响应,进行序列化,转码构建响应报文,发送给客户端,以此重复五次,所以结果如上无误
  • 并且当这五次请求与响应完成之后,右侧的客户端就会退出,我们也观察到了左侧服务器会处理客户端的退出,在内核epoll的红黑树rb_tree中移除文件描述符sock对应的连接,关闭文件描述符sock,将文件描述符sock及其Connection对象在unordered_map的对象_connections中移除,没有问题,完美
  • 至此我们的以ET模式运行的epoll版本的TCP服务器实现reactor反应堆完结,可是还没有真正的完结,因为小编还要讲解一下拓展,如下
  • 四、拓展

  • 其实我们的以ET模式运行的epoll版本的TCP服务器实现reactor反应堆还并不是很完善,其实我们的代码中还可以从单线程调整为多线程,多线程之后还可以将线程池进行接入,关于线程池的讲解,详情请点击<——
  • 主要的逻辑是,主线程是一个reactor,线程池中的新线程有多个,每一个新线程也同样一个reactor,主线程仅仅负责将客户端的连接获取上来,将连接对应的文件描述符fd放到一个vector数组中,此时主线程每放置一个文件描述符fd那么就去线程池中唤醒一个新线程,然后让新线程从vector数组中获取连接对应的文件描述符fd进行对应的关心,提供服务
  • 所以这样随着连接的越来越多,主线程的reactor仍然是负责将连接获取上来,然后线程池中的新线程上均匀分配的这些到来的连接对应的文件描述符fd,那么一旦这些文件描述符fd上的事件就绪,那么对应的新线程reactor就会告知上层文件描述符fd就绪了,那么此时新线程对应的上层业务协议中就会进行处理提供服务
  • 其实引入线程池还不够,我们还可以进行链接管理,给每一个客户端对应的链接设置一个限定时间,如果客户端长时间不活跃,并且超过了这个限定的时间,那么我们就将客户端的链接释放掉
  • 那么我们知道服务器将来要面对的来自客户端的链接是很多的,所以服务器要不要将这些链接管理起来呢?要,那么如何管理?先描述,再组织,那么引入定时器这个类描述客户端的链接,其中包含例如,连接对应的文件描述符fd是哪一个,连接对应的客户端的信息有哪些,ip地址,端口号port,限定时间是到什么时候结束等,所以此时使用定时器我们就将器描述起来了,那么使用什么数据结构将链接组织起来呢?
  • 使用一个最小堆即可,因为我们给每一个来自客户端的链接都引入了一个计时器这个类,其中就有这个客户端的限定时间,时间可是有大小的,所以使用最小堆我们将所有的链接组织起来,这个限定时间是一个时间戳,那么使用当前的时间戳减去限定时间的时间戳可以得出一个时间,这个时间是该链接的过期时间,如果客户端长时间不活跃,并且超过了这个限定的时间,那么我们就将客户端的链接释放掉
  • 那么这个过期时间小的我们就放在最小堆的堆顶,别忘了时间是有大小的,可以进行比较的,由于是最小堆,堆顶的过期时间最小,堆底的过期时间最大,所以只要堆顶的过期时间不到,那么堆底的全部的连接一定不过期,所以每次我们要进行链接管理只需要看最小堆的堆顶的过期时间即可
  • void Loop()
    {
    _quit = false;

    while(!_quit)
    {
    Dispatcher(1);
    PrintConnection();
    }

    _quit = true;
    }

  • 所以此时我们再来看主线程中的Loop,那么如果引入链接管理,那么我们给Dispatcher的时间就不应该是-1了,而是应该是过期时间,根据堆顶的链接的过期时间进行动态调整,当堆顶的过期时间到了,并且如果客户端长时间不活跃,并且超过了这个限定的事件,那么我们就将客户端的链接释放掉,然后再从最小堆中选出去除堆顶后的过期时间最小的对象即可
  • 所以下面我们来讲解一下reactor的理论,reactor如果要真正实现其实是一个半同步半异步模型,而小编本文实现的以ET模式运行的epoll版本的TCP服务器实现reactor反应堆,其实是一个同步模型,因为IO = 等待 + 拷贝,我们当前只有一个进程进程中只有一个主线程
  • 所以主线程负责等待fd上的事件就绪,并且还要负责进行数据的拷贝,所以严格来讲我们当前实现的reactor是同步模型,那么如何理解真正的reactor对应的半同步半异步模型呢?
  • 首先就要引入多线程,引入线程池,站在主线程的角度,主线程只负责等待,那么当获取到连接之后,将连接通过线程池交给多个新线程去做,IO = 等待 + 拷贝,主线程只进行了等待,所以对于主线程来讲是半同步,那么新线程获取到了连接对应的文件描述符fd,如果连接对应的文件描述符fd就绪了,那么去进行提供服务的,去进行拷贝的就不是主线程来做了,而是要新线程去做,所以主线程并没有参与提供服务,进行拷贝的工作,所以对于主线程来讲,针对提供服务,进行拷贝,主线程是半异步的
  • 所以我们才说主线程的reactor反应堆是一个半同步半异步模型,reactor又叫做反应堆,那么我们该如何理解这里的反应堆呢?那么这里小编引入一个打地鼠游戏帮助大家进行理解,如下 在这里插入图片描述
  • 那么reactor反应堆其实就像打地鼠一样,那么有很多的地洞,每一个地洞中都有可能出现地鼠,规定一个玩家只能拿一个锤子,如果玩家只有一个,那么如果地鼠冒出来只能是一个锤子来砸地鼠 在这里插入图片描述
  • 那么如果此时有多个玩家,那么即使一个玩家只能持有一个锤子,那么此时锤子也能有很多,所以此时地鼠一旦从地洞里面冒出来,立即就会有锤子可以砸地鼠,如果同时有三个地鼠冒出来,那么一瞬间三个锤子就会砸地鼠
  • 所以地洞就相当于客户端和服务器的链接,而地鼠就相当于客户端的请求,地鼠什么时候冒出来我们不知道,同样的客户端什么时候请求服务器的服务我们也不知道,很类似,所以如果是我们的进程中只有一个主线程,所以只能是同步模型,只能有主线程一个锤子砸地鼠,即只能一个主线程处理服务
  • 那么如果我们的进程中不只有主线程,那么引入了线程池,有多个新线程,此时的模型就是半同步半异步模型,那么这多个新线程的效率就比较高了,每一个新线程都有一个锤子可以砸地鼠,所以每一个新线程都可以提供服务,即使同一时间有很多个客户端的请求来了,那么多个新线程处理提供服务就是了
  • 所以如果这个砸地鼠是一个游戏,可以进行网络的远端联机,那么要求多人协作共同砸地鼠,所以游戏就会通知玩家一关心1,2,3洞口,玩家二关心4,5,6洞口,玩家三关心7,8,9洞口,如果玩家关心的洞口有地鼠出现了,请玩家操纵锤子去砸地鼠
  • 那么类比一下,在半同步半异步的reactor的反应堆中,主线程的作用就仅仅负责获得连接对应的文件描述符fd,然后主线程将连接对应的文件描述符fd通过线程池交给新线程,新线程负责连接对应的文件描述符fd,如果文件描述符fd上的事件就绪了,那么新线程就要处理fd上的事件,提供服务,别忘了一个玩家可能负责多个洞口,即一个新线程也可能要负责多个连接对应的文件描述符fd
  • 所以在半同步半异步的reactor的反应堆中,主线程作为reactor只负责将连接对应的文件描述符fd进行获取,多个新线程也作为reactor负责对各自关心的多个文件描述符fd上的事件,如果事件就绪,那么就为客户端提供服务
  • 所以至此我们的以ET模式运行的epoll版本的TCP服务器实现reactor反应堆才算真正的完结
  • 五、写博客一年的总结

  • 所以至此我们的以ET模式运行的epoll版本的TCP服务器实现reactor反应堆才算真正的完结 在这里插入图片描述
  • 那么写到这里其实小编已经不知不觉写了4.4万字,挺多的,也算是小编写过的最长的文章了,那么本文的以ET模式运行的epoll版本的TCP服务器实现reactor反应堆的讲解也算结束了
  • 而reactor反应堆的实现也是小编讲解的计算机网络的最后一篇文章,也意味着计算机网络模块小编也已经带领着各位读者友友学习完了,所以至此linux也就完结了,还是挺有感慨的,真正意义上的第一篇博客,小编从最初的开始写真正意义上的第一篇博客到现在已经一年多了,具体点是380天 在这里插入图片描述
  • 算上当前这一篇,小编输出了153篇文章,很难想象,我已经码了100多万字了,其实当初小编也没有想过能将c++,linux系统,linux网络模块的知识输出在博客上,或许当初对于小编来讲,很远,可是380天后的今天2026年2月3日,小编已经做到了,回头看,那153篇文章就在那里,静静的,不管是伟大还是渺小,就在那里
  • 在编写的过程中,或许给小编带来成就,或许给小编带来痛苦,但是小编都已经坚持下来了,也算坎坷,其实小编很喜欢看蛊真人,其中的方源带给了小编很多坚持下去的动力,至此小编图谋的重铸自身根基也算是结束了
  • 这里小编还是想加入一下方源,吟诗一首 落魄谷中寒风吹,春秋蝉鸣少年归。 荡魂山处石人泪,定仙游走魔向北。 逆流河上万仙退,爱情不敌坚持泪。 宿命天成命中败。仙尊悔而我不悔。 早岁已知世事艰,仍许飞鸿荡云间。 一路寒风身如絮,命海沉浮客独行。 千磨万击心铸铁,殚精竭虑铸一剑。 今朝剑指叠云处,练蛊练人还练天。 独帜入渊深未知,身似浮萍命难持。 千州皆朝归海处,一苇青拨戏浪巅。 惊鸿四散鱼逃尽,唯有残帆傲此间。 待到天开云雾散,负手直望笑苍天! 我清楚的知道,人与人的路都是不可复制的,我走在我自己的人生路上,哪怕路途的风雨太大,大到我步履维艰,哪怕荆棘丛生,刺得我伤痕遍布,我也仍旧痴痴笑笑,我体会此中滋味,呵呵呵,我相信独有的小船终有一天会看到两岸群山青翠虫鸟同鸣……
  • 我们都是小人物,一定要坚持,一定要不断的前行,我们都走在自己的人生路上,小编目前阶段的博客之路已经完结,但是小编的技术之路才刚刚开始,这里小编期望大家包括小编自己都可以淌过自己心中的那条逆流河
  • 六、源代码

    ClientCal.cc

    #include <iostream>
    #include <string>
    #include <ctime>
    #include <cassert>
    #include <unistd.h>
    #include "Socket.hpp"
    #include "Protocol.hpp"

    static void Usage(const std::string& proc)
    {
    std::cout << "\\n\\t" << proc << " serverip serverport" << std::endl << std::endl;
    }

    // ./clientcal serverip serverport
    int main(int argc, char* argv[])
    {
    if(argc != 3)
    {
    Usage(argv[0]);
    exit(0);
    }

    std::string serverip = argv[1];
    uint16_t serverport = std::stoi(argv[2]);

    Sock sockfd;
    sockfd.Socket();
    bool r = sockfd.Connect(serverip, serverport);
    if(!r)
    return 1;

    srand(time(nullptr));
    std::string opers = "+-*/%~?@=";
    int cnt = 1;

    std::string inbuffer_stream;
    while(cnt <= 5)
    {
    std::cout << "————第" << cnt << "次测试————-" << std::endl;
    int x = rand() % 10 + 1;
    usleep(123);
    int y = rand() % 10;
    usleep(654);
    char oper = opers[rand() % opers.size()];

    Request req(x, y, oper);
    req.DebugPrint();

    std::string package;
    req.Serialize(&package);
    package = Encode(package);

    write(sockfd.Fd(), package.c_str(), package.size());
    // std::cout << "这是最新的发送出去的请求" << std::endl << package;
    // write(sockfd.Fd(), package.c_str(), package.size());
    // std::cout << "这是最新的发送出去的请求" << std::endl << package;
    // write(sockfd.Fd(), package.c_str(), package.size());
    // std::cout << "这是最新的发送出去的请求" << std::endl << package;

    char buffer[128];
    ssize_t n = read(sockfd.Fd(), buffer, sizeof(buffer));
    if(n > 0)
    {
    buffer[n] = 0;
    inbuffer_stream += buffer;
    std::cout << inbuffer_stream << std::endl;

    std::string content;
    bool r = Decode(inbuffer_stream, &content);
    assert(r);

    Response resp;
    r = resp.Deserialize(content);
    assert(r);

    resp.DebugPrint();
    }
    else if(n == 0)
    break;
    else
    break;

    std::cout << "———————————-" << std::endl;

    sleep(2);
    cnt++;
    }

    sockfd.Close();

    return 0;
    }

    Comm.hpp

    #pragma once

    #include <cstdlib>
    #include <unistd.h>
    #include <fcntl.h>
    #include "Socket.hpp"

    void SetNonBlockOrDie(int sock)
    {
    int fl = fcntl(sock, F_GETFL);
    if(fl < 0)
    exit(NON_BLOCK_ERR);

    fcntl(sock, F_SETFL, fl | O_NONBLOCK);
    }

    Epoller.hpp

    #include <iostream>
    #include <cstring>
    #include <unistd.h>
    #include <sys/epoll.h>
    #include "Log.hpp"
    #include "nocopy.hpp"

    class Epoller : public nocopy
    {
    static const int size = 128;

    public:
    Epoller()
    {
    _epfd = epoll_create(size);
    if(_epfd == 1)
    {
    lg(Error, "epoll_create error: %s", strerror(errno));
    }
    else
    {
    lg(Info, "epoller_create success, epfd: %d", _epfd);
    }
    }

    int EpollerWait(struct epoll_event revents[], int num, int timeout)
    {
    // int n = epoll_wait(_epfd, revents, num, _timeout);
    // int n = epoll_wait(_epfd, revents, num, 0);
    // int n = epoll_wait(_epfd, revents, num, -1);
    int n = epoll_wait(_epfd, revents, num, timeout);

    return n;
    }

    int EpollerUpdate(int oper, int sock, uint32_t event)
    {
    int n = 0;
    if(oper == EPOLL_CTL_DEL)
    {
    n = epoll_ctl(_epfd, oper, sock, nullptr);
    if(n == 1)
    {
    lg(Error, "epoll_ctl delete error");
    }
    }
    else
    {
    // EPOLL_CTL_ADD || EPOLL_CTL_MOD
    struct epoll_event ev;
    ev.data.fd = sock;
    ev.events = event;

    n = epoll_ctl(_epfd, oper, sock, &ev);
    if(n == 1)
    {
    lg(Error, "epoll_ctl error");
    }
    }

    return n;
    }

    ~Epoller()
    {
    if(_epfd >= 0)
    {
    close(_epfd);
    }
    }

    private:
    int _epfd;
    // int _timeout{3000};
    };

    Log.hpp

    #pragma once

    #include <iostream>
    #include <string>
    #include <ctime>
    #include <cstdio>
    #include <cstdarg>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <fcntl.h>
    #include <unistd.h>

    #define SIZE 1024

    #define Info 0
    #define Debug 1
    #define Warning 2
    #define Error 3
    #define Fatal 4

    #define Screen 1 //输出到屏幕上
    #define Onefile 2 //输出到一个文件中
    #define Classfile 3 //根据事件等级输出到不同的文件中

    #define LogFile "log.txt" //日志名称

    class Log
    {
    public:
    Log()
    {
    printMethod = Screen;
    path = "./log/";
    }

    void Enable(int method) //改变日志打印方式
    {
    printMethod = method;
    }

    ~Log()
    {}

    std::string levelToString(int level)
    {
    switch(level)
    {
    case Info:
    return "Info";
    case Debug:
    return "Debug";
    case Warning:
    return "Warning";
    case Error:
    return "Error";
    case Fatal:
    return "Fatal";
    default:
    return "";
    }
    }

    void operator()(int level, const char* format, ...)
    {
    //默认部分 = 日志等级 + 日志时间
    time_t t = time(nullptr);
    struct tm* ctime = localtime(&t);
    char leftbuffer[SIZE];
    snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
    ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
    ctime->tm_hour, ctime->tm_min, ctime->tm_sec);

    va_list s;
    va_start(s, format);
    char rightbuffer[SIZE];
    vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
    va_end(s);

    char logtxt[2 * SIZE];
    snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);

    printLog(level, logtxt);
    }

    void printLog(int level, const std::string& logtxt)
    {
    switch(printMethod)
    {
    case Screen:
    std::cout << logtxt << std::endl;
    break;
    case Onefile:
    printOneFile(LogFile, logtxt);
    break;
    case Classfile:
    printClassFile(level, logtxt);
    break;
    default:
    break;
    }
    }

    void printOneFile(const std::string& logname, const std::string& logtxt)
    {
    std::string _logname = path + logname;
    int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666);
    if(fd < 0)
    return;
    write(fd, logtxt.c_str(), logtxt.size());
    close(fd);
    }

    void printClassFile(int level, const std::string& logtxt)
    {
    std::string filename = LogFile;
    filename += ".";
    filename += levelToString(level);

    printOneFile(filename, logtxt);
    }

    private:
    int printMethod;
    std::string path;
    };

    Log lg;

    Main.cc

    #include <iostream>
    #include <memory>
    #include "Log.hpp"
    #include "TcpServer.hpp"
    #include "ServerCal.hpp"

    ServerCal calculator;

    void DefaultOnMessage(std::shared_ptr<Connection> connection)
    {
    std::cout << "上层得到了数据: " << connection->InBuffer() << std::endl;
    std::string response_str = calculator.Calculator(connection->InBuffer());
    if(response_str.empty())
    return;
    lg(Debug, "%s", response_str.c_str());

    connection->AppendOutBuffer(response_str);

    auto tcp_server_ptr = connection->_tcp_server_ptr.lock();
    tcp_server_ptr->Sender(connection);
    }

    int main()
    {
    std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080, DefaultOnMessage));

    epoll_svr->Init();
    epoll_svr->Loop();

    return 0;
    }

    makefile

    .PHONY:all
    all:reactor_client reactor_server

    reactor_client:ClientCal.cc
    g++ -o $@ $^ -std=c++11 -ljsoncpp
    reactor_server:Main.cc
    g++ -o $@ $^ -std=c++11 -ljsoncpp

    .PHONY:clean
    clean:
    rm -f reactor_client reactor_server

    nocopy.hpp

    #pragma once

    class nocopy
    {
    public:
    nocopy(){};

    nocopy(const nocopy& ) = delete;

    const nocopy& operator=(const nocopy& ) = delete;
    };

    Protocol.hpp

    #pragma once

    #include <iostream>
    #include <string>
    #include <jsoncpp/json/json.h>

    const std::string black_space_sep = " ";
    const std::string protocol_sep = "\\n";

    // 123 + 4 -> len\\n123 + 4\\n
    std::string Encode(const std::string& content)
    {
    std::string package = std::to_string(content.size());
    package += protocol_sep;
    package += content;
    package += protocol_sep;

    return package;
    }

    // len\\n123 + 4\\n -> 123 + 4
    bool Decode(std::string& package, std::string* content)
    {
    size_t pos = package.find(protocol_sep);
    if(pos == std::string::npos)
    return false;

    std::string len_str = package.substr(0, pos);
    size_t len = std::stoi(len_str);
    int total_len = len + 1 + len_str.size() + 1;
    if(package.size() < total_len)
    return false;
    *content += package.substr(pos + 1, len);

    //erase ??
    package.erase(0, total_len);

    return true;
    }

    class Request
    {
    public:
    Request(int data1, int data2, char oper)
    : x(data1)
    , y(data2)
    , op(oper)
    {}

    Request()
    {}

    //123 + 4
    bool Serialize(std::string* out)
    {
    #ifdef MySelf
    std::string s = std::to_string(x);
    s += black_space_sep;
    s += op;
    s += black_space_sep;
    s += std::to_string(y);
    *out = s;

    return true;
    #else
    Json::Value root;
    // 初始化
    root["x"] = x;
    root["y"] = y;
    root["op"] = op;

    // 序列化 无论是采用Json::FastWriter还是Json::StyledWriter都可以
    // 这里看个人喜好自行选择即可
    // Json::FastWriter w;
    Json::StyledWriter w;
    *out = w.write(root);

    return true;
    #endif
    }

    //123 + 4 -> 123 + 4
    // 1 + 1
    bool Deserialize(const std::string& in)
    {
    #ifdef MySelf
    size_t left = in.find(black_space_sep);
    if(left == std::string::npos)
    return false;
    std::string part_x = in.substr(0, left);

    size_t right = in.rfind(black_space_sep);
    if(right == std::string::npos)
    return false;
    std::string part_y = in.substr(right + 1);

    if(left + 2 != right)
    return false;

    x = std::stoi(part_x);
    y = std::stoi(part_y);
    op = in[left + 1];

    return true;
    #else
    Json::Value root;
    Json::Reader r;
    // 反序列化
    r.parse(in, root);

    // 解析
    x = root["x"].asInt();
    y = root["y"].asInt();
    op = root["op"].asInt();

    return true;
    #endif
    }

    void DebugPrint()
    {
    std::cout << "新请求构建完成: " << x << op << y << "=?" << std::endl;
    }

    public:
    int x;
    int y;
    char op;
    };

    class Response
    {
    public:
    Response(int res, int c)
    : result(res)
    , code(c)
    {}

    Response()
    {}

    //100 0
    bool Serialize(std::string* out)
    {
    #ifdef MySelf
    std::string s = std::to_string(result);
    s += black_space_sep;
    s += std::to_string(code);
    *out = s;

    return true;
    #else
    Json::Value root;
    //初始化
    root["result"] = result;
    root["code"] = code;

    Json::StyledWriter w;
    *out = w.write(root);

    return true;
    #endif
    }
    //100 0
    bool Deserialize(const std::string& in)
    {
    #ifdef MySelf
    size_t pos = in.find(black_space_sep);
    if(pos == std::string::npos)
    return false;
    std::string part_left = in.substr(0, pos);
    std::string part_right = in.substr(pos + 1);

    result = std::stoi(part_left);
    code = std::stoi(part_right);

    return true;
    #else
    Json::Value root;
    Json::Reader r;
    //反序列化
    r.parse(in, root);

    // 解析
    result = root["result"].asInt();
    code = root["code"].asInt();

    return true;
    #endif
    }

    void DebugPrint()
    {
    std::cout << "结果响应完成, result: " << result << ", code: " << code << std::endl;
    }

    public:
    int result;
    int code;
    };

    ServerCal.hpp

    #pragma once

    #include "Log.hpp"
    #include "Protocol.hpp"

    enum
    {
    Div_Zero = 1,
    Mod_Zero,
    Other_Oper,
    };

    class ServerCal
    {
    public:
    ServerCal()
    {}

    Response CalculatorHelper(Request &req)
    {
    Response resp(0, 0);
    switch (req.op)
    {
    case '+':
    resp.result = req.x + req.y;
    break;
    case '-':
    resp.result = req.x req.y;
    break;
    case '*':
    resp.result = req.x * req.y;
    break;
    case '/':
    {
    if (req.y == 0)
    resp.code = Div_Zero;
    else
    resp.result = req.x / req.y;
    }
    break;
    case '%':
    {
    if (req.y == 0)
    resp.code = Mod_Zero;
    else
    resp.result = req.x % req.y;
    }
    break;
    default:
    resp.code = Other_Oper;
    break;
    }

    return resp;
    }

    std::string Calculator(std::string &package)
    {
    // 解码
    std::string content;
    bool r = Decode(package, &content);
    if(!r)
    return "";
    Request req;
    //反序列化
    r = req.Deserialize(content);
    if (!r)
    return "";
    //计算
    Response resp = CalculatorHelper(req);

    content = "";
    // 序列化
    resp.Serialize(&content);
    // 转码
    content = Encode(content);

    return content;
    }

    ~ServerCal()
    {}
    };

    Socket.hpp

    #pragma once

    #include <iostream>
    #include <string>
    #include <cstring>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include "Log.hpp"

    const int backlog = 10;

    enum{
    SocketErr = 1,
    BindErr,
    ListenErr,
    NON_BLOCK_ERR
    };

    class Sock
    {
    public:
    Sock()
    {}

    void Socket()
    {
    sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd_ < 0)
    {
    lg(Fatal, "socket error, %s : %d", strerror(errno), errno);
    exit(SocketErr);
    }

    int opt = 1;
    setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
    }

    void Bind(uint16_t port)
    {
    struct sockaddr_in local;
    memset(&local, 0, sizeof(local));
    local.sin_family = AF_INET;
    local.sin_port = htons(port);
    local.sin_addr.s_addr = INADDR_ANY;
    socklen_t len = sizeof(local);

    if(bind(sockfd_, (struct sockaddr*)&local, len) < 0)
    {
    lg(Fatal, "bind error, %s : %d", strerror(errno), errno);
    exit(BindErr);
    }
    }

    void Listen()
    {
    if(listen(sockfd_, backlog) < 0)
    {
    lg(Fatal, "listen error, %s : %d", strerror(errno), errno);
    exit(ListenErr);
    }
    }

    int Accept(std::string* clientip, uint16_t* clientport)
    {
    struct sockaddr_in peer;
    socklen_t len = sizeof(peer);

    int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);
    if(newfd < 0)
    {
    lg(Warning, "accept error, %s : %d", strerror(errno), errno);
    return 1;
    }

    char ipstr[128];
    inet_ntop(AF_INET, &(peer.sin_addr), ipstr, sizeof(ipstr));
    *clientip = ipstr;
    *clientport = ntohs(peer.sin_port);

    return newfd;
    }

    bool Connect(const std::string& serverip, uint16_t serverport)
    {
    struct sockaddr_in peer;
    memset(&peer, 0, sizeof(peer));
    peer.sin_family = AF_INET;
    peer.sin_port = htons(serverport);
    inet_pton(AF_INET, serverip.c_str(), &(peer.sin_addr));
    socklen_t len = sizeof(peer);

    int n = connect(sockfd_, (struct sockaddr*)&peer, len);
    if(n == 1)
    {
    std::cerr << "connect to " << serverip << ':' << serverport << "error" << std::endl;
    return false;
    }

    return true;
    }

    void Close()
    {
    if(sockfd_ > 0)
    {
    close(sockfd_);
    }
    }

    int Fd()
    {
    return sockfd_;
    }

    ~Sock()
    {}
    private:
    int sockfd_;
    };

    TcpServer.hpp

    #include <iostream>
    #include <memory>
    #include <functional>
    #include <string>
    #include <unordered_map>
    #include <cerrno>
    #include <sys/epoll.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include "Log.hpp"
    #include "Socket.hpp"
    #include "nocopy.hpp"
    #include "Epoller.hpp"
    #include "Comm.hpp"

    class Connection;
    class TcpServer;

    uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
    uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
    static const int g_buffer_size = 128;

    using func_t = std::function<void(std::shared_ptr<Connection>)>;

    class Connection
    {
    public:
    Connection(int sock, std::weak_ptr<TcpServer> tcp_server_ptr)
    : _sock(sock), _tcp_server_ptr(tcp_server_ptr)
    {}

    void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
    _recv_cb = recv_cb;
    _send_cb = send_cb;
    _except_cb = except_cb;
    }

    void AppendInBuffer(const std::string& info)
    {
    _inbuffer += info;
    }

    void AppendOutBuffer(const std::string& info)
    {
    _outbuffer += info;
    }

    std::string& InBuffer()
    {
    return _inbuffer;
    }

    std::string& OutBuffer()
    {
    return _outbuffer;
    }

    int SockFd()
    {
    return _sock;
    }

    ~Connection()
    {
    if(_sock > 0)
    {
    close(_sock);
    }
    }

    private:
    int _sock;
    std::string _inbuffer;
    std::string _outbuffer;

    public:
    func_t _recv_cb;
    func_t _send_cb;
    func_t _except_cb;

    std::weak_ptr<TcpServer> _tcp_server_ptr;

    std::string _ip;
    uint16_t _port;
    };

    class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
    {
    static const int num = 64;
    public:
    TcpServer(uint16_t port, func_t OnMessage)
    : _quit(true)
    , _port(port)
    , _listensock_ptr(new Sock())
    , _epoller_ptr(new Epoller())
    , _OnMessage(OnMessage)
    {}

    void Init()
    {
    _listensock_ptr->Socket();
    SetNonBlockOrDie(_listensock_ptr->Fd());
    _listensock_ptr->Bind(_port);
    _listensock_ptr->Listen();
    lg(Info, "create listen socket success, listensock: %d", _listensock_ptr->Fd());

    AddConnection(_listensock_ptr->Fd(), EVENT_IN, \\
    std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
    }

    void AddConnection(int sock, uint32_t events, func_t recv_cb, func_t send_cb, func_t except_cb, \\
    const std::string& ip = "0.0.0.0", uint16_t port = 0)
    {
    std::shared_ptr<Connection> new_connection(new Connection(sock, shared_from_this()));
    new_connection->SetHandler(recv_cb, send_cb, except_cb);
    new_connection->_ip = ip;
    new_connection->_port = port;

    _connections.insert(std::make_pair(sock, new_connection));

    _epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, events);
    lg(Debug, "add a new connection success, sockfd: %d", sock);
    }

    void Accepter(std::shared_ptr<Connection> connection)
    {
    while(true)
    {
    struct sockaddr_in peer;
    socklen_t len = sizeof(peer);
    int sock = ::accept(connection->SockFd(), (struct sockaddr*)&peer, &len);
    if(sock >= 0)
    {
    uint16_t port = ntohs(peer.sin_port);
    char ip[128];
    inet_ntop(AF_INET, &(peer.sin_addr), ip, sizeof(ip));
    lg(Debug, "get a new client, get info -> [%s:%d], sockfd: %d", ip, port, sock);

    SetNonBlockOrDie(sock);

    AddConnection(sock, EVENT_IN, \\
    std::bind(&TcpServer::Recver, this, std::placeholders::_1), \\
    std::bind(&TcpServer::Sender, this, std::placeholders::_1), \\
    std::bind(&TcpServer::Excepter, this, std::placeholders::_1), \\
    ip, port);
    }
    else
    {
    if(errno == EWOULDBLOCK)
    break;
    else if(errno == EINTR)
    continue;
    else
    break;
    }
    }
    }

    void Recver(std::shared_ptr<Connection> connection)
    {
    int sock = connection->SockFd();
    char buffer[g_buffer_size];
    while(true)
    {
    memset(buffer, 0, sizeof(buffer));
    ssize_t n = recv(sock, buffer, sizeof(buffer) 1, 0);
    if(n > 0)
    {
    // buffer[n] = 0; 由于memset已经置为了0,所以这里不需要这个操作了
    connection->AppendInBuffer(buffer);
    }
    else if(n == 0)
    {
    lg(Info, "sockfd: %d, client info -> %s:%d quit…", sock, \\
    connection->_ip.c_str(), connection->_port);

    connection->_except_cb(connection);
    return;
    }
    else
    {
    if(errno == EWOULDBLOCK)
    break;
    else if(errno == EINTR)
    continue;
    else
    {
    lg(Warning, "sockfd: %d, client info -> %s:%d recv error…", sock, \\
    connection->_ip.c_str(), connection->_port);

    connection->_except_cb(connection);
    return;
    }
    }
    }
    // 调用回调函数将数据交付上层处理
    _OnMessage(connection);
    }

    void Sender(std::shared_ptr<Connection> connection)
    {
    int sock = connection->SockFd();
    std::string& outbuffer = connection->OutBuffer();
    while(true)
    {
    ssize_t n = send(sock, outbuffer.c_str(), outbuffer.size(), 0);
    if(n > 0)
    {
    outbuffer.erase(0, n);
    if(outbuffer.empty())
    break;
    }
    else if(n == 0)
    {
    return;
    }
    else
    {
    if(errno == EWOULDBLOCK)
    break;
    else if(errno == EINTR)
    continue;
    else
    {
    lg(Warning, "sockfd: %d, client info -> %s:%d send error…", sock, \\
    connection->_ip.c_str(), connection->_port);

    connection->_except_cb(connection);
    return;
    }
    }
    }

    if(!outbuffer.empty())
    {
    // 开始对写事件的关心
    EnableEvent(sock, true, true);
    }
    else
    {
    // 关闭对写事件的关心
    EnableEvent(sock, true, false);
    }
    }

    void Excepter(std::shared_ptr<Connection> connection)
    {
    int fd = connection->SockFd();

    lg(Warning, "Excepter handler socket: %d, client info -> %s:%d, excepter handler", \\
    fd, connection->_ip.c_str(), connection->_port);
    // 1. 在内核epoll模型的红黑树rb_tree中移除对特定fd的关心
    _epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
    // 2. close关闭异常的文件描述符
    lg(Debug, "close %d done…", fd);
    close(fd);
    // 3. 从unordered_map中移除文件描述符sock及其Connection对象
    lg(Debug, "remove %d from _connections…", fd);
    _connections.erase(fd);
    }

    void EnableEvent(int sock, bool readable, bool writeable)
    {
    uint32_t events = 0;
    events |= ((readable == true ? EPOLLIN : 0) | \\
    (writeable == true ? EPOLLOUT : 0) | EPOLLET);

    _epoller_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);
    }

    bool IsConnectionSafe(int fd)
    {
    // std::unordered_map<int, std::shared_ptr<Connection>>::iterator iter = _connections.find(fd);
    auto iter = _connections.find(fd);
    if(iter == _connections.end())
    return false;

    return true;
    }

    void Dispatcher(int timeout)
    {
    int n = _epoller_ptr->EpollerWait(_revs, num, timeout);
    for(int i = 0; i < n; i++)
    {
    int sock = _revs[i].data.fd;
    uint32_t events = _revs[i].events;
    // 统一将事件异常转化为读写问题
    if((events & EPOLLERR) | (events & EPOLLHUP))
    events |= (EPOLLIN | EPOLLOUT);
    // 这样可以简化逻辑,只需要处理读写问题
    if((events & EPOLLIN) && IsConnectionSafe(sock))
    {
    if(_connections[sock]->_recv_cb)
    _connections[sock]->_recv_cb(_connections[sock]);
    }
    if((events & EPOLLOUT) && IsConnectionSafe(sock))
    {
    if(_connections[sock]->_send_cb)
    _connections[sock]->_send_cb(_connections[sock]);
    }
    }
    }

    void Loop()
    {
    _quit = false;

    while(!_quit)
    {
    Dispatcher(1);
    PrintConnection();
    }

    _quit = true;
    }

    void PrintConnection()
    {
    std::cout << "_connections fd list: " << std::endl;
    for(auto& connection : _connections)
    {
    std::cout << connection.first << ", ";
    std::cout << "inbuffer: " << connection.second->InBuffer() << std::endl;
    }
    std::cout << std::endl;
    }

    ~TcpServer()
    {}

    private:
    bool _quit;
    uint16_t _port;
    std::shared_ptr<Sock> _listensock_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
    struct epoll_event _revs[num];
    // 让上层处理信息
    func_t _OnMessage;
    };


    总结

    以上就是今天的博客内容啦,希望对读者朋友们有帮助 水滴石穿,坚持就是胜利,读者朋友们可以点个关注 点赞收藏加关注,找到小编不迷路!

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 【linux】高级IO,以ET模式运行的epoll版本的TCP服务器实现reactor反应堆
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!