【Linux】Reactor模式

Reactor模式

Reactor模式的定義

Reactor反應器模式,也叫做分發者模式或通知者模式,是一種將就緒事件派發給對應服務處理程序的事件設計模式。

Reactor模式的角色構成

Reactor主要由以下五個角色構成:

reactor模式的角色
角色解釋
Handle(句柄)用于標識不同的事件,本質就是一個文件描述符
Sychronous Event Demultiplexer(同步事件分離器)本質就是一個系統調用,用于等待時間發聲。對于Linux來說,同步事件分離器指的就是IO/多路復用,比如select、poll、epoll等
Event Handler(事件處理器)由多個回調方法構成,這些回調方法構成了與應用相關的對于某個事件的處理反饋
Concrete Event Handler(具體事件處理器)事件處理器中各種回調方法的具體實現
Initiation Dispatcher(初始分發器)初始分發器實際上就是reactor角色,初始分發器會通過同步事件分離器來等待事件的發生,當對應事件就緒就調用事件處理器,最后調用對應的回調方法來處理這個事件

Reactor模式的工作流程

Reactor模式的工作流程如下:

  1. 當應用向初始分發器注冊具體事件處理器時,應用會標識出該事件處理器希望初始分發器在某個事件發生時向其通知,該事件于Handle關聯。
  2. 初始分發器會要求每個事件處理器向其傳遞內部的Handle,該Handle向操作系統標識了事件處理器。
  3. 當所有的事件處理器注冊完畢后,應用會啟動初始分發器的事件循環,這時初始分發器會將每個事件處理器的Handle合并起來,并使用同步事件分離器等待這些事件的發生。
  4. 當某個事件處理器的Handle變為Ready狀態時,同步事件分離器會通知初始分發器。
  5. 初始分發器會將Ready狀態的Handle作為key,來尋找其對應的事件處理器。
  6. 初始分發器會調用其對應事件處理器當中的回調方法來響應該事件。

epoll?ET服務器(Reactor模式)

如果在此之前沒有了解過Reactor模式,相信在看了Reactor模式的工作流程后一定是一頭霧水,下面我們實現一個Reactor模式下的epoll ET服務器,來感受一下Reactor模式。

設計思路

功能完善于問題提出

之前echo版的EpollServer,只能做了檢測讀取事件的就緒,現在需要添加以下功能:

  1. 讀取數據保存的問題:
    1. 如何保證讀取上來的數據就是一條完整的報文?一次讀取不能保證,需要邊讀取邊檢測。
    2. 既然一次的讀取數據不能保證讀完,那讀上來的數據如何保存呢?建議一個堆的緩存區。套接字會有很多,那又如何確保套接字和緩沖區一 一對應呢?
    3. 在ET的工作模式下,怎么能確保通知一次,就把內核中緩沖區的數據全部讀取完呢?循環讀取,非阻塞的方式。那什么時候停止讀取操作呢?讀取上來的數據小于預期的值。
  2. 如果得到了一條完整的報文,如何提取提取有效載荷呢?定制協議+Json的序列化和反序列化!
  3. 如果讀、寫和異常事件發生了,如何知道?Epoll接口。怎么樣執行對應的方法呢?
  4. 寫事件的就緒條件是緩沖區沒有滿就是就緒的,所以,如果一直監視寫事件的話,就需要一直調用寫事件所對應的方法,而大部分時候是沒有數據可以發送的,這樣調用方法就直接返回了,浪費CPU資源 —— 結論&細節:寫事件是按需進行監視的!也就是說有數據要發送的時候才開啟對寫事件的監視。
  5. 在ET的工作模式下,要循環寫入,確保一次通知就把寫的工作干完。當輸出緩沖區的數據為空的時候結束寫的事件!

設計思路

epoll ET服務器

在epoll ET服務器中,我們需要處理如下幾種事件:

  • 讀事件:如果是監聽套接字的讀事件就緒則調用accept函數獲取底層的連接,如果是其他套接字的讀事件就緒則調用recv函數讀取客戶端發來的數據。
  • 寫事件:寫事件就緒則將待發送的數據寫入到發送緩沖區當中。
  • 異常事件:當某個套接字的異常事件就緒時我們不做過多處理,直接關閉該套接字。

當epoll ET服務器監測到某一事件就緒后,就會將該事件交給對應的服務處理程序進行處理。

Reactor模式的五個角色

在這個epoll ET服務器中,Reactor模式中的五個角色對應如下:

  • 句柄:文件描述符
  • 同步事件分離器:IO/多路復用epoll。
  • 事件處理器:包括讀回調、寫回調和異常回調。
  • 具體事件處理器:讀回調、寫回調和異常回調的具體實現。
  • 初始分發器:Reactor類當中的Dispatcher函數

Dispatcher函數要做的就是調用epoll_wait函數等待事件發生,當有事件發生后就將就緒的事件派發給對應的服務處理程序即可。

EventItem類

  • 在Reactor的工作流程中說道,在注冊事件處理器時需要將其與Handle關聯,本質上就是將讀回調、寫回調和異常回調與某個文件描述符關聯起來。
  • 這樣做的目的就是為了當某個文件描述符上的事件就緒就可以找到其對應的各種回調函數,進而執行對應的回調方法來處理該事件。

所以我們可以設計一個Eventtem類,該類中的成員就包括一個文件描述符,以及該文件描述符對應的各種回調函數。

Reactor類

  • 在Reactor的工作流程中說道,當所有事件處理器注冊完畢后,會使用同步事件分離器等待這些事件發生,當某個事件處理的Handle變成為Ready狀態時,同步事件分離器會通知初始分發器,然后初始分發器會將Ready狀態的Handle作為key來尋找其對應的事件處理器,并調用該事件處理器中對應的回調方法來響應該事件。
  • 本質就是當事件注冊完畢后,會調用epoll_wait函數來等待這些事件發生,當某個事件就緒時epoll_wait函數就會告訴調用方,然后調用方就根據就緒的文件描述符來找到其對應的各種回調函數,并調用對應的回調函數進行事件處理。

對此我們可以設計一個Reactor類。

  • 該類當中有一個成員函數叫做Dispatcher,這個函數就是所謂的初始分發器,在該函數內部會調用epoll_wait函數等待事件的發生,當事件發生后會告知Dispatcher已經就緒的事件。
  • 該類當中有一個成員函數叫做Dispatcher,這個函數就是所謂的初始分發器,在該函數內部會調用epoll_wait函數等待事件的發生,當事件發生后會告知Dispatcher已經就緒的事件。
  • 我們可以使用C++ STL當中的unordered_map,來建立各個文件描述符與其對應的EventItem結構之間的映射,這個unordered_map可以作為Reactor類的一個成員變量,當需要找某個文件描述符的EventItem結構時就可以通過該成員變量找到。
  • 當然,Reactor類當中還需要提供成員函數AddEvent和DelEvent,用于向Dispatcher當中注冊和刪除事件。

此外,在Reactor類當中還有一些其他成員,后面實現的時候再做詳細論述。

epoll ET服務器的工作流程

這個epoll ET服務器在Reactor模式下的工作流程如下:

  • 首先epoll ET服務器需要進行套接字的創建、綁定和監聽。
  • 然后定義一個Reactor對象并初始化,初始化時要做的就是創建epoll模型。
  • 緊接著需要為監聽套接字創建對應的EventItem結構,并調用Reactor類中提供的AddEvent函數將監聽套接字添加到epoll模型中,并建立監聽套接字與其對應的EventItem結構之間的映射關系。
  • 之后就可以不斷調用Reactor類中的Dispatcher函數進行事件派發。

在事件處理過程中,會不斷向Dispatcher當中新增或刪除事件,而每個事件就緒時都會自動調用其對應的回調函數進行處理,所以我們要做的就是不斷調用Dispatcher函數進行事件派發即可。

EventItem結構

EventItem結構中除了包含文件描述符和其對應的讀回調、寫回調和異常回調之外,還包含一個輸入緩沖區inbuffer、一個輸出緩沖區outbuffer以及一個回指指針R。

  • 當某個文件描述符的讀事件就緒時,我們會調用recv函數讀取客戶端發來的數據,但我們并不能保證我們讀取到了一個完整的報文,因此需要將讀取到的數據暫時存放到該文件描述符對應的inbuffer當中,當inbuffer當中可以分離出一個完整的報文后再將其分離出來進行數據處理,這里的inbuffer本質就是用來解決粘包問題的。
  • 當處理完一個報文請求后,需要將響應數據發送給客戶端,但我們并不能保證底層TCP的發送緩沖區中有足夠的空間供我們寫入,因此需要將要發送的數據暫時存放到該文件描述符對應的outbuffer當中,當底層TCP的發送緩沖區中有空間,即寫事件就緒時,再依次發送outbuffer當中的數據。
  • EventItem結構當中設置回指指針R,便于快速找到我們定義的Reactor對象,因為后續我們需要根據EventItem結構找到這個Reactor對象。比如當連接事件就緒時,需要調用Reactor類當中的AddEvent函數將其添加到Dispatcher當中。

此外,EventItem結構當中需要提供一個管理回調的成員函數,便于外部對EventItem結構當中的各種回調進行設置。

代碼如下:

typedef int(*callback_t)(EventItem*);class EventItem{
public:int _sock; //文件描述符Reactor* _R; //回指指針callback_t _recv_handler; //讀回調callback_t _send_handler; //寫回調callback_t _error_handler; //異常回調std::string _inbuffer; //輸入緩沖區std::string _outbuffer; //輸出緩沖區
public:EventItem(): _sock(-1), _R(nullptr), _recv_handler(nullptr), _send_handler(nullptr), _error_handler(nullptr){}//管理回調void ManageCallbacks(callback_t recv_handler, callback_t send_handler, callback_t error_handler){_recv_handler = recv_handler;_send_handler = send_handler;_error_handler = error_handler;}~EventItem(){}
};

Reactor類

在Reactor類當中有一個unordered_map成員,用于建立文件描述符和與其對應的EventItem結構之間的映射,還有一個epfd成員,該成員是epoll模型對應的文件描述符。

  • 在初始化Reactor對象的時候就可以調用epoll_create函數創建epoll模型,并將該epoll模型對應的文件描述符用epfd成員記錄下來,便于后續使用。
  • Reactor對象在析構的時候,需要調用close函數將該epoll模型進行關閉。

代碼如下:

#define SIZE 256class Reactor{
private:int _epfd; //epoll模型std::unordered_map<int, EventItem> _event_items; //建立sock與EventItem結構的映射
public:Reactor(): _epfd(-1){}void InitReactor(){//創建epoll模型_epfd = epoll_create(SIZE);if (_epfd < 0){std::cerr << "epoll_create error" << std::endl;exit(5);}}~Reactor(){if (_epfd >= 0){close(_epfd);}}
};

Dispatcher函數(事件分派器)

Reactor類當中的Dispatcher函數就是之前所說的初始分發器,這里我們更形象的將其稱之為事件分派器。

  • 事件分派器要做的就是調用epoll_wait函數等待事件發生。
  • 當某個文件描述符上的事件發生后,先通過unordered_map找到該文件描述符對應的EventItem結構,然后調用EventItem結構當中對應的回調函數對該事件進行處理即可。

代碼如下:

#define MAX_NUM 64class Reactor{
private:int _epfd; //epoll模型std::unordered_map<int, EventItem> _event_items; //建立sock與EventItem結構的映射
public://事件分派器void Dispatcher(int timeout){struct epoll_event revs[MAX_NUM];int num = epoll_wait(_epfd, revs, MAX_NUM, timeout);for (int i = 0; i < num; i++){int sock = revs[i].data.fd; //就緒的文件描述符if ((revs[i].events&EPOLLERR) || (revs[i].events&EPOLLHUP)){ //異常事件就緒(優先處理)if (_event_items[sock]._error_handler)_event_items[sock]._error_handler(&_event_items[sock]); //調用異常回調}if (revs[i].events&EPOLLIN){ //讀事件就緒if (_event_items[sock]._recv_handler)_event_items[sock]._recv_handler(&_event_items[sock]); //調用讀回調}if (revs[i].events&EPOLLOUT){ //寫事件就緒if (_event_items[sock]._send_handler)_event_items[sock]._send_handler(&_event_items[sock]); //調用寫回調}}}
};

說明一下:

  • 這里沒有用switch或if語句對epoll_wait函數的返回值進行判斷,而是借用for循環對其返回值進行了判斷。
  • 如果epoll_wait的返回值為-1則說明epoll_wait函數調用失敗,此時不會進入到for循環內部進行事件處理。
  • 如果epoll_wait的返回值為0則說明epoll_wait函數超時返回,此時也不會進入到for循環內部進行事件處理。
  • 如果epoll_wait的返回值大于0則說明epoll_wait函數調用成功,此時才會進入到for循環內部調用對應的回調函數對事件進行處理。
  • 事件處理時最好先對異常事件進行處理,因此代碼中將異常事件的判斷放在了最前面。

AddEvent函數

Reactor類當中的AddEvent函數是用于進行事件注冊的。???

  • 在注冊事件時需要傳入一個文件描述符和一個事件集合,表示需要監視哪個文件描述符上的哪些事件。
  • 還需要傳入該文件描述符對應的EventItem結構,表示當該文件描述符上的事件就緒后應該執行的回調方法。
  • 在AddEvent函數內部要做的就是,調用epoll_ctl函數將該文件描述符及其對應的事件集合注冊到epoll模型當中,然后建立該文件描述符與其對應的EventItem結構的映射關系。

代碼如下:

class Reactor{
private:int _epfd; //epoll模型std::unordered_map<int, EventItem> _event_items; //建立sock與EventItem結構的映射
public:void AddEvent(int sock, uint32_t event, const EventItem& item){struct epoll_event ev;ev.data.fd = sock;ev.events = event;if (epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev) < 0){ //將該文件描述符添加到epoll模型當中std::cerr << "epoll_ctl add error, fd: " << sock << std::endl;}else{//建立sock與EventItem結構的映射關系_event_items.insert({ sock, item });std::cout << "添加: " << sock << " 到epoll模型中,成功" << std::endl;}}
};

DelEvent函數?

Reactor類當中的DelEvent函數是用于進行事件刪除的。

  • 在刪除事件時只需要傳入一個文件描述符即可。
  • 在DelEvent函數內部要做的就是,調用epoll_ctl函數將該文件描述符從epoll模型中刪除,并取消該文件描述符與其對應的EventItem結構的映射關系。

代碼如下:

class Reactor{
private:int _epfd; //epoll模型std::unordered_map<int, EventItem> _event_items; //建立sock與EventItem結構的映射
public:void DelEvent(int sock){if (epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr) < 0){ //將該文件描述符從epoll模型中刪除std::cerr << "epoll_ctl del error, fd: " << sock << std::endl;}else{//取消sock與EventItem結構的映射關系_event_items.erase(sock);std::cout << "從epoll模型中刪除: " << sock << ",成功" << std::endl;}}
};

EnableReadWrite函數

Reactor類當中的EnableReadWrite函數,用于使能或使能某個文件描述符的讀寫事件。

  • 調用EnableReadWrite函數時需要傳入一個文件描述符,表示需要設置的是哪個文件描述符對應的事件。
  • 還需要傳入兩個bool值,分別表示需要使能還是使能讀寫事件。
  • EnableReadWrite函數內部會調用epoll_ctl函數修改將該文件描述符的監聽事件。

代碼如下:

class Reactor{
private:int _epfd; //epoll模型std::unordered_map<int, EventItem> _event_items; //建立sock與EventItem結構的映射
public:void EnableReadWrite(int sock, bool read, bool write){struct epoll_event ev;ev.data.fd = sock;ev.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0){ //修改該文件描述符所需要監視的事件std::cerr << "epoll_ctl mod error, fd: " << sock << std::endl;}}
};

回調函數

下面我們就可以實現一些回調函數,這里主要實現四個回調函數。

  • accepter:當連接事件到來時可以調用該回調函數獲取底層建立好的連接。
  • recver:當讀事件就緒時可以調用該回調函數讀取客戶端發來的數據并進行處理。
  • sender:當寫事件就緒時可以調用該回調函數向客戶端發送響應數據。
  • errorer:當異常事件就緒時可以調用該函數將對應的文件描述符進行關閉。

當我們為某個文件描述符創建EventItem結構時,就可以調用EventItem類提供的ManageCallbacks函數,將這些回調函數到EventItem結構當中。

  • 我們會將監聽套接字對應的EventItem結構當中的recv_handler設置為accepter,因為監聽套接字的讀事件就緒就意味著連接事件就緒了,而監聽套接字一般只關心讀事件,因此監聽套接字對應的send_handler和error_handler可以設置為nullptr。
  • 當Dispatcher監測到監聽套接字的讀事件就緒時,會調用監聽套接字對應的EventItem結構當中的recv_handler回調,此時就會調用accepter回調獲取底層建立好的連接。
  • 而對于與客戶端建立連接的套接字,我們會將其對應的EventItem結構當中的recv_handler、send_handler和error_handler分別設置為這里的recver、sender和error。
  • 當Dispatcher監測到這些套接字的事件就緒時,就會調用其對應的EventItem結構當中對應的回調函數,也就是這里的recver、sender和error。

accepter回調

accepter回調用于處理連接事件,其工作流程如下:

  1. 調用accept函數獲取底層建立好的連接。
  2. 將獲取到的套接字設置為非阻塞,并為其創建EventItem結構,填充EventItem結構當中的各個字段,并注冊該套接字相關的回調方法。
  3. 將該套接字及其對應需要關心的事件注冊到Dispatcher當中。

下一次Dispatcher在進行事件派發時就會幫我們關注該套接字對應的事件,當事件就緒時就會執行該套接字對應的EventItem結構中對應的回調方法。

代碼如下:

int accepter(EventItem* item)
{while (true){struct sockaddr_in peer;memset(&peer, 0, sizeof(peer));socklen_t len = sizeof(peer);int sock = accept(item->_sock, (struct sockaddr*)&peer, &len);if (sock < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){ //并沒有讀取出錯,只是底層沒有連接了return 0;}else if (errno == EINTR){ //讀取的過程被信號中斷了continue;}else{ //獲取連接失敗std::cerr << "accept error" << std::endl;return -1;}}SetNonBlock(sock); //將該套接字設置為非阻塞//構建EventItem結構EventItem sock_item;sock_item._sock = sock;sock_item._R = item->_R;sock_item.ManageCallbacks(recver, sender, errorer); //注冊回調方法Reactor* R = item->_R;R->AddEvent(sock, EPOLLIN | EPOLLET, sock_item); //將該套接字及其對應的事件注冊到Dispatcher中}return 0;
}

需要注意的是,因為這里實現的ET模式下的epoll服務器,因此在獲取底層連接時需要循環調用accept函數進行讀取,并且監聽套接字必須設置為非阻塞。

  • 因為ET模式下只有當底層建立的連接從無到有或是從有到多時才會通知上層,如果沒有一次性將底層建立好的連接全部獲取,并且此后再也沒有建立好的連接,那么底層沒有讀取完的連接就相當于丟失了,所以需要循環多次調用accept函數獲取底層建立好的連接。
  • 循環調用accept函數也就意味著,當底層連接全部被獲取后再調用accept函數,此時就會因為底層已經沒有連接了而被阻塞住,因此需要將監聽套接字設置為非阻塞,這樣當底層沒有連接時accept就會返回,而不會被阻塞住。

accept獲取到的新的套接字也需要設置為非阻塞,就是為了避免將來循環調用recv、send等函數時被阻塞。

設置文件描述符為非阻塞

設置文件描述符為非阻塞時,需要先調用fcntl函數獲取該文件描述符對應的文件狀態標記,然后在該文件狀態標記的基礎上添加非阻塞標記O_NONBLOCK,最后調用fcntl函數對該文件描述符的狀態標記進行設置即可。

代碼如下:

//設置文件描述符為非阻塞
bool SetNonBlock(int sock)
{int fl = fcntl(sock, F_GETFL);if (fl < 0){std::cerr << "fcntl error" << std::endl;return false;}fcntl(sock, F_SETFL, fl | O_NONBLOCK);return true;
}

監聽套接字設置為非阻塞后,當底層連接不就緒時,accept函數會以出錯的形式返回,因此當調用accept函數的返回值小于0時,需要繼續判斷錯誤碼。

  • 如果錯誤碼為EAGAINEWOULDBLOCK,說明本次出錯返回是因為底層已經沒有可獲取的連接了,此時底層連接全部獲取完畢,這時我們可以返回0,表示本次accepter調用成功。
  • 如果錯誤碼為EINTR,說明本次調用accept函數獲取底層連接時被信號中斷了,這時還應該繼續調用accept函數進行獲取。
  • 除此之外,才說明accept函數是真正調用失敗了,這時我們可以返回-1,表示本次accepter調用失敗。

accept、recv和send等IO系統調用為什么會被信號中斷?

IO系統調用函數出錯返回并且將錯誤碼設置為EINTR,表明本次在進行數據讀取或數據寫入之前就被信號中斷了,也就是說IO系統調用在陷入內核,但并沒有返回用戶態的時候內核跑去處理其他信號了。

  • 在內核態返回用戶態之前檢查信號的pending位圖,也就是未決信號集,如果pending位圖中有未處理的信號,那么內核就會對該信號進行處理。
  • 但IO系統調用函數在進行IO操作之前就被信號中斷了,這實際上就是一個特例,因為IO過程分為“等”和“拷貝”兩個步驟,而一般“等”的過程比較漫長,而這個過程中我們的執行流其實是處于閑置狀態的,因此在“等”的過程中如果有信號產生,內核就會立即進行信號的處理。

寫事件是按需打開的

這里調用accept獲取上來的套接字在添加到Dispatcher中時,只添加了EOPLLINEPOLLET事件,也就是說只讓epoll幫我們關心該套接字的讀事件。

  • 這里之所以沒有添加寫事件,是因為當前我們并沒有要發送的數據,因此沒有必要讓epoll幫我們關心寫事件。
  • 一般讀事件是經常會被設置的,而寫事件則是按需打開的,只有當我們有數據要發送時才會將寫事件打開,并且在數據全部寫入完畢后又會立即將寫事件關閉。

recver回調

recver回調用于處理讀事件,其工作流程如下:

  1. 循環調用recv函數讀取數據,并將讀取到的數據添加到該套接字對應EventItem結構的inbuffer當中。
  2. 對inbuffer當中的數據進行切割,將完整的報文切割出來,剩余的留在inbuffer當中。
  3. 對切割出來的完整報文進行反序列化。
  4. 業務處理。
  5. 業務處理后形成響應報文。
  6. 將響應報頭添加到對應EventItem結構的outbuffer當中,并打開寫事件。

下一次Dispatcher在進行事件派發時就會幫我們關注該套接字的寫事件,當寫事件就緒時就會執行該套接字對應的EventItem結構中寫回調方法,進而將outbuffer中的響應數據發送給客戶端。

代碼如下:

int recver(EventItem* item)
{if (item->_sock < 0) //該文件描述符已經被關閉return -1;//1、數據讀取if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){ //讀取失敗item->_error_handler(item);return -1;}//2、報文切割std::vector<std::string> datagrams;StringUtil::Split(item->_inbuffer, &datagrams, "X");for (auto s : datagrams){//3、反序列化struct data d;StringUtil::Deserialize(s, &d._x, &d._y, &d._op);//4、業務處理int result = 0;switch (d._op){case '+':result = d._x + d._y;break;case '-':result = d._x - d._y;break;case '*':result = d._x * d._y;break;case '/':if (d._y == 0){std::cerr << "Error: div zero!" << std::endl;continue; //繼續處理下一個報文}else{result = d._x / d._y;}break;case '%':if (d._y == 0){std::cerr << "Error: mod zero!" << std::endl;continue; //繼續處理下一個報文}else{result = d._x % d._y;}break;default:std::cerr << "operation error!" << std::endl;continue; //繼續處理下一個報文}//5、形成響應報文std::string response;response += std::to_string(d._x);response += d._op;response += std::to_string(d._y);response += "=";response += std::to_string(result);response += "X"; //報文與報文之間的分隔符//6、將響應報文添加到outbuffer中item->_outbuffer += response;if (!item->_outbuffer.empty())item->_R->EnableReadWrite(item->_sock, true, true); //打開寫事件}return 0;
}

一、數據讀取

我們可以將循環調用recv函數讀取數據的過程封裝成一個recver_helper函數。

  • recver_helper函數要做的就是循環調用recv函數將讀取到的數據添加到inbuffer當中。
  • 當recv函數的返回值小于0時同樣需要進一步判斷錯誤碼,如果錯誤碼為EAGAINEWOULDBLOCK則說明底層數據讀取完畢了,如果錯誤碼為EINTR則說明讀取過程被信號中斷了,此時還需要繼續調用recv函數進行讀取,否則就是讀取出錯了。
  • 當讀取出錯時直接調用該套接字對應的error_handler回調,最終就會調用到下面將要實現的errorer回調,在我們會在errorer回調當中將該套接字進行關閉。

代碼如下:

int recver_helper(int sock, std::string* out)
{while (true){char buffer[128];ssize_t size = recv(sock, buffer, sizeof(buffer)-1, 0);if (size < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){ //數據讀取完畢return 0;}else if (errno == EINTR){ //被信號中斷,繼續嘗試讀取continue;}else{ //讀取出錯return -1;}}else if (size == 0){ //對端連接關閉return -1;}//讀取成功buffer[size] = '\0';*out += buffer; //將讀取到的數據添加到該套接字對應EventItem結構的inbuffer中}
}

二、報文切割

報文切割本質就是為了防止粘包問題,而粘包問題實際是涉及到協議定制的。

  • 因為我們需要根據協議知道如何將各個報文進行分離,比如UDP分離報文采用的就是定長報頭+自描述字段。
  • 我們的目的是演示整個數據處理的過程,為了簡單起見就不進行過于復雜的協議定制了,這里我們就以“X”作為各個報文之間的分隔符,每個報文的最后都會以一個“X”作為報文結束的標志。
  • 因此現在要做的就是以“X”作為分隔符對inbuffer當中的字符串進行切割,這里將這個過程封裝成一個Split函數并放到一個StringUtil工具類當中。
  • Split函數要做的就是對inbuffer當中的字符串進行切割,將切割出來的一個個報文放到vector當中,對于最后無法切出完整報文的數據就留在inbuffer當中即可。

代碼如下:

class StringUtil{
public:static void Split(std::string& in, std::vector<std::string>* out, std::string sep){int start = 0;size_t pos = in.find(sep, start);while (pos != std::string::npos){out->push_back(in.substr(start, pos - start));start = pos + sep.size();pos = in.find(sep, start);}in = in.substr(start);}
};

三、反序列化

在數據發送之前需要進行序列化encode,接收到數據之后需要對數據進行反序列化decode。

  • 序列化就是將對象的狀態信息轉換為可以存儲或傳輸的形式(字節序列)的過程。
  • 反序列化就是把字節序列恢復為原對象的過程。

實際反序列化也是與協議定制相關的,假設這里的epoll服務器向客戶端提供的就是計算服務,客戶端向服務器發來的都是需要服務器計算的計算表達式,因此可以用一個結構體來描述這樣一個計算表達式,結構體當中包含兩個操作數x和y,以及一個操作符op。

struct data{int _x;int _y;char _op;
};

此時這里所謂的反序列化就是將一個計算表達式轉換成這樣一個結構體,

  • 因此現在要做的就是將形如“1+2”這樣的計算表達式轉換成一個結構體,該結構體當中的x成員的值就是1,y的值就是2,op的值就是‘+’,這里將這個過程封裝成一個Deserialize函數并放到StringUtil工具類當中。
  • Deserialize函數要做的工作其實也很簡單,就是在傳入的字符串當中找到操作符op,此時操作符左邊的就是操作數x,右邊的就是操作數y。

代碼如下:

class StringUtil{
public:static void Deserialize(std::string& in, int* x, int* y, char* op){size_t pos = 0;for (pos = 0; pos < in.size(); pos++){if (in[pos] == '+' || in[pos] == '-' || in[pos] == '*' || in[pos] == '/' || in[pos] == '%')break;}if (pos < in.size()){std::string left = in.substr(0, pos);std::string right = in.substr(pos + 1);*x = atoi(left.c_str());*y = atoi(right.c_str());*op = in[pos];}else{*op = -1;}}
};

說明一下:?實際在做項目時不需要我們自己進行序列化和反序列化,我們一般會直接用JSON或XML這樣的序列化反序列化工具。

四、業務處理

業務處理就是服務器拿到客戶端發來的數據后,對數據進行數據分析,最終拿到客戶端想要的資源。

  • 我們這里要做的業務處理非常簡單,就是用反序列化后的數據進行數據計算,此時得到的計算結果就是客戶端想要的。

五、形成響應報文

在業務處理后我們已經拿到了客戶端想要的數據,現在我們要做的就是形成響應報文,由于我們這里規定每個報文都以“X”作為報文結束的標志,因此在形成響應報文的時候,就需要在每一個計算結果后面都添加上一個“X”,表示這是之前某一個請求報文的響應報文,因為協議制定后就需要雙方遵守。

六、將響應報文添加到outbuffer中

響應報文構建完后需要將其添加到該套接字對應的outbuffer中,并打開該套接字的寫事件,此后當寫事件就緒時就會將outbuffer當中的數據發送出去。

sender回調

sender回調用于處理寫事件,其工作流程如下:

  1. 循環調用send函數發送數據,并將發送出去的數據從該套接字對應EventItem結構的outbuffer中刪除。
  2. 如果循環調用send函數后該套接字對應的outbuffer當中的數據被全部發送,此時就需要將該套接字對應的寫事件關閉,因為已經沒有要發送的數據了,如果outbuffer當中的數據還有剩余,那么該套接字對應的寫事件就應該繼續打開。

代碼如下:

int sender(EventItem* item)
{if (item->_sock < 0) //該文件描述符已經被關閉return -1;int ret = sender_helper(item->_sock, item->_outbuffer);if (ret == 0){ //全部發送成功,不再關心寫事件item->_R->EnableReadWrite(item->_sock, true, false);}else if (ret == 1){ //沒有發送完畢,還需要繼續關心寫事件item->_R->EnableReadWrite(item->_sock, true, true);}else{ //寫入出錯item->_error_handler(item);}return 0;
}

我們可以將循環調用send函數發送數據的過程封裝成一個sender_helper函數。

  • sender_helper函數要做的就是循環調用send函數將outbuffer中的數據發送出去。
  • 當send函數的返回值小于0時也需要進一步判斷錯誤碼,如果錯誤碼為EAGAINEWOULDBLOCK則說明底層TCP發送緩沖區已經被寫滿了,這時需要將已經發送的數據從outbuffer中移除。
  • 如果錯誤碼為EINTR則說明發送過程被信號中斷了,此時還需要繼續調用send函數進行發送,否則就是發送出錯了。
  • 當發送出錯時也直接調用該套接字對應的error_handler回調,最終就會調用到下面將要實現的errorer回調,在我們會在errorer回調當中將該套接字進行關閉。
  • 如果最終outbuffer當中的數據全部發送成功,則將outbuffer清空即可。

代碼如下:

int sender_helper(int sock, std::string& in)
{size_t total = 0; //累加已經發送的字節數while (true){ssize_t size = send(sock, in.c_str() + total, in.size() - total, 0);if (size < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){ //底層發送緩沖區已經沒有空間了in.erase(0, total); //將已經發送的數據移出outbufferreturn 1; //緩沖區寫滿,沒寫完}else if (errno == EINTR){ //被信號中斷,繼續嘗試寫入continue;}else{ //寫入出錯return -1;}}total += size;if (total >= in.size()){in.clear(); //清空outbufferreturn 0; //全部寫入完畢}}
}

errorer回調

errorer回調用于處理異常事件。

  • 對于異常事件就緒的套接字我們這里不做其他過多的處理,簡單的調用close函數將該套接字關閉即可。
  • 但是在關閉該套接字之前,需要先調用DelEvent函數將該套接字從epoll模型中刪除,并取消該套接字與其對應的EventItem結構的映射關系。
  • 由于在Dispatcher當中是先處理的異常事件,為了避免該套接字被關閉后繼續進行讀寫操作,然后因為讀寫操作失敗再次調用errorer回調重復關閉該文件描述符,因此在關閉該套接字后將其EventItem當中的文件描述符值設置為-1。
  • 在調用recver和sender回調執行讀寫操作之前,都會判斷該EventItem結構當中的文件描述符值是否有效,如果無效則不會進行后續操作。

代碼如下:

int errorer(EventItem* item)
{item->_R->DelEvent(item->_sock); //將該文件描述符從epoll模型中刪除,并取消該文件描述符與其EventItem結構的映射關系close(item->_sock); //關閉該文件描述符item->_sock = -1; //防止關閉后繼續執行讀寫回調return 0;
}

套接字相關

這里可以編寫一個Socket類,對套接字相關的接口進行一定程度的封裝,為了讓外部能夠直接調用Socket類當中封裝的函數,于是將這些函數定義成了靜態成員函數。

代碼如下:

class Socket{
public://創建套接字static int SocketCreate(){int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){std::cerr << "socket error" << std::endl;exit(2);}//設置端口復用int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));return sock;}//綁定static void SocketBind(int sock, int port){struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;socklen_t len = sizeof(local);if (bind(sock, (struct sockaddr*)&local, len) < 0){std::cerr << "bind error" << std::endl;exit(3);}}//監聽static void SocketListen(int sock, int backlog){if (listen(sock, backlog) < 0){std::cerr << "listen error" << std::endl;exit(4);}}
};

運行epoll ET服務器

運行我們的epoll ET服務器的步驟如下:

  • 首先需要進行的就是套接字的創建、綁定和監聽,因為是ET模式下的epoll服務器,因此監聽套接字創建出來后需要將其設置為非阻塞。
  • 然后就可以實例化一個Reactor對象,并對其進行初始化,也就是創建epoll模型。
  • 緊接著需要為監聽套接字定義一個EventItem結構,填充EventItem結構當中的各個字段,并將accepter回調設置為監聽套接字的讀回調方法。
  • 然后調用AddEvent函數將監聽套接字及其需要關系的事件添加到Dispatcher當中,該過程包括將監聽套接字注冊到epoll模型中,以及建立監聽套接字與其對應EventItem結構的映射。
  • 最后就可以循環調用Reactor類當中的Dispatcher函數進行事件派發了。

代碼如下:

#include "app_interface.hpp"
#include "reactor.hpp"
#include "socket.hpp"
#include "util.hpp"
#include <string>#define BACK_LOG 5static void Usage(std::string proc)
{std::cout << "Usage: " << proc << " port" << std::endl;
}
int main(int argc, char* argv[])
{if (argc != 2){Usage(argv[0]);exit(1);}int port = atoi(argv[1]);//服務器監聽套接字的創建、綁定和監聽int listen_sock = Socket::SocketCreate();SetNonBlock(listen_sock); //將監聽套接字設置為非阻塞Socket::SocketBind(listen_sock, port);Socket::SocketListen(listen_sock, BACK_LOG);//創建Reactor,并初始化Reactor R;R.InitReactor();//創建監聽套接字對應的EventItem結構            EventItem item;item._sock = listen_sock;item._R = &R;item.ManageCallbacks(accepter, nullptr, nullptr); //監聽套接字只需要關心讀事件//將監聽套接字托管給DispatcherR.AddEvent(listen_sock, EPOLLIN | EPOLLET, item);//循環進行事件派發int timeout = 1000;while (true){R.Dispatcher(timeout);}return 0;
}

參考文獻:

http://t.csdn.cn/pN4A9

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/39904.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/39904.shtml
英文地址,請注明出處:http://en.pswp.cn/news/39904.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

保姆級別講解Python數據處理,你絕對能會

名字&#xff1a;阿玥的小東東 學習&#xff1a;Python、C/C 主頁鏈接&#xff1a;阿玥的小東東的博客_CSDN博客-python&&c高級知識,過年必備,C/C知識講解領域博主 目錄 1. 文件讀取 2. 數據處理 3. 處理結果輸出 總的來說 為了咱們讓程序跑起來&#xff0c;我們需…

DAY3,ARM(LED點燈實驗)

1.匯編實現開發板三盞燈點亮熄滅&#xff1b; .text .global _start _start: /**********LED123點燈**************/RCC_INIT:1使能PE10 PF10 PE8RCC..寄存器,E[4]1 F[5]1 0x50000a28ldr r0,0x50000a28ldr r1,[r0]orr r1,r1,#(0x3 << 4)str r1,[r0]LED1_INET:2初始化LED…

酷開系統 | 酷開科技大數據,更好的與目標消費人群建立聯系

眾所周知&#xff0c;OTT的一大優勢在于強曝光&#xff0c;能夠給消費者帶來強烈的視覺沖擊&#xff0c;強化品牌認知。但是&#xff0c;要想達到提升品牌認知&#xff0c;首先要保證OTT的流量規模&#xff0c;實現對目標人群的有效覆蓋。得年輕消費者得“天下”&#xff0c;年…

tk切換到mac的code分享

文章目錄 前言一、基礎環境配置二、開發軟件與擴展1.用到的開發軟件與平替、擴展情況 總結 前言 最近換上了coding人生的第一臺mac&#xff0c;以前一直偏好tk&#xff0c;近來身邊的朋友越來越多的用mac了&#xff0c;win的自動更新越來越占磁盤了&#xff0c;而且win11拋棄了…

vue elementui v-for 循環el-table-column 第一列數據變到最后一個

這個動態渲染table表格時發現el-table-column 第一列數據變到最后一個 序號被排到后面 代碼 修改后 <el-table:data"tableData"tooltip-effect"dark"style"width: 100%"height"500"><template v-for"(item, index) i…

PostCSS在vue中的使用

1、安裝 PostCSS 和所需的插件。在命令行中運行以下命令&#xff1a; npm install postcss autoprefixer cssnano postcss-pxtorem --save-dev 這將安裝 PostCSS、Autoprefixer、CSSnano 和 postcss-pxtorem 插件&#xff0c;同時將它們添加到項目的開發依賴中。 2、在項目根目…

每天一道leetcode:1926. 迷宮中離入口最近的出口(圖論中等廣度優先遍歷)

今日份題目&#xff1a; 給你一個 m x n 的迷宮矩陣 maze &#xff08;下標從 0 開始&#xff09;&#xff0c;矩陣中有空格子&#xff08;用 . 表示&#xff09;和墻&#xff08;用 表示&#xff09;。同時給你迷宮的入口 entrance &#xff0c;用 entrance [entrancerow, …

SpringBoot的配置文件(properties與yml)

文章目錄 1. 配置文件的作用2. 配置文件格式3. 配置文件的使用方法3.1. properties配置文件3.1.1. 基本語法和使用3.1.2. properties優缺點分析 3.2. yml配置文件3.2.1. 基本語法與使用3.2.2. yml中單雙引號問題3.2.3. yml配置不同類型的數據類型及null3.2.4. 配置對象3.2.5. 配…

android設置豎屏仍然跟隨屏幕旋轉怎么辦

如題所問&#xff0c;我最近遇到一個bug&#xff0c;就是設置了搖感&#xff0c;然后有用戶反饋說設置了手機下拉的系統設置-屏幕旋轉-關閉。然后屏幕還是會旋轉的問題。 首先&#xff0c;我們先從如何設置橫豎屏了解下好了 設置橫屏和豎屏的方法&#xff1a; 方法一&#x…

uni-app引入sortable列表拖拽,兼容App和H5,拖拽排序。

效果: 拖拽排序 背景&#xff1a; 作為一名前端開發人員&#xff0c;在工作中難免會遇到拖拽功能&#xff0c;分享一個github上一個不錯的拖拽js庫&#xff0c;能滿足我們在項目開發中的需要&#xff0c;下面是我在uniapp中使用SortableJS的使用詳細流程&#xff1b; vue開發…

Centos7安裝docker后默認開啟docker0的網卡|卸載默認網卡

一&#xff1a; 停掉服務 systemctl stop docker [rootwww ~]# systemctl stop docker [rootwww ~]# systemctl status docker ● docker.service - Docker Application Container Engine Loaded: loaded (/usr/lib/systemd/system/docker.service; enabled; vendor prese…

神經網絡基礎-神經網絡補充概念-27-深層網絡中的前向傳播

概念 深層神經網絡中的前向傳播是指從輸入數據開始&#xff0c;逐層計算每個神經元的輸出值&#xff0c;直到得到最終的預測值。 一般步驟 1輸入數據傳遞&#xff1a; 將輸入數據傳遞給網絡的輸入層。輸入數據通常是一個特征矩陣&#xff0c;每一列代表一個樣本&#xff0c;…

【bug】Unity無法創建項目

bug UnityHub無法創建項目 UnityHub無法創建項目 出現的問題&#xff1a;在創建新項目時彈出來一個 無法創建項目 嘗試的方法&#xff1a; 刷新許可證 ?沒用退出賬號重新登陸 ?沒用重啟電腦 ?沒用 最后發現是什么問題呢&#xff1f; 2021.3.3這個版本我之前在資源管理器中…

SpringBoot概述及項目的創建使用

文章目錄 一. Spring Boot概述1. 什么是Spring Boot&#xff1f;2. Spring Boot的優點 二. Spring Boot項目的創建1. 使用IDEA創建1.1. 準備工作1.2. 創建運行Spring Boot項目1.3. 進行Web交互1.4. 目錄工程介紹1.5. 項目快速添加依賴1.6. 防止配置文件亂碼所需的配置1.7. Spri…

Docker實戰專欄簡介

&#x1f337;&#x1f341; 博主貓頭虎 帶您 Go to New World.?&#x1f341; &#x1f984; 博客首頁——貓頭虎的博客&#x1f390; &#x1f433;《面試題大全專欄》 文章圖文并茂&#x1f995;生動形象&#x1f996;簡單易學&#xff01;歡迎大家來踩踩~&#x1f33a; &a…

【iMessage蘋果推?IM推送】群控腳本當Apple APNS推送服務器從您的應用程序接吸收注冊消息時,它將為您回到一串devicetoken

推薦內容IMESSGAE相關 作者??IMEAE推薦內容iMessage蘋果推軟件 *** 點擊即可查看作者要求內容信息作者??IMEAE推薦內容1.家庭推內容 *** 點擊即可查看作者要求內容信息作者??IMEAE推薦內容2.相冊推 *** 點擊即可查看作者要求內容信息作者??IMEAE推薦內容3.日歷推 *** …

Rust軟件外包開發語言的特點

Rust 是一種系統級編程語言&#xff0c;強調性能、安全性和并發性的編程語言&#xff0c;適用于廣泛的應用領域&#xff0c;特別是那些需要高度可靠性和高性能的場景。下面和大家分享 Rust 語言的一些主要特點以及適用的場合&#xff0c;希望對大家有所幫助。北京木奇移動技術有…

MongoDB:簡單的增刪改查操作

一.概述 本篇文章介紹在Navicat中對MongoDB數據庫進行增刪改查操作,在后面會介紹在Spring Boot中使用MongoTemplate對MongoDB數據庫進行相關操作.如有必要可以先看看前面幾篇文章. MongoDB:MySQL,Redis,ES,MongoDB的應用場景 MongoDB:數據庫初步應用 二.在Navicat進行增刪改…

linux系統服務學習(七)NFS服務、DHCP服務

文章目錄 一、NFS服務概述1、任務背景2、環境準備3、NFS概述4、NFS組成5、與NFS相關的軟件包6、安裝NFS軟件7、NFS的配置文件 二、NFS實驗1、搭建NFS服務器2、編寫NFS主配置文件3、啟動相關的NFS服務4、搭建Web服務器5、在Web服務器中掛載NFS6、上傳aws.mp4視頻到NFS服務器的/s…

人工智能學習框架—飛槳Paddle人工智能

1.人工智能框架 機器學習的三要素&#xff1a;模型、學習策略、優化算法。 當我們用機器學習來解決一些模式識別任務時&#xff0c;一般的流程包含以下幾個步驟&#xff1a; 1.1.淺層學習和深度學習 淺層學習(Shallow Learning)&#xff1a;不涉及特征學習&#xff0c;其特征…