// Acceptor 是由 mainLoop 管理 listenfd,负责接收新连接。
Acceptor:mainLoop -> listenfd -> Channel
// TcpConnection 是由 subLoop 管理 connfd,负责处理具体连接。
TcpConnection:subLoop -> connfd -> Channel
成员变量
enum StateE { KDisconnected, KConnecting,KConnected,KDisconnecting }; //表示当前连接的状态: KDisconnected:已断开 KConnecting:正在连接中 KConnected:连接已建立 KDisconnecting:正在断开 EventLoop *loop_;//不是baseloop,因为Tcpconnection都是在subloop管理的 const std::string name_;//当前连接的名字(通常由 TcpServer 指定,用于日志或识别) std::atomic_int state_;//当前连接的状态,由上面的 StateE 表示,使用 std::atomic 是为了线程安全。 bool reading_;//表示当前是否正在监听读事件 std::unique_ptr<Socket>socket_;//封装 TCP 连接使用的 socket fd。unique_ptr 表示该连接独占该 socket,生命周期跟随 TcpConnection。 std::unique_ptr<Channel>channel_;//封装该连接对应的 Channel,用于监听 fd 的可读/可写事件。TcpConnection 通过它注册 handleRead, handleWrite 等回调 const InetAddress localAddr_;//本地地址(本机绑定的 IP 和端口) const InetAddress peerAddr_;//对端地址(客户端的 IP 和端口) //这些函数在不同事件触发时被调用,由用户或上层 TcpServer 设置 ConnectionCallback connectionCallback_;//新连接时的回调· MessageCallback messageCallback_; //有读写消息时的回调 WriteCompleteCallback writeCompleteCallback_;//消息发送完成的回调 HighWaterMarkCallback highWaterMarkCallback_;//当 outputBuffer_ 写入数据太多(超过高水位线)时触发 CloseCallback usercloseCallback_;//对端关闭连接时调用 size_t highWaterMark_;//水位线 Buffer inputBuffer_;// 存储从 socket 读出来的数据(接收缓冲区) Buffer outputBuffer_; // 存储准备写入 socket 的数据(发送缓冲区)
回调函数
using TcpconnectionPtr=std::shared_ptr<Tcpconnection>;//连接指针 using ConnectionCallback=std::function<void (const TcpconnectionPtr&)>;//连接建立或断开时触发 using CloseCallback=std::function<void (const TcpconnectionPtr&)>;//连接关闭时触发(比如对端关闭 socket、服务器主动关闭) using WriteCompleteCallback=std::function<void (const TcpconnectionPtr&)>;//当发送缓冲区数据全部写入 socket 后触发 using MessageCallback=std::function<void (const TcpconnectionPtr&,Buffer*,Timestamp)>;//每次读到数据就触发一次 using HighWaterMarkCallback = std::function<void(const TcpconnectionPtr&, size_t)>;//当发送缓冲区 outputBuffer_ 数据过多(超过某个“高水位线”)时触发,避免服务器“写爆”,用于流控和反压(backpressure)
成员函数
EventLoop*getLoop()const {return loop_;}//指向当前连接所在 subLoop 的指针(即管理这个连接的 EventLoop)。 const std::string&name()const{return name_;}//不能通过这个返回值修改 name_ 的内容。,返回连接名称 const InetAddress&localAddr()const{return localAddr_;}//返回本端地址(即该 TcpConnection 绑定的地址)。 const InetAddress&peerAddr()const{return peerAddr_;}//返回对端地址(连接的对方 IP 和端口)。 bool connected()const{return state_==KConnected;}//返回当前连接是否处于“已连接”状态。 //注册回调 void setConnectionCallback(const ConnectionCallback&cb)//注册新连接建立时的回调函数 { connectionCallback_=cb; } void setMessageCallback(const MessageCallback&cb)//注册消息到达时的回调函数(例如对端发送了数据) { messageCallback_=cb; } void setWriteCompleteCallback(const WriteCompleteCallback&cb)//注册写完成的回调函数,当发送缓冲区中的数据全部写完后调用,通常用于通知业务层“数据已发完” { writeCompleteCallback_=cb; } void setHighWaterMarkCallback(const HighWaterMarkCallback&cb,size_t highWaterMark)//当**发送缓冲区大小超过一定阈值(highWaterMark)**时调用。 { highWaterMarkCallback_=cb; highWaterMark_=highWaterMark; } void setCloseCallback(const CloseCallback&cb)//连接关闭时的回调函数,包括对端关闭或我们主动关闭时调用。 { usercloseCallback_=cb; } void setState(StateE state)//设置连接的当前状态: { state_=state; } static EventLoop* CheckLoopNotNull(EventLoop*loop)//确保传入的 subLoop* 非空; { if(loop==nullptr) { LOG_FATAL("%s:%s:%d Tcpconnectionloop is null!\n",__FILE__,__FUNCTION__,__LINE__); } return loop; } Tcpconnection::Tcpconnection(EventLoop *loop,const std::string &namearg,int sockfd,const InetAddress& LocalAddr,const InetAddress&peerAddr): loop_(CheckLoopNotNull(loop)),// 管理这个连接的 EventLoop(通常是某个 subLoop); name_(namearg),//连接名称 state_(KConnecting),//连接状态 reading_(true),//表示当前是否正在监听读事件 socket_(new Socket(sockfd)),//这个连接的 socket 文件描述符; channel_(new Channel(loop,sockfd)),//创建一个 连接Channel,后续会注册事件 localAddr_(LocalAddr),// 初始化本地和对端地址 peerAddr_(peerAddr), highWaterMark_(64*1024*1024)//64MB { //给channel设置相应的回调函数,poller给channel通知感兴趣的事件发生了,channel会回调相应的操作函数 channel_->setReadCallback(std::bind(&Tcpconnection::handleRead,this,std::placeholders::_1));//handleRead: 读事件发生时(客户端发来数据)调用 channel_->setWriteCallback(std::bind(&Tcpconnection::handlewrite,this));//handleWrite: 写缓冲区可写时调用;发数据 channel_->setCloseCallback(std::bind(&Tcpconnection::handleClose,this));//连接被对端关闭时调用 channel_->setErrorCallback(std::bind(&Tcpconnection::handleError,this));//发生错误时调用 LOG_INFO("Tcpconnection::ctor[%s] at fd=%d\n",namearg.c_str(),sockfd); socket_->setKeepAlive(true);//设置 TCP 的 SO_KEEPALIVE 选项,开启 TCP keepalive 机制,用于检测对端是否存活 } Tcpconnection::~Tcpconnection()//释放资源:Tcpconnection 是一个连接对象,它通过 unique_ptr<Socket> 和 unique_ptr<Channel> 管理 socket 和 channel,因此不需要显式释放内存,unique_ptr 会自动在析构时释放资源。 { LOG_INFO("Tcpconnection::dtor[%s] at fd=%d state=%d\n",name_.c_str(),channel_->fd(),state_.load()); //socket_->setKeepAlive(true); } //handleRead() 是处理读事件的核心函数:从 socket 读取数据 → 写入 inputBuffer_ → 调用上层注册的消息处理回调;若对端关闭或出错则调用关闭或错误处理函数。 void Tcpconnection::handleRead(Timestamp receiveTime)//收数据,当 epoll 通知 fd 可读时,Channel 会调用它 { int saveErrno=0; ssize_t n=inputBuffer_.readFD(channel_->fd(),&saveErrno);//从 socket 中读取数据,写入 inputBuffer_(接收缓冲区)中。 if(n>0) { //已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessage(交给上层) //如果读取成功(n > 0),说明客户端发来了数据。 //这时调用用户注册的消息处理回调 messageCallback_ messageCallback_(shared_from_this(),&inputBuffer_,receiveTime); } else if(n==0)//客户端断开 { handleClose(); } else { errno=saveErrno; LOG_ERROR("Tcpconnection::handleread"); handleError(); } } //用于处理 EPOLLOUT 可写事件(也就是 socket 缓冲区有空间时触发),常见于“发送缓冲区有数据等待发送”的情况。,发送缓冲区可写 void Tcpconnection::handlewrite()//从outbuffer向connfd写数据 { if(channel_->isWriting())//isWriting() 表示是否注册了 EPOLLOUT 到 poller。如果没有监听写事件,就不需要执行后续逻辑(说明之前没有待发送的数据)。 { int saveErrno=0; ssize_t n=outputBuffer_.writeFD(channel_->fd(),&saveErrno);//发送数据到fd,把 outputBuffer_ 中的数据写入 socket(fd) if(n>0)//写入成功 { outputBuffer_.retrieve(n);// “消费了 n 字节”,可读指针增加n if(outputBuffer_.readableBytes()==0)//发送完毕了 { channel_->disableWriting();//发送完成后就不需要监听可写事件了,避免无意义的触发 if(writeCompleteCallback_)// 如果用户设置了 “发送完成” 的回调函数 { //唤醒loop_对应的thread线程,执行回调 loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));// 异步地把回调任务加入 loop_ 的事件循环中执行(线程安全,延迟执行) } if(state_==KDisconnecting)//正关闭连接 ,当连接正处于 “等待关闭”状态(KDisconnecting),而发送缓冲区的数据刚好已经全部发送完毕了(即 outputBuffer_ 为空),那么现在就可以 真正地关闭连接的写端(shutdown()) { shutdownInLoop(); } } } else { LOG_ERROR("Tcpconnection::handleWrite"); } } else { LOG_ERROR("Tcp connection fd=%d is down,no more writing\n",channel_->fd()); } } //为什么 写完成回调(writeCompleteCallback_) 要通过 loop_->queueInLoop(...) 执行 而 消息到达(即读事件触发) 的回调函数 却可以直接调用,例如 messageCallback_(...)? //只要你不是在 I/O 线程中(subloop)直接调用用户回调,就必须使用 queueInLoop() 保证线程安全。 //写完成回调可能发生在其他线程发起的 send() 中(跨线程发起),为了线程安全必须 queueInLoop()。 void Tcpconnection::handleClose()// TCP 连接被动关闭(对端关闭连接)的场景,客户端主动关闭连接的时候,服务器端会调用这个方法 { LOG_INFO("fd=%d state=%d \n",channel_->fd(),state_.load());//录当前连接所属的 fd 和连接状态(state_) setState(KDisconnected);//将连接状态置为 KDisconnected,表示连接已经断开。 channel_->disableAll();//将与这个连接关联的 Channel 上的所有事件(如 EPOLLIN, EPOLLOUT)从 epoll 中注销。(封装connfd的channel) TcpconnectionPtr connPtr(shared_from_this());//创建 shared_ptr 智能指针 connPtr 指向当前 TcpConnection 对象,延长其生命周期直到当前函数结束,防止回调函数中出现悬垂指针。 connectionCallback_(connPtr);/调用用户设置的 ConnectionCallback,通知业务层连接状态发生变化(断开)。 usercloseCallback_(connPtr);// 执行关闭回调(CloseCallback) tcpserver::removeConnection(tcpserver设置的) } void Tcpconnection::handleError() { int optval;// 定义一个整型变量 optval,用来临时存储通过 getsockopt() 获取的 socket 错误代码。 socklen_t optlen=sizeof optval; int err=0;//定义错误码变量 err /** * 调用 getsockopt() 获取 socket 错误状态 channel_->fd():获取当前连接的 socket 文件描述符。 SOL_SOCKET:表示操作 socket 层级。 SO_ERROR:指定获取 socket 的错误码。 optval:用于接收返回的错误码。 optlen:输入输出参数,指明 optval 的长度。 */ if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen)<0) { err=errno;//系统调用失败时获取 errno } else { err=optval;// 系统调用成功时,错误码在 optval 中 } LOG_ERROR("TcpConnection::handleError name:%s -SO_ERROR:%d \n",name_.c_str(),err); } void Tcpconnection::send(const std::string &buf)// 线程安全发送数据接口 { if(state_==KConnected)//只在连接建立完成后(已连接)**才允许发送数据,防止对已关闭或尚未建立连接的 socket 写入 { if(loop_->isInLoopThread())//检查当前线程是否是此 TcpConnection 对象所属的 subLoop 线程(I/O线程)。 //如果是本线程,就直接调用 sendInLoop(),避免不必要的回调提交,提高效率 { sendInLoop(buf.c_str(),buf.size());//buf.c_str(),获取 buf 字符串内部的 const char* 指针,如果当前就是 EventLoop 所在线程,直接调用 sendInLoop 函数发送数据。 } else//如果是其他线程调用 send()(比如业务线程),就通过 runInLoop() 提交任务给这个subLoop。runInLoop() 会将这个绑定好的任务添加到 subLoop 的 pendingFunctors_ 中,然后唤醒 epoll_wait() 的阻塞,让 loop 线程在下一次循环中调用 sendInLoop()。 { loop_->runInLoop(std::bind(&Tcpconnection::sendInLoop,this,buf.c_str(),buf.size())); } /** * 通过 runInLoop 把任务 封装成回调,提交给此subLoop 线程执行。 */ } } void Tcpconnection::sendInLoop(const void*data,size_t len)//data: 要发送的数据指针。len: 要发送的数据长度 防止使用已关闭连接发送数据,保护资源、避免异常。 { ssize_t nwrote=0;//实际写入 socket 的字节数 size_t remaining=len;//剩余还未发送的字节数(初始为总长度) bool faultError=false;//标记是否发生致命错误(比如 EPIPE 或 ECONNRESET)。 //之前调用过connection的shutdown了,不能再进行发送了 if(state_==KDisconnected)//如果连接已经断开,记录日志并直接返回。防止已关闭连接还尝试发送数据。 { LOG_ERROR("disconnected,give up writing!"); return; } /** * 如果当前 channel 没有监听可写事件,且输出缓冲区中没有残留数据,说明之前没有等待发送的数据。 此时尝试直接向 socket 写数据,提高性能(避免不必要的缓冲和事件驱动)。 */ if(!channel_->isWriting()&&outputBuffer_.readableBytes()==0) { nwrote=::write(channel_->fd(),data,len);//如果缓冲区干净,先尝试直接写到 socket(避免无谓拷贝,性能更好)。 } if(nwrote>=0)//如果写成功(大于等于0): { remaining=len-nwrote;//剩余未写完的字节数。。 if(remaining==0&&writeCompleteCallback_)//如果一次性写完了,并且设置了写完成回调(例如触发业务逻辑),通过 queueInLoop 异步执行回调,保持线程安全。 { //既然在这里数据全部发送完成, 这里不需要注册写事件,因为数据已经写完 了 loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this())); } } else// write失败 { nwrote=0;//将写入量重置为 0(防止后续数据指针偏移出错)。 if(errno!=EWOULDBLOCK)//如果错误不是 EWOULDBLOCK(写缓冲区满,但不是致命错误),记录错误日志。 { LOG_ERROR("TcpConnection::sendInLoop"); if(errno==EPIPE||errno==ECONNRESET)//若是 EPIPE(写端已关闭)或 ECONNRESET(连接重置),标记为致命错误。 { faultError=true; } } } if(!faultError&&remaining>0)//如果没有严重错误,且还有剩余数据需要处理,一次没写完 //说明当前这一次write没有把数据全部发送出去,剩余的数据需要保存到缓冲区中,然后给channel //注册epollout可写事件,poller发现tcp的发送缓冲区(可写)有空间,会通知相应的channel,调用handlewrite回调方法 //最终调用Tcpconnection::handwrite方法,把发送缓冲区中的数据全部发送完成 { size_t oldlen=outputBuffer_.readableBytes();//目前发送缓冲区剩余的未发送的数据 if(oldlen+remaining>=highWaterMark_&& oldlen<highWaterMark_&& highWaterMarkCallback_)//如果发送缓冲区剩余的未发送的数据与现有未发送数据的总量超过高水位阈值,且之前没超过,就触发高水位回调,用于限流或提示(例如:暂停写入)。 { loop_->queueInLoop(std::bind(highWaterMarkCallback_,shared_from_this(),oldlen+remaining)); }) outputBuffer_.append((char*)data+nwrote,remaining);//将未写完的数据追加到 outputBuffer_ 中(注意数据偏移) if(!channel_->isWriting())//如果当前 channel 没有监听写事件,则调用 enableWriting() 开启 EPOLLOUT,让 Poller 监听 socket 可写事件。 { channel_->enableWriting();//当 socket 可写时,会触发 handleWrite(),它会继续将 outputBuffer_ 中数据发送出去。 } } } 整个流程就是:如果outbuffer以前没有写入的数据还没发送而且tcpconnection的connfd的channel没有监听可写事件,直接往connfd写,如果写成功了: 1.一次性写完了,并且注册了写完成回调,执行写完成回调; 2.一次性没写完,就需要把没写完的数据放到outbuffer,这个时候,outbuffer里 oldlen=0,直接看remaining(未写完的数据),判断一下是否超过outputBuffer的水位线,如果超过,执行回调,然后把remaining(未写完的数据)写入outbuffer,并且设置监听connfd可写,一旦poller通知了,直接把outputBuffer里的数据写入connfd 如果outbuffer以前有写入的数据还没发送,就不会,直接往connfd写,跳转到 if(!faultError&&remaining>0),先发送以前写入的数据,在发送后来的 //连接建立了 void Tcpconnection::connectEstablished() { setState(KConnected);//将当前 TcpConnection 状态设置为“已连接”状态 channel_->tie(shared_from_this());//将当前 TcpConnection(用 shared_from_this() 获取 shared_ptr<TcpConnection>)绑定给其内部的 Channel,确保 当 Channel 处理事件时 TcpConnection 仍然存在。 channel_->enableReading();//像poller注册connfd的channel的可读事件((比如客户端发来数据)),会触发handleread处理可读事件 // 通知上层用户“连接已建立” connectionCallback_(shared_from_this()); } //连接销毁 void Tcpconnection::connectDestroyed()//:连接销毁阶段。它的作用是从事件循环中安全地注销该连接,并通知上层逻辑“连接已关闭”。 { if(state_==KConnected)//只有当连接处于“已连接”状态时才进行状态转换与回调。 { setState(KDisconnected);//将状态从 KConnected 设置为 KDisconnected channel_->disableAll();//把connfd->channel的所有感兴趣的事件,从poller中del掉 connectionCallback_(shared_from_this());//即使是“销毁连接”,也需要通知上层用户(回调层):,这样用户就可以记录日志或清理业务状态,例如从在线连接表中移除: } channel_->remove();//把connfd->channel从poller中删除掉,注销,不在监听任何事件 } //关闭连接,优雅关闭:在数据发送完后再关闭连接,不丢数据;要关闭的是 写端(shutdown(SHUT_WR)),不是直接 close(fd)。 void Tcpconnection::shutdown() { if(state_==KConnected)//只在连接处于 KConnected 状态时允许关闭; { setState(KDisconnecting);//这是为了标记当前连接已经处于“准备关闭”的阶段 loop_->runInLoop(std::bind(&Tcpconnection::shutdownInLoop,this));//线程安全 } } void Tcpconnection::shutdownInLoop() { if(!channel_->isWriting())//当前outputBuffer中的数据已经全部发送完成(从handlewrite可以看出来) { socket_->showdownWrite();//connfd->关闭写端,connfd表示我不会再写数据到缓冲区,但是可以继续读取 } }
Tcpconnection就是封装了connfd(连接套接字)并且通过每条Tcpconnection所在的subloop创建了自己的channel事件通道,并且持有发送缓冲区outbuffer和接收缓冲区inbuffer,其实有一个 connectionCallback_回调给上层,然后管理这条链接上的读,写,关闭,错误等事件,这个channel向对应的subloop管理的poller上注册,可读,可写,连接关闭,错误等事件,如果connfd上发生了。就会出发相应的回调:
connfd可读事件:客户端有数据过来了,可以读到服务器的inbuffer缓冲区;
connfd可写事件:服务器的outbuffer数据可以向connfd写数据了,发送给客户端;send是写到connfd或者发送到缓冲区
connfd关闭事件:客户端主动关闭连接的时候,服务器端会调用这个方法
connfd错误事件:连接发生错误;