小编个人主页详情<—请点击 小编个人gitee代码仓库<—请点击 linux系统编程专栏<—请点击 linux网络编程专栏<—请点击 倘若命中无此运,孤身亦可登昆仑,送给屏幕面前的读者朋友们和小编自己! 
目录
-
- 前言
- 一、前置知识
- 二、第一阶段,基本框架的实现
-
- Connection
- Main.cc
- TcpServer
-
- 测试
- 三、第二阶段,引入业务协议
-
- TcpServer
- Main.cc
- TcpServer
-
- 测试
- 四、拓展
- 五、写博客一年的总结
- 六、源代码
-
- ClientCal.cc
- Comm.hpp
- Epoller.hpp
- Log.hpp
- Main.cc
- makefile
- nocopy.hpp
- Protocol.hpp
- ServerCal.hpp
- Socket.hpp
- TcpServer.hpp
- 总结
前言
【linux】高级IO,I/O多路转接之epoll的两种工作模式:LT水平触发模式和ET边缘触发模式——书接上文 详情请点击<——,本文会在上文的基础上进行讲解,所以对上文不了解的读者友友请点击前方的蓝字链接进行学习 本文由小编为大家介绍——【linux】高级IO,以ET模式运行的epoll版本的TCP服务器实现reactor反应堆
一、前置知识

二、第一阶段,基本框架的实现
Connection
void Recver(int fd)
{
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) – 1);
if(n > 0)
{
buffer[n] = 0;
std::cout << "get a message: " << buffer << std::endl;
std::string echo_str = "server echo$ ";
echo_str += buffer;
write(fd, echo_str.c_str(), echo_str.size());
}
else if(n == 0)
{
lg(Info, "client quit, me to, close fd: %d", fd);
_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
else
{
lg(Warning, "recv error, fd: %d", fd);
_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
}

#include <iostream>
#include <memory>
#include <functional>
#include <string>
#include <unordered_map>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Socket.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
class Connection;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
static const int g_buffer_size = 128;
using func_t = std::function<void(std::shared_ptr<Connection>)>;
class Connection
{
public:
Connection(int sock, std::weak_ptr<TcpServer> tcp_server_ptr)
: _sock(sock), _tcp_server_ptr(tcp_server_ptr)
{}
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
void AppendInBuffer(const std::string& info)
{
_inbuffer += info;
}
void AppendOutBuffer(const std::string& info)
{
_outbuffer += info;
}
std::string& InBuffer()
{
return _inbuffer;
}
std::string& OutBuffer()
{
return _outbuffer;
}
int SockFd()
{
return _sock;
}
~Connection()
{
if(_sock > 0)
{
close(_sock);
}
}
private:
int _sock;
std::string _inbuffer;
std::string _outbuffer;
public:
func_t _recv_cb;
func_t _send_cb;
func_t _except_cb;
std::weak_ptr<TcpServer> _tcp_server_ptr;
std::string _ip;
uint16_t _port;
};
Main.cc
#include <iostream>
#include <memory>
#include "TcpServer.hpp"
int main()
{
std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080));
epoll_svr->Init();
epoll_svr->Loop();
return 0;
}
TcpServer
class Epoller : public nocopy
{
public:
int EpollerWait(struct epoll_event revents[], int num, int timeout)
{
// int n = epoll_wait(_epfd, revents, num, _timeout);
// int n = epoll_wait(_epfd, revents, num, 0);
// int n = epoll_wait(_epfd, revents, num, -1);
int n = epoll_wait(_epfd, revents, num, timeout);
return n;
}
private:
int _epfd;
// int _timeout{3000};
};



class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
{
static const int num = 64;
public:
TcpServer(uint16_t port)
: _quit(true)
, _port(port)
, _listensock_ptr(new Sock())
, _epoller_ptr(new Epoller())
{}
void Init()
{}
void Dispatcher(int timeout)
{
}
void Loop()
{
_quit = false;
while(!_quit)
{
Dispatcher(–1);
PrintConnection();
}
_quit = true;
}
void PrintConnection()
{
std::cout << "_connections fd list: " << std::endl;
for(auto& connection : _connections)
{
std::cout << connection.first << ", ";
std::cout << "inbuffer: " << connection.second->InBuffer() << std::endl;
}
std::cout << std::endl;
}
~TcpServer()
{}
private:
bool _quit;
uint16_t _port;
std::shared_ptr<Sock> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
struct epoll_event _revs[num];
};

bool IsConnectionSafe(int fd)
{
// std::unordered_map<int, std::shared_ptr<Connection>>::iterator iter = _connections.find(fd);
auto iter = _connections.find(fd);
if(iter == _connections.end())
return false;
return true;
}
void Dispatcher(int timeout)
{
int n = _epoller_ptr->EpollerWait(_revs, num, timeout);
for(int i = 0; i < n; i++)
{
int sock = _revs[i].data.fd;
uint32_t events = _revs[i].events;
// 统一将事件异常转化为读写问题
if((events & EPOLLERR) | (events & EPOLLHUP))
events |= (EPOLLIN | EPOLLOUT);
// 这样可以简化逻辑,只需要处理读写问题
if((events & EPOLLIN) && IsConnectionSafe(sock))
{
if(_connections[sock]->_recv_cb)
_connections[sock]->_recv_cb(_connections[sock]);
}
if((events & EPOLLOUT) && IsConnectionSafe(sock))
{
if(_connections[sock]->_send_cb)
_connections[sock]->_send_cb(_connections[sock]);
}
}
}


#pragma once
#include <cstdlib>
#include <unistd.h>
#include <fcntl.h>
#include "Socket.hpp"
void SetNonBlockOrDie(int sock)
{
int fl = fcntl(sock, F_GETFL);
if(fl < 0)
exit(NON_BLOCK_ERR);
fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}
enum{
SocketErr = 1,
BindErr,
ListenErr,
NON_BLOCK_ERR
};

void AddConnection(int sock, uint32_t events, func_t recv_cb, func_t send_cb, func_t except_cb, \\
const std::string& ip = "0.0.0.0", uint16_t port = 0)
{
std::shared_ptr<Connection> new_connection(new Connection(sock, shared_from_this()));
new_connection->SetHandler(recv_cb, send_cb, except_cb);
new_connection->_ip = ip;
new_connection->_port = port;
_connections.insert(std::make_pair(sock, new_connection));
_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, events);
lg(Debug, "add a new connection success, sockfd: %d", sock);
}
void Init()
{
_listensock_ptr->Socket();
SetNonBlockOrDie(_listensock_ptr->Fd());
_listensock_ptr->Bind(_port);
_listensock_ptr->Listen();
lg(Info, "create listen socket success, listensock: %d", _listensock_ptr->Fd());
AddConnection(_listensock_ptr->Fd(), EVENT_IN, \\
std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
void Accepter(std::shared_ptr<Connection> connection)
{}
void Accepter(std::shared_ptr<Connection> connection)
{
while(true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = ::accept(connection->SockFd(), (struct sockaddr*)&peer, &len);
if(sock >= 0)
{
uint16_t port = ntohs(peer.sin_port);
char ip[128];
inet_ntop(AF_INET, &(peer.sin_addr), ip, sizeof(ip));
lg(Debug, "get a new client, get info -> [%s:%d], sockfd: %d", ip, port, sock);
SetNonBlockOrDie(sock);
AddConnection(sock, EVENT_IN, \\
std::bind(&TcpServer::Recver, this, std::placeholders::_1), \\
std::bind(&TcpServer::Sender, this, std::placeholders::_1), \\
std::bind(&TcpServer::Excepter, this, std::placeholders::_1), \\
ip, port);
}
else
{
if(errno == EWOULDBLOCK)
break;
else if(errno == EINTR)
continue;
else
break;
}
}
}
void Recver(std::shared_ptr<Connection> connection)
{}
void Sender(std::shared_ptr<Connection> connection)
{}
void Excepter(std::shared_ptr<Connection> connection)
{}
测试

三、第二阶段,引入业务协议
TcpServer

class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
{
static const int num = 64;
public:
TcpServer(uint16_t port, func_t OnMessage)
: _quit(true)
, _port(port)
, _listensock_ptr(new Sock())
, _epoller_ptr(new Epoller())
, _OnMessage(OnMessage)
{}
private:
bool _quit;
uint16_t _port;
std::shared_ptr<Sock> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
struct epoll_event _revs[num];
// 让上层处理信息
func_t _OnMessage;
};
Main.cc
#include <iostream>
#include <memory>
#include "Log.hpp"
#include "TcpServer.hpp"
#include "ServerCal.hpp"
ServerCal calculator;
void DefaultOnMessage(std::shared_ptr<Connection> connection)
{
std::cout << "上层得到了数据: " << connection->InBuffer() << std::endl;
std::string response_str = calculator.Calculator(connection->InBuffer());
if(response_str.empty())
return;
lg(Debug, "%s", response_str.c_str());
connection->AppendOutBuffer(response_str);
auto tcp_server_ptr = connection->_tcp_server_ptr.lock();
tcp_server_ptr->Sender(connection);
}
int main()
{
std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080, DefaultOnMessage));
epoll_svr->Init();
epoll_svr->Loop();
return 0;
}
TcpServer
void Recver(std::shared_ptr<Connection> connection)
{
int sock = connection->SockFd();
char buffer[g_buffer_size];
while(true)
{
memset(buffer, 0, sizeof(buffer));
ssize_t n = recv(sock, buffer, sizeof(buffer) – 1, 0);
if(n > 0)
{
// buffer[n] = 0; 由于memset已经置为了0,所以这里不需要这个操作了
connection->AppendInBuffer(buffer);
}
else if(n == 0)
{
lg(Info, "sockfd: %d, client info -> %s:%d quit…", sock, \\
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
else
{
if(errno == EWOULDBLOCK)
break;
else if(errno == EINTR)
continue;
else
{
lg(Warning, "sockfd: %d, client info -> %s:%d recv error…", sock, \\
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
// 调用回调函数将数据交付上层处理
_OnMessage(connection);
}
void EnableEvent(int sock, bool readable, bool writeable)
{
uint32_t events = 0;
events |= ((readable == true ? EPOLLIN : 0) | \\
(writeable == true ? EPOLLOUT : 0) | EPOLLET);
_epoller_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);
}
void Sender(std::shared_ptr<Connection> connection)
{
int sock = connection->SockFd();
std::string& outbuffer = connection->OutBuffer();
while(true)
{
ssize_t n = send(sock, outbuffer.c_str(), outbuffer.size(), 0);
if(n > 0)
{
outbuffer.erase(0, n);
if(outbuffer.empty())
break;
}
else if(n == 0)
{
return;
}
else
{
if(errno == EWOULDBLOCK)
break;
else if(errno == EINTR)
continue;
else
{
lg(Warning, "sockfd: %d, client info -> %s:%d send error…", sock, \\
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
if(!outbuffer.empty())
{
// 开始对写事件的关心
EnableEvent(sock, true, true);
}
else
{
// 关闭对写事件的关心
EnableEvent(sock, true, false);
}
}
void Excepter(std::shared_ptr<Connection> connection)
{
int fd = connection->SockFd();
lg(Warning, "Excepter handler socket: %d, client info -> %s:%d, excepter handler", \\
fd, connection->_ip.c_str(), connection->_port);
// 1. 在内核epoll模型的红黑树rb_tree中移除对特定fd的关心
_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
// 2. close关闭异常的文件描述符
lg(Debug, "close %d done…", fd);
close(fd);
// 3. 从unordered_map中移除文件描述符sock及其Connection对象
lg(Debug, "remove %d from _connections…", fd);
_connections.erase(fd);
}
reactor_server:Main.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f reactor_server
运行结果如下 
.PHONY:all
all:reactor_client reactor_server
reactor_client:ClientCal.cc
g++ -o $@ $^ -std=c++11 -ljsoncpp
reactor_server:Main.cc
g++ -o $@ $^ -std=c++11 -ljsoncpp
.PHONY:clean
clean:
rm -f reactor_client reactor_server
所以此时我们重新进行编译,成功 
测试
运行结果如下 
四、拓展
void Loop()
{
_quit = false;
while(!_quit)
{
Dispatcher(–1);
PrintConnection();
}
_quit = true;
}


五、写博客一年的总结


六、源代码
ClientCal.cc
#include <iostream>
#include <string>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include "Socket.hpp"
#include "Protocol.hpp"
static void Usage(const std::string& proc)
{
std::cout << "\\n\\t" << proc << " serverip serverport" << std::endl << std::endl;
}
// ./clientcal serverip serverport
int main(int argc, char* argv[])
{
if(argc != 3)
{
Usage(argv[0]);
exit(0);
}
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
Sock sockfd;
sockfd.Socket();
bool r = sockfd.Connect(serverip, serverport);
if(!r)
return 1;
srand(time(nullptr));
std::string opers = "+-*/%~?@=";
int cnt = 1;
std::string inbuffer_stream;
while(cnt <= 5)
{
std::cout << "————第" << cnt << "次测试————-" << std::endl;
int x = rand() % 10 + 1;
usleep(123);
int y = rand() % 10;
usleep(654);
char oper = opers[rand() % opers.size()];
Request req(x, y, oper);
req.DebugPrint();
std::string package;
req.Serialize(&package);
package = Encode(package);
write(sockfd.Fd(), package.c_str(), package.size());
// std::cout << "这是最新的发送出去的请求" << std::endl << package;
// write(sockfd.Fd(), package.c_str(), package.size());
// std::cout << "这是最新的发送出去的请求" << std::endl << package;
// write(sockfd.Fd(), package.c_str(), package.size());
// std::cout << "这是最新的发送出去的请求" << std::endl << package;
char buffer[128];
ssize_t n = read(sockfd.Fd(), buffer, sizeof(buffer));
if(n > 0)
{
buffer[n] = 0;
inbuffer_stream += buffer;
std::cout << inbuffer_stream << std::endl;
std::string content;
bool r = Decode(inbuffer_stream, &content);
assert(r);
Response resp;
r = resp.Deserialize(content);
assert(r);
resp.DebugPrint();
}
else if(n == 0)
break;
else
break;
std::cout << "———————————-" << std::endl;
sleep(2);
cnt++;
}
sockfd.Close();
return 0;
}
Comm.hpp
#pragma once
#include <cstdlib>
#include <unistd.h>
#include <fcntl.h>
#include "Socket.hpp"
void SetNonBlockOrDie(int sock)
{
int fl = fcntl(sock, F_GETFL);
if(fl < 0)
exit(NON_BLOCK_ERR);
fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}
Epoller.hpp
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/epoll.h>
#include "Log.hpp"
#include "nocopy.hpp"
class Epoller : public nocopy
{
static const int size = 128;
public:
Epoller()
{
_epfd = epoll_create(size);
if(_epfd == –1)
{
lg(Error, "epoll_create error: %s", strerror(errno));
}
else
{
lg(Info, "epoller_create success, epfd: %d", _epfd);
}
}
int EpollerWait(struct epoll_event revents[], int num, int timeout)
{
// int n = epoll_wait(_epfd, revents, num, _timeout);
// int n = epoll_wait(_epfd, revents, num, 0);
// int n = epoll_wait(_epfd, revents, num, -1);
int n = epoll_wait(_epfd, revents, num, timeout);
return n;
}
int EpollerUpdate(int oper, int sock, uint32_t event)
{
int n = 0;
if(oper == EPOLL_CTL_DEL)
{
n = epoll_ctl(_epfd, oper, sock, nullptr);
if(n == –1)
{
lg(Error, "epoll_ctl delete error");
}
}
else
{
// EPOLL_CTL_ADD || EPOLL_CTL_MOD
struct epoll_event ev;
ev.data.fd = sock;
ev.events = event;
n = epoll_ctl(_epfd, oper, sock, &ev);
if(n == –1)
{
lg(Error, "epoll_ctl error");
}
}
return n;
}
~Epoller()
{
if(_epfd >= 0)
{
close(_epfd);
}
}
private:
int _epfd;
// int _timeout{3000};
};
Log.hpp
#pragma once
#include <iostream>
#include <string>
#include <ctime>
#include <cstdio>
#include <cstdarg>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1 //输出到屏幕上
#define Onefile 2 //输出到一个文件中
#define Classfile 3 //根据事件等级输出到不同的文件中
#define LogFile "log.txt" //日志名称
class Log
{
public:
Log()
{
printMethod = Screen;
path = "./log/";
}
void Enable(int method) //改变日志打印方式
{
printMethod = method;
}
~Log()
{}
std::string levelToString(int level)
{
switch(level)
{
case Info:
return "Info";
case Debug:
return "Debug";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "";
}
}
void operator()(int level, const char* format, ...)
{
//默认部分 = 日志等级 + 日志时间
time_t t = time(nullptr);
struct tm* ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
char logtxt[2 * SIZE];
snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);
printLog(level, logtxt);
}
void printLog(int level, const std::string& logtxt)
{
switch(printMethod)
{
case Screen:
std::cout << logtxt << std::endl;
break;
case Onefile:
printOneFile(LogFile, logtxt);
break;
case Classfile:
printClassFile(level, logtxt);
break;
default:
break;
}
}
void printOneFile(const std::string& logname, const std::string& logtxt)
{
std::string _logname = path + logname;
int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666);
if(fd < 0)
return;
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const std::string& logtxt)
{
std::string filename = LogFile;
filename += ".";
filename += levelToString(level);
printOneFile(filename, logtxt);
}
private:
int printMethod;
std::string path;
};
Log lg;
Main.cc
#include <iostream>
#include <memory>
#include "Log.hpp"
#include "TcpServer.hpp"
#include "ServerCal.hpp"
ServerCal calculator;
void DefaultOnMessage(std::shared_ptr<Connection> connection)
{
std::cout << "上层得到了数据: " << connection->InBuffer() << std::endl;
std::string response_str = calculator.Calculator(connection->InBuffer());
if(response_str.empty())
return;
lg(Debug, "%s", response_str.c_str());
connection->AppendOutBuffer(response_str);
auto tcp_server_ptr = connection->_tcp_server_ptr.lock();
tcp_server_ptr->Sender(connection);
}
int main()
{
std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080, DefaultOnMessage));
epoll_svr->Init();
epoll_svr->Loop();
return 0;
}
makefile
.PHONY:all
all:reactor_client reactor_server
reactor_client:ClientCal.cc
g++ -o $@ $^ -std=c++11 -ljsoncpp
reactor_server:Main.cc
g++ -o $@ $^ -std=c++11 -ljsoncpp
.PHONY:clean
clean:
rm -f reactor_client reactor_server
nocopy.hpp
#pragma once
class nocopy
{
public:
nocopy(){};
nocopy(const nocopy& ) = delete;
const nocopy& operator=(const nocopy& ) = delete;
};
Protocol.hpp
#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
const std::string black_space_sep = " ";
const std::string protocol_sep = "\\n";
// 123 + 4 -> len\\n123 + 4\\n
std::string Encode(const std::string& content)
{
std::string package = std::to_string(content.size());
package += protocol_sep;
package += content;
package += protocol_sep;
return package;
}
// len\\n123 + 4\\n -> 123 + 4
bool Decode(std::string& package, std::string* content)
{
size_t pos = package.find(protocol_sep);
if(pos == std::string::npos)
return false;
std::string len_str = package.substr(0, pos);
size_t len = std::stoi(len_str);
int total_len = len + 1 + len_str.size() + 1;
if(package.size() < total_len)
return false;
*content += package.substr(pos + 1, len);
//erase ??
package.erase(0, total_len);
return true;
}
class Request
{
public:
Request(int data1, int data2, char oper)
: x(data1)
, y(data2)
, op(oper)
{}
Request()
{}
//123 + 4
bool Serialize(std::string* out)
{
#ifdef MySelf
std::string s = std::to_string(x);
s += black_space_sep;
s += op;
s += black_space_sep;
s += std::to_string(y);
*out = s;
return true;
#else
Json::Value root;
// 初始化
root["x"] = x;
root["y"] = y;
root["op"] = op;
// 序列化 无论是采用Json::FastWriter还是Json::StyledWriter都可以
// 这里看个人喜好自行选择即可
// Json::FastWriter w;
Json::StyledWriter w;
*out = w.write(root);
return true;
#endif
}
//123 + 4 -> 123 + 4
// 1 + 1
bool Deserialize(const std::string& in)
{
#ifdef MySelf
size_t left = in.find(black_space_sep);
if(left == std::string::npos)
return false;
std::string part_x = in.substr(0, left);
size_t right = in.rfind(black_space_sep);
if(right == std::string::npos)
return false;
std::string part_y = in.substr(right + 1);
if(left + 2 != right)
return false;
x = std::stoi(part_x);
y = std::stoi(part_y);
op = in[left + 1];
return true;
#else
Json::Value root;
Json::Reader r;
// 反序列化
r.parse(in, root);
// 解析
x = root["x"].asInt();
y = root["y"].asInt();
op = root["op"].asInt();
return true;
#endif
}
void DebugPrint()
{
std::cout << "新请求构建完成: " << x << op << y << "=?" << std::endl;
}
public:
int x;
int y;
char op;
};
class Response
{
public:
Response(int res, int c)
: result(res)
, code(c)
{}
Response()
{}
//100 0
bool Serialize(std::string* out)
{
#ifdef MySelf
std::string s = std::to_string(result);
s += black_space_sep;
s += std::to_string(code);
*out = s;
return true;
#else
Json::Value root;
//初始化
root["result"] = result;
root["code"] = code;
Json::StyledWriter w;
*out = w.write(root);
return true;
#endif
}
//100 0
bool Deserialize(const std::string& in)
{
#ifdef MySelf
size_t pos = in.find(black_space_sep);
if(pos == std::string::npos)
return false;
std::string part_left = in.substr(0, pos);
std::string part_right = in.substr(pos + 1);
result = std::stoi(part_left);
code = std::stoi(part_right);
return true;
#else
Json::Value root;
Json::Reader r;
//反序列化
r.parse(in, root);
// 解析
result = root["result"].asInt();
code = root["code"].asInt();
return true;
#endif
}
void DebugPrint()
{
std::cout << "结果响应完成, result: " << result << ", code: " << code << std::endl;
}
public:
int result;
int code;
};
ServerCal.hpp
#pragma once
#include "Log.hpp"
#include "Protocol.hpp"
enum
{
Div_Zero = 1,
Mod_Zero,
Other_Oper,
};
class ServerCal
{
public:
ServerCal()
{}
Response CalculatorHelper(Request &req)
{
Response resp(0, 0);
switch (req.op)
{
case '+':
resp.result = req.x + req.y;
break;
case '-':
resp.result = req.x – req.y;
break;
case '*':
resp.result = req.x * req.y;
break;
case '/':
{
if (req.y == 0)
resp.code = Div_Zero;
else
resp.result = req.x / req.y;
}
break;
case '%':
{
if (req.y == 0)
resp.code = Mod_Zero;
else
resp.result = req.x % req.y;
}
break;
default:
resp.code = Other_Oper;
break;
}
return resp;
}
std::string Calculator(std::string &package)
{
// 解码
std::string content;
bool r = Decode(package, &content);
if(!r)
return "";
Request req;
//反序列化
r = req.Deserialize(content);
if (!r)
return "";
//计算
Response resp = CalculatorHelper(req);
content = "";
// 序列化
resp.Serialize(&content);
// 转码
content = Encode(content);
return content;
}
~ServerCal()
{}
};
Socket.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.hpp"
const int backlog = 10;
enum{
SocketErr = 1,
BindErr,
ListenErr,
NON_BLOCK_ERR
};
class Sock
{
public:
Sock()
{}
void Socket()
{
sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd_ < 0)
{
lg(Fatal, "socket error, %s : %d", strerror(errno), errno);
exit(SocketErr);
}
int opt = 1;
setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
}
void Bind(uint16_t port)
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
socklen_t len = sizeof(local);
if(bind(sockfd_, (struct sockaddr*)&local, len) < 0)
{
lg(Fatal, "bind error, %s : %d", strerror(errno), errno);
exit(BindErr);
}
}
void Listen()
{
if(listen(sockfd_, backlog) < 0)
{
lg(Fatal, "listen error, %s : %d", strerror(errno), errno);
exit(ListenErr);
}
}
int Accept(std::string* clientip, uint16_t* clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);
if(newfd < 0)
{
lg(Warning, "accept error, %s : %d", strerror(errno), errno);
return –1;
}
char ipstr[128];
inet_ntop(AF_INET, &(peer.sin_addr), ipstr, sizeof(ipstr));
*clientip = ipstr;
*clientport = ntohs(peer.sin_port);
return newfd;
}
bool Connect(const std::string& serverip, uint16_t serverport)
{
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
peer.sin_family = AF_INET;
peer.sin_port = htons(serverport);
inet_pton(AF_INET, serverip.c_str(), &(peer.sin_addr));
socklen_t len = sizeof(peer);
int n = connect(sockfd_, (struct sockaddr*)&peer, len);
if(n == –1)
{
std::cerr << "connect to " << serverip << ':' << serverport << "error" << std::endl;
return false;
}
return true;
}
void Close()
{
if(sockfd_ > 0)
{
close(sockfd_);
}
}
int Fd()
{
return sockfd_;
}
~Sock()
{}
private:
int sockfd_;
};
TcpServer.hpp
#include <iostream>
#include <memory>
#include <functional>
#include <string>
#include <unordered_map>
#include <cerrno>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "Socket.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
#include "Comm.hpp"
class Connection;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
static const int g_buffer_size = 128;
using func_t = std::function<void(std::shared_ptr<Connection>)>;
class Connection
{
public:
Connection(int sock, std::weak_ptr<TcpServer> tcp_server_ptr)
: _sock(sock), _tcp_server_ptr(tcp_server_ptr)
{}
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
void AppendInBuffer(const std::string& info)
{
_inbuffer += info;
}
void AppendOutBuffer(const std::string& info)
{
_outbuffer += info;
}
std::string& InBuffer()
{
return _inbuffer;
}
std::string& OutBuffer()
{
return _outbuffer;
}
int SockFd()
{
return _sock;
}
~Connection()
{
if(_sock > 0)
{
close(_sock);
}
}
private:
int _sock;
std::string _inbuffer;
std::string _outbuffer;
public:
func_t _recv_cb;
func_t _send_cb;
func_t _except_cb;
std::weak_ptr<TcpServer> _tcp_server_ptr;
std::string _ip;
uint16_t _port;
};
class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
{
static const int num = 64;
public:
TcpServer(uint16_t port, func_t OnMessage)
: _quit(true)
, _port(port)
, _listensock_ptr(new Sock())
, _epoller_ptr(new Epoller())
, _OnMessage(OnMessage)
{}
void Init()
{
_listensock_ptr->Socket();
SetNonBlockOrDie(_listensock_ptr->Fd());
_listensock_ptr->Bind(_port);
_listensock_ptr->Listen();
lg(Info, "create listen socket success, listensock: %d", _listensock_ptr->Fd());
AddConnection(_listensock_ptr->Fd(), EVENT_IN, \\
std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
void AddConnection(int sock, uint32_t events, func_t recv_cb, func_t send_cb, func_t except_cb, \\
const std::string& ip = "0.0.0.0", uint16_t port = 0)
{
std::shared_ptr<Connection> new_connection(new Connection(sock, shared_from_this()));
new_connection->SetHandler(recv_cb, send_cb, except_cb);
new_connection->_ip = ip;
new_connection->_port = port;
_connections.insert(std::make_pair(sock, new_connection));
_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, events);
lg(Debug, "add a new connection success, sockfd: %d", sock);
}
void Accepter(std::shared_ptr<Connection> connection)
{
while(true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = ::accept(connection->SockFd(), (struct sockaddr*)&peer, &len);
if(sock >= 0)
{
uint16_t port = ntohs(peer.sin_port);
char ip[128];
inet_ntop(AF_INET, &(peer.sin_addr), ip, sizeof(ip));
lg(Debug, "get a new client, get info -> [%s:%d], sockfd: %d", ip, port, sock);
SetNonBlockOrDie(sock);
AddConnection(sock, EVENT_IN, \\
std::bind(&TcpServer::Recver, this, std::placeholders::_1), \\
std::bind(&TcpServer::Sender, this, std::placeholders::_1), \\
std::bind(&TcpServer::Excepter, this, std::placeholders::_1), \\
ip, port);
}
else
{
if(errno == EWOULDBLOCK)
break;
else if(errno == EINTR)
continue;
else
break;
}
}
}
void Recver(std::shared_ptr<Connection> connection)
{
int sock = connection->SockFd();
char buffer[g_buffer_size];
while(true)
{
memset(buffer, 0, sizeof(buffer));
ssize_t n = recv(sock, buffer, sizeof(buffer) – 1, 0);
if(n > 0)
{
// buffer[n] = 0; 由于memset已经置为了0,所以这里不需要这个操作了
connection->AppendInBuffer(buffer);
}
else if(n == 0)
{
lg(Info, "sockfd: %d, client info -> %s:%d quit…", sock, \\
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
else
{
if(errno == EWOULDBLOCK)
break;
else if(errno == EINTR)
continue;
else
{
lg(Warning, "sockfd: %d, client info -> %s:%d recv error…", sock, \\
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
// 调用回调函数将数据交付上层处理
_OnMessage(connection);
}
void Sender(std::shared_ptr<Connection> connection)
{
int sock = connection->SockFd();
std::string& outbuffer = connection->OutBuffer();
while(true)
{
ssize_t n = send(sock, outbuffer.c_str(), outbuffer.size(), 0);
if(n > 0)
{
outbuffer.erase(0, n);
if(outbuffer.empty())
break;
}
else if(n == 0)
{
return;
}
else
{
if(errno == EWOULDBLOCK)
break;
else if(errno == EINTR)
continue;
else
{
lg(Warning, "sockfd: %d, client info -> %s:%d send error…", sock, \\
connection->_ip.c_str(), connection->_port);
connection->_except_cb(connection);
return;
}
}
}
if(!outbuffer.empty())
{
// 开始对写事件的关心
EnableEvent(sock, true, true);
}
else
{
// 关闭对写事件的关心
EnableEvent(sock, true, false);
}
}
void Excepter(std::shared_ptr<Connection> connection)
{
int fd = connection->SockFd();
lg(Warning, "Excepter handler socket: %d, client info -> %s:%d, excepter handler", \\
fd, connection->_ip.c_str(), connection->_port);
// 1. 在内核epoll模型的红黑树rb_tree中移除对特定fd的关心
_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
// 2. close关闭异常的文件描述符
lg(Debug, "close %d done…", fd);
close(fd);
// 3. 从unordered_map中移除文件描述符sock及其Connection对象
lg(Debug, "remove %d from _connections…", fd);
_connections.erase(fd);
}
void EnableEvent(int sock, bool readable, bool writeable)
{
uint32_t events = 0;
events |= ((readable == true ? EPOLLIN : 0) | \\
(writeable == true ? EPOLLOUT : 0) | EPOLLET);
_epoller_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);
}
bool IsConnectionSafe(int fd)
{
// std::unordered_map<int, std::shared_ptr<Connection>>::iterator iter = _connections.find(fd);
auto iter = _connections.find(fd);
if(iter == _connections.end())
return false;
return true;
}
void Dispatcher(int timeout)
{
int n = _epoller_ptr->EpollerWait(_revs, num, timeout);
for(int i = 0; i < n; i++)
{
int sock = _revs[i].data.fd;
uint32_t events = _revs[i].events;
// 统一将事件异常转化为读写问题
if((events & EPOLLERR) | (events & EPOLLHUP))
events |= (EPOLLIN | EPOLLOUT);
// 这样可以简化逻辑,只需要处理读写问题
if((events & EPOLLIN) && IsConnectionSafe(sock))
{
if(_connections[sock]->_recv_cb)
_connections[sock]->_recv_cb(_connections[sock]);
}
if((events & EPOLLOUT) && IsConnectionSafe(sock))
{
if(_connections[sock]->_send_cb)
_connections[sock]->_send_cb(_connections[sock]);
}
}
}
void Loop()
{
_quit = false;
while(!_quit)
{
Dispatcher(–1);
PrintConnection();
}
_quit = true;
}
void PrintConnection()
{
std::cout << "_connections fd list: " << std::endl;
for(auto& connection : _connections)
{
std::cout << connection.first << ", ";
std::cout << "inbuffer: " << connection.second->InBuffer() << std::endl;
}
std::cout << std::endl;
}
~TcpServer()
{}
private:
bool _quit;
uint16_t _port;
std::shared_ptr<Sock> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
struct epoll_event _revs[num];
// 让上层处理信息
func_t _OnMessage;
};
总结
以上就是今天的博客内容啦,希望对读者朋友们有帮助 水滴石穿,坚持就是胜利,读者朋友们可以点个关注 点赞收藏加关注,找到小编不迷路!
网硕互联帮助中心






评论前必须登录!
注册