简介
TcpServer实现了多线程主从Reactor服务器
结构
TcpServer 由以下组件构成（类图）：
- Acceptor：接受新连接，并通过 newConnection 回调通知 TcpServer
- TcpConnection：每个连接对应一个对象，由 TcpServer 保存在 connections_ 中
- EventLoopThreadPool：事件循环线程池（threadPool_），为新连接提供 IO 事件循环
Acceptor新连接回调:
/// Handles a newly accepted socket `sockfd` whose remote address is `peerAddr`.
/// The definition asserts that it runs on the server's own loop thread.
void newConnection(int sockfd, const InetAddress& peerAddr);
TcpConnection的四个回调
/// Set connection callback.
/// Plain assignment with no locking; newConnection() later copies the stored
/// callback onto each TcpConnection it creates.
void setConnectionCallback(const ConnectionCallback& cb)
{
  connectionCallback_ = cb;
}
/// Set message callback.
/// Not thread safe: the callback is stored by plain assignment.
void setMessageCallback(const MessageCallback& cb)
{
  messageCallback_ = cb;
}
/// Set write complete callback.
/// Not thread safe: the callback is stored by plain assignment.
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{
  writeCompleteCallback_ = cb;
}
/// Close callback that newConnection() binds onto every TcpConnection.
/// Forwards the removal to removeConnectionInLoop() through loop_->runInLoop().
void removeConnection(const TcpConnectionPtr& conn);
事件循环线程池线程启动初始回调:
/// Set the thread-init callback (stored in threadInitCallback_).
/// NOTE(review): per the surrounding notes this runs when an event-loop pool
/// thread starts; the call site is not visible here, so confirm.
void setThreadInitCallback(const ThreadInitCallback& cb)
{
  threadInitCallback_ = cb;
}
Acceptor连接回调
- 从事件循环线程池中以轮询方式选取一个事件循环(ioLoop)
- 创建TcpConnection,将TcpServer设置的connectionCallback_, messageCallback_和writeCompleteCallback_传递到TcpConnection,同时设置关闭连接的回调removeConnection
- 创建连接建立的任务connectEstablished,交给该连接所属的事件循环(ioLoop)执行
/// Acceptor callback for a freshly accepted socket: builds the TcpConnection,
/// records it in connections_, wires the server's callbacks onto it and hands
/// connectEstablished() to the loop that will own the connection.
///
/// Must run on the server's own loop (asserted on entry).
///
/// @param sockfd    descriptor of the accepted socket.
/// @param peerAddr  address of the remote endpoint.
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();

  // Loop that will own this connection.
  EventLoop* ioLoop = threadPool_->getNextLoop();

  // Connection name is "<server name>-<ipPort>#<sequence id>".
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  // Plain ASCII hyphen in the message keeps the log greppable.
  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();

  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  // Register before starting the connection so that removeConnectionInLoop()
  // can find the entry by name.
  connections_[connName] = conn;

  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe

  // The bound functor holds its own copy of `conn`.
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
连接建立回调connectEstablished
- 设置连接状态为kConnected
- 事件通道绑定连接对象,开启读事件
- 执行事件回调connectionCallback_
/// Completes connection setup: moves the state from kConnecting to kConnected,
/// ties the channel to this connection, enables read events on the channel and
/// then invokes the connection callback.
///
/// Asserts that it runs on the connection's loop thread.
void TcpConnection::connectEstablished()
{
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);
  setState(kConnected);

  // One shared handle serves both the channel tie and the user callback.
  auto self = shared_from_this();
  channel_->tie(self);
  channel_->enableReading();

  connectionCallback_(self);
}
关闭连接回调
- 添加任务removeConnectionInLoop到acceptor所在的事件循环队列中异步执行
- 连接容器中删除对应连接
- 添加任务connectDestroyed到连接所在的事件循环队列中异步执行
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// FIXME: unsafe
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
/// Removes `conn` from connections_ on the server's loop, then queues
/// connectDestroyed() on the loop that owns the connection.
///
/// Asserts that it runs on the server's loop thread and that exactly one
/// entry was erased.
///
/// @param conn  connection to drop from the server's table.
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();

  // Plain ASCII hyphen in the message keeps the log greppable.
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();

  size_t n = connections_.erase(conn->name());
  (void)n;  // silences the unused-variable warning when assert is compiled out
  assert(n == 1);

  // The bound functor holds its own copy of `conn` until connectDestroyed()
  // has run on the connection's loop.
  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
      std::bind(&TcpConnection::connectDestroyed, conn));
}
评论前必须登录!
注册