?1.對類進行禁止拷貝
class noncopyable
{public:noncopyable(const noncopyable&) = delete;void operator=(const noncopyable&) = delete;protected:noncopyable() = default;~noncopyable() = default;
};
2.日志
使用枚舉定義日志等級
enum LogLevel{TRACE,DEBUG,INFO,WARN,ERROR,FATAL,NUM_LOG_LEVELS,};
1.獲取日志實例對象
Logger& instance();
2.設置日志級別
void setLogLevel(int level);
3.寫日志
void log(std::string& msg);
#define LOG_info(fmt, ...) \do { \fprintf(stderr, "[INFO] %s:%d: " fmt "\n", \__FILE__, __LINE__, ##__VA_ARGS__); \} while(0)
1.C/C++ 的預處理器規定,一個完整的宏定義必須位于同一邏輯行。但為了代碼可讀性,需要用?\
?顯式聲明換行。
2.為什么用?do { ... } while(0),
避免宏展開后的語法問題
3.?LOG_info(fmt, ...)
-
fmt
:格式化字符串參數(如?"Value: %d"
) -
...
:可變參數(變長參數列表),表示可以傳遞任意數量的額外參數。 -
##__VA_ARGS__:
GNU 擴展語法,處理可變參數
#ifndef LOG_H
#define LOG_H#include <cstdio>
#include <ctime>
#include <cstdarg>// 定義日志級別
enum LogLevel
{TRACE = 0,DEBUG,INFO,WARN,ERROR,FATAL,NUM_LOG_LEVELS
};// 設置默認日志級別
extern LogLevel g_logLevel; // 在別的地方定義這個全局變量// 宏定義日志級別
#define LOG_TRACE(fmt, ...) \do { \if (g_logLevel <= TRACE) log_message("TRACE", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_DEBUG(fmt, ...) \do { \if (g_logLevel <= DEBUG) log_message("DEBUG", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_INFO(fmt, ...) \do { \if (g_logLevel <= INFO) log_message("INFO", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_WARN(fmt, ...) \do { \if (g_logLevel <= WARN) log_message("WARN", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_ERROR(fmt, ...) \do { \if (g_logLevel <= ERROR) log_message("ERROR", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_FATAL(fmt, ...) \do { \if (g_logLevel <= FATAL) log_message("FATAL", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)inline void log_message(const char* level, const char* file, int line, const char* fmt, ...)
{// 獲取當前時間char timeBuf[20];std::time_t now = std::time(nullptr);std::strftime(timeBuf, sizeof(timeBuf), "%Y-%m-%d %H:%M:%S", std::localtime(&now));// 打印日志fprintf(stderr, "[%s] [%s] %s:%d: ", timeBuf, level, file, line);va_list args;va_start(args, fmt);vfprintf(stderr, fmt, args);va_end(args);fprintf(stderr, "\n");
}#endif // LOG_H
?3.TimeStamp
#include <chrono>
#include <string>
#include <ctime>
#include <iomanip>
#include <sstream>class Timestamp {
public:// 構造函數Timestamp() : tp_(std::chrono::system_clock::now()) {}// 從時間點構造explicit Timestamp(std::chrono::system_clock::time_point tp) : tp_(tp) {}// 獲取當前時間戳 (靜態工廠方法)static Timestamp now() {return Timestamp();}// 轉換為字符串 (默認格式)std::string to_string() const {return format("%Y-%m-%d %H:%M:%S");} private:std::chrono::system_clock::time_point tp_;
};
4.InetAddress
# include<netinet/in.h> // 定義了Internet地址族// 封裝socket地址
class InetAdress{
public:explicit InetAddress(std::string& ip, uint16_t port, bool ipv6 = false);explicit InetAddress(const sockaddr_in &addr): addr_(addr);std::string toIp() const;std::string toIpPort() const;uint16_t port() const;
private:socketaddr_in addr_;
}
struct sockaddr_in {short sin_family; // 地址族unsigned short sin_port; // 端口號struct in_addr sin_addr; // IP 地址char sin_zero[8]; // 填充字節,用于對齊
};
InetAddress::InetAddress(StringArg ip, uint16_t port, bool ipv6)
{if (ipv6 || strchr(ip.c_str(), ':')){// ...}else{memset(&addr_, 0, sizeof addr_);addr_->sin_family = AF_INET;addr_->sin_port = htons(port);struct in_addr addr;if (inet_pton(AF_INET, ip, &addr) <= 0) {printf("Invalid IPv4 address\n");} else {printf("IPv4 address converted successfully\n");}addr_->sin_addr = addr;}
}string InetAddress::toIpPort() const
{char buf[64] = "";inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);size_t end = strlen(buf);uint16_t port = ntohs(addr_.sin_port);sprintf(buf + end, ":%u", port);return buf;
}
5.EventLoop
1.獲取和管理線程的標識(TID)、線程名稱、線程堆棧信息
namespace muduo
{
namespace CurrentThread
{// internalextern __thread int t_cachedTid;extern __thread char t_tidString[32];extern __thread int t_tidStringLength;extern __thread const char* t_threadName;void cacheTid();inline int tid(){if (__builtin_expect(t_cachedTid == 0, 0)){cacheTid();}return t_cachedTid;}inline const char* tidString() // for logging{return t_tidString;}inline int tidStringLength() // for logging{return t_tidStringLength;}inline const char* name(){return t_threadName;}bool isMainThread();void sleepUsec(int64_t usec); // for testingstring stackTrace(bool demangle);
} // namespace CurrentThread
} // namespace muduo
extern
關鍵字用于聲明變量或函數的存儲位置?- __thread 是 GCC/Clang 提供的關鍵字,用于聲明線程局部存儲變量(Thread-Local Storage, TLS)。每個線程都有自己獨立的變量副本。?
- __builtin_expect 是 GCC/Clang 提供的內置函數,用于向編譯器提供分支預測的提示?
static_cast<pid_t>(::syscall(SYS_gettid));
?2.EventLoop:
1. EventLoop 干什么?(職責)
EventLoop
是事件循環的核心類,負責管理和調度 I/O 事件和異步任務。
主要職責:
-
事件循環管理: 不斷循環等待和處理事件。
-
事件分發: 將活躍事件交給合適的回調處理。
-
任務調度: 支持在事件循環線程內安全地執行異步任務。
-
線程喚醒: 當其他線程提交任務時能夠喚醒阻塞的事件循環。
class EvenLoop : noncopyable{
public:using Functor = std::function<viod()>;EvenLoop();~EvenLoop();void loop();void quit();//狀態查詢Timestamp pollReturnTime() const { return pollReturnTime_; }//回調管理void runInLoop(Functor cb);// 在事件循環線程中立即執行回調 cbvoid queueInLoop(Functor cb);//將回調 cb 排隊,稍后在事件循環線程中執行size_t queueSize() const;//通道管理void wakeup();void updateChannel(Channel* channel);void removeChannel(Channel* channel);bool hasChannel(Channel* channel);//線程檢查bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }private:using ChannelList = std::vector<Channel*>;bool looping_;/* atomic *///是否正在進行事件循環std::atomic_bool quit_;//是否退出時間循環bool eventHandling_; /* atomic */ //表示是否正在處理事件bool callingPendingFunctors_; /* atomic *///表示是否正在執行待處理回調const pid_t threadId_;//當前循環tid// PollerTimestamp pollReturnTime_;//Poller返回時間std::unique_ptr<Poller> poller_;//Eventloop管理的PollerChannelList activeChannels_;// 存儲當前Poller 返回的活動通道列表// Channelint wakeupFd_;//eventfd,用于跨線程喚醒事件循環std::unique_ptr<Channel> wakeupChannel_;//封裝 wakeupFd_,綁定到 Poller(如 epoll),監控讀事件mutable MutexLock mutex_;std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);// 存儲其他線程提交的任務void doPendingFunctors();
}
1.在 muduo 網絡庫中,EventLoop
?類的構造函數初始化了一系列?bool
?類型的標志變量(如?looping_
、quit_
、eventHandling_
、callingPendingFunctors_
),這些變量用于 ??精確控制事件循環的狀態和線程安全??
looping_
?的限制?:??禁止重復啟動事件循環?
??eventHandling_的限制:
不能在處理事件時析構channel
callingPendingFunctors_
?:確保跨線程任務提交或事件循環忙時能夠及時喚醒?EventLoop
if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
?在 Linux 系統中,獲取當前線程的唯一 ID(TID)可以通過?syscall(SYS_gettid)
?
//防止一個線程創建多個EventLoop
__thread EventLoop* t_loopInThisThread = 0;
2.在 Linux 系統中,::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)
?用于創建一個 ??事件文件描述符(eventfd)??,通常用于線程間或進程間的異步事件通知(例如喚醒事件循環)?
3. std::atomic_bool 是 C++ 標準庫(<atomic> 頭文件)提供的一種原子類型,專門用于布爾值的原子操作。它是 std::atomic<bool> 的特化版本,允許在多線程環境中安全地讀寫布爾值,而無需顯式使用鎖(如 std::mutex)。std::atomic_bool 確保操作的原子性,避免數據競爭(data race),適用于需要高效、無鎖同步的場景。?
4.assertInLoopThread();當前線程是否是loop所在線程
-
EventLoop::loop()
-
EventLoop::updateChannel(Channel* channel)
-
EventLoop::removeChannel(Channel* channel)
-
bool EventLoop::hasChannel(Channel* channel)
-
void TcpConnection::sendInLoop(const void* data, size_t len)
-
void TcpConnection::shutdownInLoop()
-
void TcpConnection::startReadInLoop()?void TcpConnection::stopReadInLoop()
-
void TcpConnection::connectEstablished() void TcpConnection::connectDestroyed()
-
void TcpConnection::handleRead(Timestamp receiveTime)
-
void TcpConnection::handleWrite()
EventLoop::updateChannel(Channel* channel)|
poller_->updateChannel(channel);
對channel進行操作確保: 當前線程是loop所在線程,當前線程是channel所在線程
void EventLoop::loop()
{looping_ = true;quit_ = false;while (!quit_){activeChannels_.clear();// 等待事件發生poller_->poll(kPollTimeMs, &activeChannels_);// 遍歷所有活躍通道,逐個調用其事件處理函數for (Channel* channel : activeChannels_){channel->handleEvent();}// 執行異步任務doPendingFunctors();}looping_ = false;
}
void EventLoop::doPendingFunctors()
{std::vector<Functor> functors; // 定義一個局部任務列表callingPendingFunctors_ = true; // 標記正在執行任務{MutexLockGuard lock(mutex_); // 上鎖,保護任務隊列functors.swap(pendingFunctors_); // 將任務隊列中的任務移到本地變量}// 逐個執行任務for (const Functor& functor : functors){functor(); // 執行任務回調}callingPendingFunctors_ = false; // 執行完畢,重置標記
}
?用 swap()
將任務隊列和局部變量互換,而不是逐個取出任務:降低鎖競爭,只加鎖一次,把所有任務交換到局部變量中,后續執行不需要加鎖。
2. 為什么需要任務隊列?
特性 | handleEvent() | doPendingFunctors() |
---|---|---|
作用 | 處理I/O事件 | 處理異步任務 |
觸發時機 | poll()返回后,有活躍事件時觸發 | 每輪事件循環末尾,I/O事件處理完畢后 |
優先級 | 高,實時性要求高 | 低,非實時任務 |
來源 | 網絡 I/O、定時器、信號等事件 | 其他線程提交的任務、延遲任務 |
典型場景 | 讀寫網絡數據、關閉連接 | 線程間任務提交、定時任務、異步回調 |
線程安全性 | 需要確保 Channel 所在線程安全 | 通過加鎖保證線程安全 |
執行頻率 | 每個活躍事件都調用 | 每次事件循環結束后調用一次 |
在高性能服務器中,muduo
采用單線程 Reactor 模式,一個 EventLoop
對象只能在一個線程中運行。
但是,多個線程可能需要向同一個事件循環線程提交任務,比如:
-
工作線程向主線程提交任務。
-
異步回調需要在事件循環線程中執行。
-
事件處理函數中異步添加任務。
?當需要將任務提交到事件循環時,調用 queueInLoop()
:
?
void EventLoop::runInLoop(Functor cb)
{if (isInLoopThread()){cb();}else{queueInLoop(std::move(cb));}
}void EventLoop::queueInLoop(Functor cb)
{{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
}?
?callingPendingFunctors_
?:
-
如果當前正在執行回調隊列中的回調函數(
callingPendingFunctors_
為true
),并且有新的回調函數被加入到隊列中,需要通過wakeup()
方法喚醒事件循環線程,以確保新的回調函數能夠被處理。 -
如果不檢查
callingPendingFunctors_
,可能會導致事件循環線程被重復喚醒,從而增加不必要的開銷
int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_SYSERR << "Failed in eventfd";abort();}return evtfd;
}
?eventfd 文件描述符(FD),這是一個 Linux 特定的機制,用于線程間通信或事件通知。eventfd 常用于高效的無鎖同步。
EventLoop::EventLoop(): looping_(false),quit_(false),eventHandling_(false),callingPendingFunctors_(false),iteration_(0),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(this)),timerQueue_(new TimerQueue(this)),wakeupFd_(createEventfd()),wakeupChannel_(new Channel(this, wakeupFd_)),currentActiveChannel_(NULL)
{LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));// we are always reading the wakeupfdwakeupChannel_->enableReading();
}
void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}
事件循環線程持有 wakeupFd_
,并監聽其讀端。當其他線程調用 wakeup()
方法時,事件循環線程會從阻塞狀態中喚醒。
-
交換后,
pendingFunctors_
?變為空向量,新任務可繼續寫入。 -
局部變量?
functors
?在棧上析構時自動清理已執行的任務
MutexLockGuard
是一個 RAII(Resource Acquisition Is Initialization,資源獲取即初始化)風格的鎖管理器。它的作用是確保在作用域結束時自動釋放鎖,從而避免忘記手動釋放鎖導致的死鎖問題。?
RAII 的核心思想是將資源的獲取(如分配內存、打開文件、鎖定互斥鎖等)與對象的構造綁定在一起,將資源的釋放與對象的析構綁定在一起,確保資源在對象生命周期結束時自動釋放,從而避免資源泄漏和其他資源管理錯誤。
3.Thread
封裝了對 POSIX 線程(pthread)的創建、管理和操作
class Thread : nocopyable{
public:using ThreadFunc = std::function<void()>;explicit Thread(ThreadFunc, const std::string &name = string());~Thread();void start();void join();bool started() const { return started_; }pthread_t pthreadId() const { return pthreadId_; }pid_t tid() const { return tid_; }const string& name() const { return name_; }
pruvate:bool started_;bool joined_;std::shared_ptr<std::thread> thread_;pid_t tid_;ThreadFunc func_;string name_;static std::atomic_int32 numCreated;
}
void Thread::start() {assert(!started_);started_ = true;if (pthread_create(&pthreadId_, nullptr, &startThread, this) != 0) {started_ = false;throw std::runtime_error("Failed to create thread");}latch_.wait(); // 等待線程真正啟動
}void* startThread(void* obj)
{ThreadData* data = static_cast<ThreadData*>(obj);data->runInThread();delete data;return NULL;
}
int Thread::join() {assert(started_);assert(!joined_);joined_ = true;return pthread_join(pthreadId_, nullptr);
}
?4.EventLoopThread:它將事件循環和線程進行綁定,用于在一個獨立線程中運行 EventLoop
,從而避免在主線程中阻塞 I/O 事件處理
class EventLoopThread : noncopable(){
public:using ThreadInitCallback = std::functional<void(EventLoop*)>;EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),const string& name = string());~EventLoopThread();EventLoop* startLoop();private:EventLoop* loop_; // 事件循環指針bool exiting_; // 標志退出Thread thread_; // 封裝的線程對象std::mutex mutex_; // 互斥鎖,保證線程安全Condition cond_; // 條件變量,同步線程啟}
EventLoop* EventLoopThread::startLoop() {assert(!thread_.started());thread_.start(); // 啟動線程EventLoop* loop = nullptr;{MutexLockGuard lock(mutex_);while (loop_ == nullptr) { // 等待子線程初始化cond_.wait();}loop = loop_;}return loop;
}
?為什么要在循環中檢查?
(1)條件變量的常見問題:
-
虛假喚醒(Spurious Wakeup):
-
條件變量在沒有通知的情況下,
wait()
可能意外返回。 -
這通常由操作系統調度或中斷信號引起。
-
如果僅使用
cond_.wait();
,即使條件未滿足,程序也可能繼續執行,導致邏輯錯誤。
-
-
多線程競爭:
-
可能存在多個線程等待同一條件,一旦被喚醒,不一定是期望的線程。
-
沒有循環檢查,程序容易進入非預期狀態
-
void EventLoopThread::threadFunc() {EventLoop loop; // 創建事件循環對象{MutexLockGuard lock(mutex_);loop_ = &loop; // 將loop_指向剛創建的EventLoop對象cond_.notify(); // 通知主線程:loop_已初始化}loop.loop(); // 啟動事件循環
}
為什么要使用條件變量?
-
EventLoop
對象是在子線程中創建的,但主線程需要獲取它的指針。 -
如果沒有同步機制,主線程有可能在子線程創建之前就訪問 loop_,導致空指針。
-
通過互斥鎖和條件變量,讓主線程在
loop_
初始化之前阻塞等待,直到子線程創建完成并通知。
?5.EventLoopThreadPool:單個線程的事件循環(EventLoop
)無法應對大量并發請求。
為了提升性能,通常使用多線程事件循環,即線程池:
-
主線程(I/O 線程): 負責監聽新連接。
-
工作線程(計算/事件線程): 負責數據讀寫和計算。
class EventLoopThreadPool : noncopyable {public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);~EventLoopThreadPool();void setThreadNum(int numThreads) { numThreads_ = numThreads; }void start(const ThreadInitCallback& cb = ThreadInitCallback());/// round-robinEventLoop* getNextLoop();/// with the same hash code, it will always return the same EventLoopEventLoop* getLoopForHash(size_t hashCode);std::vector<EventLoop*> getAllLoops();bool started() const{ return started_; }const string& name() const{ return name_; }private:EventLoop* baseLoop_; // 主線程中的EventLoopbool started_; // 是否啟動int numThreads_; // 線程數量int next_; // 下一個分配的EventLoop下標std::vector<std::unique_ptr<EventLoopThread>> threads_; // 線程列表std::vector<EventLoop*> loops_; // 事件循環列表};
//啟動線程池 void EventLoopThreadPool::start() {assert(!started_);baseLoop_->assertInLoopThread();started_ = true;for (int i = 0; i < numThreads_; ++i) {auto thread = std::make_unique<EventLoopThread>();loops_.push_back(thread->startLoop()); // 啟動并獲取事件循環threads_.push_back(std::move(thread)); // 存儲線程對象}// 如果沒有設置線程,主線程直接作為唯一的EventLoopif (numThreads_ == 0) {loops_.push_back(baseLoop_);} }
//獲取下一個事件循環 EventLoop* EventLoopThreadPool::getNextLoop() {baseLoop_->assertInLoopThread();EventLoop* loop = baseLoop_;if (!loops_.empty()) {loop = loops_[next_];next_ = (next_ + 1) % loops_.size(); // 輪詢選擇下一個Loop}return loop; }
6.channel??
作為 Reactor 模式中的事件處理器,封裝了文件描述符的事件管理和回調處理
1.關鍵的成員變量
變量 | 類型 | 作用 |
---|---|---|
fd_ | const int | 管理的文件描述符(socket/eventfd/timerfd 等),Channel 只管理但不擁有它 |
events_ | int | 關注的事件類型(EPOLLIN /EPOLLOUT ?等),通過?enableReading() /enableWriting() ?設置 |
revents_ | int | 實際觸發的事件類型(由 Poller/Epoll 返回) |
loop_ | EventLoop* | 所屬的事件循環,所有操作必須在對應 EventLoop 線程中執行 |
readCallback_ | ReadEventCallback | 讀事件回調(帶時間戳參數,如收到數據時調用) |
writeCallback_ | EventCallback | 寫事件回調(如可寫時調用) |
closeCallback_ | EventCallback | 連接關閉回調 |
errorCallback_ | EventCallback | 錯誤處理回調 |
tie_ | std::weak_ptr<void> | 生命周期綁定(防止 Channel 在處理事件期間被意外銷毀) |
2.關鍵的成員函數
1.?事件管理
函數 | 作用 | |
---|---|---|
enableReading() | 注冊讀事件(`events_ | = kReadEvent`) |
disableReading() | 取消讀事件 | |
enableWriting() | 注冊寫事件(`events_ | = kWriteEvent`) |
disableWriting() | 取消寫事件 | |
disableAll() | 取消所有事件 | |
update() | 通知 EventLoop 更新關注的事件(內部調用?EventLoop::updateChannel() ) |
2.?事件處理核心
函數 | 作用 |
---|---|
handleEvent(Timestamp) | 處理事件的入口(根據?revents_ ?調用對應的回調) |
handleEventWithGuard(Timestamp) | 實際處理事件(加生命周期保護) |
setReadCallback() /setWriteCallback() | 設置讀/寫/關閉/錯誤的回調函數 |
3.?生命周期控制
函數 | 作用 |
---|---|
tie(const std::shared_ptr<void>&) | 綁定共享指針,防止回調執行期間對象被銷毀 |
remove() | 從 EventLoop 中移除 Channel |
3.核心設計思想
-
事件與回調分離
-
events_
?和?revents_
?分離,由 Poller 監聽?events_
,返回?revents_
。 -
通過?
setXXXCallback()
?設置業務邏輯,Channel 只負責事件分發。
-
-
線程安全
-
所有操作必須通過?
EventLoop
?在 IO 線程執行(通過?loop_
?指針保證)。
-
-
資源管理
-
使用?
tie_
?綁定共享指針(如?TcpConnection
),避免處理事件時對象被析構。
-
-
高效事件過濾
-
isReading()
/isWriting()
?快速檢查事件狀態,避免無效回調。
-
class Channel : noncopyable{
public:using EventCallback = std::function<void()>;using ReadEventCallback = std::function<void(Timestamp)>;Channel(EventLoop* loop, int fd);~Channel();// poller通知后,處理事件
// 是否綁定對象,來決定是否需要使用智能指針 guard 來保護這個對象的生命周期void handleEvent(Timestamp receiveTime);//設置回調函數對象:movevoid setReadCallback(ReadEventCallback cb){ readCallback_ = std::move(cb); }void setWriteCallback(EventCallback cb){ writeCallback_ = std::move(cb); }void setCloseCallback(EventCallback cb){ closeCallback_ = std::move(cb); }void setErrorCallback(EventCallback cb){ errorCallback_ = std::move(cb); }// 防止channel執行回調時被刪除void tie(const std::shared_ptr<void>&);// 事件管理void enableReading() { events_ |= kReadEvent; update(); }void disableReading() { events_ &= ~kReadEvent; update(); }void enableWriting() { events_ |= kWriteEvent; update(); }void disableWriting() { events_ &= ~kWriteEvent; update(); }void disableAll() { events_ = kNoneEvent; update(); }bool isWriting() const { return events_ & kWriteEvent; }bool isReading() const { return events_ & kReadEvent; }bool isNoneEvent() const { return events_ == kNoneEvent; }int fd() const { return fd_; }int events() const { return events_; }void set_revents(int revt) { revents_ = revt; }EventLoop* ownerLoop() { return loop_; }// for Pollerint index() { return index_; }void set_index(int idx) { index_ = idx; }void remove();private:void update();void handleEventWithGuard(Timestamp receiveTime);static const int kNoneEvent;static const int kReadEvent;static const int kWriteEvent;EventLoop* loop_;const int fd_;int events_;int revents_; // it's the received event types of epoll or pollint index_; // used by Poller.(channel的狀態:new/added/deleted)std::weak_ptr<void> tie_;//回調函數ReadEventCallback readCallback_;EventCallback writeCallback_;EventCallback closeCallback_;EventCallback errorCallback_;
}
??1.?guard
?和?tie
?的作用(防止對象生命周期問題)???
在事件驅動模型中,Channel
?對象通常與某個資源(如 TCP 連接?TcpConnection
)關聯。當事件觸發時,Channel
?會調用用戶注冊的回調函數(如?readCallback_
)。
??風險??:在回調執行期間,關聯的資源(如?TcpConnection
)可能被其他線程銷毀,導致回調訪問野指針,引發崩潰。
在?handleEvent
?中嘗試將?tie_
?提升為?shared_ptr
(guard = tie_.lock()
)
在 muduo 的?Channel
?類中,使用?std::weak_ptr<void>
?而非?std::shared_ptr<void>
?來管理關聯對象的生命周期,主要基于以下設計考量:
??1. 避免循環引用(核心原因)??
-
??問題場景??:
如果?Channel
?直接持有?shared_ptr<void>
,而關聯對象(如?TcpConnection
)又持有?Channel
?的?shared_ptr
,會導致循環引用 -
??解決方案??:
weak_ptr
?是弱引用,不會增加引用計數,打破循環依賴。
只有通過?lock()
?臨時提升為?shared_ptr
?時才增加計數,確保安全訪問。
??2. 明確所有權關系??
-
??
Channel
?不擁有資源??:
Channel
?只是事件處理器,其關聯對象(如?TcpConnection
)應由更高層(如?EventLoop
?或用戶代碼)管理生命周期。- 使用?
weak_ptr
?表明?Channel
?僅“觀察”資源,不參與所有權管理。 - 資源銷毀時,
tie_.lock()
?會返回?nullptr
,Channel
?自動跳過回調。
- 使用?
-
??對比?
shared_ptr
??:
如果?Channel
?持有?shared_ptr
,會模糊所有權邊界,增加資源意外存活的風險。
std::weak_ptr
是 C++ 標準庫中的一種智能指針類型,用于解決 std::shared_ptr
的循環引用問題。它允許一個對象安全地引用另一個對象,而不會增加引用計數。std::weak_ptr
通常用于觀察(但不擁有)一個由 std::shared_ptr
管理的對象。lock()
方法,嘗試將弱引用提升為強引用(std::shared_ptr
)
循環引用:假設我們有兩個類 A
和 B
,它們相互引用。如果不使用 std::weak_ptr
,會導致循環引用。
2.setXXXCallback()
?函數使用?std::move
?來設置回調函數:避免不必要的拷貝,提高性能
3.std::function
是 C++ 標準庫中的一個模板類,用于封裝可調用對象?
7.poller
1.poller
封裝操作系統的 I/O 多路復用機制(如 poll
, epoll
),提供一個統一接口讓 EventLoop
能獲取到就緒的 I/O 事件(如可讀、可寫、出錯等),并通知對應的 Channel
。?EventLoop
的核心組件之一
功能 | 描述 |
---|
抽象封裝 | 抽象 I/O 多路復用(poll/epoll/...) |
事件驅動核心 | 與 EventLoop 協作,驅動整個網絡庫 |
管理 fd → Channel 映射 | 負責讓系統知道我們關心哪些事件、什么時候觸發 |
class Poller : noncopyable{
Public:using ChannelList = std::vector<Channel*>;Poller(EventLoop *loop);virtual ~Poller();// 虛函數接口(抽象方法)// 執行一次 I/O 事件輪詢(超時等待 timeoutMs 毫秒),將活躍的 Channel 填入virtual TimeStamp poll(int timeoutMs, ChannelList* activeChannels) = 0;// 當 Channel 感興趣的事件發生變化時(如從監聽讀改為監聽寫),調用該函數virtual void updataChannel(Channel* channel) = 0;//從 Poller 中移除某個 Channelvirtual void removeChannel(Channel* channel) = 0;virtual bool hasChannel(Channel* channel) const;//工廠方法static Poller* newDefaultPoller(EventLoop* loop);protected:using ChannelMap = std::unordered<int, Channel*>;ChannelMap channels_;
private:EventLoop *ownweLoop_;
}
static Poller* newDefaultPoller(EventLoop* loop);是工廠方法,工廠方法是將對象的創建“推遲”到子類或函數中,“返回基類指針”是工廠方法模式最常見、最有用的實踐形式。實現“抽象創建 + 多態調用”的關鍵方式??
2.EPollPoller
EPollPoller
是對 epoll
的封裝,實現了 Poller
接口,核心作用是“等待就緒事件并通知 EventLoop”
class EPollPoller : public Poller{
public:EPollEpoller(EventLoop *loop);~EPollPoller() override;Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;// epoll_waitvoid updateChannel(Channel* channel) override; //epoll_ctl :add/modvoid removeChannel(Channel* channel) override; //epoll_ctl :delprivate:void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;// 將 epoll_event 轉成 Channelvoid update(int operation, Channel* channel);// 封裝 epoll_ctlint epollfd_;EventList events_;//就緒的IO事件
}
8.Socket:提供對底層 socket 操作的封裝(如設置選項、綁定、連接、關閉等)
class Socket : noncopyable {public:explicit Socket(int sockfd) : sockfd_(sockfd) {}~Socket();int fd() const { return sockfd_; }void bindAddress(const InetAddress& localaddr);void listen();int accept(InetAddress* peeraddr);void shutdownWrite();void setTcpNoDelay(bool on);void setReuseAddr(bool on);void setReusePort(bool on);void setKeepAlive(bool on);private:const int sockfd_;
};
1. TCP 選項總覽:
TCP 選項 | 對應方法 | 作用 |
---|---|---|
禁用 Nagle 算法 | setTcpNoDelay(bool on) | 減少小包延遲,提高實時性 |
地址復用(SO_REUSEADDR) | setReuseAddr(bool on) | 解決端口占用,快速重啟服務 |
端口復用(SO_REUSEPORT) | setReusePort(bool on) | 多進程監聽同一端口,提高并發能力 |
TCP 保活 | setKeepAlive(bool on) | 檢測長時間空閑連接,防止假死 |
2. TCP_NODELAY:禁用 Nagle 算法
-
Nagle算法:
-
為了減少網絡中小數據包的數量,TCP 默認會將小包合并后再發送。
-
只有當前一個數據包得到ACK 確認后,才發送下一個小包。
-
-
問題:
-
在實時通信(如即時消息、游戲)中,小包積累和延遲會影響性能。
-
-
低延遲要求高的應用: 如即時通信、實時游戲。
大量小數據包傳輸: 如流媒體、物聯網設備。
void Socket::setTcpNoDelay(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval));
}
-
IPPROTO_TCP
: 指定協議類型。 -
TCP_NODELAY
: 禁用 Nagle 算法。 -
optval
: 1 為禁用,0 為啟用。
3. SO_REUSEADDR:地址復用
-
場景:
-
當服務器意外崩潰或重啟后,端口可能仍處于 TIME_WAIT 狀態,導致端口被占用。
-
-
問題:
-
無法立即重新綁定相同端口,導致服務重啟失敗。
-
-
服務器重啟: 服務器崩潰后端口快速重用。
-
端口復用: 多個進程監聽相同端口(配合 SO_REUSEPORT)。
void Socket::setReuseAddr(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
-
SOL_SOCKET
: 套接字級別的選項。 -
SO_REUSEADDR
: 地址復用。 -
optval
: 1 為啟用,0 為禁用。
4. SO_REUSEPORT:端口復用
-
場景:
-
多核服務器上,為了充分利用 CPU 資源,通常使用多個進程監聽相同端口。
-
-
問題:
-
未開啟時,多個進程無法綁定同一端口,導致服務擴展性差。
-
-
多核服務器: 高并發服務器提高吞吐量。
-
負載均衡: 允許多個進程共享一個監聽端口。
void Socket::setReusePort(bool on) {
#ifdef SO_REUSEPORTint optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
#elseif (on) {LOG_SYSERR << "SO_REUSEPORT is not supported.";}
#endif
}
5. SO_KEEPALIVE:TCP 保活
-
場景:
-
長時間空閑連接,如客戶端掉線或網絡中斷。
-
-
問題:
-
服務器長時間無法檢測到客戶端斷開。
-
-
長連接服務: 保持連接狀態,防止假死。
-
防止資源浪費: 檢測空閑連接,及時釋放。
void Socket::setKeepAlive(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval));
}
TIME_WAIT 產生的原因
當主動關閉連接的一方發送 FIN
并接收到對方的 ACK
后,進入 TIME_WAIT
狀態:
-
防止舊數據包干擾新連接:
-
如果立即釋放端口,新連接可能收到上一個連接的殘留數據包。
-
-
確保對方收到 ACK:
-
最后一個 ACK 可能丟失,對方會重新發送 FIN,TIME_WAIT 可以重新應答。
-
3. TIME_WAIT 的問題
在高并發服務器中,TIME_WAIT 數量過多會導致以下問題:
-
端口耗盡:
-
短時間內建立大量連接,導致端口不足。
-
-
重啟服務失敗:
-
上次監聽的端口仍處于
TIME_WAIT
,導致新進程無法重新綁定相同端口。
-
-
資源占用:
-
系統中有大量處于
TIME_WAIT
狀態的連接,浪費內存和 CPU。
-
SO_REUSEADDR 是一個套接字選項,允許服務器在端口處于 TIME_WAIT 時快速重啟。
-
允許端口復用:
-
即使處于
TIME_WAIT
,也能重新綁定相同的端口。
-
-
避免端口占用:
-
當服務器崩潰重啟時,避免地址已被占用的錯誤。
-
?使用示例:TCP 服務器
InetAddress serverAddr(8080);
Socket listenSock(::socket(AF_INET, SOCK_STREAM, 0)); // 創建套接字listenSock.setReuseAddr(true); // 設置地址復用
listenSock.bindAddress(serverAddr); // 綁定端口
listenSock.listen(); // 監聽InetAddress clientAddr;
int connfd = listenSock.accept(&clientAddr); // 接受連接if (connfd >= 0) {printf("New connection from %s\n", clientAddr.toIpPort().c_str());Socket connSocket(connfd); // 管理新連接connSocket.setTcpNoDelay(true); // 禁用 Nagle 算法connSocket.shutdownWrite(); // 關閉寫操作
}
9.Acceptor:監聽客戶端連接并接收新連接
class Acceptor : noncopyable {public:typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);void setNewConnectionCallback(const NewConnectionCallback& cb) {newConnectionCallback_ = cb;}bool listening() const { return listening_; }void listen();private:void handleRead();EventLoop* loop_;Socket acceptSocket_; // 封裝監聽套接字Channel acceptChannel_; // 封裝監聽事件NewConnectionCallback newConnectionCallback_; // 新連接回調bool listening_;
};
//構造函數
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport): loop_(loop),acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),acceptChannel_(loop, acceptSocket_.fd()),listening_(false) {acceptSocket_.setReuseAddr(true);acceptSocket_.setReusePort(reuseport);acceptSocket_.bindAddress(listenAddr);acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
void Acceptor::handleRead() {InetAddress peerAddr;int connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0) {if (newConnectionCallback_) {newConnectionCallback_(connfd, peerAddr);} else {sockets::close(connfd);}}
}
10. buffer:是存儲從套接字讀入的數據或待發送的數據,并進行高效的讀寫操作
+-------------------+------------------+------------------+
| prependable bytes | readable bytes | writable bytes |
| | (數據區域) | |
+-------------------+------------------+------------------+
0 readerIndex_ writerIndex_ buffer_.size()
class Buffer : copable{
public:static const size_t kCheapPrepend = 8;static const size_t kInitialSize = 1024;explicit Buffer(size_t initialSize = kInitialSize): buffer_(kCheapPrepend + initialSize),readerIndex_(kCheapPrepend),writerIndex_(kCheapPrepend){}size_t readableBytes() const{ return writerIndex_ - readerIndex_; }size_t writableBytes() const{ return buffer_.size() - writerIndex_; }size_t prependableBytes() const{ return readerIndex_; }const char* peek() const{ return begin() + readerIndex_; }//可讀數據起始地址void retrieveAll()//讀寫指針復位{readerIndex_ = kCheapPrepend;writerIndex_ = kCheapPrepend;}void retrieve(size_t len)//讀指針偏移{assert(len <= readableBytes());if (len < readableBytes()){readerIndex_ += len;}else{retrieveAll();}}string retrieveAllAsString(){return retrieveAsString(readableBytes());//將所有可讀數據轉成字符串}string retrieveAsString(size_t len)//將可讀數據轉成字符串{assert(len <= readableBytes());string result(peek(), len);retrieve(len);return result;}void makeSpace(size_t len)//擴容{if (writableBytes() + prependableBytes() < len + kCheapPrepend){// FIXME: move readable databuffer_.resize(writerIndex_+len);}else{// 將可讀數據前移,覆蓋已讀部分assert(kCheapPrepend < readerIndex_);size_t readable = readableBytes();std::copy(begin()+readerIndex_,begin()+writerIndex_,begin()+kCheapPrepend);readerIndex_ = kCheapPrepend;writerIndex_ = readerIndex_ + readable;assert(readable == readableBytes());}}void ensureWritableBytes(size_t len)//判斷是否有足夠空間可寫{if (writableBytes() < len){makeSpace(len);}assert(writableBytes() >= len);}void append(const StringPiece& str)//追加寫內容{append(str.data(), str.size());}void append(const char* /*restrict*/ data, size_t len){ensureWritableBytes(len);std::copy(data, data+len, beginWrite());hasWritten(len);}ssize_t readFd(int fd, int* savedErrno);//從文件描述符讀數據private:char* begin(){ return &*buffer_.begin(); }//數組首元素地址std::vector<char> buffer_; // 底層存儲容器size_t readerIndex_; // 讀起始位置size_t writerIndex_; // 寫起始位置}
ssize_t readFd(int fd, int* savedErrno) {char extraBuf[65536]; // 棧上空間struct iovec vec[2];const size_t writable = writableBytes();vec[0].iov_base = begin() + writerIndex_;vec[0].iov_len = writable;vec[1].iov_base = extraBuf;vec[1].iov_len = sizeof(extraBuf);const int iovcnt = (writable < sizeof(extraBuf)) ? 2 : 1;const ssize_t n = ::readv(fd, vec, iovcnt);if (n < 0) {*savedErrno = errno;} else if (implicit_cast<size_t>(n) <= writable) {writerIndex_ += n;} else {writerIndex_ = buffer_.size();append(extrabuf, n - writable);}return n;
}
?readv
和 writev
是POSIX 提供的系統調用,“分散讀”和"聚集寫"操作,它們用于一次性從文件描述符讀取多個緩沖區的數據,將多個緩沖區數據寫入文件描述符
#include <sys/uio.h>ssize_t readv(int fd, const struct iovec *iov, int iovcnt);
?從?fd
?順序讀取數據,依次填充到?iov[0]
、iovec[1]
... 直到數據讀完或所有緩沖區填滿
struct iovec {void *iov_base; // 緩沖區起始地址size_t iov_len; // 緩沖區長度
};
iovec
?是?readv
?的核心參數,用于指定數據讀取的目標緩沖區。這里設置了?兩個緩沖區:
(1) 主緩沖區(vec[0]
)
vec[0].iov_base = begin() + writerIndex_; // 指向當前可寫位置的起始地址 vec[0].iov_len = writable; // 可寫入的最大字節數
-
目的:
優先將數據直接讀取到?內部主緩沖區?的剩余空間(writerIndex_
?之后的部分)。 -
優點:
-
避免內存拷貝:如果數據能完全放入主緩沖區,無需額外處理。
-
減少內存碎片:復用已有的緩沖區空間。
-
(2) 備用棧緩沖區(vec[1]
)
vec[1].iov_base = extraBuf; // 棧上的臨時緩沖區 vec[1].iov_len = sizeof(extraBuf); // 固定大小(64KB)
-
目的:
如果主緩沖區剩余空間不足(writable
?太小),則?超額數據?會暫存到棧上的?extraBuf
。 -
優點:
-
避免內存浪費:主緩沖區空間不足時,臨時用棧空間兜底(棧分配速度快,且函數退出后自動釋放)。
-
防止丟包:即使主緩沖區滿,也能保證數據不丟失(暫存到?
extraBuf
?后再處理)
-
11.TcpServer
1.TcpServer 的作用
-
管理連接:
-
負責接受客戶端連接,并生成相應的 TcpConnection 對象。
-
-
線程分配:
-
使用 EventLoopThreadPool 進行多線程管理,保證高并發。
-
-
消息處理:
通過回調機制(如連接建立、消息接收、連接關閉)實現事件響應。
class TcpServer : noncopyable {public:typedef std::function<void(EventLoop*)> ThreadInitCallback;enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option = kNoReusePort);~TcpServer();void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback& cb){ messageCallback_ = cb; }void start(); // 啟動服務器private:void newConnection(int sockfd, const InetAddress& peerAddr);void removeConnection(const TcpConnectionPtr& conn);void removeConnectionInLoop(const TcpConnectionPtr& conn);EventLoop* loop_; // 主線程的事件循環const string ipPort_;const string name_;std::unique_ptr<Acceptor> acceptor_;std::shared_ptr<EventLoopThreadPool> threadPool_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;ThreadInitCallback threadInitCallback_; //線程初始化回調std::atomic started_;int nextConnId_;std::unordered_map<string, TcpConnectionPtr> connections_; // 連接管理
};
枚舉(enum
)是一種用戶自定義的數據類型,它允許為一組整數值賦予有意義的名稱
2. 常見的回調類型
回調名稱 | 類型 | 觸發時機 |
---|---|---|
ConnectionCallback | void(const TcpConnectionPtr&) | 連接建立或斷開時 |
MessageCallback | void(const TcpConnectionPtr&, Buffer*, Timestamp) | 收到數據時 |
WriteCompleteCallback | void(const TcpConnectionPtr&) | 數據發送完成時 |
HighWaterMarkCallback | void(const TcpConnectionPtr&, size_t) | 緩沖區到達高水位線時 |
CloseCallback | void(const TcpConnectionPtr&) | 連接關閉時 |
TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option): loop_(loop),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),threadPool_(new EventLoopThreadPool(loop, name_)),nextConnId_(1) {acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));
}
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {EventLoop* ioLoop = threadPool_->getNextLoop();string connName = name_ + to_string(nextConnId_++);TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, peerAddr));connections_[connName] = conn;conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
//初始化連接并注冊回調
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {loop_->assertInLoopThread();EventLoop* ioLoop = threadPool_->getNextLoop(); // 獲取下一個 IO 線程char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;std::string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();InetAddress localAddr(sockets::getLocalAddr(sockfd));// 創建 TcpConnection 對象TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));connections_[connName] = conn; // 添加到連接管理表// 設置各種回調conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);// 設置關閉回調,當連接關閉時會自動移除conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));// 在 IO 線程中調用 connectEstablished()ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
//getLocalAddr 來獲取本地監聽地址
struct sockaddr_in6 sockets::getLocalAddr(int sockfd)
{struct sockaddr_in6 localaddr;memZero(&localaddr, sizeof localaddr);socklen_t addrlen = static_cast<socklen_t>(sizeof localaddr);if (::getsockname(sockfd, sockaddr_cast(&localaddr), &addrlen) < 0){LOG_SYSERR << "sockets::getLocalAddr";}return localaddr;
}
TcpServer
類的 removeConnection
和 removeConnectionInLoop
函數用于在連接關閉時從服務器的連接管理中移除相應的 TcpConnection
對象?
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{// FIXME: unsafeloop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{loop_->assertInLoopThread();LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_<< "] - connection " << conn->name();size_t n = connections_.erase(conn->name());(void)n;assert(n == 1);EventLoop* ioLoop = conn->getLoop();ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
TcpServer::~TcpServer()
{loop_->assertInLoopThread();LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";for (auto& item : connections_){TcpConnectionPtr conn(item.second);item.second.reset();conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));}
}
1. 連接綁定線程的原則
每個 TcpConnection
都綁定在一個特定的**事件循環(I/O 線程)**中。
核心思想:
-
一個連接只能在所屬的 I/O 線程中操作,不能跨線程直接操作連接對象。
-
原因:
-
事件循環是單線程運行的,如果其他線程直接操作連接,會破壞線程安全,導致數據競爭或崩潰。
-
2. 為什么不能直接在析構函數中銷毀?
連接的生命周期與服務器不完全一致:
-
服務器關閉時,可能還有活躍連接未完成數據傳輸。
-
直接銷毀可能導致正在傳輸的數據被中斷,或者導致未完成的回調直接崩潰。
跨線程問題:
-
假設:
-
服務器(主線程)正在析構,直接調用
TcpConnection::connectDestroyed()
。 -
但是該連接實際上是在其他 I/O 線程中活躍,直接操作會破壞線程隔離。
-
-
后果:
-
如果直接調用銷毀函數,容易導致訪問未釋放或無效內存,導致程序崩潰。
-
12.TcpConnection
-
連接管理: 負責連接的建立和關閉。
-
數據收發: 提供異步發送和接收數據的接口。
回調類型 | 描述 | 典型設置方式 |
---|---|---|
連接回調 | 當連接建立或斷開時觸發 | setConnectionCallback() |
消息回調 | 當有消息到達時觸發 | setMessageCallback() |
寫完成回調 | 發送緩沖區中的數據完全發送時觸發 | setWriteCompleteCallback() |
高水位回調 | 發送緩沖區數據量超過高水位時觸發 | setHighWaterMarkCallback() |
關閉回調 | 連接關閉時觸發 | setCloseCallback() |
class TcpConnection : public std::enable_shared_from_this<TcpConnection>, noncopyable{
public:TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr);~TcpConnection();const std::string& name() const { return name_; } // 獲取連接名稱const InetAddress& localAddress() const { return localAddr_; } // 獲取本地地址const InetAddress& peerAddress() const { return peerAddr_; } // 獲取對端地址bool connected() const { return state_ == kConnected; } // 檢查是否已連接void setConnectionCallback(const ConnectionCallback& cb);void setMessageCallback(const MessageCallback& cb);void setWriteCompleteCallback(const WriteCompleteCallback& cb);void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark);void setCloseCallback(const CloseCallback& cb);void send(Buffer* buf); // 發送緩沖區內容void shutdown(); // 關閉寫端void connectEstablished(); // 連接建立時調用void connectDestroyed(); // 連接銷毀時調用private:void handleRead(Timestamp receiveTime); // 讀取數據事件void handleWrite(); // 寫入數據事件void handleClose(); // 關閉事件void handleError(); // 錯誤事件void sendInLoop(const void* message, size_t len);void shutdownInLoop();enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };EventLoop* loop_; // 事件循環指針const string name_; // 連接名稱StateE state_; // 連接狀態(連接、斷開、正在連接)bool reading_; // 是否正在讀取std::unique_ptr<Socket> socket_; // 封裝的 TCP 套接字std::unique_ptr<Channel> channel_; // 事件分發器const InetAddress localAddr_; // 本地地址const InetAddress peerAddr_; // 對端地址Buffer inputBuffer_; // 接收緩沖區Buffer outputBuffer_; // 發送緩沖區ConnectionCallback connectionCallback_; // 連接回調MessageCallback messageCallback_; // 消息回調WriteCompleteCallback writeCompleteCallback_; // 寫完成回調HighWaterMarkCallback highWaterMarkCallback_; // 高水位回調CloseCallback closeCallback_; // 關閉回調}
1.std::enable_shared_from_this
std::enable_shared_from_this
是一個輔助類模板,允許在類的成員函數中安全地獲取當前對象的 shared_ptr
std::enable_shared_from_this
的工作原理
-
當
TcpConnection
對象作為shared_ptr
被創建時,enable_shared_from_this
會在內部保存一個弱引用。 -
當需要獲取自身的
shared_ptr
時,調用shared_from_this()
方法即可。 -
這樣可以確保即使在類的成員函數中,也不會錯誤地生成一個新的
shared_ptr
,而是與原來的共享控制塊關聯。
假設你在類方法中直接這樣寫:
std::shared_ptr<MyClass> ptr(this); // 錯誤示范
出現問題的原因:
-
引用計數錯誤:
-
當對象通過
new
創建并使用std::shared_ptr<T>(this)
包裝時,會生成一個新的控制塊,與已有的shared_ptr
無關。 -
這樣做使得同一個對象有兩個獨立的控制塊,各自管理引用計數。
-
-
雙重釋放:
-
當第一個
shared_ptr
析構時,引用計數減為零,釋放內存。 -
第二個
shared_ptr
析構時,再次釋放已經無效的內存,導致崩潰。
-
2.使用智能指針 std::unique_ptr
來管理 Socket 和 Channel 對象
1.資源獨占性:保證 Socket 和 Channel 的唯一所有權
2.自動釋放資源:防止內存泄漏
//處理讀事件,當有數據到達時觸發。
void TcpConnection::handleRead(Timestamp receiveTime) {int savedErrno = 0;ssize_t n = inputBuffer_.readFd(socket_->fd(), &savedErrno);if (n > 0) {messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); // 調用消息回調} else if (n == 0) {handleClose(); // 對端關閉} else {errno = savedErrno;handleError(); // 讀取出錯}
}
void TcpConnection::handleClose() {LOG_INFO << "TcpConnection::handleClose - fd = " << channel_->fd();state_ = kDisconnected;channel_->disableAll(); // 停止監聽所有事件TcpConnectionPtr guardThis(shared_from_this());connectionCallback_(guardThis); // 通知上層連接已關閉closeCallback_(guardThis); // 觸發關閉回調
}
void TcpConnection::send(const std::string &buf)
{if (state_ == kConnected){if (loop_->isInLoopThread()){sendInLoop(message);}else{void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;loop_->runInLoop(std::bind(fp,this, // FIXMEmessage.as_string()));//std::forward<string>(message)));}}
}
sendInLoop()
在網絡編程中用于發送數據,但它是在事件循環線程中執行的。
-
將數據發送到網絡連接上(通過 socket 寫入)。
-
如果無法一次性寫完,則將剩余數據存入輸出緩沖區,等待下次可寫時繼續發送。
void TcpConnection::sendInLoop(const void* data, size_t len)
{loop_->assertInLoopThread(); // 確保在事件循環線程中ssize_t nwrote = 0;size_t remaining = len;bool faultError = false;if (state_ == kDisconnected){LOG_WARN << "disconnected, give up writing";return;}// Step 1: 直接寫入數據(如果可能)if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = sockets::write(channel_->fd(), data, len);if (nwrote >= 0){remaining = len - nwrote;if (remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else {nwrote = 0;if (errno != EWOULDBLOCK){LOG_SYSERR << "TcpConnection::sendInLoop";if (errno == EPIPE || errno == ECONNRESET){faultError = true;}}}}// Step 2: 如果沒有寫完,則保存剩余數據到緩沖區if (!faultError && remaining > 0){size_t oldLen = outputBuffer_.readableBytes();if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);if (!channel_->isWriting()){//可寫事件監聽:表示套接字緩沖區有空閑空間,可以繼續寫入數據。channel_->enableWriting();}}
}
void TcpConnection::handleWrite()
{loop_->assertInLoopThread();if (channel_->isWriting()){ssize_t n = sockets::write(channel_->fd(),outputBuffer_.peek(),outputBuffer_.readableBytes());if (n > 0){outputBuffer_.retrieve(n);if (outputBuffer_.readableBytes() == 0){channel_->disableWriting();if (writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}if (state_ == kDisconnecting){shutdownInLoop();}}}else{LOG_SYSERR << "TcpConnection::handleWrite";// if (state_ == kDisconnecting)// {// shutdownInLoop();// }}}else{LOG_TRACE << "Connection fd = " << channel_->fd()<< " is down, no more writing";}
}
??TcpConnection
?數據發送流程總結??
-
??直接發送??
- 當 ??輸出緩沖區為空?? 且 ??socket 不可寫?? 時,直接調用?
write()
?嘗試發送數據。 - 如果全部發送成功,觸發?
writeCompleteCallback_
。
- 當 ??輸出緩沖區為空?? 且 ??socket 不可寫?? 時,直接調用?
-
??緩沖剩余數據??
- 如果未完全發送(或?
write()
?返回?EAGAIN
),剩余數據存入?outputBuffer_
,并注冊?EPOLLOUT
?事件。
- 如果未完全發送(或?
-
??處理可寫事件??
- 當?
EPOLLOUT
?觸發時,handleWrite()
?從?outputBuffer_
?讀取數據繼續發送。 - 如果緩沖區清空,取消?
EPOLLOUT
?監聽,避免忙等待。
- 當?
?將當前對象(TcpConnection
)傳遞給回調函數,以便在回調函數中正確訪問和操作該對象
-
使用
shared_from_this()
獲取當前對象的智能指針(std::shared_ptr
),而不是裸指針。 -
這樣可以在回調函數中持有對象的生命周期,防止對象在回調執行之前被銷毀。
void TcpConnection::connectEstablished() {loop_->assertInLoopThread();assert(state_ == kConnecting);setState(kConnected);channel_->tie(shared_from_this()); // 防止channel中使用懸空指針// 注冊讀事件channel_->enableReading();// 調用連接建立回調,通知上層用戶連接已經建立connectionCallback_(shared_from_this());
}
void TcpConnection::connectDestroyed() {loop_->assertInLoopThread();if (state_ == kConnected) {setState(kDisconnected);channel_->disableAll(); // 取消所有事件監聽// 調用連接關閉回調,通知上層用戶連接已關閉connectionCallback_(shared_from_this());}channel_->remove(); // 從 Poller 中移除
}
void TcpConnection::shutdown()
{// FIXME: use compare and swapif (state_ == kConnected){setState(kDisconnecting);// FIXME: shared_from_this()?loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}void TcpConnection::shutdownInLoop()
{loop_->assertInLoopThread();if (!channel_->isWriting()) //沒有數據發送(寫關閉){socket_->shutdownWrite();}
}