云计算百科
云计算领域专业知识百科平台

基于boost库C++网络编程(三):发送队列和粘包问题

目录

封装发送队列

核心概念与基础知识

处理粘包问题

核心概念与基础知识

字节序处理和消息队列的控制

核心概念与基础知识

asio 处理粘包的简单办法

核心概念与基础知识

总结


封装发送队列

核心概念与基础知识

  • 全双工通信:区别于「读→写」的应答式交互,全双工允许服务端 / 客户端独立发起读写操作(如服务端主动推送数据、客户端随时发消息),需独立的发送队列管理异步写逻辑;
  • std::enable_shared_from_this:让 CSession 继承该类后,可通过 shared_from_this() 获取自身的 std::shared_ptr,解决异步回调中 delete this 导致的二次析构问题(回调执行期间 Session 引用计数 > 0,不会被析构);
  • 发送队列 + 互斥锁:
    • 队列(_send_que)保证异步写的顺序性,避免连续发起 async_write 导致数据乱序;
    • 互斥锁(_mutex)保证多线程(如业务线程调用 Send)操作队列的线程安全;
  • 伪闭包:回调中绑定 shared_from_this()(自身智能指针),本质是延长 Session 生命周期,确保回调执行时对象未被析构;
  • 会话管理:CServer 用 std::map<std::string, std::shared_ptr<CSession>> 管理所有活跃连接,ClearSession 清理失效连接(客户端断开 / 出错时);
  • UUID 标识:为每个 Session 生成唯一 UUID,作为会话管理的键值,避免指针直接作为标识的风险。
  • // 实际使用的服务器并不是应答式的,而是全双工通信方式
    class CServer;
    class CSession:public std::enable_shared_from_this<CSession>{

    public:
    CSession(asio::io_context& ioc, CServer* server):_socket(ioc), _server(server) {
    uuids::uuid a_uuid = uuids::random_generator()();
    _uuid = uuids::to_string(a_uuid);
    }

    asio::ip::tcp::socket& Socket() {
    return _socket;
    }

    ~CSession() {
    std::cout <<"session destruct delete this " << this << std::endl;
    }

    void Start();
    std::string& GetUuid();

    private:
    void handle_read(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> _self_shared);
    void handle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared);
    asio::ip::tcp::socket _socket;
    enum{max_length = 1024};
    char _data[max_length];
    CServer* _server;
    std::string _uuid;
    };

    class CServer {

    public:
    CServer(asio::io_context& io_context, short port);
    void ClearSession(std::string);

    private:
    void HandleAccept(std::shared_ptr<CSession>, const boost::system::error_code& error);// 智能指针也是帮助实现伪闭包
    void StartAccept();
    asio::io_context& _io_context;
    short _port;
    asio::ip::tcp::acceptor _acceptor;
    std::map<std::string, std::shared_ptr<CSession>> _sessions; // 用来实现伪闭包
    };

    // 数据节点设计
    class MsgNode {
    friend class CSession;

    public:
    MsgNode(char* msg, int max_len) {
    _data = new char[max_len];
    memcpy(_data, msg, max_len);
    }

    ~MsgNode() {
    delete[] _data;
    }

    private:
    int _cur_len;
    int _max_len;
    char* _data;
    };

    // 首先要在CSession中新增一个队列存储要发送的数据,同时新增一个发送接口send
    class CSession:public std::enable_shared_from_this<CSession> {
    /* …. */

    private:
    void Send(char* msg, int _max_length);
    std::queue<std::shared_ptr<MsgNode>> _send_que;
    std::mutex _mutex;
    };

    // 实现发送接口
    void CSession::Send(char *msg, int _max_length) {
    bool pending = false;
    std::lock_guard<std::mutex> lock(_mutex);
    if (_send_que.size()> 0) {
    pending = true;
    }
    _send_que.push(std::make_shared<MsgNode>(msg, _max_length));
    if (pending) {
    return;
    }
    asio::async_write(_socket, asio::buffer(_data, _max_length), std::bind(&CSeesion::handleWrite, this, std::placeholders::_1, shared_from_this()));
    }

    void CSeesion::handleWrite(const boost::system::error_code& error, std::shared_ptr<MsgNode> _self_shared) {
    if (!error) {
    std::lock_guard<std::mutex> lock(_sned_lock);
    _send_que.pop();
    if (!_send_que.empty()) {
    auto& msgNode = _send_que.front();
    asio::asnyc_write(_socket, asio::buffer(msgNode->data, msgNode->_max_len),
    std::bind(&CSeesion::handleWrtie, this, std::placeholders::_1, _self_shared));
    }
    }
    else {
    std::cout << error.message() << std::endl;
    _server->ClearSession(_uuid);
    }
    }

    // 修改读回调,因为我们要一直监听对端发送的数据,所以要在每次收到数据后继续绑定监听事件
    void CSession::HandleRead(const boost::system::error_code& error, std::size_t bytes_transferred, std::shared_ptr<CSession> _self_shared) {
    if (!error) {
    std::cout << "read data is" << _data << std::endl;
    Send(_data, bytes_transferred);
    memset(_data, 0, max_length);
    _socket.async_read_some(asio::buffer(_data, max_length),
    std::bind(&CSession::handleRead, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
    }
    else {
    std::cout << error.message() << std::endl;
    _server->ClearSession(_uuid);
    }
    }

    处理粘包问题

    核心概念与基础知识

  • TCP 粘包 / 拆包本质:TCP 是「流式协议」,无消息边界,会出现两种情况:
    • 粘包:连续发送的小数据被合并为一个 TCP 包(如发 "a"+"b",接收端可能收到 "ab");
    • 拆包:大数据被拆分为多个 TCP 包(如发 1024 字节,接收端可能分两次收到 500+524 字节);
  • 长度前缀法(主流粘包解决方案):
    • 消息结构:头部(固定长度,存储数据体长度) + 数据体,HEAD_LENGTH 通常为 2 字节(short)或 4 字节(int);
    • 解析逻辑:先读头部→解析数据体长度→再读对应长度的数据体,天然划分消息边界;
  • 分阶段读取逻辑:
    • 阶段 1:读取头部,处理「头部未收全」(单次 read_some 仅收到部分头部);
    • 阶段 2:读取数据体,处理「数据体未收全」「粘包(多个消息)」;
  • 循环处理剩余数据:单次 async_read_some 可能收到「多个完整消息」或「部分消息 + 完整消息」,需循环解析缓冲区数据,直到处理完毕。
  • class MsgNode
    {
    friend class CSession;
    public:
    MsgNode(char * msg, short max_len):_total_len(max_len + HEAD_LENGTH),_cur_len(0){
    _data = new char[_total_len+1]();
    memcpy(_data, &max_len, HEAD_LENGTH);
    memcpy(_data+ HEAD_LENGTH, msg, max_len);
    _data[_total_len] = '\\0';
    }

    MsgNode(short max_len):_total_len(max_len),_cur_len(0) {
    _data = new char[_total_len +1]();
    }

    ~MsgNode() {
    delete[] _data;
    }

    void Clear() {
    ::memset(_data, 0, _total_len);
    _cur_len = 0;
    }
    private:
    short _cur_len;
    short _total_len;
    char* _data;
    };

    // CSession完善
    //收到的消息结构
    std::shared_ptr<MsgNode> _recv_msg_node;
    //头部是否接受完毕
    bool _b_head_parse;
    //收到的头部结构
    std::shared_ptr<MsgNode> _recv_head_node;

    void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self){
    if (!error) {
    //已经移动的字符数
    int copy_len = 0;
    while (bytes_transferred>0) {
    if (!_b_head_parse) {
    //收到的数据不足头部大小
    if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) {
    memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+ copy_len, bytes_transferred);
    _recv_head_node->_cur_len += bytes_transferred;
    ::memset(_data, 0, MAX_LENGTH);
    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
    std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
    return;
    }
    //收到的数据比头部多
    //头部剩余未复制的长度
    int head_remain = HEAD_LENGTH – _recv_head_node->_cur_len;
    memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+copy_len, head_remain);
    //更新已处理的data长度和剩余未处理的长度
    copy_len += head_remain;
    bytes_transferred -= head_remain;
    //获取头部数据
    short data_len = 0;
    memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
    cout << "data_len is " << data_len << endl;
    //头部长度非法
    if (data_len > MAX_LENGTH) {
    std::cout << "invalid data length is " << data_len << endl;
    _server->ClearSession(_uuid);
    return;
    }
    _recv_msg_node = make_shared<MsgNode>(data_len);

    //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
    if (bytes_transferred < data_len) {
    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
    _recv_msg_node->_cur_len += bytes_transferred;
    ::memset(_data, 0, MAX_LENGTH);
    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
    std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
    //头部处理完成
    _b_head_parse = true;
    return;
    }

    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len);
    _recv_msg_node->_cur_len += data_len;
    copy_len += data_len;
    bytes_transferred -= data_len;
    _recv_msg_node->_data[_recv_msg_node->_total_len] = '\\0';
    cout << "receive data is " << _recv_msg_node->_data << endl;
    //此处可以调用Send发送测试
    Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
    //继续轮询剩余未处理数据
    _b_head_parse = false;
    _recv_head_node->Clear();
    if (bytes_transferred <= 0) {
    ::memset(_data, 0, MAX_LENGTH);
    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
    std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
    return;
    }
    continue;
    }

    //已经处理完头部,处理上次未接受完的消息数据
    //接收的数据仍不足剩余未处理的
    int remain_msg = _recv_msg_node->_total_len – _recv_msg_node->_cur_len;
    if (bytes_transferred < remain_msg) {
    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
    _recv_msg_node->_cur_len += bytes_transferred;
    ::memset(_data, 0, MAX_LENGTH);
    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
    std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
    return;
    }
    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
    _recv_msg_node->_cur_len += remain_msg;
    bytes_transferred -= remain_msg;
    copy_len += remain_msg;
    _recv_msg_node->_data[_recv_msg_node->_total_len] = '\\0';
    cout << "receive data is " << _recv_msg_node->_data << endl;
    //此处可以调用Send发送测试
    Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
    //继续轮询剩余未处理数据
    _b_head_parse = false;
    _recv_head_node->Clear();
    if (bytes_transferred <= 0) {
    ::memset(_data, 0, MAX_LENGTH);
    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
    std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
    return;
    }
    continue;
    }
    }
    else {
    std::cout << "handle read failed, error is " << error.what() << endl;
    Close();
    _server->ClearSession(_uuid);
    }
    }

    // 当然也要处理客户端

    字节序处理和消息队列的控制

    核心概念与基础知识

  • 字节序(Endianness):
    • 主机字节序:CPU 存储数据的顺序,分小端(x86/x64 架构,低字节存低地址)和大端(部分嵌入式 CPU);
    • 网络字节序:TCP/IP 协议规定的统一字节序(大端),跨机器通信必须将数据转为网络字节序,接收后转回主机字节序;
  • Boost.Asio 字节序转换接口:
    • host_to_network_short/long:16/32 位整数从主机字节序→网络字节序;
    • network_to_host_short/long:16/32 位整数从网络字节序→主机字节序;
    • 仅需转换「长度字段」(头部),数据体(如字符串)无需转换;
  • 发送队列限流:
    • 问题:高频调用 Send 会导致 _send_que 长度激增,占用大量内存且发送延迟变大;
    • 解决方案:设置队列最大长度(MAX_SENDQUE),超过则丢弃新数据包,保证通信实时性;
  • 线程安全强化:Send 接口可能被业务线程调用,需用 std::lock_guard 保护队列操作,避免多线程竞争。
  • // 计算机内部存储数据的方式有两种大端序和小端序
    // 在进行网络编程的时候,通常使用的是大端序

    //服务器使用网络字节序
    // boost::asio::detail::socket_ops::host_to_network_long()将一个32位无符号整数从主机字节序转换为网络字节序
    // boost::asio::detail::socket_ops::host_to_network_short()将一个16位无符号整数从主机字节序转换为网络字节序

    // 在服务器的HandleRead函数里,添加对data_len的转换,将网络字节序转为本地字节序
    short data_len = 0;
    memcpy(&data_len, _recv_head_node->data, HEAD_LENGTH);
    data_len = boost::asio::detail::socket_ops::network_to_host_short(data_len);

    // 在服务器的发送数据时会构造消息节点,构造消息节点时,将发生长度由本地字节序换为网络字节序
    MsgNode(char* msg, short max_len):_total_len(max_len + HEAD_LENGTH), cur_len(0) {
    _data = new char[_total_len+1];
    int max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
    }
    // 客户端也遵循相同的处理

    // 消息队列控制
    // 每个session都有一个发送队列,因为有的时候发送的频率过高会导致队列增大,所以要对队列大小做出限制,当队列大于指定长度时,就要丢弃要发送的数据包,以保证信息的快速收发
    void CSeesion::Send(char* msg, int max_length) {
    std::lock_guard<std::mutex> lock(_send_lock);
    int send_que_size = _send_que.size();
    if (send_que_size > MAX_SENDQUE) {
    std::cout << "session: " << _uuid << " send que fulled, size is " << max_length << std::endl;
    return;
    }

    _send_que.push_back(std::make_shared<MsgNode>(msg, max_length));
    if (send_que_size > 1) {
    return;
    }
    auto& msgnode= _send_que.front();
    bosst::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
    std::bind(&CSeesion::HandleWrite, this, std::placeholders::_1, std::placeholders::_2));
    }

    asio 处理粘包的简单办法

    核心概念与基础知识

  • async_read 核心特性:
    • 区别于 async_read_some(读取「至少 1 字节」就触发回调),async_read 是 Boost.Asio 高层封装,保证读取指定字节数后才触发回调(仅在「读取完成」或「出错」时回调);
    • 天然解决拆包:无需手动追踪已读长度,async_read 内部循环调用 async_read_some 直到读取指定字节数;
  • 分阶段 async_read 解粘包:
    • 阶段 1:async_read 读取固定长度的头部(HEAD_LENGTH),回调 HandleReadHead;
    • 阶段 2:解析头部得到数据体长度,再 async_read 读取对应长度的数据体,回调 HandleReadMsg;
    • 循环监听:数据体读取完成后,再次发起 async_read 读取下一个消息的头部,实现持续监听;
  • 简化逻辑:相比 async_read_some + 手动循环解析,async_read 省去「处理部分数据、剩余数据循环」的逻辑,代码更简洁,适合对可读性要求高的场景。
  • / 改为使用boost::asio::async_read来指定读取字节数

    // 获得头部数据
    void CSeesion::Start() {
    _recv_head_node->Clear();
    asio::async_read(_socket, asio::buffer(_recv_head_node->data, HEAD_LENGTH),
    std::bind(&CSeesion::HandleReadHead, this, std::placeholders::_1, std::placeholders::_2), SharedSelf());
    }

    void CSession::HandleReadHead(const boost::system::error_code& error, std::size_t bytes_transferred, std::shared_ptr<CSession> shared_self) {
    if (!error) {
    if (bytes_transferred < HEAD_LENGTH) {
    std::cout << "read head length error" << std::endl;
    Close();
    _server->ClearSession(_uuid);
    return;
    }

    // 头部接收完
    short data_len = 0;
    memcpy(&data_len, _recv_head_node->data, HEAD_LENGTH);
    // 省略字节序转换
    /* … */
    if (data_len > MAX_LENGTH) {
    std::cout << "invalid data length" << std::endl;
    _server->ClearSession(__uuid);
    return;
    }

    _recv_msg_node = make_shared<MsgNode>(data_len);
    asio::async_read(_socket, asio::buffer(_recv_msg_node->data, _recv_msg_node->total_len),
    std::bind(&CSession::HandleReadMsg, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()));

    }

    else {
    std::cout << "read head error" << std::endl;
    Close();
    _server->ClearSession(__uuid);
    }
    }

    // 获取信息体
    void CSession::HandleReadMsg(const boost::system::error_code& error, std::size_t bytes_transferred, std::shared_ptr<CSession> shared_self) {
    if (!error) {
    _recv_msg_node->data[_recv_msg_node->_total_len] = '\\0';
    std::cout << _recv_msg_node->data << std::endl;
    Send(_recv_msg_node->data, _recv_msg_node->total_len);
    _recv_msg_node->Clear();
    asio::async_read(_socket, asio::buffer(_recv_head_node->data, HEAD_LENGTH),
    std::bind(&CSession::HandleReaddHead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()));
    }
    else {
    std::cout << "read msg error, error: " << error.what()<<std::endl;
    Close();
    _server->ClearSession(__uuid);
    }
    }

    总结

  • 发送队列封装:全双工通信需独立发送队列 +enable_shared_from_this,解决异步写乱序和 Session 析构风险;
  • 粘包处理核心:
    • 手动解析:async_read_some + 长度前缀 + 循环处理剩余数据,灵活但代码复杂;
    • 简化方案:async_read 分阶段读头部 / 数据体,省去手动分段逻辑,可读性更高;
  • 关键优化:
    • 字节序转换:跨机器通信必须转换长度字段(主机→网络→主机);
    • 队列限流:避免高频发送导致队列膨胀,保证通信实时性;
    • 线程安全:互斥锁保护队列操作,避免多线程竞争。
  • 赞(0)
    未经允许不得转载:网硕互联帮助中心 » 基于boost库C++网络编程(三):发送队列和粘包问题
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!