1、模型框架
客戶端處理思想:事件驅動模式
事件驅動處理模式:誰觸發了我就去處理誰。
( 如何知道觸發了)技術支撐點:I/O的多路復用 (多路轉接技術)
1、單Reactor單線程:在單個線程中進行事件驅動并處理對所有客戶端進行IO事件監控、哪個客戶端觸發了事件,就去處理誰處理:接收它的請求,進行業務處理,進行響應。優點:單線程操作,操作都是串行化的,思想簡單,(不需要考慮進程或者線程間的通信問題,以及安全問題)缺點:所有的事件監控和業務處理都是在一個線程中完成的,因此很容易造成性能瓶頸適用場景:客戶端數量較少,且業務處理快速簡單的場景2、 單Reactor多線程:一個Reactor線程 + 業務線程池對所有客戶端進行IO事件監控、哪個客戶端觸發了事件,就去處理誰(Reactor線程)處理:僅僅進行IO操作然后將事件進行派發給業務線程優點:充分利用cpu多核資源,處理效率可以更高,降低了代碼的耦合度(IO操作和業務處理進行分離)缺點:在單個的Reactor線程中,包含了對所有客戶端的事件監控,以及所有客戶端的IO操作,不利于高并發場景(即每一個時刻都有很多客戶端連接請求,我還在處理上一個client的IO操作的話就來不及進行新的client的連接處理)3、多Reactor多線程:基于單Reator多線程的缺點考慮,如果IO的時候,有連接到來無法處理,因此將連接單獨拎出來。因此讓一個Reactor線程僅僅進行新連接處理,讓其他的Reactor線程進行IO處理,IO Reactor線程拿到數據分發給業務線程池進行處理。因此,多Reactor多線程模式,也叫主從Reactor模型主Reactor線程:進行新連接事件監控從屬Reactor線程:進行IO事件監控業務線程池:進行業務處理優點:充分利用CPU多核資源,并且可以進行合理分配但是:執行流并不是越多越好,因為執行流多了,反而會增加cpu切換調度成本。(所以在有些多Reactor多線程模式中從屬Reactor線程也會充當業務處理函數。
2、?功能模塊劃分:
SERVER模塊:實現Reactor模型的TCP服務器;協議模塊:對當前的Reactor模型服務器提供應?層協議支持
連接模塊:Buffer模塊:Buffer模塊是?個緩沖區模塊,?于實現通信中??態的接收緩沖區和發送緩沖區功能Socket模塊: Socket模塊是對套接字操作封裝的?個模塊,主要實現的socket的各項操作Channel模塊: Channel模塊是對?個描述符需要進?的IO事件管理的模塊,實現對描述符可讀,可寫,錯誤...事件的 管理操作,以及Poller模塊對描述符進?IO事件監控就緒后,根據不同的事件,回調不同的處理函數功能Connection模塊:Connection模塊是對Buffer模塊,Socket模塊,Channel模塊的?個整體封裝,實現了對?個通信套 接字的整體的管理,每?個進?數據通信的套接字(也就是accept獲取到的新連接)都會使?Connection進?管理。Acceptor模塊:Acceptor模塊是對Socket模塊,Channel模塊的?個整體封裝,實現了對?個監聽套接字的整體的管理。TimerQueue模塊:TimerQueue模塊是實現固定時間定時任務的模塊,可以理解就是要給定時任務管理器,向定時任務管理器中添加?個任務,任務將在固定時間后被執?,同時也可以通過刷新定時任務來延遲任務的執?。連接監控模塊:Poller模塊:Poller模塊是對epoll進?封裝的?個模塊,主要實現epoll的IO事件添加,修改,移除,獲取活躍連接功能。
bind函數
#includ <iostream>
#include <string>
#include <functional>void print(const std::string &str,int num)
{std::cout << str << num << std::endl;
}int main()
{//print("hello");auto func = std::bind(print, "hello",std::placeholders::_1);func(10);//----打印結果 hello 10auto func = std::bind(print, "hello",std::placeholders::_1, std::placeholders::_2);func(10,20);// ---打印結果 hello 10 20即傳入的參數是傳給第二個、第三個以及之后的參數//func();//直接調用func()就相當于調用print和傳入hello參數//std::placeholders::_1, std::placeholders::_2 預留一個參數 預留兩個參數return 0;
}
bind函數作用:當我們設計線程池或者任務池的時候,比如要設置一個任務隊列,這個任務隊列里面要包含兩個信息,任務要處理的數據以及這個數據要如何被處理(處理數據的方法) 所以我們要給任務池中添加函數進去、再添加一個數據進去
#includ <iostream>
#include <string>
#include <functional>
#include <vector>void print(const std::string &str,int num)
{std::cout << str << num << std::endl;
}int main()
{//using定義類型別名 Task 代表std::function<void()>類型//std::function 是 C++ 標準庫 <functional> 頭文件里的一個模板類,//它屬于通用的多態函數包裝器。//其作用是存儲、復制和調用任何可調用對象//std::function<void()> 是 std::function 的一個具體實例化using Task = std::function<void()>;std::vector<Task> arry; //一個任務數組 arry.push_back(std::bind(print, "hello",10)); //任務組中放入的是數據和對數據的處理方法//bind它的作用是創建一個新的可調用對象,這個新對象會綁定指定的函數和參數。arry.push_back(std::bind(print, "nihao",20));arry.push_back(std::bind(print, "hhhhh",30));for(auto &f:arry){f(); //f() 調用存儲在 f 中的可調用對象,也就是執行之前綁定的 print 函數。}return 0;
}
定時器:
定時去銷毀不活躍的連接
1、int? timerfd_create(int? clockid,? int? flags)
創建一個定時器 (linux下一切皆文件)所有定時器的操作也是當作文件去操作的
clockid:CLOCK_REALTIME----以系統時間作為計時基準值(如果系統時間發生改變就會出問題)(一般不用)
CLOCK_MONOTONIC---以系統啟動時間進行遞增的一個基準值(定時器不會隨著系統時間的改變而改變)
返回值:文件描述符
flags: 0 --- 阻塞操作
linux下一切皆文件,定時器的操作也是跟文件操作并沒有區別,而是定時器定時原理每隔一段時間(定時器的超時時間),系統就會給這個描述符對應的定時器寫入一個8字節的數據
創建一個定時器,定時器定立的超時時間是3s,也就是說每3s計算一次超時
從啟動開始,每隔3s,也就是每3s計算一次超時
從啟動開始,每隔3s中,系統都會給描述符寫入一個1,表示從上一次讀取數據到現在超時了1次
假設30s之后開始讀數據,則這個時候會讀取到10,表示上一次讀取數據到現在超時了10次
2、int? timerfd_settime(int? fd, int? flags,? struct? itimerspec? *new, struct? itimerspce? *old);
功能:啟動定時器
fd:timerfd_create函數的返回值---文件描述符---創建的定時器的標識符
flags:默認設置為0---使用相對時間(相對于當前的超時時間往后延長多少時間之后的超時)
struct? itimerspec? *new:設置的超時時間
struct timespec {time_t tv_sec; /* 秒 */long tv_nsec; /* 納秒 */};struct itimerspec {struct timespec it_interval; /* 第?次之后的超時間隔時間 */struct timespec it_value; /* 第?次超時時間 */};
struct? itimerspce? *old:用于接收當前定時器原有的超時時間設置,保存起來以便于還原(不需要還原也可以直接傳空)????????
定時器的基本代碼:
#include <stdio.h>
#include <unistd.h>
#inlcude <fcntl.h>
#include <stdint.h>
#include <sys/timerfd.h>int main()
{int timerfd_create(CLOCK_MONOTONIC,0);if(timerfd < 0){perror("timerfd_create error");return -1;}struct itimerspec itime;itime.it_value.tv_sec = 1;itime.it_value.tv_nsec = 0;//第一次超時時間為1s后itime.it_interval.tv_sec = 1;itime.it_interval.tv_nsec = 0;//第一次超時后,每次超時的時間間隔timerfd_settime(timerfd, 0, &itime, NULL);while(1){uint64_t times; //8字節大小int ret = read(timerfd, ×, 8);if(ret < 0){perror("read error");return -1;}printf("超時了,距離上一次超時了%d次\n",times);}close(timerfd);return 0;
}
定時器的作用:高并發的服務器需要定時的去清理不活躍的連接,定義一個定時器,每隔一秒去檢測,(每隔一秒把連接拿過來遍歷一下,看誰是非活躍超時了)超時了就把它釋放掉,每隔一秒來一次。
那如果有上萬個連接,全遍歷一遍效率就會很低很低。
另一種方案: 時間輪設置一個tick滴答指針,指向哪里就代表哪里任務超時了如果tick滴答,以秒為計時單位,如果當前的數組有7個元素,那么最大定時時間就只有7s如果定時器想要設置超大時間定時任務 (不可能去設置一個超大的數組吧)可以采?多層級的時間輪,有秒針輪,分針輪,時針輪? ??設置以天為單位的時間輪:
存在的問題:1、上面這樣的數組,同一時刻的定時任務只能添加一個,需要考慮如何在同一時刻支持添加多個定時任務?
解決方法: 將時間輪的一維數組設計為二維數組(每一個元素都是一個數組)2、假設當前的定時任務是一個連接的非活躍銷毀任務,這個任務什么時候添加到時間輪中比較合適?
?一個連接30s內都沒有通信,則是一個非活躍連接,這時候就銷毀但是一個連接如果在建立的時候添加了一個30s后的銷毀任務,但是這個連接30s內人家有數據通信,在第30s的時候不是一個非活躍連接思想:需要在一個連接有IO事件產生的時候,延遲定時任務的執行作為一個時間定時器,本身并不關注任務類型,只要是時間到了就需要被執行解決方案:類的析構函數? +? 職能指針share_ptr, 通過這兩個技術可以實現定時任務的延時1、使用一個類,對定時任務進行封裝,類實例化的每一個對象,就是一個定時任務對象,當對象被銷毀的時候,再去執行定時任務( 將定時任務的執行放到析構函數中)2、share_ptr用于對new的對象進行空間管理,當share_ptr對一個對象進行管理的時候,內部有一個計數器,計數器為0的時候,則釋放所管理的對象。int *a = new int; std::share_ptr<int> pi(a); std::share_ptr<int> pi1(pi);
a對象只有在pi計數為0的時候,才會被釋放當針對pi又構建了一個shared_ptr對象pi1,則pi和pi1計數器為2但是如果時針對原始對象進行構造,并不會跟pi和pi1共享計數當pi和pi1中任意一個被釋放的時候,只是計數器-1,因此它們管理的a對象并沒有被釋放,只有當pi和pi1都被釋放了,計數器為0了,才會釋放管理的a對象基于這個思想,我們可以使用share_ptr來管理定時器任務對象例如:對象被銷毀的時候,任務(task)才會被執行() 智能指針里面有一個ptr指向task,將智能指針放到定時數組里面,兩秒之后,智能指針被釋放計數器為0,tsak會被釋放掉,就會執行任務。如果在兩秒之間,連接又發送了數據,這個連接變為活躍的,我們就針對share_ptr再生成一個share_ptr,計數器就變為了2,把智能指針添加到時間輪里面去,第一次不會執行task,只有第二次會執行task。![]()
#include<iostream>
#include<vector>
#include <unordered_map>
#include <cstdint>
#include <functional>
#include <memory>
#include <unistd.h>using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;class TimerTask //這個類代表定時器任務
{
private:uint64_t _id; //定時器任務對象IDuint32_t _timeout; //定時器任務的超時時間bool _canceled; //false表示沒有被取消 true表示被取消了TaskFunc _task_cb; //定時器對象要執行的定時任務ReleaseFunc _release; //用于刪除 TimerWheel中保存的定時器對象信息public:TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) :_id(id),_timeout(delay),//外界自己傳入_task_cb(cb),_canceled(false){}~TimerTask() //執行定時器任務{if(_canceled == false)_task_cb(); //當定時任務觸發時,需要執行的具體操作//在析構的時候執行是因為 定時器的任務是銷毀不活躍的連接 那么 他的本質任務就是銷毀 即可以在類對象析構的時候任務對象被銷毀//具體執行什么函數會自己設置 在這個任務構造的時候 需要自己傳入的參數第三個_release();// 從TimerWheel 的 _timers 哈希表中刪除當前定時器任務的信息 --調用這個函數就是調用TimerWheel類中的RemoveTimer(因為下面的bind函數)}void Cancel(){_canceled = true; //true代表已經被取消}void SetRelease(const ReleaseFunc &cb) //傳入的參數是函數{_release = cb; }uint32_t DelayTime(){return _timeout;}
};class TimerWheel //管理這些定時器任務
{
private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _capacity; //表盤最大數量---就是最大延遲時間//用于管理 TimerTask 對象的生命周期,確保任務對象在被添加到時間輪中并且還有其他地方引用時不會被提前銷毀。std::vector<std::vector<PtrTask>> _wheel; //時間輪二維數組里面放的不是任務task而是對任務的share_ptr指針int _tick; //tick走到哪里哪里執行 (即釋放哪里的對象)執行哪里的任務//為了避免因哈希表對任務對象的引用而導致對象無法被正常銷毀的情況,同時又能在需要時獲取到任務對象進行操作。std::unordered_map<uint64_t, WeakTask> _timers; //放入的WeakTask類型,只有這樣在后面構造share_ptr的時候才會共享計數,而且自身也不影響計數
private:void RemoveTimer(uint64_t id) //從管理(map)中刪除{auto it = _timers.find(id);if(it != _timers.end()){_timers.erase(it);}}
public:TimerWheel():_capacity(60), _tick(0),_wheel(_capacity) {}void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)//添加定時任務 --第三個參數就是定時器任務觸發時,具體需要執行的任務{PtrTask pt(new TimerTask(id, delay, cb));pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));//將RemoveTimer綁定一個參數,得到的函數,作為參數傳遞給SetRelease函數int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);//數組_timers[id] = WeakTask(pt); //_timers哈希表中,值為id的元素(如果有就跟新,如果沒有就新創建) WeakTask(pt)----以pt這個 std::shared_ptr為參數構建了一個std::weak_ptr<TimerTask> 類型的弱引用}void TimerRefresh(uint64_t id)//刷新/延遲定時任務{//通過保存的定時器對象的weak_ptr構造一個share_ptr出來,添加到輪子中auto it = _timers.find(id);if(it == _timers.end()){return;//沒找到定時任務,無法進行刷新,無法延遲}PtrTask pt = it->second.lock(); //lock獲取weak_ptr管理的對象對應的share_ptr//it->second代表 與id對應的 std::weak_ptr<TimerTask> 對象//std::weak_ptr 類的一個成員函數,它的作用是嘗試創建一個指向 std::weak_ptr 所觀察對象的 std::shared_ptr//從 _timers 哈希表中找到與給定 id 對應的 std::weak_ptr<TimerTask> 對象,//然后調用其 lock() 方法嘗試獲取一個指向該 TimerTask 對象的 std::shared_ptr。//如果該 TimerTask 對象還存在(即其引用計數不為 0),則 lock() 方法會返回一個有效的 std::shared_ptr,//并將其賦值給 pt;如果該 TimerTask 對象已經被銷毀(引用計數為 0),則 lock() 方法會返回一個空的 std::shared_ptr。//為什么這樣寫????//由于 _timers 中存儲的是 std::weak_ptr,我們不能直接通過它來操作對象。//因此,需要調用 lock() 方法獲取一個 std::shared_ptr,這樣才能確保在操作對象時,對象是存在的。//同時,使用 std::shared_ptr 操作對象可以保證在操作期間對象不會被意外銷毀,因為 std::shared_ptr 會增加對象的引用計數。int dalay = pt->DelayTime();//DelayTime() 這個時間外界自己傳入int pos = (_tick + dalay) % _capacity;_wheel[pos].push_back(pt); //重新更新位置}void TimerCancel(uint64_t id){auto it = _timers.find(id);if(it != _timers.end()){return;//沒找到定時任務,無法進行刷新,無法延遲}PtrTask pt = it->second.lock(); //lock獲取weak_ptr管理的對象對應的share_ptrif(pt)pt->Cancel();}//這個函數應該每秒被執行一次,相當于秒針向后走了一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear();//清空指定位置的數組,就會把數組中保存的所有管理定時器對象的share_ptr釋放掉//它會調用 std::vector 的 clear 方法,將該槽對應的 std::vector<PtrTask> 中的所有 std::shared_ptr<TimerTask> 移除。//當 std::shared_ptr 被移除時,如果該 std::shared_ptr 是最后一個指向 TimerTask 對象的強引用,//那么它所管理的 TimerTask 對象的引用計數會變為 0,從而觸發 TimerTask 對象的析構函數 ~TimerTask()。}
};
class Test
{
public:Test(){std::cout << "構造" << std::endl;}~Test(){std::cout << "構造" << std::endl;}
};
void DelTest(Test *t)
{delete t;
}int main()
{TimerWheel tw;Test *t = new Test();tw.TimerAdd(888,5,std::bind(DelTest,t));//設置具體的任務id、延時時間、以及定時器觸發時具體要執行的任務for(int i = 0; i < 5; i++){tw.TimerRefresh(888);//刷新定時任務tw.RunTimerTask();//向后移動秒針sleep(1);}while(1){tw.RunTimerTask();sleep(1);}return 0;
}