目录
封装发送队列
核心概念与基础知识
处理粘包问题
核心概念与基础知识
字节序处理和消息队列的控制
核心概念与基础知识
asio 处理粘包的简单办法
核心概念与基础知识
总结
封装发送队列
核心概念与基础知识
- 队列(_send_que)保证异步写的顺序性,避免连续发起 async_write 导致数据乱序;
- 互斥锁(_mutex)保证多线程(如业务线程调用 Send)操作队列的线程安全;
// 实际使用的服务器并不是应答式的,而是全双工通信方式
class CServer;
class CSession:public std::enable_shared_from_this<CSession>{
public:
CSession(asio::io_context& ioc, CServer* server):_socket(ioc), _server(server) {
uuids::uuid a_uuid = uuids::random_generator()();
_uuid = uuids::to_string(a_uuid);
}
asio::ip::tcp::socket& Socket() {
return _socket;
}
~CSession() {
std::cout <<"session destruct delete this " << this << std::endl;
}
void Start();
std::string& GetUuid();
private:
void handle_read(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> _self_shared);
void handle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared);
asio::ip::tcp::socket _socket;
enum{max_length = 1024};
char _data[max_length];
CServer* _server;
std::string _uuid;
};
class CServer {
public:
CServer(asio::io_context& io_context, short port);
void ClearSession(std::string);
private:
void HandleAccept(std::shared_ptr<CSession>, const boost::system::error_code& error);// 智能指针也是帮助实现伪闭包
void StartAccept();
asio::io_context& _io_context;
short _port;
asio::ip::tcp::acceptor _acceptor;
std::map<std::string, std::shared_ptr<CSession>> _sessions; // 用来实现伪闭包
};
// 数据节点设计
class MsgNode {
friend class CSession;
public:
MsgNode(char* msg, int max_len) {
_data = new char[max_len];
memcpy(_data, msg, max_len);
}
~MsgNode() {
delete[] _data;
}
private:
int _cur_len;
int _max_len;
char* _data;
};
// 首先要在CSession中新增一个队列存储要发送的数据,同时新增一个发送接口send
class CSession:public std::enable_shared_from_this<CSession> {
/* …. */
private:
void Send(char* msg, int _max_length);
std::queue<std::shared_ptr<MsgNode>> _send_que;
std::mutex _mutex;
};
// 实现发送接口
void CSession::Send(char *msg, int _max_length) {
bool pending = false;
std::lock_guard<std::mutex> lock(_mutex);
if (_send_que.size()> 0) {
pending = true;
}
_send_que.push(std::make_shared<MsgNode>(msg, _max_length));
if (pending) {
return;
}
asio::async_write(_socket, asio::buffer(_data, _max_length), std::bind(&CSeesion::handleWrite, this, std::placeholders::_1, shared_from_this()));
}
void CSeesion::handleWrite(const boost::system::error_code& error, std::shared_ptr<MsgNode> _self_shared) {
if (!error) {
std::lock_guard<std::mutex> lock(_sned_lock);
_send_que.pop();
if (!_send_que.empty()) {
auto& msgNode = _send_que.front();
asio::asnyc_write(_socket, asio::buffer(msgNode->data, msgNode->_max_len),
std::bind(&CSeesion::handleWrtie, this, std::placeholders::_1, _self_shared));
}
}
else {
std::cout << error.message() << std::endl;
_server->ClearSession(_uuid);
}
}
// 修改读回调,因为我们要一直监听对端发送的数据,所以要在每次收到数据后继续绑定监听事件
void CSession::HandleRead(const boost::system::error_code& error, std::size_t bytes_transferred, std::shared_ptr<CSession> _self_shared) {
if (!error) {
std::cout << "read data is" << _data << std::endl;
Send(_data, bytes_transferred);
memset(_data, 0, max_length);
_socket.async_read_some(asio::buffer(_data, max_length),
std::bind(&CSession::handleRead, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
}
else {
std::cout << error.message() << std::endl;
_server->ClearSession(_uuid);
}
}
处理粘包问题
核心概念与基础知识
- 粘包:连续发送的小数据被合并为一个 TCP 包(如发 "a"+"b",接收端可能收到 "ab");
- 拆包:大数据被拆分为多个 TCP 包(如发 1024 字节,接收端可能分两次收到 500+524 字节);
- 消息结构:头部(固定长度,存储数据体长度) + 数据体,HEAD_LENGTH 通常为 2 字节(short)或 4 字节(int);
- 解析逻辑:先读头部→解析数据体长度→再读对应长度的数据体,天然划分消息边界;
- 阶段 1:读取头部,处理「头部未收全」(单次 read_some 仅收到部分头部);
- 阶段 2:读取数据体,处理「数据体未收全」「粘包(多个消息)」;
class MsgNode
{
friend class CSession;
public:
MsgNode(char * msg, short max_len):_total_len(max_len + HEAD_LENGTH),_cur_len(0){
_data = new char[_total_len+1]();
memcpy(_data, &max_len, HEAD_LENGTH);
memcpy(_data+ HEAD_LENGTH, msg, max_len);
_data[_total_len] = '\\0';
}
MsgNode(short max_len):_total_len(max_len),_cur_len(0) {
_data = new char[_total_len +1]();
}
~MsgNode() {
delete[] _data;
}
void Clear() {
::memset(_data, 0, _total_len);
_cur_len = 0;
}
private:
short _cur_len;
short _total_len;
char* _data;
};
// CSession完善
//收到的消息结构
std::shared_ptr<MsgNode> _recv_msg_node;
//头部是否接受完毕
bool _b_head_parse;
//收到的头部结构
std::shared_ptr<MsgNode> _recv_head_node;
void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self){
if (!error) {
//已经移动的字符数
int copy_len = 0;
while (bytes_transferred>0) {
if (!_b_head_parse) {
//收到的数据不足头部大小
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) {
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+ copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
//收到的数据比头部多
//头部剩余未复制的长度
int head_remain = HEAD_LENGTH – _recv_head_node->_cur_len;
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+copy_len, head_remain);
//更新已处理的data长度和剩余未处理的长度
copy_len += head_remain;
bytes_transferred -= head_remain;
//获取头部数据
short data_len = 0;
memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
cout << "data_len is " << data_len << endl;
//头部长度非法
if (data_len > MAX_LENGTH) {
std::cout << "invalid data length is " << data_len << endl;
_server->ClearSession(_uuid);
return;
}
_recv_msg_node = make_shared<MsgNode>(data_len);
//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < data_len) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
//头部处理完成
_b_head_parse = true;
return;
}
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len);
_recv_msg_node->_cur_len += data_len;
copy_len += data_len;
bytes_transferred -= data_len;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\\0';
cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}
//已经处理完头部,处理上次未接受完的消息数据
//接收的数据仍不足剩余未处理的
int remain_msg = _recv_msg_node->_total_len – _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\\0';
cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}
// 当然也要处理客户端
字节序处理和消息队列的控制
核心概念与基础知识
- 主机字节序:CPU 存储数据的顺序,分小端(x86/x64 架构,低字节存低地址)和大端(部分嵌入式 CPU);
- 网络字节序:TCP/IP 协议规定的统一字节序(大端),跨机器通信必须将数据转为网络字节序,接收后转回主机字节序;
- host_to_network_short/long:16/32 位整数从主机字节序→网络字节序;
- network_to_host_short/long:16/32 位整数从网络字节序→主机字节序;
- 仅需转换「长度字段」(头部),数据体(如字符串)无需转换;
- 问题:高频调用 Send 会导致 _send_que 长度激增,占用大量内存且发送延迟变大;
- 解决方案:设置队列最大长度(MAX_SENDQUE),超过则丢弃新数据包,保证通信实时性;
// 计算机内部存储数据的方式有两种大端序和小端序
// 在进行网络编程的时候,通常使用的是大端序
//服务器使用网络字节序
// boost::asio::detail::socket_ops::host_to_network_long()将一个32位无符号整数从主机字节序转换为网络字节序
// boost::asio::detail::socket_ops::host_to_network_short()将一个16位无符号整数从主机字节序转换为网络字节序
// 在服务器的HandleRead函数里,添加对data_len的转换,将网络字节序转为本地字节序
short data_len = 0;
memcpy(&data_len, _recv_head_node->data, HEAD_LENGTH);
data_len = boost::asio::detail::socket_ops::network_to_host_short(data_len);
// 在服务器的发送数据时会构造消息节点,构造消息节点时,将发生长度由本地字节序换为网络字节序
MsgNode(char* msg, short max_len):_total_len(max_len + HEAD_LENGTH), cur_len(0) {
_data = new char[_total_len+1];
int max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
}
// 客户端也遵循相同的处理
// 消息队列控制
// 每个session都有一个发送队列,因为有的时候发送的频率过高会导致队列增大,所以要对队列大小做出限制,当队列大于指定长度时,就要丢弃要发送的数据包,以保证信息的快速收发
void CSeesion::Send(char* msg, int max_length) {
std::lock_guard<std::mutex> lock(_send_lock);
int send_que_size = _send_que.size();
if (send_que_size > MAX_SENDQUE) {
std::cout << "session: " << _uuid << " send que fulled, size is " << max_length << std::endl;
return;
}
_send_que.push_back(std::make_shared<MsgNode>(msg, max_length));
if (send_que_size > 1) {
return;
}
auto& msgnode= _send_que.front();
bosst::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
std::bind(&CSeesion::HandleWrite, this, std::placeholders::_1, std::placeholders::_2));
}
asio 处理粘包的简单办法
核心概念与基础知识
- 区别于 async_read_some(读取「至少 1 字节」就触发回调),async_read 是 Boost.Asio 高层封装,保证读取指定字节数后才触发回调(仅在「读取完成」或「出错」时回调);
- 天然解决拆包:无需手动追踪已读长度,async_read 内部循环调用 async_read_some 直到读取指定字节数;
- 阶段 1:async_read 读取固定长度的头部(HEAD_LENGTH),回调 HandleReadHead;
- 阶段 2:解析头部得到数据体长度,再 async_read 读取对应长度的数据体,回调 HandleReadMsg;
- 循环监听:数据体读取完成后,再次发起 async_read 读取下一个消息的头部,实现持续监听;
/ 改为使用boost::asio::async_read来指定读取字节数
// 获得头部数据
void CSeesion::Start() {
_recv_head_node->Clear();
asio::async_read(_socket, asio::buffer(_recv_head_node->data, HEAD_LENGTH),
std::bind(&CSeesion::HandleReadHead, this, std::placeholders::_1, std::placeholders::_2), SharedSelf());
}
void CSession::HandleReadHead(const boost::system::error_code& error, std::size_t bytes_transferred, std::shared_ptr<CSession> shared_self) {
if (!error) {
if (bytes_transferred < HEAD_LENGTH) {
std::cout << "read head length error" << std::endl;
Close();
_server->ClearSession(_uuid);
return;
}
// 头部接收完
short data_len = 0;
memcpy(&data_len, _recv_head_node->data, HEAD_LENGTH);
// 省略字节序转换
/* … */
if (data_len > MAX_LENGTH) {
std::cout << "invalid data length" << std::endl;
_server->ClearSession(__uuid);
return;
}
_recv_msg_node = make_shared<MsgNode>(data_len);
asio::async_read(_socket, asio::buffer(_recv_msg_node->data, _recv_msg_node->total_len),
std::bind(&CSession::HandleReadMsg, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()));
}
else {
std::cout << "read head error" << std::endl;
Close();
_server->ClearSession(__uuid);
}
}
// 获取信息体
void CSession::HandleReadMsg(const boost::system::error_code& error, std::size_t bytes_transferred, std::shared_ptr<CSession> shared_self) {
if (!error) {
_recv_msg_node->data[_recv_msg_node->_total_len] = '\\0';
std::cout << _recv_msg_node->data << std::endl;
Send(_recv_msg_node->data, _recv_msg_node->total_len);
_recv_msg_node->Clear();
asio::async_read(_socket, asio::buffer(_recv_head_node->data, HEAD_LENGTH),
std::bind(&CSession::HandleReaddHead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()));
}
else {
std::cout << "read msg error, error: " << error.what()<<std::endl;
Close();
_server->ClearSession(__uuid);
}
}
总结
- 手动解析:async_read_some + 长度前缀 + 循环处理剩余数据,灵活但代码复杂;
- 简化方案:async_read 分阶段读头部 / 数据体,省去手动分段逻辑,可读性更高;
- 字节序转换:跨机器通信必须转换长度字段(主机→网络→主机);
- 队列限流:避免高频发送导致队列膨胀,保证通信实时性;
- 线程安全:互斥锁保护队列操作,避免多线程竞争。
网硕互联帮助中心

评论前必须登录!
注册