一、reactor的实现
零声教育
reator的优点
不同的文件描述符(fd)发生了不同的事件(event),就会自动分派到不同的回调函数(callback)去处理。 对于epoll来说,如果一个io发送大数据包以至于超过了buffer的大小,会导致多出的数据没办法存下来。 reactor做到了:不同的事件,做不同的action,并且每个io独立。
reactor.c代码
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>
#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1024
typedef int (*RCALLBACK)(int fd);
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
int epfd = 0;
struct conn {
int fd;
char rbuffer[BUFFER_LENGTH];
int rlength;
char wbuffer[BUFFER_LENGTH];
int wlength;
RCALLBACK send_callback;
union {
RCALLBACK recv_callback;
RCALLBACK accept_callback;
}r_action;
};
struct conn conn_list[CONNECTION_SIZE] = {0};
int set_event(int fd, int event, int flag) {
if (flag) {
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
} else {
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int event_register(int fd, int event) {
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;
memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;
set_event(fd,event,1);
}
int accept_cb(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
printf("accept finished:%d\\n", clientfd);
event_register(clientfd,EPOLLIN);
return 0;
}
int recv_cb(int fd) {
int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);
if (count == 0) {
printf("client disconnect: %d\\n", fd);
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); //unfinished
return 0;
}
conn_list[fd].rlength = count;
printf("RECV: %s\\n", conn_list[fd].rbuffer);
#if 1
conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
#endif
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd) {
int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
set_event(fd, EPOLLIN, 0);
return count;
}
int init_server(unsigned short port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //0.0.0
servaddr.sin_port = htons(port); //0-1023
if (–1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof (struct sockaddr))) {
printf("bind failed: %s\\n", strerror(errno));
}
listen(sockfd, 10);
printf("listen finished:%d\\n", sockfd);
return sockfd;
}
int main() {
unsigned short port = 2000;
int sockfd = init_server(port);
epfd = epoll_create(1);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
while (1) { //mainloop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, –1);
int i = 0;
for (i = 0;i < nready;i ++) {
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
}
if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
}
}
}
二、服务器百万并发
将上面reactor.c的代码中的CONNECTION_SIZE宏定义为1048576,用作服务器。
客户端代码mul_port_client_epoll.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/time.h>
#include <unistd.h>
#define MAX_BUFFER128
#define MAX_EPOLLSIZE(384*1024)
#define MAX_PORT1
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec – tv2.tv_sec) * 1000 + (tv1.tv_usec – tv2.tv_usec) / 1000)
int isContinue = 0;
static int ntySetNonblock(int fd) {
int flags;
flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return flags;
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0) return –1;
return 0;
}
static int ntySetReUseAddr(int fd) {
int reuse = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}
int main(int argc, char **argv) {
if (argc <= 2) {
printf("Usage: %s ip port\\n", argv[0]);
exit(0);
}
const char *ip = argv[1];
int port = atoi(argv[2]);
int connections = 0;
char buffer[128] = {0};
int i = 0, index = 0;
struct epoll_event events[MAX_EPOLLSIZE];
int epoll_fd = epoll_create(MAX_EPOLLSIZE);
strcpy(buffer, " Data From MulClient\\n");
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip);
struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);
int sockfd = 0;
while (1) {
if (++index >= MAX_PORT) index = 0;
struct epoll_event ev;
if (connections < 340000 && !isContinue) {
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == –1) {
perror("socket");
goto err;
}
//ntySetReUseAddr(sockfd);
addr.sin_port = htons(port+index);
if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
perror("connect");
goto err;
}
ntySetNonblock(sockfd);
ntySetReUseAddr(sockfd);
sprintf(buffer, "Hello Server: client –> %d\\n", connections);
send(sockfd, buffer, strlen(buffer), 0);
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLOUT;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
connections ++;
}
//connections ++;
if (connections % 1000 == 999 || connections >= 340000) {
struct timeval tv_cur;
memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
gettimeofday(&tv_begin, NULL);
int time_used = TIME_SUB_MS(tv_begin, tv_cur);
printf("connections: %d, sockfd:%d, time_used:%d\\n", connections, sockfd, time_used);
int nfds = epoll_wait(epoll_fd, events, connections, 100);
for (i = 0;i < nfds;i ++) {
int clientfd = events[i].data.fd;
if (events[i].events & EPOLLOUT) {
//sprintf(buffer, "data from %d\\n", clientfd);
send(sockfd, buffer, strlen(buffer), 0);
} else if (events[i].events & EPOLLIN) {
char rBuffer[MAX_BUFFER] = {0};
ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
if (length > 0) {
//printf(" RecvBuffer:%s\\n", rBuffer);
if (!strcmp(rBuffer, "quit")) {
isContinue = 0;
}
} else if (length == 0) {
printf(" Disconnect clientfd:%d\\n", clientfd);
connections —;
close(clientfd);
} else {
if (errno == EINTR || errno == EAGAIN || errno == ENOTSOCK) continue;
printf(" Error clientfd:%d, errno:%d\\n", clientfd, errno);
close(clientfd);
}
} else {
printf(" clientfd:%d, errno:%d\\n", clientfd, errno);
close(clientfd);
}
}
}
usleep(500);
}
return 0;
err:
printf("error : %s\\n", strerror(errno));
return 0;
}
第一次运行
出现too many open files问题
左边是一个服务器,右边是三个客户端,可以看出,当连接数超过1024时,出现了错误:too many open files。
解决方法
客户端键入ulimit -a后,显示open files:1024,键入ulimit -n 1048576将open files改为1048576,不仅客户端要修改,服务端也要修改。
第二次运行
reactor.c编译不通过
可以明显看出,代码中struct conn conn_list[CONNECTION_SIZE] = {0}定义了一个非常大的全局(静态)数组或者结构体数组,这会在全局数据段(静态区/BSS段)分配一大块内存,比如上百万个结构体,每个2KB左右,总共会占用2GB甚至更多的空间。编译器/链接器(尤其是ld)对单个全局变量的大小有限制,通常2GB左右,超出就会报错。下面尝试通过molloc的方法来解决。malloc是 C 语言中用来“向操作系统申请一块内存”的函数,英文全称是 Memory Allocation(分配内存),用法是malloc(想要的字节数),申请到的内存是“堆”上的内存,可以随便用、用完还可以归还,返回的是“新分配的内存的首地址”。
解决方法
服务端键入gcc -o reactor reactor.c -mcmodel=medium。
这是GCC在x86_64(64位)架构下的一个内存模型参数,它调整了编译器对静态数据段(BSS/Data段)的寻址方式,可以支持比默认更大的全局变量。x86_64平台有不同的内存模型,-mcmodel=medium让全局和静态变量寻址空间变大,允许你分配比默认更大的静态全局数据(比如大于2GB)。默认是-mcmodel=small,全局静态变量寻址有限(一般最大2GB)。用-mcmodel=medium,全局静态数据可以很大(只要物理内存和虚拟地址空间支持)
第三次运行
出现客户端本地端口耗尽的问题
单个端口(如服务器的2000端口)理论上可以同时支持成千上万个连接,实际能建立多少连接取决于操作系统的资源限制,比如文件描述符数量、TCP本地端口可用范围等。当出现‘Cannot assign requested address’等错误时,往往是客户端本地端口或其它资源耗尽了,而不是服务器端口本身的限制。 此外,因为每条打印过于耗时,所以每一千个连接打印一条信息以及其所耗的时间。
解决方法
把服务器的端口从一个(2000)增加到20个。
reactor.c代码
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>
#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1048576
#define MAX_PORTS 20
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec – tv2.tv_sec) * 1000 + (tv1.tv_usec – tv2.tv_usec) / 1000)
typedef int (*RCALLBACK)(int fd);
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
int epfd = 0;
struct timeval begin;
struct conn {
int fd;
char rbuffer[BUFFER_LENGTH];
int rlength;
char wbuffer[BUFFER_LENGTH];
int wlength;
RCALLBACK send_callback;
union {
RCALLBACK recv_callback;
RCALLBACK accept_callback;
}r_action;
};
struct conn conn_list[CONNECTION_SIZE] = {0};
int set_event(int fd, int event, int flag) {
if (flag) {
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
} else {
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int event_register(int fd, int event) {
if (fd < 0)return –1;
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;
memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;
set_event(fd,event,1);
}
int accept_cb(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
//printf("accept finished:%d\\n", clientfd);
if (clientfd < 0) {
printf("accept errno: %d –> %s\\n", errno, strerror(errno));
return –1;
}
event_register(clientfd, EPOLLIN);
if ((clientfd % 1000) == 0) {
struct timeval current;
gettimeofday(¤t, NULL);
int time_used = TIME_SUB_MS(current, begin);
memcpy(&begin, ¤t, sizeof(struct timeval));
printf("accept finished: %d, time_used: %d\\n", clientfd, time_used);
}
return 0;
}
int recv_cb(int fd) {
int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);
if (count == 0) {
printf("client disconnect: %d\\n", fd);
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); //unfinished
return 0;
}
conn_list[fd].rlength = count;
//printf("RECV: %s\\n", conn_list[fd].rbuffer);
#if 1
conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
#endif
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd) {
int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
set_event(fd, EPOLLIN, 0);
return count;
}
int init_server(unsigned short port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //0.0.0
servaddr.sin_port = htons(port); //0-1023
if (–1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof (struct sockaddr))) {
printf("bind failed: %s\\n", strerror(errno));
}
listen(sockfd, 10);
//printf("listen finished:%d\\n", sockfd);
return sockfd;
}
int main() {
unsigned short port = 2000;
epfd = epoll_create(1);
int i = 0;
for (i = 0;i < MAX_PORTS;i ++) {
int sockfd = init_server(port + i);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
}
gettimeofday(&begin, NULL);
while (1) { //mainloop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, –1);
int i = 0;
for (i = 0;i < nready;i ++) {
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
}
if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
}
}
}
第四次运行
在运行前,每台虚拟机键入sudo vim /etc/security/limits.conf,在文件底部添加内容如下图所示,中间的空是tab键,没有空格键。
出现客户端连接超时的问题
解决方法(改eth0)
ifconfig查看服务器虚拟机的ip地址,然后键入sudo vim /etc/default/grub ,修改为下图中的样子: 系统的引导加载器grub配置文件,GRUB_CMDLINE_LINUX 参数被修改为"net.ifnames=0 biosdevname=0",这禁用了新的网络接口命名规则(如 ens33、enp0s3),使用传统的网络接口名称(如 eth0、eth1)。 随后键入sudo update-grub,更新后键入sudo vim /etc/netplan/00-installer-config.yaml ,修改内容为下图:
配置了一个名为 eth0 的网络接口,dhcp4: true: 使用 DHCP 自动获取 IPv4 地址,version: 2: 使用 Netplan 配置格式版本 2。 最后键入sudo shutdown -h now关机,关机后增加一个网络适配器:NAT。 一张网卡用于SSH连接,另一张网卡用于跑百万服务器。
第五次运行
运行成功
经过一段时间的等待,总连接数跑到了102W,实现了百万并发。
三、总结reactor.c的执行流程
从main()函数入手:
unsigned short port = 2000;
定义一个无符号短整型的变量port用来存储端口号2000,无符号(unsigned)只能表示0和正数,短整型(short)只能表示2字节的整数。
epfd = epoll_create(1);
创建一个epoll实例并返回一个管理epoll实例的文件描述符epfd。
int i = 0;
for (i = 0;i < MAX_PORTS;i ++) {
int sockfd = init_server(port + i);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
}
对于每个端口,调用init_server函数并返回对应的sockfd,下面是init_server对应的代码:
int init_server(unsigned short port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //0.0.0
servaddr.sin_port = htons(port); //0-1023
if (–1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof (struct sockaddr))
) {
printf("bind failed: %s\\n", strerror(errno));
}
listen(sockfd, 10);
//printf("listen finished:%d\\n", sockfd);
return sockfd;
}
在init_server中,创建一个IPV4 TCP套接字,返回一个sockfd;创建一个类型为struct sockaddr_in的结构体变量servaddr,将这个变量的地址族设置为IPV4,服务器监听所有网络接口并指定服务器监听的端口号,通过bind()将套接字(socket) 和 IP地址、端口号 绑定在一起,通过listen()将套接字设置为监听状态,准备接收客户端的连接请求,返回sockfd。 再回到main()函数当中,拿着返回的fd,将结构体数组conn_list的索引号与返回的sockfd的值对应上并且将conn_list中的成员联合体r_action中的recv_callback函数指针指向accept_cb函数的地址。 随后进入到set_event函数中:
int set_event(int fd, int event, int flag) {
if (flag) {
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
} else {
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
定义一个类型为struct epoll_event的结构体变量ev,设置ev当中的事件以及对应的fd并且通过epoll_ctl将这些通过epfd添加epoll实例当中。 回到main函数:
gettimeofday(&begin, NULL);
获取当前时间并存储到 begin 变量
while (1) { //mainloop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, –1);
int i = 0;
for (i = 0;i < nready;i ++) {
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN) {
conn_list[connfd].r_action.recv_callback(connfd);
}
if (events[i].events & EPOLLOUT) {
conn_list[connfd].send_callback(connfd);
}
}
}
创建一个类型为struct epoll_event的结构体变量events,定义了一个存储最多1024个事件的数组,并将每个事件的字段初始化为0。通过epoll_wait等待指定的 epoll 实例上的事件发生,将触发的事件按顺序从第一个元素开始写入 events 数组,并返回触发的事件数。for循环遍历已经写入events的所有元素。对于循环内部,将事件对应的fd赋值给connfd,如果事件是可读,执行前面设置好的accept_cb,传入参数connfd,对于accept_cb函数:
int accept_cb(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
//printf("accept finished:%d\\n", clientfd);
if (clientfd < 0) {
printf("accept errno: %d –> %s\\n", errno, strerror(errno));
return –1;
}
event_register(clientfd, EPOLLIN);
if ((clientfd % 1000) == 0) {
struct timeval current;
gettimeofday(¤t, NULL);
int time_used = TIME_SUB_MS(current, begin);
memcpy(&begin, ¤t, sizeof(struct timeval));
printf("accept finished: %d, time_used: %d\\n", clientfd, time_used);
}
return 0;
}
accept建立连接并返回clientfd,调用event_register并传入参数clientfd,EPOLLIN:
int event_register(int fd, int event) {
if (fd < 0)return –1;
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;
memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;
set_event(fd,event,1);
}
依旧是将conn_list数组的索引对应fd,并且把recv_callback和send_callback函数指针指向recv_cb函数和send_cb函数的地址,清空fd对应的rbuffer、wbuffer并重置rlength、wlength,通过set_event函数将引入的fd和事件添加到epoll实例当中。 回到accept_cb当中,每一千条连接打印连接数已经连接所用的时间。 再回到主程序的while循环当中,如果是可写事件,通过函数指针send_callback回调函数send_cb,并传入参数connfd:
int send_cb(int fd) {
int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
set_event(fd, EPOLLIN, 0);
return count;
}
将connfd对应的wbuffer中的长度为wlength的数据发送到指定的fd当中,并且返回成功发送的字节数count,通过sent_event将此fd的事件从可写改为可读,返回count,再次返回到main()函数重复执行while循环。
评论前必须登录!
注册