云计算百科
云计算领域专业知识百科平台

Reactor模型高并发服务器——05_多线程模块与接收连接模块与服务器模块

请添加图片描述

✨✨欢迎来到T_X_Parallel的博客!!     🛰️博客主页:T_X_Parallel     🛰️项目代码仓库:Reactor模型高并发服务器项目代码仓库(随博客更新)     🛰️专栏 : Reactor模型高并发服务器项目     🛰️欢迎关注:👍点赞🙌收藏✍️留言

文章目录

    • 1. 前言
    • 2. 模块分析
      • LoopThread模块
      • LoopThreadPool模块
      • Acceptor模块
      • TcpServer模块
    • 3. 模块实现
      • LoopThread模块
      • LoopThreadPool模块
      • Acceptor模块
      • TcpServer模块
    • 4. 模块测试
    • 5. 模块代码
      • LOG更新后代码
      • LoopThread模块代码
      • Acceptor模块代码
      • TcpServer模块代码

项目环境:vscode、wsl2(Ubuntu22.04)

技术栈:C/C++、C++11、C++17、STL、HTTP、TCP、HTML

1. 前言

在博主仔细考虑后决定把tcpserver模块加到该篇博客后,因为除了Httpserver模块以外的剩下的三个模块实现还是比较简单,毕竟大多是对之前模块的调用封装之类的操作,所以一篇文章将这个服务器组件的基本组件搭出来,只是没有应用层协议支持而已,但是该项目的核心功能可以完成

在实现完这三个模块后,抛开协议支持不说,该项目基本完成,应用层协议的支持基本就是方便使用者

废话不多说,下面直接开始


2. 模块分析

LoopThread模块

该模块就是将一个EventLoop放进一个线程中,而这些线程中EventLoop对象可以说是之前讲到的从属Reactor,负责连接的各种事件事务的操作。而这个EventLoop的实例化应该在线程中执行,因为要让对象对应线程而不出错,所以不能在主Reactor或者说主线程中实例化EventLoop对象再给线程,这是不可取的,会出现一些问题。

该模块主要要实现的功能有创建一个线程,然后就获取线程中的EventLoop对象,这里获取的应该是指针,但是这里要注意,创建线程的时候给的函数里肯定就是EventLoop的实例化,那么线程创建的入口函数和获取EventLoop对象指针函数会存在线程安全问题,那么这里就需要加锁,同时,创建线程的入口函数的执行必须比获取早,不然获取为空,就会出错,这里需要使用条件变量来确保这个顺序,这个条件变量常常也和锁一起使用

所以该模块就四个成员,锁、条件变量、EventLoop对象指针以及线程

LoopThreadPool模块

该模块就是一个存储多个LoopThread的模块,当新连接来了之后直接给新连接分配一个LoopThread对象即可

该模块主要就三个功能,第一个设置线程数量,第二个创建线程池,第三个获取一个LoopThread对象

该模块也就几个成员,线程池中的线程数量、下个分配出去的线程对应的下标、基础EventLoop对象指针(该EventLoop对象即主Reactor,监控着监听套接字的读事件,即监听套接字对应的Loop)、存储所创建的LoopThread对象指针的数组以及存储线程对应的EventLoop对象指针的数组,一个下标对应一个线程和EventLoop,一个线程又对应EventLoop

Acceptor模块

该模块就是一个使用监听套接字接收新连接的接收者,而接收到新连接后的操作该模块并不关心,而是有一个回调函数,获得的新连接描述符fd传给这个回调函数处理即可,该模块主要就是利用eventloop对监听描述符进行事件循环监控,启动可读事件监控,而可读事件处理函数即获取新连接的一系列操作函数

该模块主要功能就四个,第一个设置给监听套接字对应的Channel对象中的可读事件处理回调函数的函数、第二个创建服务端监听套接字、第三个设置处理新连接描述符回调函数的功能、第四个就是启动监听即开启监控监听描述符可读事件

该模块主要成员也只有四个,第一个监听套接字,第二个对应的EventLoop对象指针,第三个对应的Channel对象,第四个处理新连接描述符的回调函数

TcpServer模块

该模块就是整合前面所有模块的头部模块,该模块就是实现一些设置给其他模块中回调函数的函数,比如Connection中的多个业务处理函数以及Acceptor中的新连接处理函数,还有就是一些对其他模块中接口的封装,比如添加定时任务、设置线程池的线程数量等

该模块主要功能有设置相应的业务处理回调函数、设置线程数量、添加定时任务、移除指定Connection智能指针、设置给Acceptor的新连接处理函数、服务器启动、启动超时连接释放功能

该模块主要成员有作为连接ID和定时任务ID的自增Id变量、监听端口、超时时间、超时连接释放标识符、监听套接字对应的Loop对象、一个Acceptor对象、一个Loop线程池、一个存储连接ID和Connection对象智能指针的对应关系的哈希表、和Connection中一样的四种事务处理回调函数

在这里插入图片描述


3. 模块实现

由于这几个模块比较简单,也比较好理解,所以下面内容比较简短,直接去看代码也是能看懂的

注:这里的日志功能应该改用异步日志打印,博主使用的成熟的spdlog库,但是不对该库进行讲解,可以直接cv使用,该内容不属于本项目内容,如感兴趣请自行搜索使用文档,直接下载博主代码仓库中的spdlog文件夹放入你的项目下,编译的时候添加指定头文件搜索路径即可

LoopThread模块

模块接口

公有接口

  • 获取该线程对应的EventLoop对象指针

ns_eventloop::EventLoop *GetLoop();

私有接口

  • 线程的入口函数,即对线程对应的EventLoop对象实例化

void ThreadEntry();

模块实现细节

上面已经分析过,这两个函数的执行是有先后顺序的,先实例化在获取,要实现该顺序只需使用锁和条件变量即可(使用方法看代码中的使用即可)

在实例化一个Loop时不需要使用new,而是直接创建一个对象,然后在该函数中直接启动监控,这样该函数不会执行完,也就不会释放该对象

LoopThreadPool模块

模块接口

公有接口

  • 设置线程数量
  • 初始化线程池
  • 获取Loop

// 设置线程数量
void SetThreadCount(const int count);

// 初始化线程池
void CreateThreadPool();

// 获取Loop
ns_eventloop::EventLoop *GetNewLoop();

模块实现细节

获取新的线程的函数实现只需使用下标即可,而下标是一个自增并轮询的变量,这样比较简单

设置一个线程数组还有一个对应的loop指针数组的原因就是获取时不需要通过线程获取,线程中的获取接口使用锁和条件变量,直接通过下标找到对应的Loop即可

Acceptor模块

模块接口

公有接口

  • 设置新连接处理回调函数的接口
  • 开启监听套接字的监控

void SetAcceptCallBack(const AcceptCallBack &cb);

void Listen();

私有接口

  • 监听套接字可读事件处理函数,即获取新连接描述符fd,并执行新连接处理回调函数
  • 创建监听套接字即创建服务端

void HandleRead();

int CreateServer(const uint16_t port);

模块实现细节

该模块没什么细节,主要就是要先设置可读事件处理函数在启动监控,其他就是调用之前模块的接口,比较简单

TcpServer模块

模块接口

公有接口

  • 设置线程数量
  • 启动超时连接释放
  • 设置连接、通信、关闭、任意事务处理函数
  • 将添加定时任务操作放入任务队列中
  • 启动服务器

// 设置线程数量
void SetThreadCount(int count);

// 启动超时连接释放
void EnableInactiveRelease(int sec);

// 设置连接事务处理函数
void SetConnectedCallBack(const ConnectedCallBack &cb);
// 设置通信事务处理函数
void SetMessageCallBack(const MessageCallBack &cb);

// 设置关闭事务处理函数
void SetClosedCallBack(const ClosedCallBack &cb);

// 设置任意事件事务处理函数
void SetAnyEventCallBack(const AnyEventCallBack &cb);

// 将添加定时任务操作放入任务队列中
void AddTimerTask(const Function &cb, int delay);

// 启动服务器
void Start();

私有接口

  • 将移除Connection智能指针操作放入任务队列
  • 移除Connection智能指针(设置给Connection中的服务端关闭事务处理函数)
  • 添加定时任务
  • 新连接处理函数(设置给Acceptor)

// 将移除Connection智能指针操作放入任务队列
void RemoveConnection(const SharePtrConnection &conn);

// 移除Connection智能指针(设置给Connection中的服务端关闭事务处理函数)
void _RemoveConnection(const SharePtrConnection &conn);

// 添加定时任务
void _AddTimerTask(const Function &cb, int delay);

// 新连接处理函数(设置给Acceptor)
void NewConnection(int fd);

模块实现细节

在设置监听套接字的可读事件处理回调函数、启动监听套接字可读事件监控、初始化线程池以及启动监听套接字监控这四个步骤顺序不能乱,建议设置监听套接字的可读事件处理回调函数、启动监听套接字可读事件监控放在TcpServer中的构造函数中,初始化线程池以及启动监听套接字监控放在服务器启动接口中

然后是新连接处理中,需要将多个回调函数以及属性设置完再调用conn->Established(),该接口中有启动可读事件监控,得先设置完各种回调函数之后再启动

其他就是一些简单的设置以及其他模块接口的调用,并没多少细节,直接看代码部分即可,非常好理解,如果有任何问题,在评论区评论即可


4. 模块测试

测试代码

// 服务端
#include "../code/tcpserver.hpp"

void HandleConnected(const ns_connection::SharePtrConnection &conn)
{
LOG(NORMAL, "New Connection: " + std::to_string((uintptr_t)(&conn)));
}

void HandleClose(const ns_connection::SharePtrConnection &conn)
{
LOG(NORMAL, "Close Connection: " + std::to_string((uintptr_t)(&conn)));
}

void HandleMessage(const ns_connection::SharePtrConnection &conn, ns_buffer::Buffer *buff)
{
std::string s = buff->Read();
LOG(NORMAL, s);
s = "xiaomi yu7";
conn->Send(s.c_str(), s.size());
}

int main()
{
ns_tcpserver::TcpServer tcpserver(8085);
tcpserver.SetThreadCount(2);
tcpserver.SetConnectedCallBack(HandleConnected);
tcpserver.SetMessageCallBack(HandleMessage);
tcpserver.SetClosedCallBack(HandleClose);
tcpserver.EnableInactiveRelease(10);
tcpserver.Start();
return 0;
}

// 客户端
int main()
{
ns_socket::Socket cli_socket;
cli_socket.CreateClient(8085, "127.0.0.1");
for (int i = 0; i < 10; i++)
{
std::string str = "xiaomi su7ultra";
cli_socket.Send(str.c_str(), str.size());
char buf[1024] = {0};
cli_socket.Recv(&buf, 1023);
LOG(NORMAL, buf);
sleep(1);
}
while (1)
sleep(1);
return 0;
}

可自行进去其他测试,比如运行多次客户端或者使用多进程

博主这里只运行了两个客户端进行测试

由于加入外部库spdlog,编译需要指出头文件搜索路径

g++ test_tcpserver_server.cc -o Server -std=c++17 -I../

最后的路径为你spdlog文件夹所放的相对位置

测试结果

// 服务端
./Server
[Level# info] [20250405 22:23:10] [ThreadID# 238055] [File# ../code/tcpserver.hpp] [Line# 136] Message# SIGPIPE IGNORE
[Level# info] [20250405 22:23:16] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 5] Message# New Connection: 139640637052880
[Level# info] [20250405 22:23:16] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:17] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 5] Message# New Connection: 139640645445584
[Level# info] [20250405 22:23:17] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:17] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:18] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:18] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:19] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:19] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:20] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:20] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:21] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:21] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:22] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:22] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:23] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:23] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:24] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:24] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:25] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:25] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:26] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 16] Message# xiaomi su7ultra
[Level# info] [20250405 22:23:35] [ThreadID# 238077] [File# test_tcpserver_server.cc] [Line# 10] Message# Close Connection: 139640637052880
[Level# info] [20250405 22:23:36] [ThreadID# 238076] [File# test_tcpserver_server.cc] [Line# 10] Message# Close Connection: 139640645445584

// 客户端(运行两个进程)
./Client
[Level# info] [20250405 22:23:16] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [20250405 22:23:17] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [20250405 22:23:18] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [20250405 22:23:19] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [20250405 22:23:20] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [20250405 22:23:21] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [20250405 22:23:22] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [20250405 22:23:23] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [20250405 22:23:24] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7
[Level# info] [20250405 22:23:25] [ThreadID# 238180] [File# test_tcpserver_client.cc] [Line# 29] Message# xiaomi yu7


5. 模块代码

LOG更新后代码

#pragma once

#include <string>
#include <iostream>
#include <ctime>
#include <sstream>
#include <spdlog/spdlog.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/async.h>

#define LOG_TO_CONSOLE

#define LOG_LIST_COUNT 10000
#define LOG_THREAD_COUNT 20

#define NORMAL 1
#define DEBUG 2
#define WARNING 3
#define ERROR 4
#define FATAL 5

#define LOG_LEVEL NORMAL

#define LOG(LEVEL, MESSAGE) log(#LEVEL, MESSAGE, __FILE__, __LINE__)

void log(std::string level, std::string message, std::string file, int line)
{
// 获取当前时间
std::time_t t = std::time(nullptr);
char timeStr[100];
std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S", std::localtime(&t));

// 获取线程 ID
std::ostringstream threadIdStream;
threadIdStream << std::this_thread::get_id();
std::string threadId = threadIdStream.str();

// 初始化异步日志线程池,队列大小为 8192,使用 4 个后台线程
static auto thread_pool = std::make_shared<spdlog::details::thread_pool>(LOG_LIST_COUNT, LOG_THREAD_COUNT);

// 初始化 spdlog
#ifdef LOG_TO_CONSOLE
// 创建异步控制台日志记录器
static auto async_logger = std::make_shared<spdlog::async_logger>(
"async_console_logger",
spdlog::sinks_init_list{std::make_shared<spdlog::sinks::stdout_color_sink_mt>()},
thread_pool,
spdlog::async_overflow_policy::block);
#else
// 创建异步文件日志记录器
static auto async_logger = std::make_shared<spdlog::async_logger>(
"async_file_logger",
spdlog::sinks_init_list{std::make_shared<spdlog::sinks::basic_file_sink_mt>("logs/async_logfile.log", true)},
thread_pool,
spdlog::async_overflow_policy::block);
#endif

// 设置日志格式,自动获取时间和线程 ID
async_logger->set_pattern("[Level# %l] [%Y-%m-%d %H:%M:%S] [ThreadID# %t] %v");

// 映射日志级别
spdlog::level::level_enum spdlogLevel;

switch (LOG_LEVEL)
{
case NORMAL:
if (level == "NORMAL")
{
spdlogLevel = spdlog::level::info;
break;
}
case DEBUG:
if (level == "DEBUG")
{
spdlogLevel = spdlog::level::debug;
break;
}
case WARNING:
if (level == "WARNING")
{
spdlogLevel = spdlog::level::warn;
break;
}
case ERROR:
if (level == "ERROR")
{
spdlogLevel = spdlog::level::err;
break;
}
case FATAL:
if (level == "FATAL")
spdlogLevel = spdlog::level::critical;
else
return;
}

async_logger->set_level(spdlogLevel);
async_logger->log(
spdlogLevel, // 日志级别
"[File# {}] [Line# {}] Message# {}", // 格式化字符串
file, // 文件名
line, // 行号
message // 消息内容
);
}

LoopThread模块代码

#pragma once

#include <thread>
#include <condition_variable>
#include <mutex>
#include "eventloop.hpp"

namespace ns_loopthread
{
class LoopThread
{
private:
std::mutex _mutex;
std::condition_variable _cond;

ns_eventloop::EventLoop *_loop;
std::thread _thread;

private:
// 实例化Loop
void ThreadEntry()
{
ns_eventloop::EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);
_loop = &loop;
_cond.notify_all();
}
loop.Start();
}

public:
LoopThread()
: _loop(NULL), _thread(&LoopThread::ThreadEntry, this)
{
}

// 获取实例化后的Loop对象指针
ns_eventloop::EventLoop *GetLoop()
{
ns_eventloop::EventLoop *loop = NULL;
{
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [&]()
{ return _loop != NULL; });
loop = _loop;
}
return loop;
}
};
} // namespace ns_loopthread

namespace ns_loopthreadpool
{
class LoopThreadPool
{
private:
int _thread_count;
int _next_index;
ns_eventloop::EventLoop *_base_loop;
std::vector<ns_loopthread::LoopThread *> _threadpool;
std::vector<ns_eventloop::EventLoop *> _loops;

public:
LoopThreadPool(ns_eventloop::EventLoop *loop)
: _thread_count(0), _next_index(0), _base_loop(loop)
{
}

// 设置线程数量
void SetThreadCount(const int count)
{
_thread_count = count;
}

// 初始化线程池
void CreateThreadPool()
{
if (_thread_count > 0)
{
_threadpool.resize(_thread_count);
_loops.resize(_thread_count);
for (int i = 0; i < _thread_count; i++)
{
_threadpool[i] = new ns_loopthread::LoopThread();
_loops[i] = _threadpool[i]->GetLoop();
}
}
}

// 获取Loop
ns_eventloop::EventLoop *GetNewLoop()
{
if (_thread_count == 0)
return _base_loop;
_next_index = (_next_index + 1) % _thread_count;
return _loops[_next_index];
}
};
} // namespace loopthreadpool

Acceptor模块代码

#pragma once

#include "eventloop.hpp"
#include "socket.hpp"

namespace ns_acceptor
{
typedef std::function<void(int)> AcceptCallBack;

class Acceptor
{
private:
ns_socket::Socket _ls_socket;
ns_eventloop::EventLoop *_loop;
ns_eventloop::Channel _channel;

AcceptCallBack _acceptcallback;

private:
// 监听套接字可读事件处理函数,即获取新连接描述符fd,并执行新连接处理回调函数
void HandleRead()
{
int newfd = _ls_socket.Accept();
if (newfd < 0)
{
LOG(ERROR, "New Connection Accept Failed");
return;
}
if (_acceptcallback)
_acceptcallback(newfd);
}

// 创建监听套接字即创建服务端
int CreateServer(const uint16_t port)
{
bool ret = _ls_socket.CreateServer(port);
if (ret == false)
{
LOG(ERROR, "Create Server Failed");
}
return _ls_socket.GetFd();
}

public:
Acceptor(uint16_t port, ns_eventloop::EventLoop *loop)
: _ls_socket(CreateServer(port)), _loop(loop), _channel(_ls_socket.GetFd(), _loop)
{
_channel.SetReadCallBack(std::bind(&Acceptor::HandleRead, this));
}

// 设置新连接处理回调函数的接口
void SetAcceptCallBack(const AcceptCallBack &cb)
{
_acceptcallback = cb;
}

// 开启监听套接字的监控
void Listen()
{
_channel.EnableRead();
}
};
} // namespace ns_acceptor

TcpServer模块代码

#pragma once

#include <signal.h>
#include "loopthread.hpp"
#include "connection.hpp"
#include "acceptor.hpp"

namespace ns_tcpserver
{
typedef std::shared_ptr<ns_connection::Connection> SharePtrConnection;
typedef std::function<void()> Function;
typedef std::function<void(const SharePtrConnection &)> ConnectedCallBack;
typedef std::function<void(const SharePtrConnection &, ns_buffer::Buffer *)> MessageCallBack;
typedef std::function<void(const SharePtrConnection &)> ClosedCallBack;
typedef std::function<void(const SharePtrConnection &)> AnyEventCallBack;

class TcpServer
{
private:
uint64_t _id;
int _port;
int _timeout;
bool _inactive_release_sign;

ns_eventloop::EventLoop _baseloop;
ns_acceptor::Acceptor _acceptor;
ns_loopthreadpool::LoopThreadPool _pool;

std::unordered_map<uint64_t, SharePtrConnection> _conns;

ConnectedCallBack _connected_callback;
MessageCallBack _message_callback;
ClosedCallBack _closed_callback;
AnyEventCallBack _anyevent_callback;

private:
// 将移除Connection智能指针操作放入任务队列
void RemoveConnection(const SharePtrConnection &conn)
{
_baseloop.RunInLoop(std::bind(&TcpServer::_RemoveConnection, this, conn));
}

// 移除Connection智能指针(设置给Connection中的服务端关闭事务处理函数)
void _RemoveConnection(const SharePtrConnection &conn)
{
uint64_t id = conn->GetConnId();
auto iter = _conns.find(id);
if (iter != _conns.end())
_conns.erase(iter);
}

// 添加定时任务
void _AddTimerTask(const Function &cb, int delay)
{
_baseloop.AddTimerTask(_id++, delay, cb);
}

// 新连接处理函数(设置给Acceptor)
void NewConnection(int fd)
{
ns_connection::SharePtrConnection conn(new ns_connection::Connection(_id, fd, _pool.GetNewLoop()));
conn->SetConnectedCallBack(_connected_callback);
conn->SetMessageCallBack(_message_callback);
conn->SetAnyEventCallBack(_anyevent_callback);
conn->SetClosedCallBack(_closed_callback);
conn->SetServerClosedCallBack(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
if (_inactive_release_sign)
conn->EnableInactiveRelease(_timeout);
conn->Established();
_conns.insert(std::make_pair(_id++, conn));
}

public:
TcpServer(int port)
: _port(port), _id(0), _inactive_release_sign(false), _acceptor(_port, &_baseloop), _pool(&_baseloop)
{
_acceptor.SetAcceptCallBack(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
_acceptor.Listen();
}

// 设置线程数量
void SetThreadCount(int count)
{
_pool.SetThreadCount(count);
}

// 启动超时连接释放
void EnableInactiveRelease(int sec)
{
_inactive_release_sign = true;
_timeout = sec;
}

// 设置连接事务处理函数
void SetConnectedCallBack(const ConnectedCallBack &cb)
{
_connected_callback = cb;
}

// 设置通信事务处理函数
void SetMessageCallBack(const MessageCallBack &cb)
{
_message_callback = cb;
}

// 设置关闭事务处理函数
void SetClosedCallBack(const ClosedCallBack &cb)
{
_closed_callback = cb;
}

// 设置任意事件事务处理函数
void SetAnyEventCallBack(const AnyEventCallBack &cb)
{
_anyevent_callback = cb;
}

// 将添加定时任务操作放入任务队列中
void AddTimerTask(const Function &cb, int delay)
{
_baseloop.RunInLoop(std::bind(&TcpServer::_AddTimerTask, this, cb, delay));
}

// 启动服务器
void Start()
{
_pool.CreateThreadPool();
_baseloop.Start();
}
};
class NetWork
{
public:
NetWork()
{
LOG(NORMAL, "SIGPIPE IGNORE");
signal(SIGPIPE, SIG_IGN);
}
};
static NetWork network;
} // namespace ns_tcpserver


下一篇博客就是对Http协议的支持模块了,主要就是对Http通信的上下文的解析等一些操作,只要学过网络基础编程应该都实现过类似的


请添加图片描述

专栏:Reactor模型高并发服务器项目 项目代码仓库:Reactor模型高并发服务器项目代码仓库(随博客更新) 都看到这里了,留下你们的珍贵的👍点赞+⭐收藏+📋评论吧

赞(0)
未经允许不得转载:网硕互联帮助中心 » Reactor模型高并发服务器——05_多线程模块与接收连接模块与服务器模块
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!