1.Reactor介紹
1.回調函數
**回調(Callback)**是一種編程技術,允許將一個函數作為參數傳遞給另一個函數,并在適當的時候調用該函數
1.工作原理
-
定義回調函數
-
注冊回調函數
- 觸發回調
2.優點
-
異步編程
-
回調函數允許在事件發生時執行特定的邏輯,而無需阻塞主線程,特別適合實現異步編程。
-
例如,在網絡編程中,當數據可讀或可寫時調用回調函數,而無需阻塞等待。
-
-
事件驅動
-
回調函數是事件驅動編程的核心機制,可以高效地處理事件,提高系統的響應速度和吞吐量。
-
例如,在 GUI 編程中,按鈕點擊事件會觸發回調函數。
-
-
代碼復用
-
回調函數可以獨立定義和復用,提高了代碼的可復用性和可維護性。
-
例如,同一個回調函數可以用于多個事件處理。
-
3.缺點
-
回調地獄:?當回調函數嵌套過深時,代碼結構會變得混亂,難以理解和維護,這種現象被稱為“回調地獄”。
-
錯誤處理復雜
-
回調函數中的錯誤處理邏輯需要分散在各個回調函數中,增加了錯誤處理的復雜性。
-
例如,每個回調函數都需要單獨處理錯誤。
-
-
調試困難
-
回調函數的調用鏈可能很長,調試時難以跟蹤回調函數的執行順序和上下文。
-
3.回調函應用
1. Linux 信號捕捉
在 Linux 系統中,信號(Signal)是一種軟件中斷機制,用于通知進程發生了某些事件。進程可以通過信號處理函數(即回調函數)來響應這些事件。
#include <stdio.h>
#include <signal.h>
#include <unistd.h>// 定義信號處理函數(回調函數)
void signal_handler(int sig) {printf("Received signal %d\n", sig);
}int main() {// 注冊信號處理函數signal(SIGINT, signal_handler); // 捕捉 Ctrl+C 信號signal(SIGTERM, signal_handler); // 捕捉 kill 信號printf("Process ID: %d\n", getpid());printf("Press Ctrl+C to send SIGINT signal\n");// 無限循環,等待信號while (1) {sleep(1);}return 0;
}
-
定義回調函數:
signal_handler
是一個回調函數,用于處理信號事件。它接收一個參數sig
,表示接收到的信號編號。 -
注冊回調函數:使用
signal
函數將特定信號(如SIGINT
、SIGTERM
)與回調函數關聯起來。當進程接收到這些信號時,系統會自動調用注冊的回調函數。 -
觸發回調:當進程接收到指定的信號時,操作系統會中斷當前執行的代碼,調用注冊的回調函數。回調函數執行完成后,進程繼續執行。
2. 多線程線程創建
在多線程編程中,線程創建時通常需要指定一個回調函數,該函數在線程啟動時被調用。
#include <iostream>
#include <thread>// 定義線程回調函數
void thread_function(int id) {std::cout << "Thread " << id << " is running\n";
}int main() {// 創建線程并指定回調函數std::thread t1(thread_function, 1);std::thread t2(thread_function, 2);// 等待線程完成t1.join();t2.join();return 0;
}
-
定義回調函數:
thread_function
是一個回調函數,用于在線程啟動時執行。它接收一個參數id
,表示線程的編號。 -
創建線程并注冊回調函數:使用
std::thread
構造函數創建線程時,將回調函數和參數傳遞給線程對象。線程啟動時會自動調用該回調函數。 -
觸發回調:線程啟動時,系統會調用注冊的回調函數,并將線程參數傳遞給它。
3. Qt 信號槽機制
Qt 是一個跨平臺的 C++ 應用程序開發框架,廣泛用于開發圖形用戶界面(GUI)應用程序。Qt 的信號槽機制是一種基于回調的事件處理機制。
#include <QApplication>
#include <QPushButton>// 定義槽函數(回調函數)
void buttonClicked() {std::cout << "Button clicked!\n";
}int main(int argc, char *argv[]) {QApplication app(argc, argv);QPushButton button("Click Me");button.show();// 連接信號和槽QObject::connect(&button, &QPushButton::clicked, buttonClicked);return app.exec();
}
-
定義槽函數:
buttonClicked
是一個槽函數(回調函數),用于處理按鈕點擊事件。 -
連接信號和槽:使用
QObject::connect
函數將按鈕的clicked
信號與槽函數關聯起來。當按鈕被點擊時,系統會自動調用槽函數。 -
觸發回調:當用戶點擊按鈕時,按鈕會發出
clicked
信號,Qt 的事件循環會檢測到該信號并調用注冊的槽函數。
4. 可調用對象包裝器綁定器
在 C++ 中,可以使用 std::function
和 std::bind
來創建可調用對象包裝器和綁定器,從而實現更靈活的回調機制。(不需要函數指針,將函數包裝成可調用對象)
#include <iostream>
#include <functional>
#include <thread>// 定義回調函數
void callback_function(int a, int b) {std::cout << "Callback called with arguments: " << a << " and " << b << std::endl;
}int main() {// 使用 std::bind 綁定回調函數和參數auto bound_callback = std::bind(callback_function, 10, 20);// 使用 std::function 包裝回調函數std::function<void()> callback = bound_callback;// 在線程中調用回調函數std::thread t(callback);t.join();return 0;
}
-
定義回調函數:
callback_function
是一個回調函數,接收兩個參數a
和b
。 -
綁定回調函數和參數:使用
std::bind
將回調函數和參數綁定在一起,生成一個可調用對象。 -
包裝回調函數:使用
std::function
將綁定后的可調用對象包裝起來,使其可以作為回調函數使用。 -
觸發回調:在新線程中調用包裝后的回調函數。
2.Reactor 模型
基于 Reactor 模型是一種高效的事件驅動編程模式,廣泛應用于網絡編程和并發處理。它通過監聽事件(如 I/O 事件、定時事件等)并觸發相應的回調函數來處理這些事件,從而實現高效的并發處理。Reactor 模型的核心思想是將事件處理邏輯與事件監聽邏輯分離,使得系統能夠高效地處理大量并發事件。
1..Reactor 模型的核心組件
-
事件分發器(Event Demultiplexer)
-
負責監聽和分發事件。它通常使用操作系統提供的 I/O 多路復用機制(如
select
、poll
、epoll
等)來同時監聽多個文件描述符(如套接字)上的事件。 -
當某個文件描述符上有事件發生時(如新連接、數據可讀、數據可寫等),事件分發器會將事件通知給相應的事件處理器。
-
-
事件處理器(Event Handler)
-
是與特定事件相關聯的回調函數或對象。當事件分發器檢測到某個事件時,會調用相應的事件處理器來處理該事件。
-
事件處理器通常包括以下幾種類型:
-
接受連接處理器(Acceptor):負責處理新連接事件,通常用于服務器端接受客戶端的連接請求。
-
讀事件處理器(Reader):負責處理數據可讀事件,從套接字中讀取數據。
-
寫事件處理器(Writer):負責處理數據可寫事件,將數據寫入套接字。
-
定時器處理器(Timer Handler):負責處理定時事件,用于實現定時任務。
-
-
-
事件循環(Event Loop)
-
是 Reactor 模型的運行時核心,負責驅動整個事件處理流程。它通常是一個無限循環,持續監聽事件并調用相應的事件處理器。
-
事件循環的主要步驟包括:
-
調用事件分發器,獲取已經就緒的事件。
-
遍歷就緒事件列表,調用每個事件對應的事件處理器。
-
處理完所有事件后,繼續循環監聽新的事件。
-
-
2.?
3.Reactor 模型的優點
-
高并發處理能力
-
通過事件驅動和回調機制,Reactor 模型可以高效地處理大量并發事件,而無需為每個連接創建一個線程或進程。
-
特別適合處理高并發的網絡服務,如 Web 服務器、聊天服務器等。
-
-
資源利用率高
-
事件分發器使用 I/O 多路復用機制,可以同時監聽多個文件描述符,減少了系統資源的浪費。
-
事件處理器通常是非阻塞的,不會阻塞事件循環,從而提高了系統的響應速度和吞吐量。
-
-
代碼結構清晰
-
Reactor 模型將事件處理邏輯與事件監聽邏輯分離,使得代碼結構更加清晰,易于維護和擴展。
-
事件處理器可以獨立開發和測試,提高了代碼的可復用性。
-
4.Reactor 模型的缺點
-
回調地獄
-
由于事件處理邏輯是通過回調函數實現的,當事件處理邏輯復雜時,可能會導致回調嵌套過深,形成“回調地獄”,使代碼難以理解和維護。
-
例如,多個異步操作的嵌套回調可能會導致代碼結構混亂。
-
-
錯誤處理復雜
-
在事件驅動的編程模型中,錯誤處理邏輯需要分散在各個事件處理器中,增加了錯誤處理的復雜性。
-
例如,網絡錯誤、文件讀寫錯誤等需要在每個事件處理器中單獨處理。
-
-
不適合 CPU 密集型任務
-
Reactor 模型主要適用于 I/O 密集型任務,對于 CPU 密集型任務,事件循環可能會被阻塞,從而影響系統的性能。
-
例如,計算密集型任務可能會導致事件循環無法及時處理新的事件。
-
5.Reactor 模型的應用場景
Reactor 模型廣泛應用于高性能網絡編程和并發處理場景,以下是一些典型的應用場景:
-
Web 服務器
-
Web 服務器需要同時處理大量客戶端的 HTTP 請求,Reactor 模型可以高效地處理并發請求,提高服務器的吞吐量。
-
例如,Nginx 和 Node.js 都采用了類似的事件驅動模型。
-
-
聊天服務器
-
聊天服務器需要實時處理用戶的連接和消息,Reactor 模型可以快速響應用戶的輸入和消息推送。
-
例如,基于 WebSocket 的聊天服務可以利用 Reactor 模型實現高效的通信。
-
-
游戲服務器
-
游戲服務器需要處理大量玩家的實時交互,Reactor 模型可以高效地處理玩家的輸入和游戲邏輯。
-
例如,多人在線游戲服務器可以利用 Reactor 模型實現高并發處理。
-
-
分布式系統
-
分布式系統中的節點需要高效地通信和同步數據,Reactor 模型可以用于實現高效的網絡通信。
-
例如,分布式緩存系統和分布式數據庫可以利用 Reactor 模型實現高效的節點間通信。
-
6.Reactor 模型的實現示例
以下是一個簡單的 Reactor 模型實現示例,使用 C++ 和 epoll
機制:
#include <iostream>
#include <vector>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>// 設置文件描述符為非阻塞模式
void setNonBlocking(int fd) {int flags = fcntl(fd, F_GETFL, 0);flags |= O_NONBLOCK;fcntl(fd, F_SETFL, flags);
}// 事件處理器接口
class EventHandler {
public:virtual void handleEvent(int fd) = 0;virtual ~EventHandler() {}
};// Reactor 類
class Reactor {
public:Reactor() : epoll_fd(epoll_create1(0)) {}~Reactor() {close(epoll_fd);}// 注冊事件處理器void registerHandler(int fd, EventHandler* handler) {setNonBlocking(fd);struct epoll_event ev;ev.events = EPOLLIN | EPOLLET; // 邊緣觸發,監聽讀事件ev.data.ptr = handler;epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);}// 運行事件循環void run() {std::vector<struct epoll_event> events(16);while (true) {int nfds = epoll_wait(epoll_fd, events.data(), events.size(), -1);for (int i = 0; i < nfds; ++i) {EventHandler* handler = static_cast<EventHandler*>(events[i].data.ptr);handler->handleEvent(events[i].data.fd);}}}private:int epoll_fd;
};// 示例事件處理器
class MyEventHandler : public EventHandler {
public:void handleEvent(int fd) override {std::cout << "Event on fd: " << fd << std::endl;// 處理事件邏輯,例如讀取數據}
};int main() {Reactor reactor;int fd = 0; // 示例文件描述符,實際應用中可以是套接字MyEventHandler handler;reactor.registerHandler(fd, &handler);reactor.run();return 0;
}
在這個示例中,Reactor
類負責管理事件循環和事件分發器(epoll
),EventHandler
是事件處理器的接口,MyEventHandler
是具體的事件處理器實現。通過調用 registerHandler
方法,可以將文件描述符和事件處理器注冊到 Reactor 中,然后運行事件循環來處理事件。
3.為什么Reactor適合高并發
Reactor 模式之所以適合高并發場景,主要是因為它采用了事件驅動的方式,避免了傳統多線程/多進程模型的高開銷。
在高并發情況下,如果采用每個連接創建一個線程(One Thread Per Connection)的方式,會導致:
-
線程上下文切換開銷大。
-
線程資源(如棧、調度等)消耗過多。
-
操作系統管理大量線程的開銷增加。
而 Reactor 模型 通過 I/O 多路復用 機制(如 epoll
)實現高效的事件處理,能夠支撐大規模連接。
Reactor 適合高并發的關鍵點
1. 基于 I/O 多路復用,避免阻塞
傳統的阻塞 I/O 需要為每個連接創建一個線程來處理 I/O 操作,導致線程資源浪費。而 Reactor 模式使用 epoll/kqueue
這類高效的 I/O 多路復用技術,可以讓單線程高效管理上萬個連
-
epoll_wait
只需少量線程,即可監聽成千上萬個連接的事件變化。 -
當某個 socket 可讀/可寫時,Reactor 僅對其處理,而不會阻塞其他連接。
2. 避免頻繁的線程切換
-
傳統
One Thread Per Connection
每個請求創建一個線程,線程上下文切換(context switch)是性能瓶頸。 -
Reactor 采用少量線程來管理大量連接,避免了過多的線程切換開銷。
對比:
方案 | 線程數 | 線程切換 | 適用場景 |
---|---|---|---|
每連接一個線程 | N (連接數) | 高 | 低并發場景 |
線程池 | M(固定數量) | 中 | 中等并發 |
Reactor (epoll) | 1 或 M(少量) | 低 | 高并發 |
3. 高效的任務分發
Reactor 只負責事件分發,不會阻塞:
-
當有連接可讀,Reactor 觸發
onMessage()
處理數據。 -
當有新連接,Reactor 觸發
onConnection()
處理新連接。 -
實際業務邏輯可以交給工作線程池執行,避免阻塞 Reactor 主線程。
4. 單線程或多線程模式可擴展
-
小規模服務:單線程 Reactor,所有 I/O 和計算在一個線程內完成。
-
大規模服務:多線程 Reactor,主線程負責 I/O,工作線程池處理業務邏輯。
?2.Moduo網絡庫簡介
-
開發背景:muduo是陳碩個人開發的TCP網絡編程庫,主要用于Linux平臺。
-
編程模型:它基于Reactor模式,支持多線程。其核心設計是一個線程一個事件循環(one loop per thread),即一個線程只能有一個事件循環(EventLoop),而一個文件描述符(fd)只能由一個線程進行讀寫。
-
主要特性:
-
使用非阻塞IO和事件驅動。
-
提供了線程池(ThreadPool)來處理耗時的計算任務。
-
支持TCP連接的生命周期管理。
-
提供了豐富的回調機制,用于處理連接建立、消息到達、連接斷開等事件。
-
-
適用場景:適用于開發高并發的TCP服務器,將網絡I/O與業務代碼分開
muduo的主要組件
-
EventLoop:事件循環,負責監聽事件并調用對應的回調函數。
-
Poller:事件監聽器,負責檢測事件是否觸發,muduo默認使用epoll實現。
-
TcpServer:用于創建TCP服務器,管理連接和事件循環。
-
TcpConnection:表示一個TCP連接,封裝了連接的讀寫操作。
-
Acceptor:負責接收新連接,并將其分配到子事件循環(subReactor)中。
muduo的優勢
-
高性能:基于Linux的epoll機制,能夠高效地處理大量并發連接。
-
易用性:提供了簡潔的API,方便開發者快速實現網絡服務。
-
可擴展性:支持多線程模型,可以根據CPU核心數靈活擴展。
?1.Muduo 的特點
1.基于 Reactor 模型
-
采用 epoll(多路復用)實現事件驅動的 I/O 處理。
-
高效的事件循環(
EventLoop
)避免了傳統阻塞式編程的缺陷。
2.多線程友好
-
采用 one loop per thread(每個線程一個
EventLoop
)的方式,避免鎖爭用,提高并發性能。 -
提供
EventLoopThreadPool
,支持多線程調度。
3.非阻塞 + 事件驅動
-
采用 非阻塞套接字 + 邊緣觸發(ET 模式),降低 CPU 負擔,提高吞吐量。
-
配合定時器(
TimerQueue
)支持高精度定時任務。
在網絡服務器開發中,經常需要 定時任務,比如:
-
定期清理空閑連接(避免資源浪費)
-
定時發送心跳包(保持連接活躍)
-
任務超時檢測(如 HTTP 請求超時)
TimerQueue
主要功能
-
支持 一次性定時任務(只執行一次)
-
支持 周期性定時任務(定期觸發)
-
使用
timerfd
觸發定時事件,精度高 -
所有定時任務都在
EventLoop
線程中執行,線程安全
4.現代 C++ 設計
-
遵循 RAII 原則,避免資源泄露。
-
使用
boost::noncopyable
防止對象拷貝,提高安全性。 -
提供智能指針(如
std::shared_ptr
)管理連接生命周期,避免懸空指針問題。
5.高效的 Buffer 設計?
-
Buffer
類封裝了 零拷貝 機制,提高數據讀寫性能。
零拷貝機制是一種優化技術,旨在減少數據在內存中的拷貝次數,從而提高數據傳輸的效率。在傳統的數據傳輸過程中,數據通常需要在多個緩沖區之間進行多次拷貝,這不僅增加了 CPU 的負擔,還可能導致性能瓶頸。零拷貝機制通過減少這些不必要的拷貝操作,顯著提高了數據傳輸的性能。
1.傳統數據傳輸方式
在傳統的數據傳輸中,數據通常需要經過多次拷貝。例如,從磁盤讀取數據并發送到網絡,可能涉及以下步驟:
-
從磁盤讀取數據到內核緩沖區:
-
數據從磁盤讀取到內核的緩沖區。
-
這一步通常由
read
系統調用完成。
-
-
從內核緩沖區拷貝到用戶空間緩沖區:
-
數據從內核緩沖區拷貝到用戶空間的緩沖區。
-
這一步通常由
read
系統調用完成。
-
-
從用戶空間緩沖區拷貝到內核緩沖區:
-
數據從用戶空間的緩沖區拷貝回內核緩沖區。
-
這一步通常由
write
系統調用完成。
-
-
從內核緩沖區發送到網絡:
-
數據從內核緩沖區發送到網絡。
-
這一步通常由
write
系統調用完成。
-
這種傳統的數據傳輸方式涉及多次數據拷貝,增加了 CPU 的負擔,降低了性能。
2.零拷貝機制的優勢
零拷貝機制通過減少數據在內存中的拷貝次數,顯著提高了數據傳輸的效率。具體優勢包括:
-
減少 CPU 負擔:
-
減少了數據在內核空間和用戶空間之間的拷貝操作,降低了 CPU 的使用率。
-
-
提高吞吐量:
-
減少了數據傳輸的延遲,提高了系統的吞吐量。
-
-
減少內存使用:
-
減少了不必要的內存分配和釋放,提高了內存的使用效率。
-
3.零拷貝機制的實現方式
1. 使用?mmap
?和?sendfile
mmap
和 sendfile
是 Linux 提供的兩種零拷貝機制,它們可以顯著減少數據在內存中的拷貝次數。
-
mmap
:-
mmap
將文件映射到用戶空間的內存中,允許直接訪問文件內容,而無需通過read
系統調用。 -
數據可以直接從用戶空間的內存發送到網絡,減少了內核空間和用戶空間之間的拷貝。
-
-
sendfile
:-
sendfile
是一個系統調用,允許直接從一個文件描述符(通常是文件)發送數據到另一個文件描述符(通常是套接字)。 -
數據直接從內核緩沖區發送到網絡,無需經過用戶空間,減少了數據拷貝。
-
2. 使用?splice
splice
是另一種零拷貝機制,允許在兩個文件描述符之間直接傳輸數據,而無需經過用戶空間。splice
可以用于管道(pipe)和套接字之間的數據傳輸。
3. 使用?writev
?和?readv
writev
和 readv
是 Linux 提供的 I/O 系統調用,允許將多個緩沖區的數據一次性寫入或讀取,減少了系統調用的次數。
-
writev
:-
允許將多個緩沖區的數據一次性寫入文件描述符。
-
數據在內核空間中直接拼接,減少了用戶空間和內核空間之間的拷貝。
-
-
readv
:-
允許從文件描述符一次性讀取數據到多個緩沖區。
-
數據在內核空間中直接拆分,減少了用戶空間和內核空間之間的拷貝。
-
4.muduo
?中的零拷貝機制
muduo
網絡庫通過以下方式實現了零拷貝機制:
-
使用
writev
和readv
:-
muduo
的Buffer
類在讀取和寫入數據時,使用readv
和writev
系統調用,減少了數據在內核空間和用戶空間之間的拷貝。
-
-
支持 預留空間,減少內存分配次數。
1. 預留空間(Prepend Space)
muduo::Buffer
類在設計上預留了一段空間(kCheapPrepend
),通常為 8 字節。這段預留空間位于緩沖區的開頭,用于優化小數據寫入操作。
-
優化小數據寫入:當需要寫入少量數據時,可以直接利用預留空間,避免了內存移動操作。
-
減少內存分配次數:通過預留空間,可以在不頻繁擴容的情況下,高效地處理小數據的寫入。
2. 動態擴容策略
muduo::Buffer
類支持動態擴容,當可寫空間不足時,會根據當前數據量的大小動態調整緩沖區的大小。這種策略考慮了整個緩沖區的使用情況,包括預留空間和已讀數據區域,從而避免了不必要的內存分配。
-
擴容條件:當可寫空間不足時,
Buffer
會根據當前數據量的大小動態調整緩沖區的大小。 -
擴容策略:擴容時,
Buffer
會考慮預留空間和已讀數據區域,從而避免了不必要的內存分配。
3. 內存分配優化
通過預留空間和動態擴容策略,muduo::Buffer
類顯著減少了內存分配次數,從而提高了性能。
-
減少內存分配次數:通過預留空間和動態擴容策略,減少了不必要的內存分配。
-
提高性能:減少了內存分配次數,從而提高了數據讀寫的性能。
6.支持 TCP 服務器 & 客戶端
-
封裝了
TcpServer
和TcpClient
,簡化開發,降低網絡編程的復雜度。
7.集成日志系統
-
提供高效的日志模塊
muduo::Logger
,支持日志等級、文件輸出等。
muduo
網絡庫提供了一個高效且功能豐富的日志模塊 muduo::Logger
,支持日志等級、文件輸出、異步日志記錄等功能。以下是其主要特點和使用方法:
1. 日志等級
muduo::Logger
提供了多種日志級別,包括 TRACE
、DEBUG
、INFO
、WARN
、ERROR
和 FATAL
。這些級別允許開發者根據日志的重要性進行篩選,從而在不同環境下控制日志輸出的詳細程度。
2. 異步日志記錄
muduo
的日志系統采用異步日志記錄機制,通過 AsyncLogging
類實現。這種方式可以有效防止日志輸出阻塞程序的正常運行。AsyncLogging
使用獨立的線程進行日志寫入,主線程只需將日志消息放入隊列,由后臺線程負責寫入磁盤。
3. 文件輸出與滾動策略
muduo
支持將日志輸出到文件,并提供了日志文件滾動功能。這包括基于文件大小和時間的滾動策略:
-
基于大小的滾動:當日志文件達到指定大小時,會自動創建新的日志文件。
-
基于時間的滾動:支持按天滾動日志文件,每天生成一個新的日志文件。
4. 日志格式與自定義輸出
muduo::Logger
支持自定義日志格式,開發者可以根據需要配置日志的輸出格式。此外,還可以通過設置輸出函數,將日志輸出到控制臺、文件或其他目標。
2.Muduo 的 one loop per thread
?
-
Main Reactor(主Reactor) 只負責接收新連接,不處理數據讀寫。
-
Sub Reactors(子Reactor 線程池) 處理已建立連接的讀寫、回調、計算等任務。
-
每個線程一個 EventLoop(one loop per thread),保證 Reactor 線程間互不干擾。
1. 什么是 poll 的大小?
這里的 poll 指的是 epoll
的監聽對象數,即每個 EventLoop 線程最多能管理的連接數。
-
poll
(或epoll_wait
)用來監聽 socket 事件,比如可讀、可寫、新連接等。 -
每個 Reactor(線程)都有一個
epoll
實例,監聽其負責的所有連接。 -
“大小固定” 指的是:在
one loop per thread
設計下,每個線程的epoll
管理的連接數受限于 CPU 資源。
2. 為什么要根據 CPU 數目確定?
Muduo 采用 多線程 + 負載均衡 來充分利用 CPU 資源:
-
一臺服務器的 CPU 核心數 = 最佳 Sub Reactor 線程數。
-
每個 CPU 核心運行一個 Sub Reactor 線程,即每個 CPU 只運行一個
epoll
,避免線程間競爭,提高性能。
這樣做的好處是:
-
最大化 CPU 利用率:每個 CPU 只運行一個 Reactor 線程,避免線程爭用 CPU 資源。
-
減少鎖競爭:每個
EventLoop
線程獨立運行,不需要鎖來同步訪問共享資源。 -
負載均衡:新的連接采用 Round-Robin(輪詢)方式分配給 Reactor 線程,均勻分布負載。
如果服務器有 4 核 CPU,Muduo 的線程模型如下:
1 個主 Reactor 線程(MainReactor
)—— 負責 accept()
接收新連接,并分發給 Sub Reactor 線程。
4 個子 Reactor 線程(SubReactors
)—— 處理 I/O 事件(read/write
)。
業務線程池(可選)—— 處理耗時任務,如數據庫操作、計算等(避免阻塞 Sub Reactor 線程)
3.Moduo使用
1.小規模服務:單線程 Reactor,所有 I/O 和計算在一個線程內完成。
#include <functional> // 包含 std::bind 和 std::placeholders 等功能
#include <iostream> // 包含標準輸入輸出流
#include <muduo/base/Logging.h> // 包含 Muduo 的日志模塊
#include <muduo/net/EventLoop.h> // 包含事件循環類
#include <muduo/net/TcpServer.h> // 包含 TCP 服務器類
#include <string> // 包含標準字符串類namespace muduo {
namespace net {// 定義一個 EchoServer 類,用于創建回聲服務器
class EchoServer {
public:// 構造函數,初始化服務器EchoServer(muduo::net::EventLoop *loop, // 事件循環對象const muduo::net::InetAddress &addr, // 服務器地址和端口const std::string &name) // 服務器名稱: server_(loop, addr, name), // 初始化 TcpServer 對象loop_(loop) // 保存事件循環對象的指針{// 注冊連接回調函數// 當有新的客戶端連接或斷開連接時,調用 EchoServer::OnConnect 函數server_.setConnectionCallback(std::bind(&EchoServer::OnConnect, this, std::placeholders::_1));// 注冊消息回調函數// 當接收到客戶端發送的消息時,調用 EchoServer::OnMessage 函數server_.setMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));}// 啟動服務器void start() {server_.start(); // 調用 TcpServer 的 start 方法啟動服務器}private:// 消息回調函數void OnMessage(const muduo::net::TcpConnectionPtr &conn, // 客戶端連接對象muduo::net::Buffer *buff, // 消息緩沖區muduo::Timestamp time) // 消息到達的時間{std::string str = buff->retrieveAllAsString(); // 從緩沖區中讀取消息conn->send(str); // 將消息原樣返回給客戶端conn->shutdown(); // 關閉連接的寫端,表示不再發送數據}// 連接回調函數void OnConnect(const muduo::net::TcpConnectionPtr &conn) {if (conn->connected()) {// 如果客戶端連接成功,打印客戶端和服務器的地址信息std::cout << conn->peerAddress().toIpPort() // 客戶端地址和端口<< " -> " << conn->localAddress().toIpPort() // 服務器地址和端口<< " state: online" << std::endl;} else {// 如果客戶端斷開連接,打印相關信息std::cout << conn->peerAddress().toIpPort() // 客戶端地址和端口<< " -> " << conn->localAddress().toIpPort() // 服務器地址和端口<< " state: offline" << std::endl;conn->shutdown(); // 關閉連接}}muduo::net::EventLoop *loop_; // 事件循環對象指針muduo::net::TcpServer server_; // TCP 服務器對象
};} // namespace net
} // namespace muduoint main() {muduo::net::EventLoop loop; // 創建事件循環對象muduo::net::InetAddress addr("127.0.0.1", 8080); // 定義服務器地址和端口muduo::net::EchoServer server(&loop, addr,"EchoServer"); // 創建 EchoServer 對象server.start(); // 啟動服務器,監聽新連接,并注冊事件回調loop.loop(); // 進入 Reactor 事件循環,不斷監聽和處理事件return 0;
}
std::bind創建一個“綁定器”,將一個函數(或成員函數)和它的參數綁定在一起,形成一個新的可調用對象
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>using namespace muduo;
using namespace muduo::net;class EchoClient {
public:EchoClient(EventLoop* loop, const InetAddress& serverAddr): loop_(loop),client_(loop, serverAddr, "EchoClient"),connectionEstablished_(false) {// 設置連接回調函數client_.setConnectionCallback(std::bind(&EchoClient::onConnection, this, std::placeholders::_1));// 設置消息回調函數client_.setMessageCallback(std::bind(&EchoClient::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}void connect() {client_.connect();}void send(const std::string& message) {if (connectionEstablished_) {conn_->send(message);} else {std::cerr << "Connection not established yet." << std::endl;}}private:void onConnection(const TcpConnectionPtr& conn) {conn_ = conn;if (conn->connected()) {connectionEstablished_ = true;std::cout << "Connected to server." << std::endl;} else {connectionEstablished_ = false;std::cout << "Disconnected from server." << std::endl;}}void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time) {std::string message = buf->retrieveAllAsString();std::cout << "Received message from server: " << message << std::endl;}EventLoop* loop_;TcpClient client_;TcpConnectionPtr conn_;bool connectionEstablished_;
};int main() {EventLoop loop;InetAddress serverAddr("127.0.0.1", 8080); // 服務器地址和端口EchoClient client(&loop, serverAddr);client.connect(); // 連接到服務器// 發送一條消息到服務器client.send("Hello, server!");loop.loop(); // 開始事件循環,讓客戶端開始運行return 0;
}
2.大規模服務:多線程 Reactor,主線程負責 I/O,工作線程池處理業務邏輯
#include <iostream>
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpServer.h>
#include <string>using namespace muduo;
using namespace muduo::net;class EchoServer {
public:EchoServer(EventLoop *loop, const InetAddress &listenAddr, int numThreads): server_(loop, listenAddr, "EchoServer") {// 設置線程池的線程數量server_.setThreadNum(numThreads);// 設置連接回調函數server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, std::placeholders::_1));// 設置消息回調函數server_.setMessageCallback(std::bind(&EchoServer::onMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));}void start() { server_.start(); }private:void onConnection(const TcpConnectionPtr &conn) {if (conn->connected()) {LOG_INFO << "Connection UP: " << conn->peerAddress().toIpPort();} else {LOG_INFO << "Connection DOWN: " << conn->peerAddress().toIpPort();}}void onMessage(const TcpConnectionPtr &conn, Buffer *buf,Timestamp receiveTime) {std::string msg(buf->retrieveAllAsString());LOG_INFO << "Received message from " << conn->peerAddress().toIpPort()<< ": " << msg;conn->send(msg); // 回聲}TcpServer server_;
};int main(int argc, char *argv[]) {LOG_INFO << "pid = " << getpid();if (argc > 1) {EventLoop loop;InetAddress listenAddr(8080);int numThreads = argc > 2 ? atoi(argv[2]) : 3; // 默認使用 3 個線程EchoServer server(&loop, listenAddr, numThreads);server.start();loop.loop();} else {std::cerr << "Usage: " << argv[0] << " <port> [num_threads]" << std::endl;}return 0;
}
3.日志
Muduo 日志模塊支持多種日志級別,每種級別用于記錄不同嚴重程度的信息。以下是 Muduo 日志模塊支持的日志級別及其用途:
1.?TRACE
-
用途:記錄最詳細的信息,通常用于開發和調試階段。
-
LOG_TRACE << "This is a trace message.";
2.?DEBUG
-
用途:記錄調試信息,用于開發和調試階段。
-
LOG_DEBUG << "This is a debug message.";
3.?INFO
-
用途:記錄正常運行時的重要信息。
-
LOG_INFO << "This is an info message.";
4.?WARN
-
用途:記錄警告信息,表示程序可能存在問題,但不會影響正常運行。
-
LOG_WARN << "This is a warning message.";
5.?ERROR
-
用途:記錄錯誤信息,表示程序運行中出現了嚴重問題。
-
LOG_ERROR << "This is an error message.";
6.?FATAL
-
用途:記錄致命錯誤,通常會導致程序崩潰。
-
LOG_FATAL << "This is a fatal error message.";
日志級別從低到高依次為:
TRACE < DEBUG < INFO < WARN < ERROR < FATAL
配置日志級別
你可以通過配置文件或環境變量來設置日志的最低輸出級別。例如,如果你只想輸出 INFO
及以上級別的日志,可以設置日志級別為 INFO
。
?3.CMake
add_executable(server server.cpp)# 鏈接Muduo庫
target_link_libraries(server "/usr/local/lib/libmuduo_net.a" "/usr/local/lib/libmuduo_base.a"
)