muduo源碼解析

?1.對類進行禁止拷貝

class noncopyable
{public:noncopyable(const noncopyable&) = delete;void operator=(const noncopyable&) = delete;protected:noncopyable() = default;~noncopyable() = default;
};

2.日志

使用枚舉定義日志等級

enum LogLevel{TRACE,DEBUG,INFO,WARN,ERROR,FATAL,NUM_LOG_LEVELS,};

1.獲取日志實例對象

Logger& instance();

2.設置日志級別

void setLogLevel(int level);

3.寫日志

void log(std::string& msg);
#define LOG_info(fmt, ...) \do { \fprintf(stderr, "[INFO] %s:%d: " fmt "\n", \__FILE__, __LINE__, ##__VA_ARGS__); \} while(0)

1.C/C++ 的預處理器規定,一個完整的宏定義必須位于同一邏輯行。但為了代碼可讀性,需要用?\?顯式聲明換行。

2.為什么用?do { ... } while(0),避免宏展開后的語法問題

3.?LOG_info(fmt, ...)

  • fmt:格式化字符串參數(如?"Value: %d"

  • ...:可變參數(變長參數列表),表示可以傳遞任意數量的額外參數。

  • ##__VA_ARGS__:GNU 擴展語法,處理可變參數

#ifndef LOG_H
#define LOG_H#include <cstdio>
#include <ctime>
#include <cstdarg>// 定義日志級別
enum LogLevel
{TRACE = 0,DEBUG,INFO,WARN,ERROR,FATAL,NUM_LOG_LEVELS
};// 設置默認日志級別
extern LogLevel g_logLevel;  // 在別的地方定義這個全局變量// 宏定義日志級別
#define LOG_TRACE(fmt, ...) \do { \if (g_logLevel <= TRACE) log_message("TRACE", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_DEBUG(fmt, ...) \do { \if (g_logLevel <= DEBUG) log_message("DEBUG", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_INFO(fmt, ...) \do { \if (g_logLevel <= INFO) log_message("INFO", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_WARN(fmt, ...) \do { \if (g_logLevel <= WARN) log_message("WARN", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_ERROR(fmt, ...) \do { \if (g_logLevel <= ERROR) log_message("ERROR", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_FATAL(fmt, ...) \do { \if (g_logLevel <= FATAL) log_message("FATAL", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)inline void log_message(const char* level, const char* file, int line, const char* fmt, ...)
{// 獲取當前時間char timeBuf[20];std::time_t now = std::time(nullptr);std::strftime(timeBuf, sizeof(timeBuf), "%Y-%m-%d %H:%M:%S", std::localtime(&now));// 打印日志fprintf(stderr, "[%s] [%s] %s:%d: ", timeBuf, level, file, line);va_list args;va_start(args, fmt);vfprintf(stderr, fmt, args);va_end(args);fprintf(stderr, "\n");
}#endif // LOG_H

?3.TimeStamp

#include <chrono>
#include <string>
#include <ctime>
#include <iomanip>
#include <sstream>class Timestamp {
public:// 構造函數Timestamp() : tp_(std::chrono::system_clock::now()) {}// 從時間點構造explicit Timestamp(std::chrono::system_clock::time_point tp) : tp_(tp) {}// 獲取當前時間戳 (靜態工廠方法)static Timestamp now() {return Timestamp();}// 轉換為字符串 (默認格式)std::string to_string() const {return format("%Y-%m-%d %H:%M:%S");}   private:std::chrono::system_clock::time_point tp_;
};

4.InetAddress

# include<netinet/in.h> // 定義了Internet地址族// 封裝socket地址
class InetAdress{
public:explicit InetAddress(std::string& ip, uint16_t port, bool ipv6 = false);explicit InetAddress(const sockaddr_in &addr): addr_(addr);std::string toIp() const;std::string toIpPort() const;uint16_t port() const;
private:socketaddr_in addr_;
}

struct sockaddr_in {short            sin_family;   // 地址族unsigned short   sin_port;     // 端口號struct in_addr   sin_addr;     // IP 地址char             sin_zero[8];  // 填充字節,用于對齊
};
InetAddress::InetAddress(StringArg ip, uint16_t port, bool ipv6)
{if (ipv6 || strchr(ip.c_str(), ':')){// ...}else{memset(&addr_, 0, sizeof addr_);addr_->sin_family = AF_INET;addr_->sin_port = htons(port);struct in_addr addr;if (inet_pton(AF_INET, ip, &addr) <= 0) {printf("Invalid IPv4 address\n");} else {printf("IPv4 address converted successfully\n");}addr_->sin_addr = addr;}
}string InetAddress::toIpPort() const
{char buf[64] = "";inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);size_t end  = strlen(buf);uint16_t port = ntohs(addr_.sin_port);sprintf(buf + end, ":%u", port);return buf;
}

5.EventLoop

1.獲取和管理線程的標識(TID)、線程名稱、線程堆棧信息

namespace muduo
{
namespace CurrentThread
{// internalextern __thread int t_cachedTid;extern __thread char t_tidString[32];extern __thread int t_tidStringLength;extern __thread const char* t_threadName;void cacheTid();inline int tid(){if (__builtin_expect(t_cachedTid == 0, 0)){cacheTid();}return t_cachedTid;}inline const char* tidString() // for logging{return t_tidString;}inline int tidStringLength() // for logging{return t_tidStringLength;}inline const char* name(){return t_threadName;}bool isMainThread();void sleepUsec(int64_t usec);  // for testingstring stackTrace(bool demangle);
}  // namespace CurrentThread
}  // namespace muduo
  • extern 關鍵字用于聲明變量或函數的存儲位置?
  • __thread 是 GCC/Clang 提供的關鍵字,用于聲明線程局部存儲變量(Thread-Local Storage, TLS)。每個線程都有自己獨立的變量副本。?
  • __builtin_expect 是 GCC/Clang 提供的內置函數,用于向編譯器提供分支預測的提示?
static_cast<pid_t>(::syscall(SYS_gettid));

?2.EventLoop:

1. EventLoop 干什么?(職責)

EventLoop事件循環的核心類,負責管理和調度 I/O 事件和異步任務。
主要職責:

  1. 事件循環管理: 不斷循環等待和處理事件。

  2. 事件分發: 將活躍事件交給合適的回調處理。

  3. 任務調度: 支持在事件循環線程內安全地執行異步任務。

  4. 線程喚醒: 當其他線程提交任務時能夠喚醒阻塞的事件循環。

class EvenLoop : noncopyable{
public:using Functor = std::function<viod()>;EvenLoop();~EvenLoop();void loop();void quit();//狀態查詢Timestamp pollReturnTime() const { return pollReturnTime_; }//回調管理void runInLoop(Functor cb);// 在事件循環線程中立即執行回調 cbvoid queueInLoop(Functor cb);//將回調 cb 排隊,稍后在事件循環線程中執行size_t queueSize() const;//通道管理void wakeup();void updateChannel(Channel* channel);void removeChannel(Channel* channel);bool hasChannel(Channel* channel);//線程檢查bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }private:using ChannelList = std::vector<Channel*>;bool looping_;/* atomic *///是否正在進行事件循環std::atomic_bool quit_;//是否退出時間循環bool eventHandling_; /* atomic */ //表示是否正在處理事件bool callingPendingFunctors_; /* atomic *///表示是否正在執行待處理回調const pid_t threadId_;//當前循環tid// PollerTimestamp pollReturnTime_;//Poller返回時間std::unique_ptr<Poller> poller_;//Eventloop管理的PollerChannelList activeChannels_;// 存儲當前Poller 返回的活動通道列表// Channelint wakeupFd_;//eventfd,用于跨線程喚醒事件循環std::unique_ptr<Channel> wakeupChannel_;//封裝 wakeupFd_,綁定到 Poller(如 epoll),監控讀事件mutable MutexLock mutex_;std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);// 存儲其他線程提交的任務void doPendingFunctors();
}

1.在 muduo 網絡庫中,EventLoop?類的構造函數初始化了一系列?bool?類型的標志變量(如?looping_quit_eventHandling_callingPendingFunctors_),這些變量用于 ??精確控制事件循環的狀態和線程安全??

looping_?的限制?:??禁止重復啟動事件循環?

??eventHandling_的限制:不能在處理事件時析構channel

callingPendingFunctors_?:確保跨線程任務提交或事件循環忙時能夠及時喚醒?EventLoop

if (!isInLoopThread() || callingPendingFunctors_){wakeup();}

?在 Linux 系統中,獲取當前線程的唯一 ID(TID)可以通過?syscall(SYS_gettid)?

//防止一個線程創建多個EventLoop
__thread EventLoop* t_loopInThisThread = 0;

2.在 Linux 系統中,::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)?用于創建一個 ??事件文件描述符(eventfd)??,通常用于線程間或進程間的異步事件通知(例如喚醒事件循環)?

3. std::atomic_bool 是 C++ 標準庫(<atomic> 頭文件)提供的一種原子類型,專門用于布爾值的原子操作。它是 std::atomic<bool> 的特化版本,允許在多線程環境中安全地讀寫布爾值,而無需顯式使用鎖(如 std::mutex)。std::atomic_bool 確保操作的原子性,避免數據競爭(data race),適用于需要高效、無鎖同步的場景。?

4.assertInLoopThread();當前線程是否是loop所在線程

  • EventLoop::loop()

  • EventLoop::updateChannel(Channel* channel)

  • EventLoop::removeChannel(Channel* channel)

  • bool EventLoop::hasChannel(Channel* channel)

  • void TcpConnection::sendInLoop(const void* data, size_t len)

  • void TcpConnection::shutdownInLoop()

  • void TcpConnection::startReadInLoop()?void TcpConnection::stopReadInLoop()

  • void TcpConnection::connectEstablished() void TcpConnection::connectDestroyed()

  • void TcpConnection::handleRead(Timestamp receiveTime)

  • void TcpConnection::handleWrite()

EventLoop::updateChannel(Channel* channel)|
poller_->updateChannel(channel);

對channel進行操作確保: 當前線程是loop所在線程,當前線程是channel所在線程

void EventLoop::loop()
{looping_ = true;quit_ = false;while (!quit_){activeChannels_.clear();// 等待事件發生poller_->poll(kPollTimeMs, &activeChannels_);// 遍歷所有活躍通道,逐個調用其事件處理函數for (Channel* channel : activeChannels_){channel->handleEvent();}// 執行異步任務doPendingFunctors();}looping_ = false;
}

void EventLoop::doPendingFunctors()
{std::vector<Functor> functors;           // 定義一個局部任務列表callingPendingFunctors_ = true;          // 標記正在執行任務{MutexLockGuard lock(mutex_);         // 上鎖,保護任務隊列functors.swap(pendingFunctors_);     // 將任務隊列中的任務移到本地變量}// 逐個執行任務for (const Functor& functor : functors){functor();                          // 執行任務回調}callingPendingFunctors_ = false;         // 執行完畢,重置標記
}

?用 swap()任務隊列局部變量互換,而不是逐個取出任務:降低鎖競爭,只加鎖一次,把所有任務交換到局部變量中,后續執行不需要加鎖。

2. 為什么需要任務隊列?
特性handleEvent()doPendingFunctors()
作用處理I/O事件處理異步任務
觸發時機poll()返回后,有活躍事件時觸發每輪事件循環末尾,I/O事件處理完畢后
優先級高,實時性要求高低,非實時任務
來源網絡 I/O、定時器、信號等事件其他線程提交的任務、延遲任務
典型場景讀寫網絡數據、關閉連接線程間任務提交、定時任務、異步回調
線程安全性需要確保 Channel 所在線程安全通過加鎖保證線程安全
執行頻率每個活躍事件都調用每次事件循環結束后調用一次

在高性能服務器中,muduo 采用單線程 Reactor 模式,一個 EventLoop 對象只能在一個線程中運行。
但是,多個線程可能需要向同一個事件循環線程提交任務,比如:

  1. 工作線程向主線程提交任務。

  2. 異步回調需要在事件循環線程中執行。

  3. 事件處理函數中異步添加任務。

?當需要將任務提交到事件循環時,調用 queueInLoop()

?
void EventLoop::runInLoop(Functor cb)
{if (isInLoopThread()){cb();}else{queueInLoop(std::move(cb));}
}void EventLoop::queueInLoop(Functor cb)
{{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
}?

?callingPendingFunctors_?:

  • 如果當前正在執行回調隊列中的回調函數(callingPendingFunctors_true),并且有新的回調函數被加入到隊列中,需要通過 wakeup() 方法喚醒事件循環線程,以確保新的回調函數能夠被處理。

  • 如果不檢查 callingPendingFunctors_,可能會導致事件循環線程被重復喚醒,從而增加不必要的開銷

int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_SYSERR << "Failed in eventfd";abort();}return evtfd;
}

?eventfd 文件描述符(FD),這是一個 Linux 特定的機制,用于線程間通信或事件通知。eventfd 常用于高效的無鎖同步。

EventLoop::EventLoop(): looping_(false),quit_(false),eventHandling_(false),callingPendingFunctors_(false),iteration_(0),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(this)),timerQueue_(new TimerQueue(this)),wakeupFd_(createEventfd()),wakeupChannel_(new Channel(this, wakeupFd_)),currentActiveChannel_(NULL)
{LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));// we are always reading the wakeupfdwakeupChannel_->enableReading();
}
void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}

事件循環線程持有 wakeupFd_,并監聽其讀端。當其他線程調用 wakeup() 方法時,事件循環線程會從阻塞狀態中喚醒。

  • 交換后,pendingFunctors_?變為空向量,新任務可繼續寫入。

  • 局部變量?functors?在棧上析構時自動清理已執行的任務

MutexLockGuard 是一個 RAII(Resource Acquisition Is Initialization,資源獲取即初始化)風格的鎖管理器。它的作用是確保在作用域結束時自動釋放鎖,從而避免忘記手動釋放鎖導致的死鎖問題。?

RAII 的核心思想是將資源的獲取(如分配內存、打開文件、鎖定互斥鎖等)與對象的構造綁定在一起,將資源的釋放與對象的析構綁定在一起,確保資源在對象生命周期結束時自動釋放,從而避免資源泄漏和其他資源管理錯誤。

3.Thread

封裝了對 POSIX 線程(pthread)的創建、管理和操作

class Thread : nocopyable{
public:using ThreadFunc = std::function<void()>;explicit Thread(ThreadFunc, const std::string &name = string());~Thread();void start();void join();bool started() const { return started_; }pthread_t pthreadId() const { return pthreadId_; }pid_t tid() const { return tid_; }const string& name() const { return name_; }
pruvate:bool       started_;bool       joined_;std::shared_ptr<std::thread> thread_;pid_t      tid_;ThreadFunc func_;string     name_;static std::atomic_int32 numCreated;
}
void Thread::start() {assert(!started_);started_ = true;if (pthread_create(&pthreadId_, nullptr, &startThread, this) != 0) {started_ = false;throw std::runtime_error("Failed to create thread");}latch_.wait();  // 等待線程真正啟動
}void* startThread(void* obj)
{ThreadData* data = static_cast<ThreadData*>(obj);data->runInThread();delete data;return NULL;
}
int Thread::join() {assert(started_);assert(!joined_);joined_ = true;return pthread_join(pthreadId_, nullptr);
}

?4.EventLoopThread:它將事件循環和線程進行綁定,用于在一個獨立線程中運行 EventLoop,從而避免在主線程中阻塞 I/O 事件處理

class EventLoopThread : noncopable(){
public:using ThreadInitCallback = std::functional<void(EventLoop*)>;EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),const string& name = string());~EventLoopThread();EventLoop* startLoop();private:EventLoop* loop_;               // 事件循環指針bool exiting_;                  // 標志退出Thread thread_;                 // 封裝的線程對象std::mutex mutex_;               // 互斥鎖,保證線程安全Condition cond_;                // 條件變量,同步線程啟}
EventLoop* EventLoopThread::startLoop() {assert(!thread_.started());thread_.start();  // 啟動線程EventLoop* loop = nullptr;{MutexLockGuard lock(mutex_);while (loop_ == nullptr) {  // 等待子線程初始化cond_.wait();}loop = loop_;}return loop;
}

?為什么要在循環中檢查?

(1)條件變量的常見問題:

  1. 虛假喚醒(Spurious Wakeup):

    • 條件變量在沒有通知的情況下,wait() 可能意外返回

    • 這通常由操作系統調度中斷信號引起。

    • 如果僅使用 cond_.wait();,即使條件未滿足,程序也可能繼續執行,導致邏輯錯誤

  2. 多線程競爭:

    • 可能存在多個線程等待同一條件,一旦被喚醒,不一定是期望的線程

    • 沒有循環檢查,程序容易進入非預期狀態

void EventLoopThread::threadFunc() {EventLoop loop;  // 創建事件循環對象{MutexLockGuard lock(mutex_);loop_ = &loop;       // 將loop_指向剛創建的EventLoop對象cond_.notify();      // 通知主線程:loop_已初始化}loop.loop();  // 啟動事件循環
}

為什么要使用條件變量?

  • EventLoop 對象是在子線程中創建的,但主線程需要獲取它的指針。

  • 如果沒有同步機制,主線程有可能在子線程創建之前就訪問 loop_,導致空指針

  • 通過互斥鎖和條件變量,讓主線程在 loop_ 初始化之前阻塞等待,直到子線程創建完成并通知。

?5.EventLoopThreadPool:單個線程的事件循環(EventLoop無法應對大量并發請求。
為了提升性能,通常使用
多線程事件循環
,即線程池

  • 主線程(I/O 線程): 負責監聽新連接。

  • 工作線程(計算/事件線程): 負責數據讀寫和計算。

    class EventLoopThreadPool : noncopyable
    {public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);~EventLoopThreadPool();void setThreadNum(int numThreads) { numThreads_ = numThreads; }void start(const ThreadInitCallback& cb = ThreadInitCallback());/// round-robinEventLoop* getNextLoop();/// with the same hash code, it will always return the same EventLoopEventLoop* getLoopForHash(size_t hashCode);std::vector<EventLoop*> getAllLoops();bool started() const{ return started_; }const string& name() const{ return name_; }private:EventLoop* baseLoop_;                           // 主線程中的EventLoopbool started_;                                  // 是否啟動int numThreads_;                                // 線程數量int next_;                                      // 下一個分配的EventLoop下標std::vector<std::unique_ptr<EventLoopThread>> threads_;  // 線程列表std::vector<EventLoop*> loops_;                  // 事件循環列表};
    //啟動線程池
    void EventLoopThreadPool::start() {assert(!started_);baseLoop_->assertInLoopThread();started_ = true;for (int i = 0; i < numThreads_; ++i) {auto thread = std::make_unique<EventLoopThread>();loops_.push_back(thread->startLoop());  // 啟動并獲取事件循環threads_.push_back(std::move(thread));  // 存儲線程對象}// 如果沒有設置線程,主線程直接作為唯一的EventLoopif (numThreads_ == 0) {loops_.push_back(baseLoop_);}
    }
    
    //獲取下一個事件循環
    EventLoop* EventLoopThreadPool::getNextLoop() {baseLoop_->assertInLoopThread();EventLoop* loop = baseLoop_;if (!loops_.empty()) {loop = loops_[next_];next_ = (next_ + 1) % loops_.size();  // 輪詢選擇下一個Loop}return loop;
    }
    

6.channel??

作為 Reactor 模式中的事件處理器,封裝了文件描述符的事件管理和回調處理

1.關鍵的成員變量

變量類型作用
fd_const int管理的文件描述符(socket/eventfd/timerfd 等),Channel 只管理但不擁有它
events_int關注的事件類型EPOLLIN/EPOLLOUT?等),通過?enableReading()/enableWriting()?設置
revents_int實際觸發的事件類型(由 Poller/Epoll 返回)
loop_EventLoop*所屬的事件循環,所有操作必須在對應 EventLoop 線程中執行
readCallback_ReadEventCallback讀事件回調(帶時間戳參數,如收到數據時調用)
writeCallback_EventCallback寫事件回調(如可寫時調用)
closeCallback_EventCallback連接關閉回調
errorCallback_EventCallback錯誤處理回調
tie_std::weak_ptr<void>生命周期綁定(防止 Channel 在處理事件期間被意外銷毀)

2.關鍵的成員函數

1.?事件管理
函數作用
enableReading()注冊讀事件(`events_= kReadEvent`)
disableReading()取消讀事件
enableWriting()注冊寫事件(`events_= kWriteEvent`)
disableWriting()取消寫事件
disableAll()取消所有事件
update()通知 EventLoop 更新關注的事件(內部調用?EventLoop::updateChannel()
2.?事件處理核心
函數作用
handleEvent(Timestamp)處理事件的入口(根據?revents_?調用對應的回調)
handleEventWithGuard(Timestamp)實際處理事件(加生命周期保護)
setReadCallback()/setWriteCallback()設置讀/寫/關閉/錯誤的回調函數
3.?生命周期控制
函數作用
tie(const std::shared_ptr<void>&)綁定共享指針,防止回調執行期間對象被銷毀
remove()從 EventLoop 中移除 Channel

3.核心設計思想

  1. 事件與回調分離

    • events_?和?revents_?分離,由 Poller 監聽?events_,返回?revents_

    • 通過?setXXXCallback()?設置業務邏輯,Channel 只負責事件分發。

  2. 線程安全

    • 所有操作必須通過?EventLoop?在 IO 線程執行(通過?loop_?指針保證)。

  3. 資源管理

    • 使用?tie_?綁定共享指針(如?TcpConnection),避免處理事件時對象被析構。

  4. 高效事件過濾

    • isReading()/isWriting()?快速檢查事件狀態,避免無效回調。

class Channel : noncopyable{
public:using EventCallback = std::function<void()>;using ReadEventCallback = std::function<void(Timestamp)>;Channel(EventLoop* loop, int fd);~Channel();// poller通知后,處理事件
// 是否綁定對象,來決定是否需要使用智能指針 guard 來保護這個對象的生命周期void handleEvent(Timestamp receiveTime);//設置回調函數對象:movevoid setReadCallback(ReadEventCallback cb){ readCallback_ = std::move(cb); }void setWriteCallback(EventCallback cb){ writeCallback_ = std::move(cb); }void setCloseCallback(EventCallback cb){ closeCallback_ = std::move(cb); }void setErrorCallback(EventCallback cb){ errorCallback_ = std::move(cb); }// 防止channel執行回調時被刪除void tie(const std::shared_ptr<void>&);// 事件管理void enableReading() { events_ |= kReadEvent; update(); }void disableReading() { events_ &= ~kReadEvent; update(); }void enableWriting() { events_ |= kWriteEvent; update(); }void disableWriting() { events_ &= ~kWriteEvent; update(); }void disableAll() { events_ = kNoneEvent; update(); }bool isWriting() const { return events_ & kWriteEvent; }bool isReading() const { return events_ & kReadEvent; }bool isNoneEvent() const { return events_ == kNoneEvent; }int fd() const { return fd_; }int events() const { return events_; }void set_revents(int revt) { revents_ = revt; }EventLoop* ownerLoop() { return loop_; }// for Pollerint index() { return index_; }void set_index(int idx) { index_ = idx; }void remove();private:void update();void handleEventWithGuard(Timestamp receiveTime);static const int kNoneEvent;static const int kReadEvent;static const int kWriteEvent;EventLoop* loop_;const int  fd_;int        events_;int        revents_; // it's the received event types of epoll or pollint        index_;   // used by Poller.(channel的狀態:new/added/deleted)std::weak_ptr<void> tie_;//回調函數ReadEventCallback readCallback_;EventCallback writeCallback_;EventCallback closeCallback_;EventCallback errorCallback_;
}

??1.?guard?和?tie?的作用(防止對象生命周期問題)???

在事件驅動模型中,Channel?對象通常與某個資源(如 TCP 連接?TcpConnection)關聯。當事件觸發時,Channel?會調用用戶注冊的回調函數(如?readCallback_)。
??風險??:在回調執行期間,關聯的資源(如?TcpConnection)可能被其他線程銷毀,導致回調訪問野指針,引發崩潰。

在?handleEvent?中嘗試將?tie_?提升為?shared_ptrguard = tie_.lock()

在 muduo 的?Channel?類中,使用?std::weak_ptr<void>?而非?std::shared_ptr<void>?來管理關聯對象的生命周期,主要基于以下設計考量:


??1. 避免循環引用(核心原因)??

  • ??問題場景??:
    如果?Channel?直接持有?shared_ptr<void>,而關聯對象(如?TcpConnection)又持有?Channel?的?shared_ptr,會導致循環引用

  • ??解決方案??:
    weak_ptr?是弱引用,不會增加引用計數,打破循環依賴。
    只有通過?lock()?臨時提升為?shared_ptr?時才增加計數,確保安全訪問。


??2. 明確所有權關系??

  • ??Channel?不擁有資源??:
    Channel?只是事件處理器,其關聯對象(如?TcpConnection)應由更高層(如?EventLoop?或用戶代碼)管理生命周期。

    • 使用?weak_ptr?表明?Channel?僅“觀察”資源,不參與所有權管理。
    • 資源銷毀時,tie_.lock()?會返回?nullptrChannel?自動跳過回調。
  • ??對比?shared_ptr??:
    如果?Channel?持有?shared_ptr,會模糊所有權邊界,增加資源意外存活的風險。

std::weak_ptr 是 C++ 標準庫中的一種智能指針類型,用于解決 std::shared_ptr 的循環引用問題。它允許一個對象安全地引用另一個對象,而不會增加引用計數。std::weak_ptr 通常用于觀察(但不擁有)一個由 std::shared_ptr 管理的對象。lock() 方法,嘗試將弱引用提升為強引用(std::shared_ptr

循環引用:假設我們有兩個類 AB,它們相互引用。如果不使用 std::weak_ptr,會導致循環引用。

2.setXXXCallback()?函數使用?std::move?來設置回調函數:避免不必要的拷貝,提高性能

3.std::function 是 C++ 標準庫中的一個模板類,用于封裝可調用對象?

7.poller

1.poller

封裝操作系統的 I/O 多路復用機制(如 poll, epoll),提供一個統一接口讓 EventLoop 能獲取到就緒的 I/O 事件(如可讀、可寫、出錯等),并通知對應的 Channel。?EventLoop 的核心組件之一

功能描述
抽象封裝抽象 I/O 多路復用(poll/epoll/...)
事件驅動核心EventLoop 協作,驅動整個網絡庫
管理 fd → Channel 映射負責讓系統知道我們關心哪些事件、什么時候觸發

class Poller : noncopyable{
Public:using ChannelList = std::vector<Channel*>;Poller(EventLoop *loop);virtual ~Poller();// 虛函數接口(抽象方法)// 執行一次 I/O 事件輪詢(超時等待 timeoutMs 毫秒),將活躍的 Channel 填入virtual TimeStamp poll(int timeoutMs, ChannelList* activeChannels) = 0;// 當 Channel 感興趣的事件發生變化時(如從監聽讀改為監聽寫),調用該函數virtual void updataChannel(Channel* channel) = 0;//從 Poller 中移除某個 Channelvirtual void removeChannel(Channel* channel) = 0;virtual bool hasChannel(Channel* channel) const;//工廠方法static Poller* newDefaultPoller(EventLoop* loop);protected:using ChannelMap = std::unordered<int, Channel*>;ChannelMap channels_;
private:EventLoop *ownweLoop_;
}

static Poller* newDefaultPoller(EventLoop* loop);是工廠方法,工廠方法是將對象的創建“推遲”到子類或函數中,“返回基類指針”是工廠方法模式最常見、最有用的實踐形式。實現“抽象創建 + 多態調用”的關鍵方式??

2.EPollPoller

EPollPoller 是對 epoll 的封裝,實現了 Poller 接口,核心作用是“等待就緒事件并通知 EventLoop

class EPollPoller : public Poller{
public:EPollEpoller(EventLoop *loop);~EPollPoller() override;Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;// epoll_waitvoid updateChannel(Channel* channel) override;    //epoll_ctl :add/modvoid removeChannel(Channel* channel) override;    //epoll_ctl :delprivate:void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;// 將 epoll_event 轉成 Channelvoid update(int operation, Channel* channel);// 封裝 epoll_ctlint epollfd_;EventList events_;//就緒的IO事件
}

8.Socket:提供對底層 socket 操作的封裝(如設置選項、綁定、連接、關閉等)

class Socket : noncopyable {public:explicit Socket(int sockfd) : sockfd_(sockfd) {}~Socket();int fd() const { return sockfd_; }void bindAddress(const InetAddress& localaddr);void listen();int accept(InetAddress* peeraddr);void shutdownWrite();void setTcpNoDelay(bool on);void setReuseAddr(bool on);void setReusePort(bool on);void setKeepAlive(bool on);private:const int sockfd_;
};

1. TCP 選項總覽:

TCP 選項對應方法作用
禁用 Nagle 算法setTcpNoDelay(bool on)減少小包延遲,提高實時性
地址復用(SO_REUSEADDR)setReuseAddr(bool on)解決端口占用,快速重啟服務
端口復用(SO_REUSEPORT)setReusePort(bool on)多進程監聽同一端口,提高并發能力
TCP 保活setKeepAlive(bool on)檢測長時間空閑連接,防止假死

2. TCP_NODELAY:禁用 Nagle 算法

  • Nagle算法:

    • 為了減少網絡中小數據包的數量,TCP 默認會將小包合并后再發送。

    • 只有當前一個數據包得到ACK 確認后,才發送下一個小包。

  • 問題:

    • 在實時通信(如即時消息、游戲)中,小包積累和延遲會影響性能。

  • 低延遲要求高的應用:即時通信、實時游戲

大量小數據包傳輸:流媒體、物聯網設備

void Socket::setTcpNoDelay(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval));
}
  • IPPROTO_TCP 指定協議類型。

  • TCP_NODELAY 禁用 Nagle 算法。

  • optval 1 為禁用,0 為啟用。

3. SO_REUSEADDR:地址復用

  • 場景:

    • 當服務器意外崩潰或重啟后,端口可能仍處于 TIME_WAIT 狀態,導致端口被占用

  • 問題:

    • 無法立即重新綁定相同端口,導致服務重啟失敗。

  • 服務器重啟: 服務器崩潰后端口快速重用。

  • 端口復用: 多個進程監聽相同端口(配合 SO_REUSEPORT)。

void Socket::setReuseAddr(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
  • SOL_SOCKET 套接字級別的選項。

  • SO_REUSEADDR 地址復用。

  • optval 1 為啟用,0 為禁用。

4. SO_REUSEPORT:端口復用

  • 場景:

    • 多核服務器上,為了充分利用 CPU 資源,通常使用多個進程監聽相同端口

  • 問題:

    • 未開啟時,多個進程無法綁定同一端口,導致服務擴展性差

  • 多核服務器: 高并發服務器提高吞吐量。

  • 負載均衡: 允許多個進程共享一個監聽端口。

void Socket::setReusePort(bool on) {
#ifdef SO_REUSEPORTint optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
#elseif (on) {LOG_SYSERR << "SO_REUSEPORT is not supported.";}
#endif
}

5. SO_KEEPALIVE:TCP 保活

  • 場景:

    • 長時間空閑連接,如客戶端掉線或網絡中斷。

  • 問題:

    • 服務器長時間無法檢測到客戶端斷開

  • 長連接服務: 保持連接狀態,防止假死。

  • 防止資源浪費: 檢測空閑連接,及時釋放。

void Socket::setKeepAlive(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval));
}

TIME_WAIT 產生的原因

主動關閉連接的一方發送 FIN 并接收到對方的 ACK 后,進入 TIME_WAIT 狀態:

  1. 防止舊數據包干擾新連接:

    • 如果立即釋放端口,新連接可能收到上一個連接的殘留數據包

  2. 確保對方收到 ACK:

    • 最后一個 ACK 可能丟失,對方會重新發送 FIN,TIME_WAIT 可以重新應答。

3. TIME_WAIT 的問題

在高并發服務器中,TIME_WAIT 數量過多會導致以下問題:

  • 端口耗盡:

    • 短時間內建立大量連接,導致端口不足

  • 重啟服務失敗:

    • 上次監聽的端口仍處于 TIME_WAIT,導致新進程無法重新綁定相同端口。

  • 資源占用:

    • 系統中有大量處于 TIME_WAIT 狀態的連接,浪費內存和 CPU。

SO_REUSEADDR 是一個套接字選項,允許服務器在端口處于 TIME_WAIT快速重啟

  • 允許端口復用:

    • 即使處于 TIME_WAIT,也能重新綁定相同的端口。

  • 避免端口占用:

    • 當服務器崩潰重啟時,避免地址已被占用的錯誤。

?使用示例:TCP 服務器

InetAddress serverAddr(8080);
Socket listenSock(::socket(AF_INET, SOCK_STREAM, 0));  // 創建套接字listenSock.setReuseAddr(true);       // 設置地址復用
listenSock.bindAddress(serverAddr);  // 綁定端口
listenSock.listen();                 // 監聽InetAddress clientAddr;
int connfd = listenSock.accept(&clientAddr);  // 接受連接if (connfd >= 0) {printf("New connection from %s\n", clientAddr.toIpPort().c_str());Socket connSocket(connfd);      // 管理新連接connSocket.setTcpNoDelay(true); // 禁用 Nagle 算法connSocket.shutdownWrite();     // 關閉寫操作
}

9.Acceptor:監聽客戶端連接接收新連接

class Acceptor : noncopyable {public:typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);void setNewConnectionCallback(const NewConnectionCallback& cb) {newConnectionCallback_ = cb;}bool listening() const { return listening_; }void listen();private:void handleRead();EventLoop* loop_;Socket acceptSocket_;             // 封裝監聽套接字Channel acceptChannel_;           // 封裝監聽事件NewConnectionCallback newConnectionCallback_;  // 新連接回調bool listening_;
};

//構造函數
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport): loop_(loop),acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),acceptChannel_(loop, acceptSocket_.fd()),listening_(false) {acceptSocket_.setReuseAddr(true);acceptSocket_.setReusePort(reuseport);acceptSocket_.bindAddress(listenAddr);acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
void Acceptor::handleRead() {InetAddress peerAddr;int connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0) {if (newConnectionCallback_) {newConnectionCallback_(connfd, peerAddr);} else {sockets::close(connfd);}}
}

10. buffer:是存儲從套接字讀入的數據待發送的數據,并進行高效的讀寫操作

+-------------------+------------------+------------------+
| prependable bytes |  readable bytes   |  writable bytes   |
|                  |  (數據區域)        |                   |
+-------------------+------------------+------------------+
0                readerIndex_       writerIndex_      buffer_.size()

class Buffer : copable{
public:static const size_t kCheapPrepend = 8;static const size_t kInitialSize = 1024;explicit Buffer(size_t initialSize = kInitialSize): buffer_(kCheapPrepend + initialSize),readerIndex_(kCheapPrepend),writerIndex_(kCheapPrepend){}size_t readableBytes() const{ return writerIndex_ - readerIndex_; }size_t writableBytes() const{ return buffer_.size() - writerIndex_; }size_t prependableBytes() const{ return readerIndex_; }const char* peek() const{ return begin() + readerIndex_; }//可讀數據起始地址void retrieveAll()//讀寫指針復位{readerIndex_ = kCheapPrepend;writerIndex_ = kCheapPrepend;}void retrieve(size_t len)//讀指針偏移{assert(len <= readableBytes());if (len < readableBytes()){readerIndex_ += len;}else{retrieveAll();}}string retrieveAllAsString(){return retrieveAsString(readableBytes());//將所有可讀數據轉成字符串}string retrieveAsString(size_t len)//將可讀數據轉成字符串{assert(len <= readableBytes());string result(peek(), len);retrieve(len);return result;}void makeSpace(size_t len)//擴容{if (writableBytes() + prependableBytes() < len + kCheapPrepend){// FIXME: move readable databuffer_.resize(writerIndex_+len);}else{// 將可讀數據前移,覆蓋已讀部分assert(kCheapPrepend < readerIndex_);size_t readable = readableBytes();std::copy(begin()+readerIndex_,begin()+writerIndex_,begin()+kCheapPrepend);readerIndex_ = kCheapPrepend;writerIndex_ = readerIndex_ + readable;assert(readable == readableBytes());}}void ensureWritableBytes(size_t len)//判斷是否有足夠空間可寫{if (writableBytes() < len){makeSpace(len);}assert(writableBytes() >= len);}void append(const StringPiece& str)//追加寫內容{append(str.data(), str.size());}void append(const char* /*restrict*/ data, size_t len){ensureWritableBytes(len);std::copy(data, data+len, beginWrite());hasWritten(len);}ssize_t readFd(int fd, int* savedErrno);//從文件描述符讀數據private:char* begin(){ return &*buffer_.begin(); }//數組首元素地址std::vector<char> buffer_;  // 底層存儲容器size_t readerIndex_;        // 讀起始位置size_t writerIndex_;        // 寫起始位置}
ssize_t readFd(int fd, int* savedErrno) {char extraBuf[65536];  // 棧上空間struct iovec vec[2];const size_t writable = writableBytes();vec[0].iov_base = begin() + writerIndex_;vec[0].iov_len = writable;vec[1].iov_base = extraBuf;vec[1].iov_len = sizeof(extraBuf);const int iovcnt = (writable < sizeof(extraBuf)) ? 2 : 1;const ssize_t n = ::readv(fd, vec, iovcnt);if (n < 0) {*savedErrno = errno;} else if (implicit_cast<size_t>(n) <= writable) {writerIndex_ += n;} else {writerIndex_ = buffer_.size();append(extrabuf, n - writable);}return n;
}

?readvwritevPOSIX 提供的系統調用,“分散讀”"聚集寫"操作,它們用于一次性從文件描述符讀取多個緩沖區的數據,將多個緩沖區數據寫入文件描述符

#include <sys/uio.h>ssize_t readv(int fd, const struct iovec *iov, int iovcnt);

?從?fd?順序讀取數據,依次填充到?iov[0]iovec[1]... 直到數據讀完或所有緩沖區填滿

struct iovec {void  *iov_base;  // 緩沖區起始地址size_t iov_len;   // 緩沖區長度
};

iovec?是?readv?的核心參數,用于指定數據讀取的目標緩沖區。這里設置了?兩個緩沖區

(1) 主緩沖區(vec[0]

vec[0].iov_base = begin() + writerIndex_;  // 指向當前可寫位置的起始地址
vec[0].iov_len = writable;                 // 可寫入的最大字節數
  • 目的
    優先將數據直接讀取到?內部主緩沖區?的剩余空間(writerIndex_?之后的部分)。

  • 優點

    • 避免內存拷貝:如果數據能完全放入主緩沖區,無需額外處理。

    • 減少內存碎片:復用已有的緩沖區空間。

(2) 備用棧緩沖區(vec[1]

vec[1].iov_base = extraBuf;                // 棧上的臨時緩沖區
vec[1].iov_len = sizeof(extraBuf);         // 固定大小(64KB)
  • 目的
    如果主緩沖區剩余空間不足(writable?太小),則?超額數據?會暫存到棧上的?extraBuf

  • 優點

    • 避免內存浪費:主緩沖區空間不足時,臨時用棧空間兜底(棧分配速度快,且函數退出后自動釋放)。

    • 防止丟包:即使主緩沖區滿,也能保證數據不丟失(暫存到?extraBuf?后再處理)

11.TcpServer

1.TcpServer 的作用

  • 管理連接:

    • 負責接受客戶端連接,并生成相應的 TcpConnection 對象。

  • 線程分配:

    • 使用 EventLoopThreadPool 進行多線程管理,保證高并發。

  • 消息處理:

通過回調機制(如連接建立、消息接收、連接關閉)實現事件響應。

class TcpServer : noncopyable {public:typedef std::function<void(EventLoop*)> ThreadInitCallback;enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option = kNoReusePort);~TcpServer();void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback& cb){ messageCallback_ = cb; }void start();  // 啟動服務器private:void newConnection(int sockfd, const InetAddress& peerAddr);void removeConnection(const TcpConnectionPtr& conn);void removeConnectionInLoop(const TcpConnectionPtr& conn);EventLoop* loop_;  // 主線程的事件循環const string ipPort_;const string name_;std::unique_ptr<Acceptor> acceptor_;std::shared_ptr<EventLoopThreadPool> threadPool_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;ThreadInitCallback threadInitCallback_; //線程初始化回調std::atomic started_;int nextConnId_;std::unordered_map<string, TcpConnectionPtr> connections_;  // 連接管理
};

枚舉(enum)是一種用戶自定義的數據類型,它允許為一組整數值賦予有意義的名稱

2. 常見的回調類型

回調名稱類型觸發時機
ConnectionCallbackvoid(const TcpConnectionPtr&)連接建立或斷開時
MessageCallbackvoid(const TcpConnectionPtr&, Buffer*, Timestamp)收到數據時
WriteCompleteCallbackvoid(const TcpConnectionPtr&)數據發送完成時
HighWaterMarkCallbackvoid(const TcpConnectionPtr&, size_t)緩沖區到達高水位線時
CloseCallbackvoid(const TcpConnectionPtr&)連接關閉時
TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option): loop_(loop),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),threadPool_(new EventLoopThreadPool(loop, name_)),nextConnId_(1) {acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));
}
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {EventLoop* ioLoop = threadPool_->getNextLoop();string connName = name_ + to_string(nextConnId_++);TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, peerAddr));connections_[connName] = conn;conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
//初始化連接并注冊回調
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {loop_->assertInLoopThread();EventLoop* ioLoop = threadPool_->getNextLoop();  // 獲取下一個 IO 線程char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;std::string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();InetAddress localAddr(sockets::getLocalAddr(sockfd));// 創建 TcpConnection 對象TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));connections_[connName] = conn;  // 添加到連接管理表// 設置各種回調conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);// 設置關閉回調,當連接關閉時會自動移除conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));// 在 IO 線程中調用 connectEstablished()ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

//getLocalAddr 來獲取本地監聽地址
struct sockaddr_in6 sockets::getLocalAddr(int sockfd)
{struct sockaddr_in6 localaddr;memZero(&localaddr, sizeof localaddr);socklen_t addrlen = static_cast<socklen_t>(sizeof localaddr);if (::getsockname(sockfd, sockaddr_cast(&localaddr), &addrlen) < 0){LOG_SYSERR << "sockets::getLocalAddr";}return localaddr;
}

TcpServer 類的 removeConnectionremoveConnectionInLoop 函數用于在連接關閉時從服務器的連接管理中移除相應的 TcpConnection 對象?

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{// FIXME: unsafeloop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{loop_->assertInLoopThread();LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_<< "] - connection " << conn->name();size_t n = connections_.erase(conn->name());(void)n;assert(n == 1);EventLoop* ioLoop = conn->getLoop();ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}

TcpServer::~TcpServer()
{loop_->assertInLoopThread();LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";for (auto& item : connections_){TcpConnectionPtr conn(item.second);item.second.reset();conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));}
}

1. 連接綁定線程的原則

每個 TcpConnection 都綁定在一個特定的**事件循環(I/O 線程)**中。
核心思想:

  • 一個連接只能在所屬的 I/O 線程中操作,不能跨線程直接操作連接對象。

  • 原因:

    • 事件循環是單線程運行的,如果其他線程直接操作連接,會破壞線程安全,導致數據競爭崩潰

2. 為什么不能直接在析構函數中銷毀?

連接的生命周期與服務器不完全一致:

  • 服務器關閉時,可能還有活躍連接未完成數據傳輸。

  • 直接銷毀可能導致正在傳輸的數據被中斷,或者導致未完成的回調直接崩潰。

跨線程問題:
  • 假設:

    • 服務器(主線程)正在析構,直接調用 TcpConnection::connectDestroyed()

    • 但是該連接實際上是在其他 I/O 線程中活躍,直接操作會破壞線程隔離

  • 后果:

    • 如果直接調用銷毀函數,容易導致訪問未釋放或無效內存,導致程序崩潰。

12.TcpConnection

  • 連接管理: 負責連接的建立和關閉。

  • 數據收發: 提供異步發送和接收數據的接口。

回調類型描述典型設置方式
連接回調當連接建立或斷開時觸發setConnectionCallback()
消息回調當有消息到達時觸發setMessageCallback()
寫完成回調發送緩沖區中的數據完全發送時觸發setWriteCompleteCallback()
高水位回調發送緩沖區數據量超過高水位時觸發setHighWaterMarkCallback()
關閉回調連接關閉時觸發setCloseCallback()
class TcpConnection : public std::enable_shared_from_this<TcpConnection>, noncopyable{
public:TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr);~TcpConnection();const std::string& name() const { return name_; }        // 獲取連接名稱const InetAddress& localAddress() const { return localAddr_; } // 獲取本地地址const InetAddress& peerAddress() const { return peerAddr_; }   // 獲取對端地址bool connected() const { return state_ == kConnected; }  // 檢查是否已連接void setConnectionCallback(const ConnectionCallback& cb);void setMessageCallback(const MessageCallback& cb);void setWriteCompleteCallback(const WriteCompleteCallback& cb);void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark);void setCloseCallback(const CloseCallback& cb);void send(Buffer* buf);                    // 發送緩沖區內容void shutdown();                           // 關閉寫端void connectEstablished();    // 連接建立時調用void connectDestroyed();      // 連接銷毀時調用private:void handleRead(Timestamp receiveTime);     // 讀取數據事件void handleWrite();                        // 寫入數據事件void handleClose();                        // 關閉事件void handleError();                        // 錯誤事件void sendInLoop(const void* message, size_t len);void shutdownInLoop();enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };EventLoop* loop_;                  // 事件循環指針const string name_;                // 連接名稱StateE state_;                     // 連接狀態(連接、斷開、正在連接)bool reading_;                     // 是否正在讀取std::unique_ptr<Socket> socket_;   // 封裝的 TCP 套接字std::unique_ptr<Channel> channel_; // 事件分發器const InetAddress localAddr_;      // 本地地址const InetAddress peerAddr_;       // 對端地址Buffer inputBuffer_;               // 接收緩沖區Buffer outputBuffer_;              // 發送緩沖區ConnectionCallback connectionCallback_;     // 連接回調MessageCallback messageCallback_;           // 消息回調WriteCompleteCallback writeCompleteCallback_; // 寫完成回調HighWaterMarkCallback highWaterMarkCallback_; // 高水位回調CloseCallback closeCallback_;               // 關閉回調}

1.std::enable_shared_from_this

std::enable_shared_from_this 是一個輔助類模板,允許在類的成員函數中安全地獲取當前對象的 shared_ptr

std::enable_shared_from_this 的工作原理

  1. TcpConnection 對象作為 shared_ptr 被創建時,enable_shared_from_this 會在內部保存一個弱引用。

  2. 當需要獲取自身的 shared_ptr 時,調用 shared_from_this() 方法即可。

  3. 這樣可以確保即使在類的成員函數中,也不會錯誤地生成一個新的 shared_ptr,而是與原來的共享控制塊關聯。

假設你在類方法中直接這樣寫:

std::shared_ptr<MyClass> ptr(this); // 錯誤示范

出現問題的原因:

  1. 引用計數錯誤:

    • 當對象通過 new 創建并使用 std::shared_ptr<T>(this) 包裝時,會生成一個新的控制塊,與已有的 shared_ptr 無關。

    • 這樣做使得同一個對象有兩個獨立的控制塊,各自管理引用計數。

  2. 雙重釋放:

    • 當第一個 shared_ptr 析構時,引用計數減為零,釋放內存。

    • 第二個 shared_ptr 析構時,再次釋放已經無效的內存,導致崩潰

2.使用智能指針 std::unique_ptr 來管理 SocketChannel 對象

1.資源獨占性:保證 Socket 和 Channel 的唯一所有權

2.自動釋放資源:防止內存泄漏

//處理讀事件,當有數據到達時觸發。
void TcpConnection::handleRead(Timestamp receiveTime) {int savedErrno = 0;ssize_t n = inputBuffer_.readFd(socket_->fd(), &savedErrno);if (n > 0) {messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);  // 調用消息回調} else if (n == 0) {handleClose();  // 對端關閉} else {errno = savedErrno;handleError();  // 讀取出錯}
}

void TcpConnection::handleClose() {LOG_INFO << "TcpConnection::handleClose - fd = " << channel_->fd();state_ = kDisconnected;channel_->disableAll();  // 停止監聽所有事件TcpConnectionPtr guardThis(shared_from_this());connectionCallback_(guardThis);  // 通知上層連接已關閉closeCallback_(guardThis);       // 觸發關閉回調
}
void TcpConnection::send(const std::string &buf)
{if (state_ == kConnected){if (loop_->isInLoopThread()){sendInLoop(message);}else{void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;loop_->runInLoop(std::bind(fp,this,     // FIXMEmessage.as_string()));//std::forward<string>(message)));}}
}

sendInLoop() 在網絡編程中用于發送數據,但它是在事件循環線程中執行的。

  • 將數據發送到網絡連接上(通過 socket 寫入)。

  • 如果無法一次性寫完,則將剩余數據存入輸出緩沖區,等待下次可寫時繼續發送。

void TcpConnection::sendInLoop(const void* data, size_t len)
{loop_->assertInLoopThread();  // 確保在事件循環線程中ssize_t nwrote = 0;size_t remaining = len;bool faultError = false;if (state_ == kDisconnected){LOG_WARN << "disconnected, give up writing";return;}// Step 1: 直接寫入數據(如果可能)if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = sockets::write(channel_->fd(), data, len);if (nwrote >= 0){remaining = len - nwrote;if (remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else {nwrote = 0;if (errno != EWOULDBLOCK){LOG_SYSERR << "TcpConnection::sendInLoop";if (errno == EPIPE || errno == ECONNRESET){faultError = true;}}}}// Step 2: 如果沒有寫完,則保存剩余數據到緩沖區if (!faultError && remaining > 0){size_t oldLen = outputBuffer_.readableBytes();if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);if (!channel_->isWriting()){//可寫事件監聽:表示套接字緩沖區有空閑空間,可以繼續寫入數據。channel_->enableWriting();}}
}

void TcpConnection::handleWrite()
{loop_->assertInLoopThread();if (channel_->isWriting()){ssize_t n = sockets::write(channel_->fd(),outputBuffer_.peek(),outputBuffer_.readableBytes());if (n > 0){outputBuffer_.retrieve(n);if (outputBuffer_.readableBytes() == 0){channel_->disableWriting();if (writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}if (state_ == kDisconnecting){shutdownInLoop();}}}else{LOG_SYSERR << "TcpConnection::handleWrite";// if (state_ == kDisconnecting)// {//   shutdownInLoop();// }}}else{LOG_TRACE << "Connection fd = " << channel_->fd()<< " is down, no more writing";}
}

??TcpConnection?數據發送流程總結??

  1. ??直接發送??

    • 當 ??輸出緩沖區為空?? 且 ??socket 不可寫?? 時,直接調用?write()?嘗試發送數據。
    • 如果全部發送成功,觸發?writeCompleteCallback_
  2. ??緩沖剩余數據??

    • 如果未完全發送(或?write()?返回?EAGAIN),剩余數據存入?outputBuffer_,并注冊?EPOLLOUT?事件。
  3. ??處理可寫事件??

    • 當?EPOLLOUT?觸發時,handleWrite()?從?outputBuffer_?讀取數據繼續發送。
    • 如果緩沖區清空,取消?EPOLLOUT?監聽,避免忙等待。

?將當前對象(TcpConnection)傳遞給回調函數,以便在回調函數中正確訪問和操作該對象

  • 使用 shared_from_this() 獲取當前對象的智能指針(std::shared_ptr,而不是裸指針。

  • 這樣可以在回調函數中持有對象的生命周期,防止對象在回調執行之前被銷毀。

void TcpConnection::connectEstablished() {loop_->assertInLoopThread();assert(state_ == kConnecting);setState(kConnected);channel_->tie(shared_from_this());  // 防止channel中使用懸空指針// 注冊讀事件channel_->enableReading();// 調用連接建立回調,通知上層用戶連接已經建立connectionCallback_(shared_from_this());
}

void TcpConnection::connectDestroyed() {loop_->assertInLoopThread();if (state_ == kConnected) {setState(kDisconnected);channel_->disableAll();  // 取消所有事件監聽// 調用連接關閉回調,通知上層用戶連接已關閉connectionCallback_(shared_from_this());}channel_->remove();  // 從 Poller 中移除
}
void TcpConnection::shutdown()
{// FIXME: use compare and swapif (state_ == kConnected){setState(kDisconnecting);// FIXME: shared_from_this()?loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}void TcpConnection::shutdownInLoop()
{loop_->assertInLoopThread();if (!channel_->isWriting())    //沒有數據發送(寫關閉){socket_->shutdownWrite();}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/80291.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/80291.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/80291.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

互聯網大廠Java面試實錄:Spring Boot與微服務架構在電商場景中的應用解析

&#x1f4aa;&#x1f3fb; 1. Python基礎專欄&#xff0c;基礎知識一網打盡&#xff0c;9.9元買不了吃虧&#xff0c;買不了上當。 Python從入門到精通 &#x1f601; 2. 畢業設計專欄&#xff0c;畢業季咱們不慌忙&#xff0c;幾百款畢業設計等你選。 ?? 3. Python爬蟲專欄…

關于匯編語言與程序設計——單總線溫度采集與顯示的應用

一、實驗要求 (1)握碼管的使用方式 (2)掌握DS18B20溫度傳感器的工作原理 (3)掌握單總線通信方式實現 MCU與DS18B20數據傳輸 二、設計思路 1.整體思路 通過編寫數碼管顯示程序和單總線溫度采集程序&#xff0c;結合溫度傳感報警&#xff0c;利用手指觸碰傳感器&#xff0c;當…

用html+js+css實現的戰略小游戲

效果圖: 兄弟們&#xff0c;話不多說&#xff0c;直接上代碼 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0">…

Navicat BI 數據分析功能上線 | 數據洞察新方法

Navicat 17.2 版本一經發布&#xff0c;便以 AI 助手賦能智能交互、Snowflake 支持拓展數據連接版圖、拓展對關系型、維度以及數據倉庫 2.0 建模方法的支持等新特性與功能抓住了用戶的目光&#xff0c;但其中一項低調且實用的更新 - 在 BI 數據預覽中深度集成數據分析工具&…

【ts】defineProps數組的類型聲明

第一種&#xff1a;使用Record<string, unknown> Record<string, unknown>表示一個對象&#xff0c;鍵是string類型&#xff0c;值是未知的 import { defineProps, PropType } from vue;const props defineProps({dataList: {type: Array as PropType<Record…

OpenCv實戰筆記(4)基于opencv實現ORB特征匹配檢測

一、原理作用 ORB 原理&#xff08;Oriented FAST and Rotated BRIEF&#xff09;&#xff1a; 特征點檢測&#xff1a;使用 FAST 算法檢測角點&#xff08;關鍵點&#xff09;。 方向計算&#xff1a;為每個關鍵點分配主方向&#xff0c;增強旋轉不變性。 特征描述&#xff1a…

Unreal 從入門到精通之VR常用操作

文章目錄 前言1.如何設置VRPawn視角的位置。2.如何播放視頻3.如何播放VR全景視頻。4.如何打開和關閉VR模式。前言 我們使用Unreal5 開發VR 項目的時候,會遇到很多常見問題。 比如: 1.如何設置VRPawn視角的位置。 2.如何播放視頻。 3.如何播放VR全景視頻。 4.如何打開和關閉V…

[論文閱讀]Deep Cross Network for Ad Click Predictions

摘要 特征工程是許多預測模型成功的關鍵。然而&#xff0c;這個過程是困難的&#xff0c;甚至需要手動特征工程或窮舉搜索。DNN能夠自動學習特征交互&#xff1b;然而&#xff0c;它們隱式地生成所有的交互&#xff0c;并且不一定有效地學習所有類型的交叉特征。在本文中&…

數據庫(MySQL)基礎

一、登錄數據庫 在linux系統中登錄數據庫的指令 mysql -h 127.48.0.236 -P 3306 -u root -p -h&#xff1a;填寫IP地址&#xff0c;指明要連接的主機。如果不加該字段表示本地主機-P&#xff1a;填寫端口號&#xff0c;指明進程。 如果不加該字段會使用默認的端口號。-u&…

遠程調試---在電腦上devtools調試運行在手機上的應用

1、啟動項目–以vite項目為例:先ipconfig查看ip地址 ,然后在vite中配置host為ip地址 2、手機上查看項目:保證手機和電腦在同一局域網, 在手機瀏覽器打開我們vite啟動的項目地址, 3、使用chii進行遠程調試 (1) 安裝 npm install chii -g (2)啟動 chii start -p 8080 (3)在…

【程序員AI入門:開發】11.從零構建智能問答引擎:LangChain + RAG 實戰手冊

1、技術選型 組件推薦方案說明文本嵌入模型sentence-transformers/all-MiniLM-L6-v2輕量級且效果較好的開源模型向量數據庫FAISS高效的本地向量檢索庫大語言模型GPT-3.5/開源LLM&#xff08;如ChatGLM3&#xff09;根據資源選擇云端或本地模型文檔處理框架LangChain簡化RAG流程…

【Linux基礎】文件查找和文本處理指令

目錄 grep命令 find命令 tar命令 head命令 tail命令 wc命令 tee命令 grep命令 作用&#xff1a;在文件中搜索匹配特定模式的文本行&#xff0c;并將結果輸出到標準輸出&#xff08;通常是終端&#xff09;。 基本用法&#xff1a; grep [選項] 搜索模式 [文件名] 常用…

云軸科技ZStack入選賽迪顧問2025AI Infra平臺市場發展報告代表廠商

DeepSeek憑借低成本、高性能、開源優勢帶來的蝴蝶效應依然在持續影響企業AI應用部署。尤其在數據安全備受關注的背景下&#xff0c;私有化部署已經成為企業應用AI大模型的優選方案。賽迪顧問在近期發布的《2025中國AI Infra平臺市場發展研究報告》中認為&#xff0c;在推理算力…

從零開始跑通3DGS教程:(四)修改(縮放、空間變換)colmap生成的sfm結果

寫在前面 本文內容 本文所屬《從零開始跑通3DGS教程》系列文章&#xff1b; 通過colmap進行的sfm的普通方式會丟失場景的物理尺度信息&#xff0c;并且并不在符合一般認知的坐標系下&#xff0c;本文將讀取colmap生成的點云和相機pose&#xff0c;將其進行空間變換和縮放之后&a…

RK3568-OpenHarmony(1) : OpenHarmony 5.1的編譯

概述: 本文主要描述了&#xff0c;如何在ubuntu-20.04操作系統上&#xff0c;編譯RK3568平臺的OpenHarmony 5.1版本。 搭建編譯環境 a. 安裝軟件包 sudo apt-get install git-lfs ruby genext2fs build-essential git curl libncurses5-dev libncursesw5-dev openjdk-11-jd…

vue+tsc+noEmit導致打包報TS類型錯誤問題及解決方法

項目場景&#xff1a; 提示&#xff1a;這里簡述項目相關背景&#xff1a; 當我們新建vue3項目,package.json文件會自動給我添加一些配置選項,這寫選項基本沒有問題,但是在實際操作過程中,當項目越來越復雜就會出現問題,本文給大家分享vuetscnoEmit導致打包報TS類型錯誤問題及…

Js 判斷瀏覽器cookie 是否啟用

驗證時 google瀏覽器 135.0.7049.117 不生效 cookie.html <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><title>Cookie 檢測</title> </head> <body><h1>檢測是否啟用 Cookie<…

Lambda表達式解讀

本文通過具體案例演示函數式接口Function<T,R>的三種實現方式演變過程。 一、傳統匿名內部類實現 Integer resInt1 t1(new Function<String, Integer>() {Overridepublic Integer apply(String s) {int i Integer.parseInt(s);return i;} });實現特點&#xff1…

等價無窮小代換

理解&#xff1a; 函數某一點的值可以使用泰勒展開式表示&#xff0c;&#xff08;低階無窮小 高階無窮小&#xff09;&#xff0c;主要有低階無窮小決定。 計算極限的時候&#xff1a; 乘除關系隨便換&#xff0c;不影響各個式子的低階無窮小加減關系&#xff1a; &#xf…

護網HVV初級藍隊面試題總結

struts2原理特征 原理:默認的content-type解析器會把用戶傳來的數據直接當成代碼執行&#xff0c;造成rce特征:ognl表達式&#xff0c;memberaccess字段&#xff0c;可以通過catalina日志過濾關鍵信息查找攻擊特征ongl表達式可以被當作代碼執行&#xff0c;其中的類為defaulta…