✨✨欢迎来到T_X_Parallel的博客!! 🛰️博客主页:T_X_Parallel 🛰️项目代码仓库:Reactor模型高并发服务器项目代码仓库(随博客更新) 🛰️专栏 : Reactor模型高并发服务器项目 🛰️欢迎关注:👍点赞🙌收藏✍️留言
文章目录
-
- 1. 前言
- 2. 模块分析
-
- LoopThread模块
- LoopThreadPool模块
- Acceptor模块
- TcpServer模块
- 3. 模块实现
-
- LoopThread模块
- LoopThreadPool模块
- Acceptor模块
- TcpServer模块
- 4. 模块测试
- 5. 模块代码
-
- LOG更新后代码
- LoopThread模块代码
- Acceptor模块代码
- TcpServer模块代码
项目环境:vscode、wsl2(Ubuntu22.04)
技术栈:C/C++、C++11、C++17、STL、HTTP、TCP、HTML
1. 前言
在博主仔细考虑后决定把tcpserver模块加到该篇博客后,因为除了Httpserver模块以外的剩下的三个模块实现还是比较简单,毕竟大多是对之前模块的调用封装之类的操作,所以一篇文章将这个服务器组件的基本组件搭出来,只是没有应用层协议支持而已,但是该项目的核心功能可以完成
在实现完这三个模块后,抛开协议支持不说,该项目基本完成,应用层协议的支持基本就是方便使用者
废话不多说,下面直接开始
2. 模块分析
LoopThread模块
该模块就是将一个EventLoop放进一个线程中,而这些线程中EventLoop对象可以说是之前讲到的从属Reactor,负责连接的各种事件事务的操作。而这个EventLoop的实例化应该在线程中执行,因为要让对象对应线程而不出错,所以不能在主Reactor或者说主线程中实例化EventLoop对象再给线程,这是不可取的,会出现一些问题。
该模块主要要实现的功能有创建一个线程,然后就获取线程中的EventLoop对象,这里获取的应该是指针,但是这里要注意,创建线程的时候给的函数里肯定就是EventLoop的实例化,那么线程创建的入口函数和获取EventLoop对象指针函数会存在线程安全问题,那么这里就需要加锁,同时,创建线程的入口函数的执行必须比获取早,不然获取为空,就会出错,这里需要使用条件变量来确保这个顺序,这个条件变量常常也和锁一起使用
所以该模块就四个成员,锁、条件变量、EventLoop对象指针以及线程
LoopThreadPool模块
该模块就是一个存储多个LoopThread的模块,当新连接来了之后直接给新连接分配一个LoopThread对象即可
该模块主要就三个功能,第一个设置线程数量,第二个创建线程池,第三个获取一个LoopThread对象
该模块也就几个成员,线程池中的线程数量、下个分配出去的线程对应的下标、基础EventLoop对象指针(该EventLoop对象即主Reactor,监控着监听套接字的读事件,即监听套接字对应的Loop)、存储所创建的LoopThread对象指针的数组以及存储线程对应的EventLoop对象指针的数组,一个下标对应一个线程和EventLoop,一个线程又对应EventLoop
Acceptor模块
该模块就是一个使用监听套接字接收新连接的接收者,而接收到新连接后的操作该模块并不关心,而是有一个回调函数,获得的新连接描述符fd传给这个回调函数处理即可,该模块主要就是利用eventloop对监听描述符进行事件循环监控,启动可读事件监控,而可读事件处理函数即获取新连接的一系列操作函数
该模块主要功能就四个,第一个设置给监听套接字对应的Channel对象中的可读事件处理回调函数的函数、第二个创建服务端监听套接字、第三个设置处理新连接描述符回调函数的功能、第四个就是启动监听即开启监控监听描述符可读事件
该模块主要成员也只有四个,第一个监听套接字,第二个对应的EventLoop对象指针,第三个对应的Channel对象,第四个处理新连接描述符的回调函数
TcpServer模块
该模块就是整合前面所有模块的头部模块,该模块就是实现一些设置给其他模块中回调函数的函数,比如Connection中的多个业务处理函数以及Acceptor中的新连接处理函数,还有就是一些对其他模块中接口的封装,比如添加定时任务、设置线程池的线程数量等
该模块主要功能有设置相应的业务处理回调函数、设置线程数量、添加定时任务、移除指定Connection智能指针、设置给Acceptor的新连接处理函数、服务器启动、启动超时连接释放功能
该模块主要成员有作为连接ID和定时任务ID的自增Id变量、监听端口、超时时间、超时连接释放标识符、监听套接字对应的Loop对象、一个Acceptor对象、一个Loop线程池、一个存储连接ID和Connection对象智能指针的对应关系的哈希表、和Connection中一样的四种事务处理回调函数
3. 模块实现
由于这几个模块比较简单,也比较好理解,所以下面内容比较简短,直接去看代码也是能看懂的
注:这里的日志功能应该改用异步日志打印,博主使用的成熟的spdlog库,但是不对该库进行讲解,可以直接cv使用,该内容不属于本项目内容,如感兴趣请自行搜索使用文档,直接下载博主代码仓库中的spdlog文件夹放入你的项目下,编译的时候添加指定头文件搜索路径即可
LoopThread模块
模块接口
公有接口
- 获取该线程对应的EventLoop对象指针
ns_eventloop::EventLoop *GetLoop();
私有接口
- 线程的入口函数,即对线程对应的EventLoop对象实例化
void ThreadEntry();
模块实现细节
上面已经分析过,这两个函数的执行是有先后顺序的,先实例化在获取,要实现该顺序只需使用锁和条件变量即可(使用方法看代码中的使用即可)
在实例化一个Loop时不需要使用new,而是直接创建一个对象,然后在该函数中直接启动监控,这样该函数不会执行完,也就不会释放该对象
LoopThreadPool模块
模块接口
公有接口
- 设置线程数量
- 初始化线程池
- 获取Loop
// 设置线程数量
void SetThreadCount(const int count);
// 初始化线程池
void CreateThreadPool();
// 获取Loop
ns_eventloop::EventLoop *GetNewLoop();
模块实现细节
获取新的线程的函数实现只需使用下标即可,而下标是一个自增并轮询的变量,这样比较简单
设置一个线程数组还有一个对应的loop指针数组的原因就是获取时不需要通过线程获取,线程中的获取接口使用锁和条件变量,直接通过下标找到对应的Loop即可
Acceptor模块
模块接口
公有接口
- 设置新连接处理回调函数的接口
- 开启监听套接字的监控
void SetAcceptCallBack(const AcceptCallBack &cb);
void Listen();
私有接口
- 监听套接字可读事件处理函数,即获取新连接描述符fd,并执行新连接处理回调函数
- 创建监听套接字即创建服务端
void HandleRead();
int CreateServer(const uint16_t port);
模块实现细节
该模块没什么细节,主要就是要先设置可读事件处理函数在启动监控,其他就是调用之前模块的接口,比较简单
TcpServer模块
模块接口
公有接口
- 设置线程数量
- 启动超时连接释放
- 设置连接、通信、关闭、任意事务处理函数
- 将添加定时任务操作放入任务队列中
- 启动服务器
// 设置线程数量
void SetThreadCount(int count);
// 启动超时连接释放
void EnableInactiveRelease(int sec);
// 设置连接事务处理函数
void SetConnectedCallBack(const ConnectedCallBack &cb);
// 设置通信事务处理函数
void SetMessageCallBack(const MessageCallBack &cb);
// 设置关闭事务处理函数
void SetClosedCallBack(const ClosedCallBack &cb);
// 设置任意事件事务处理函数
void SetAnyEventCallBack(const AnyEventCallBack &cb);
// 将添加定时任务操作放入任务队列中
void AddTimerTask(const Function &cb, int delay);
// 启动服务器
void Start();
私有接口
- 将移除Connection智能指针操作放入任务队列
- 移除Connection智能指针(设置给Connection中的服务端关闭事务处理函数)
- 添加定时任务
- 新连接处理函数(设置给Acceptor)
// 将移除Connection智能指针操作放入任务队列
void RemoveConnection(const SharePtrConnection &conn);
// 移除Connection智能指针(设置给Connection中的服务端关闭事务处理函数)
void _RemoveConnection(const SharePtrConnection &conn);
// 添加定时任务
void _AddTimerTask(const Function &cb, int delay);
// 新连接处理函数(设置给Acceptor)
void NewConnection(int fd);
模块实现细节
在设置监听套接字的可读事件处理回调函数、启动监听套接字可读事件监控、初始化线程池以及启动监听套接字监控这四个步骤顺序不能乱,建议设置监听套接字的可读事件处理回调函数、启动监听套接字可读事件监控放在TcpServer中的构造函数中,初始化线程池以及启动监听套接字监控放在服务器启动接口中
然后是新连接处理中,需要将多个回调函数以及属性设置完再调用conn->Established(),该接口中有启动可读事件监控,得先设置完各种回调函数之后再启动
其他就是一些简单的设置以及其他模块接口的调用,并没多少细节,直接看代码部分即可,非常好理解,如果有任何问题,在评论区评论即可
4. 模块测试
测试代码
// 服务端
#include "../code/tcpserver.hpp"
void HandleConnected(const ns_connection::SharePtrConnection &conn)
{
LOG(NORMAL, "New Connection: " + std::to_string((uintptr_t)(&conn)));
}
void HandleClose(const ns_connection::SharePtrConnection &conn)
{
LOG(NORMAL, "Close Connection: " + std::to_string((uintptr_t)(&conn)));
}
void HandleMessage(const ns_connection::SharePtrConnection &conn, ns_buffer::Buffer *buff)
{
std::string s = buff->Read();
LOG(NORMAL, s);
s = "xiaomi yu7";
conn->Send(s.c_str(), s.size());
}
int main()
{
ns_tcpserver::TcpServer tcpserver(8085);
tcpserver.SetThreadCount(2);
tcpserver.SetConnectedCallBack(HandleConnected);
tcpserver.SetMessageCallBack(HandleMessage);
tcpserver.SetClosedCallBack(HandleClose);
tcpserver.EnableInactiveRelease(10);
tcpserver.Start();
return 0;
}
// 客户端
int main()
{
ns_socket::Socket cli_socket;
cli_socket.CreateClient(8085, "127.0.0.1");
for (int i = 0; i < 10; i++)
{
std::string str = "xiaomi su7ultra";
cli_socket.Send(str.c_str(), str.size());
char buf[1024] = {0};
cli_socket.Recv(&buf, 1023);
LOG(NORMAL, buf);
sleep(1);
}
while (1)
sleep(1);
return 0;
}
可自行进去其他测试,比如运行多次客户端或者使用多进程
博主这里只运行了两个客户端进行测试
由于加入外部库spdlog,编译需要指出头文件搜索路径
g++ test_tcpserver_server.cc -o Server -std=c++17 -I../
最后的路径为你spdlog文件夹所放的相对位置
测试结果
// 服务端
./Server
[Level# info] [2025–04–05 22:23:10] [ThreadID# 238055] [File# ../code/tcpserver.hpp] [Line# 136] Message# SIGPIPE IGNORE
[Level# info] [2025–04–05 22:23:16] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 5] Message# New Connection: 139640637052880
[Level# info] [2025–04–05 22:23:16] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:17] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 5] Message# New Connection: 139640645445584
[Level# info] [2025–04–05 22:23:17] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:17] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:18] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:18] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:19] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:19] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:20] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:20] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:21] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:21] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:22] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:22] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:23] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:23] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:24] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:24] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:25] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:25] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:26] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [2025–04–05 22:23:35] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 10] Message# Close Connection: 139640637052880
[Level# info] [2025–04–05 22:23:36] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 10] Message# Close Connection: 139640645445584
// 客户端(运行两个进程)
./Client
[Level# info] [2025–04–05 22:23:16] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [2025–04–05 22:23:17] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [2025–04–05 22:23:18] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [2025–04–05 22:23:19] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [2025–04–05 22:23:20] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [2025–04–05 22:23:21] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [2025–04–05 22:23:22] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [2025–04–05 22:23:23] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [2025–04–05 22:23:24] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [2025–04–05 22:23:25] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
5. 模块代码
LOG更新后代码
#pragma once
#include <string>
#include <iostream>
#include <ctime>
#include <sstream>
#include <spdlog/spdlog.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/async.h>
#define LOG_TO_CONSOLE
#define LOG_LIST_COUNT 10000
#define LOG_THREAD_COUNT 20
#define NORMAL 1
#define DEBUG 2
#define WARNING 3
#define ERROR 4
#define FATAL 5
#define LOG_LEVEL NORMAL
#define LOG(LEVEL, MESSAGE) log(#LEVEL, MESSAGE, __FILE__, __LINE__)
void log(std::string level, std::string message, std::string file, int line)
{
// 获取当前时间
std::time_t t = std::time(nullptr);
char timeStr[100];
std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S", std::localtime(&t));
// 获取线程 ID
std::ostringstream threadIdStream;
threadIdStream << std::this_thread::get_id();
std::string threadId = threadIdStream.str();
// 初始化异步日志线程池,队列大小为 8192,使用 4 个后台线程
static auto thread_pool = std::make_shared<spdlog::details::thread_pool>(LOG_LIST_COUNT, LOG_THREAD_COUNT);
// 初始化 spdlog
#ifdef LOG_TO_CONSOLE
// 创建异步控制台日志记录器
static auto async_logger = std::make_shared<spdlog::async_logger>(
"async_console_logger",
spdlog::sinks_init_list{std::make_shared<spdlog::sinks::stdout_color_sink_mt>()},
thread_pool,
spdlog::async_overflow_policy::block);
#else
// 创建异步文件日志记录器
static auto async_logger = std::make_shared<spdlog::async_logger>(
"async_file_logger",
spdlog::sinks_init_list{std::make_shared<spdlog::sinks::basic_file_sink_mt>("logs/async_logfile.log", true)},
thread_pool,
spdlog::async_overflow_policy::block);
#endif
// 设置日志格式,自动获取时间和线程 ID
async_logger->set_pattern("[Level# %l] [%Y-%m-%d %H:%M:%S] [ThreadID# %t] %v");
// 映射日志级别
spdlog::level::level_enum spdlogLevel;
switch (LOG_LEVEL)
{
case NORMAL:
if (level == "NORMAL")
{
spdlogLevel = spdlog::level::info;
break;
}
case DEBUG:
if (level == "DEBUG")
{
spdlogLevel = spdlog::level::debug;
break;
}
case WARNING:
if (level == "WARNING")
{
spdlogLevel = spdlog::level::warn;
break;
}
case ERROR:
if (level == "ERROR")
{
spdlogLevel = spdlog::level::err;
break;
}
case FATAL:
if (level == "FATAL")
spdlogLevel = spdlog::level::critical;
else
return;
}
async_logger->set_level(spdlogLevel);
async_logger->log(
spdlogLevel, // 日志级别
"[File# {}] [Line# {}] Message# {}", // 格式化字符串
file, // 文件名
line, // 行号
message // 消息内容
);
}
LoopThread模块代码
#pragma once
#include <thread>
#include <condition_variable>
#include <mutex>
#include "eventloop.hpp"
namespace ns_loopthread
{
class LoopThread
{
private:
std::mutex _mutex;
std::condition_variable _cond;
ns_eventloop::EventLoop *_loop;
std::thread _thread;
private:
// 实例化Loop
void ThreadEntry()
{
ns_eventloop::EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);
_loop = &loop;
_cond.notify_all();
}
loop.Start();
}
public:
LoopThread()
: _loop(NULL), _thread(&LoopThread::ThreadEntry, this)
{
}
// 获取实例化后的Loop对象指针
ns_eventloop::EventLoop *GetLoop()
{
ns_eventloop::EventLoop *loop = NULL;
{
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [&]()
{ return _loop != NULL; });
loop = _loop;
}
return loop;
}
};
} // namespace ns_loopthread
namespace ns_loopthreadpool
{
class LoopThreadPool
{
private:
int _thread_count;
int _next_index;
ns_eventloop::EventLoop *_base_loop;
std::vector<ns_loopthread::LoopThread *> _threadpool;
std::vector<ns_eventloop::EventLoop *> _loops;
public:
LoopThreadPool(ns_eventloop::EventLoop *loop)
: _thread_count(0), _next_index(0), _base_loop(loop)
{
}
// 设置线程数量
void SetThreadCount(const int count)
{
_thread_count = count;
}
// 初始化线程池
void CreateThreadPool()
{
if (_thread_count > 0)
{
_threadpool.resize(_thread_count);
_loops.resize(_thread_count);
for (int i = 0; i < _thread_count; i++)
{
_threadpool[i] = new ns_loopthread::LoopThread();
_loops[i] = _threadpool[i]->GetLoop();
}
}
}
// 获取Loop
ns_eventloop::EventLoop *GetNewLoop()
{
if (_thread_count == 0)
return _base_loop;
_next_index = (_next_index + 1) % _thread_count;
return _loops[_next_index];
}
};
} // namespace loopthreadpool
Acceptor模块代码
#pragma once
#include "eventloop.hpp"
#include "socket.hpp"
namespace ns_acceptor
{
typedef std::function<void(int)> AcceptCallBack;
class Acceptor
{
private:
ns_socket::Socket _ls_socket;
ns_eventloop::EventLoop *_loop;
ns_eventloop::Channel _channel;
AcceptCallBack _acceptcallback;
private:
// 监听套接字可读事件处理函数,即获取新连接描述符fd,并执行新连接处理回调函数
void HandleRead()
{
int newfd = _ls_socket.Accept();
if (newfd < 0)
{
LOG(ERROR, "New Connection Accept Failed");
return;
}
if (_acceptcallback)
_acceptcallback(newfd);
}
// 创建监听套接字即创建服务端
int CreateServer(const uint16_t port)
{
bool ret = _ls_socket.CreateServer(port);
if (ret == false)
{
LOG(ERROR, "Create Server Failed");
}
return _ls_socket.GetFd();
}
public:
Acceptor(uint16_t port, ns_eventloop::EventLoop *loop)
: _ls_socket(CreateServer(port)), _loop(loop), _channel(_ls_socket.GetFd(), _loop)
{
_channel.SetReadCallBack(std::bind(&Acceptor::HandleRead, this));
}
// 设置新连接处理回调函数的接口
void SetAcceptCallBack(const AcceptCallBack &cb)
{
_acceptcallback = cb;
}
// 开启监听套接字的监控
void Listen()
{
_channel.EnableRead();
}
};
} // namespace ns_acceptor
TcpServer模块代码
#pragma once
#include <signal.h>
#include "loopthread.hpp"
#include "connection.hpp"
#include "acceptor.hpp"
namespace ns_tcpserver
{
typedef std::shared_ptr<ns_connection::Connection> SharePtrConnection;
typedef std::function<void()> Function;
typedef std::function<void(const SharePtrConnection &)> ConnectedCallBack;
typedef std::function<void(const SharePtrConnection &, ns_buffer::Buffer *)> MessageCallBack;
typedef std::function<void(const SharePtrConnection &)> ClosedCallBack;
typedef std::function<void(const SharePtrConnection &)> AnyEventCallBack;
class TcpServer
{
private:
uint64_t _id;
int _port;
int _timeout;
bool _inactive_release_sign;
ns_eventloop::EventLoop _baseloop;
ns_acceptor::Acceptor _acceptor;
ns_loopthreadpool::LoopThreadPool _pool;
std::unordered_map<uint64_t, SharePtrConnection> _conns;
ConnectedCallBack _connected_callback;
MessageCallBack _message_callback;
ClosedCallBack _closed_callback;
AnyEventCallBack _anyevent_callback;
private:
// 将移除Connection智能指针操作放入任务队列
void RemoveConnection(const SharePtrConnection &conn)
{
_baseloop.RunInLoop(std::bind(&TcpServer::_RemoveConnection, this, conn));
}
// 移除Connection智能指针(设置给Connection中的服务端关闭事务处理函数)
void _RemoveConnection(const SharePtrConnection &conn)
{
uint64_t id = conn->GetConnId();
auto iter = _conns.find(id);
if (iter != _conns.end())
_conns.erase(iter);
}
// 添加定时任务
void _AddTimerTask(const Function &cb, int delay)
{
_baseloop.AddTimerTask(_id++, delay, cb);
}
// 新连接处理函数(设置给Acceptor)
void NewConnection(int fd)
{
ns_connection::SharePtrConnection conn(new ns_connection::Connection(_id, fd, _pool.GetNewLoop()));
conn->SetConnectedCallBack(_connected_callback);
conn->SetMessageCallBack(_message_callback);
conn->SetAnyEventCallBack(_anyevent_callback);
conn->SetClosedCallBack(_closed_callback);
conn->SetServerClosedCallBack(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
if (_inactive_release_sign)
conn->EnableInactiveRelease(_timeout);
conn->Established();
_conns.insert(std::make_pair(_id++, conn));
}
public:
TcpServer(int port)
: _port(port), _id(0), _inactive_release_sign(false), _acceptor(_port, &_baseloop), _pool(&_baseloop)
{
_acceptor.SetAcceptCallBack(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
_acceptor.Listen();
}
// 设置线程数量
void SetThreadCount(int count)
{
_pool.SetThreadCount(count);
}
// 启动超时连接释放
void EnableInactiveRelease(int sec)
{
_inactive_release_sign = true;
_timeout = sec;
}
// 设置连接事务处理函数
void SetConnectedCallBack(const ConnectedCallBack &cb)
{
_connected_callback = cb;
}
// 设置通信事务处理函数
void SetMessageCallBack(const MessageCallBack &cb)
{
_message_callback = cb;
}
// 设置关闭事务处理函数
void SetClosedCallBack(const ClosedCallBack &cb)
{
_closed_callback = cb;
}
// 设置任意事件事务处理函数
void SetAnyEventCallBack(const AnyEventCallBack &cb)
{
_anyevent_callback = cb;
}
// 将添加定时任务操作放入任务队列中
void AddTimerTask(const Function &cb, int delay)
{
_baseloop.RunInLoop(std::bind(&TcpServer::_AddTimerTask, this, cb, delay));
}
// 启动服务器
void Start()
{
_pool.CreateThreadPool();
_baseloop.Start();
}
};
class NetWork
{
public:
NetWork()
{
LOG(NORMAL, "SIGPIPE IGNORE");
signal(SIGPIPE, SIG_IGN);
}
};
static NetWork network;
} // namespace ns_tcpserver
下一篇博客就是对Http协议的支持模块了,主要就是对Http通信的上下文的解析等一些操作,只要学过网络基础编程应该都实现过类似的
专栏:Reactor模型高并发服务器项目 项目代码仓库:Reactor模型高并发服务器项目代码仓库(随博客更新) 都看到这里了,留下你们的珍贵的👍点赞+⭐收藏+📋评论吧
评论前必须登录!
注册