目录
1.准备知识
1.1时间轮
1.2正则表达式
1.3 通用类型Any类型的实现
2.Server服务器模块实现:
2.1 缓冲区Buffer类的实现
2.2 日志宏的实现
2.3 套接字模块的实现
2.4 事件管理Channel类的实现
2.5 描述符事件监控Poller类实现
2.6 定时任务管理TimerWheel类实现
2.7 Reactor-EventLoop线程池类实现
2.8 用于管理单个事件循环线程LoopThread类
2.9 线程池的主要逻辑LoopThreadPool类
2.10 Any类代码的实现
2.11 通信连接管理Connection类实现
2.12 监听描述符管理Acceptor模块
2.13 服务器类TcpServer类的实现
3.HTTP协议支持模块实现
3.1 Util实用工具类实现
3.2 HttpRequest 类的实现
3.3 HttpResponse类的实现
3.4 HttpContext类的实现
3.5 HttpServer类的实现
4.结语
1.准备知识
1.1时间轮
如下图,这就是时间轮的图像,接下来我来解释一下这个时间轮的原理
中间的指针我们称为_tick,用来表示现在指针指向的位置,指针指向这个位置的时候,就去检查这个位置上面是否有对应任务到了时间,如果时间已到,那么就移除这个任务,并且执行这个对应的任务。
如何实现?
首先我们实现一个TimerTask的类,这个类就是用来包含具体的任务,超时时间等等,以便这个任务时间到了的时候执行对应的任务
先包装任务函数
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
包含的私有成员
uint64_t _id; // 定时器任务对象id
uint32_t _timeout; // 定时任务的超时时间
bool _canceled; // false-表示没有被取消
TaskFunc _task_cb; // 定时器对象要执行的定时任务
ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
包含的公有成员有设置是否取消任务、设置释放时用来删除定时器对象信息的函数、设置超时时间
void Cancel()
{
_canceled = true;
}
void SetRelease(const ReleaseFunc& cb)
{
_release = cb;
}
uint32_t DelayTime()
{
return _timeout;
}
TimerTask的整体代码如下:
class TimerTask
{
private:
uint64_t _id; // 定时器任务对象id
uint32_t _timeout; // 定时任务的超时时间
bool _canceled; // false-表示没有被取消
TaskFunc _task_cb; // 定时器对象要执行的定时任务
ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
public:
TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb)
: _id(id), _timeout(delay), _task_cb(cb), _canceled(false)
{
}
~TimerTask()
{
if(_canceled == false)_task_cb();
_release();
}
void Cancel()
{
_canceled = true;
}
void SetRelease(const ReleaseFunc& cb)
{
_release = cb;
}
uint32_t DelayTime()
{
return _timeout;
}
};
接下来就是时间轮的主体,时间轮_wheel我们使用一个vector来实现,_tick指针遍历完一遍数组之后从头开始遍历数组,数组的每一个位置中,也存放着一个数组,这个数组中存放任务。
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
int _tick; // 当前的秒针,走到哪里释放哪里
int _capacity; // 表盘最大数量,其实就是最大延迟时间
std::vector<std::vector<PtrTask>> _wheel;
std::unordered_map<uint64_t, WeakTask> _timers; //使用任务ID找到对应的weak_ptr
这里解释一下为什么要使用weak_ptr来作为对应TimerTask的指针,因为weak_ptr不会增加其引用计数,不影响shared_ptr中的引用计数为0时对TimerTask的正常释放
把TimerTask从_timers中移除
void RemoveTimer(uint16_t id)
{
auto it = _timers.find(id);
if(it != _timers.end())
{
_timers.erase(it);
}
}
添加定时任务
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb)
{
// 创建一个新的
PtrTask pt(new TimerTask(id, delay, cb));
// 设置释放函数
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
// 当前指针的位置 加上 超时时间 除 时间轮的容量得到插入位置
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
// 把任务放入哈希表中
_timers[id] = WeakTask(pt);
}
刷新/延迟定时任务,这里需要说明的是,这里使用的lock(),是可以把weak_ptr提升为shared_ptr 强智能指针,再添加一个TimerTask进去,相当于把shared_ptr的引用计数增加1,变相地起到了刷新/延迟定时任务的作用
void TimerRefresh(uint64_t id)//刷新/延迟定时任务
{
//通过保存的定时器对象的weakptr构造一个shared_ptr出来,添加到轮子中
auto it = _timers.find(id);
if(it == _timers.end())
{
//没有找到定时任务
return;
}
PtrTask pt = it->second.lock();//lock获取weakptr管理的对象对应的shared_ptr;
int delay = pt->DelayTime();
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
}
取消定时任务
void TimerCancel(uint64_t id)
{
auto it = _timers.find(id);
if(it == _timers.end())
{
//没有找到定时任务
return;
}
PtrTask pt = it->second.lock();
if(pt) pt->Cancel();
}
执行定时任务
void RunTimeTask()
{
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear();
}
总体的时间轮代码
class TimerWheel
{
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
int _tick; // 当前的秒针,走到哪里释放哪里
int _capacity; // 表盘最大数量,其实就是最大延迟时间
std::vector<std::vector<PtrTask>> _wheel;
std::unordered_map<uint64_t, WeakTask> _timers; //使用任务ID找到对应的weak_ptr
private:
void RemoveTimer(uint16_t id)
{
auto it = _timers.find(id);
if(it != _timers.end())
{
_timers.erase(it);
}
}
public:
TimerWheel()
: _capacity(60),
_tick(0),
_wheel(_capacity)
{
}
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb) //添加定时任务
{
PtrTask pt(new TimerTask(id, delay, cb));
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
_timers[id] = WeakTask(pt);
}
void TimerRefresh(uint64_t id)//刷新/延迟定时任务
{
//通过保存的定时器对象的weakptr构造一个shared_ptr出来,添加到轮子中
auto it = _timers.find(id);
if(it == _timers.end())
{
//没有找到定时任务
return;
}
PtrTask pt = it->second.lock();//lock获取weakptr管理的对象对应的shared_ptr;
int delay = pt->DelayTime();
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
}
void TimerCancel(uint64_t id)
{
auto it = _timers.find(id);
if(it == _timers.end())
{
//没有找到定时任务
return;
}
PtrTask pt = it->second.lock();
if(pt) pt->Cancel();
}
//这个函数每秒被执行一次,相当于秒针向后走了一步
void RunTimeTask()
{
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear();
}
};
1.2正则表达式
首先,必不可少的,是对http请求行的解析,但是,如果直接手撕请求行,把请求行分为多个部分,未免有些麻烦,所以我们选择使用正则表达式来解析请求行,更为精确高效,
首先我们需要知道HTTP请求行的基本结构
<HTTP方法> <请求目标> <HTTP版本>
HTTP方法包括GET、POST、PUT、DELETE等方法
请求目标就是用户想要请求资源的对应路径
下面是一个HTTP请求行的例子:
GET /helloworld/index?user=diluodexingzi&pass=123456 HTTP/1.1\\r\\n
首先我们需要匹配HTTP方法,如下:
(GET|HEAD|POST|PUT|DELETE)
| 代表或的意思,表示从这几个常见的HTTP方法中选择一个
接下来我们需要来匹配资源路径
([^?]*)
[^?]表示匹配的是非?的字符,加上后面的*就表示匹配非?字符0次或多次
后面的user=diluodexingzi&pass=123456是我们不需要获取的,只匹配但是不捕获
(?:\\\\?(.*))?
(?:pattern)表示匹配pattern但是不获取匹配结果
\\\\? 表示原始的?字符,这里表示以?字符作为起始
.*代表提取除\\n之外的任意字符0次或多次
最后的?表示匹配前面的表达式0次或者一次,有的请求是没有查询字符串的
最后来匹配HTTP版本
(HTTP/1\\\\.[01])(?:\\n|\\r\\n)?
HTTP/1 表示以HTTP/1开始的字符串
\\\\. 表示匹配 . 原始字符
[01]表示匹配字符串0或者1字符
(?:\\r\\n) 表示匹配一个\\r\\n或者\\n字符,但是不捕捉这个内容
总的来说就是匹配以HTTP/1.开始,后面跟了一个0或1的字符,且最终以\\n或者\\r\\n作为结尾的字符串
1.3 通用类型Any类型的实现
每一个Connection对连接进行管理,最终都不可避免需要涉及到应用层协议的处理,因此在Connection中需要设置协议处理的上下文来控制处理节奏。但是应用层协议千千万,为了降低耦合度,这个协议接收解析上下文就不能有明显的协议倾向,它可以是任意协议的上下文信息因此就需要一个通用的类型来保存各种不同的数据结构。
为了增加代码的可移植性,我们减少对第三方库的依赖,决定自己来实现。
Any类肯定不能是一个模版类,否则编译的时候Any<int> a,Any<float> b,需要传入类型作为模版参数,也就是说在使用的时候就要确定其类型。 这是不可以的,因为保存在Content中的协议上下文,我们在定义any对象的时候是不知道他们的协议类型的,因此无法传递类型作为模版参数 因此考虑Any内部设计一个模版容器holder类,可以保存各种类型数据 而因为在Any类中无法定义这个holder对象或指针,因为Any也不知道这个类要保存什么类型的数据,因此无法传递类型参数 所以。定义一个基类placeholder,让holder继承于placeholder,而Any类保存父类指针即可 当需要保存数据时,则new一个带有模版参数的子类holder对象去保存数据。然后让Any类中的父类指针,指向这个子类对象就可以了
Any类的具体实现:
class Any
{
private:
class holder
{
public:
virtual ~holder()
{}
// 数据类型
virtual const std::type_info& type() = 0;
// 克隆出新的对象
virtual holder* clone() = 0;
};
template<class T>
class placeholder: public holder
{
public:
placeholder(const T& val): _val(val){}
// 获取子类对象保存的数据类型
virtual const std::type_info& type()
{
return typeid(T);
}
// 针对出当前的对象自身,克隆出一个新的对象
virtual holder* clone()
{
return new placeholder(_val);
}
public:
T _val;
};
holder* _content;
public:
Any():_content(NULL){}
template<class T>
Any(const T& val):_content(new placeholder<T>(val)){}
Any(const Any& other):_content(other._content ? other._content->clone() : NULL) {}
~Any(){ delete _content;}
Any &swap(Any& other)
{
std::swap(_content, other._content);
return *this;
}
template<class T>
//返回子类对象保存的数据的指针
T* get()
{
// 想要获取的数据类型,必须和保存的数据类型一致
//if(typeid(T) != _content->type()) return NULL;
assert(typeid(T) == _content->type());
return &((placeholder<T>*)_content)->_val;
}
//赋值运算符重载函数
template<class T>
Any& operator=(const T& val)
{
//为val构造一个临时的通用容器,然后与当前容器自身进行指针交换
//临时对象释放的时候,原先保存的数据也就被释放了
Any(val).swap(*this);
return *this;
}
Any& operator=(const Any& other)
{
Any(other).swap(*this);
return *this;
}
};
2.Server服务器模块实现:
2.1 缓冲区Buffer类的实现
Buffer类用于实现用户态缓冲区,提供数据缓冲,取出的功能
首先Buffer这个类有三个私有成员变量,分别是_buffer,_reader_idx,_writer_idx。这三个私有成员分别代表缓冲区,读偏移和写偏移。需要解释的是读偏移是指读到的位置,写偏移是指已经写进缓冲区中的位置
私有成员变量的代码如下:
// 使用vector进行内存空间管理
std::vector<char> _buffer;
uint64_t _reader_idx; // 读偏移
uint64_t _writer_idx; // 写偏移
首先我们需要有个函数能返回这个_buffer的起始地址
char* Begin()
{
return &*_buffer.begin();
}
_buffer.begin()返回的是迭代器的初始位置,我们需要对迭代器进行解引用然后取地址才能获得char*的地址。
然后我们也需要获得当前写入起始地址和当前读取起始地址
// 获取当前写入起始地址
char *WritePosition()
{
//_buffer的空间起始地址,加上写偏移量
return Begin() + _writer_idx;
}
// 获取当前读取起始地址
char *ReadPosition()
{
return Begin() + _reader_idx;
}
获得缓冲区末尾和起始空间的大小
// 获取缓冲区末尾空闲空间大小–写偏移之后的空闲空间
uint64_t TailIdleSize()
{
return _buffer.size() – _writer_idx;
}
// 获取缓冲区起始空闲空间大小–读偏移之前的空闲空间
uint64_t HeadIdleSize()
{
return _reader_idx;
}
可读数据的大小即是写偏移减去读偏移
// 获取可读数据大小
uint64_t ReadAbleSize()
{
return _writer_idx – _reader_idx;
}
读完数据或者写完数据都需要将读偏移和写偏移向后移动
// 将读偏移向后移动
void MoveReadOffset(uint64_t len)
{
if (len == 0)
return;
// 向后移动的大小,必须小于可读数据的大小
assert(len <= ReadAbleSize());
_reader_idx += len;
}
// 将写偏移向后移动
void MoveWriteOffset(uint64_t len)
{
// 向后移动的大小,必须小于当前后边的空闲空间大小
assert(len <= TailIdleSize());
_writer_idx += len;
}
然后,我们也需要确保可写的空间足够,如果空间够就直接移动数据。
问题来了,如何移动数据?
很简单,即把在读偏移和写偏移之间的数据都移动到_buffer的起始位置,这里需要用到std::copy函数,下面是我们需要用到的copy函数的定义
inline char *std::copy<char *, char *>(char *__first, char *__last, char *__result)
第一个和第二个参数分别表示数据的起始位置和结束位置,第三个参数则表示拷贝到的位置
如果即使移动数据,空间还是不够,我们就需要扩容,使用到的函数是resize
下面是详细代码
// 确保可写空间足够(足够就移动数据,否则就扩容)
void EnsureWriteSpace(uint64_t len)
{
// 如果末尾空闲空间大小足够,直接返回
if (TailIdleSize() >= len)
{
return;
}
// 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够,够了就将数据移动到起始位置
if (len <= TailIdleSize() + HeadIdleSize())
{
// 将数据移动到起始位置
uint64_t rsz = ReadAbleSize();
// 把可读数据拷贝到起始位置
std::copy(ReadPosition(), ReadPosition() + rsz, Begin());
_reader_idx = 0; // 将读偏移归0
_writer_idx = rsz; // 将写位置置为可读数据大小,因为当前的可读数据大小就是写偏移量
}
else
{
// 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可
DBG_LOG("RESIZE %ld", _writer_idx + len);
_buffer.resize(_writer_idx + len);
}
}
要写入数据进去,需要两步,即保证有足够空间和拷贝数据进入_buffer。
// 写入数据
void Write(const void *data, uint64_t len)
{
// 1.保证有足够空间
if (len == 0)
return;
EnsureWriteSpace(len);
const char *d = (const char *)data;
// 2.拷贝数据进去
std::copy(d, d + len, WritePosition());
}
接下来的几个函数都是基于Write函数的拓展集成,包括写入string、Buffer类型的数据
void WriteAndPush(const void *data, uint64_t len)
{
Write(data, len);
MoveWriteOffset(len);
}
void WriteString(const std::string &data)
{
return Write(data.c_str(), data.size());
}
void WriteStringAndPush(const std::string &data)
{
WriteString(data);
// std::cout << WritePosition() << std::endl;
MoveWriteOffset(data.size());
// std::cout << ReadAbleSize() << std::endl;
}
void WriteBuffer(Buffer &data)
{
return Write(data.ReadPosition(), data.ReadAbleSize());
}
void WriteBufferAndPush(Buffer &data)
{
WriteBuffer(data);
MoveWriteOffset(data.ReadAbleSize());
}
接下来的是Read函数,包含两个参数分别是把数据读到那个位置,和读取的长度
// 读取数据
void Read(void *buf, uint64_t len)
{
assert(len <= ReadAbleSize());
// std::cout << ReadPosition() << std::endl;
std::copy(ReadPosition(), ReadPosition() + len, (char *)buf);
}
同样,下面的函数也是基于Read函数的拓展集成
void ReadAndPop(void *buf, uint64_t len)
{
Read(buf, len);
MoveReadOffset(len);
}
// 把缓冲区中的数据当做string读取
std::string ReadAsString(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str;
str.resize(len);
Read(&str[0], len);
return str;
}
std::string ReadAsStringAndPop(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str = ReadAsString(len);
MoveReadOffset(len);
return str;
}
在读取的时候,例如读取Http协议,就需要一行一行的读,这时候就需要读取到每一行的\\r\\n,所以就有了FindCRLF函数使用到的是cstring中的memchr函数
void *memchr(const void *buf, int c, size_t count);
buf:指向待搜索内存块的指针
c:要查找的字符
count:要搜索的字节数
由于我们需要的是char* 的返回值,所以需要强制转换一下
char *FindCRLF()
{
char *res = (char *)memchr(ReadPosition(), '\\n', ReadAbleSize());
return res;
}
/*通常获取一行数据,这种情况针对是HTTP协议*/
std::string GetLine()
{
char *pos = FindCRLF();
if (pos == NULL)
{
return "";
}
// +1 是为了把换行字符也取出来
return ReadAsString(pos – ReadPosition() + 1);
}
下面是这个Buffer类的整体代码
#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
private:
// 使用vector进行内存空间管理
std::vector<char> _buffer;
uint64_t _reader_idx; // 读偏移
uint64_t _writer_idx; // 写偏移
public:
Buffer() : _reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}
char *Begin() { return &*_buffer.begin(); }
void TestPrint()
{
for (auto e : _buffer)
{
std::cout << e;
}
}
// 获取当前写入起始地址
char *WritePosition()
{
//_buffer的空间起始地址,加上写偏移量
return Begin() + _writer_idx;
}
// 获取当前读取起始地址
char *ReadPosition()
{
return Begin() + _reader_idx;
}
// 获取缓冲区末尾空闲空间大小–写偏移之后的空闲空间
uint64_t TailIdleSize()
{
return _buffer.size() – _writer_idx;
}
// 获取缓冲区起始空闲空间大小–读偏移之前的空闲空间
uint64_t HeadIdleSize()
{
return _reader_idx;
}
// 获取可读数据大小
uint64_t ReadAbleSize()
{
return _writer_idx – _reader_idx;
}
// 将读偏移向后移动
void MoveReadOffset(uint64_t len)
{
if (len == 0)
return;
// 向后移动的大小,必须小于可读数据的大小
assert(len <= ReadAbleSize());
_reader_idx += len;
}
// 将写偏移向后移动
void MoveWriteOffset(uint64_t len)
{
// 向后移动的大小,必须小于当前后边的空闲空间大小
assert(len <= TailIdleSize());
_writer_idx += len;
}
// 确保可写空间足够(足够就移动数据,否则就扩容)
void EnsureWriteSpace(uint64_t len)
{
// 如果末尾空闲空间大小足够,直接返回
if (TailIdleSize() >= len)
{
return;
}
// 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够,够了就将数据移动到起始位置
if (len <= TailIdleSize() + HeadIdleSize())
{
// 将数据移动到起始位置
uint64_t rsz = ReadAbleSize();
// 把可读数据拷贝到起始位置
std::copy(ReadPosition(), ReadPosition() + rsz, Begin());
_reader_idx = 0; // 将读偏移归0
_writer_idx = rsz; // 将写位置置为可读数据大小,因为当前的可读数据大小就是写偏移量
}
else
{
// 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可
DBG_LOG("RESIZE %ld", _writer_idx + len);
_buffer.resize(_writer_idx + len);
}
}
// 写入数据
void Write(const void *data, uint64_t len)
{
// 1.保证有足够空间
if (len == 0)
return;
EnsureWriteSpace(len);
const char *d = (const char *)data;
// 2.拷贝数据进去
std::copy(d, d + len, WritePosition());
}
void WriteAndPush(const void *data, uint64_t len)
{
Write(data, len);
MoveWriteOffset(len);
}
void WriteString(const std::string &data)
{
return Write(data.c_str(), data.size());
}
void WriteStringAndPush(const std::string &data)
{
WriteString(data);
// std::cout << WritePosition() << std::endl;
MoveWriteOffset(data.size());
// std::cout << ReadAbleSize() << std::endl;
}
void WriteBuffer(Buffer &data)
{
return Write(data.ReadPosition(), data.ReadAbleSize());
}
void WriteBufferAndPush(Buffer &data)
{
WriteBuffer(data);
MoveWriteOffset(data.ReadAbleSize());
}
// 读取数据
void Read(void *buf, uint64_t len)
{
assert(len <= ReadAbleSize());
// std::cout << ReadPosition() << std::endl;
std::copy(ReadPosition(), ReadPosition() + len, (char *)buf);
}
void ReadAndPop(void *buf, uint64_t len)
{
Read(buf, len);
MoveReadOffset(len);
}
// 把缓冲区中的数据当做string读取
std::string ReadAsString(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str;
str.resize(len);
Read(&str[0], len);
return str;
}
std::string ReadAsStringAndPop(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str = ReadAsString(len);
MoveReadOffset(len);
return str;
}
char *FindCRLF()
{
char *res = (char *)memchr(ReadPosition(), '\\n', ReadAbleSize());
return res;
}
/*通常获取一行数据,这种情况针对是HTTP协议*/
std::string GetLine()
{
char *pos = FindCRLF();
if (pos == NULL)
{
return "";
}
// +1 是为了把换行字符也取出来
return ReadAsString(pos – ReadPosition() + 1);
}
std::string GetLineAndPop()
{
std::string str = GetLine();
MoveReadOffset(str.size());
return str;
}
// 清空缓冲区
void Clear()
{
// 只需要将偏移量归零
_reader_idx = 0;
_writer_idx = 0;
}
};
2.2 日志宏的实现
作为一名合格的程序员,日志宏是不可或缺的,我们将在这里写一个简单的日志宏
首先我们的日志应该包含日志等级,方便我们调试,这里我们给出了三种等级的日志等级
INF,DBG,ERR,分别设置为0 1 2 然后1以上的日志等级将打印出具体的日志信息
我们也想日志能打印出现在的时间,需要使用到ctime这个头文件
里面有一个tm结构体,定义如下:
struct tm
{
int tm_sec; /* Seconds. [0-60] (1 leap second) */
int tm_min; /* Minutes. [0-59] */
int tm_hour; /* Hours. [0-23] */
int tm_mday; /* Day. [1-31] */
int tm_mon; /* Month. [0-11] */
int tm_year; /* Year – 1900. */
int tm_wday; /* Day of week. [0-6] */
int tm_yday; /* Days in year.[0-365] */
int tm_isdst; /* DST. [-1/0/1]*/
}
我们可以使用localtime这个函数来把时间戳的信息转移到这个tm结构体当中然后读取,在以特定格式来打印出时间
这里有几个小的知识点
1.如何打印对应的行号和文件呢?
可以使用__FILE__ 和 __LINE__ 两个宏定义来打印对应的文件名和行号
2.宏定义中的可变参数该如何表示呢?
使用##__VA_ARGS__来表示
下面就是详细的代码
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG
#define LOG(level, format, …) \\
do \\
{ \\
if (level < LOG_LEVEL) \\
break; \\
time_t t = time(NULL); \\
struct tm *ltm = localtime(&t); \\
char tmp[32] = {0}; \\
strftime(tmp, 31, "%H:%M:%S", ltm); \\
fprintf(stdout, "[%p %s %s:%d]" format "\\n", (void *)pthread_self(), tmp, __FILE__, __LINE__, ##__VA_ARGS__); \\
} while (0)
#define INF_LOG(format, …) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, …) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, …) LOG(ERR, format, ##__VA_ARGS__)
2.3 套接字模块的实现
套接字是服务器编程中不可缺少的,我们使用一个结构体Socket来统一管理
1.首先封装创建套接字的函数Create()
bool Create()
{
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd < 0)
{
ERR_LOG("CREATE SOCKET FAILED!!");
return false;
}
return true;
}
这里解释一下socket的三个参数,第一个参数表示使用IPV4协议,第二个参数表示使用面向连接的套接字,第三个参数表示选择TCP协议,也可以把第三个参数直接写成0,系统会自动选择协议
2.封装绑定地址信息的函数Bind(),传入两个参数即可,即ip和port
bool Bind(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = bind(_sockfd, (struct sockaddr *)&addr, len);
if (ret < 0)
{
ERR_LOG("BIND ADDRESS FAILED!");
return false;
}
return true;
}
其中sockaddr_in 这个结构体包含三个成员,分别是sin_family,sin_port,sin_addr
sin_family:sin_family设置为AF_INET表示使用IPV4协议
sin_port:传入sin_port参数的时候,需要将主机字节序转化为网络字节序。因为网络协议中统一使用大端模式作为标准字节序
sin_addr:需要使用inet_addr把点分十进制字符串转化为网络字节序的32位无符号整数的函数
接下来解释bind函数,第一个参数传入创建出来的套接字, 第二个参数是addr强转的地址,第三个参数表示表示addr的大小
3.绑定好地址信息后,就要开始监听了,这里我们使用Listen函数来包装,Listen传入一个缺省参数backlog
bool Listen(int backlog = MAX_LISTEN)
{
int ret = listen(_sockfd, backlog);
if (ret < 0)
{
ERR_LOG("SOCKET LISTEN FAILED!");
return false;
}
return true;
}
这里解释一下listen函数的两个参数,第一个参数表示传入的套接字,第二个参数表示定义了 已完成连接队列(ESTABLISHED 状态)的最大长度。
4.接下来就是向服务器发起连接Connect(),传入两个参数,ip和port
bool Connect(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = connect(_sockfd, (struct sockaddr *)&addr, len);
if (ret < 0)
{
ERR_LOG("CONNECT ADDRESS FAILED!");
return false;
}
return true;
}
这里connect的参数就不多说
ssize_t Recv(void *buf, size_t len, int flag = 0) // flag代表设置一些阻塞相关的操作
{
ssize_t ret = recv(_sockfd, buf, len, flag);
if (ret <= 0)
{
if (errno == EAGAIN || errno == EINTR)
{
return 0;
}
ERR_LOG("SOCKET RECV FAILED!");
return -1;
}
return ret;
}
5. 获取新链接使用Accept()函数来封装
int Accept()
{
int newfd = accept(_sockfd, NULL, NULL);
if (newfd < 0)
{
ERR_LOG("SOCKET ACCEPT FAILED!");
return -1;
}
return newfd;
}
这里我们选择将accept的第二个和第三个参数都设置为NULL表示我们不关心客户端地址信息。所以只传入了第一个参数套接字,用于从已完成连接队列中提取连接
6.使用Recv()函数来接受数据
ssize_t Recv(void *buf, size_t len, int flag = 0) // flag代表设置一些阻塞相关的操作
{
ssize_t ret = recv(_sockfd, buf, len, flag);
if (ret <= 0)
{
if (errno == EAGAIN || errno == EINTR)
{
return 0;
}
ERR_LOG("SOCKET RECV FAILED!");
return -1;
}
return ret;
}
recv有四个参数,简单解释一下,第一个参数传入套接字,第二个参数表示接受的数据放到的位置,第三个参数表示单次接受的数据量,flags控制接收行为的标志位。传入0表示阻塞读取。
7.再次封装一个函数NonBlockRecv表示非阻塞接受
ssize_t NonBlockRecv(void *buf, size_t len)
{
if (len == 0)
return 0;
return Recv(buf, len, MSG_DONTWAIT);
}
MSG_DONTWAIT表示不要等待的接受
8.发送数据封装一个Send()函数,封装三个参数分别是发送数据的位置,len表示发送数据的大小,第三个参数为flag和Recv参数一样
ssize_t Send(const void *buf, size_t len, int flag = 0)
{
// ssize_t send(int sockfd, void* data, size_t len, int flag);
ssize_t ret = send(_sockfd, buf, len, flag);
if (ret < 0)
{
if (errno == EAGAIN || errno == EINTR)
{
return 0;
}
ERR_LOG("SOCKET SEND FAILED!");
return -1;
}
return ret;
}
send的参数就不多解释了 9.封装一个NonBlockSend函数表示非阻塞发送
ssize_t NonBlockSend(void *buf, size_t len)
{
if (len == 0)
return 0;
return Send(buf, len, MSG_DONTWAIT);
}
这里同样使用MSG_DONTWAIT来表示非阻塞
10.套接字使用完毕后需要关闭,象文件描述符一样
void Close()
{
if (_sockfd != -1)
{
close(_sockfd);
_sockfd = -1;
}
}
11.创建一个服务器连接,在这里分为5个步骤,创建套接字,设置非阻塞,绑定地址,开始监听,启动地址重用
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false)
{
// 创建套接字
if (Create() == false)
return false;
// 设置非阻塞
if (block_flag)
NonBlock();
// 绑定地址
if (Bind(ip, port) == false)
return false;
// 开始监听
if (Listen() == false)
return false;
// 启动地址重用
ReuseAddress();
return true;
}
12.创建一个客户端连接,分为两个步骤:Create和Connect如下代码所示
bool CreateClient(uint16_t port, const std::string &ip)
{
// 1.创建套接字, 2.指向连接服务器
if (Create() == false)
return false;
if (Connect(ip, port) == false)
return false;
return true;
}
13.开启地址端口重用
这里解释一下什么事启动地址重用,在一个TCP连接关闭时,操作系统会保留该链接的端口一段时间,这个状态被称为TIME_WAIT。在TIME_WAIT状态下,端口不能被立即重新使用,这是为了确保所有相关的TCP报文段(如延迟的ACK或FIN)都已经被处理完毕,从而避免新链接与旧链接的数据混淆。设置地址重用,可以在短时间内连续启用同一个端口号
void ReuseAddress()
{
int val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int));
val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int));
}
下面是setsockopt函数的原型
int setsockopt(int sockfd, int level, int option_name, const void* option_value, socklen_t option_len)
socket参数指定被操作的目标socket,level参数指定要操作那个协议的选项,比如IPV4,IPV6,TCP等。option_name参数指定选项的名字,option_value和option_len参数分别是被操作选项的值和长度
这里我们把val设置为1,val是一个整数,通常设置为1或0。1表示启动指定的套接字选项,0表示禁用指定的套接字选项
14.设置套接字阻塞属性—设置为非阻塞,这里我们用到的函数是fcntl函数,代码后面将会给出详细的解释
void NonBlock()
{
int flag = fcntl(_sockfd, F_GETFL, 0);
fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
fcntl函数,正如其名字一样,提供了对文件描述符的各种控制操作,另外要一个常见的控制文件描述符的属性和行为的系统调用时ioctl,但是fcntl是由POSIX 规定的首选方法。
我们先在flag中获取fd的标志,然后再将这个函数的第二个参数设置为F_SETFL表示设置fd的标志,第三个参数就写成flag | O_NONBLOCK表示再加上非阻塞属性
最后是完整的socket代码
#define MAX_LISTEN 1024
class Socket
{
private:
int _sockfd;
public:
Socket() : _sockfd(-1) {}
Socket(int fd) : _sockfd(fd) {}
~Socket() { Close(); }
int Fd() { return _sockfd; }
// 创建套接字
bool Create()
{
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd < 0)
{
ERR_LOG("CREATE SOCKET FAILED!!");
return false;
}
return true;
}
// 绑定地址信息
bool Bind(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = bind(_sockfd, (struct sockaddr *)&addr, len);
if (ret < 0)
{
ERR_LOG("BIND ADDRESS FAILED!");
return false;
}
return true;
}
// 开始监听
bool Listen(int backlog = MAX_LISTEN)
{
int ret = listen(_sockfd, backlog);
if (ret < 0)
{
ERR_LOG("SOCKET LISTEN FAILED!");
return false;
}
return true;
}
// 向服务器发起连接
bool Connect(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = connect(_sockfd, (struct sockaddr *)&addr, len);
if (ret < 0)
{
ERR_LOG("CONNECT ADDRESS FAILED!");
return false;
}
return true;
}
// 获取新链接
int Accept()
{
int newfd = accept(_sockfd, NULL, NULL);
if (newfd < 0)
{
ERR_LOG("SOCKET ACCEPT FAILED!");
return -1;
}
return newfd;
}
// 接收数据
ssize_t Recv(void *buf, size_t len, int flag = 0) // flag代表设置一些阻塞相关的操作
{
ssize_t ret = recv(_sockfd, buf, len, flag);
if (ret <= 0)
{
if (errno == EAGAIN || errno == EINTR)
{
return 0;
}
ERR_LOG("SOCKET RECV FAILED!");
return -1;
}
return ret;
}
ssize_t NonBlockRecv(void *buf, size_t len)
{
if (len == 0)
return 0;
return Recv(buf, len, MSG_DONTWAIT);
}
// 发送数据
ssize_t Send(const void *buf, size_t len, int flag = 0)
{
// ssize_t send(int sockfd, void* data, size_t len, int flag);
ssize_t ret = send(_sockfd, buf, len, flag);
if (ret < 0)
{
if (errno == EAGAIN || errno == EINTR)
{
return 0;
}
ERR_LOG("SOCKET SEND FAILED!");
return -1;
}
return ret;
}
ssize_t NonBlockSend(void *buf, size_t len)
{
if (len == 0)
return 0;
return Send(buf, len, MSG_DONTWAIT);
}
// 关闭套接字
void Close()
{
if (_sockfd != -1)
{
close(_sockfd);
_sockfd = -1;
}
}
// 创建一个服务端连接
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false)
{
// 创建套接字
if (Create() == false)
return false;
// 设置非阻塞
if (block_flag)
NonBlock();
// 绑定地址
if (Bind(ip, port) == false)
return false;
// 开始监听
if (Listen() == false)
return false;
// 启动地址重用
ReuseAddress();
return true;
}
// 创建一个客户端连接
bool CreateClient(uint16_t port, const std::string &ip)
{
// 1.创建套接字, 2.指向连接服务器
if (Create() == false)
return false;
if (Connect(ip, port) == false)
return false;
return true;
}
// 设置套接字选项–开启地址端口重用
void ReuseAddress()
{
int val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int));
val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int));
}
// 设置套接字阻塞属性–设置为非阻塞
void NonBlock()
{
int flag = fcntl(_sockfd, F_GETFL, 0);
fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
};
2.4 事件管理Channel类的实现
Channel模块是对一个描述符需要进行IO事件管理的模块,实现对描述符可读,可写,错误事件的管理操作,以及Poller模块对描述符进行IO事件监控就绪后,根据不同的事件,回调不同的处理函数功能。
在Channel类的私有成员中,主要包含文件描述符_fd,当前需要监控的事件_events和当前触发的事件_revents还有各种事件被触发时的回调函数
int _fd;
EventLoop *_loop;
uint32_t _events; // 当前需要监控的事件
uint32_t _revents; // 当前触发的事件
using EventCallback = std::function<void()>;
EventCallback _read_callback; // 可读事件被触发的回调函数
EventCallback _write_callback; // 可写事件被触发的回调函数
EventCallback _error_callback; // 错误事件被触发的回调函数
EventCallback _close_callback; // 连接断开事件被触发的回调函数
EventCallback _event_callback; // 任意事件被触发的回调函数
接下来是Channel类中所包含的成员函数
首先是设置各种事件的回调函数
void SetReadCallback(const EventCallback &cb)
{
_read_callback = cb;
}
void SetWriteCallback(const EventCallback &cb)
{
_write_callback = cb;
}
void SetErrorCallback(const EventCallback &cb)
{
_error_callback = cb;
}
void SetCloseCallback(const EventCallback &cb)
{
_close_callback = cb;
}
void SetEventCallback(const EventCallback &cb)
{
_event_callback = cb;
}
接下来的两个函数用来检测当前是否可读和可写
bool ReadAble() // 当前是否可读
{
return (_events & EPOLLIN);
}
bool WriteAble() // 当前是否可写
{
return (_events & EPOLLOUT);
}
这四个函数用来启动事件监控和关闭事件监控
void EnableRead() // 启动读事件监控
{
_events |= EPOLLIN; /*后边会添加到EventLoop的事件监控中*/
Update();
}
void EnableWrite() // 启动写事件监控
{
_events |= EPOLLOUT;
Update();
}
void DisableRead() // 关闭读事件监控
{
_events &= ~EPOLLIN;
Update();
}
void DisableWrite() // 关闭写事件监控
{
_events &= ~EPOLLOUT;
Update();
}
void DisableAll() // 关闭所有事件监控
{
_events = 0;
Update();
}
还需要对监控进行移除和更新的函数,这两个函数需要在类外实现,因为EventLoop类在Channel::Remove()和Channel::Update()中被使用时,编译器需要看到他的完整定义
void Channel::Remove() // 移除监控
{
return _loop->RemoveEvent(this);
}
void Channel::Update()
{
return _loop->UpdateEvent(this);
}
接下来是事件处理函数,就调用这个函数,自己触发了什么事件如何自己决定。这里解释一些常用的epoll事件类型。
- EPOLLIN:数据可读
- EPOLLRDHUP:TCP连接被对方关闭,或者对方关闭了写操作,它由GNU引入
- EPOLLPRI:高优先级数据可读,比如TCP带外数据
- EPOLLOUT:数据可写
- EPOLLERR:错误
- EPOLLHUP:挂起,比如管道的写端被关闭后,读端描述符上将被收到EPOLLHUP事件。
void HandleEvent() // 事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事件如何自己决定
{
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
/*不管任何事件,都调用的回调函数*/
if (_read_callback)
_read_callback();
}
/*有可能会释放连接的操作事件,一次只处理一个*/
if (_revents & EPOLLOUT)
{
/*不管任何事件,都调用的回调函数*/
if (_write_callback)
_write_callback();
}
else if (_revents & EPOLLERR)
{
if (_error_callback)
_error_callback();
}
else if (_revents & EPOLLHUP)
{
if (_close_callback)
_close_callback();
}
if (_event_callback)
_event_callback();
}
下面是整个Channel类的实现
class Channel
{
private:
int _fd;
EventLoop *_loop;
uint32_t _events; // 当前需要监控的事件
uint32_t _revents; // 当前触发的事件
using EventCallback = std::function<void()>;
EventCallback _read_callback; // 可读事件被触发的回调函数
EventCallback _write_callback; // 可写事件被触发的回调函数
EventCallback _error_callback; // 错误事件被触发的回调函数
EventCallback _close_callback; // 连接断开事件被触发的回调函数
EventCallback _event_callback; // 任意事件被触发的回调函数
public:
Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {}
int Fd() { return _fd; }
void SetREvents(uint32_t events) { _revents = events; }
uint32_t Events() { return _events; }
void SetReadCallback(const EventCallback &cb)
{
_read_callback = cb;
}
void SetWriteCallback(const EventCallback &cb)
{
_write_callback = cb;
}
void SetErrorCallback(const EventCallback &cb)
{
_error_callback = cb;
}
void SetCloseCallback(const EventCallback &cb)
{
_close_callback = cb;
}
void SetEventCallback(const EventCallback &cb)
{
_event_callback = cb;
}
bool ReadAble() // 当前是否可读
{
return (_events & EPOLLIN);
}
bool WriteAble() // 当前是否可写
{
return (_events & EPOLLOUT);
}
void EnableRead() // 启动读事件监控
{
_events |= EPOLLIN; /*后边会添加到EventLoop的事件监控中*/
Update();
}
void EnableWrite() // 启动写事件监控
{
_events |= EPOLLOUT;
Update();
}
void DisableRead() // 关闭读事件监控
{
_events &= ~EPOLLIN;
Update();
}
void DisableWrite() // 关闭写事件监控
{
_events &= ~EPOLLOUT;
Update();
}
void DisableAll() // 关闭所有事件监控
{
_events = 0;
Update();
}
void Remove(); // 移除监控
void Update();
void HandleEvent() // 事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事件如何自己决定
{
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
/*不管任何事件,都调用的回调函数*/
if (_read_callback)
_read_callback();
}
/*有可能会释放连接的操作事件,一次只处理一个*/
if (_revents & EPOLLOUT)
{
/*不管任何事件,都调用的回调函数*/
if (_write_callback)
_write_callback();
}
else if (_revents & EPOLLERR)
{
if (_error_callback)
_error_callback();
}
else if (_revents & EPOLLHUP)
{
if (_close_callback)
_close_callback();
}
if (_event_callback)
_event_callback();
}
};
2.5 描述符事件监控Poller类实现
Poller模块是对epoll进行封装的一个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能
首先来说明一下这个类的私有成员变量,首先是epoll文件描述符_epfd,然后是_evt,这是用来描述需要监控的文件描述符,最后是_channels,这是用来通过文件描述符找到其对应的Channel模块。
int _epfd;
struct epoll_event _evs[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels;
接下来注意介绍其中的成员函数
首先是对epoll的直接操作函数Update,Update中传入两个参数,分别是channel指针和需要执行的操作op,首先我们需要通过channel指针获得fd,然后创建一个epoll_event类型的结构体并对其进行初始化,这里介绍一下epoll_event
struct epoll_event {
uint32_t events; // 事件类型(位掩码)
epoll_data_t data; // 用户数据(联合体)
};
typedef union epoll_data {
void *ptr; // 用户自定义数据指针
int fd; // 文件描述符
uint32_t u32; // 32位无符号整数
uint64_t u64; // 64位无符号整数
} epoll_data_t;
在epoll_data这个联合体当中,最常用的数据是fd
然后使用epoll_ctl对epoll实例添加、修改或删除监控的文件描述符及其关注的事件
int epoll_ctl(int __epfd, int __op, int __fd, epoll_event *) throw()
参数解释:
- __epfd:由epoll_create或epoll_create1创建的epoll实例的文件描述符
- __op:一般使用以下三种操作类型:EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL
- __fd:需要被监控或操作的普通文件描述符
- epoll_event类型参数:定义需要监控的事件类型和关联的用户数据
Update的完整实现:
void Update(Channel *channel, int op)
{
// int epoll_ctl(int epfd, int op, int fd, struct epoll_event* ev);
int fd = channel->Fd();
struct epoll_event ev;
ev.data.fd = fd;
ev.events = channel->Events();
int ret = epoll_ctl(_epfd, op, fd, &ev);
if (ret < 0)
{
ERR_LOG("EPOLLCTL FAILED!");
// abort(); //退出程序
}
return;
}
然后就是判断一个Channel是否已经添加了事件监控
bool HasChannel(Channel *channel)
{
auto it = _channels.find(channel->Fd());
if (it == _channels.end())
{
return false;
}
return true;
}
添加或修改监控事件,如果Channel能找到,那么就修改否则就添加
void UpdateEvent(Channel *channel)
{
bool ret = HasChannel(channel);
if (ret == false)
{
// 不存在则添加
_channels.insert(std::make_pair(channel->Fd(), channel));
return Update(channel, EPOLL_CTL_ADD);
}
return Update(channel, EPOLL_CTL_MOD);
}
移除监控
void RemoveEvent(Channel *channel)
{
auto it = _channels.find(channel->Fd());
{
if (it != _channels.end())
{
_channels.erase(it);
}
Update(channel, EPOLL_CTL_DEL);
}
}
开始监控,并且返回活跃连接
介绍一下epoll_wait,用于等待epoll实例监控的文件描述符上发生的事件,返回值是就绪的文件描述符的数量。并且这个函数会把就绪事件填充到第二个参数指向的数组中
void Poll(std::vector<Channel *> *active)
{
int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); //-1代表阻塞监控
if (nfds < 0)
{
if (errno == EINTR)
{
return;
}
ERR_LOG("EPOLL WAIT ERROR:%s\\n", strerror(errno));
abort();
}
for (int i = 0; i < nfds; i++)
{
auto it = _channels.find(_evs[i].data.fd);
assert(it != _channels.end());
it->second->SetREvents(_evs[i].events);
active->push_back(it->second);
}
return;
}
下面是完整代码
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
int _epfd;
struct epoll_event _evs[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels;
private:
// 对epoll的直接操作
void Update(Channel *channel, int op)
{
// int epoll_ctl(int epfd, int op, int fd, struct epoll_event* ev);
int fd = channel->Fd();
struct epoll_event ev;
ev.data.fd = fd;
ev.events = channel->Events();
int ret = epoll_ctl(_epfd, op, fd, &ev);
if (ret < 0)
{
ERR_LOG("EPOLLCTL FAILED!");
// abort(); //退出程序
}
return;
}
// 判断一个Channel是否已经添加了事件监控
bool HasChannel(Channel *channel)
{
auto it = _channels.find(channel->Fd());
if (it == _channels.end())
{
return false;
}
return true;
}
public:
Poller()
{
_epfd = epoll_create(MAX_EPOLLEVENTS);
if (_epfd < 0)
{
ERR_LOG("EPOLL CREATE FAILED!");
abort();
}
}
void UpdateEvent(Channel *channel) // 添加或修改监控事件
{
bool ret = HasChannel(channel);
if (ret == false)
{
// 不存在则添加
_channels.insert(std::make_pair(channel->Fd(), channel));
return Update(channel, EPOLL_CTL_ADD);
}
return Update(channel, EPOLL_CTL_MOD);
}
void RemoveEvent(Channel *channel) // 移除监控
{
auto it = _channels.find(channel->Fd());
{
if (it != _channels.end())
{
_channels.erase(it);
}
Update(channel, EPOLL_CTL_DEL);
}
}
void Poll(std::vector<Channel *> *active) // 开始监控,返回活跃连接
{
int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); //-1代表阻塞监控
if (nfds < 0)
{
if (errno == EINTR)
{
return;
}
ERR_LOG("EPOLL WAIT ERROR:%s\\n", strerror(errno));
abort();
}
for (int i = 0; i < nfds; i++)
{
auto it = _channels.find(_evs[i].data.fd);
assert(it != _channels.end());
it->second->SetREvents(_evs[i].events);
active->push_back(it->second);
}
return;
}
};
2.6 定时任务管理TimerWheel类实现
前面的准备知识中我们已经基本实现了时间轮,唯一需要解释的就是对于定时器文件描述符的使用
定时器文件描述符的创建:timerfd_create
第一个参数一般传入的是CLOCK_MONOTONIC,表示使用单调时钟,不收系统时间调整影响
第二个参数传入0表示没有特殊标志
然后我们需要设置定时器参数,这里使用的是itimerspec类型的结构体
struct itimerspec
{
struct timespec it_interval;
struct timespec it_value;
};
it_value包含两个参数,tv_sec和tv_nsec分别用来设置秒和毫秒,tv_sec用来设置第一次超时时间,it_interval同样包含两个参数tv_sec和tv_nsec,用来设置第一次超时后,每次的超时时间间隔
可以使用read函数,传入定时器文件描述符来读取一共超时了多少次
int ret = read(_timerfd, ×, 8);
下面就是这个类完整代码了
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
uint64_t _id; // 定时器任务对象id
uint32_t _timeout; // 定时任务的超时时间
bool _canceled; // false-表示没有被取消
TaskFunc _task_cb; // 定时器对象要执行的定时任务
ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
public:
TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb)
: _id(id), _timeout(delay), _task_cb(cb), _canceled(false)
{
}
~TimerTask()
{
if (_canceled == false)
_task_cb();
_release();
}
void Cancel()
{
_canceled = true;
}
void SetRelease(const ReleaseFunc &cb)
{
_release = cb;
}
uint32_t DelayTime()
{
return _timeout;
}
};
class TimerWheel
{
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
int _tick; // 当前的秒针,走到哪里释放哪里
int _capacity; // 表盘最大数量,其实就是最大延迟时间
std::vector<std::vector<PtrTask>> _wheel;
std::unordered_map<uint64_t, WeakTask> _timers; // 使用任务ID找到对应的weak_ptr
EventLoop *_loop;
int _timerfd; // 定时器描述符
std::unique_ptr<Channel> _timer_channel;
private:
void RemoveTimer(uint16_t id)
{
auto it = _timers.find(id);
if (it != _timers.end())
{
_timers.erase(it);
}
}
static int CreateTimerFd()
{
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0)
{
ERR_LOG("TIMERED CREATE FAILED!");
abort();
}
struct itimerspec itime;
itime.it_value.tv_sec = 1;
itime.it_value.tv_nsec = 0; // 第一次超时时间为1s后
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0; // 第一次超时后,每次超时的时间间隔
timerfd_settime(timerfd, 0, &itime, NULL);
return timerfd;
}
int ReadTimefd()
{
uint64_t times;
//有可能因为其他描述符的事件处理花费时间比较长,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次
//read读取到的数据times就是从上一次read之后超时的次数
int ret = read(_timerfd, ×, 8);
if (ret < 0)
{
ERR_LOG("READ TIMER FAILED!");
abort();
}
return times;
}
// 这个函数每秒被执行一次,相当于秒针向后走了一步
void RunTimeTask()
{
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear();
}
void OnTime()
{
//根据实际超时的次数,执行对应的超时任务
int times = ReadTimefd();
for(int i = 0; i < times; i++)
{
RunTimeTask();
}
}
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) // 添加定时任务
{
PtrTask pt(new TimerTask(id, delay, cb));
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
_timers[id] = WeakTask(pt);
}
void TimerRefreshInLoop(uint64_t id) // 刷新/延迟定时任务
{
// 通过保存的定时器对象的weakptr构造一个shared_ptr出来,添加到轮子中
auto it = _timers.find(id);
if (it == _timers.end())
{
// 没有找到定时任务
return;
}
PtrTask pt = it->second.lock(); // lock获取weakptr管理的对象对应的shared_ptr;
int delay = pt->DelayTime();
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
}
void TimerCancelInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
// 没有找到定时任务
return;
}
PtrTask pt = it->second.lock();
if (pt)
pt->Cancel();
}
public:
TimerWheel(EventLoop *loop)
: _capacity(60),
_tick(0),
_wheel(_capacity),
_loop(loop),
_timerfd(CreateTimerFd()),
_timer_channel(new Channel(_loop, _timerfd))
{
_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
_timer_channel->EnableRead();
}
/*定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行, 因此需要考虑线程安全问题*/
/*如果不想加锁,那就把对定时的所有操作,都放在一个线程中进行*/
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);
// 刷新/延迟定时任务
void TimerRefresh(uint64_t id);
void TimerCancel(uint64_t id);
/*这个接口存在线程安全问题–这个接口实际上不能被外界使用者调用,只能在模块内,在对应的EventLoop线程内执行*/
bool HasTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return false;
}
return true;
}
};
2.7 Reactor-EventLoop线程池类实现
EventLoop模块可以理解为Reactor模块,它是对Poller模块,TimeQueue模块,Socket模块的一个整体封装,进行所有描述符的事件监控
首先是EventLoop的私有成员变量
using Functor = std::function<void()>;
std::thread::id _thread_id; // 线程ID
int _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞
std::unique_ptr<Channel> _event_channel; // 通过这个来管理_event_fd中的事件
Poller _poller; // 进行所有描述符的事件监控
std::vector<Functor> _tasks; // 任务池
std::mutex _mutex; // 实现任务池操作的线程安全
TimerWheel _timer_wheel; // 定时器模块
然后就是对其中的成员函数解释
首先是执行任务池中的所有任务RunAllTask函数
void RunAllTask()
{
std::vector<Functor> functor;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.swap(functor); // 交换之后,任务全部交换到了functor里面
}
for (auto &f : functor)
{
f();
}
}
这个函数有一个点需要解释一下,就是为什么下面这段代码要加上大括号
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.swap(functor); // 交换之后,任务全部交换到了functor里面
}
原因:大括号用来定义一个作用域,使用大括号包含std::unique_lock<srd::mutex>的声明和_tasks.swap(functor)的调用,是为了限制std::unique_lock对象的生命周期和作用域。std::unique_lock<std::mutex>是一个RALL对象,会在构造时获取锁,并在析构时自动释放锁,将对象的作用域限制在大括号内,可以确保在离开这个作用域时,锁会被自动释放,这可以防止由于异常或提前返回导致的锁未释放问题
创建eventfd
static int CreateEventFd()
{
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0)
{
ERR_LOG("CREATE EVENTFD FAILED!");
abort();
}
return efd;
}
这里解释一下eventfd函数
第一个参数0表示初始事件计数为0,第二个参数是标志位,用于控制eventfd的行为
EFD_CLOEXEC:设置close-on-exec标志,确保在执行exec系列函数时关闭文件描述符
例如,在创建子进程时,父进程可能打开了一些文件描述符用于内部通信。如果不设置close-on-exec标志,这些文件描述符可能会被子进程继承,导致资源泄漏或安全问题。
EFD_NONBLOCK:设置非阻塞模式,对eventfd的读写操作不会阻塞调用线程
从_event_fd中读取事件计数
void ReadEventFd()
{
uint64_t res = 0;
int ret = read(_event_fd, &res, sizeof(res));
if (ret < 0)
{
if (errno == EINTR || errno == EAGAIN)
{
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return;
}
向eventfd中写入一个事件计数,以通知其他线程或进程事件的发生
void WakeUpEventFd()
{
uint64_t val = 1;
int ret = write(_event_fd, &val, sizeof(val));
if (ret < 0)
{
if (errno == EINTR)
{
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return;
}
Start函数,分为三个步骤:事件监控->就绪事件处理->执行任务
void Start()
{
while (1)
{
std::vector<Channel *> actives;
_poller.Poll(&actives);
for (auto &channel : actives)
{
channel->HandleEvent();
}
RunAllTask();
}
}
使用IsInLoop函数判断当前线程是否为eventloop对应的线程
bool IsInLoop()
{
return (_thread_id == std::this_thread::get_id());
}
判断要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列
void RunInLoop(const Functor &cb)
{
if (IsInLoop())
{
return cb();
}
return QueueInLoop(cb);
}
将操作压入任务池
void QueueInLoop(const Functor &cb)
{
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.push_back(cb);
}
// 唤醒有可能因为没有事件就绪而导致的epoll阻塞
// 其实就是给eventfd写入一个数据,eventfd就会触发可读事件
WakeUpEventFd();
}
最后就是对描述符监控的操作
// 添加/修改描述符的事件监控
void UpdateEvent(Channel *channel)
{
return _poller.UpdateEvent(channel);
}
// 移除描述符的监控
void RemoveEvent(Channel *channel)
{
return _poller.RemoveEvent(channel);
}
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{
return _timer_wheel.TimerAdd(id, delay, cb);
}
void TimerRefresh(uint64_t id)
{
return _timer_wheel.TimerRefresh(id);
}
void TimerCancel(uint64_t id)
{
return _timer_wheel.TimerCancel(id);
}
bool HasTimer(uint64_t id)
{
return _timer_wheel.HasTimer(id);
}
全部代码如下
class EventLoop
{
private:
using Functor = std::function<void()>;
std::thread::id _thread_id; // 线程ID
int _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞
std::unique_ptr<Channel> _event_channel; // 通过这个来管理_event_fd中的事件
Poller _poller; // 进行所有描述符的事件监控
std::vector<Functor> _tasks; // 任务池
std::mutex _mutex; // 实现任务池操作的线程安全
TimerWheel _timer_wheel; // 定时器模块
public:
// 执行任务池中的所有任务
void RunAllTask()
{
std::vector<Functor> functor;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.swap(functor); // 交换之后,任务全部交换到了functor里面
}
for (auto &f : functor)
{
f();
}
}
static int CreateEventFd()
{
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0)
{
ERR_LOG("CREATE EVENTFD FAILED!");
abort();
}
return efd;
}
void ReadEventFd()
{
uint64_t res = 0;
int ret = read(_event_fd, &res, sizeof(res));
if (ret < 0)
{
if (errno == EINTR || errno == EAGAIN)
{
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return;
}
void WakeUpEventFd()
{
uint64_t val = 1;
int ret = write(_event_fd, &val, sizeof(val));
if (ret < 0)
{
if (errno == EINTR)
{
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return;
}
public:
EventLoop()
: _thread_id(std::this_thread::get_id()),
_event_fd(CreateEventFd()),
_event_channel(new Channel(this, _event_fd)),
_timer_wheel(this)
{
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));
// 启动eventfd读事件监控
_event_channel->EnableRead();
}
void Start() // 三步走–事件监控->就绪事件处理->执行任务
{
while (1)
{
std::vector<Channel *> actives;
_poller.Poll(&actives);
for (auto &channel : actives)
{
channel->HandleEvent();
}
RunAllTask();
}
}
// 用于判断当前线程是否是eventloop对应的线程
bool IsInLoop()
{
return (_thread_id == std::this_thread::get_id());
}
void AssertInLoop()
{
assert(_thread_id == std::this_thread::get_id());
}
// 判断要执行的任务是否处于当前线程中,如果是则执行,如果不是则压入队列
void RunInLoop(const Functor &cb)
{
if (IsInLoop())
{
return cb();
}
return QueueInLoop(cb);
}
// 将操作压入任务池
void QueueInLoop(const Functor &cb)
{
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.push_back(cb);
}
// 唤醒有可能因为没有事件就绪而导致的epoll阻塞
// 其实就是给eventfd写入一个数据,eventfd就会触发可读事件
WakeUpEventFd();
}
// 添加/修改描述符的事件监控
void UpdateEvent(Channel *channel)
{
return _poller.UpdateEvent(channel);
}
// 移除描述符的监控
void RemoveEvent(Channel *channel)
{
return _poller.RemoveEvent(channel);
}
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{
return _timer_wheel.TimerAdd(id, delay, cb);
}
void TimerRefresh(uint64_t id)
{
return _timer_wheel.TimerRefresh(id);
}
void TimerCancel(uint64_t id)
{
return _timer_wheel.TimerCancel(id);
}
bool HasTimer(uint64_t id)
{
return _timer_wheel.HasTimer(id);
}
};
2.8 用于管理单个事件循环线程LoopThread类
这个类的主要作用是确保线程安全地创建和访问EventLoop对象
关于成员变量
std::mutex _mutex; // 互斥锁
std::condition_variable _cond; // 条件变量
EventLoop *_loop; // EventLoop指针变量,这个对象需要在线程
std::thread _thread; // EventLoop对应的线程
接下来解释一下成员函数
首先是ThreadEntry,负责在线程中实例化EventLoop对象,并确保其他线程能够安全地访问这个EventLoop对象。
void ThreadEntry()
{
EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);
_loop = &loop;
_cond.notify_all();
}
loop.Start();
}
这里解释一下_cond.notify_all(),他可以唤醒所有在_cond条件变量上等待的线程,通知他们_loop已经初始化完成。
关于这个类的初始化函数
初始化_thread成员变量为一个新的线程,该线程将执行ThreadEntry方法。
LoopThread()
: _loop(NULL),
_thread(std::thread(&LoopThread::ThreadEntry, this))
{
}
然后就是返回当前线程关联的EventLoop指针
直到_loop不为空才将loop初始化
EventLoop *GetLoop()
{
EventLoop *loop = NULL;
{
std::unique_lock<std::mutex> lock(_mutex); // 加锁
/*满足什么的条件就不会等待*/
// loop为空就一直阻塞
_cond.wait(lock, [&]()
{ return _loop != NULL; });
loop = _loop;
}
return loop;
}
下面是完整代码
class LoopThread
{
private:
/*用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop*/
std::mutex _mutex; // 互斥锁
std::condition_variable _cond; // 条件变量
EventLoop *_loop; // EventLoop指针变量,这个对象需要在线程
std::thread _thread; // EventLoop对应的线程
private:
/*实例化EventLoop对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能*/
void ThreadEntry()
{
EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);
_loop = &loop;
_cond.notify_all();
}
loop.Start();
}
public:
/*创建线程,设置线程入口函数*/
LoopThread()
: _loop(NULL),
_thread(std::thread(&LoopThread::ThreadEntry, this))
{
}
/*返回当前线程关联的EventLoop指针*/
EventLoop *GetLoop()
{
EventLoop *loop = NULL;
{
std::unique_lock<std::mutex> lock(_mutex); // 加锁
/*满足什么的条件就不会等待*/
// loop为空就一直阻塞
_cond.wait(lock, [&]()
{ return _loop != NULL; });
loop = _loop;
}
return loop;
}
};
2.9 线程池的主要逻辑LoopThreadPool类
LoopThreadPool用于管理多个LoopThread对象,提供线程池功能
首先是成员变量
int _thread_count;
int _next_idx;
EventLoop *_baseloop;
std::vector<LoopThread *> _threads;
std::vector<EventLoop *> _loops;
- _thread_count:线程的数量
- _next_idx:下一个线程的索引
- _baseloop:指向一个EventLoop的指针
- _threads:存储指向LoopThread对象的指针
- _loops:存储指向EventLoop对象的指针
然后就是对主要的成员函数的解释
首先是对线程池的创建Create()
当线程的总数大于0的时候,首先需要对_threads和_loops这两个数组进行初始化,然后创建_thread_count数目的线程以及线程对应的EventLoop
接下来是NextLoop函数,这个使用了轮询的方式来分配任务
EventLoop *NextLoop()
{
if (_thread_count == 0)
{
return _baseloop;
}
_next_idx = (_next_idx + 1) % _thread_count;
return _loops[_next_idx];
}
如果可用的工作线程数目为0,那么返回_baseloop,即基础的事件循环。通过模运算更新_next_idx实现轮询选择,最后根据更新后的_next_idx返回相应的事件循环
下面就是这个类的完整代码
class LoopThreadPool
{
private:
int _thread_count;
int _next_idx;
EventLoop *_baseloop;
std::vector<LoopThread *> _threads;
std::vector<EventLoop *> _loops;
public:
LoopThreadPool(EventLoop *baseloop)
: _thread_count(0),
_next_idx(0),
_baseloop(baseloop)
{
}
void SetThreadCount(int count)
{
_thread_count = count;
}
void Create()
{
if (_thread_count > 0)
{
_threads.resize(_thread_count);
_loops.resize(_thread_count);
for (int i = 0; i < _thread_count; i++)
{
_threads[i] = new LoopThread();
_loops[i] = _threads[i]->GetLoop();
}
}
return;
}
EventLoop *NextLoop()
{
if (_thread_count == 0)
{
return _baseloop;
}
_next_idx = (_next_idx + 1) % _thread_count;
return _loops[_next_idx];
}
};
2.10 Any类代码的实现
前面的准备知识中已经详细解释了Any类的具体实现,那么这里直接给出代码
class Any
{
private:
class holder
{
public:
virtual ~holder()
{
}
// 数据类型
virtual const std::type_info &type() = 0;
// 克隆出新的对象
virtual holder *clone() = 0;
};
template <class T>
class placeholder : public holder
{
public:
placeholder(const T &val) : _val(val) {}
// 获取子类对象保存的数据类型
virtual const std::type_info &type()
{
return typeid(T);
}
// 针对出当前的对象自身,克隆出一个新的对象
virtual holder *clone()
{
return new placeholder(_val);
}
public:
T _val;
};
holder *_content;
public:
Any() : _content(NULL) {}
template <class T>
Any(const T &val) : _content(new placeholder<T>(val)) {}
Any(const Any &other) : _content(other._content ? other._content->clone() : NULL) {}
~Any() { delete _content; }
Any &swap(Any &other)
{
std::swap(_content, other._content);
return *this;
}
template <class T>
// 返回子类对象保存的数据的指针
T *get()
{
// 想要获取的数据类型,必须和保存的数据类型一致
// if(typeid(T) != _content->type()) return NULL;
assert(typeid(T) == _content->type());
return &((placeholder<T> *)_content)->_val;
}
// 赋值运算符重载函数
template <class T>
Any &operator=(const T &val)
{
// 为val构造一个临时的通用容器,然后与当前容器自身进行指针交换
// 临时对象释放的时候,原先保存的数据也就被释放了
Any(val).swap(*this);
return *this;
}
Any &operator=(const Any &other)
{
Any(other).swap(*this);
return *this;
}
};
2.11 通信连接管理Connection类实现
Connection模块时对Buffer模块,Socket模块Channel模块的一个整体封装,实现了对一个套接通信字整体的管理,每一个进行数据通信的套接字(也就是accept获取到的新链接)都会使用Connecition进行管理。
首先,连接Connection的状态肯定有多种,这里我们使用enum来管理连接的状态
typedef enum
{
DISCONNECTED, // 连接关闭状态
CONNECTING, // 连接建立成功待处理状态
CONNECTED, // 连接建立完成,各种设置已完成可以通信的状态
DISCONNECTING // 待关闭状态
} ConnStatu;
接下来介绍一下Connection主题的设计
这里介绍一下shared_from_this,他是C++标准库的一个机制,定义在<memory>头文件中,通过std::enable_shared_fron_this模版类提供。它允许一个对象安全地生成指向自身的shared_ptr,而不会创建额外的控制块
shared_fron_this解决了以下问题
所以,为了安全地生成指向自身的shared_ptr,我们让Connection继承enable_shared_from_this<Connection>
下面介绍一下这个类的私有成员
uint64_t _conn_id; // 连接唯一ID,便于连接的管理和查找
// uint64_t _timer_id; //定时器ID,必须是唯一的,为了简化操作使用conn_id作为定时器ID
int _sockfd; // 连接关联的文件描述符
bool _enable_inactive_release; // 连接是否启动非活跃销毁,默认是false
EventLoop *_loop; // 连接所关联的一个loop
ConnStatu _statu; // 连接状态
Socket _socket; // 套接字连接管理
Channel _channel; // 连接的事件管理
Buffer _in_buffer; // 输入缓冲区—存放从socket中读取到的数据
Buffer _out_buffer; // 输出缓冲区—存放要发送给对端的数据
Any _context; // 请求的处理接收处理上下文
// 对外操作使用智能指针
// 这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)
/**/
using ConnectedCallback = std::function<void(const PtrConnection &)>;
using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
using ClosedCallback = std::function<void(const PtrConnection &)>;
using AnyEventCallback = std::function<void(const PtrConnection &)>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
/*组件内的连接关闭回调–组件内设置的,因为服务器组件内会把所有的链接管理起来,一旦某个连接要关闭*/
/*就应该从管理的地方移除掉自己的信息*/
ClosedCallback _server_closed_callback;
然后就是关于成员函数的介绍
首先是对描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callback
void HandleRead()
{
/*接收socket的数据,放到缓冲区*/
char buf[65536];
ssize_t ret = _socket.NonBlockRecv(buf, 65535);
if (ret < 0)
{
/*出错了,不能直接关闭连接,要看接受缓冲区和发送缓冲区中还有没有数据*/
return ShutDownInLoop();
}
// else if(ret == 0)
// {
// //这里的0表示没有读取到数据,而不是连接断开了,连接断开返回的是-1
// return;
// }
// 将数据放入到输入缓冲区,并且移动写偏移
_in_buffer.WriteAndPush(buf, ret);
/*调用_message_callback进行业务处理*/
if (_in_buffer.ReadAbleSize() > 0)
{
// shared_fron_this从当前对象自身获取到
return _message_callback(shared_from_this(), &_in_buffer);
}
}
处理写事件的函数逻辑与上面差不多
void HandleWrite() // 描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送
{
//_out_buffer中保存的数据就是要发送的数据
ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
if (ret < 0)
{
// 发送错误就该关闭连接了
if (_in_buffer.ReadAbleSize() > 0)
{
_message_callback(shared_from_this(), &_in_buffer);
}
return Release(); // 这时候就是实际的关闭释放操作
}
_out_buffer.MoveReadOffset(ret);
if (_out_buffer.ReadAbleSize() == 0)
{
_channel.DisableWrite(); // 没有数据待发送了,关闭写事件监控
// 如果连接时待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放
if (_statu == DISCONNECTING)
{
return Release();
}
}
return;
}
然后就是描述符触发挂断事件和出错事件
void HandleClose() // 描述符触发挂断事件
{
// 一旦连接挂断了,套接字就什么都干不了,因此有数据就处理一下,完毕关闭连接
if (_in_buffer.ReadAbleSize() > 0)
{
_message_callback(shared_from_this(), &_in_buffer);
}
return Release();
}
void HandleError() // 描述符触发出错事件
{
HandleClose();
}
当描述符触发任一事件时,我们需要刷新连接的活跃度
void HandleEvent() // 描述符触发任一事件
{
// 刷新链接的活跃度 + 调用组件使用者的任意事件回调
if (_enable_inactive_release == true)
{
_loop->TimerRefresh(_conn_id);
}
if (_event_callback)
{
_event_callback(shared_from_this());
}
}
下个函数会把连接状态从半连接修改为连接状态,并且会启动读事件监控
void EstablishedInLoop() // 连接获取之后,所处的状态下要进行的各种设置(给Channel设置时间回调,启动读监控)
{
// 修改连接状态 + 启动读事件监控 + 调用回调函数
assert(_statu == CONNECTING); // 当前的状态必须一定是上层的半连接状态
_statu = CONNECTED; // 当前函数执行完毕,则连接进入已完成连接状态
// 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃链接销毁
_channel.EnableRead();
if (_connected_callback)
_connected_callback(shared_from_this());
}
对连接进行释放的函数ReleaseInLoop()
void ReleaseInLoop() // 这个接口才是实际的释放接口
{
// 修改连接状态,设置为DISCONNECTED
_statu = DISCONNECTED;
// 移除连接的事件监控
_channel.Remove();
// 关闭描述符
_socket.Close();
// 如果当前定时器队列中还有定时销毁任务,则取消任务,避免野指针操作
if (_loop->HasTimer(_conn_id))
CancelInactiveReleaseInLoop();
// 调用关闭回调函数,避免因为先移除服务器管理的连接信息导致Connection被释放,再去处理就会出错,因此先调用用户
if (_closed_callback)
_closed_callback(shared_from_this());
// 移除服务器内部管理的连接信息
if (_server_closed_callback)
_server_closed_callback(shared_from_this());
}
把数据放入到发送缓冲区,启动可写事件监控
void SendInLoop(Buffer &buf)
{
if (_statu == DISCONNECTED)
return;
_out_buffer.WriteBufferAndPush(buf);
if (_channel.WriteAble() == false)
{
_channel.EnableWrite();
}
}
这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送
void ShutDownInLoop()
{
_statu = DISCONNECTED; // 设置连接为半关闭状态
if (_in_buffer.ReadAbleSize() > 0)
{
if (_message_callback)
_message_callback(shared_from_this(), &_in_buffer);
}
// 要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭
if (_out_buffer.ReadAbleSize() > 0)
{
if (_channel.WriteAble() == false) // 是否启动写事件监控
{
_channel.EnableWrite();
}
}
// 没有数据了
if (_out_buffer.ReadAbleSize() == 0)
{
Release();
}
}
启动非活跃链接超时释放,不存在这个销毁任务就新增,已经存在则刷新定时器
void EnableInactiveReleaseInLoop(int sec)
{
// 将判断标志_enable_inactive_release 置为true
_enable_inactive_release = true;
// 如果当前定时销毁任务已经存在,那就刷新延迟一下即可
if (_loop->HasTimer(_conn_id))
{
return _loop->TimerRefresh(_conn_id);
}
// 如果不存在定时销毁任务,则新增
_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
}
取消非活跃连接释放
void CancelInactiveReleaseInLoop()
{
_enable_inactive_release = false;
if (_loop->HasTimer(_conn_id))
{
_loop->TimerCancel(_conn_id);
}
}
切换协议,重置上下文以及阶段性处理函数
void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,
const ClosedCallback &closed, const AnyEventCallback &event)
{
_context = context;
_connected_callback = conn;
_message_callback = msg;
_closed_callback = closed;
_event_callback = event;
}
下面是Connection类的完整代码
typedef enum
{
DISCONNECTED, // 连接关闭状态
CONNECTING, // 连接建立成功待处理状态
CONNECTED, // 连接建立完成,各种设置已完成可以通信的状态
DISCONNECTING // 待关闭状态
} ConnStatu;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{
private:
uint64_t _conn_id; // 连接唯一ID,便于连接的管理和查找
// uint64_t _timer_id; //定时器ID,必须是唯一的,为了简化操作使用conn_id作为定时器ID
int _sockfd; // 连接关联的文件描述符
bool _enable_inactive_release; // 连接是否启动非活跃销毁,默认是false
EventLoop *_loop; // 连接所关联的一个loop
ConnStatu _statu; // 连接状态
Socket _socket; // 套接字连接管理
Channel _channel; // 连接的事件管理
Buffer _in_buffer; // 输入缓冲区—存放从socket中读取到的数据
Buffer _out_buffer; // 输出缓冲区—存放要发送给对端的数据
Any _context; // 请求的处理接收处理上下文
// 对外操作使用智能指针
// 这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)
/**/
using ConnectedCallback = std::function<void(const PtrConnection &)>;
using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
using ClosedCallback = std::function<void(const PtrConnection &)>;
using AnyEventCallback = std::function<void(const PtrConnection &)>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
/*组件内的连接关闭回调–组件内设置的,因为服务器组件内会把所有的链接管理起来,一旦某个连接要关闭*/
/*就应该从管理的地方移除掉自己的信息*/
ClosedCallback _server_closed_callback;
private:
/*五个channel的事件回调函数*/
void HandleRead() // 描述符可读事件触发后调用的函数,接收socket数据放到接受缓冲区中,然后调用_message_callback
{
/*接收socket的数据,放到缓冲区*/
char buf[65536];
ssize_t ret = _socket.NonBlockRecv(buf, 65535);
if (ret < 0)
{
/*出错了,不能直接关闭连接,要看接受缓冲区和发送缓冲区中还有没有数据*/
return ShutDownInLoop();
}
// else if(ret == 0)
// {
// //这里的0表示没有读取到数据,而不是连接断开了,连接断开返回的是-1
// return;
// }
// 将数据放入到输入缓冲区,并且移动写偏移
_in_buffer.WriteAndPush(buf, ret);
/*调用_message_callback进行业务处理*/
if (_in_buffer.ReadAbleSize() > 0)
{
// shared_fron_this从当前对象自身获取到
return _message_callback(shared_from_this(), &_in_buffer);
}
}
void HandleWrite() // 描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送
{
//_out_buffer中保存的数据就是要发送的数据
ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
if (ret < 0)
{
// 发送错误就该关闭连接了
if (_in_buffer.ReadAbleSize() > 0)
{
_message_callback(shared_from_this(), &_in_buffer);
}
return Release(); // 这时候就是实际的关闭释放操作
}
_out_buffer.MoveReadOffset(ret);
if (_out_buffer.ReadAbleSize() == 0)
{
_channel.DisableWrite(); // 没有数据待发送了,关闭写事件监控
// 如果连接时待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放
if (_statu == DISCONNECTING)
{
return Release();
}
}
return;
}
void HandleClose() // 描述符触发挂断事件
{
// 一旦连接挂断了,套接字就什么都干不了,因此有数据就处理一下,完毕关闭连接
if (_in_buffer.ReadAbleSize() > 0)
{
_message_callback(shared_from_this(), &_in_buffer);
}
return Release();
}
void HandleError() // 描述符触发出错事件
{
HandleClose();
}
void HandleEvent() // 描述符触发任一事件
{
// 刷新链接的活跃度 + 调用组件使用者的任意事件回调
if (_enable_inactive_release == true)
{
_loop->TimerRefresh(_conn_id);
}
if (_event_callback)
{
_event_callback(shared_from_this());
}
}
void EstablishedInLoop() // 连接获取之后,所处的状态下要进行的各种设置(给Channel设置时间回调,启动读监控)
{
// 修改连接状态 + 启动读事件监控 + 调用回调函数
assert(_statu == CONNECTING); // 当前的状态必须一定是上层的半连接状态
_statu = CONNECTED; // 当前函数执行完毕,则连接进入已完成连接状态
// 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃链接销毁
_channel.EnableRead();
if (_connected_callback)
_connected_callback(shared_from_this());
}
void ReleaseInLoop() // 这个接口才是实际的释放接口
{
// 修改连接状态,设置为DISCONNECTED
_statu = DISCONNECTED;
// 移除连接的事件监控
_channel.Remove();
// 关闭描述符
_socket.Close();
// 如果当前定时器队列中还有定时销毁任务,则取消任务,避免野指针操作
if (_loop->HasTimer(_conn_id))
CancelInactiveReleaseInLoop();
// 调用关闭回调函数,避免因为先移除服务器管理的连接信息导致Connection被释放,再去处理就会出错,因此先调用用户
if (_closed_callback)
_closed_callback(shared_from_this());
// 移除服务器内部管理的连接信息
if (_server_closed_callback)
_server_closed_callback(shared_from_this());
}
// 这个接口并不是实际的发送接口,而只是把数据放到发送缓冲区,启动可写事件监控
void SendInLoop(Buffer &buf)
{
if (_statu == DISCONNECTED)
return;
_out_buffer.WriteBufferAndPush(buf);
if (_channel.WriteAble() == false)
{
_channel.EnableWrite();
}
}
// 这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送
void ShutDownInLoop()
{
_statu = DISCONNECTED; // 设置连接为半关闭状态
if (_in_buffer.ReadAbleSize() > 0)
{
if (_message_callback)
_message_callback(shared_from_this(), &_in_buffer);
}
// 要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭
if (_out_buffer.ReadAbleSize() > 0)
{
if (_channel.WriteAble() == false) // 是否启动写事件监控
{
_channel.EnableWrite();
}
}
// 没有数据了
if (_out_buffer.ReadAbleSize() == 0)
{
Release();
}
}
// 启动非活跃连接超时释放规则
void EnableInactiveReleaseInLoop(int sec)
{
// 将判断标志_enable_inactive_release 置为true
_enable_inactive_release = true;
// 如果当前定时销毁任务已经存在,那就刷新延迟一下即可
if (_loop->HasTimer(_conn_id))
{
return _loop->TimerRefresh(_conn_id);
}
// 如果不存在定时销毁任务,则新增
_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
}
void CancelInactiveReleaseInLoop()
{
_enable_inactive_release = false;
if (_loop->HasTimer(_conn_id))
{
_loop->TimerCancel(_conn_id);
}
}
// 切换协议,重置上下文以及阶段性处理函数
void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,
const ClosedCallback &closed, const AnyEventCallback &event)
{
_context = context;
_connected_callback = conn;
_message_callback = msg;
_closed_callback = closed;
_event_callback = event;
}
public:
Connection(EventLoop *loop, uint64_t conn_id, int sockfd)
: _conn_id(conn_id),
_sockfd(sockfd),
_enable_inactive_release(false),
_loop(loop),
_statu(CONNECTING),
_socket(_sockfd),
_channel(loop, _sockfd)
{
_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
}
~Connection()
{
DBG_LOG("RELEASE CONNECTION:%p", this);
}
int Fd() // 获取管理的文件描述符
{
return _sockfd;
}
int Id() // 获取连接的ID
{
return _conn_id;
}
bool Connected() // 是否处于Connected状态
{
return (_statu == CONNECTED);
}
void SetContext(const Any &context) // 设置上下文–连接建立完成时
{
_context = context;
}
Any *GetContext() // 获取上下文,返回的是指针
{
return &_context;
}
void SetConnectedCallback(const ConnectedCallback &cb)
{
_connected_callback = cb;
}
void SetMessageCallback(const MessageCallback &cb)
{
_message_callback = cb;
}
void SetClosedCallback(const ClosedCallback &cb)
{
_closed_callback = cb;
}
void SetAnyEventCallback(const AnyEventCallback &cb)
{
_event_callback = cb;
}
void SetSrvClosedCallback(const ClosedCallback &cb)
{
_server_closed_callback = cb;
}
void Established() // 连接建立就绪后,进行Channel回调设置,启动读监控–连接建立完成时进行调用
{
_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
}
void Send(const char *data, size_t len) // 发送数据,将数据放到发送缓冲区,启动写事件监控
{
// 外界传入的data,可能是个临时的空间,我们只是把发送操作压入了任务池,有可能并没有被立即执行
// 因此有可能执行的时候,data指向的空间有可能已经被释放了
Buffer buf;
buf.WriteAndPush(data, len);
_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf))); // 右值引用
//_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, data, len));
}
void Shutdown() // 提供给组件使用者的关闭接口–并不实际关闭,需要判断有没有事件待处理
{
_loop->RunInLoop(std::bind(&Connection::ShutDownInLoop, this));
}
void Release()
{
_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));
}
void EnableInactiveRelease(int sec) // 启动非活跃销毁并定义多长时间无通信就是非活跃
{
_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
}
void CancelInactiveRelease() // 取消非活跃销毁
{
_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));
}
// 切换协议,重置上下文以及阶段性回调处理函数 — 非线程安全的 — 这个接口必须在EventLoop线程中执行
// 防备新的事件触发后,处理的时候,切换任务还没有被执行 — 会导致数据使用原协议处理了
void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,
const ClosedCallback &closed, const AnyEventCallback &event)
{
_loop->AssertInLoop();
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
}
};
2.12 监听描述符管理Acceptor模块
Acceptor模块是对Socket模块,Channel模块的整体封装,实现对一个监听套接字的整体的管理
首先这是Acceptor类的成员变量介绍
Socket _socket; // 用于创建监听套接字
EventLoop *_loop; // 用于对监听套接字进行事件监控
Channel _channel; // 用于对监听套接字进行事件管理
using AcceptCallback = std::function<void(int)>; // 获取到链接的回调函数
AcceptCallback _accept_callback;
下面是对其中的成员函数解释
监听套接字的读事件处理函数,获取新链接,调用_accept_callback进行新链接处理
void HandleRead()
{
// DBG_LOG("ACCEPT HANDLE READ");
int newfd = _socket.Accept();
if (newfd < 0)
{
return;
}
if (_accept_callback)
_accept_callback(newfd);
}
创建服务端
int CreateServer(int port)
{
bool ret = _socket.CreateServer(port);
assert(ret == true);
return _socket.Fd();
}
然后是Acceptor的初始化函数,这里需要注意的是,不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动
否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没有被设置,新链接得不到处理,且资源泄露
Acceptor(EventLoop *loop, int port)
: _socket(CreateServer(port)),
_loop(loop),
_channel(loop, _socket.Fd())
{
_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
}
下面是这个类的完整代码
class Acceptor
{
private:
Socket _socket; // 用于创建监听套接字
EventLoop *_loop; // 用于对监听套接字进行事件监控
Channel _channel; // 用于对监听套接字进行事件管理
using AcceptCallback = std::function<void(int)>; // 获取到链接的回调函数
AcceptCallback _accept_callback;
private:
/*监听套接字的读事件处理函数—获取新链接,调用_accept_callback进行新链接处理*/
void HandleRead()
{
// DBG_LOG("ACCEPT HANDLE READ");
int newfd = _socket.Accept();
if (newfd < 0)
{
return;
}
if (_accept_callback)
_accept_callback(newfd);
}
int CreateServer(int port)
{
bool ret = _socket.CreateServer(port);
assert(ret == true);
return _socket.Fd();
}
public:
/*不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动*/
/*否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新链接得不到处理,且资源泄露*/
Acceptor(EventLoop *loop, int port)
: _socket(CreateServer(port)),
_loop(loop),
_channel(loop, _socket.Fd())
{
_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
}
void SetAcceptCallBack(const AcceptCallback &cb)
{
_accept_callback = cb;
}
void Listen()
{
_channel.EnableRead();
}
};
2.13 服务器类TcpServer类的实现
这个模块时一个整体Tcp服务器模块的封装,内部封装了Acceptor模块,LoopThreadPool模块
下面是对其成员变量的解释
uint64_t _next_id; // 这是一个自动增长的连接ID
int _timeout; // 这是非活跃链接的统计时间—多长时间无通信就是非活跃连接
int _port;
bool _enable_inactive_release; // 是否启动了非活跃连接超时销毁的判断标志
Acceptor _acceptor; // 这是监听套接字的管理对象
EventLoop _baseloop; // 这是主线程的EventLoop对象,负责监听事件的处理
LoopThreadPool _pool; // 这是我们的从属EventLoop线程池
std::unordered_map<uint64_t, PtrConnection> _conns; // 保存管理所有连接对应的shared_ptr对象
using ConnectedCallback = std::function<void(const PtrConnection &)>;
using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
using ClosedCallback = std::function<void(const PtrConnection &)>;
using AnyEventCallback = std::function<void(const PtrConnection &)>;
using Functor = std::function<void()>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
- TcpServer中包含有一个EventLoop对象:以备在超轻量使用场景中不需要EventLoop线程池,只需要在主线程中完成所有操作的情况
- TcpServer模块内部包含有一个LoopThreadPool对象:其实就是EventLoop线程池,也就是子Reactor线程池
- TcpServer模块内部包含有一个Acceptor对象:一个TcpServer服务器,必然对应有一个监听套接字,能够完成获取客户端新链接,并处理任务
- TcpServer模块内部包含有一个std::shared_ptr<Connection>的hash表:保存了所有的新建连接对应的Connection。需要注意的是,所有的Connection使用shared_ptr进行管理,这样能够保证在hash表中删除了Connection的信息后,在shared_ptr计数器为0的情况下完成对Connection资源的释放操作
接下来是对其成员函数的解释
首先是对主线程的EventLoop对象添加定时器
void RunAfterInLoop(const Functor &task, int delay)
{
_next_id++;
_baseloop.TimerAdd(_next_id, delay, task);
}
为新链接构造Connection进行管理
我们为每一个链接初始化一个Connection类进行管理,然后设置各种回调函数,最后将这个链接加入到unordered_map中
void NewConnection(int fd)
{
// DBG_LOG("NEWCONNECTION FUNCTION");
_next_id++;
PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));
conn->SetMessageCallback(_message_callback); // 为通行套接字设置可读事件的回调函数
conn->SetClosedCallback(_closed_callback); // 关闭事件的回调函数
conn->SetConnectedCallback(_connected_callback); // 错误事件的回调函数
conn->SetAnyEventCallback(_event_callback);
conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
if (_enable_inactive_release)
conn->EnableInactiveRelease(_timeout); // 启动非活跃超时销毁
conn->Established(); // 就绪初始化
_conns.insert(std::make_pair(_next_id, conn));
}
移除这个链接,只需要从_conns这个unordered_map中移除即可
void RemoveConnectionInLoop(const PtrConnection &conn)
{
int id = conn->Id();
auto it = _conns.find(id);
if (it != _conns.end())
{
_conns.erase(it);
}
}
把这个移除Connection的任务放到_baseloop中执行避免了线程安全问题
// 从管理Connection的_conns中移除连接信息
void RemoveConnection(const PtrConnection &conn)
{
_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));
}
下面是这个类的完整代码
class TcpServer
{
private:
uint64_t _next_id; // 这是一个自动增长的连接ID
int _timeout; // 这是非活跃链接的统计时间—多长时间无通信就是非活跃连接
int _port;
bool _enable_inactive_release; // 是否启动了非活跃连接超时销毁的判断标志
Acceptor _acceptor; // 这是监听套接字的管理对象
EventLoop _baseloop; // 这是主线程的EventLoop对象,负责监听事件的处理
LoopThreadPool _pool; // 这是我们的从属EventLoop线程池
std::unordered_map<uint64_t, PtrConnection> _conns; // 保存管理所有连接对应的shared_ptr对象
using ConnectedCallback = std::function<void(const PtrConnection &)>;
using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;
using ClosedCallback = std::function<void(const PtrConnection &)>;
using AnyEventCallback = std::function<void(const PtrConnection &)>;
using Functor = std::function<void()>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
private:
void RunAfterInLoop(const Functor &task, int delay)
{
_next_id++;
_baseloop.TimerAdd(_next_id, delay, task);
}
// 为新连接构造Connection进行管理
void NewConnection(int fd)
{
// DBG_LOG("NEWCONNECTION FUNCTION");
_next_id++;
PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));
conn->SetMessageCallback(_message_callback); // 为通行套接字设置可读事件的回调函数
conn->SetClosedCallback(_closed_callback); // 关闭事件的回调函数
conn->SetConnectedCallback(_connected_callback); // 错误事件的回调函数
conn->SetAnyEventCallback(_event_callback);
conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
if (_enable_inactive_release)
conn->EnableInactiveRelease(_timeout); // 启动非活跃超时销毁
conn->Established(); // 就绪初始化
_conns.insert(std::make_pair(_next_id, conn));
}
void RemoveConnectionInLoop(const PtrConnection &conn)
{
int id = conn->Id();
auto it = _conns.find(id);
if (it != _conns.end())
{
_conns.erase(it);
}
}
// 从管理Connection的_conns中移除连接信息
void RemoveConnection(const PtrConnection &conn)
{
_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));
}
public:
TcpServer(int port)
: _port(port),
_next_id(0),
_enable_inactive_release(false),
_acceptor(&_baseloop, port),
_pool(&_baseloop)
{
_acceptor.SetAcceptCallBack(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
_acceptor.Listen(); // 开始监听,挂到_baseloop上面去
}
void SetThreadCount(int count) // 设置线程池的数量
{
return _pool.SetThreadCount(count);
}
void SetConnectedCallback(const ConnectedCallback &cb)
{
_connected_callback = cb;
}
void SetMessageCallback(const MessageCallback &cb)
{
_message_callback = cb;
}
void SetClosedCallback(const ClosedCallback &cb)
{
_closed_callback = cb;
}
void SetAnyEventCallback(const AnyEventCallback &cb)
{
_event_callback = cb;
}
void EnableInactiveRelease(int timeout)
{
_timeout = timeout;
_enable_inactive_release = true;
}
// 多少秒之后执行一个任务,用于添加一个定时任务
void RunAfter(const Functor &task, int delay)
{
_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));
}
// 需要注意的是,_pool.Create();这个调用函数只能发在start函数之中,
// 不能放在类的初始化函数中,否则会造成越界访问
void Start()
{
_pool.Create(); // 创建线程池中的从属线程
_baseloop.Start(); // 开始去处理事件了
}
};
3.HTTP协议支持模块实现
3.1 Util实用工具类实现
首先我们要写一个字符串分割函数出来,这个函数有三个参数,分别是需要被分割的字符串,被分割的目标字符串,分割后的字符串放入的位置
首先创建一个变量offset表示从0位置开始,然后进入循环,我们使用string里面提供的find函数分别传入需要查找的子串和从哪里开始查找offset,并且接受返回值。当返回值是std::string::npos,表示没有找到。如果pos == offset,说明找到了,但是两个位置之间不存在子串,直接让offset跳过查找的字符串,跳过本次循环。如果pos和offset不相等,就直接把pos与offset之间的子串传入到arry中。经过循环之后,最后返回arry的大小,即子串的数量
static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string> *arry)
{
size_t offset = 0;
while (offset < src.size())
{
size_t pos = src.find(sep, offset); // 在src字符串偏移量offset处,开始向后查找sep字符/子串,返回查找到的位置
if (pos == std::string::npos)
{
// 没有找到特定字符
if (pos == src.size())
break;
arry->push_back(src.substr(offset));
return arry->size();
}
if (pos == offset)
{
offset = pos + sep.size();
continue;
} // 当前子串是空的,没有内容
arry->push_back(src.substr(offset, pos – offset));
offset = pos + sep.size();
}
return arry->size();
}
然后就是读取文件中的所有内容,将读取到的内容放到buffer中
使用std::ifstream ,传入文件的名字并且以二进制的方式读取
将文件指针跳转到末尾之后,使用tellg函数来找到文件的大小
找到文件大小之后再次使用seekg把文件指针放到起始位置
把读取到的数据放入到buf当中,关闭打开的文件流
static bool ReadFile(const std::string &filename, std::string *buf)
{
std::ifstream ifs(filename, std::ios::binary);
if (ifs.is_open() == false)
{
ERR_LOG("OPEN %s FILE FAILED!!", filename.c_str());
return false;
}
size_t fsize = 0;
// 文件指针跳转到末尾
ifs.seekg(0, ifs.end);
fsize = ifs.tellg(); // 获取当前读写位置相对于起始位置的偏移量,从末尾偏移刚好就是文件大小
ifs.seekg(0, ifs.beg); // 跳转到起始位置
buf->resize(fsize); // 开辟文件大小的空间
ifs.read(&(*buf)[0], fsize);
if (ifs.good() == false)
{
// 读取失败
ERR_LOG("READ %s FILE FAILED!!", filename.c_str());
ifs.close();
return false;
}
ifs.close();
return true;
}
向文件中写入数据
首先需要打开文件,以二进制和截断方式读取,然后使用write函数将buf中的内容写到这个文件当中,最后关闭文件流
static bool WriteFile(const std::string &filename, const std::string &buf)
{
std::ofstream ofs(filename, std::ios::binary | std::ios::trunc); // trunc代表截断
if (ofs.is_open() == false)
{
ERR_LOG("READ %s FILE FAILED!!", filename.c_str());
return false;
}
ofs.write(buf.c_str(), buf.size());
if (ofs.good() == false)
{
ERR_LOG("WRITE %s FILE FAILED!", filename.c_str());
ofs.close();
return false;
}
ofs.close();
return true;
}
这里简单介绍一下URL编码,它的作用是避免URL中的资源路径与查询字符串中的特殊字符与HTTP请求中的特殊字符产生歧义。
编码格式:将特殊字符的ASCII值,转换为两个16进制字符
不编码的特殊字符 RFC3986文档规定.-_~字母,数字属于绝对不编码字符,编码格式%HH
W3C文档中规定查询字符串中的空格,需要被编码为 +,解码则是 + 转化为空格
比如:
- 原始字符串:Hello World!
- 编码后:Hello%20World%21
static std::string UrlEncode(const std::string url, bool convert_space_to_plus)
{
std::string res;
for (auto &c : url)
{
if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c))
{
res += c;
continue;
}
if (c == ' ' && convert_space_to_plus == true)
{
res += '+';
continue;
}
// 剩下的字符都是需要编码称为%HH的格式的
char tmp[4] = {0};
snprintf(tmp, 4, "%%%02X", c);
res += tmp;
}
return res;
}
将十六进制字符转换为对应的整数值
static char HEXTOI(char c)
{
if (c >= '0' && c <= '9')
{
// 数字字符
return c – '0';
}
else if (c >= 'a' && c <= 'z')
{
return c – 'a' + 10;
}
else if (c >= 'A' && c <= 'Z')
{
return c – 'A' + 10;
}
return -1;
}
有URL编码当然也有URL解码,当我们遇到了%,我们需要查看百分号后面的第一个数字和第二个数字,将第一个数字左移4位,相当于直接乘以16,然后加上第二个数字即可
static std::string UrlDecode(const std::string url, bool convert_space_to_space)
{
// 遇到%,则将紧随其后的两个字符,转化为数字,第一个数字左移四位,然后加上第二个数字
// eg:%2b = 2 * 2^4 + b(转化为十进制)
std::string res;
for (int i = 0; i < url.size(); i++)
{
if (url[i] == '+' && convert_space_to_space == true)
{
res += ' ';
continue;
}
if (url[i] == '%' && i + 2 < url.size())
{
char v1 = HEXTOI(url[i + 1]);
char v2 = HEXTOI(url[i + 2]);
char v = (v1 << 4) + v2;
res += v;
i += 2;
continue;
}
res += url[i];
}
return res;
}
我们也需要通过相应的状态码获取相应的状态描述信息
static std::string StatuDesc(int statu)
{
auto it = _statu_msg.find(statu);
if (it != _statu_msg.end())
{
return it->second;
}
return "Unknown";
}
下面将会列出状态码对应的信息
std::unordered_map<int, std::string> _statu_msg = {
{100, "Continue"},
{101, "Switching Protocol"},
{102, "Processing"},
{103, "Early Hints"},
{200, "OK"},
{201, "Created"},
{202, "Accepted"},
{203, "Non-Authoritative Information"},
{204, "No Content"},
{205, "Reset Content"},
{206, "Partial Content"},
{207, "Multi-Status"},
{208, "Already Reported"},
{226, "IM Used"},
{300, "Multiple Choice"},
{301, "Moved Permanently"},
{302, "Found"},
{303, "See Other"},
{304, "Not Modified"},
{305, "Use Proxy"},
{306, "unused"},
{307, "Temporary Redirect"},
{308, "Permanent Redirect"},
{400, "Bad Request"},
{401, "Unauthorized"},
{402, "Payment Required"},
{403, "Forbidden"},
{404, "Not Found"},
{405, "Method Not Allowed"},
{406, "Not Acceptable"},
{407, "Proxy Authentication Required"},
{408, "Request Timeout"},
{409, "Conflict"},
{410, "Gone"},
{411, "Length Required"},
{412, "Precondition Failed"},
{413, "Payload Too Large"},
{414, "URI Too Long"},
{415, "Unsupported Media Type"},
{416, "Range Not Satisfiable"},
{417, "Expectation Failed"},
{418, "I'm a teapot"},
{421, "Misdirected Request"},
{422, "Unprocessable Entity"},
{423, "Locked"},
{424, "Failed Dependency"},
{425, "Too Early"},
{426, "Upgrade Required"},
{428, "Precondition Required"},
{429, "Too Many Requests"},
{431, "Request Header Fields Too Large"},
{451, "Unavailable For Legal Reasons"},
{501, "Not Implemented"},
{502, "Bad Gateway"},
{503, "Service Unavailable"},
{504, "Gateway Timeout"},
{505, "HTTP Version Not Supported"},
{506, "Variant Also Negotiates"},
{507, "Insufficient Storage"},
{508, "Loop Detected"},
{510, "Not Extended"},
{511, "Network Authentication Required"}};
然后是根据文件后缀名获取mime,这里解释一下mime是什么,是指通过文件名的扩展名来确定该文件对应的mime类型,mime类型是一种标准化的文件类型标识方法,用于告诉客户端如歌处理或解析收到的文件
std::unordered_map<std::string, std::string> _mime_msg = {
{".aac", "audio/aac"},
{".abw", "application/x-abiword"},
{".arc", "application/x-freearc"},
{".avi", "video/x-msvideo"},
{".azw", "application/vnd.amazon.ebook"},
{".bin", "application/octet-stream"},
{".bmp", "image/bmp"},
{".bz", "application/x-bzip"},
{".bz2", "application/x-bzip2"},
{".csh", "application/x-csh"},
{".css", "text/css"},
{".csv", "text/csv"},
{".doc", "application/msword"},
{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},
{".eot", "application/vnd.ms-fontobject"},
{".epub", "application/epub+zip"},
{".gif", "image/gif"},
{".htm", "text/html"},
{".html", "text/html"},
{".ico", "image/vnd.microsoft.icon"},
{".ics", "text/calendar"},
{".jar", "application/java-archive"},
{".jpeg", "image/jpeg"},
{".jpg", "image/jpeg"},
{".js", "text/javascript"},
{".json", "application/json"},
{".jsonld", "application/ld+json"},
{".mid", "audio/midi"},
{".midi", "audio/x-midi"},
{".mjs", "text/javascript"},
{".mp3", "audio/mpeg"},
{".mpeg", "video/mpeg"},
{".mpkg", "application/vnd.apple.installer+xml"},
{".odp", "application/vnd.oasis.opendocument.presentation"},
{".ods", "application/vnd.oasis.opendocument.spreadsheet"},
{".odt", "application/vnd.oasis.opendocument.text"},
{".oga", "audio/ogg"},
{".ogv", "video/ogg"},
{".ogx", "application/ogg"},
{".otf", "font/otf"},
{".png", "image/png"},
{".pdf", "application/pdf"},
{".ppt", "application/vnd.ms-powerpoint"},
{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},
{".rar", "application/x-rar-compressed"},
{".rtf", "application/rtf"},
{".sh", "application/x-sh"},
{".svg", "image/svg+xml"},
{".swf", "application/x-shockwave-flash"},
{".tar", "application/x-tar"},
{".tif", "image/tiff"},
{".tiff", "image/tiff"},
{".ttf", "font/ttf"},
{".txt", "text/plain"},
{".vsd", "application/vnd.visio"},
{".wav", "audio/wav"},
{".weba", "audio/webm"},
{".webm", "video/webm"},
{".webp", "image/webp"},
{".woff", "font/woff"},
{".woff2", "font/woff2"},
{".xhtml", "application/xhtml+xml"},
{".xls", "application/vnd.ms-excel"},
{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},
{".xml", "application/xml"},
{".xul", "application/vnd.mozilla.xul+xml"},
{".zip", "application/zip"},
{".3gp", "video/3gpp"},
{".3g2", "video/3gpp2"},
{".7z", "application/x-7z-compressed"}};
根据文件后缀名获取文件mime的代码如下
static std::string ExtMime(const std::string &filename)
{
// a.b.txt
size_t pos = filename.find_last_of('.');
if (pos == std::string::npos)
{
return "application/octet-stream";
}
std::string ext = filename.substr(pos);
auto it = _mime_msg.find(ext);
if (it == _mime_msg.end())
{
return "application/octet-stream";
}
return it->second;
}
判断一个文件是否是一个普通文件
这里需要使用到系统调用stat来获取文件的状态信息,并且需要通过S_ISREG宏来判断文件是否为普通文件
static bool IsRegular(const std::string &filename)
{
struct stat st;
int ret = stat(filename.c_str(), &st);
if (ret < 0)
{
return false;
}
return S_ISREG(st.st_mode);
}
检查http请求的资源路径的有效性判断
static bool ValidPath(const std::string &path)
{
std::vector<std::string> subdir;
Split(path, "/", &subdir);
int level = 0;
for (auto &dir : subdir)
{
if (dir == "..")
{
level–;
if (level < 0)
{
return false;
}
continue;
}
level++;
}
return true;
}
下面是Util类的完整代码
class Util
{
public:
// 字符串分割函数,分割后的子串放到vector里面,最终返回子串的数量
static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string> *arry)
{
size_t offset = 0;
while (offset < src.size())
{
size_t pos = src.find(sep, offset); // 在src字符串偏移量offset处,开始向后查找sep字符/子串,返回查找到的位置
if (pos == std::string::npos)
{
// 没有找到特定字符
if (pos == src.size())
break;
arry->push_back(src.substr(offset));
return arry->size();
}
if (pos == offset)
{
offset = pos + sep.size();
continue;
} // 当前子串是空的,没有内容
arry->push_back(src.substr(offset, pos – offset));
offset = pos + sep.size();
}
return arry->size();
}
// 读取文件所有内容,将读取的内容放到buffer中
static bool ReadFile(const std::string &filename, std::string *buf)
{
std::ifstream ifs(filename, std::ios::binary);
if (ifs.is_open() == false)
{
ERR_LOG("OPEN %s FILE FAILED!!", filename.c_str());
return false;
}
size_t fsize = 0;
// 文件指针跳转到末尾
ifs.seekg(0, ifs.end);
fsize = ifs.tellg(); // 获取当前读写位置相对于起始位置的偏移量,从末尾偏移刚好就是文件大小
ifs.seekg(0, ifs.beg); // 跳转到起始位置
buf->resize(fsize); // 开辟文件大小的空间
ifs.read(&(*buf)[0], fsize);
if (ifs.good() == false)
{
// 读取失败
ERR_LOG("READ %s FILE FAILED!!", filename.c_str());
ifs.close();
return false;
}
ifs.close();
return true;
}
// 向文件写入数据
static bool WriteFile(const std::string &filename, const std::string &buf)
{
std::ofstream ofs(filename, std::ios::binary | std::ios::trunc); // trunc代表截断
if (ofs.is_open() == false)
{
ERR_LOG("READ %s FILE FAILED!!", filename.c_str());
return false;
}
ofs.write(buf.c_str(), buf.size());
if (ofs.good() == false)
{
ERR_LOG("WRITE %s FILE FAILED!", filename.c_str());
ofs.close();
return false;
}
ofs.close();
return true;
}
// URL编码,避免URL中资源路径与查询字符串中的特殊字符与HTTP请求中特殊字符产生歧义
// 编码格式:将特殊字符的ASCII值,转换为两个16进制字符,C++ -> C%2B%2B
// 不编码的特殊字符 RFC3986文档规定.-_~字母,数字属于绝对不编码字符,编码格式%HH
// W3C文档中规定查询字符串中的空格,需要被编码为 +,解码则是 + 转化为空格
static std::string UrlEncode(const std::string url, bool convert_space_to_plus)
{
std::string res;
for (auto &c : url)
{
if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c))
{
res += c;
continue;
}
if (c == ' ' && convert_space_to_plus == true)
{
res += '+';
continue;
}
// 剩下的字符都是需要编码称为%HH的格式的
char tmp[4] = {0};
snprintf(tmp, 4, "%%%02X", c);
res += tmp;
}
return res;
}
static char HEXTOI(char c)
{
if (c >= '0' && c <= '9')
{
// 数字字符
return c – '0';
}
else if (c >= 'a' && c <= 'z')
{
return c – 'a' + 10;
}
else if (c >= 'A' && c <= 'Z')
{
return c – 'A' + 10;
}
return -1;
}
// URL解码
static std::string UrlDecode(const std::string url, bool convert_space_to_space)
{
// 遇到%,则将紧随其后的两个字符,转化为数字,第一个数字左移四位,然后加上第二个数字
// eg:%2b = 2 * 2^4 + b(转化为十进制)
std::string res;
for (int i = 0; i < url.size(); i++)
{
if (url[i] == '+' && convert_space_to_space == true)
{
res += ' ';
continue;
}
if (url[i] == '%' && i + 2 < url.size())
{
char v1 = HEXTOI(url[i + 1]);
char v2 = HEXTOI(url[i + 2]);
char v = (v1 << 4) + v2;
res += v;
i += 2;
continue;
}
res += url[i];
}
return res;
}
// 相应状态码的描述信息获取
static std::string StatuDesc(int statu)
{
auto it = _statu_msg.find(statu);
if (it != _statu_msg.end())
{
return it->second;
}
return "Unknown";
}
// 根据文件后缀名获取文件mime
static std::string ExtMime(const std::string &filename)
{
// a.b.txt
size_t pos = filename.find_last_of('.');
if (pos == std::string::npos)
{
return "application/octet-stream";
}
std::string ext = filename.substr(pos);
auto it = _mime_msg.find(ext);
if (it == _mime_msg.end())
{
return "application/octet-stream";
}
return it->second;
}
// 判断一个文件是否是一个目录
static bool IsDirectory(const std::string &filename)
{
struct stat st;
int ret = stat(filename.c_str(), &st);
if (ret < 0)
{
return false;
}
return S_ISDIR(st.st_mode);
}
// 判断一个文件是否是一个普通文件
static bool IsRegular(const std::string &filename)
{
struct stat st;
int ret = stat(filename.c_str(), &st);
if (ret < 0)
{
return false;
}
return S_ISREG(st.st_mode);
}
// http请求的资源路径有效性判断
// /index.html — 前面的/也叫做相对目录 映射的是某个服务器上的子目录
// 想表达的意思就是,客户端只能请求相对根目录中的资源,其他地方就不予理会
// /../login.html这个路径是在相对根目录之外,不合理
static bool ValidPath(const std::string &path)
{
std::vector<std::string> subdir;
Split(path, "/", &subdir);
int level = 0;
for (auto &dir : subdir)
{
if (dir == "..")
{
level–;
if (level < 0)
{
return false;
}
continue;
}
level++;
}
return true;
}
};
3.2 HttpRequest 类的实现
这个模块是HTTP请求数据模块,用于保存HTTP请求数据被解析后的各项请求元素信息
先给出一个HTTP请求的例子
GET /index.html HTTP/1.1
Host: example.com
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
根据上面的请求,我们可以设计出如下的成员变量
std::string _method; // 请求方法
std::string _path; // 资源路径
std::string _version; // 协议版本
std::string _body; // 请求正文
std::smatch _matches; // 资源路径的正则提取数据
std::unordered_map<std::string, std::string> _headers; // 头部字段
std::unordered_map<std::string, std::string> _params; // 查询字符串通常指的是 URL 中的查询参数
然后是对其中的成员函数的解释
关于对这个类的初始化,我们默认将同行协议版本设置为HTTP/1.1
HttpRequest()
: _version("HTTP/1.1")
{}
对类的成员变量进行重置的函数
void ReSet()
{
_method.clear();
_path.clear();
_version = "HTTP/1.1";
_body.clear();
std::smatch match;
_matches.swap(match);
_headers.clear();
_params.clear();
}
_headers是一个哈希表,我们设计一个函数SetHeader来插入头部字段
void SetHeader(std::string &key, std::string &val)
{
_headers.insert(std::make_pair(key, val));
}
HasHeader是用来判断是否存在指定的头部字段,实现它很简单,利用unordered_map中的find即可
bool HasHeader(const std::string &key) const
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return false;
}
return true;
}
Content-Length是头部字段的一种,表示正文长度,通过ContentLength函数来获取正文的长度
size_t ContentLength() const
{
bool ret = HasHeader("Content-Length");
if (ret == false)
{
return 0;
}
std::string clen = GetHeader("Content-Length");
return std::stol(clen);
}
下面是这个类的全部代码
class HttpRequest
{
public:
std::string _method; // 请求方法
std::string _path; // 资源路径
std::string _version; // 协议版本
std::string _body; // 请求正文
std::smatch _matches; // 资源路径的正则提取数据
std::unordered_map<std::string, std::string> _headers; // 头部字段
std::unordered_map<std::string, std::string> _params; // 查询字符串通常指的是 URL 中的查询参数
public:
HttpRequest()
: _version("HTTP/1.1")
{
}
void ReSet()
{
_method.clear();
_path.clear();
_version = "HTTP/1.1";
_body.clear();
std::smatch match;
_matches.swap(match);
_headers.clear();
_params.clear();
}
// 插入头部字段
void SetHeader(std::string &key, std::string &val)
{
_headers.insert(std::make_pair(key, val));
}
// 判断是否存在指定头部字段
bool HasHeader(const std::string &key) const
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return false;
}
return true;
}
// 获取指定头部字段的值
std::string GetHeader(const std::string &key) const
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return "";
}
return it->second;
}
// 插入查询字符串
void SetParam(const std::string &key, const std::string &val)
{
_params.insert(std::make_pair(key, val));
}
// 判断是否有某个指定的查询字符串
bool HasParam(const std::string &key) const
{
auto it = _params.find(key);
if (it == _params.end())
{
return false;
}
return true;
}
// 获取指定的查询字符串
std::string GetParam(const std::string &key) const
{
auto it = _params.find(key);
if (it == _params.end())
{
return "";
}
return it->second;
}
// 获取正文长度
size_t ContentLength() const
{
bool ret = HasHeader("Content-Length");
if (ret == false)
{
return 0;
}
std::string clen = GetHeader("Content-Length");
return std::stol(clen);
}
// 判断是否为短连接
bool Close() const
{
// 没有Connection字段或者有Connection但是值是false,则都是短连接,否则是长连接
// DBG_LOG("%d", HasHeader("Connection") == true);
// DBG_LOG("%d", GetHeader("Connection") == "keep-alive");
// DBG_LOG("[%s]", GetHeader("Connection").c_str());
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive")
{
return false;
}
return true;
}
};
3.3 HttpResponse类的实现
这个模块是HTTP响应数据模块,用于业务处理后设置并保存HTTp响应数据的各项元素信息,最终会被按照HTTP协议响应格式成为响应信息发送给客户端
下面是一个HTTP响应的例子
HTTP/1.1 200 OK
Date: Sun, 18 May 2025 05:37:15 GMT
Server: Apache/2.4.41 (Ubuntu)
Content-Type: text/html; charset=UTF-8
Content-Length: 1256
Connection: keep-alive
<!DOCTYPE html>
<html>
<head>
<title>Example Page</title>
</head>
<body>
<h1>Welcome to the Example Page</h1>
<p>This is a simple HTML page returned by the server.</p>
</body>
</html>
下面的成员变量就是用来保存上面示例报文的信息
int _statu;
bool _redirect_flag;
std::string _body;
std::string _redirect_url;
std::unordered_map<std::string, std::string> _headers;
这里有一个成员变量需要解释一下
_redirect_flag:这是一个表示是否重定向的变量,在Web开发中,HTTP重定向是指服务器返回一个特殊的响应(比如状态码3XX),表示指示客户端访问另一个URL
然后就是对其成员函数的解释
有时候我们需要重定向连接SetRedirect
void SetRedirect(const std::string &url, int statu = 302)
{
_statu = statu;
_redirect_flag = true;
_redirect_url = url;
}
然后就是查看这个链接是长连接还是短连接,我们只需要查看Connection字段,如果这个字段的value值是keep-alive表示是长连接
bool Close()
{
// 没有Connection字段或者有Connection但是值是false,则都是短连接,否则是长连接
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive")
{
return false;
}
return true;
}
下面是这个类的全部代码
class HttpResponse
{
public:
int _statu;
bool _redirect_flag;
std::string _body;
// 重定向
// 在Web开发中,HTTP 重定向是指服务器返回一个特殊的响应(状态码3xx),
// 指示客户端(如浏览器)访问另一个URL。
std::string _redirect_url;
std::unordered_map<std::string, std::string> _headers;
public:
HttpResponse()
: _redirect_flag(false),
_statu(200)
{
}
HttpResponse(int statu)
: _redirect_flag(false),
_statu(statu)
{
}
void ReSet()
{
_statu = 200;
_redirect_flag = false;
_body.clear();
_redirect_url.clear();
_headers.clear();
}
// 插入头部字段
void SetHeader(const std::string &key, const std::string &val)
{
_headers.insert(std::make_pair(key, val));
}
// 判断是否存在指定头部字段
bool HasHeader(const std::string &key)
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return false;
}
return true;
}
// 获取指定头部字段的值
std::string GetHeader(const std::string &key)
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return "";
}
return it->second;
}
void SetContent(const std::string &body, const std::string &type = "text/html")
{
_body = body;
SetHeader("Content-Type", type);
}
void SetRedirect(const std::string &url, int statu = 302)
{
_statu = statu;
_redirect_flag = true;
_redirect_url = url;
}
bool Close()
{
// 没有Connection字段或者有Connection但是值是false,则都是短连接,否则是长连接
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive")
{
return false;
}
return true;
}
};
3.4 HttpContext类的实现
这个模块是一个HTTP请求接受的上下文模块,主要是为了防止再一次接收的数据中,不是一个完整的HTTP请求,则解析过程并未完成,无法进行完整的请求处理,需要在下次接受到新数据后继续根据上下文进行解析,最终得到一个HttpRequest请求信息对象,因此在请求数据的接受以及解析部分需要一个上下文来进行控制接收和处理节奏
首先接收对应多种状态码,这里准备了五种
typedef enum
{
RECV_HTTP_ERROR,
RECV_HTTP_LINE,
RECV_HTTP_HEAD,
RECV_HTTP_BODY,
RECV_HTTP_OVER
} HttpRecvStatu;
然后就是这个类对应的成员变量
int _resp_statu; // 相应状态码
HttpRecvStatu _recv_statu; // 当前接收及解析的阶段状态
HttpRequest _request; // 已经解析得到的请求信息
下面是其对应的成员函数
我们需要对请求行进行解析,使用ParseHttpLine函数,传入的参数为string类型的请求行
bool ParseHttpLine(const std::string &line)
{
// std::cout << "string " << line << std::endl;
std::smatch matches;
std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\\\?(.*))? (HTTP/1\\\\.[01])(?:\\n|\\r\\n)?", std::regex::icase);
bool ret = std::regex_match(line, matches, e);
if (ret == false)
{
std::cout << line << std::endl;
// std::cout << "ParseHttpLine 1 error" << std::endl;
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400;
return false;
}
// 请求方法的获取
_request._method = matches[1];
std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);
// 资源路径的获取,需要进行URL解码操作,但是不需要+转空格
_request._path = Util::UrlDecode(matches[2], false);
// 协议版本的获取
_request._version = matches[4];
// 查询字符串的读取与处理
std::vector<std::string> query_string_arry;
std::string query_string = matches[3];
// 查询字符串的格式 key=value&key=value… ,先以&符号进行分割,得到各个子串
Util::Split(query_string, "&", &query_string_arry);
// 针对各个子串,以 = 符号进行分割,得到key 和 val,得到之后也需要进行URL解码
for (auto &str : query_string_arry)
{
size_t pos = str.find("=");
if (pos == std::string::npos)
{
// 没有找到
// std::cout << "ParseHttpLine 2 error" << std::endl;
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400;
return false;
}
std::string key = Util::UrlDecode(str.substr(0, pos), true);
std::string val = Util::UrlDecode(str.substr(pos + 1), true);
_request.SetParam(key, val);
}
return true;
}
接收请求行的逻辑较为简单,不多赘述
bool RecvHttpLine(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_LINE)
return false;
// 1.获取一行数据
std::string line = buf->GetLineAndPop();
// std::cout << line << " " << "RecvHttpLine" << std::endl;
// 2.需要考虑的一些要素:缓冲区中的数据不足一行,获取的一行数据超大
if (line.size() == 0)
{
// 缓冲区中的数据不足一行,则需要判断缓冲区中的可读数据长度,如果很长了都不足一行,这是有问题的
if (buf->ReadAbleSize() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
// 缓冲区中数据不足一行,但是也不多,就等等新数据的到来
return true;
}
if (line.size() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
bool ret = ParseHttpLine(line);
if (ret == false)
{
return false;
}
// buf->MoveReadOffset(line.size());
_recv_statu = RECV_HTTP_HEAD;
return true;
// return ParseHttpLine(line);
}
接受请求头,一行一行地取出key:value类型的数据,直到遇到空行停止
bool RecvHttpHead(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_HEAD)
return false;
// 一行一行取出数据,直到遇到空行为止
// 并且头部的格式为key:val\\r\\nkey:val\\r\\n
while (1)
{
// 1.获取一行数据
std::string line = buf->GetLineAndPop();
// std::cout << "RecvHttpHead " << line << std::endl;
// 2.需要考虑的一些要素:缓冲区中的数据不足一行,获取的一行数据超大
if (line.size() == 0)
{
// 缓冲区中的数据不足一行,则需要判断缓冲区中的可读数据长度,如果很长了都不足一行,这是有问题的
if (buf->ReadAbleSize() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
// 缓冲区中数据不足一行,但是也不多,就等等新数据的到来
return true;
}
if (line.size() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
if (line == "\\n" || line == "\\r\\n")
{
break;
}
bool ret = ParseHttpHead(line);
if (ret == false)
{
return false;
}
}
_recv_statu = RECV_HTTP_BODY;
return true;
}
解析请求头,把冒号加空格(: )作为分隔符去找其key和value值,并进行存储
bool ParseHttpHead(std::string &line)
{
if (line.back() == '\\n')
line.pop_back(); // 末尾是换行,则去掉
if (line.back() == '\\r')
line.pop_back(); // 有回车,则去掉
size_t pos = line.find(": ");
if (pos == std::string::npos)
{
// std::cout << "ParseHttpHead error" << std::endl;
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400;
return false;
}
std::string key = line.substr(0, pos);
std::string val = line.substr(pos + 2);
_request.SetHeader(key, val);
return true;
}
接收正文部分,主要分为三步
bool ParseHttpHead(std::string &line)
{
if (line.back() == '\\n')
line.pop_back(); // 末尾是换行,则去掉
if (line.back() == '\\r')
line.pop_back(); // 有回车,则去掉
size_t pos = line.find(": ");
if (pos == std::string::npos)
{
// std::cout << "ParseHttpHead error" << std::endl;
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400;
return false;
}
std::string key = line.substr(0, pos);
std::string val = line.substr(pos + 2);
_request.SetHeader(key, val);
return true;
}
bool RecvHttpBody(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_BODY)
return false;
// 1.获取正文长度
size_t content_length = _request.ContentLength();
if (content_length == 0)
{
// 没有正文,则请求接收请求完毕
_recv_statu = RECV_HTTP_OVER;
return true;
}
// 2.当前已经接收了多少正文,其实就是往_request._body中放了多少数据
size_t real_len = content_length – _request._body.size(); // 实际还需要接受的正文长度
// 3.接收正文放到body中,但是也要考虑当前缓冲区中的数据是否是全部的正文
// 3.1缓冲区中的数据,包含了当前请求的所有正文,取出所需的数据
if (buf->ReadAbleSize() >= real_len)
{
_request._body.append(buf->ReadPosition(), real_len);
buf->MoveReadOffset(real_len);
_recv_statu = RECV_HTTP_OVER;
return true;
}
// 3.2缓冲区中的数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来
_request._body.append(buf->ReadPosition(), buf->ReadAbleSize());
buf->MoveReadOffset(buf->ReadAbleSize());
return true;
}
接收并解析Http请求,我们使用RecvHttpRequest函数,然后使用switch关键字,通过状态码来判断做什么事情
void RecvHttpRequest(Buffer *buf)
{
// buf->TestPrint();
// 不同的状态,做不同的事情
// 不需要加入break,完成一步后直接进行下一步
switch (_recv_statu)
{
case RECV_HTTP_LINE:
RecvHttpLine(buf);
case RECV_HTTP_HEAD:
RecvHttpHead(buf);
case RECV_HTTP_BODY:
RecvHttpBody(buf);
}
return;
}
下面是全部代码
typedef enum
{
RECV_HTTP_ERROR,
RECV_HTTP_LINE,
RECV_HTTP_HEAD,
RECV_HTTP_BODY,
RECV_HTTP_OVER
} HttpRecvStatu;
#define MAX_LINE 8192 // 通常设置为8kb
class HttpContext
{
private:
int _resp_statu; // 相应状态码
HttpRecvStatu _recv_statu; // 当前接收及解析的阶段状态
HttpRequest _request; // 已经解析得到的请求信息
private:
// 解析请求行
bool ParseHttpLine(const std::string &line)
{
// std::cout << "string " << line << std::endl;
std::smatch matches;
std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\\\?(.*))? (HTTP/1\\\\.[01])(?:\\n|\\r\\n)?", std::regex::icase);
bool ret = std::regex_match(line, matches, e);
if (ret == false)
{
std::cout << line << std::endl;
// std::cout << "ParseHttpLine 1 error" << std::endl;
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400;
return false;
}
// 请求方法的获取
_request._method = matches[1];
std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);
// 资源路径的获取,需要进行URL解码操作,但是不需要+转空格
_request._path = Util::UrlDecode(matches[2], false);
// 协议版本的获取
_request._version = matches[4];
// 查询字符串的读取与处理
std::vector<std::string> query_string_arry;
std::string query_string = matches[3];
// 查询字符串的格式 key=value&key=value… ,先以&符号进行分割,得到各个子串
Util::Split(query_string, "&", &query_string_arry);
// 针对各个子串,以 = 符号进行分割,得到key 和 val,得到之后也需要进行URL解码
for (auto &str : query_string_arry)
{
size_t pos = str.find("=");
if (pos == std::string::npos)
{
// 没有找到
// std::cout << "ParseHttpLine 2 error" << std::endl;
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400;
return false;
}
std::string key = Util::UrlDecode(str.substr(0, pos), true);
std::string val = Util::UrlDecode(str.substr(pos + 1), true);
_request.SetParam(key, val);
}
return true;
}
// 接收请求行
bool RecvHttpLine(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_LINE)
return false;
// 1.获取一行数据
std::string line = buf->GetLineAndPop();
// std::cout << line << " " << "RecvHttpLine" << std::endl;
// 2.需要考虑的一些要素:缓冲区中的数据不足一行,获取的一行数据超大
if (line.size() == 0)
{
// 缓冲区中的数据不足一行,则需要判断缓冲区中的可读数据长度,如果很长了都不足一行,这是有问题的
if (buf->ReadAbleSize() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
// 缓冲区中数据不足一行,但是也不多,就等等新数据的到来
return true;
}
if (line.size() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
bool ret = ParseHttpLine(line);
if (ret == false)
{
return false;
}
// buf->MoveReadOffset(line.size());
_recv_statu = RECV_HTTP_HEAD;
return true;
// return ParseHttpLine(line);
}
bool RecvHttpHead(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_HEAD)
return false;
// 一行一行取出数据,直到遇到空行为止
// 并且头部的格式为key:val\\r\\nkey:val\\r\\n
while (1)
{
// 1.获取一行数据
std::string line = buf->GetLineAndPop();
// std::cout << "RecvHttpHead " << line << std::endl;
// 2.需要考虑的一些要素:缓冲区中的数据不足一行,获取的一行数据超大
if (line.size() == 0)
{
// 缓冲区中的数据不足一行,则需要判断缓冲区中的可读数据长度,如果很长了都不足一行,这是有问题的
if (buf->ReadAbleSize() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
// 缓冲区中数据不足一行,但是也不多,就等等新数据的到来
return true;
}
if (line.size() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
if (line == "\\n" || line == "\\r\\n")
{
break;
}
bool ret = ParseHttpHead(line);
if (ret == false)
{
return false;
}
}
_recv_statu = RECV_HTTP_BODY;
return true;
}
bool ParseHttpHead(std::string &line)
{
if (line.back() == '\\n')
line.pop_back(); // 末尾是换行,则去掉
if (line.back() == '\\r')
line.pop_back(); // 有回车,则去掉
size_t pos = line.find(": ");
if (pos == std::string::npos)
{
// std::cout << "ParseHttpHead error" << std::endl;
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400;
return false;
}
std::string key = line.substr(0, pos);
std::string val = line.substr(pos + 2);
_request.SetHeader(key, val);
return true;
}
bool RecvHttpBody(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_BODY)
return false;
// 1.获取正文长度
size_t content_length = _request.ContentLength();
if (content_length == 0)
{
// 没有正文,则请求接收请求完毕
_recv_statu = RECV_HTTP_OVER;
return true;
}
// 2.当前已经接收了多少正文,其实就是往_request._body中放了多少数据
size_t real_len = content_length – _request._body.size(); // 实际还需要接受的正文长度
// 3.接收正文放到body中,但是也要考虑当前缓冲区中的数据是否是全部的正文
// 3.1缓冲区中的数据,包含了当前请求的所有正文,取出所需的数据
if (buf->ReadAbleSize() >= real_len)
{
_request._body.append(buf->ReadPosition(), real_len);
buf->MoveReadOffset(real_len);
_recv_statu = RECV_HTTP_OVER;
return true;
}
// 3.2缓冲区中的数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来
_request._body.append(buf->ReadPosition(), buf->ReadAbleSize());
buf->MoveReadOffset(buf->ReadAbleSize());
return true;
}
public:
HttpContext()
: _resp_statu(200),
_recv_statu(RECV_HTTP_LINE)
{
}
void ReSet()
{
_resp_statu = 200;
_recv_statu = RECV_HTTP_LINE;
_request.ReSet();
}
int RespStatu()
{
return _resp_statu;
}
HttpRecvStatu RecvStatu()
{
return _recv_statu;
}
HttpRequest &Request()
{
return _request;
}
// 接收并解析Http请求
void RecvHttpRequest(Buffer *buf)
{
// buf->TestPrint();
// 不同的状态,做不同的事情
// 不需要加入break,完成一步后直接进行下一步
switch (_recv_statu)
{
case RECV_HTTP_LINE:
RecvHttpLine(buf);
case RECV_HTTP_HEAD:
RecvHttpHead(buf);
case RECV_HTTP_BODY:
RecvHttpBody(buf);
}
return;
}
};
3.5 HttpServer类的实现
- 这个模块时最终给组件使用者提供的HTTP服务器模块,用简单的接口实现HTTP服务器的搭建
- HttpServer模块内部包含有一个TcpServer对象:TcpServer对象实现服务器的搭建
- HttpServer模块内部包含有两个提供给TcpServer对象的接口:连接建立成功设置上下文接口,数据处理接口
- HttpServer模块内部包含有一个hash-map表存储请求于处理函数的映射表:组件使用者向HttpServer设置那些请求应该使用那些函数进行处理,等TcpServer收到对应的请求就会使用对应的函数进行实现
首先是其成员变量
using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;
using Handlers = std::vector<std::pair<std::regex, Handler>>;
Handlers _get_route;
Handlers _post_route;
Handlers _put_route;
Handlers _delete_route;
std::string _basedir; // 静态资源根目录
TcpServer _server;
然后就是其成员函数
关于对错误的处理ErrorHandler,我们使用简单的Html写一个页面放到string中,然后放入rsp即可
void ErrorHandler(const HttpRequest &req, HttpResponse *rsp)
{
// 1.组织一个错误展示页面
std::string body;
body += "<html>";
body += "<head>";
body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";
body += "</head>";
body += "<body>";
body += "<h1>";
body += std::to_string(rsp->_statu);
body += " ";
body += Util::StatuDesc(rsp->_statu);
body += "</h1>";
body += "</body>";
body += "</html>";
// 2.将页面数据,当做响应正文,放入rsp中
rsp->SetContent(body, "text/html");
}
将HttpResponse中的要素按照http协议格式进行组织发送
void WriteResponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp)
{
// 1.先完善头部字段
// req是const,调用的Close也必须是const类型
if (req.Close() == true)
{
rsp.SetHeader("Connection", "close");
}
else
{
rsp.SetHeader("Connection", "keep-alive");
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false)
{
rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false)
{
rsp.SetHeader("Content-Type", "application/octet-stream"); // 二进制流
}
if (rsp._redirect_flag == true)
{
rsp.SetHeader("Location", rsp._redirect_url);
}
// 2.将rsp的要素,按照http协议进行组织
std::stringstream rsp_str;
rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "\\r\\n";
for (auto &head : rsp._headers)
{
rsp_str << head.first << ":" << head.second << "\\r\\n";
}
rsp_str << "\\r\\n";
rsp_str << rsp._body;
// 3.发送数据
conn->Send(rsp_str.str().c_str(), rsp_str.str().size());
}
判断是否为静态资源,分为四个步骤
bool IsFileHandler(const HttpRequest &req)
{
// 1.必须设置了静态资源根目录
if (_basedir.empty())
{
return false;
}
// 2.请求方法必须是GET或者HEAD
if (req._method != "GET" && req._method != "HEAD")
{
return false;
}
// 3.请求的资源路径必须是一个合法路径
if (Util::ValidPath(req._path) == false)
{
return false;
}
// 4.请求的资源必须存在
// 直接请求根目录是比较特殊的,此时直接追加到首页
// 不要忘记前缀的相对根目录
std::string req_path = _basedir + req._path; // 避免直接修改请求的资源路径
if (req_path.back() == '/')
{
req_path += "index.html";
}
if (Util::IsRegular(req_path) == false)
{
return false;
}
// req._path = req_path; //如果请求就是静态资源请求,则有可能需要追加index.html
return true;
}
静态资源请求处理函数,将静态资源文件的数据读取出来,放到rsp的_body,并设置mime
void FileHandler(const HttpRequest &req, HttpResponse *rsp)
{
std::string req_path = _basedir + req._path; // 避免直接修改请求的资源路径
if (req_path.back() == '/')
{
req_path += "index.html";
}
bool ret = Util::ReadFile(req_path, &rsp->_body);
if (ret == false)
{
return;
}
std::string mime = Util::ExtMime(req_path);
rsp->SetHeader("Content-Type", mime);
return;
}
请求当然也有功能性请求
void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers)
{
// 在对应请求方法的路由表中,查找是哦否含有对应资源请求的处理函数,有则调用,没有则返回404
// 思想:路由表存储的是键值对 — 正则表达式 & 处理函数
// 使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就是有对应函数进行处理
for (auto &handler : handlers)
{
const std::regex &re = handler.first;
const Handler &functor = handler.second;
// std::cout << "进入了这里" << std::endl;
// std::cout << "req._path" << req._path << "req._matches" << req._matches[1] << std::endl;
// std::cout << "req._path" << req._path <<std::endl;
bool ret = std::regex_match(req._path, req._matches, re);
if (ret == false)
{
continue;
}
return functor(req, rsp); // 传入请求信息,和空的rsp执行处理函数
// std::cout << "到这里没有出错" << std::endl;
}
// std::cout << "改成404" << std::endl;
rsp->_statu = 404;
}
路由查找匹配函数,主要是用来进行查找时静态资源还是动态资源,然后执行对应的处理,如果都不是则返回404
// 路由查找匹配
void Route(HttpRequest &req, HttpResponse *rsp)
{
// 1.对请求资源进行分辨,是一个静态资源请求,还是一个功能性请求
// 静态资源请求,则进行静态资源处理
// 功能性请求,则需要通过几个请求路由表来确定是否有处理函数
// 既不是静态资源请求,也不是功能性处理请求,则返回404
if (IsFileHandler(req) == true)
{
// 是一个静态资源请求,则进行静态资源请求的处理
// std::cout << "进入了这里" << std::endl;
return FileHandler(req, rsp);
}
// HEAD与GET类似,只不过不要响应正文
// std::cout << "req._method" << req._method << std::endl;
if (req._method == "GET" || req._method == "HEAD")
{
// std::cout << "使用了GET调用函数添加处理" << std::endl;
return Dispatcher(req, rsp, _get_route);
}
else if (req._method == "POST")
{
return Dispatcher(req, rsp, _post_route);
}
else if (req._method == "PUT")
{
return Dispatcher(req, rsp, _put_route);
}
else if (req._method == "DELETE")
{
return Dispatcher(req, rsp, _delete_route);
}
rsp->_statu = 405; // METHOD NOT ALLOWED
}
设置上下文的函数OnConnected
void OnConnected(const PtrConnection &conn)
{
conn->SetContext(HttpContext());
DBG_LOG("NEW CONNECTION %p", conn.get());
}
// 缓冲区数据解析 + 处理
void OnMessage(const PtrConnection &conn, Buffer *buffer)
{
while (buffer->ReadAbleSize() > 0) // 有数据就继续处理
{
// 1.获取上下文
HttpContext *context = conn->GetContext()->get<HttpContext>();
// 2.通过上下文对缓冲区数据进行解析,得到HttpRequest对象
// 1.如果缓冲区的数据解析出错,就直接回复出错响应
// 2.如果解析正常,且请求已经获取完毕,才开始去进行处理
context->RecvHttpRequest(buffer);
HttpRequest &req = context->Request();
HttpResponse rsp(context->RespStatu());
if (context->RespStatu() >= 400)
{
// 出错了,关闭连接
ErrorHandler(req, &rsp); // 填充一个错误显示页面数据到rsp中
WriteResponse(conn, req, rsp); // 组织响应发送给客户端
context->ReSet(); //!!!重要代码行!!!如果不清空状态的话,他的状态码会一直在400多,导致不断给客户端发送出错信息
buffer->MoveReadOffset(buffer->ReadAbleSize()); //出错了把缓冲区清空不再处理
conn->Shutdown(); // 关闭连接
return;
}
if (context->RecvStatu() != RECV_HTTP_OVER)
{
// 当前请求还没有接受完整,则退出,等到新数据到来再重新处理
return;
}
// 3.请求路由 + 业务处理
Route(req, &rsp);
// 4.对HttpResponse进行组织发送
//DBG_LOG("%s", rsp._body.c_str());
WriteResponse(conn, req, rsp);
// 5.重置上下文
context->ReSet();
// 5.根据长短连接判断是否连接或者继续处理
if (rsp.Close() == true) // 短连接
{
conn->Shutdown();
}
}
return;
}
下面是这个类的完整代码
class HttpServer
{
public:
using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;
using Handlers = std::vector<std::pair<std::regex, Handler>>;
Handlers _get_route;
Handlers _post_route;
Handlers _put_route;
Handlers _delete_route;
std::string _basedir; // 静态资源根目录
TcpServer _server;
private:
void ErrorHandler(const HttpRequest &req, HttpResponse *rsp)
{
// 1.组织一个错误展示页面
std::string body;
body += "<html>";
body += "<head>";
body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";
body += "</head>";
body += "<body>";
body += "<h1>";
body += std::to_string(rsp->_statu);
body += " ";
body += Util::StatuDesc(rsp->_statu);
body += "</h1>";
body += "</body>";
body += "</html>";
// 2.将页面数据,当做响应正文,放入rsp中
rsp->SetContent(body, "text/html");
}
// 将HttpResponse中的要素按照http协议格式进行组织,发送
void WriteResponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp)
{
// 1.先完善头部字段
// req是const,调用的Close也必须是const类型
if (req.Close() == true)
{
rsp.SetHeader("Connection", "close");
}
else
{
rsp.SetHeader("Connection", "keep-alive");
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false)
{
rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false)
{
rsp.SetHeader("Content-Type", "application/octet-stream"); // 二进制流
}
if (rsp._redirect_flag == true)
{
rsp.SetHeader("Location", rsp._redirect_url);
}
// 2.将rsp的要素,按照http协议进行组织
std::stringstream rsp_str;
rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "\\r\\n";
for (auto &head : rsp._headers)
{
rsp_str << head.first << ":" << head.second << "\\r\\n";
}
rsp_str << "\\r\\n";
rsp_str << rsp._body;
// 3.发送数据
conn->Send(rsp_str.str().c_str(), rsp_str.str().size());
}
// 判断是否为静态资源
bool IsFileHandler(const HttpRequest &req)
{
// 1.必须设置了静态资源根目录
if (_basedir.empty())
{
return false;
}
// 2.请求方法必须是GET或者HEAD
if (req._method != "GET" && req._method != "HEAD")
{
return false;
}
// 3.请求的资源路径必须是一个合法路径
if (Util::ValidPath(req._path) == false)
{
return false;
}
// 4.请求的资源必须存在
// 直接请求根目录是比较特殊的,此时直接追加到首页
// 不要忘记前缀的相对根目录
std::string req_path = _basedir + req._path; // 避免直接修改请求的资源路径
if (req_path.back() == '/')
{
req_path += "index.html";
}
if (Util::IsRegular(req_path) == false)
{
return false;
}
// req._path = req_path; //如果请求就是静态资源请求,则有可能需要追加index.html
return true;
}
// 静态资源请求处理 — 将静态资源文件的数据读取出来,放到rsp的_body,并设置mime
void FileHandler(const HttpRequest &req, HttpResponse *rsp)
{
std::string req_path = _basedir + req._path; // 避免直接修改请求的资源路径
if (req_path.back() == '/')
{
req_path += "index.html";
}
bool ret = Util::ReadFile(req_path, &rsp->_body);
if (ret == false)
{
return;
}
std::string mime = Util::ExtMime(req_path);
rsp->SetHeader("Content-Type", mime);
return;
}
// 功能性请求的分类处理
void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers)
{
// 在对应请求方法的路由表中,查找是哦否含有对应资源请求的处理函数,有则调用,没有则返回404
// 思想:路由表存储的是键值对 — 正则表达式 & 处理函数
// 使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就是有对应函数进行处理
for (auto &handler : handlers)
{
const std::regex &re = handler.first;
const Handler &functor = handler.second;
// std::cout << "进入了这里" << std::endl;
// std::cout << "req._path" << req._path << "req._matches" << req._matches[1] << std::endl;
// std::cout << "req._path" << req._path <<std::endl;
bool ret = std::regex_match(req._path, req._matches, re);
if (ret == false)
{
continue;
}
return functor(req, rsp); // 传入请求信息,和空的rsp执行处理函数
// std::cout << "到这里没有出错" << std::endl;
}
// std::cout << "改成404" << std::endl;
rsp->_statu = 404;
}
// 路由查找匹配
void Route(HttpRequest &req, HttpResponse *rsp)
{
// 1.对请求资源进行分辨,是一个静态资源请求,还是一个功能性请求
// 静态资源请求,则进行静态资源处理
// 功能性请求,则需要通过几个请求路由表来确定是否有处理函数
// 既不是静态资源请求,也不是功能性处理请求,则返回404
if (IsFileHandler(req) == true)
{
// 是一个静态资源请求,则进行静态资源请求的处理
// std::cout << "进入了这里" << std::endl;
return FileHandler(req, rsp);
}
// HEAD与GET类似,只不过不要响应正文
// std::cout << "req._method" << req._method << std::endl;
if (req._method == "GET" || req._method == "HEAD")
{
// std::cout << "使用了GET调用函数添加处理" << std::endl;
return Dispatcher(req, rsp, _get_route);
}
else if (req._method == "POST")
{
return Dispatcher(req, rsp, _post_route);
}
else if (req._method == "PUT")
{
return Dispatcher(req, rsp, _put_route);
}
else if (req._method == "DELETE")
{
return Dispatcher(req, rsp, _delete_route);
}
rsp->_statu = 405; // METHOD NOT ALLOWED
}
// 设置上下文
void OnConnected(const PtrConnection &conn)
{
conn->SetContext(HttpContext());
DBG_LOG("NEW CONNECTION %p", conn.get());
}
// 缓冲区数据解析 + 处理
void OnMessage(const PtrConnection &conn, Buffer *buffer)
{
while (buffer->ReadAbleSize() > 0) // 有数据就继续处理
{
// 1.获取上下文
HttpContext *context = conn->GetContext()->get<HttpContext>();
// 2.通过上下文对缓冲区数据进行解析,得到HttpRequest对象
// 1.如果缓冲区的数据解析出错,就直接回复出错响应
// 2.如果解析正常,且请求已经获取完毕,才开始去进行处理
context->RecvHttpRequest(buffer);
HttpRequest &req = context->Request();
HttpResponse rsp(context->RespStatu());
if (context->RespStatu() >= 400)
{
// 出错了,关闭连接
ErrorHandler(req, &rsp); // 填充一个错误显示页面数据到rsp中
WriteResponse(conn, req, rsp); // 组织响应发送给客户端
context->ReSet(); //!!!重要代码行!!!如果不清空状态的话,他的状态码会一直在400多,导致不断给客户端发送出错信息
buffer->MoveReadOffset(buffer->ReadAbleSize()); //出错了把缓冲区清空不再处理
conn->Shutdown(); // 关闭连接
return;
}
if (context->RecvStatu() != RECV_HTTP_OVER)
{
// 当前请求还没有接受完整,则退出,等到新数据到来再重新处理
return;
}
// 3.请求路由 + 业务处理
Route(req, &rsp);
// 4.对HttpResponse进行组织发送
//DBG_LOG("%s", rsp._body.c_str());
WriteResponse(conn, req, rsp);
// 5.重置上下文
context->ReSet();
// 5.根据长短连接判断是否连接或者继续处理
if (rsp.Close() == true) // 短连接
{
conn->Shutdown();
}
}
return;
}
public:
HttpServer(int port, int timeout = DEFAULT_TIMEOUT)
: _server(port)
{
_server.EnableInactiveRelease(timeout);
_server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));
_server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
}
void SetBaseDir(const std::string &path)
{
int ret = Util::IsDirectory(path);
assert(ret == true);
_basedir = path;
}
/*设置/添加请求(请求的正则表达式)与处理函数的映射关系*/
void Get(const std::string &patten, const Handler &handler)
{
_get_route.push_back(std::make_pair(std::regex(patten), handler));
}
void Post(const std::string &patten, const Handler &handler)
{
_post_route.push_back(std::make_pair(std::regex(patten), handler));
}
void Put(const std::string &patten, const Handler &handler)
{
_put_route.push_back(std::make_pair(std::regex(patten), handler));
}
void Delete(const std::string &patten, const Handler &handler)
{
_delete_route.push_back(std::make_pair(std::regex(patten), handler));
}
void SetThreadCount(int count)
{
_server.SetThreadCount(count);
}
void Listen()
{
_server.Start();
}
};
4.结语
代码连接如下lesson41 · 张家兴/Linux – 码云 – 开源中国
有需要自取,博客不精准地方请多谅解
评论前必须登录!
注册