源碼:田某super/moduo
目錄
SERVER模塊:
Buffer模塊:
Socket模塊:
Channel模塊:
Connection模塊:
Acceptor模塊:
TimerQueue模塊:
Poller模塊:
EventLoop模塊:
TcpServer模塊:
HTTP協議組件模塊:
Util模塊:
HttpRequest模塊:
HttpContext模塊:
HttpServer模塊:
通過咱們實現的?并發服務器組件,可以簡潔快速的完成?個?性能的服務器搭建。
并且,通過組件內提供的不同應?層協議?持,也可以快速完成?個?性能應?服務器的搭建(當前 ,為了便于項?的演?,項?中提供HTTP協議組件的?持)。 在這?,要明確的是咱們要實現的是?個?并發服務器組件,因此當前的項?中并不包含實際的業務內容。
圖片來源:?圖解one loop per thread:使用muduo網絡庫實現web服務器_znzxc的博客-CSDN博客
本項目總共分為兩大模塊分別為Server和HTTP模塊。
SERVER模塊:
SERVER模塊就是對所有的連接以及線程進?管理,讓它們各司其職,在合適的時候做合適的事,最終完成?性能服務器組件的實現。
管理??:
監聽連接管理:對監聽連接進?管理
通信連接管理:對通信連接進?管理
超時連接管理:對超時連接進?管理
基于以上可以將其分為多個子模塊:
Buffer模塊:
?于實現用戶態緩沖區,提供數據緩沖,取出等功能。
class Buffer {private:std::vector<char> _buffer; //使用vector進行內存空間管理uint64_t _reader_idx; //讀偏移uint64_t _writer_idx; //寫偏移public:Buffer():_reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE){}char *Begin() { return &*_buffer.begin(); }//獲取當前寫入起始地址, _buffer的空間起始地址,加上寫偏移量char *WritePosition() { return Begin() + _writer_idx; }//獲取當前讀取起始地址char *ReadPosition() { return Begin() + _reader_idx; }//獲取緩沖區末尾空閑空間大小--寫偏移之后的空閑空間, 總體空間大小減去寫偏移uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }//獲取緩沖區起始空閑空間大小--讀偏移之前的空閑空間uint64_t HeadIdleSize() { return _reader_idx; }//獲取可讀數據大小 = 寫偏移 - 讀偏移uint64_t ReadAbleSize() { return _writer_idx - _reader_idx; }//將讀偏移向后移動void MoveReadOffset(uint64_t len) { if (len == 0) return; //向后移動的大小,必須小于可讀數據大小assert(len <= ReadAbleSize());_reader_idx += len;}//將寫偏移向后移動 void MoveWriteOffset(uint64_t len) {//向后移動的大小,必須小于當前后邊的空閑空間大小assert(len <= TailIdleSize());_writer_idx += len;}//確保可寫空間足夠(整體空閑空間夠了就移動數據,否則就擴容)void EnsureWriteSpace(uint64_t len) {//如果末尾空閑空間大小足夠,直接返回if (TailIdleSize() >= len) { return; }//末尾空閑空間不夠,則判斷加上起始位置的空閑空間大小是否足夠, 夠了就將數據移動到起始位置if (len <= TailIdleSize() + HeadIdleSize()) {//將數據移動到起始位置uint64_t rsz = ReadAbleSize();//把當前數據大小先保存起來std::copy(ReadPosition(), ReadPosition() + rsz, Begin());//把可讀數據拷貝到起始位置_reader_idx = 0; //將讀偏移歸0_writer_idx = rsz; //將寫位置置為可讀數據大小, 因為當前的可讀數據大小就是寫偏移量}else {//總體空間不夠,則需要擴容,不移動數據,直接給寫偏移之后擴容足夠空間即可DBG_LOG("RESIZE %ld", _writer_idx + len);_buffer.resize(_writer_idx + len);}} //寫入數據void Write(const void *data, uint64_t len) {//1. 保證有足夠空間,2. 拷貝數據進去if (len == 0) return;EnsureWriteSpace(len);const char *d = (const char *)data;std::copy(d, d + len, WritePosition());}void WriteAndPush(const void *data, uint64_t len) {Write(data, len);MoveWriteOffset(len);}void WriteString(const std::string &data) {return Write(data.c_str(), data.size());}void WriteStringAndPush(const std::string &data) {WriteString(data);MoveWriteOffset(data.size());}void WriteBuffer(Buffer &data) {return Write(data.ReadPosition(), data.ReadAbleSize());}void WriteBufferAndPush(Buffer &data) { WriteBuffer(data);MoveWriteOffset(data.ReadAbleSize());}//讀取數據void Read(void *buf, uint64_t len) {//要求要獲取的數據大小必須小于可讀數據大小assert(len <= ReadAbleSize());std::copy(ReadPosition(), ReadPosition() + len, (char*)buf);}void ReadAndPop(void *buf, uint64_t len) {Read(buf, len);MoveReadOffset(len);}std::string ReadAsString(uint64_t len) {//要求要獲取的數據大小必須小于可讀數據大小assert(len <= ReadAbleSize());std::string str;str.resize(len);Read(&str[0], len);return str;}std::string ReadAsStringAndPop(uint64_t len) {assert(len <= ReadAbleSize());std::string str = ReadAsString(len);MoveReadOffset(len);return str;}char *FindCRLF() {char *res = (char*)memchr(ReadPosition(), '\n', ReadAbleSize());return res;}/*通常獲取一行數據,這種情況針對是*/std::string GetLine() {char *pos = FindCRLF();if (pos == NULL) {return "";}// +1是為了把換行字符也取出來。return ReadAsString(pos - ReadPosition() + 1);}std::string GetLineAndPop() {std::string str = GetLine();MoveReadOffset(str.size());return str;}//清空緩沖區void Clear() {//只需要將偏移量歸0即可_reader_idx = 0;_writer_idx = 0;}
};
Socket模塊:
封裝套接字
class Socket {private:int _sockfd;public:Socket():_sockfd(-1) {}Socket(int fd): _sockfd(fd) {}~Socket() { Close(); }int Fd() { return _sockfd; }//創建套接字bool Create() {// int socket(int domain, int type, int protocol)_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (_sockfd < 0) {ERR_LOG("CREATE SOCKET FAILED!!");return false;}return true;}//綁定地址信息bool Bind(const std::string &ip, uint16_t port) {struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);// int bind(int sockfd, struct sockaddr*addr, socklen_t len);int ret = bind(_sockfd, (struct sockaddr*)&addr, len);if (ret < 0) {ERR_LOG("BIND ADDRESS FAILED!");return false;}return true;}//開始監聽bool Listen(int backlog = MAX_LISTEN) {// int listen(int backlog)int ret = listen(_sockfd, backlog);if (ret < 0) {ERR_LOG("SOCKET LISTEN FAILED!");return false;}return true;}//向服務器發起連接bool Connect(const std::string &ip, uint16_t port) {struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);// int connect(int sockfd, struct sockaddr*addr, socklen_t len);int ret = connect(_sockfd, (struct sockaddr*)&addr, len);if (ret < 0) {ERR_LOG("CONNECT SERVER FAILED!");return false;}return true;}//獲取新連接int Accept() {// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);int newfd = accept(_sockfd, NULL, NULL);if (newfd < 0) {ERR_LOG("SOCKET ACCEPT FAILED!");return -1;}return newfd;}//接收數據ssize_t Recv(void *buf, size_t len, int flag = 0) {// ssize_t recv(int sockfd, void *buf, size_t len, int flag);ssize_t ret = recv(_sockfd, buf, len, flag);if (ret <= 0) {//EAGAIN 當前socket的接收緩沖區中沒有數據了,在非阻塞的情況下才會有這個錯誤//EINTR 表示當前socket的阻塞等待,被信號打斷了,if (errno == EAGAIN || errno == EINTR) {return 0;//表示這次接收沒有接收到數據}ERR_LOG("SOCKET RECV FAILED!!");return -1;}return ret; //實際接收的數據長度}ssize_t NonBlockRecv(void *buf, size_t len) {return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示當前接收為非阻塞。}//發送數據ssize_t Send(const void *buf, size_t len, int flag = 0) {// ssize_t send(int sockfd, void *data, size_t len, int flag);ssize_t ret = send(_sockfd, buf, len, flag);if (ret < 0) {if (errno == EAGAIN || errno == EINTR) {return 0;}ERR_LOG("SOCKET SEND FAILED!!");return -1;}return ret;//實際發送的數據長度}ssize_t NonBlockSend(void *buf, size_t len) {if (len == 0) return 0;return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示當前發送為非阻塞。}//關閉套接字void Close() {if (_sockfd != -1) {close(_sockfd);_sockfd = -1;}}//創建一個服務端連接bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) {//1. 創建套接字,2. 綁定地址,3. 開始監聽,4. 設置非阻塞, 5. 啟動地址重用if (Create() == false) return false;if (block_flag) NonBlock();if (Bind(ip, port) == false) return false;if (Listen() == false) return false;ReuseAddress();return true;}//創建一個客戶端連接bool CreateClient(uint16_t port, const std::string &ip) {//1. 創建套接字,2.指向連接服務器if (Create() == false) return false;if (Connect(ip, port) == false) return false;return true;}//設置套接字選項---開啟地址端口重用void ReuseAddress() {// int setsockopt(int fd, int leve, int optname, void *val, int vallen)int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));}//設置套接字阻塞屬性-- 設置為非阻塞void NonBlock() {//int fcntl(int fd, int cmd, ... /* arg */ );int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}
};
Channel模塊:
Channel模塊是對?個描述符需要進?的IO事件管理的模塊,實現對描述符可讀,可寫,錯誤...事件的管理操作,以及Poller模塊對描述符進?IO事件監控就緒后,根據不同的事件,回調不同的處理函數功能。
class Channel {private:int _fd;EventLoop *_loop;uint32_t _events; // 當前需要監控的事件uint32_t _revents; // 當前連接觸發的事件using EventCallback = std::function<void()>;EventCallback _read_callback; //可讀事件被觸發的回調函數EventCallback _write_callback; //可寫事件被觸發的回調函數EventCallback _error_callback; //錯誤事件被觸發的回調函數EventCallback _close_callback; //連接斷開事件被觸發的回調函數EventCallback _event_callback; //任意事件被觸發的回調函數public:Channel(EventLoop *loop, int fd):_fd(fd), _events(0), _revents(0), _loop(loop) {}int Fd() { return _fd; }uint32_t Events() { return _events; }//獲取想要監控的事件void SetREvents(uint32_t events) { _revents = events; }//設置實際就緒的事件void SetReadCallback(const EventCallback &cb) { _read_callback = cb; }void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }//當前是否監控了可讀bool ReadAble() { return (_events & EPOLLIN); } //當前是否監控了可寫bool WriteAble() { return (_events & EPOLLOUT); }//啟動讀事件監控void EnableRead() { _events |= EPOLLIN; Update(); }//啟動寫事件監控void EnableWrite() { _events |= EPOLLOUT; Update(); }//關閉讀事件監控void DisableRead() { _events &= ~EPOLLIN; Update(); }//關閉寫事件監控void DisableWrite() { _events &= ~EPOLLOUT; Update(); }//關閉所有事件監控void DisableAll() { _events = 0; Update(); }//移除監控void Remove();void Update();//事件處理,一旦連接觸發了事件,就調用這個函數,自己觸發了什么事件如何處理自己決定void HandleEvent() {if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {/*不管任何事件,都調用的回調函數*/if (_read_callback) _read_callback();}/*有可能會釋放連接的操作事件,一次只處理一個*/if (_revents & EPOLLOUT) {if (_write_callback) _write_callback();}else if (_revents & EPOLLERR) {if (_error_callback) _error_callback();//一旦出錯,就會釋放連接,因此要放到前邊調用任意回調}else if (_revents & EPOLLHUP) {if (_close_callback) _close_callback();}if (_event_callback) _event_callback();}
};
Connection模塊:
Connection模塊是對Buffer模塊,Socket模塊,Channel模塊的?個整體封裝,實現了對?個通信套接字的整體的管理,每?個進?數據通信的套接字(也就是accept獲取到的新連接)都會使? Connection進?管理。
class Connection : public std::enable_shared_from_this<Connection> {private:uint64_t _conn_id; // 連接的唯一ID,便于連接的管理和查找//uint64_t _timer_id; //定時器ID,必須是唯一的,這塊為了簡化操作使用conn_id作為定時器IDint _sockfd; // 連接關聯的文件描述符bool _enable_inactive_release; // 連接是否啟動非活躍銷毀的判斷標志,默認為falseEventLoop *_loop; // 連接所關聯的一個EventLoopConnStatu _statu; // 連接狀態Socket _socket; // 套接字操作管理Channel _channel; // 連接的事件管理Buffer _in_buffer; // 輸入緩沖區---存放從socket中讀取到的數據Buffer _out_buffer; // 輸出緩沖區---存放要發送給對端的數據Any _context; // 請求的接收處理上下文/*這四個回調函數,是讓服務器模塊來設置的(其實服務器模塊的處理回調也是組件使用者設置的)*//*換句話說,這幾個回調都是組件使用者使用的*/using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;/*組件內的連接關閉回調--組件內設置的,因為服務器組件內會把所有的連接管理起來,一旦某個連接要關閉*//*就應該從管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;private:/*五個channel的事件回調函數*///描述符可讀事件觸發后調用的函數,接收socket數據放到接收緩沖區中,然后調用_message_callbackvoid HandleRead() {//1. 接收socket的數據,放到緩沖區char buf[65536];ssize_t ret = _socket.NonBlockRecv(buf, 65535);if (ret < 0) {//出錯了,不能直接關閉連接return ShutdownInLoop();}//這里的等于0表示的是沒有讀取到數據,而并不是連接斷開了,連接斷開返回的是-1//將數據放入輸入緩沖區,寫入之后順便將寫偏移向后移動_in_buffer.WriteAndPush(buf, ret);//2. 調用message_callback進行業務處理if (_in_buffer.ReadAbleSize() > 0) {//shared_from_this--從當前對象自身獲取自身的shared_ptr管理對象return _message_callback(shared_from_this(), &_in_buffer);}}//描述符可寫事件觸發后調用的函數,將發送緩沖區中的數據進行發送void HandleWrite() {//_out_buffer中保存的數據就是要發送的數據ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if (ret < 0) {//發送錯誤就該關閉連接了,if (_in_buffer.ReadAbleSize() > 0) {_message_callback(shared_from_this(), &_in_buffer);}return Release();//這時候就是實際的關閉釋放操作了。}_out_buffer.MoveReadOffset(ret);//千萬不要忘了,將讀偏移向后移動if (_out_buffer.ReadAbleSize() == 0) {_channel.DisableWrite();// 沒有數據待發送了,關閉寫事件監控//如果當前是連接待關閉狀態,則有數據,發送完數據釋放連接,沒有數據則直接釋放if (_statu == DISCONNECTING) {return Release();}}return;}//描述符觸發掛斷事件void HandleClose() {/*一旦連接掛斷了,套接字就什么都干不了了,因此有數據待處理就處理一下,完畢關閉連接*/if (_in_buffer.ReadAbleSize() > 0) {_message_callback(shared_from_this(), &_in_buffer);}return Release();}//描述符觸發出錯事件void HandleError() {return HandleClose();}//描述符觸發任意事件: 1. 刷新連接的活躍度--延遲定時銷毀任務; 2. 調用組件使用者的任意事件回調void HandleEvent() {if (_enable_inactive_release == true) { _loop->TimerRefresh(_conn_id); }if (_event_callback) { _event_callback(shared_from_this()); }}//連接獲取之后,所處的狀態下要進行各種設置(啟動讀監控,調用回調函數)void EstablishedInLoop() {// 1. 修改連接狀態; 2. 啟動讀事件監控; 3. 調用回調函數assert(_statu == CONNECTING);//當前的狀態必須一定是上層的半連接狀態_statu = CONNECTED;//當前函數執行完畢,則連接進入已完成連接狀態// 一旦啟動讀事件監控就有可能會立即觸發讀事件,如果這時候啟動了非活躍連接銷毀_channel.EnableRead();if (_connected_callback) _connected_callback(shared_from_this());}//這個接口才是實際的釋放接口void ReleaseInLoop() {//1. 修改連接狀態,將其置為DISCONNECTED_statu = DISCONNECTED;//2. 移除連接的事件監控_channel.Remove();//3. 關閉描述符_socket.Close();//4. 如果當前定時器隊列中還有定時銷毀任務,則取消任務if (_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop();//5. 調用關閉回調函數,避免先移除服務器管理的連接信息導致Connection被釋放,再去處理會出錯,因此先調用用戶的回調函數if (_closed_callback) _closed_callback(shared_from_this());//移除服務器內部管理的連接信息if (_server_closed_callback) _server_closed_callback(shared_from_this());}//這個接口并不是實際的發送接口,而只是把數據放到了發送緩沖區,啟動了可寫事件監控void SendInLoop(Buffer &buf) {if (_statu == DISCONNECTED) return ;_out_buffer.WriteBufferAndPush(buf);if (_channel.WriteAble() == false) {_channel.EnableWrite();}}//這個關閉操作并非實際的連接釋放操作,需要判斷還有沒有數據待處理,待發送void ShutdownInLoop() {_statu = DISCONNECTING;// 設置連接為半關閉狀態if (_in_buffer.ReadAbleSize() > 0) {if (_message_callback) _message_callback(shared_from_this(), &_in_buffer);}//要么就是寫入數據的時候出錯關閉,要么就是沒有待發送數據,直接關閉if (_out_buffer.ReadAbleSize() > 0) {if (_channel.WriteAble() == false) {_channel.EnableWrite();}}if (_out_buffer.ReadAbleSize() == 0) {Release();}}//啟動非活躍連接超時釋放規則void EnableInactiveReleaseInLoop(int sec) {//1. 將判斷標志 _enable_inactive_release 置為true_enable_inactive_release = true;//2. 如果當前定時銷毀任務已經存在,那就刷新延遲一下即可if (_loop->HasTimer(_conn_id)) {return _loop->TimerRefresh(_conn_id);}//3. 如果不存在定時銷毀任務,則新增_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));}void CancelInactiveReleaseInLoop() {_enable_inactive_release = false;if (_loop->HasTimer(_conn_id)) { _loop->TimerCancel(_conn_id); }}void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) {_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),_channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}~Connection() { DBG_LOG("RELEASE CONNECTION:%p", this); }//獲取管理的文件描述符int Fd() { return _sockfd; }//獲取連接IDint Id() { return _conn_id; }//是否處于CONNECTED狀態bool Connected() { return (_statu == CONNECTED); }//設置上下文--連接建立完成時進行調用void SetContext(const Any &context) { _context = context; }//獲取上下文,返回的是指針Any *GetContext() { return &_context; }void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; }//連接建立就緒后,進行channel回調設置,啟動讀監控,調用_connected_callbackvoid Established() {_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));}//發送數據,將數據放到發送緩沖區,啟動寫事件監控void Send(const char *data, size_t len) {//外界傳入的data,可能是個臨時的空間,我們現在只是把發送操作壓入了任務池,有可能并沒有被立即執行//因此有可能執行的時候,data指向的空間有可能已經被釋放了。Buffer buf;buf.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));}//提供給組件使用者的關閉接口--并不實際關閉,需要判斷有沒有數據待處理void Shutdown() {_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}void Release() {_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));}//啟動非活躍銷毀,并定義多長時間無通信就是非活躍,添加定時任務void EnableInactiveRelease(int sec) {_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));}//取消非活躍銷毀void CancelInactiveRelease() {_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));}//切換協議---重置上下文以及階段性回調處理函數 -- 而是這個接口必須在EventLoop線程中立即執行//防備新的事件觸發后,處理的時候,切換任務還沒有被執行--會導致數據使用原協議處理了。void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) {_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}
};
Acceptor模塊:
Acceptor模塊是對Socket模塊,Channel模塊的?個整體封裝,實現了對?個監聽套接字的整體的管理。
class Acceptor {private:Socket _socket;//用于創建監聽套接字EventLoop *_loop; //用于對監聽套接字進行事件監控Channel _channel; //用于對監聽套接字進行事件管理using AcceptCallback = std::function<void(int)>;AcceptCallback _accept_callback;private:/*監聽套接字的讀事件回調處理函數---獲取新連接,調用_accept_callback函數進行新連接處理*/void HandleRead() {int newfd = _socket.Accept();if (newfd < 0) {return ;}if (_accept_callback) _accept_callback(newfd);}int CreateServer(int port) {bool ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();}public:/*不能將啟動讀事件監控,放到構造函數中,必須在設置回調函數后,再去啟動*//*否則有可能造成啟動監控后,立即有事件,處理的時候,回調函數還沒設置:新連接得不到處理,且資源泄漏*/Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd()) {_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));}void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }void Listen() { _channel.EnableRead(); }
};
TimerQueue模塊:
TimerQueue模塊是實現固定時間定時任務的模塊,可以理解就是要給定時任務管理器,向定時任務管理器中添加?個任務,任務將在固定時間后被執?,同時也可以通過刷新定時任務來延遲任務的執?。
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask{private:uint64_t _id; // 定時器任務對象IDuint32_t _timeout; //定時任務的超時時間bool _canceled; // false-表示沒有被取消, true-表示被取消TaskFunc _task_cb; //定時器對象要執行的定時任務ReleaseFunc _release; //用于刪除TimerWheel中保存的定時器對象信息public:TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb): _id(id), _timeout(delay), _task_cb(cb), _canceled(false) {}~TimerTask() { if (_canceled == false) _task_cb(); _release(); }void Cancel() { _canceled = true; }void SetRelease(const ReleaseFunc &cb) { _release = cb; }uint32_t DelayTime() { return _timeout; }
};class TimerWheel {private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _tick; //當前的秒針,走到哪里釋放哪里,釋放哪里,就相當于執行哪里的任務int _capacity; //表盤最大數量---其實就是最大延遲時間std::vector<std::vector<PtrTask>> _wheel;std::unordered_map<uint64_t, WeakTask> _timers;EventLoop *_loop;int _timerfd;//定時器描述符--可讀事件回調就是讀取計數器,執行定時任務std::unique_ptr<Channel> _timer_channel;private:void RemoveTimer(uint64_t id) {auto it = _timers.find(id);if (it != _timers.end()) {_timers.erase(it);}}static int CreateTimerfd() {int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd < 0) {ERR_LOG("TIMERFD CREATE FAILED!");abort();}//int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old);struct itimerspec itime;itime.it_value.tv_sec = 1;itime.it_value.tv_nsec = 0;//第一次超時時間為1s后itime.it_interval.tv_sec = 1; itime.it_interval.tv_nsec = 0; //第一次超時后,每次超時的間隔時timerfd_settime(timerfd, 0, &itime, NULL);return timerfd;}int ReadTimefd() {uint64_t times;//有可能因為其他描述符的事件處理花費事件比較長,然后在處理定時器描述符事件的時候,有可能就已經超時了很多次//read讀取到的數據times就是從上一次read之后超時的次數int ret = read(_timerfd, ×, 8);if (ret < 0) {ERR_LOG("READ TIMEFD FAILED!");abort();}return times;}//這個函數應該每秒鐘被執行一次,相當于秒針向后走了一步void RunTimerTask() {_tick = (_tick + 1) % _capacity;_wheel[_tick].clear();//清空指定位置的數組,就會把數組中保存的所有管理定時器對象的shared_ptr釋放掉}void OnTime() {//根據實際超時的次數,執行對應的超時任務int times = ReadTimefd();for (int i = 0; i < times; i++) {RunTimerTask();}}void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) {PtrTask pt(new TimerTask(id, delay, cb));pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);_timers[id] = WeakTask(pt);}void TimerRefreshInLoop(uint64_t id) {//通過保存的定時器對象的weak_ptr構造一個shared_ptr出來,添加到輪子中auto it = _timers.find(id);if (it == _timers.end()) {return;//沒找著定時任務,沒法刷新,沒法延遲}PtrTask pt = it->second.lock();//lock獲取weak_ptr管理的對象對應的shared_ptrint delay = pt->DelayTime();int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}void TimerCancelInLoop(uint64_t id) {auto it = _timers.find(id);if (it == _timers.end()) {return;//沒找著定時任務,沒法刷新,沒法延遲}PtrTask pt = it->second.lock();if (pt) pt->Cancel();}public:TimerWheel(EventLoop *loop):_capacity(60), _tick(0), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)) {_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));_timer_channel->EnableRead();//啟動讀事件監控}/*定時器中有個_timers成員,定時器信息的操作有可能在多線程中進行,因此需要考慮線程安全問題*//*如果不想加鎖,那就把對定期的所有操作,都放到一個線程中進行*/void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);//刷新/延遲定時任務void TimerRefresh(uint64_t id);void TimerCancel(uint64_t id);/*這個接口存在線程安全問題--這個接口實際上不能被外界使用者調用,只能在模塊內,在對應的EventLoop線程內執行*/bool HasTimer(uint64_t id) {auto it = _timers.find(id);if (it == _timers.end()) {return false;}return true;}
};
Poller模塊:
Poller模塊是對epoll進?封裝的?個模塊,主要實現epoll的IO事件添加,修改,移除,獲取活躍連接功能。
class Poller {private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel *> _channels;private://對epoll的直接操作void Update(Channel *channel, int op) {// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0) {ERR_LOG("EPOLLCTL FAILED!");}return;}//判斷一個Channel是否已經添加了事件監控bool HasChannel(Channel *channel) {auto it = _channels.find(channel->Fd());if (it == _channels.end()) {return false;}return true;}public:Poller() {_epfd = epoll_create(MAX_EPOLLEVENTS);if (_epfd < 0) {ERR_LOG("EPOLL CREATE FAILED!!");abort();//退出程序}}//添加或修改監控事件void UpdateEvent(Channel *channel) {bool ret = HasChannel(channel);if (ret == false) {//不存在則添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);}//移除監控void RemoveEvent(Channel *channel) {auto it = _channels.find(channel->Fd());if (it != _channels.end()) {_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}//開始監控,返回活躍連接void Poll(std::vector<Channel*> *active) {// int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout)int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);if (nfds < 0) {if (errno == EINTR) {return ;}ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));abort();//退出程序}for (int i = 0; i < nfds; i++) {auto it = _channels.find(_evs[i].data.fd);assert(it != _channels.end());it->second->SetREvents(_evs[i].events);//設置實際就緒的事件active->push_back(it->second);}return;}
};
EventLoop模塊:
EventLoop模塊可以理解就是我們上邊所說的Reactor模塊,它是對Poller模塊,TimerQueue模塊,Socket模塊的?個整體封裝,進?所有描述符的事件監控。
EventLoop模塊必然是?個對象對應?個線程的模塊,線程內部的?的就是運?EventLoop的啟動函數。
class EventLoop {private:using Functor = std::function<void()>;std::thread::id _thread_id;//線程IDint _event_fd;//eventfd喚醒IO事件監控有可能導致的阻塞std::unique_ptr<Channel> _event_channel;Poller _poller;//進行所有描述符的事件監控std::vector<Functor> _tasks;//任務池std::mutex _mutex;//實現任務池操作的線程安全TimerWheel _timer_wheel;//定時器模塊public://執行任務池中的所有任務void RunAllTask() {std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}for (auto &f : functor) {f();}return ;}static int CreateEventFd() {int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0) {ERR_LOG("CREATE EVENTFD FAILED!!");abort();//讓程序異常退出}return efd;}void ReadEventfd() {uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if (ret < 0) {//EINTR -- 被信號打斷; EAGAIN -- 表示無數據可讀if (errno == EINTR || errno == EAGAIN) {return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return ;}void WeakUpEventFd() {uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if (ret < 0) {if (errno == EINTR) {return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return ;}public:EventLoop():_thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)),_timer_wheel(this) {//給eventfd添加可讀事件回調函數,讀取eventfd事件通知次數_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));//啟動eventfd的讀事件監控_event_channel->EnableRead();}//三步走--事件監控-》就緒事件處理-》執行任務void Start() {while(1) {//1. 事件監控, std::vector<Channel *> actives;_poller.Poll(&actives);//2. 事件處理。 for (auto &channel : actives) {channel->HandleEvent();}//3. 執行任務RunAllTask();}}//用于判斷當前線程是否是EventLoop對應的線程;bool IsInLoop() {return (_thread_id == std::this_thread::get_id());}void AssertInLoop() {assert(_thread_id == std::this_thread::get_id());}//判斷將要執行的任務是否處于當前線程中,如果是則執行,不是則壓入隊列。void RunInLoop(const Functor &cb) {if (IsInLoop()) {return cb();}return QueueInLoop(cb);}//將操作壓入任務池void QueueInLoop(const Functor &cb) {{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}//喚醒有可能因為沒有事件就緒,而導致的epoll阻塞;//其實就是給eventfd寫入一個數據,eventfd就會觸發可讀事件WeakUpEventFd();}//添加/修改描述符的事件監控void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }//移除描述符的監控void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};
TcpServer模塊:
這個模塊是?個整體Tcp服務器模塊的封裝,內部封裝了Acceptor模塊,EventLoopThreadPool模
塊。
class TcpServer {private:uint64_t _next_id; //這是一個自動增長的連接ID,int _port;int _timeout; //這是非活躍連接的統計時間---多長時間無通信就是非活躍連接bool _enable_inactive_release;//是否啟動了非活躍連接超時銷毀的判斷標志EventLoop _baseloop; //這是主線程的EventLoop對象,負責監聽事件的處理Acceptor _acceptor; //這是監聽套接字的管理對象LoopThreadPool _pool; //這是從屬EventLoop線程池std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有連接對應的shared_ptr對象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void RunAfterInLoop(const Functor &task, int delay) {_next_id++;_baseloop.TimerAdd(_next_id, delay, task);}//為新連接構造一個Connection進行管理void NewConnection(int fd) {_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//啟動非活躍超時銷毀conn->Established();//就緒初始化_conns.insert(std::make_pair(_next_id, conn));}void RemoveConnectionInLoop(const PtrConnection &conn) {int id = conn->Id();auto it = _conns.find(id);if (it != _conns.end()) {_conns.erase(it);}}//從管理Connection的_conns中移除連接信息void RemoveConnection(const PtrConnection &conn) {_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}public:TcpServer(int port):_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();//將監聽套接字掛到baseloop上}void SetThreadCount(int count) { return _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }//用于添加一個定時任務void RunAfter(const Functor &task, int delay) {_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}void Start() { _pool.Create(); _baseloop.Start(); }
};void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) {_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
//刷新/延遲定時任務
void TimerWheel::TimerRefresh(uint64_t id) {_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id) {_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}class NetWork {public:NetWork() {DBG_LOG("SIGPIPE INIT");signal(SIGPIPE, SIG_IGN);}
};
HTTP協議組件模塊:
Util模塊:
std::unordered_map<int, std::string> _statu_msg = {{100, "Continue"},{101, "Switching Protocol"},{102, "Processing"},{103, "Early Hints"},{200, "OK"},{201, "Created"},{202, "Accepted"},{203, "Non-Authoritative Information"},{204, "No Content"},{205, "Reset Content"},{206, "Partial Content"},{207, "Multi-Status"},{208, "Already Reported"},{226, "IM Used"},{300, "Multiple Choice"},{301, "Moved Permanently"},{302, "Found"},{303, "See Other"},{304, "Not Modified"},{305, "Use Proxy"},{306, "unused"},{307, "Temporary Redirect"},{308, "Permanent Redirect"},{400, "Bad Request"},{401, "Unauthorized"},{402, "Payment Required"},{403, "Forbidden"},{404, "Not Found"},{405, "Method Not Allowed"},{406, "Not Acceptable"},{407, "Proxy Authentication Required"},{408, "Request Timeout"},{409, "Conflict"},{410, "Gone"},{411, "Length Required"},{412, "Precondition Failed"},{413, "Payload Too Large"},{414, "URI Too Long"},{415, "Unsupported Media Type"},{416, "Range Not Satisfiable"},{417, "Expectation Failed"},{418, "I'm a teapot"},{421, "Misdirected Request"},{422, "Unprocessable Entity"},{423, "Locked"},{424, "Failed Dependency"},{425, "Too Early"},{426, "Upgrade Required"},{428, "Precondition Required"},{429, "Too Many Requests"},{431, "Request Header Fields Too Large"},{451, "Unavailable For Legal Reasons"},{501, "Not Implemented"},{502, "Bad Gateway"},{503, "Service Unavailable"},{504, "Gateway Timeout"},{505, "HTTP Version Not Supported"},{506, "Variant Also Negotiates"},{507, "Insufficient Storage"},{508, "Loop Detected"},{510, "Not Extended"},{511, "Network Authentication Required"}
};std::unordered_map<std::string, std::string> _mime_msg = {{".aac", "audio/aac"},{".abw", "application/x-abiword"},{".arc", "application/x-freearc"},{".avi", "video/x-msvideo"},{".azw", "application/vnd.amazon.ebook"},{".bin", "application/octet-stream"},{".bmp", "image/bmp"},{".bz", "application/x-bzip"},{".bz2", "application/x-bzip2"},{".csh", "application/x-csh"},{".css", "text/css"},{".csv", "text/csv"},{".doc", "application/msword"},{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},{".eot", "application/vnd.ms-fontobject"},{".epub", "application/epub+zip"},{".gif", "image/gif"},{".htm", "text/html"},{".html", "text/html"},{".ico", "image/vnd.microsoft.icon"},{".ics", "text/calendar"},{".jar", "application/java-archive"},{".jpeg", "image/jpeg"},{".jpg", "image/jpeg"},{".js", "text/javascript"},{".json", "application/json"},{".jsonld", "application/ld+json"},{".mid", "audio/midi"},{".midi", "audio/x-midi"},{".mjs", "text/javascript"},{".mp3", "audio/mpeg"},{".mpeg", "video/mpeg"},{".mpkg", "application/vnd.apple.installer+xml"},{".odp", "application/vnd.oasis.opendocument.presentation"},{".ods", "application/vnd.oasis.opendocument.spreadsheet"},{".odt", "application/vnd.oasis.opendocument.text"},{".oga", "audio/ogg"},{".ogv", "video/ogg"},{".ogx", "application/ogg"},{".otf", "font/otf"},{".png", "image/png"},{".pdf", "application/pdf"},{".ppt", "application/vnd.ms-powerpoint"},{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},{".rar", "application/x-rar-compressed"},{".rtf", "application/rtf"},{".sh", "application/x-sh"},{".svg", "image/svg+xml"},{".swf", "application/x-shockwave-flash"},{".tar", "application/x-tar"},{".tif", "image/tiff"},{".tiff", "image/tiff"},{".ttf", "font/ttf"},{".txt", "text/plain"},{".vsd", "application/vnd.visio"},{".wav", "audio/wav"},{".weba", "audio/webm"},{".webm", "video/webm"},{".webp", "image/webp"},{".woff", "font/woff"},{".woff2", "font/woff2"},{".xhtml", "application/xhtml+xml"},{".xls", "application/vnd.ms-excel"},{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},{".xml", "application/xml"},{".xul", "application/vnd.mozilla.xul+xml"},{".zip", "application/zip"},{".3gp", "video/3gpp"},{".3g2", "video/3gpp2"},{".7z", "application/x-7z-compressed"}
};class Util {public://字符串分割函數,將src字符串按照sep字符進行分割,得到的各個字串放到arry中,最終返回字串的數量static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string> *arry) {size_t offset = 0;// 有10個字符,offset是查找的起始位置,范圍應該是0~9,offset==10就代表已經越界了while(offset < src.size()) {size_t pos = src.find(sep, offset);//在src字符串偏移量offset處,開始向后查找sep字符/字串,返回查找到的位置if (pos == std::string::npos) {//沒有找到特定的字符//將剩余的部分當作一個字串,放入arry中if(pos == src.size()) break;arry->push_back(src.substr(offset));return arry->size();}if (pos == offset) {offset = pos + sep.size();continue;//當前字串是一個空的,沒有內容}arry->push_back(src.substr(offset, pos - offset));offset = pos + sep.size();}return arry->size();}//讀取文件的所有內容,將讀取的內容放到一個Buffer中static bool ReadFile(const std::string &filename, std::string *buf) {std::ifstream ifs(filename, std::ios::binary);if (ifs.is_open() == false) {printf("OPEN %s FILE FAILED!!", filename.c_str());return false;}size_t fsize = 0;ifs.seekg(0, ifs.end);//跳轉讀寫位置到末尾fsize = ifs.tellg(); //獲取當前讀寫位置相對于起始位置的偏移量,從末尾偏移剛好就是文件大小ifs.seekg(0, ifs.beg);//跳轉到起始位置buf->resize(fsize); //開辟文件大小的空間ifs.read(&(*buf)[0], fsize);if (ifs.good() == false) {printf("READ %s FILE FAILED!!", filename.c_str());ifs.close();return false;}ifs.close();return true;}//向文件寫入數據static bool WriteFile(const std::string &filename, const std::string &buf) {std::ofstream ofs(filename, std::ios::binary | std::ios::trunc);if (ofs.is_open() == false) {printf("OPEN %s FILE FAILED!!", filename.c_str());return false;}ofs.write(buf.c_str(), buf.size());if (ofs.good() == false) {ERR_LOG("WRITE %s FILE FAILED!", filename.c_str());ofs.close(); return false;}ofs.close();return true;}//URL編碼,避免URL中資源路徑與查詢字符串中的特殊字符與HTTP請求中特殊字符產生歧義//編碼格式:將特殊字符的ascii值,轉換為兩個16進制字符,前綴% C++ -> C%2B%2B// 不編碼的特殊字符: RFC3986文檔規定 . - _ ~ 字母,數字屬于絕對不編碼字符//RFC3986文檔規定,編碼格式 %HH //W3C標準中規定,查詢字符串中的空格,需要編碼為+, 解碼則是+轉空格static std::string UrlEncode(const std::string url, bool convert_space_to_plus) {std::string res;for (auto &c : url) {if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c)) {res += c;continue;}if (c == ' ' && convert_space_to_plus == true) {res += '+';continue;}//剩下的字符都是需要編碼成為 %HH 格式char tmp[4] = {0};//snprintf 與 printf比較類似,都是格式化字符串,只不過一個是打印,一個是放到一塊空間中snprintf(tmp, 4, "%%%02X", c);res += tmp;}return res;}static char HEXTOI(char c) {if (c >= '0' && c <= '9') {return c - '0';}else if (c >= 'a' && c <= 'z') {return c - 'a' + 10;}else if (c >= 'A' && c <= 'Z') {return c - 'A' + 10;}return -1; }static std::string UrlDecode(const std::string url, bool convert_plus_to_space) {//遇到了%,則將緊隨其后的2個字符,轉換為數字,第一個數字左移4位,然后加上第二個數字 + -> 2b %2b->2 << 4 + 11std::string res;for (int i = 0; i < url.size(); i++) {if (url[i] == '+' && convert_plus_to_space == true) {res += ' ';continue;}if (url[i] == '%' && (i + 2) < url.size()) {char v1 = HEXTOI(url[i + 1]);char v2 = HEXTOI(url[i + 2]);char v = v1 * 16 + v2;res += v;i += 2;continue;}res += url[i];}return res;}//響應狀態碼的描述信息獲取static std::string StatuDesc(int statu) {auto it = _statu_msg.find(statu);if (it != _statu_msg.end()) {return it->second;}return "Unknow";}//根據文件后綴名獲取文件mimestatic std::string ExtMime(const std::string &filename) {// a.b.txt 先獲取文件擴展名size_t pos = filename.find_last_of('.');if (pos == std::string::npos) {return "application/octet-stream";}//根據擴展名,獲取mimestd::string ext = filename.substr(pos);auto it = _mime_msg.find(ext);if (it == _mime_msg.end()) {return "application/octet-stream";}return it->second;}//判斷一個文件是否是一個目錄static bool IsDirectory(const std::string &filename) {struct stat st;int ret = stat(filename.c_str(), &st);if (ret < 0) {return false;}return S_ISDIR(st.st_mode);}//判斷一個文件是否是一個普通文件static bool IsRegular(const std::string &filename) {struct stat st;int ret = stat(filename.c_str(), &st);if (ret < 0) {return false;}return S_ISREG(st.st_mode);}//http請求的資源路徑有效性判斷// /index.html --- 前邊的/叫做相對根目錄 映射的是某個服務器上的子目錄// 想表達的意思就是,客戶端只能請求相對根目錄中的資源,其他地方的資源都不予理會// /../login, 這個路徑中的..會讓路徑的查找跑到相對根目錄之外,這是不合理的,不安全的static bool ValidPath(const std::string &path) {//思想:按照/進行路徑分割,根據有多少子目錄,計算目錄深度,有多少層,深度不能小于0std::vector<std::string> subdir;Split(path, "/", &subdir);int level = 0;for (auto &dir : subdir) {if (dir == "..") {level--; //任意一層走出相對根目錄,就認為有問題if (level < 0) return false;continue;}level++;}return true;}
};
HttpRequest模塊:
class HttpRequest {public:std::string _method; //請求方法std::string _path; //資源路徑std::string _version; //協議版本std::string _body; //請求正文std::smatch _matches; //資源路徑的正則提取數據std::unordered_map<std::string, std::string> _headers; //頭部字段std::unordered_map<std::string, std::string> _params; //查詢字符串public:HttpRequest():_version("HTTP/1.1") {}void ReSet() {_method.clear();_path.clear();_version = "HTTP/1.1";_body.clear();std::smatch match;_matches.swap(match);_headers.clear();_params.clear();}//插入頭部字段void SetHeader(const std::string &key, const std::string &val) {_headers.insert(std::make_pair(key, val));}//判斷是否存在指定頭部字段bool HasHeader(const std::string &key) const {auto it = _headers.find(key);if (it == _headers.end()) {return false;}return true;}//獲取指定頭部字段的值std::string GetHeader(const std::string &key) const {auto it = _headers.find(key);if (it == _headers.end()) {return "";}return it->second;}//插入查詢字符串void SetParam(const std::string &key, const std::string &val) {_params.insert(std::make_pair(key, val));}//判斷是否有某個指定的查詢字符串bool HasParam(const std::string &key) const {auto it = _params.find(key);if (it == _params.end()) {return false;}return true;}//獲取指定的查詢字符串std::string GetParam(const std::string &key) const {auto it = _params.find(key);if (it == _params.end()) {return "";}return it->second;}//獲取正文長度size_t ContentLength() const {// Content-Length: 1234\r\nbool ret = HasHeader("Content-Length");if (ret == false) {return 0;}std::string clen = GetHeader("Content-Length");return std::stol(clen);}//判斷是否是短鏈接bool Close() const {// 沒有Connection字段,或者有Connection但是值是close,則都是短鏈接,否則就是長連接if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") {return false;}return true;}
};
HttpContext模塊:
class HttpContext {private:int _resp_statu; //響應狀態碼HttpRecvStatu _recv_statu; //當前接收及解析的階段狀態HttpRequest _request; //已經解析得到的請求信息private:bool ParseHttpLine(const std::string &line) {std::smatch matches;std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase);bool ret = std::regex_match(line, matches, e);if (ret == false) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 400;//BAD REQUESTreturn false;}//0 : GET /bitejiuyeke/login?user=xiaoming&pass=123123 HTTP/1.1//1 : GET//2 : /bitejiuyeke/login//3 : user=xiaoming&pass=123123//4 : HTTP/1.1//請求方法的獲取_request._method = matches[1];std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);//資源路徑的獲取,需要進行URL解碼操作,但是不需要+轉空格_request._path = Util::UrlDecode(matches[2], false);//協議版本的獲取_request._version = matches[4];//查詢字符串的獲取與處理std::vector<std::string> query_string_arry;std::string query_string = matches[3];//查詢字符串的格式 key=val&key=val....., 先以 & 符號進行分割,得到各個字串Util::Split(query_string, "&", &query_string_arry);//針對各個字串,以 = 符號進行分割,得到key 和val, 得到之后也需要進行URL解碼for (auto &str : query_string_arry) {size_t pos = str.find("=");if (pos == std::string::npos) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 400;//BAD REQUESTreturn false;}std::string key = Util::UrlDecode(str.substr(0, pos), true); std::string val = Util::UrlDecode(str.substr(pos + 1), true);_request.SetParam(key, val);}return true;}bool RecvHttpLine(Buffer *buf) {if (_recv_statu != RECV_HTTP_LINE) return false;//1. 獲取一行數據,帶有末尾的換行 std::string line = buf->GetLineAndPop();//2. 需要考慮的一些要素:緩沖區中的數據不足一行, 獲取的一行數據超大if (line.size() == 0) {//緩沖區中的數據不足一行,則需要判斷緩沖區的可讀數據長度,如果很長了都不足一行,這是有問題的if (buf->ReadAbleSize() > MAX_LINE) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414;//URI TOO LONGreturn false;}//緩沖區中數據不足一行,但是也不多,就等等新數據的到來return true;}if (line.size() > MAX_LINE) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414;//URI TOO LONGreturn false;}bool ret = ParseHttpLine(line);if (ret == false) {return false;}//首行處理完畢,進入頭部獲取階段_recv_statu = RECV_HTTP_HEAD;return true;}bool RecvHttpHead(Buffer *buf) {if (_recv_statu != RECV_HTTP_HEAD) return false;//一行一行取出數據,直到遇到空行為止, 頭部的格式 key: val\r\nkey: val\r\n....while(1){std::string line = buf->GetLineAndPop();//2. 需要考慮的一些要素:緩沖區中的數據不足一行, 獲取的一行數據超大if (line.size() == 0) {//緩沖區中的數據不足一行,則需要判斷緩沖區的可讀數據長度,如果很長了都不足一行,這是有問題的if (buf->ReadAbleSize() > MAX_LINE) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414;//URI TOO LONGreturn false;}//緩沖區中數據不足一行,但是也不多,就等等新數據的到來return true;}if (line.size() > MAX_LINE) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414;//URI TOO LONGreturn false;}if (line == "\n" || line == "\r\n") {break;}bool ret = ParseHttpHead(line);if (ret == false) {return false;}}//頭部處理完畢,進入正文獲取階段_recv_statu = RECV_HTTP_BODY;return true;}bool ParseHttpHead(std::string &line) {//key: val\r\nkey: val\r\n....if (line.back() == '\n') line.pop_back();//末尾是換行則去掉換行字符if (line.back() == '\r') line.pop_back();//末尾是回車則去掉回車字符size_t pos = line.find(": ");if (pos == std::string::npos) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 400;//return false;}std::string key = line.substr(0, pos); std::string val = line.substr(pos + 2);_request.SetHeader(key, val);return true;}bool RecvHttpBody(Buffer *buf) {if (_recv_statu != RECV_HTTP_BODY) return false;//1. 獲取正文長度size_t content_length = _request.ContentLength();if (content_length == 0) {//沒有正文,則請求接收解析完畢_recv_statu = RECV_HTTP_OVER;return true;}//2. 當前已經接收了多少正文,其實就是往 _request._body 中放了多少數據了size_t real_len = content_length - _request._body.size();//實際還需要接收的正文長度//3. 接收正文放到body中,但是也要考慮當前緩沖區中的數據,是否是全部的正文// 3.1 緩沖區中數據,包含了當前請求的所有正文,則取出所需的數據if (buf->ReadAbleSize() >= real_len) {_request._body.append(buf->ReadPosition(), real_len);buf->MoveReadOffset(real_len);_recv_statu = RECV_HTTP_OVER;return true;}// 3.2 緩沖區中數據,無法滿足當前正文的需要,數據不足,取出數據,然后等待新數據到來_request._body.append(buf->ReadPosition(), buf->ReadAbleSize());buf->MoveReadOffset(buf->ReadAbleSize());return true;}public:HttpContext():_resp_statu(200), _recv_statu(RECV_HTTP_LINE) {}void ReSet() {_resp_statu = 200;_recv_statu = RECV_HTTP_LINE;_request.ReSet();}int RespStatu() { return _resp_statu; }HttpRecvStatu RecvStatu() { return _recv_statu; }HttpRequest &Request() { return _request; }//接收并解析HTTP請求void RecvHttpRequest(Buffer *buf) {//不同的狀態,做不同的事情,但是這里不要break, 因為處理完請求行后,應該立即處理頭部,而不是退出等新數據switch(_recv_statu) {case RECV_HTTP_LINE: RecvHttpLine(buf);case RECV_HTTP_HEAD: RecvHttpHead(buf);case RECV_HTTP_BODY: RecvHttpBody(buf);}return;}
};
HttpServer模塊:
class HttpServer {private:using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;using Handlers = std::vector<std::pair<std::regex, Handler>>;Handlers _get_route;Handlers _post_route;Handlers _put_route;Handlers _delete_route;std::string _basedir; //靜態資源根目錄TcpServer _server;private:void ErrorHandler(const HttpRequest &req, HttpResponse *rsp) {//1. 組織一個錯誤展示頁面std::string body;body += "<html>";body += "<head>";body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";body += "</head>";body += "<body>";body += "<h1>";body += std::to_string(rsp->_statu);body += " ";body += Util::StatuDesc(rsp->_statu);body += "</h1>";body += "</body>";body += "</html>";//2. 將頁面數據,當作響應正文,放入rsp中rsp->SetContent(body, "text/html");}//將HttpResponse中的要素按照http協議格式進行組織,發送void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) {//1. 先完善頭部字段if (req.Close() == true) {rsp.SetHeader("Connection", "close");}else {rsp.SetHeader("Connection", "keep-alive");}if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false) {rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));}if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false) {rsp.SetHeader("Content-Type", "application/octet-stream");}if (rsp._redirect_flag == true) {rsp.SetHeader("Location", rsp._redirect_url);}//2. 將rsp中的要素,按照http協議格式進行組織std::stringstream rsp_str;rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "\r\n";for (auto &head : rsp._headers) {rsp_str << head.first << ": " << head.second << "\r\n";}rsp_str << "\r\n";rsp_str << rsp._body;//3. 發送數據conn->Send(rsp_str.str().c_str(), rsp_str.str().size());}bool IsFileHandler(const HttpRequest &req) {// 1. 必須設置了靜態資源根目錄if (_basedir.empty()) {return false;}// 2. 請求方法,必須是GET / HEAD請求方法if (req._method != "GET" && req._method != "HEAD") {return false;}// 3. 請求的資源路徑必須是一個合法路徑if (Util::ValidPath(req._path) == false) {return false;}// 4. 請求的資源必須存在,且是一個普通文件// 有一種請求比較特殊 -- 目錄:/, /image/, 這種情況給后邊默認追加一個 index.html// index.html /image/a.png// 不要忘了前綴的相對根目錄,也就是將請求路徑轉換為實際存在的路徑 /image/a.png -> ./wwwroot/image/a.pngstd::string req_path = _basedir + req._path;//為了避免直接修改請求的資源路徑,因此定義一個臨時對象if (req._path.back() == '/') {req_path += "index.html";}if (Util::IsRegular(req_path) == false) {return false;}return true;}//靜態資源的請求處理 --- 將靜態資源文件的數據讀取出來,放到rsp的_body中, 并設置mimevoid FileHandler(const HttpRequest &req, HttpResponse *rsp) {std::string req_path = _basedir + req._path;if (req._path.back() == '/') {req_path += "index.html";}bool ret = Util::ReadFile(req_path, &rsp->_body);if (ret == false) {return;}std::string mime = Util::ExtMime(req_path);rsp->SetHeader("Content-Type", mime);return;}//功能性請求的分類處理void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers) {//在對應請求方法的路由表中,查找是否含有對應資源請求的處理函數,有則調用,沒有則發揮404//思想:路由表存儲的時鍵值對 -- 正則表達式 & 處理函數//使用正則表達式,對請求的資源路徑進行正則匹配,匹配成功就使用對應函數進行處理// /numbers/(\d+) /numbers/12345for (auto &handler : handlers) {const std::regex &re = handler.first;const Handler &functor = handler.second;bool ret = std::regex_match(req._path, req._matches, re);if (ret == false) {continue;}return functor(req, rsp);//傳入請求信息,和空的rsp,執行處理函數}rsp->_statu = 404;}void Route(HttpRequest &req, HttpResponse *rsp) {//1. 對請求進行分辨,是一個靜態資源請求,還是一個功能性請求// 靜態資源請求,則進行靜態資源的處理// 功能性請求,則需要通過幾個請求路由表來確定是否有處理函數// 既不是靜態資源請求,也沒有設置對應的功能性請求處理函數,就返回405if (IsFileHandler(req) == true) {//是一個靜態資源請求, 則進行靜態資源請求的處理return FileHandler(req, rsp);}if (req._method == "GET" || req._method == "HEAD") {return Dispatcher(req, rsp, _get_route);}else if (req._method == "POST") {return Dispatcher(req, rsp, _post_route);}else if (req._method == "PUT") {return Dispatcher(req, rsp, _put_route);}else if (req._method == "DELETE") {return Dispatcher(req, rsp, _delete_route);}rsp->_statu = 405;// Method Not Allowedreturn ;}//設置上下文void OnConnected(const PtrConnection &conn) {conn->SetContext(HttpContext());DBG_LOG("NEW CONNECTION %p", conn.get());}//緩沖區數據解析+處理void OnMessage(const PtrConnection &conn, Buffer *buffer) {while(buffer->ReadAbleSize() > 0){//1. 獲取上下文HttpContext *context = conn->GetContext()->get<HttpContext>();//2. 通過上下文對緩沖區數據進行解析,得到HttpRequest對象// 1. 如果緩沖區的數據解析出錯,就直接回復出錯響應// 2. 如果解析正常,且請求已經獲取完畢,才開始去進行處理context->RecvHttpRequest(buffer);HttpRequest &req = context->Request();HttpResponse rsp(context->RespStatu());if (context->RespStatu() >= 400) {//進行錯誤響應,關閉連接ErrorHandler(req, &rsp);//填充一個錯誤顯示頁面數據到rsp中WriteReponse(conn, req, rsp);//組織響應發送給客戶端context->ReSet();buffer->MoveReadOffset(buffer->ReadAbleSize());//出錯了就把緩沖區數據清空conn->Shutdown();//關閉連接return;}if (context->RecvStatu() != RECV_HTTP_OVER) {//當前請求還沒有接收完整,則退出,等新數據到來再重新繼續處理return;}//3. 請求路由 + 業務處理Route(req, &rsp);//4. 對HttpResponse進行組織發送WriteReponse(conn, req, rsp);//5. 重置上下文context->ReSet();//6. 根據長短連接判斷是否關閉連接或者繼續處理if (rsp.Close() == true) conn->Shutdown();//短鏈接則直接關閉}return;}public:HttpServer(int port, int timeout = DEFALT_TIMEOUT):_server(port) {_server.EnableInactiveRelease(timeout);_server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));}void SetBaseDir(const std::string &path) {assert(Util::IsDirectory(path) == true);_basedir = path;}/*設置/添加,請求(請求的正則表達)與處理函數的映射關系*/void Get(const std::string &pattern, const Handler &handler) {_get_route.push_back(std::make_pair(std::regex(pattern), handler));}void Post(const std::string &pattern, const Handler &handler) {_post_route.push_back(std::make_pair(std::regex(pattern), handler));}void Put(const std::string &pattern, const Handler &handler) {_put_route.push_back(std::make_pair(std::regex(pattern), handler));}void Delete(const std::string &pattern, const Handler &handler) {_delete_route.push_back(std::make_pair(std::regex(pattern), handler));}void SetThreadCount(int count) {_server.SetThreadCount(count);}void Listen() {_server.Start();}
};