高并發服務器項目總結
- 模塊關系圖
- 項目工具模塊
- 緩沖區模塊
- 通用類型模塊
- 套接字socket模塊
- 信道Channel模塊
- 多路轉接Poller模塊
- Reactor模塊
- 時間輪TimeWheel模塊
- 連接Connection模塊
- Accepter模塊
- TcpServer模塊
模塊關系圖
項目工具模塊
緩沖區模塊
底層使用字符容器儲存數據,通過兩個指針:讀偏移和寫偏移管理空間:
- 對于寫入:確保空間足夠,在寫偏移后拷貝數據,寫偏移向后移動
- 對于讀取:確保可讀數據足夠,在讀偏移后讀取數據,讀偏移向后移動
- 空間不足時及時擴容
通用類型模塊
想要實現的效果:
Any a;
a = 123;
a = "123"
a = vector<int>(n , 0);
為了實現這種效果,首先Any類肯定不能帶有模版參數,不然就不能自由轉換了!
- 可以在內部設計一個內部類placeholder(繼承holder父類),它是一個模板類,是實際儲存數據的對象;
- 當我們創建了一個Any對象時,會創建一個父類指針,管理placeholder對象
- 通過模版函數,我們可以實現對placeholder對象賦值。
- 可以通過交換placeholder對象管理的資源實現自由賦值。
套接字socket模塊
這個模塊對外提供一鍵創建服務端套接字和客戶端套接字的方法(是對內部接口的一個封裝):
- 創建套接字接口:以數據流的方式創建一個套接字文件
- 綁定端口信息
- 進入監聽模式
- 發起連接:組織目標端口信息,進行申請連接
- 啟動地址重用:可以快速重啟服務,跳過系統釋放端口的時間;允許多個 socket 監聽相同的地址和端口;避免地址已在使用錯誤。
- 設置非阻塞讀取
對外提供的接口
- 構建服務端套接字:創建套接字,設置為非阻塞,將地址與端口設置為可重用,綁定地址信息,進行監聽。
- 構建客戶端套接字:創建套接字,連接服務器
- 發送數據
- 接收數據
信道Channel模塊
該模塊是用來管理一個文件描述符的,對該描述符的事件進行監控,觸發事件就調用對應的回調函數:
- 當前需要監控的事件集
- 當前連接觸發的事件集
- 管理該文件描述符的Reactor模型(EventLoop)
- 可讀事件觸發的回調函數
- 可寫事件觸發的回調函數
- 錯誤事件觸發的回調函數
- 斷開事件觸發的回調函數
- 任意事件觸發的回調函數
提供一系列接口幫助我們管理對該文件描述符的監控。觸發事件通過HandleEvent()函數進行處理。
多路轉接Poller模塊
該模塊是對epoll接口的二次封裝,管理一系列的文件描述符。內部儲存文件描述符與其對應的信道Channel;
可以通過信道對epoll中監控的文件描述符進行更新監控事件集
提供一個開始監控接口Poll,該函數中阻塞式等待事件就緒。得到就緒事件就進行將就緒事件更新到對應的信道中,然后將信道放入活躍隊列中等待上層處理。
Reactor模塊
時間輪TimeWheel模塊
時間輪模塊是用來執行定時任務的,項目中可以用來管理超時連接,及時進行釋放操作。
為了可以實現定時執行任務,首先需要對封裝出一個任務對象Timer:
- 任務ID uint64_t _id :用來標識任務
- 超時時間 uint32_t _timeout;
- 是否被取消:bool _canceled; 這樣取消任務時,通過哈希表找到對應任務,設置該標志位就好了,效率高!
- 回調任務 _task_cb void()類型
- 釋放操作函數 _release :用于刪除TimeWheel保存的定時器對象信息
- 任務釋放時執行回調函數
時間輪TimeWheel的本質是一個數組,一個指針定時移動,到哪里就釋放該位置的所有任務Timer。
class TimeWheel
{using TaskPtr = std::shared_ptr<Timer>;using WeakPtr = std::weak_ptr<Timer>; // 輔助shared_ptr 不會增加引用計數
private:int _capacity; // 最大容量 表盤最大數量(默認60秒)int _tick; // 移動表針std::vector<std::vector<TaskPtr>> _wheel; // 時間輪std::unordered_map<uint64_t, WeakPtr> _timers; // 定時任務對象哈希表int _timerfd; // 定時器fdEventLoop *_loop; // 對應的Eventfdstd::unique_ptr<Channel> _timer_channel; // 管理_timerfd的channel類!//...
}
這里使用智能指針進行管理,方便操作。引用計數歸零時自動釋放。設置一個timefd,系統每1s都會想其中寫入一個1。讀取到數據,就移動指針,釋放該時間的定時任務指針。
EventLoop模塊
該模塊是真正的Reactor模型,管理一下數據:
std::thread::id _event_id; // 線程IDint _eventfd; // eventfd 用于通知事件std::unique_ptr<Channel> _event_channel; // 管理Event事件的Channel對象Poller _poller; // epoll模型std::vector<Functor> _tasks; // 任務池std::mutex _mtx; // 互斥鎖保護線程TimeWheel _timer_wheel; // 時間輪
該模塊的執行邏輯為:
- 通過EventLoop模塊通過的接口可以添加管理的信道(文件描述符)
- 啟動監控后,對epoll中的文件描述符進行事件監控。
- Poller模塊Poll函數中返回就緒的活躍信道集。
- 對每個活躍信道執行handleEvent處理就緒事件
- 對于外部設置的任務,需要保證是在該eventloop所在線程內部執行,如果不是放入任務池中等待執行。
EventLoop模塊與TimeWheel模塊整合
Reactor模型內部加入一個時間輪,負責執行超時連接銷毀任務,對外提供接口:
- Void TimerAdd(uint64_t id, int delay, Task_t cb):加入定時任務
- void TimerRefresh(uint64_t id):刷新定時任務
- void TimerCancel(uint64_t id):取消定時任務
- bool HasTimer(uint64_t id):判斷是否有該任務
連接Connection模塊
連接模塊是對上面模塊的整體整合,具有以下參數:
uint64_t _conn_id; // connection連接IDSocket _socket; // 管理的套接字int _sockfd; // 套接字fdEventLoop *_loop; // connection連接關聯的EventLoop對象Any _context; // 上下文數據Channel _channel; // 管理連接事件Buffer _in_buffer; // 輸入緩沖區 存放Socket中讀取的數據Buffer _out_buffer; // 輸出緩沖區 存放要發送給對端的數據bool _enable_active_release; // 是否開啟超時銷毀 默認是falseConnStatu _statu; // Connection連接狀態// 5 個 回調函數 --- 注意使用智能指針 防止在執行任務之前Connection銷毀using ConnectedCallBack = std::function<void(const PtrConn &)>; // 連接時進行的回調函數using MessageCallBack = std::function<void(const PtrConn &, Buffer *)>; // 處理數據時的回調函數using ClosedCallBack = std::function<void(const PtrConn &)>; // 關閉連接時的回調函數using AnyEventCallBack = std::function<void(const PtrConn &)>; // 處理任意事件時的回調函數ConnectedCallBack _conn_cb; // 連接回調函數類型MessageCallBack _message_cb; // 處理時回調函數ClosedCallBack _closed_cb; // 關閉階段的回調AnyEventCallBack _event_cb; // 任意事件觸發的回調// 還需要組件內的連接關閉回調 因為服務器組件內會把所有的連接管理起來 一旦某個連接關閉 就應該從管理的地方移除自己的信息!ClosedCallBack _event_closed_cb;
- 該連接管理的底層套接字
- 管理該套接字的信道:用于處理新連接事件
- 連接關聯的EventLoop模塊,用于事件監控
- 通用類型上下文
- 輸入/輸出緩沖區
- 新連接/處理消息/關閉連接/任意事件 回調函數
- 連接內部連接關閉函數
內部提供的函數:
- 首先是該連接內部遇到事件的函數,作為信道的回調函數
- 讀事件回調函數
- 寫事件回調函數
- 關閉事件回調
- 錯誤事件回調
- 線程內發送數據函數
- 線程內關閉連接函數
- 線程內啟動超時管理函數:將該連接的釋放函數Release放入EventLoop的時間輪中
- 線程內取消超時管理函數
- 線程內更新回調函數(協議)
- 線程內釋放函數ReleaseInLoop(真正的釋放函數):修改連接狀態,取消定時任務,執行用戶設置的關閉連接回調函數,通過內部釋放回調函數釋放在連接在服務器內的資源。
對外提供的接口:
- 基礎接口:返回連接id,套接字fd…
- 設置回調函數接口
- 發送數據接口:在EventLoop中加入發送任務
- 關閉連接接口:在EventLoop中加入關閉連接任務
- 啟動超時銷毀接口:在EventLoop中加入啟動超時銷毀任務
- 取消超時銷毀接口:在EventLoop中加入取消超時銷毀任務
- 切換協議接口:在EventLoop中加入取消切換協議任務
Accepter模塊
該模塊是對監聽套接字進行管理的:
Socket _socket; // 套接字對象EventLoop *_loop; // 對監聽套接字進行事件監控Channel _channel; // 用于對管理監聽套接字的事件using AcceptCallBack = std::function<void(int)>;AcceptCallBack _accept_callback;
不同于不同的套接字文件,監聽套接字只需要讀取事件的回調函數,從監聽套接字中讀取出新連接的fd,然后對該fd執行獲取連接的回調函數。
線程池模塊
我們先對線程進行封裝,使其包含EventLoop模塊。線程中需要執行的函數就是創建一個EventLoop,然后執行Start監控。
線程池對若干個線程進行管理:
int _thread_size; // 從屬線程數量int _next_idx; // 線程索引std::vector<LoopThread *> _threads; // 管理從屬線程std::vector<EventLoop *> _loops; // 管理從屬ReactorEventLoop *_baseloop; // 主Reactor
每次取出一個線程對外提供時,只需提供線程對應的EventLoop即可
TcpServer模塊
這個模塊就是最終的服務器模塊。主Reactor對監聽套接字的進行管理,獲取到新連接后就將連接交給一個線程中的從屬Reactor進行監控。進而實現高并發的服務器!
該模塊管理的成員變量:
uint64_t _conn_id; // 自增長的連接ID;int _port; // 綁定的端口號EventLoop _baseloop; // 主Reactor模型Acceptor _acceptor; // 監聽套接字LoopThreadPool _pool; // 從屬線程池std::unordered_map<uint64_t, PtrConn> _conns; // 管理連接的哈希表int _timeout; // 超時時間bool _enable_active_release; // 是否開啟超時銷毀 默認是false// 4種 回調函數 --- 注意使用智能指針 防止在執行任務之前Connection銷毀using ConnectedCallBack = std::function<void(const PtrConn &)>; // 連接時進行的回調函數using MessageCallBack = std::function<void(const PtrConn &, Buffer *)>; // 處理數據時的回調函數using ClosedCallBack = std::function<void(const PtrConn &)>; // 關閉連接時的回調函數using AnyEventCallBack = std::function<void(const PtrConn &)>; // 處理任意事件時的回調函數ConnectedCallBack _conn_cb; // 連接回調函數類型MessageCallBack _message_cb; // 處理時回調函數ClosedCallBack _closed_cb; // 關閉階段的回調AnyEventCallBack _event_cb; // 任意事件觸發的回調
- 提供給Accepterd的回調函數NewConnection,在該函數中對連接賦予一個id,并設置連接的回調函數,將該連接交給線程池中的一個線程進行監管,啟動對應的超時銷毀任務。
- 提供連接的內部釋放函數RemoveConnection,銷毀連接在服務器中的資源
- 提供給上層的設置回調函數接口
- 服務器開始運行:啟動服務器套接字的監聽,創建線程池,開啟主Reactor的監控。