云计算百科
云计算领域专业知识百科平台

reactor以及百万服务器并发的实现

一、reactor的实现

零声教育

reator的优点

不同的文件描述符(fd)发生了不同的事件(event),就会自动分派到不同的回调函数(callback)去处理。 对于epoll来说,如果一个io发送大数据包以至于超过了buffer的大小,会导致多出的数据没办法存下来。 reactor做到了:不同的事件,做不同的action,并且每个io独立。

reactor.c代码


#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>

#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1024

typedef int (*RCALLBACK)(int fd);

int accept_cb(int fd);

int recv_cb(int fd);

int send_cb(int fd);

int epfd = 0;

struct conn {
int fd;

char rbuffer[BUFFER_LENGTH];
int rlength;

char wbuffer[BUFFER_LENGTH];
int wlength;

RCALLBACK send_callback;

union {
RCALLBACK recv_callback;
RCALLBACK accept_callback;
}r_action;
};

struct conn conn_list[CONNECTION_SIZE] = {0};

int set_event(int fd, int event, int flag) {

if (flag) {

struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

} else {

struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

}

}

int event_register(int fd, int event) {

conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;

memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;

memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;

set_event(fd,event,1);

}

int accept_cb(int fd) {

struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);

int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
printf("accept finished:%d\\n", clientfd);

event_register(clientfd,EPOLLIN);

return 0;

}

int recv_cb(int fd) {

int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);

if (count == 0) {
printf("client disconnect: %d\\n", fd);
close(fd);

epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); //unfinished

return 0;
}
conn_list[fd].rlength = count;

printf("RECV: %s\\n", conn_list[fd].rbuffer);

#if 1

conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);

#endif

set_event(fd, EPOLLOUT, 0);

return count;

}

int send_cb(int fd) {

int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);

set_event(fd, EPOLLIN, 0);

return count;
}

int init_server(unsigned short port) {

int sockfd = socket(AF_INET, SOCK_STREAM, 0);

struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //0.0.0
servaddr.sin_port = htons(port); //0-1023

if (1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof (struct sockaddr))) {
printf("bind failed: %s\\n", strerror(errno));
}

listen(sockfd, 10);
printf("listen finished:%d\\n", sockfd);

return sockfd;
}

int main() {

unsigned short port = 2000;
int sockfd = init_server(port);

epfd = epoll_create(1);

conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;

set_event(sockfd, EPOLLIN, 1);

while (1) { //mainloop

struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, 1);

int i = 0;
for (i = 0;i < nready;i ++) {

int connfd = events[i].data.fd;

if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
}

if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
}

}

}

  • main()函数中,有这么一行conn_list[sockfd].r_action.recv_callback = accept_cb;刚开始时我认为应该是conn_list[sockfd].r_action.accept_callback = accept_cb,后来才知道是因为union 联合体里面所有成员共用一块内存,你赋值给哪个,实际上只是往这一块内存里写地址。也就是说用 recv_callback 和 accept_callback ,只是名字上的区分,本质就是同一块内存。所以写成recv_callback还是accept_callback都是一样的效果。
  • int init_server(unsigned short port)的作用是初始化服务器,创建 socket,绑定本地地址和端口,然后进入监听状态并返回监听的fd,准备好接受新客户端连接。
  • int set_event(int fd, int event, int flag)的作用是将某个fd注册到epoll或修改已经注册的事件,是epoll_ctl 的包装。
  • int event_register(int fd, int event)的作用是初始化一个新连接的结构体,分配对应匹配的回调函数以及清空缓存区,并且把这个fd注册到epoll事件监控当中。
  • int accept_cb(int fd)的作用是处理新客户端连接,当有新连接到来时被 epoll 触发,执行 accept,并对新 fd 做初始化。
  • int recv_cb(int fd)的作用是接收客户端数据。用 recv() 从 fd 读取数据到读缓冲区,如果收到的数据长度是0,说明客户端断开,做清理(close() 并从 epoll 删除该 fd),如果收到数据,显示收到内容,拷贝收到的数据到写缓冲区(实现 echo 功能),修改 fd 的 epoll 事件为EPOLLOUT (准备回写数据)。
  • int send_cb(int fd)的作用是发送数据给客户端。用 send() 把写缓冲区的数据写给客户端,修改 fd 的 epoll 事件为 EPOLLIN。
  • 二、服务器百万并发

    将上面reactor.c的代码中的CONNECTION_SIZE宏定义为1048576,用作服务器。

    客户端代码mul_port_client_epoll.c


    #include <stdio.h>
    #include <string.h>
    #include <stdlib.h>

    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <errno.h>
    #include <netinet/tcp.h>
    #include <arpa/inet.h>
    #include <netdb.h>
    #include <fcntl.h>
    #include <sys/time.h>
    #include <unistd.h>

    #define MAX_BUFFER128
    #define MAX_EPOLLSIZE(384*1024)
    #define MAX_PORT1

    #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec tv2.tv_sec) * 1000 + (tv1.tv_usec tv2.tv_usec) / 1000)

    int isContinue = 0;

    static int ntySetNonblock(int fd) {
    int flags;

    flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) return flags;
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) < 0) return 1;
    return 0;
    }

    static int ntySetReUseAddr(int fd) {
    int reuse = 1;
    return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
    }

    int main(int argc, char **argv) {
    if (argc <= 2) {
    printf("Usage: %s ip port\\n", argv[0]);
    exit(0);
    }

    const char *ip = argv[1];
    int port = atoi(argv[2]);
    int connections = 0;
    char buffer[128] = {0};
    int i = 0, index = 0;

    struct epoll_event events[MAX_EPOLLSIZE];

    int epoll_fd = epoll_create(MAX_EPOLLSIZE);

    strcpy(buffer, " Data From MulClient\\n");

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr(ip);

    struct timeval tv_begin;
    gettimeofday(&tv_begin, NULL);
    int sockfd = 0;

    while (1) {
    if (++index >= MAX_PORT) index = 0;

    struct epoll_event ev;

    if (connections < 340000 && !isContinue) {
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == 1) {
    perror("socket");
    goto err;
    }

    //ntySetReUseAddr(sockfd);
    addr.sin_port = htons(port+index);

    if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
    perror("connect");
    goto err;
    }
    ntySetNonblock(sockfd);
    ntySetReUseAddr(sockfd);

    sprintf(buffer, "Hello Server: client –> %d\\n", connections);
    send(sockfd, buffer, strlen(buffer), 0);

    ev.data.fd = sockfd;
    ev.events = EPOLLIN | EPOLLOUT;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);

    connections ++;
    }
    //connections ++;
    if (connections % 1000 == 999 || connections >= 340000) {
    struct timeval tv_cur;
    memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));

    gettimeofday(&tv_begin, NULL);

    int time_used = TIME_SUB_MS(tv_begin, tv_cur);
    printf("connections: %d, sockfd:%d, time_used:%d\\n", connections, sockfd, time_used);

    int nfds = epoll_wait(epoll_fd, events, connections, 100);
    for (i = 0;i < nfds;i ++) {
    int clientfd = events[i].data.fd;

    if (events[i].events & EPOLLOUT) {
    //sprintf(buffer, "data from %d\\n", clientfd);
    send(sockfd, buffer, strlen(buffer), 0);
    } else if (events[i].events & EPOLLIN) {
    char rBuffer[MAX_BUFFER] = {0};
    ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
    if (length > 0) {
    //printf(" RecvBuffer:%s\\n", rBuffer);

    if (!strcmp(rBuffer, "quit")) {
    isContinue = 0;
    }

    } else if (length == 0) {
    printf(" Disconnect clientfd:%d\\n", clientfd);
    connections ;
    close(clientfd);
    } else {
    if (errno == EINTR || errno == EAGAIN || errno == ENOTSOCK) continue;

    printf(" Error clientfd:%d, errno:%d\\n", clientfd, errno);
    close(clientfd);
    }
    } else {
    printf(" clientfd:%d, errno:%d\\n", clientfd, errno);
    close(clientfd);
    }
    }
    }

    usleep(500);
    }

    return 0;

    err:
    printf("error : %s\\n", strerror(errno));
    return 0;

    }

    第一次运行

    出现too many open files问题

    在这里插入图片描述

    左边是一个服务器,右边是三个客户端,可以看出,当连接数超过1024时,出现了错误:too many open files。

    解决方法

    在这里插入图片描述 客户端键入ulimit -a后,显示open files:1024,键入ulimit -n 1048576将open files改为1048576,不仅客户端要修改,服务端也要修改。

    第二次运行

    reactor.c编译不通过

    在这里插入图片描述 可以明显看出,代码中struct conn conn_list[CONNECTION_SIZE] = {0}定义了一个非常大的全局(静态)数组或者结构体数组,这会在全局数据段(静态区/BSS段)分配一大块内存,比如上百万个结构体,每个2KB左右,总共会占用2GB甚至更多的空间。编译器/链接器(尤其是ld)对单个全局变量的大小有限制,通常2GB左右,超出就会报错。下面尝试通过molloc的方法来解决。malloc是 C 语言中用来“向操作系统申请一块内存”的函数,英文全称是 Memory Allocation(分配内存),用法是malloc(想要的字节数),申请到的内存是“堆”上的内存,可以随便用、用完还可以归还,返回的是“新分配的内存的首地址”。

    解决方法

    服务端键入gcc -o reactor reactor.c -mcmodel=medium。

    这是GCC在x86_64(64位)架构下的一个内存模型参数,它调整了编译器对静态数据段(BSS/Data段)的寻址方式,可以支持比默认更大的全局变量。x86_64平台有不同的内存模型,-mcmodel=medium让全局和静态变量寻址空间变大,允许你分配比默认更大的静态全局数据(比如大于2GB)。默认是-mcmodel=small,全局静态变量寻址有限(一般最大2GB)。用-mcmodel=medium,全局静态数据可以很大(只要物理内存和虚拟地址空间支持)

    第三次运行

    出现客户端本地端口耗尽的问题

    在这里插入图片描述

    单个端口(如服务器的2000端口)理论上可以同时支持成千上万个连接,实际能建立多少连接取决于操作系统的资源限制,比如文件描述符数量、TCP本地端口可用范围等。当出现‘Cannot assign requested address’等错误时,往往是客户端本地端口或其它资源耗尽了,而不是服务器端口本身的限制。 此外,因为每条打印过于耗时,所以每一千个连接打印一条信息以及其所耗的时间。

    解决方法

    把服务器的端口从一个(2000)增加到20个。

    reactor.c代码


    #include <errno.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <string.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <poll.h>
    #include <sys/epoll.h>
    #include <errno.h>
    #include <sys/time.h>

    #define BUFFER_LENGTH 1024
    #define CONNECTION_SIZE 1048576

    #define MAX_PORTS 20

    #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec tv2.tv_sec) * 1000 + (tv1.tv_usec tv2.tv_usec) / 1000)

    typedef int (*RCALLBACK)(int fd);

    int accept_cb(int fd);

    int recv_cb(int fd);

    int send_cb(int fd);

    int epfd = 0;

    struct timeval begin;

    struct conn {
    int fd;

    char rbuffer[BUFFER_LENGTH];
    int rlength;

    char wbuffer[BUFFER_LENGTH];
    int wlength;

    RCALLBACK send_callback;

    union {
    RCALLBACK recv_callback;
    RCALLBACK accept_callback;
    }r_action;
    };

    struct conn conn_list[CONNECTION_SIZE] = {0};

    int set_event(int fd, int event, int flag) {

    if (flag) {

    struct epoll_event ev;
    ev.events = event;
    ev.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

    } else {

    struct epoll_event ev;
    ev.events = event;
    ev.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

    }

    }

    int event_register(int fd, int event) {

    if (fd < 0)return 1;

    conn_list[fd].fd = fd;
    conn_list[fd].r_action.recv_callback = recv_cb;
    conn_list[fd].send_callback = send_cb;

    memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
    conn_list[fd].rlength = 0;

    memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
    conn_list[fd].wlength = 0;

    set_event(fd,event,1);

    }

    int accept_cb(int fd) {

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    //printf("accept finished:%d\\n", clientfd);

    if (clientfd < 0) {
    printf("accept errno: %d –> %s\\n", errno, strerror(errno));
    return 1;
    }

    event_register(clientfd, EPOLLIN);

    if ((clientfd % 1000) == 0) {

    struct timeval current;
    gettimeofday(&current, NULL);

    int time_used = TIME_SUB_MS(current, begin);
    memcpy(&begin, &current, sizeof(struct timeval));

    printf("accept finished: %d, time_used: %d\\n", clientfd, time_used);

    }

    return 0;

    }

    int recv_cb(int fd) {

    int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);

    if (count == 0) {
    printf("client disconnect: %d\\n", fd);
    close(fd);

    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); //unfinished

    return 0;
    }
    conn_list[fd].rlength = count;

    //printf("RECV: %s\\n", conn_list[fd].rbuffer);

    #if 1

    conn_list[fd].wlength = conn_list[fd].rlength;
    memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);

    #endif

    set_event(fd, EPOLLOUT, 0);

    return count;

    }

    int send_cb(int fd) {

    int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);

    set_event(fd, EPOLLIN, 0);

    return count;
    }

    int init_server(unsigned short port) {

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //0.0.0
    servaddr.sin_port = htons(port); //0-1023

    if (1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof (struct sockaddr))) {
    printf("bind failed: %s\\n", strerror(errno));
    }

    listen(sockfd, 10);
    //printf("listen finished:%d\\n", sockfd);

    return sockfd;
    }

    int main() {

    unsigned short port = 2000;

    epfd = epoll_create(1);

    int i = 0;

    for (i = 0;i < MAX_PORTS;i ++) {

    int sockfd = init_server(port + i);

    conn_list[sockfd].fd = sockfd;
    conn_list[sockfd].r_action.recv_callback = accept_cb;

    set_event(sockfd, EPOLLIN, 1);
    }

    gettimeofday(&begin, NULL);

    while (1) { //mainloop

    struct epoll_event events[1024] = {0};
    int nready = epoll_wait(epfd, events, 1024, 1);

    int i = 0;
    for (i = 0;i < nready;i ++) {

    int connfd = events[i].data.fd;

    if (events[i].events & EPOLLIN) {
    conn_list[connfd].r_action.recv_callback(connfd);
    }

    if (events[i].events & EPOLLOUT) {
    conn_list[connfd].send_callback(connfd);
    }
    }

    }

    }

    第四次运行

    在运行前,每台虚拟机键入sudo vim /etc/security/limits.conf,在文件底部添加内容如下图所示,中间的空是tab键,没有空格键。 在这里插入图片描述

    出现客户端连接超时的问题

    在这里插入图片描述

    解决方法(改eth0)

    ifconfig查看服务器虚拟机的ip地址,然后键入sudo vim /etc/default/grub ,修改为下图中的样子: 在这里插入图片描述 系统的引导加载器grub配置文件,GRUB_CMDLINE_LINUX 参数被修改为"net.ifnames=0 biosdevname=0",这禁用了新的网络接口命名规则(如 ens33、enp0s3),使用传统的网络接口名称(如 eth0、eth1)。 随后键入sudo update-grub,更新后键入sudo vim /etc/netplan/00-installer-config.yaml ,修改内容为下图: 在这里插入图片描述 配置了一个名为 eth0 的网络接口,dhcp4: true: 使用 DHCP 自动获取 IPv4 地址,version: 2: 使用 Netplan 配置格式版本 2。 最后键入sudo shutdown -h now关机,关机后增加一个网络适配器:NAT。 一张网卡用于SSH连接,另一张网卡用于跑百万服务器。

    第五次运行

    运行成功

    在这里插入图片描述 经过一段时间的等待,总连接数跑到了102W,实现了百万并发。

    三、总结reactor.c的执行流程

    从main()函数入手:

    unsigned short port = 2000;

    定义一个无符号短整型的变量port用来存储端口号2000,无符号(unsigned)只能表示0和正数,短整型(short)只能表示2字节的整数。

    epfd = epoll_create(1);

    创建一个epoll实例并返回一个管理epoll实例的文件描述符epfd。

    int i = 0;

    for (i = 0;i < MAX_PORTS;i ++) {

    int sockfd = init_server(port + i);

    conn_list[sockfd].fd = sockfd;
    conn_list[sockfd].r_action.recv_callback = accept_cb;

    set_event(sockfd, EPOLLIN, 1);
    }

    对于每个端口,调用init_server函数并返回对应的sockfd,下面是init_server对应的代码:

    int init_server(unsigned short port) {

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //0.0.0
    servaddr.sin_port = htons(port); //0-1023

    if (1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof (struct sockaddr))
    ) {
    printf("bind failed: %s\\n", strerror(errno));
    }

    listen(sockfd, 10);
    //printf("listen finished:%d\\n", sockfd);

    return sockfd;
    }

    在init_server中,创建一个IPV4 TCP套接字,返回一个sockfd;创建一个类型为struct sockaddr_in的结构体变量servaddr,将这个变量的地址族设置为IPV4,服务器监听所有网络接口并指定服务器监听的端口号,通过bind()将套接字(socket) 和 IP地址、端口号 绑定在一起,通过listen()将套接字设置为监听状态,准备接收客户端的连接请求,返回sockfd。 再回到main()函数当中,拿着返回的fd,将结构体数组conn_list的索引号与返回的sockfd的值对应上并且将conn_list中的成员联合体r_action中的recv_callback函数指针指向accept_cb函数的地址。 随后进入到set_event函数中:

    int set_event(int fd, int event, int flag) {

    if (flag) {

    struct epoll_event ev;
    ev.events = event;
    ev.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

    } else {

    struct epoll_event ev;
    ev.events = event;
    ev.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);

    }

    }

    定义一个类型为struct epoll_event的结构体变量ev,设置ev当中的事件以及对应的fd并且通过epoll_ctl将这些通过epfd添加epoll实例当中。 回到main函数:

    gettimeofday(&begin, NULL);

    获取当前时间并存储到 begin 变量

    while (1) { //mainloop

    struct epoll_event events[1024] = {0};
    int nready = epoll_wait(epfd, events, 1024, 1);

    int i = 0;
    for (i = 0;i < nready;i ++) {

    int connfd = events[i].data.fd;

    if (events[i].events & EPOLLIN) {
    conn_list[connfd].r_action.recv_callback(connfd);
    }

    if (events[i].events & EPOLLOUT) {
    conn_list[connfd].send_callback(connfd);
    }
    }

    }

    创建一个类型为struct epoll_event的结构体变量events,定义了一个存储最多1024个事件的数组,并将每个事件的字段初始化为0。通过epoll_wait等待指定的 epoll 实例上的事件发生,将触发的事件按顺序从第一个元素开始写入 events 数组,并返回触发的事件数。for循环遍历已经写入events的所有元素。对于循环内部,将事件对应的fd赋值给connfd,如果事件是可读,执行前面设置好的accept_cb,传入参数connfd,对于accept_cb函数:

    int accept_cb(int fd) {

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    //printf("accept finished:%d\\n", clientfd);

    if (clientfd < 0) {
    printf("accept errno: %d –> %s\\n", errno, strerror(errno));
    return 1;
    }

    event_register(clientfd, EPOLLIN);

    if ((clientfd % 1000) == 0) {

    struct timeval current;
    gettimeofday(&current, NULL);

    int time_used = TIME_SUB_MS(current, begin);
    memcpy(&begin, &current, sizeof(struct timeval));

    printf("accept finished: %d, time_used: %d\\n", clientfd, time_used);

    }

    return 0;

    }

    accept建立连接并返回clientfd,调用event_register并传入参数clientfd,EPOLLIN:

    int event_register(int fd, int event) {

    if (fd < 0)return 1;

    conn_list[fd].fd = fd;
    conn_list[fd].r_action.recv_callback = recv_cb;
    conn_list[fd].send_callback = send_cb;

    memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
    conn_list[fd].rlength = 0;

    memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
    conn_list[fd].wlength = 0;

    set_event(fd,event,1);

    }

    依旧是将conn_list数组的索引对应fd,并且把recv_callback和send_callback函数指针指向recv_cb函数和send_cb函数的地址,清空fd对应的rbuffer、wbuffer并重置rlength、wlength,通过set_event函数将引入的fd和事件添加到epoll实例当中。 回到accept_cb当中,每一千条连接打印连接数已经连接所用的时间。 再回到主程序的while循环当中,如果是可写事件,通过函数指针send_callback回调函数send_cb,并传入参数connfd:

    int send_cb(int fd) {

    int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);

    set_event(fd, EPOLLIN, 0);

    return count;
    }

    将connfd对应的wbuffer中的长度为wlength的数据发送到指定的fd当中,并且返回成功发送的字节数count,通过sent_event将此fd的事件从可写改为可读,返回count,再次返回到main()函数重复执行while循环。

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » reactor以及百万服务器并发的实现
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!