TcpServer統領之前所有的類,是用戶直接使用的類。它通過ThreadPool管理所有的loopthread,保存所有的TcpConnection,保存用戶提供的各種回調函數并向TcpConnection的Channel中注冊回調。它負責監聽指定的端口,并接受來自客戶端的連接請求,為每個連接創建一個TcpConnection對象進行管理。它的業務邏輯較先前的三大組件來說,還是比較簡明的。
主要成員變量
EventLoop* loop_
:指向事件循環的指針,這是由用戶創建并傳入的,是mainLoop。事件循環是Muduo網絡庫的核心組件之一,負責監聽文件描述符上的事件(如可讀、可寫、錯誤等),并調度相應的事件處理函數。InetAddress listenAddr_
:表示服務器監聽的地址和端口。Acceptor acceptor_
:Acceptor
對象用于監聽指定的端口,并接受來自客戶端的連接請求。當有新的連接請求到達時,Acceptor
會調用相應的回調函數進行處理。- 回調函數:
TcpServer
類允許用戶注冊多個回調函數,以便在特定事件發生時執行自定義邏輯。這些回調函數包括新連接到來時的回調、連接關閉時的回調等。
主要功能
- 監聽端口:
TcpServer
類通過Acceptor
對象監聽指定的端口,等待客戶端的連接請求。當有新的連接請求到達時,Acceptor
會觸發相應的事件,通知TcpServer
進行處理。 - 接受連接:當
Acceptor
接收到客戶端的連接請求時上報,TcpServer
會創建一個新的TcpConnection
對象來表示這個連接,并分配subLoop將其與客戶端進行關聯。同時,TcpServer
會將新創建的TcpConnection
對象添加到內部的管理列表中,以便后續進行管理和操作。 - 管理連接:
TcpServer
類負責管理與其關聯的所有TcpConnection
對象。這包括連接的建立、保持和關閉等操作。當客戶端斷開連接時,TcpServer
會從管理列表中移除相應的TcpConnection
對象,并釋放相關資源。 - 事件處理:當與
TcpServer
關聯的事件發生時(如新連接到來、連接關閉等),TcpServer
會調用相應的回調函數進行處理。這些回調函數可以是Muduo網絡庫預定義的,也可以是用戶自定義的。通過回調函數,用戶可以編寫自己的業務邏輯來處理各種事件。 - 線程安全:由于Muduo網絡庫是多線程的,因此
TcpServer
類需要是線程安全的。這意味著在多線程環境下,多個線程可以同時訪問同一個TcpServer
對象,而不會導致數據競爭或其他并發問題。Muduo網絡庫通過EventLoopThreadPool來管理所有EventLoopThread,使用互斥鎖和信號量等同步機制來保證TcpServer
類的線程安全性。
使用方式
用戶通常只需要創建一個TcpServer
對象,并設置相應的監聽地址和端口,然后調用其start()
方法來啟動服務器。在服務器運行過程中,用戶可以通過注冊回調函數來處理各種事件。當有新的連接請求到達時,Muduo網絡庫會自動為每個連接創建一個TcpConnection
對象,并將其與TcpServer
進行關聯。用戶可以通過訪問與TcpServer
關聯的TcpConnection
對象來與客戶端進行通信和交互。當客戶端斷開連接時,Muduo網絡庫會自動從管理列表中移除相應的TcpConnection
對象,并釋放相關資源。
源碼
TcpServer.h
#pragma once#include "noncopyable.h"
#include "LogStream.h"
#include "EventLoop.h"
#include "EventLoopThreadPool.h"
#include "Acceptor.h"
#include "Callbacks.h"
#include "InetAddress.h"
#include "Buffer.h"
#include "Timestamp.h"
#include "TcpConnection.h"#include <functional>
#include <string>
#include <memory>
#include <unordered_map>
#include <atomic>// 對外的服務器編程使用的類
class TcpServer : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>;enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop *loop,const InetAddress &listenAddr,const std::string &nameArg,Option option = kNoReusePort);~TcpServer();const std::string &ipPort() const { return ipPort_; }const std::string &name() const { return name_; }EventLoop *getLoop() const { return loop_; }void setThreadNum(int numThreads);//std::shared_ptr<EventLoopThreadPool> threadPool(){return threadPool_;}/// Starts the server if it's not listening.void start();void setThreadInitCallback(const ThreadInitCallback &cb){threadInitCallback_ = cb;}void setConnectionCallback(const ConnectionCallback &cb){connectionCallback_ = cb;}void setMessageCallback(const MessageCallback &cb){messageCallback_ = cb;}void setWriteCompleteCallback(const WriteCompleteCallback &cb){writeCompleteCallback_ = cb;}private:void newConnection(int sockfd, const InetAddress &peerAddr);void removeConnection(const TcpConnectionPtr &conn);void removeConnectionInLoop(const TcpConnectionPtr &conn);using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;EventLoop *loop_; // the acceptor loop, 用戶定義的loopconst std::string ipPort_;const std::string name_;std::unique_ptr<Acceptor> acceptor_; // 運行在mainLoop,監聽新連接事件std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop pre threadConnectionCallback connectionCallback_; // 有新鏈接時的回調MessageCallback messageCallback_; // 有讀寫消息時的回調WriteCompleteCallback writeCompleteCallback_; // 消息發送完成以后的回調ThreadInitCallback threadInitCallback_; // loop線程初始化的回調std::atomic_int started_;int nextConnId_;ConnectionMap connections_; // 保存所有的連接
};
TcpServer.cc
#include "TcpServer.h"#include <sys/socket.h>
#include <string.h>static EventLoop *checkLoopNotNull(EventLoop *loop)
{if (loop == nullptr){LOG_FATAL << "mainLoop is null!";}return loop;
}TcpServer::TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg, Option option): loop_(checkLoopNotNull(loop)),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)), // create sokect, listenthreadPool_(new EventLoopThreadPool(loop, name_)),connectionCallback_(),messageCallback_(),nextConnId_(1),started_(0)
{// 當有新用戶鏈接時,將執行TcpServer::newConnection回調:// 根據輪訓算法選擇一個subLoop并喚醒,把當前connfd封裝成channel分發給subloopacceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));
}TcpServer::~TcpServer()
{LOG_INFO << "TcpServer::~TcpServer [" << name_ << "] destructing";for (auto &item : connections_){// 強智能指針不會再指向原來的對象,放手了,出了作用域,可以自動釋放資源TcpConnectionPtr conn(item.second);item.second.reset(); // 銷毀連接conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));}
}void TcpServer::setThreadNum(int numThreads)
{threadPool_->setThreadNum(numThreads);
}void TcpServer::start()
{LOG_DEBUG << "TcpServer::start() started_ = " << started_ << "loop_=" << loop_;if (started_++ == 0)// 防止一個sever對象被start多次{ threadPool_->start(threadInitCallback_); // 啟動底層的線程池loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get())); // 注冊AcceptorChannel到baseLoopLOG_DEBUG << "TcpServer::start success!";}
}// 有新客戶端連接,運行在主線程mainLoop的acceptor會執行這個回調操作
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{// 輪詢選擇一個subloop(子線程),正在epoll上阻塞,需要queueInLoopEventLoop *ioLoop = threadPool_->getNextLoop();char buf[64] = {0};snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;std::string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();// 通過sockfd獲取其綁定的本機的ip地址和端口信息sockaddr_in localaddr;memset(&localaddr, 0, sizeof localaddr);// ::bzero(&localaddr, sizeof localaddr);socklen_t addrlen = sizeof localaddr;if (::getsockname(sockfd, (sockaddr *)&localaddr, &addrlen) < 0){LOG_ERROR << "sockets::getLocalAddr error, errno=" << errno;}InetAddress localAddr(localaddr);// 根據連接成功的sockfd,創建TcpConnectionTcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));connections_[connName] = conn;// 下面的回調都是用戶來設置給TcpServer=》TcpConnection=》Channel注冊=》Poller=》notify Channel回調conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));// 直接調用TcpConnection::connectEstablished// 本函數運行在baseLoop,而ioloop在子線程阻塞在epoll_wait,// 此時在baseLoop中進行函數調用runInLoop、queueInLoop進而保存到ioloop的成員變量pendingFunctors_,// 進而一直在baseLoop中調用到ioloop的weakup函數,向ioloop的wakeupfd中寫一個數據// 在子線程中的epoll_wait監聽到寫事件,就可以返回,才能夠執行這里的回調函數ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}void TcpServer::removeConnection(const TcpConnectionPtr &conn)
{loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn)
{LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_<< "] - connection " << conn->name();connections_.erase(conn->name());EventLoop *ioLoop = conn->getLoop(); // 獲取conn所在的loop ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}