仿muduo庫實現并發服務器
- 一.eventloop模塊
- 1.成員變量
- std::thread::id _thread_id;//線程ID
- Poller _poll;
- int _event_fd;
- std::vector<Function<Function>> _task;
- TimerWheel _timer_wheel
- 2.EventLoop構造
- 3.針對eventfd的操作
- 4.針對poller的操作
- 5.針對threadID的操作
- 6.針對TaskQueue的操作
- 7.針對定時器的操作
- 8.EventLoop的主要工作
- 二.全部代碼
一.eventloop模塊
一個Eventloop對應一個線程
Eventloop模塊它是進行事件監控以及事件處理的模塊。
一個Eventlopp對應一個線程。
當Eventloop監控了一個連接,而這個連接一旦有事件就緒,就會去處理。
如果這個連接在多個線程中都觸發了事件(前提是描述符被多個線程Eventloop監控),就會存在線程安全問題。所以為了避免這個問題,就要求連接必須在一個線程中處理。
也就是連接的所有操作都必須在同一個線程中處理,將連接的事件監控,以及連接事件處理,關閉連接等操作都放在同一個線程中進行。
也因為連接的內部函數可能會被不同的線程同時執行,就會存在線程安全問題。
如何保證一個連接的所有操作都在同一個線程中執行呢?
連接是無法與線程進行綁定的,但是連接可以與Eventloop綁定,而Eventloop則與線程是綁定的。
所以每個連接都有對應的Eventloop,只要讓連接去對應的Eventloop中執行就是在同一個線程中執行。
這其實就相當于把我們的channel跟eventloop給關聯到了一起之間
的關聯到了一起,而eventloop又跟線程是一一對應的,它是在線程里邊去運行的。
eventloop模塊的功能都是在同一個線程里邊去完成的,一旦它監控了連接,并且連接綁定的是同一個eventloop,那也就意味著。這個連接呢,它的一個監控以及它的處理是同一個線程里邊。
eventloop的處理流程:
1.首先在線程中對連接進行事件監控
2.當連接有事件就緒則就進行事件處理(調用回調函數)
3.所有的就緒事件都處理完了,再去任務隊列中將任務一一取出來執行。
如何保證處理連接的回調函數中的操作都是在同一個線程中進行的呢?
解決方法:任務隊列
給每個Eventloop都添加一個任務隊列。
對連接的所有操作(事件監控,事件處理,關閉連接等)都進行一次封裝,向外部提供的連接操作并不直接執行,而是將連接的操作以任務形式壓入到任務隊列里。
不過要注意并不是將所有回調函數都壓入到任務隊列中去,當前執行函數的線程如果就是該連接綁定的eventloop對應的線程,那么就可以直接對函數執行,不需要再壓入到任務隊列中,如果不一樣那么就需要壓入到任務隊列中。
因為如果別的線程要執行當前的線程中的conn的對象的函數的時候,是不會執行的,因為會涉及到安全問題,所以就需要吧他執行的任務放到這個任務隊列中去,讓當前線程自己執行。
通知機制eventfd
因為有可能因為等待描述符IO事件就緒,導致執行流流程阻塞,這時候任務隊列中的任務將得不到脅行因此得有一個事件通知的東西,能夠喚醒事件監控的阻塞
當線程將任務壓入到任務隊列時,會存在這樣情況,任務隊列里有任務,但不能執行。
為什么呢?因為epoll會因為沒有事件就緒而阻塞住,一旦阻塞住了就無法往后執行任務隊列里的任務了。
所以我們就需要一個通知機制,當一旦任務隊列中有任務時,就喚醒可能阻塞的epoll監控。
1.成員變量
private:// 每一個eventloop對應一個線程,當線程創建時,eventloop就會被創建出來并綁定線程的idstd::thread::id _thread_id;// eventloop是一個監控管理模塊,里面封裝了poller,用來監控所有的連接的事件Poller _poll;// 需要一個通知機制eventfd,用來喚醒可能因為IO事件監控沒有事件就緒而阻塞,也就是epoll阻塞,需要喚醒它,往后執行任務隊列里的任務int _event_fd;// eventfd也是一個描述符,也可以掛到poll里進行事件監控,poller監控的對象是Channel*,所以通過Channel來管理event_fdstd::unique_ptr<Channel> _eventfd_channel;// 每個eventllop中都有對應的任務隊列,里面存放著外界調用的函數using Function = std::function<void()>;// 如果別的線程要執行當前的線程中的conn的對象的函數的時候,是不會執行的,因為會涉及到安全問題,所以就需要吧他執行的任務放到這個任務隊列中去,讓當前線程自己執行,std::vector<Function> _task;// 任務隊列可能存在線程安全問題,所以需要一把鎖保護它std::mutex _mutex;//eventloop中還存在的功能是定時任務,所以需要時間輪定時器TimerWheel _timer_wheel;
std::thread::id _thread_id;//線程ID
作用1:綁定線程
一個eventloop對應一個線程,當線程創建后,該線程就會創建eventloop,并且eventloop會綁定該線程的id。
作用2:用來標識當前線程是否與eventloop綁定的線程一致
如果連接的一個函數回調里面要執行一個任務,那么這個任務如果本身就是有eventloop對應的線程執行的,那么該任務是可以直接執行的,如果不是eventloop對應的線程執行的那么就需要壓入到任務隊列去。
當事件就緒,需要處理的時候,處理的過程中,如果需要對連接進行某些操作,那么這些操作必須在當前eventloop對應的線程中執行,而不能由其他eventloop線程執行,要保證對連接的各項操作都是線程安全的。
1.如果執行的操作本就在線程中,不需要將操作壓入隊列了,可以直接執行
2.如果執行的操作不再線程中,才需要加入任務池,等到事件處理完了然后執行任務
Poller _poll;
作用:對所有連接進行事件監控,以及事件處理
int _event_fd;
int _event_fd
std::unique_ptr _eventfd_channel;
eventfd事件通知機制,用來喚醒阻塞epoll。
eventfd也是一個文件描述符,則也可以掛到epoll上監控,即poller也可以對eventfd進行監控,而epoll監控的對象是channel,所以通過channel來管理eventfd。
std::vector<Function> _task;
using Function =std::function<void()>;
std::vector _task;
//任務隊列可能存在線程安全問題,所以需要一把鎖保護它
std::mutex _mutex;
作用:如果別的線程要執行當前的線程中的conn的對象的函數的時候,是不會執行的,因為會涉及到安全問題,所以就需要吧他執行的任務放到這個任務隊列中去,讓當前線程自己執行。
任務隊列中的任務都是由其他線程通過調用RunInloop壓入的函數。這個任務隊列是存在線程安全的,所以在訪問時,需要加鎖訪問。
不需要每次都一次一次的加鎖訪問任務隊列,我們只需要加鎖一次,然后定義一個臨時的任務隊列,將原來的任務隊列中的任務全部交換到臨時的中去,當全部取出來就可以解鎖了。然后執行臨時隊列中的任務即可。
當獲取到一個新連接之后,會調用這里的NewConnection函數,這個函數里面會為新連接創建connection,并且綁定eventloop,再下面這里設置非活躍超時銷毀 以及 可讀事件監聽時,就會涉及當前線程,操作另一個線程的connection對應的操作函數
此時可能會有線程安全的問題,所以封裝成任務加到隊列里面,交由從屬線程執行
TimerWheel _timer_wheel
eventloop另外一個主要功能就是管理著所有的定時任務,添加定時任務,刷新定時任務等。
所以封裝一個時間輪定時器,用來對定時任務進行操作。
定時器里面有一個timerfd,本質也是一個計數器,一旦創建,內核就會每個一段事件往該描述符中發送次數。可以實現每秒鐘執行一下任務。
2.EventLoop構造
需要構造的主要由線程id,事件通知event_fd以及管理該event_fd的channel對象。
需要創建一個eventfd,并且new一個channel對象管理event_fd,并設置該描述符事件就緒時的回調函數,設置可讀監控。
public: // 構造EventLoop() : _thread_id(std::this_thread::get_id()),_event_fd(CreateEventFd()),_eventfd_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 設置_event_fd的讀回調函數,當事件就緒就會去執行_eventfd_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));// 啟動event_fd的讀事件監控_eventfd_channel->EnableRead();}
3.針對eventfd的操作
對事件通知的操作主要有三個:
1.創建事件通知event_fd
2.構造event_fd可讀事件就緒后的回調函數,即讀取event_fd中的通知次數。
3.構造event_fd通知函數,也就是喚醒機制,本質上就是往event_fd描述符中發送一次通知。當發生一次數據,event_fd描述符的可讀事件就會就緒,那么epoll就會被喚醒,往后執行。
public://針對通知機制event_fd的操作:創建eventfd,讀取eventfd中數據(就緒回到函數),網eventfd中寫數據(喚醒epoll)//1.創建通知機制event_fdstatic int CreateEventFd(){int efd=eventfd(0,EFD_CLOEXEC|EFD_NONBLOCK);if(efd<0){ERRLog("createeventfd failed");abort();//異常退出}return efd;}//2.構建event_fd的讀事件回調函數,讀取eventfd中的通知次數,單純的就是讀取出來,沒有作用void ReadEventFd(){int res;ssize_t ret=(_event_fd,&res,sizeof(res));if(ret<0){// 有兩種情況是合法的套接字是沒有問題的if (errno == EAGAIN) // EAGAIN表示sock緩沖區中沒有數據可讀了return ;if (errno == EINTR) // EINTR表示在讀取的過程中被信號中斷了return ;ERRLog("readevent failed");abort();}}//3.構建喚醒機制,本質就是玩eventfd中發送一個數據,那么event的讀事件就緒就會喚醒阻塞的epollvoid WakeupEventFd(){ int val=1;`在這里插入代碼片`ssize_t ret=write(_event_fd,&val,sizeof(val));if(ret<0){// 有兩種情況是合法的套接字是沒有問題的if (errno == EAGAIN) // EAGAIN表示sock緩沖區中沒有空間可寫了return ;if (errno == EINTR) // EINTR表示在寫入的過程中被信號中斷了return ;ERRLog("wakeupevent failed");abort();}}
4.針對poller的操作
主要是封裝poller中的添加/修改事件監控操作和移除事件監控操作。從而讓channel對象可以調用Eventlooop中的事件監控操作對描述符進行監控。
public://針對poller的操作,要封裝添加事件監控和移除事件監控的操作//1.添加或者修改某個連接的事件監控void UpdateEvent(Channel*channel){return _poll.UpdateEvent(channel);} //2.移除某個連接的事件監控void RemoveEvent(Channel*channel){return _poll.RemoveEvent(channel);}channel對象中調用eventloop中的事件監控操作:
// 在Channel前面只是聲明了Eventloop,但Channel不知道Eventloopr類里的成員函數,在類里無法調用Eventloop的對象函數,只能放在外面
void Channel::Update()
{return _loop->UpdateEvent(this);
}
void Channel::Remove()
{return _loop->RemoveEvent(this);
}
5.針對threadID的操作
主要判斷當前的線程是否與eventloop對應的線程是一致的,如果是一致的那么對于連接的操作可以直接執行,如果不是就需要壓入到任務對立中執行。
public://針對線程id的操作//用來判斷當前的線程是否是創建eventloop的線程bool IsInLopp(){return _thread_id==std::this_thread::get_id();}
6.針對TaskQueue的操作
對于任務隊列的操作主要有:
1.將任務函數壓入到任務隊列中
2.根據線程id,決定是否真正壓入到任務隊列中
3.執行任務隊列中的所有任務。
要注意在任務函數壓入到任務隊列中后,理論上隊列中的任務就必須有當前線程去執行,但是會存在epoll因為沒有事件就緒而阻塞住,所以壓入一個任務后,就必須喚醒epoll。
在訪問到任務隊列時,都必須加鎖訪問,保證線程安全。
執行任務隊列中的操作的思想是:先將任務中的所有任務都交換出來,再依次執行。
public://針對任務隊列_task的操作//1.將外界的任務(函數)插入到任務隊列里void TaskInLoop(Function cb){{std::unique_lock<std::mutex> _lock(_mutex);//規定一個作用域,并且頂一個臨時的鎖對象,在定義域內加鎖,出來就解鎖。_task.push_back(cb);}//當任務隊列里有任務了就要把它執行掉,但存在epoll因為沒有事件就緒而阻塞住無法執行任務隊列里的任務//所以插入一個就需要喚醒epoll防止它在阻塞中WakeupEventFd();}//2.有的任務是不用壓入到任務隊列里執行的,如果是eventloop對應的線程調用外部函數,則就直接去執行,如果是其他線程調用的,就需要壓入到任務隊列里去//所以外面真正調用的都是這個接口void RunInLoop(Function cb){if(IsInLopp())return cb();//如果是eventloop對應的線程調用,則直接執行//如果不是則需要壓入對eventloop的任務隊列里執行return TaskInLoop(cb);}//3.執行任務隊列里的任務void RunAllTask(){//利用一個臨時vector,將task里面的任務全部交換出來,再全部執行std::vector<Function> temp_task;{//訪問任務隊列就需要加鎖std::unique_lock<std::mutex> _lock(_mutex);_task.swap(temp_task);}//執行任務隊列里的任務for(auto &task:temp_task){task();} }
7.針對定時器的操作
針對定時器的操作很簡單,就是添加定時任務,刷新定時任務(本質就是延遲事件),終止定時任務,判斷是否有該定時任務等操作。
就是封裝一個定時器的任務即可。
public://針對定時器timer_wheel的操作//1.添加定時任務void AddTask(uint64_t id, uint32_t timeout, const TaskFunc cb){return _timer_wheel.AddTask(id,timeout,cb);}//2.刷新,延遲定時任務void RefreshTask(uint64_t id){return _timer_wheel.RefreshTask(id);}//3.終止定時任務void CancelTimer(uint64_t id){return _timer_wheel.CancelTimer(id);}//4.查看是否有該定時任務bool HasTimer(uint64_t id){return _timer_wheel.HasTimer(id);}
8.EventLoop的主要工作
eventloop的主要工作:
1.對描述符進行事件監控
2.描述符的事件就緒,則進行事件處理
3.執行任務隊列中的任務。
循環往后的執行。
public://Eventloop的主要任務void Start(){while(1){//1.進行事件監控,并獲取就緒的事件和fdstd::vector<Channel*> active;_poll.Poll(&active);//2.進行就緒事件處理for(auto &it: active){it->HandlerEvent();}//3.執行任務隊列里的任務RunAllTask();}}
};
二.全部代碼
class EventLoop
{
private://每一個eventloop對應一個線程,當線程創建時,eventloop就會被創建出來并綁定線程的idstd::thread::id _thread_id; //eventloop是一個監控管理模塊,里面封裝了poller,用來監控所有的連接的事件Poller _poll;//需要一個通知機制eventfd,用來喚醒可能因為IO事件監控沒有事件就緒而阻塞,也就是epoll阻塞,需要喚醒它,往后執行任務隊列里的任務int _event_fd;//eventfd也是一個描述符,也可以掛到poll里進行事件監控,poller監控的對象是Channel*,所以通過Channel來管理event_fdstd::unique_ptr<Channel> _eventfd_channel;//每個eventllop中都有對應的任務隊列,里面存放著外界調用的函數using Function =std::function<void()>;//如果別的線程要執行當前的線程中的conn的對象的函數的時候,是不會執行的,因為會涉及到安全問題,所以就需要吧他執行的任務放到這個任務隊列中去,讓當前線程自己執行,std::vector<Function> _task;//任務隊列可能存在線程安全問題,所以需要一把鎖保護它std::mutex _mutex;public: //構造EventLoop():_thread_id(std::this_thread::get_id()),_event_fd(CreateEventFd()),_eventfd_channel(new Channel(this,_event_fd)){//設置_event_fd的讀回調函數,當事件就緒就會去執行_eventfd_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd,this));//啟動event_fd的讀事件監控_eventfd_channel->EnableRead();}
public://針對通知機制event_fd的操作:創建eventfd,讀取eventfd中數據(就緒回到函數),網eventfd中寫數據(喚醒epoll)//1.創建通知機制event_fdstatic int CreateEventFd(){int efd=eventfd(0,EFD_CLOEXEC|EFD_NONBLOCK);if(efd<0){ERRLog("createeventfd failed");abort();//異常退出}return efd;}//2.構建event_fd的讀事件回調函數,讀取eventfd中的通知次數,單純的就是讀取出來,沒有作用void ReadEventFd(){int res;ssize_t ret=(_event_fd,&res,sizeof(res));if(ret<0){// 有兩種情況是合法的套接字是沒有問題的if (errno == EAGAIN) // EAGAIN表示sock緩沖區中沒有數據可讀了return ;if (errno == EINTR) // EINTR表示在讀取的過程中被信號中斷了return ;ERRLog("readevent failed");abort();}}//3.構建喚醒機制,本質就是玩eventfd中發送一個數據,那么event的讀事件就緒就會喚醒阻塞的epollvoid WakeupEventFd(){ int val=1;ssize_t ret=write(_event_fd,&val,sizeof(val));if(ret<0){// 有兩種情況是合法的套接字是沒有問題的if (errno == EAGAIN) // EAGAIN表示sock緩沖區中沒有空間可寫了return ;if (errno == EINTR) // EINTR表示在寫入的過程中被信號中斷了return ;ERRLog("wakeupevent failed");abort();}}
public://針對poller的操作,要封裝添加事件監控和移除事件監控的操作//1.添加或者修改某個連接的事件監控void UpdateEvent(Channel*channel){return _poll.UpdateEvent(channel);} //2.移除某個連接的事件監控void RemoveEvent(Channel*channel){return _poll.RemoveEvent(channel);}
public://針對線程id的操作//用來判斷當前的線程是否是創建eventloop的線程bool IsInLopp(){return _thread_id==std::this_thread::get_id();}
public://針對任務隊列_task的操作//1.將外界的任務(函數)插入到任務隊列里void TaskInLoop(Function cb){{std::unique_lock<std::mutex> _lock(_mutex);//規定一個作用域,并且頂一個臨時的鎖對象,在定義域內加鎖,出來就解鎖。_task.push_back(cb);}//當任務隊列里有任務了就要把它執行掉,但存在epoll因為沒有事件就緒而阻塞住無法執行任務隊列里的任務//所以插入一個就需要喚醒epoll防止它在阻塞中WakeupEventFd();}//2.有的任務是不用壓入到任務隊列里執行的,如果是eventloop對應的線程調用外部函數,則就直接去執行,如果是其他線程調用的,就需要壓入到任務隊列里去//所以外面真正調用的都是這個接口void RunInLoop(Function cb){if(IsInLopp())cb();//如果是eventloop對應的線程調用,則直接執行//如果不是則需要壓入對eventloop的任務隊列里執行TaskInLoop(cb);}//3.執行任務隊列里的任務void RunAllTask(){//利用一個臨時vector,將task里面的任務全部交換出來,再全部執行std::vector<Function> temp_task;{//訪問任務隊列就需要加鎖std::unique_lock<std::mutex> _lock(_mutex);_task.swap(temp_task);}//執行任務隊列里的任務for(auto &task:temp_task){task();} }
public://Eventloop的主要任務void Start(){while(1){//1.進行事件監控,并獲取就緒的事件和fdstd::vector<Channel*> active;_poll.Poll(&active);//2.進行就緒事件處理for(auto &it: active){it->HandlerEvent();}//3.執行任務隊列里的任務RunAllTask();}} };
#include "../Server.hpp"void Handleclose(Channel *channel)
{ERRLog("close:%d",channel->Fd());channel->Remove(); // 移除監控delete channel;
}
void HandleRead(Channel *channel)
{int fd = channel->Fd();char buf[1024] = {0};int ret = recv(fd, buf, 1023, 0);if (ret <= 0){return Handleclose(channel);}buf[ret] = 0;ERRLog("%s",buf);channel->EnableWrite(); // 啟動可寫事件
}
void HandleError(Channel *channel)
{return Handleclose(channel); // 關閉釋放
}
void HandleEvent(EventLoop* loop,Channel *channel,uint64_t timerid)
{loop->RefreshTask(timerid);
}
void HandleWrite(Channel *channel)
{int fd = channel->Fd();const char *data = "陶恩威你好呀";int ret = send(fd, data, strlen(data), 0);if (ret < 0){return Handleclose(channel); // 關閉釋放}channel->DisableWrite(); // 關閉寫監控
}
void Acceptor(EventLoop *loop, Channel *lst_channel)
{int fd = lst_channel->Fd();int newfd = accept(fd, NULL, NULL);if (newfd < 0)return;uint64_t timerid=rand()%1000;Channel *channel = new Channel(loop, newfd);//獲得一個新連接并掛到loop上channel->SetReadCallback(std::bind(HandleRead, channel)); // 為通信套接字設置可讀事件的回調函數channel->SetWriteCallback(std::bind(HandleWrite, channel)); // 可寫事件的回調函數channel->SetCloseCallback(std ::bind(Handleclose, channel)); // 關閉事件的回調函數channel->SetErrorCallback(std::bind(HandleError, channel)); // 關閉事件的回調函數channel->SetEventCallback(std::bind(HandleEvent, loop,channel,timerid)); // 關閉事件的回調函數//定時器的設置必須要在啟動監聽之前,防止先監聽后立刻有了事件,但還沒定時任務。該連接就會多存活loop->AddTask(timerid,10,std::bind(Handleclose,channel));channel->EnableRead();
}
int main()
{//Poller poll;srand(time(NULL));EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8886);Channel *lst_channel = new Channel(&loop, lst_sock.Fd()); // 為listen套接字創建一個channel對象lst_channel->SetReadCallback(std::bind(Acceptor, &loop, lst_channel)); // 為listen套接字設置讀回調函數Accept,用來接收新連接,并創建Channel對象,添加事件監控,啟動寫事件監控等lst_channel->EnableRead(); // 啟動讀事件監控loop.Start();lst_sock.Close();return 0;}
#include "../Server.hpp"
int main()
{Socket cli_sock;cli_sock.CreateClient(8886, "112.126.85.136");for(int i=0;i<5;i++){std::string str = "tew";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};int n = cli_sock.Recv(buf, 1023);buf[n] = 0;DBG_LOG("%s", buf);sleep(1);}while(1)sleep(1);return 0;
}
這個簡單的服務器流程,就是一開始創建了一個eventloop。然后創建出監聽連接,并綁定該eventloop,然后就設置該連接的回調函數。
最后啟動lst_channel->EnableRead(); 這個函數就是將該套接字掛到poller上去進行事件監聽。當有新連接時,讀事件就緒,就會調用Acceptor,在Acceptor中,首先會創建一個新的套接字,并且也綁定該eventloop,然后設置對應的回調函數,最后啟動channel->EnableRead();這個函數就是將新的套接字掛到poller上去進行事件監聽。當有數據來時,讀事件就會就緒。
添加定時銷毀就是將任務添加到時間輪里,然后連接每操作一次都會調用一次任意事件,進行刷新時長。