✨✨欢迎来到T_X_Parallel的博客!! 🛰️博客主页:T_X_Parallel 🛰️项目代码仓库:Reactor模型高并发服务器项目代码仓库(随博客更新) 🛰️专栏 : Reactor模型高并发服务器项目 🛰️欢迎关注:👍点赞🙌收藏✍️留言
文章目录
-
- 1. 前言
- 2. 日志模块
- 3. 缓冲区模块
-
- 设计思想
- 接口设计
- 功能实现
- 测试
- 4. 套接字模块
-
- 设计思想
- 接口设计
- 功能实现
- 测试
- 5. 完整代码
-
- 缓冲区模块代码
- 套接字模块代码
项目环境:vscode、wsl2(Ubuntu22.04)
技术栈:C/C++、C++11、C++17、STL、HTTP、TCP、HTML
1. 前言
接下来开始逐步实现项目中的多个模块了,首先先实现缓冲区模块和套接字模块,但是在实现这两个模块之前先在项目中加入日志模块很重要,有些打印信息可以帮助调试。
提醒:接下来所有的模块实现可能会出现各种各样的bug,博主在实现的时候几乎每次都会遇到新bug,都是代码中一些不起眼的错误,所以实现过程一定要仔细,实现完一个模块就进行测试。当然,博主会在一些容易出错的地方提醒
先提前预告一下后面的流程吧,大概率的是:Buffer模块+Socket模块→Channel模块+Poll模块+TimerWheel模块+EventLoop模块→Connection模块→LoopThread模块+Acceptor模块+TcpServer模块→HttpServer模块
这个过程是博主自己先实现了一遍之后认为最合理的一个过程,如果觉得博主博客更新慢,可以去博主代码仓库的项目代码进行阅读,代码已经更新完毕了,内含每个阶段的测试代码和最终代码(有些测试需要通过一些工具去测试)。
废话少说,开始项目的第一个阶段吧。
2. 日志模块
这个日志模块是一个成熟项目中必不可少的模块,可以方便打印的同时也便于看到一些关键信息,比如打印位置所处的文件和行号以及日志等级等,在目前这个阶段可以直接使用上一个站内搜索引擎项目中简单实现的日志模块,但是这里存在一个问题,就是这个项目后面要使用多线程,那么多个线程如果同时打印的话可能会出现打印内容错乱的情况,这时候有两种解决方法,第一种就是加锁,但是我们这个项目是高并发高性能服务器组件,所以这个方法不适用,第二种方法就是将日志模块改成一个多线程异步版的,但是这会加大项目的工作量(如果自己做过异步日志项目可以使用自己做的),所以博主建议直接使用开源的第三方库spdlog,使用方便而且很容易移植到自己的项目中
上面只是提前说明,在没有引入多线程之前,还是先使用简单的日志模块即可,只要封装打印过程即可,并加入日志等级,文件名,文件行号,时间等。这里使用一些C语言的语法来获取这些信息,比如__FILE__和__LINE__可以获取文件名和行号,然后通过宏定义让调用者只需传入日志等级和信息即可
#define LOG(LEVEL, MESSAGE) log(#LEVEL, MESSAGE, __FILE__, __LINE__)
//LEVEL前加#是为了打印出来的等级不是数字而是字母
使用者只需要使用LOG(LEVEL, MESSAGE)即可打印日志信息,下面是完整日志代码
#include <string>
#include <iostream>
#include <ctime>
#define NORMAL 1
#define DEBUG 2
#define WARNING 3
#define ERROR 4
#define FATAL 5
#define LOG(LEVEL, MESSAGE) log(#LEVEL, MESSAGE, __FILE__, __LINE__)
void log(std::string level, std::string message, std::string file, int line)
{
// 获取当前时间
std::time_t t = std::time(nullptr);
char timeStr[100];
std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S", std::localtime(&t));
std::cout << '[' << "Level# " << level << ']'
<< '[' << "time# " << timeStr << ']'
<< '[' << "File# " << file << ']'
<< '[' << "Line# " << line << ']'
<< "Message# " << message
<< std::endl;
}
3. 缓冲区模块
设计思想
缓冲区模块就是自己设计一个用户缓冲区,当一个连接发送数据过来之后当可读事件触发时先将内核缓冲区中的数据放入我们自己的用户缓冲区中,以便内核缓冲区持续保持可写状态而减少等待时间,这样客户端就可以持续向服务端连续发送数据,提高了效率。
为了方便管理,用户缓冲区肯定是使用C++的STL容器来充当,这里我们就用vector来充当缓冲区,那么就可以使用双指针来充当可读数据的开始位置和可写数据的开始位置,写数据的时候直接从可写指针开始写,读数据的时候直接从可读指针开始读直到读到可写指针为止。
然后就是扩容方案的设计,这个时候可以分为两大情况,整个数组的空余空间(即可可读起始位置到写起始位置之间空间以外的空间)是否有足够空间写入下一次的写入数据。
第一种情况,如果不够的话,直接扩容,这里可以使用一种扩容思想,就是每次扩容可以取当前数组大小的1.5倍大小和从可写起始位置写入该数据后的空间大小两者的最大值当作该次扩容的大小,这样可以减少扩容次数。
第二种情况就是不扩容即可写入该数据,但是这时分两种情况
- 第一种情况就是可写起始位置后面的空间足够写入该数据,那么直接写入即可
- 第二种情况就是可写起始位置后面不足以写入该数据,但是可写起始位置后面空间和可读起始位置之前的空间之和可以写入该数据,这时可以将中间的可读数据向前移动至缓冲区头部,然后直接从可写起始位置写入(移动后需要移动这两个指针的位置)
接口设计
接下来就是所需要向外提供的功能接口,这里需要考虑后面所要用到的接口,或者先实现基础接口,等后面要用什么接口再实现,博主就将所有需要使用到的接口都提及了
第一肯定是需要提供读写接口,博主建议这个读写接口可以重载多种,可以使用字符串指针或者string写入和读出数据;第二个就是获取可读数据大小和是否有数据可读的接口;第三个是后面Http模块中要用到的取出缓冲区中的一行数据(即可读起始位置到第一个’\\n’的数据);第四个是缓冲区清空。这些就是几个后面要用的对外接口了
// 获取可读数据大小
inline uint64_t GetReadSize();
// 判断缓冲区中是否有数据可读
bool AbleRead();
// 写入数据(对外接口)
void Write(const void *data, uint64_t len);
void Write(const std::string &str);
void Write(Buffer &buff);
// 读出数据(对外接口)
void Read(void *out, uint64_t len = 0);
std::string Read(uint64_t len = 0);
// 获取一行数据即从可读起始位置到第一个'\\n'(这里包括'\\n')(对外接口)
void GetLine(void *out);
std::string GetLine();
// 清空缓冲区
void Clear();
除了这些还有一些私有接口,这些私有接口都是上面公有接口所调用的,大致有获取缓冲区起始地址、获取可读起始地址指针、获取可写起始地址指针、获取当前整个缓冲区中可写空间大小即空闲空间、获取可读起始位置之前的空间、获取可写起始位置以及之后的位置、移动可写起始位置、移动可读起始位置、确保缓冲区能够容纳接下来要写入的数据、写入数据、读出数据、获取从可读起始位置开始的第一个’\\n’位置地址指针这些私有接口,这些实现起来并不麻烦,而且上面扩容思想已经详细讲解过,所以这些就不一一详细解释了
// 获取缓冲区起始地址
inline char *Begin();
// 获取可读起始地址指针
inline char *GetWriteAddr();
// 获取可写起始地址指针
inline char *GetReadAddr();
// 获取当前整个缓冲区中可写空间大小即空闲空间,包括可读起始位置之前的空间和可写起始位置以及之后的位置
inline uint64_t GetWriteSize();
// 获取可读起始位置之前的空间
inline uint64_t GetHeadEmptySize();
// 获取可写起始位置以及之后的位置
inline uint64_t GetTailEmptySize();
// 移动可写起始位置
inline void MoveWritePos(const uint64_t len);
// 移动可读起始位置
inline void MoveReadPos(const uint64_t len);
// 确保缓冲区能够容纳接下来要写入的数据,如果不够则扩容,空间足够则需按照两种情况来处理
// 1. 可写起始位置之后的空间足够大直接写入
// 2. 可写起始位置之后的空间不够,但是加上可读起始位置之前的空间大小足够,则先将数据往前移动在写入
void EnableWriteSpaceEnough(const uint64_t len);
// 写入数据
void _Write(const void *data, const uint64_t len);
// 读出数据
void _Read(void *out, const uint64_t len);
// 获取从可读起始位置开始的第一个'\\n'位置地址指针
void *GetCrlf();
功能实现
实现过程其实很简单,博主主要对读写入数据实现进行讲解,其他可参考代码进行理解,如有问题,随时可在评论区留言
读出数据主要就三步,第一步检查缓冲区中是否有所需长度的数据,第二步就是使用copy函数将数据拷贝出去,第三步向后移动可读起始位置
写入数据主要也就三步,第一步确保缓冲区中有空间进行写入(这步过程按上面扩容方案进行实现即可),第二步将数据拷贝进缓冲区,第三步向后移动可写起始位置
是不是非常简单,那博主提前预告一下,下一篇博客的内容实现非常要命,请提前做好心理准备,是一场硬仗要打
具体实现代码请跳转到文章尾部部分
测试
主要就是测试缓冲区是否能正常读写,还有扩容的正常运作(这里只是提供简单的测试,自己可以想想可以做其他测试)
测试代码:
int main()
{
ns_buffer::Buffer buffer;
std::string str = "xiaomisu7ultra"; // 写入数据
std::string s; // 读出数据
for (int i = 0; i < 100; i++)
{
if (i == 50)
{
s = buffer.Read();
std::cout << "第一次读出数据:" << s << std::endl;
}
buffer.Write(str);
}
s = buffer.Read();
std::cout << "第二次读出数据:" << s << std::endl;
return 0;
}
注:这里博主在测试的时候发现一个问题,因为该模块有些地方是后面改过的,而且后面使用了any类和spdlog,所以一直使用C++17标准,然后在写博客测试的时候,发现c++11和c++17在string的data()接口不太一样,c++11没有重载出非const返回值的接口,但c++17重载了,所以在测试的时候注意一下这个问题
./Buff
第一次读出数据:xiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultra
第二次读出数据:xiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultraxiaomisu7ultra
这个测试即测试了读写和中途读出数据以及扩容
4. 套接字模块
设计思想
这个模块就更简单了,如果学过网络编程的应该都实现过,就是将Linux提供的那套接口进行封装,但是封装后需要将不需要提供给外部的设置成私有,其他设置为公用。而该模块主要就是对socket接口封装以及提供创建服务端以及客户端的接口以及服务端和客户端所要用到的接口
为了实现这个模块,先了解一下Linux提供的接口使用方法
- int socket(int domain, int type, int protocol);
参数
- domain(协议族): 指定套接字使用的协议族(通信域)。常见的值包括:
- AF_INET:IPv4 网络协议。
- AF_INET6:IPv6 网络协议。
- AF_UNIX 或 AF_LOCAL:本地通信(同一主机上的进程间通信)。
- AF_PACKET:底层套接字,用于直接访问网络设备。
- type (套接字类型): 指定套接字的类型,决定通信的特性。常见的值包括:
- SOCK_STREAM:面向连接的套接字(TCP)。
- SOCK_DGRAM:无连接的套接字(UDP)。
- SOCK_RAW:原始套接字,允许直接访问底层协议(需要管理员权限)。
- protocol(协议): 指定使用的具体协议。通常可以设置为0,表示根据 domain 和 type 自动选择合适的协议。例如:
- 对于 SOCK_STREAM 和 AF_INET,默认协议是 IPPROTO_TCP。
- 对于 SOCK_DGRAM 和 AF_INET,默认协议是 IPPROTO_UDP。
- 成功:返回一个非负整数,表示新创建的套接字文件描述符(sockfd)。该描述符用于后续的套接字操作(如 bind、listen、connect 等)。
- 失败:返回-1,并设置errno以指示错误原因。例如:
- EMFILE:进程已达到文件描述符限制。
- ENFILE:系统已达到文件描述符限制。
- EACCES:权限不足。
- EINVAL:参数无效。
返回值
- int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
参数
-
sockfd:
- 套接字文件描述符,由 socket() 函数返回。
- 表示要绑定的套接字。
-
addr:
-
指向 sockaddr 结构体的指针,表示要绑定的地址。
-
具体类型通常是 struct sockaddr_in(IPv4)或 struct sockaddr_in6(IPv6),需要强制转换为 (struct sockaddr *)。
-
结构体内容包括 IP 地址和端口号。例如:
-
对于 IPv4,使用struct sockaddr_in
struct sockaddr_in {
sa_family_t sin_family; // 地址族 (AF_INET)
in_port_t sin_port; // 端口号 (网络字节序)
struct in_addr sin_addr; // IP 地址 (网络字节序)
char sin_zero[8]; // 填充字段
} -
对于 IPv6,使用 struct sockaddr_in6。
-
-
-
addrlen:
- 指定 addr 结构体的大小(以字节为单位)。
- 对于 IPv4,通常为 sizeof(struct sockaddr_in)。
- 对于 IPv6,通常为 sizeof(struct sockaddr_in6)。
- 成功:
- 返回 0,表示绑定成功。
- 失败:
- 返回-1,并设置errno以指示错误原因。例如:
- EACCES:权限不足。
- EADDRINUSE:指定的地址已被占用。
- EBADF:sockfd 不是有效的文件描述符。
- EINVAL:套接字已绑定,或者参数无效。
- ENOTSOCK:sockfd 不是套接字。
- 返回-1,并设置errno以指示错误原因。例如:
返回值
- int listen(int sockfd, int backlog);
参数
- sockfd:
- 套接字文件描述符,由 socket() 函数返回。
- 必须是已经通过 bind() 绑定了地址和端口的套接字。
- 表示要设置为监听模式的套接字。
- backlog:
- 指定内核为此套接字维护的已完成连接队列和未完成连接队列的最大长度。
- 已完成连接队列:存储已完成三次握手的连接。
- 未完成连接队列:存储正在进行三次握手的连接。
- 如果有更多的连接请求超过队列长度,新的连接请求可能会被拒绝(客户端会收到 ECONNREFUSED 错误)。
- 常见值:
- 通常设置为一个较大的值(如 SOMAXCONN,系统定义的最大值)。
- 如果设置为 0,则队列长度可能会被系统调整为默认值。
- 成功:
- 返回 0表示套接字成功进入监听模式。
- 失败:
- 返回-1,并设置errno以指示错误原因。例如:
- EADDRINUSE:地址已被占用。
- EBADF:sockfd 不是有效的文件描述符。
- ENOTSOCK:sockfd 不是套接字。
- EOPNOTSUPP:套接字类型不支持监听操作(如非流式套接字)。
- 返回-1,并设置errno以指示错误原因。例如:
返回值
- int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
参数
-
sockfd:
- 套接字文件描述符,由 socket() 函数返回。
- 表示客户端用于连接的套接字。
-
addr:
-
指向 sockaddr 结构体的指针,表示服务器的地址信息。
-
具体类型通常是 struct sockaddr_in(IPv4)或 struct sockaddr_in6(IPv6),需要强制转换为 (struct sockaddr *)。
-
结构体内容包括服务器的 IP 地址和端口号。例如:
-
对于 IPv4,使用struct sockaddr_in:
struct sockaddr_in {
sa_family_t sin_family; // 地址族 (AF_INET)
in_port_t sin_port; // 端口号 (网络字节序)
struct in_addr sin_addr; // IP 地址 (网络字节序)
char sin_zero[8]; // 填充字段
}; -
对于 IPv6,使用 struct sockaddr_in6。
-
-
-
addrlen:
- 指定 addr 结构体的大小(以字节为单位)。
- 对于 IPv4,通常为 sizeof(struct sockaddr_in)。
- 对于 IPv6,通常为 sizeof(struct sockaddr_in6)。
- 成功:
- 返回 0,表示连接成功。
- 失败:
- 返回-1,并设置errno以指示错误原因。例如:
- ECONNREFUSED:目标地址拒绝连接。
- ETIMEDOUT:连接超时。
- EHOSTUNREACH:目标主机不可达。
- EINPROGRESS:套接字为非阻塞模式,连接正在进行中。
- 返回-1,并设置errno以指示错误原因。例如:
返回值
- ssize_t recv(int sockfd, void *buf, size_t len, int flags);
参数
- sockfd:
- 套接字文件描述符,由 socket() 或 accept() 返回。
- 表示从哪个套接字接收数据。
- buf:
- 指向一个缓冲区的指针,用于存储接收到的数据。
- 该缓冲区需要由调用者分配,大小至少为 len 字节。
- len:
- 指定缓冲区的大小(以字节为单位)。
- 表示最多接收多少字节的数据。
- flags:
- 控制接收行为的标志,可以是以下值的组合:
- 0:默认行为(阻塞模式下等待数据)。
- MSG_DONTWAIT:非阻塞模式,如果没有数据可用,立即返回。
- MSG_PEEK:查看数据但不从接收缓冲区中移除数据。
- MSG_WAITALL:等待接收到指定的 len 字节数据,除非发生错误或连接关闭。
- MSG_OOB:接收带外数据。
- 控制接收行为的标志,可以是以下值的组合:
- 成功:
- 返回接收到的字节数(ssize_t 类型)。
- 如果返回值为 0,表示对端已关闭连接。
- 失败:
- 返回-1,并设置errno以指示错误原因。例如:
- EAGAIN 或 EWOULDBLOCK:在非阻塞模式下,没有数据可用。
- EBADF:sockfd 不是有效的文件描述符。
- ECONNRESET:连接被对端重置。
- ENOTCONN:套接字未连接。
- 返回-1,并设置errno以指示错误原因。例如:
返回值
- ssize_t send(int sockfd, const void *buf, size_t len, int flags);
参数
- sockfd:
- 套接字文件描述符,由 socket() 或 accept() 返回。
- 表示向哪个套接字发送数据。
- buf:
- 指向要发送数据的缓冲区的指针。
- 缓冲区中的数据将被发送到目标套接字。
- len:
- 指定要发送的数据长度(以字节为单位)。
- 表示从 buf 中最多发送多少字节的数据。
- flags:
- 控制发送行为的标志,可以是以下值的组合:
- 0:默认行为。
- MSG_DONTWAIT:非阻塞模式,如果无法立即发送数据,立即返回。
- MSG_OOB:发送带外数据(仅适用于支持带外数据的协议,如 TCP)。
- MSG_NOSIGNAL:阻止发送过程中产生 SIGPIPE 信号。
- 控制发送行为的标志,可以是以下值的组合:
- 成功:
- 返回实际发送的字节数(ssize_t 类型)。
- 返回值可能小于 len,表示部分数据已成功发送。
- 失败:
- 返回-1,并设置errno以指示错误原因。例如:
- EAGAIN 或 EWOULDBLOCK:在非阻塞模式下,发送缓冲区已满。
- EBADF:sockfd 不是有效的文件描述符。
- ECONNRESET:连接被对端重置。
- ENOTCONN:套接字未连接。
- EPIPE:对端已关闭连接。
- 返回-1,并设置errno以指示错误原因。例如:
返回值
- int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
参数
- sockfd:
- 套接字文件描述符,由 socket() 或 accept() 返回。
- 表示要设置选项的套接字。
- level:
- 指定选项的协议层级。
- 常见值:
- SOL_SOCKET:通用套接字选项。
- IPPROTO_TCP:TCP 协议选项。
- IPPROTO_IP:IPv4 协议选项。
- IPPROTO_IPV6:IPv6 协议选项。
- optname:
- 指定要设置的选项名称。
- 常见值(取决于level的值):
- SOL_SOCKET选项:
- SO_REUSEADDR:允许重用本地地址。
- SO_KEEPALIVE:启用 TCP 保活机制。
- SO_RCVBUF:接收缓冲区大小。
- SO_SNDBUF:发送缓冲区大小。
- IPPROTO_TCP选项:
- TCP_NODELAY:禁用 Nagle 算法(减少延迟)。
- IPPROTO_IP或IPPROTO_IPV6选项:
- IP_TTL:设置 IPv4 的 TTL 值。
- IPV6_UNICAST_HOPS:设置 IPv6 的跳数限制。
- SOL_SOCKET选项:
- optval:
- 指向包含选项值的缓冲区的指针。
- 例如,若设置 SO_REUSEADDR,optval 通常指向一个 int 类型的变量,其值为 1(启用)或 0(禁用)。
- optlen:
- 指定 optval 缓冲区的大小(以字节为单位)。
- 通常为 sizeof(int),但具体取决于选项的类型。
- 成功:
- 返回 0,表示选项设置成功。
- 失败:
- 返回-1,并设置errno以指示错误原因。例如:
- EBADF:sockfd 不是有效的文件描述符。
- EINVAL:无效的选项或参数。
- ENOPROTOOPT:指定的选项在指定的协议中不可用。
- ENOTSOCK:sockfd 不是套接字。
- 返回-1,并设置errno以指示错误原因。例如:
返回值
- int fcntl(int fd, int cmd, ... /* arg */ );
参数
- fd:
- 文件描述符,表示要操作的文件或套接字。
- 由系统调用(如 open() 或 socket())返回。
- cmd:
- 指定要执行的操作。
- 常见值包括:
- F_DUPFD:复制文件描述符,返回一个新的文件描述符,值大于或等于 arg。
- F_GETFD:获取文件描述符标志(如 FD_CLOEXEC)。
- F_SETFD:设置文件描述符标志。
- F_GETFL:获取文件状态标志(如是否为非阻塞模式)。
- F_SETFL:设置文件状态标志(如设置非阻塞模式 O_NONBLOCK)。
- F_GETOWN:获取文件描述符的所有者(用于异步 I/O 或信号驱动 I/O)。
- F_SETOWN:设置文件描述符的所有者。
- F_GETLK:获取文件锁状态。
- F_SETLK:设置文件锁(非阻塞)。
- F_SETLKW:设置文件锁(阻塞)。
- … /\\* arg \\*/:
- 可选参数,取决于 cmd 的值。
- 例如:
- 对于 F_DUPFD,arg 是一个整数,表示新的文件描述符的最小值。
- 对于 F_SETFD 或 F_SETFL,arg 是一个整数,表示要设置的标志。
- 对于文件锁操作(如 F_SETLK 或 F_SETLKW),arg 是指向 struct flock 的指针。
- 成功:
- 返回值取决于cmd的具体操作。例如:
- 对于 F_DUPFD,返回新的文件描述符。
- 对于 F_GETFD 或 F_GETFL,返回标志值。
- 对于其他操作,通常返回 0。
- 返回值取决于cmd的具体操作。例如:
- 失败:
- 返回-1,并设置errno以指示错误原因。例如:
- EBADF:无效的文件描述符。
- EINVAL:无效的命令或参数。
- EACCES:权限不足。
- EAGAIN:操作被阻塞(如文件锁操作)。
- 返回-1,并设置errno以指示错误原因。例如:
返回值
接口设计
功能接口分为对外接口和内部接口,对外接口基本就是调用内部接口进行功能实现
对外接口
- 创建客户端接口(客户端)
- 创建服务端接口(服务端)
- 获取新连接接口(服务端)
- 关闭服务端接口(客户端、服务端)
- 接收数据对外接口(客户端、服务端)
- 发送数据对外接口(客户端、服务端)
- 获取套接字接口(客户端、服务端)
// 获取套接字
int GetFd();
// 接收数据
ssize_t Recv(void *buf, size_t len, bool non_block = false);
// 发送数据
ssize_t Send(const void *buf, size_t len, bool non_block = false);
// 服务器获取新连接
int Accept();
// 关闭套接字
void Close();
// 创建服务端连接
bool CreateServer(const uint16_t port, const std::string &ip = "0.0.0.0", int flag = 1);
// 创建客户端连接
bool CreateClient(const uint16_t port, const std::string &ip);
内部接口
- 创建套接字接口(客户端、服务端)
- 绑定地址端口信息接口(服务端)
- 监听接口(服务端)
- 发起连接接口(客户端)
- 接收数据内部接口(客户端、服务端)
- 发送数据内部接口(客户端、服务端)
- 设置套接字选项接口——开启地址端口重用选项(服务端)
- 设置套接字属性接口——设置为非阻塞(服务端)
// 创建套接字
bool CreateSocket();
// 绑定地址端口信息
bool Bind(const std::string &ip, uint16_t port);
// 监听
bool Listen(const int backlog = MAX_LISTEN);
// 客户端发起连接
bool Connect(const std::string &ip, const uint16_t port);
// 接收数据
ssize_t _Recv(void *buf, size_t len, int flag = 0);
// 发送数据
ssize_t _Send(const void *buf, size_t len, int flag = 0);
// 设置套接字选项——开启地址端口重用选项
void SetAddrReuse();
// 设置套接字属性——设置为非阻塞
void SetNonBlock();
功能实现
这个模块的功能实现没多少内容需要分析的,只需能把上面的Linux提供的接口掌握了就能实现上面这几个功能
需要注意的是在发送数据时,如果对方关闭了连接,那么你发送数据的返回值会为0,这里做个特殊处理,如果返回为0,则给接口使用者返回-1,其他几种特殊错误,比如被信号打断或者内核缓冲区为空,则返回0。这样处理后面的模块设计中会用到依次为依据来释放连接。
创建服务端的过程就五步,第一步创建套接字,第二步判断是否打开套接字非阻塞属性,第三步绑定地址端口信息,第四步监听套接字,第五步开启地址端口重用属性
创建客户端就两步,第一步创建套接字,第二步向服务端发起连接
具体代码实现请看文章最末尾的代码部分
测试
测试代码
// 服务端
int main()
{
ns_socket::Socket svr_socket;
svr_socket.CreateServer(8081, "0.0.0.0", 0);// 不设置非阻塞
while (1)
{
int newsockfd = svr_socket.Accept();
if (newsockfd < 0)
{
LOG(DEBUG, "accept failed");
return –1;
}
ns_socket::Socket comm_socket(newsockfd);
char buffer[1024];
int ret = comm_socket.Recv(&buffer, 1023);
if (ret < 0)
{
LOG(DEBUG, "recv failed");
comm_socket.Close();
}
LOG(NORMAL, buffer);
comm_socket.Send(buffer, ret);
}
return 0;
}
// 客户端
int main()
{
ns_socket::Socket cli_socket;
cli_socket.CreateClient(8081, "127.0.0.1");
std::string str = "xiaomi su7ultra";
cli_socket.Send(str.c_str(), str.size());
char buf[1024] = {0};
cli_socket.Recv(&buf, 1023);
LOG(NORMAL, buf);
return 0;
}
测试结果
// 服务端
./Server
[Level# NORMAL][time# 2025–04–04 13:55:47][File# test_socket_server.cc][Line# 23]Message# xiaomi su7ultra
// 客户端
./Client
[Level# NORMAL][time# 2025–04–04 13:55:47][File# test_socket_client.cc][Line# 11]Message# xiaomi su7ultra
5. 完整代码
缓冲区模块代码
#pragma once
#include <iostream>
#include <vector>
#include <algorithm>
#include <cstring>
#include "log.hpp"
namespace ns_buffer
{
#define BUFFER_DEFAULT_CAPACITY 1024
#define Expansion_Factor 1.5
class Buffer
{
private:
std::vector<char> _buffer; // 数组充当缓冲区
uint64_t _write_index; // 可写起始指针
uint64_t _read_index; // 可读起始指针
public:
Buffer()
: _write_index(0), _read_index(0), _buffer(BUFFER_DEFAULT_CAPACITY)
{
}
Buffer(const Buffer &b)
: _write_index(b._write_index), _read_index(b._read_index), _buffer(b._buffer)
{
}
Buffer &operator=(const Buffer &b)
{
_buffer = b._buffer;
_write_index = b._write_index;
_read_index = b._read_index;
return *this;
}
public:
// 获取可读数据大小
inline uint64_t GetReadSize()
{
return _write_index – _read_index;
}
// 判断缓冲区中是否有数据可读
bool AbleRead()
{
return (GetReadSize() > 0);
}
// 写入数据(对外接口)
void Write(const void *data, uint64_t len)
{
if (len == 0)
return;
_Write(data, len);
}
void Write(const std::string &str)
{
if (str.size() == 0)
return;
_Write(str.c_str(), str.size());
}
void Write(Buffer &buff)
{
std::string s;
buff.Read(&s);
Write(s);
}
// 读出数据(对外接口)
void Read(void *out, uint64_t len = 0)
{
if (len == 0)
len = GetReadSize();
_Read(out, len);
}
std::string Read(uint64_t len = 0)
{
if (len == 0)
len = GetReadSize();
std::string s;
s.resize(len);
_Read(s.data(), len);
return s;
}
// 获取一行数据即从可读起始位置到第一个'\\n'(这里包括'\\n')(对外接口)
void GetLine(void *out)
{
auto crlfpos = GetCrlf();
if (crlfpos == NULL)
{
out = nullptr;
return;
}
_Read(out, static_cast<char *>(crlfpos) – static_cast<char *>(GetReadAddr()) + 1);
}
std::string GetLine()
{
std::string s;
auto crlfpos = GetCrlf();
if (crlfpos == NULL)
{
return "";
}
uint64_t len = static_cast<char *>(crlfpos) – static_cast<char *>(GetReadAddr()) + 1;
s.resize(len);
_Read(s.data(), len);
return s;
}
// 清空缓冲区
void Clear()
{
_read_index = 0;
_write_index = 0;
}
private:
// 获取缓冲区起始地址
inline char *Begin()
{
return &*_buffer.begin();
}
// 获取可读起始地址指针
inline char *GetWriteAddr()
{
return Begin() + _write_index;
}
// 获取可写起始地址指针
inline char *GetReadAddr()
{
return Begin() + _read_index;
}
// 获取当前整个缓冲区中可写空间大小即空闲空间,包括可读起始位置之前的空间和可写起始位置以及之后的位置
inline uint64_t GetWriteSize()
{
return GetHeadEmptySize() + GetTailEmptySize();
}
// 获取可读起始位置之前的空间
inline uint64_t GetHeadEmptySize()
{
return _read_index;
}
// 获取可写起始位置以及之后的位置
inline uint64_t GetTailEmptySize()
{
return _buffer.size() – _write_index;
}
// 移动可写起始位置
inline void MoveWritePos(const uint64_t len)
{
_write_index += len;
}
// 移动可读起始位置
inline void MoveReadPos(const uint64_t len)
{
_read_index += len;
}
// 确保缓冲区能够容纳接下来要写入的数据,如果不够则扩容,空间足够则需按照两种情况来处理
// 1. 可写起始位置之后的空间足够大直接写入
// 2. 可写起始位置之后的空间不够,但是加上可读起始位置之前的空间大小足够,则先将数据往前移动在写入
void EnableWriteSpaceEnough(const uint64_t len)
{
if (len <= GetTailEmptySize())
return;
if (len <= GetWriteSize())
{
std::copy(GetReadAddr(), GetWriteAddr(), Begin());
_write_index = GetReadSize();
_read_index = 0;
}
else
{
_buffer.resize(std::max((uint64_t)(Expansion_Factor * _buffer.size()), len + _write_index));
}
}
// 写入数据
void _Write(const void *data, const uint64_t len)
{
if (len == 0)
return;
EnableWriteSpaceEnough(len);
std::copy(static_cast<const char *>(data), static_cast<const char *>(data) + len, GetWriteAddr());
MoveWritePos(len);
}
// 读出数据
void _Read(void *out, const uint64_t len)
{
if (len > GetReadSize())
{
LOG(FATAL, "读出数据大小大于可读数据大小");
return;
}
auto readaddr = GetReadAddr();
std::copy(readaddr, readaddr + len, static_cast<char *>(out));
MoveReadPos(len);
}
// 获取从可读起始位置开始的第一个'\\n'位置地址指针
void *GetCrlf()
{
return std::memchr(GetReadAddr(), '\\n', GetReadSize());
}
};
} // namespace ns_buffer
套接字模块代码
#include <iostream>
#include <string>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/socket.h>
#include "log.hpp"
namespace ns_socket
{
#define MAX_LISTEN 1024
class Socket
{
private:
int _sockfd;
public:
Socket()
: _sockfd(–1)
{
}
Socket(const int sockfd)
: _sockfd(sockfd)
{
}
~Socket()
{
Close();
}
public:
// 获取套接字
int GetFd()
{
return _sockfd;
}
// 接收数据
ssize_t Recv(void *buf, size_t len, bool non_block = false)
{
return non_block == false ? _Recv(buf, len) : _Recv(buf, len, MSG_DONTWAIT);
}
// 发送数据
ssize_t Send(const void *buf, size_t len, bool non_block = false)
{
if (len == 0)
return 0;
return non_block == false ? _Send(buf, len) : _Send(buf, len, MSG_DONTWAIT);
}
// 服务器获取新连接
int Accept()
{
int newfd = accept(_sockfd, NULL, NULL);
if (newfd < 0)
LOG(ERROR, "Accept Failed");
return newfd;
}
// 关闭套接字
void Close()
{
if (_sockfd != –1)
{
close(_sockfd);
_sockfd = –1;
}
}
// 创建服务端连接
bool CreateServer(const uint16_t port, const std::string &ip = "0.0.0.0", int flag = 1)
{
if (CreateSocket() == false)
return false;
if (flag == 1)
SetNonBlock();
if (Bind(ip, port) == false)
return false;
if (Listen() == false)
return false;
SetAddrReuse();
return true;
}
// 创建客户端连接
bool CreateClient(const uint16_t port, const std::string &ip)
{
if (CreateSocket() == false)
return false;
if (Connect(ip, port) == false)
return false;
return true;
}
private:
// 创建套接字
bool CreateSocket()
{
// int socket(int domain, int type, int protocol)
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd == –1)
{
LOG(ERROR, "Sockfd Create Failed");
return false;
}
return true;
}
// 绑定地址端口信息
bool Bind(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip.c_str());
addr.sin_port = htons(port);
if (bind(_sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)) < 0)
{
LOG(ERROR, "Bind Failed");
return false;
}
return true;
}
// 监听
bool Listen(const int backlog = MAX_LISTEN)
{
if (listen(_sockfd, backlog) < 0)
{
LOG(ERROR, "Listen Failed");
return false;
}
return true;
}
// 客户端发起连接
bool Connect(const std::string &ip, const uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip.c_str());
addr.sin_port = htons(port);
if (connect(_sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)) < 0)
{
LOG(ERROR, "Connect Failed");
return false;
}
return true;
}
// 接收数据
ssize_t _Recv(void *buf, size_t len, int flag = 0)
{
int ret = recv(_sockfd, buf, len, flag);
if (ret <= 0)
{
if (errno == EINTR)
{
LOG(WARNING, "Interrupted By Signal");
return 0;
}
else if (errno == EAGAIN)
{
LOG(WARNING, "No Data In Buffer");
return 0;
}
return –1;
}
return ret;
}
// 发送数据
ssize_t _Send(const void *buf, size_t len, int flag = 0)
{
int ret = send(_sockfd, buf, len, flag);
if (ret < 0)
{
if (errno == EINTR)
{
LOG(WARNING, "Interrupted By Signal");
return 0;
}
else if (errno == EAGAIN)
{
LOG(WARNING, "Buffer Is Full");
return 0;
}
LOG(ERROR, "Send Message Failed");
return –1;
}
return ret;
}
// 设置套接字选项——开启地址端口重用选项
void SetAddrReuse()
{
int opt = 1;
if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)(&opt), sizeof(opt)) < 0)
LOG(ERROR, "Set Reuse Addr Failed");
opt = 1;
if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)(&opt), sizeof(opt)) < 0)
LOG(ERROR, "Set Reuse Opt Failed");
}
// 设置套接字属性——设置为非阻塞
void SetNonBlock()
{
int flag = fcntl(_sockfd, F_GETFL, 0);
if (flag < 0)
{
LOG(ERROR, "Get Socket Flage Failed");
return;
}
if (fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK) < 0)
LOG(ERROR, "Set Non-Block Failed");
}
};
} // namespace ns_socket
上面预告过了,下一个博客是实现Channel模块+Poll模块+TimerWheel模块+EventLoop模块,这几个模块可以说联系紧密,所以实现起来比较麻烦,但是博主会尽可能用最简单的语句和图片解释清楚,敬请期待一下吧
专栏:Reactor模型高并发服务器项目 项目代码仓库:Reactor模型高并发服务器项目代码仓库(随博客更新) 都看到这里了,留下你们的珍贵的👍点赞+⭐收藏+📋评论吧
评论前必须登录!
注册