1.實現目標
仿muduo庫One Thread One Loop式主從Reactor模型實現高并發服務器:
通過實現高并發服務器的組件,可以快速實現一個高并發服務器的搭建,并且,通過組內不同應用層協議的支持,可以快速完成高性能服務器的搭建,由于要實現的是一個服務器,所以并不涉及實際的業務代碼。
Http服務器:
Http協議是位于應用層的一個協議,全稱為一個超文本傳輸協議,是一個簡單的基于請求響應的協議,運行在Tcp之上的協議,它是不安全的協議。
Reactor模式:
Reactor 模式,是指通過一個或多個輸入同時傳遞給服務器進行請求處理時的事件驅動處理模式。
服務端程序處理傳入多路請求,并將它們同步分派給請求對應的處理線程,Reactor 模式也叫Dispatcher 模式,也可以叫做發布者模式。
分類:
單Reacto模式:
單I/O多路復用+業務處理,優點為所有的操作都在一個線程內執行,不存在線程安全問題,沒有死鎖問題,代碼容易編寫,缺點為無法利用cup多核有效資源,資源浪費嚴重,容易達到性能瓶頸。
單Reactor多線程:
單I/O多路轉接+線程池+業務處理,優點為可以充分發揮cpu多核資源,缺點為多個線程之間共享數據比較麻煩,單個Reactor承擔了所有的事件的監聽和響應,在單線程之下,高并發成為了性能瓶頸。
多Reactor多線程模式:
多個I/O多路轉接+線程池+業務處理,充分利用CPU多核資源,主從Reactor各司其職,在主Reactor中處理新連接請求事件,有新連接到來則分發到子Reactor中監控,在子Reactor中進行客戶端通信監控,有事件觸發,則接收數據分發給Worker線程池。
可以充分利用CPU資源,將相應交給子Reactor來實現。
目標定位
One Thread One Loop主從Reactor模型高并發服務器
咱們實現的是主從Reactor模型服務器,也就是主Reactor線程僅僅監控監聽描述符,獲取新建連接,保證獲取新連接的高效性,提高服務器的并發性能,主Reactor獲取到新鏈接之后,發布到子Reactor進行通信事件監控,子Reactor進行對行的監控各自的文件描述符的增刪查改以及上層的業務處理。
One Thread One Loop的思想就是把所有的操作都放到一個線程中進行,一個線程對應一個事件處理的循環。
當前實現中,因為并不確定組件使用者的使用意向,因此并不提供業務層工作線程池的實現,只實現主從Reactor,而Worker工作線程池,可由組件庫的使用者的需要自行決定是否使用和實現。
功能模塊劃分:
基于以上的理解,我們要實現的是一個帶有協議支持的Reactor模型高性能服務器,因此將整個項目的實現劃分為兩個大的模塊:
? SERVER模塊:實現Reactor模型的TCP服務器;
? 協議模塊:對當前的Reactor模型服務器提供應用層協議支持。
SERVER模塊:
SERVER模塊:SERVER模塊就是將所有的連接以及線程進行管理,在合適的時候做合適的事情,完成服務器的實現。具體的管理分為三個模塊:
監聽連接管理:對監聽鏈接進行通信管理。
通信連接管理:對正在通信的鏈接進行管理。
超時連接管理:對已經超時的鏈接進行管理。
基于以上思想,又可以劃分為多個模塊:
Buffer 模塊:
Boffer模塊,是一個緩沖區模塊,用于通信過程中接收和發送信息的管理。
Socket 模塊:
Socket模塊,主要對創建Socket做一個封裝,實現網絡通信的各項功能。
Channel模塊:
Channel模塊,主要對文件描述符需要進行的I/O事件管理模塊,實現對描述符可讀,可寫,錯誤...事件的管理操作,以及Poller模塊對描述符進行IO事件監控就緒后,根據不同的事件,回調不同的處理函數功能。
Connection 模塊:
Connection模塊是對Buffer模塊,Socket模塊,Channel模塊的一個整體封裝,實現了對一個通信套接字的整體的管理,每一個進行數據通信的套接字(也就是accept獲取到的新連接)都會使用Connection進行管理。
Connection模塊內部包含有三個由組件使用者傳入的回調函數:連接建立完成回調,事件回調,新數據回調,關閉回調。
- Connection模塊內部包含有兩個組件使用者提供的接口:數據發送接口,連接關閉接口
- Connection模塊內部包含有兩個用戶態緩沖區:用戶態接收緩沖區,用戶態發送緩沖區
- Connection模塊內部包含有一個Socket對象:完成描述符面向系統的IO操作
- Connection模塊內部包含有一個Channel對象:完成描述符IO事件就緒的處理
具體處理流程如下:
1. 實現向Channel提供可讀,可寫,錯誤等不同事件的IO事件回調函數,然后將Channel和對應的描述符添加到Poller事件監控中。
2. 當描述符在Poller模塊中就緒了IO可讀事件,則調用描述符對應Channel中保存的讀事件處理函數,進行數據讀取,將socket接收緩沖區全部讀取到Connection管理的用戶態接收緩沖區中。然后調用由組件使用者傳入的新數據到來回調函數進行處理。
3. 組件使用者進行數據的業務處理完畢后,通過Connection向使用者提供的數據發送接口,將數據寫入Connection的發送緩沖區中。
4. 啟動描述符在Poll模塊中的IO寫事件監控,就緒后,調用Channel中保存的寫事件處理函數,將發送緩沖區中的數據通過Socket進行面向系統的實際數據發送。Accept 模塊:
Acceptor模塊是對Socket模塊,Channel模塊的一個整體封裝,實現了對一個監聽套接字的整體的管理。
- Acceptor模塊內部包含有一個Socket對象:實現監聽套接字的操作。
- ?Acceptor模塊內部包含有一個Channel對象:實現監聽套接字IO事件就緒的處理。
具體流程:
1.實現向Channel提供可讀事件的IO事件處理回調函數,函數的功能其實也就是獲取新連接
2. 為新連接構建一個Connection對象出來。TimerQueue模塊:
TimerQueue模塊是實現固定時間定時任務的模塊,可以理解就是要給定時任務管理器,向定時任務管理器中添加一個任務,任務將在固定時間后被執行,同時也可以通過刷新定時任務來延遲任務的執行。
這個模塊主要是對Connection對象的生命周期管理,對非活躍連接進行超時后的釋放功能。TimerQueue模塊內部包含有一個timerfd:linux系統提供的定時器。
TimerQueue模塊內部包含有一個Channel對象:實現對timerfd的IO時間就緒回調處理。Poller模塊:
Poller模塊是對epoll進行封裝的一個模塊,主要實現epoll的IO事件添加,修改,移除,獲取活躍連接功能。
EverlLoop 模塊:
EventLoop模塊可以理解就是我們上邊所說的Reactor模塊,它是對Poller模塊,TimerQueue模塊,Socket模塊的一個整體封裝,進行所有描述符的事件監控。
EventLoop模塊必然是一個對象對應一個線程的模塊,線程內部的目的就是運行EventLoop的啟動函數。
EventLoop模塊為了保證整個服務器的線程安全問題,因此要求使用者對于Connection的所有操作一定要在其對應的EventLoop線程內完成,不能在其他線程中進行(比如組件使用者使用Connection發送數據,以及關閉連接這種操作)。
- EventLoop模塊保證自己內部所監控的所有描述符,都要是活躍連接,非活躍連接就要及時釋放避免資源浪費。
- EventLoop模塊內部包含有一個eventfd:eventfd其實就是linux內核提供的一個事件fd,專門用于事件通知。
- EventLoop模塊內部包含有一個Poller對象:用于進行描述符的IO事件監控。
- EventLoop模塊內部包含有一個TimerQueue對象:用于進行定時任務的管理。
- EventLoop模塊內部包含有一個PendingTask隊列:組件使用者將對Connection進行的所有操作,都加入到任務隊列中,由EventLoop模塊進行管理,并在EventLoop對應的線程中進行執行。
- 每一個Connection對象都會綁定到一個EventLoop上,這樣能保證對這個連接的所有操作都是在一個線程中完成的。
具體流程:
- 通過Poller模塊對當前模塊管理內的所有描述符進行IO事件監控,有描述符事件就緒后,通過描述符對應的Channel進行事件處理。
- 所有就緒的描述符IO事件處理完畢后,對任務隊列中的所有操作順序進行執行。
- 由于epoll的事件監控,有可能會因為沒有事件到來而持續阻塞,導致任務隊列中的任務不能及時得到執行,因此創建了eventfd,添加到Poller的事件監控中,用于實現每次向任務隊列添加任務的時候,通過向eventfd寫入數據來喚醒epoll的阻塞。
TcpServer模塊:
- 這個模塊是一個整體Tcp服務器模塊的封裝,內部封裝了Acceptor模塊EventLoopThrea-dpool模塊。
- TcpServer中包含有一個EventLoop對象:以備在超輕量使用場景中不需要EventLoop線程池,只需要在主線程中完成所有操作的情況。
- TcpServer模塊內部包含有一個EventLoopThreadPool對象:其實就是EventLoop線程池,也就是子Reactor線程池。
- TcpServer模塊內部包含有一個Acceptor對象:一個TcpServer服務器,必然對應有一個監聽套接字,能夠完成獲取客戶端新連接,并處理的任務。
- TcpServer模塊內部包含有一個std::shared_ptr<Connection>的hash表:保存了所有的新建連接對應的Connection,注意,所有的Connection使用shared_ptr進行管理,這樣能夠保證在hash表中刪除了Connection信息后,在shared_ptr計數器為0的情況下完成對Connection資源的釋放操作。
具體流程:
1. 在實例化TcpServer對象過程中,完成BaseLoop的設置,Acceptor對象的實例化,以及EventLoop線程池的實例化,以及std::shared_ptr<Connection>的hash表的實例化。
2. 為Acceptor對象設置回調函數:獲取到新連接后,為新連接構建Connection對象,設置
Connection的各項回調,并使用shared_ptr進行管理,并添加到hash表中進行管理,并為
Connection選擇一個EventLoop線程,為Connection添加一個定時銷毀任務,為Connection添加事件監控,
3. 啟動BaseLoop。
SERVER模式圖:
Http協議模式:
Http協議模式:HTTP協議模塊用于對高并發服務器模塊進行協議支持,基于提供的協議支持能夠更方便的完成指定協議服務器的搭建。而HTTP協議支持模塊的實現,可以細分為以下幾個模塊。
Util模塊:
這個模塊是一個工具模塊,主要提供HTTP協議模塊所用到的一些工具函數,比如url編解碼,文件讀寫....等。
HttpRequest模塊:
這個模塊是HTTP請求數據模塊,用于保存HTTP請求數據被解析后的各項請求元素信息。HttpResponse模塊:
這個模塊是HTTP響應數據模塊,用于業務處理后設置并保存HTTP響應數據的的各項元素信息,最終會被按照HTTP協議響應格式組織成為響應信息發送給客戶端。
HttpContext模塊:
這個模塊是一個HTTP請求接收的上下文模塊,主要是為了防止在一次接收的數據中,不是一個完整的HTTP請求,則解析過程并未完成,無法進行完整的請求處理,需要在下次接收到新數據后繼續根據上下文進行解析,最終得到一個HttpRequest請求信息對象,因此在請求數據的接收以及解析部分需要一個上下文來進行控制接收和處理節奏。HttpServer模塊:
這個模塊是最終給組件使用者提供的HTTP服務器模塊了,用于以簡單的接口實現HTTP服務器的搭建。
HttpServer模塊內部包含有一個TcpServer對象:TcpServer對象實現服務器的搭建
HttpServer模塊內部包含有兩個提供給TcpServer對象的接口:連接建立成功設置上下文接口,數據處理接口。
HttpServer模塊內部包含有一個hash-map表存儲請求與處理函數的映射表:組件使用者向
HttpServer設置哪些請求應該使用哪些函數進行處理,等TcpServer收到對應的請求就會使用對應的函數進行處理。
2.代碼實現
2.1? SERVER服務器模塊實現:
1.Buffer 模塊實現:
總緩沖區大小是一定的,開始的時候readbuff等于writebuff,為0,當寫入數據的時候,寫大小為數劇最后一個位置的后一個位置開始,對依然從數據開頭開始讀,讀完之后,我們不用刪除,當再次寫入的時候,寫入到上一個數據的最后一個位置的下一個位置,讀的大小等于寫的大小。寫繼續往后走。如果總體空間不夠,直接擴容,需要實現的函數接口比較多。
具體代碼實現:
#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
private:std::vector<char> _buffer;//總的緩沖區uint64_t _readbuff;//讀緩沖區大小uint64_t _writerbuff;//寫緩沖區大小
public:Buffer() : _readbuff(0), _writerbuff(0), _buffer(BUFFER_DEFAULT_SIZE) {}//初始化// 起始地址char* Begin(){return &(*_buffer.begin());}// 寫的起始地址char* GetWritePos(){return Begin() + _writerbuff;}// 讀的起始地址char* GetReadPos(){return Begin() + _readbuff;}// 獲取緩沖區末尾空閑空間大小--寫偏移之后的空閑空間, 總體空間大小減去寫偏移uint64_t TailIndexSize(){return _buffer.size() - _writerbuff;}// 獲取緩沖區起始空閑空間大小--讀偏移之前的空閑空間uint64_t HeadIndexSize(){return _readbuff;}// 獲取可讀數據大小 = 寫偏移 - 讀偏移uint64_t ReadIndexSize(){return _writerbuff - _readbuff;}// 將讀偏移向后移動size大小void MoveReadOff(uint64_t size){if (size == 0) return;// 向后移動的大小,必須小于可讀數據大小assert(size <= ReadIndexSize());_readbuff += size;}// 將寫偏移向后移動void MoveWriterOff(uint64_t size){if (size == 0) return;assert(size <= TailIndexSize());_writerbuff += size;}//確保有足夠的空間進行寫void EnsureWriteSpace(uint64_t size){// 末尾空間足夠,直接返回if (TailIndexSize() >= size) return;if (TailIndexSize() + HeadIndexSize() >= size){uint64_t rsz = ReadIndexSize();//寫空間的大小std::copy(GetReadPos(), GetReadPos() + rsz, Begin());//直接復制_readbuff = 0;// 將讀偏移歸0_writerbuff = rsz; // 將寫位置置為可讀數據大小, 因為當前的可讀數據大小就是寫偏移量}else{// 總體空間不夠,則需要擴容,不移動數據,直接給寫偏移之后擴容足夠空間即可DBG_LOG("RESIZE %ld", _writerbuff + size);//擴容_buffer.resize(_writerbuff + size);}}// 寫數據void Write(const void* data, uint64_t size){if (size == 0) return;EnsureWriteSpace(size);const char* wd = (const char* )data;std::copy(wd, wd + size, GetWritePos());}// 尾插void WritePush(const void* data, uint64_t size){Write(data, size);MoveWriterOff(size);}//寫stringvoid WriteString(const std::string& data){return Write(data.c_str(), data.size());}//寫數據并插入void WriteStringAndPush(const std::string &data){WriteString(data);MoveWriterOff(data.size());}void WriteBuffer(Buffer &data){return Write(data.GetReadPos(), data.ReadIndexSize());}void WriteBufferAndPush(Buffer &data){WriteBuffer(data);MoveWriterOff(data.ReadIndexSize());}// 讀取數據void Read(void *buf, uint64_t size){// 要求要獲取的數據大小必須小于可讀數據大小assert(size <= ReadIndexSize());std::copy(GetReadPos(), GetReadPos() + size, (char *)buf);}//讀并刪除void ReadAndPop(void *buf, uint64_t size){Read(buf, size);MoveReadOff(size);}//讀std::string ReadAsString(uint64_t size){// 要求要獲取的數據大小必須小于可讀數據大小assert(size <= ReadIndexSize());std::string str;str.resize(size);Read(&str[0], size);return str;}//讀std::string ReadAsStringAndPop(uint64_t size){assert(size <= ReadIndexSize());std::string str = ReadAsString(size);MoveReadOff(size);return str;}//查找CRLFchar* FindCRLF(){char* res = (char *)memchr(GetReadPos(), '\n', ReadIndexSize());//返回找到的地址return res;}std::string GetLine(){char *pos = FindCRLF();if (pos == NULL) return "";// +1是為了把換行字符也取出來。return ReadAsString(pos - GetReadPos() + 1);}std::string GetLineAndPop(){std::string str = GetLine();MoveReadOff(str.size());return str;}// 清空緩沖區void Clear(){// 只需要將偏移量歸0即可_readbuff = 0;_writerbuff = 0;}
};
2.Socket實現:
Socket套接字的實現比較簡單,就是將網絡函數接口進行封裝,Socket的主要功能:創建套接字,綁定地址和端口號,監聽是否有鏈接,服務端獲取新鏈接,客戶端發起連接請求,接收數據,發送數據,關閉套接字,端口號是否重用,設為非阻塞等。
代碼接口:
#define MAX_LISTEN 1024
class Socket
{
private:int _socket;
public:Socket() : _socket(-1){}Socket(int socket) : _socket(socket){}~Socket() { Close(); }int Fd() {return _socket;}// 創建socketbool Create(){_socket = socket(AF_INET, SOCK_STREAM, 0);if (_socket < 0){ERR_LOG("CREATE SOCKET FAILED!!");return false;}return true;}// 綁定地址信息bool Bind(const std::string &ip, uint16_t &port){struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(local);if (bind(_socket, (struct sockaddr *)&local, len) < 0){ERR_LOG("BIND ADDRESS FAILED!");return false;}return true;}// 監聽地址bool Listen(int backlog = MAX_LISTEN){if (listen(_socket, backlog) < 0){ERR_LOG("SOCKET LISTEN FAILED!");return false;}else return true;}// 請求連接bool Connect(const std::string& ip,const uint16_t& port){struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(local);if (connect(_socket, (struct sockaddr *)&local, len) < 0) // 錯誤{ERR_LOG("BIND ADDRESS FAILED!");return false;}return true;}// 獲取新鏈接int Accept(){int ret = accept(_socket, NULL, NULL);if (ret < 0){ERR_LOG("BIND ADDRESS FAILED!");return false;}return ret;}// 讀取數據bool Recv(char *buf, ssize_t size, int flag = 0){int n = recv(_socket, buf, size, flag);if (n <= 0){if (errno == EAGAIN || n == EINTR) return 0; // 這次沒有數據return false;}return true;}// 非阻塞讀取bool NoneBlockRecv(char *buf, ssize_t size){if (size == 0) return 0;Recv(buf, size, MSG_DONTWAIT); // MSG_DONTWAIT表示當前為非阻塞}// 發送bool Send(const char* buf, ssize_t size, int flag = 0){int n = send(_socket, buf, size, flag);if (n <= 0){if (errno == EAGAIN || n == EINTR) return 0; // 這次沒有數據return false;}return true;}// 非阻塞發送bool NoneBlockSend(char *buf, size_t size){if (size == 0) return 0;Send(buf, size, MSG_DONTWAIT);}// 關閉void Close(){if (_socket >= 0){close(_socket);_socket = -1;}}// 創建服務段端bool CreateServer(uint16_t &port, const std::string &ip = "0.0.0.0", bool block_flag = false){if (Create() == false) return false;if (block_flag) NoneBlock(); // 設為非阻塞狀態if (Bind(ip, port) == false) return false;if (Listen() == false) return false;ReuseAddress();return true;}// 創建客戶端bool CreateClient(const uint16_t& port, const std::string& ip = "0.0.0.0"){// 1. 創建套接字,2.指向連接服務器if (Create() == false) return false;if (Connect(ip, port) == false) return false;return true;}// 開啟地址端口重用void ReuseAddress(){int val = 1;setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int));val = 1;setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int));}// 非阻塞void NoneBlock(){int flag = fcntl(_socket, F_GETFL, 0);fcntl(_socket, F_SETFL, flag | O_NONBLOCK);}
};
3.Channel模塊:
Chennal 模塊是對一個文件描述符的封裝,現對描述符可讀,可寫,錯誤…事件的管理操作,以及Poller模塊對描述符進行IO事件監控就緒后,根據不同的事件,回調不同的處理函數功能。
功能:對一個描述符進行監控時間管理,意義,對于描述符的狀態更容易維護,對之后的操作流程更加方便,功能設計:描述符是否可讀,是否可寫,監控可讀,監控可寫,解除可讀,解除可寫,解除所有文件描述符。
代碼框架:
class Channel {private:int _fd;uint32_t events; // 當前需要監控的事件uint32_t revents; // 當前連接觸發的事件using eventCallback = std::function<void()>>;eventCallback _read_callback; // 可讀事件被觸發的回調函數eventCallback _error_callback; // 可寫事件被觸發的回調函數eventCallback _close_callback; // 連接關閉事件被觸發的回調函數eventCallback _event_callback; // 任意事件被觸發的回調函數eventCallback _write_callback; // 可寫事件被觸發的回調函數public:Channel(int fd) : fd(_fd) {}int Fd() {return _fd ;}void setReadCallback(const eventCallback &cb);//設置讀監控void setWriteCallback(const eventCallback &cb);//寫監控void setErrorCallback(const eventCallback &cb);//錯誤監控void setCloseCallback(const eventCallback &cb);//關閉監控void setEventCallback(const eventCallback &cb);//任意時間監控bool readAble(); // 當前是否可讀bool writeAble(); // 當前是否可寫void enableRead(); // 啟動讀事件監控void enableWrite(); // 啟動寫事件監控void disableRead(); // 關閉讀事件監控void disableWrite(); // 關閉寫事件監控void disableAll(); // 關閉所有事件監控void Remove(); //移除監控void handleEvent(); // 事件處理,一旦觸發了某個事件,就調用這個函數!};
4.Poller模塊
描述符事件監控模塊,對任意描述符進行時間監控,就是對epoll函數得封裝,時期變得更有意義。
功能設計:添加事件,修改事件,移除事件,取消定時任務。
封裝思想: 1. 必須擁有一個epoll的操作句柄
? ? ? ? ???????????2. 擁有一個struct epoll_event 結構數組,監控保存所有的活躍事件!
? ? ? ? ? ? ? ? ? ?3. 使用hash表管理描述符與描述符對應的事件管理Channnel對象!
邏輯流程:
? ? ? ? ? ? ? ? ?1. 對描述符進行監控,通過Channnel才能知道描述符監控什么事件
? ? ? ? ? ? ? ? ?2. 當描述符就緒了,通過描述符在hash表中找到對應的Channel(得到了Channel才知? ? ? ? ? ? ? ? ? ? ? 道什么事件如何處理)當描述符就緒了,返回就緒描述符對應的Channel。
代碼框架
框架:
class Poller {
private:int _epfd;struct epoll_event_evs[xxx];std::unordered_map<int,Channel*> mp;
private:// 1. 判斷要更新事件的描述符是否存在// 2. 針對epoll直接操作(添加,修改,移除)
public:// 1. 添加或者更新描述符所監控的事件void Update(Channel* channel);// 2. 移除描述符所監控的事件void Remove(Channel* )// 3. 開始監控,獲取就緒Channel
};
*/
/*
5.EventLoop模塊
這個模塊和線程是一一對應的!
監聽了一個鏈接,如果這個連接一旦就緒,就要進行事件處理!
但是如果這個描述符,在多個線程中都觸發了了事件,進行處理,就會存在線程安全問題!
因此我們需要將一個鏈接的事件監控, 以及連接事件處理,以及其他操作都放在同一個線程中!
如何保證一個連接的所有操作都在eventloop對應的線程中!
給eventLOOP模塊中,都添加一個任務隊列!
對連接的所有操作,都進行一次封裝,將對連接的操作當作任務都添加到任務隊列中!
功能:進行事件監控的模塊,一個模塊對應一個線程。意義,所有的線程都在這個模塊中完成,功能設計,將一個任務添加到任務隊列中,定時任務的添加,取消,刷新。
事件功能:
- 事件監控
- 使用Poller模塊
- 有事件就緒則進行事件處理!
- 執行任務隊列中的任務!
- 注意點:
- 因為有可能因為等待描述符IO事件就緒,執行流流程阻塞,這個時候任務對立中的任務得不到執行!
- 因此得有一個事件通知的東西,能夠喚醒事件監控的阻塞!
- 當事件就緒,需要處理的時候,處理過程中,如果對連接要進行某些操作!
- 這些操作必須要在Eventloop對應的線程中進行,保證對連接的各項操作都是線程安全的。
- 如果執行的操作就在本線程中,不需要將操作壓入隊列了,可以直接執行!
- 如果執行的操作不在線程中,才需要加入任務池,等到事件處理完了之后就行執行任務!
設計框架:
class Eventloop {
private:std::thread::id _thread_id; // 線程IDint _event_fd // eventfd 喚醒IO事件監控有可能的阻塞!!!Poller _poller; // 進行所有描述符的事件監控using Functor = std::function<void()>;std::vector<Functor> _task; // 任務池std::mutex _mutex; // 實現任務池操作的線程安全!!!
public:void runAllTask();
public:Eventloop();void runInLoop(const Functor&cb); // 判斷將要執行的任務是否處于當前線程中,如果是則執行,不是則壓入隊列。void queueInLoop(const Functor&cb); // 將操作壓入任務池!bool isInLoop(); //永遠判斷當前線程是否是EventLoop所對應的線程void updateEvent(Channel* channel); // 添加/修改描述符的事件監控void removeEvent(Channel* channel); // 移除描述符的監控void Start(); // 任務監控完畢進行處理任務! 三步走:事件監控-》就緒事件處理-》執行任務};
6.Connection模塊
Connection模塊是對Buffer模塊,Socket模塊,Channel模塊的?個整體封裝,實現了對一個通信套接字的整體的管理,每一個進行數據通信的套接字(也就是accept獲取到的新連接)都會使用
Connection進行管理。
? Connection模塊內部包含有三個由組件使用者傳入的回調函數:連接建立完成回調,事件回調,
新數據回調,關閉回調。
? Connection模塊內部包含有兩個組件使用者提供的接口:數據發送接口,連接關閉接口
? Connection模塊內部包含有兩個用戶態緩沖區:用戶態接收緩沖區,用戶態發送緩沖區
? Connection模塊內部包含有?個Socket對象:完成描述符面向系統的IO操作
? Connection模塊內部包含有?個Channel對象:完成描述符IO事件就緒的處理
這是對通信套接字進行通信管理的一個模塊,對一個連接的操作都是通過這個模塊來實現的。這各模塊本省并不是一個單獨的功能模塊,是一個連接管理的模塊。
Connection模塊,一個連接有任何的事件怎么處理都是有這個模塊來進行處理的,因為組件的設計也不知道使用者要如何處理事件,因此只能是提供一些事件回調函數由使用者設置。
設計框架:
DISCONECTED -- 連接關閉狀態; CONNECTING -- 連接建立成功-待處理狀態
//CONNECTED -- 連接建立完成,各種設置已完成,可以通信的狀態; DISCONNECTING -- 待關閉狀態
type enum { // 連接關閉;// 連接建立成功 —— 待處理狀態;// 連接設立完成,可以通信;// 待關閉狀態;DISCONECTED,CONNECTING,CONNECTED,DISCONECTING} ConnStatu;
using PreConnection = std::shared_ptr<Connection>;
class Connection {private:uint64_t _conn_id; //連接的唯一ID,便于連接的管理和查找bool _enable_inactive_release; // 連接是否啟動非活躍銷毀的判斷標志,默認為falseint _sockfd; // 連接關聯的文件描述符ConnStatu _statu; // Socket _socket; // 套接字操作管理Channel _channel; // 連接二點事件管理Buffer _in_buffer; // 輸入緩沖區 —— 存放從socket中讀到的數據buffer _out_buffer; // 輸出緩沖區 —— 發送給對端的是數據,等到描述符事件可寫,再發!Any _context; // 請求的接受處理上下文/*這四個回調函數,是讓服務器模塊來設置的(其實服務器模塊的處理回調也是組件使用者設置的)*//*換句話說,這幾個回調都是組件使用者使用的*/using ConnectCallback = std::function<void(const PreConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;/*組件內的連接關閉回調--組件內設置的,因為服務器組件內會把所有的連接管理起來,一旦某個連接要關閉*//*就應該從管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;private:// /*五個channel的事件回調函數*///描述符可讀事件觸發后調用的函數,接收socket數據放到接收緩沖區中,然后調用_message_callbackvoid HandleRead() {}void HandleRead() {}void HandleClose() {}void HandleError() {}//描述符觸發任意事件: 1. 刷新連接的活躍度--延遲定時銷毀任務; 2. 調用組件使用者的任意事件回調void HandleEvent() { }//連接獲取之后,所處的狀態下要進行各種設置(啟動讀監控,調用回調函數)void EstablishedInLoop() { }//這個接口才是實際的釋放接口void ReleaseInLoop() {}//這個接口并不是實際的發送接口,而只是把數據放到了發送緩沖區,啟動了可寫事件監控void SendInLoop(Buffer &buf) {}//這個關閉操作并非實際的連接釋放操作,需要判斷還有沒有數據待處理,待發送void ShutdownInLoop() {}//啟動非活躍連接超時釋放規則void EnableInactiveReleaseInLoop(int sec) {}void CancelInactiveReleaseInLoop() {}void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) {_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}public:Connection(EventLoop* loop,uint64_t _conn_id,int sockfd) : _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}~Connection() { DBG_LOG("RELEASE CONNECTION:%p", this); }//獲取管理的文件描述符int Fd() {return _sockfd; }// 獲取連接IDint Id() {return _conn_id; }// 是否處于CONNECTED狀態bool Connected() { return (_statu == CONNECTED); }//設置上下文--連接建立完成時進行調用void SetContext(const Any &context) { _context = context; }//獲取上下文,返回的是指針Any *GetContext() { return &_context; }void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; }//連接建立就緒后,進行channel回調設置,啟動讀監控,調用_connected_callbackvoid Established() {}_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));// 發送數據,將數據放到發送緩沖區,啟動寫事件監控void Send(const char *data, size_t len) {}//提供給組件使用者的關閉接口--并不實際關閉,需要判斷有沒有數據待處理void Shutdown() {}void Release() {}//啟動非活躍銷毀,并定義多長時間無通信就是非活躍,添加定時任務void EnableInactiveRelease(int sec) { }//取消非活躍銷毀void CancelInactiveRelease() {}void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) {_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}};
7.Acception模塊
對通信連接做整體管理的一個模塊,對一個通信連接的模塊都是通過這個模塊來進行的。實現了對套接字整體的管理。
意義:當獲取了一個新建連接的描述符后,需要為這個通信連接,封裝一個connection對象,設置不同回調。
注意:因為Acceptor模塊本身并不知道一個鏈接產生了某個事件該如何處理,因此獲取一個通信連接后,Connection的封裝,以及事件回調的設置都應該由服務器模塊來進行!
設計框架:
/*
Acceptor 模塊,對監聽套接字進行管理!1. 創建一個監聽套接字2. 啟動讀事件監控3. 事件觸發后,獲取新連接4. 調用新連接獲取成功后的回調函數!4. 為新連接創建Connection進行管理(這一步不是Acceptor模塊操作,應該是服務器模塊)因為Acceptor模塊只進行監聽連接的管理,因此獲取到新連接的描述符后,對于新連接描述符如何處理并不關心!對于新連接如何處理,應該是服務器模塊關心管理!服務器模塊,實現了一個對于新連接描述符處理的函數,將這個函數設置給Acceptor模塊的回調函數!
*/
8.LoopThread模塊
目標:將eventloop模塊和線程整合起來!
eventloop 和 線程是一一對應的!
eventloop 模塊實例化的對象,在構造的時候就會初始化! _thread_id;
而后面當運行一個操作的時候判斷是否運行在eventloop所對應的線程中,就是將線程ID與EventLoop模塊中的thread_id 進行一個比較,相同就表示在同一個線程,不同就表示當前運行線程并不是eventloop線程!
eventloop 模塊在實例化對象的時候,必須在線程內部!
eventloop 實例化對象會設置自己的 thread_id;
如果我們先創建了多個 eventloop 對象,然后創建了多個線程,將各自的線程id,重新給 eventloop 進行設置!
存在問題:在構造 eventloop對象,到設置新的 thread_id 期間將是不可控的!
因此,必須先創建線程,然后在線程的入口函數中,去實例化 eventloop 對象!
構造一個新的模塊:LoopThread。
class LoopThread {private:/*用于實現_loop獲取的同步關系,避免線程創建了,但是_loop還沒有實例化之前去獲取_loop*/std::mutex _mutex; // 互斥鎖std::condition_variable _cond; // 條件變量EventLoop *_loop; // EventLoop指針變量,這個對象需要在線程內實例化std::thread _thread; // EventLoop對應的線程private:/*實例化 EventLoop 對象,喚醒_cond上有可能阻塞的線程,并且開始運行EventLoop模塊的功能*/void ThreadEntry() {EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);//加鎖_loop = &loop;_cond.notify_all();}loop.Start();}public:/*創建線程,設定線程入口函數*/LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}/*返回當前線程關聯的EventLoop對象指針*/EventLoop *GetLoop() {EventLoop *loop = NULL;{std::unique_lock<std::mutex> lock(_mutex);//加鎖_cond.wait(lock, [&](){ return _loop != NULL; });//loop為NULL就一直阻塞loop = _loop;}return loop;}
};
9.LoopThreadPool模塊
線程數量可配置,調節線程數量,對線程數量進行管理。提供線程分配的功能。
1.線程數量可配置(0或多個)
注意事項:在服務器中,主從Reactor模型是 主線程只負責新連接獲取,叢書線程負責新連接的事件監控以及處理!因此當前的線程池,有可能從屬線程會數量為0,也就是實現單 Reactor服務器,一個線程及負責獲取連接以及連接的處理!
2. 對所有的線程進行管理,其實也就是管理0個或多個LoopThread對象!
3. 提供線程分配的功能!
4.當主線程獲取了一個鏈接,需要將新的線程掛到從屬線程上進行事件監控以及管理!
5.假設0個從屬線程,則直接分配給主線程的EventLoop,進行處理!
6.假設有多個叢書線程,則采用RR輪轉!(將對應線程的EventLoop獲取到,設置給對應的Connection)。
設計框架:
class LoopThreadPool {private:int _thread_count;int _next_idx;EventLoop *_baseloop;std::vector<LoopThread*> _threads;std::vector<EventLoop *> _loops;public:LoopThreadPool(EventLoop *baseloop):_thread_count(0), _next_idx(0), _baseloop(baseloop) {}void SetThreadCount(int count) { _thread_count = count; }void Create() {if (_thread_count > 0) {_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++) {_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}return ;}EventLoop *NextLoop() {if (_thread_count == 0) {return _baseloop;}_next_idx = (_next_idx + 1) % _thread_count;return _loops[_next_idx];}
};
10.TcpServer模塊
對之前所有的模塊進行整合,完成一個服務器的搭建。
實現思想:
1.管理
- Acceptor對象,創建一個監聽套接字!
- EventLoop 對象,baseloop對象,實現對監聽套接字的事件監控!
- std::vector conns,實現對新建連接的管理!
- EventLoopPool 對象,創建loop線程池,對新建連接進行事件監控和處理!
2.流程
- 在TcpServer中實例一個Acceptor對象,以及一個EventLoop 對象(baseloop)
- 將Acceptor 掛在baseloop 進行事件監控
- 一旦Acceptor 對象就緒了可讀事件,則執行時間回調函數獲取新建連接!
- 對新連接,創造一個 Connection 進行管理!
- 對新連接對應的 Connection 設置功能回調 (連接完成回調,消息回調,關閉回調,任意事件監控!)
- 啟動Connettion 的非活躍鏈接的超時銷毀功能.
- 將新連接對應的Connection 掛到 LoopThreadPool 中的叢書線程對應的Eventloop 中進行事件監控!
- 一旦Connection對應的鏈接就緒了可讀事件,則這個時候執行讀事件回調函數,讀取數據,讀取完畢后調用TcpServer設置的消息回調!
(三)功能設計
- 設置從屬線程池數量!
- 啟動服務器
- 設置各種回調函數!(連接建立完成,消息,關閉,任意) 用戶設置給TcpServer TcpServer設置獲取的新連接!
- 是否啟動非活躍連接超時銷毀功能
- 添加任務!
設計框架:
class TcpServer {private:uint64_t _next_id; //這是一個自動增長的連接ID,int _port;int _timeout; //這是非活躍連接的統計時間---多長時間無通信就是非活躍連接bool _enable_inactive_release;//是否啟動了非活躍連接超時銷毀的判斷標志EventLoop _baseloop; //這是主線程的EventLoop對象,負責監聽事件的處理Acceptor _acceptor; //這是監聽套接字的管理對象LoopThreadPool _pool; //這是從屬EventLoop線程池std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有連接對應的shared_ptr對象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void RunAfterInLoop(const Functor &task, int delay) {}//為新連接構造一個Connection進行管理void NewConnection(int fd) {}void RemoveConnectionInLoop(const PtrConnection &conn) {}//從管理Connection的_conns中移除連接信息void RemoveConnection(const PtrConnection &conn) {}public:TcpServer(int port):_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));}void SetThreadCount(int count) { }void SetConnectedCallback(const ConnectedCallback&cb) { }void SetMessageCallback(const MessageCallback&cb) { }void SetClosedCallback(const ClosedCallback&cb) {}void SetAnyEventCallback(const AnyEventCallback&cb) { }void EnableInactiveRelease(int timeout) { }//用于添加一個定時任務void RunAfter(const Functor &task, int delay) {}void Start() { }
};
2.2 Http模塊實現
Http模塊是處于應用層的一個簡單的協議,包含五部分,每部分有不同的內容。
1.Util的處理
目的:目的:實現一些工具接口,讀取文件內容,向文件寫入內容,URL編碼,URL解碼,通過HTTP狀態碼獲取描述信息,通過文件后綴名獲取mime,判斷一個文件是不是目錄,判斷一個文,是否是一個普通文件,HTTP資源路徑有效性判斷
框架設計:
class Util {public:// 字符串分割函數size_t Spilt();// 讀取文件內容static bool ReadFile() {}// 向文件寫入內容static bool WriteFile();// URL編碼static bool UrlEncode();// URL解碼static bool UrlDecode();// 通過HTTP狀態碼獲取描述信息static std::string StatusDesc();// 根據文件后綴名獲取文件MINEstatic std::string ExtMine();// 判斷一個文件是不是目錄static bool IsDirectory();//判斷一個文件是否是一個普通文件static bool IsRegular();//HTTP資源路徑有效性判斷static bool VaildPath();
};
2.HttpRequest
目的:存儲Http請求信息的,接收到一個數據,按照HTTP請求格式進行解析,得到各個關鍵要素放到Request中。
功能:
- HttpRequest模塊
- 存儲HTTP請求信息
- 接收到一個數據,按照HTTP請求格式進行解析,得到各個關鍵要素放到Request中
- HttpResponse模塊
- 存儲HTTP響應信息
- 進行業務處理的同時,讓使用者向Response中填充響應要素,完畢后,將其組織成HTTP響應格式的數據,發給客戶端。
class HttpRequest {public:std::string _method; //請求方法std::string _path; //資源路徑std::string _version; //協議版本std::string _body; //請求正文std::smatch _matches; //資源路徑的正則提取數據std::unordered_map<std::string, std::string> _headers; //頭部字段std::unordered_map<std::string, std::string> _params; //查詢字符串public:HttpRequest():_version("HTTP/1.1") {}void ReSet() {_method.clear();_path.clear();_version = "HTTP/1.1";_body.clear();std::smatch match;_matches.swap(match);_headers.clear();_params.clear();}//插入頭部字段void SetHeader(const std::string &key, const std::string &val) {_headers.insert(std::make_pair(key, val));}//判斷是否存在指定頭部字段bool HasHeader(const std::string &key) const {auto it = _headers.find(key);if (it == _headers.end()) {return false;}return true;}//獲取指定頭部字段的值std::string GetHeader(const std::string &key) const {auto it = _headers.find(key);if (it == _headers.end()) {return "";}return it->second;}//插入查詢字符串void SetParam(const std::string &key, const std::string &val) {_params.insert(std::make_pair(key, val));}//判斷是否有某個指定的查詢字符串bool HasParam(const std::string &key) const {auto it = _params.find(key);if (it == _params.end()) {return false;}return true;}//獲取指定的查詢字符串std::string GetParam(const std::string &key) const {auto it = _params.find(key);if (it == _params.end()) {return "";}return it->second;}//獲取正文長度size_t ContentLength() const {// Content-Length: 1234\r\nbool ret = HasHeader("Content-Length");if (ret == false) {return 0;}std::string clen = GetHeader("Content-Length");return std::stol(clen);}//判斷是否是短鏈接bool Close() const {// 沒有Connection字段,或者有Connection但是值是close,則都是短鏈接,否則就是長連接if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") {return false;}return true;}
};
3.Http