定時器
- 定時器原理
- 如何理解定時器
- 定時器數據結構選取
- 定時器觸發方式
- 定時器的實現
定時器原理
如何理解定時器
定時器在日常通常被描述為組織大量延時任務的模塊,其實從字面意思去理解的話,他就是去處理延時任務的,那么什么是延時任務呢?我們來看一張圖:
在現代開發當中,同時是要處理多個任務的,我們將任務添加進某個數據結構到任務觸發之間是存在一段時間間隔的,那么就會引發一個問題,這段時間當前 CPU 核心是在去等么?
這兒肯定是不會讓 CPU 核心去等的,因為我們要保證一個原則,不過去占用一個線程,高效的處理定時任務,這也意味著再添加任務到觸發任務之間的這一段時刻,我們不可能讓線程在這兒進行等待,這樣就違背了我們高效的原則了。
所以也就有了定時器的出現,在添加任務到觸發任務之間肯定是會存在大量任務的,我們就需要將當前的大量任務組織起來,然后觸發最近將要超時的任務,這樣,就保證了當前線程并不是一味地在這兒阻塞進行等待,對于將要超時的任務他就會去進行處理。
那么定時器就可以被描述為組織大量延時任務的數據結構和觸發最近將要超時任務的機制。
定時器數據結構選取
既然定時器需要組織大量的延時任務,就需要選取一種數據結構,最為合適的幾種數據結構分別為:紅黑樹、最小堆和時間輪。
紅黑樹
根據上面的描述我們就可以這么去理解定時器,“時間”
和“任務”
,兩者之間就成了對應的關系,某個時間點就對應了相應的任務,其實我們就可以看出來這天然的就對應這一種 [key,value]
的關系了,這種關系很容易就讓人想到了紅黑樹這中數據結構。
同時,對于添加的任務來說,肯定是存在時間順序的,因為定時器總是優先去處理最近將要么超時的任務:
紅黑樹是一種自平衡的二叉查找樹,同時,他也是一種有序的數據結構,對于插入以及刪除的時間復雜度都是O(LogN)級別的,如果要實現一個定時,紅黑樹肯定是一個可以選擇的數據結構,在 C++ 的 STL 庫中存在四種紅黑樹的數據結構:map/set/multimap/multiset
,我們后續定時器的實現選取的就是multimap
這個結構。
選取multimap
這個結構主要是基于兩點:
- 肯定會存在同一時間觸發多個任務的場景存在,
map
的實現是基于一對一的關系的,而multimap
是基于一對多的關系的,所以肯定是選multimap
的; - 我們通常是獲取最近將要超時的任務,添加也是添加基于順序進行添加的,那么基于紅黑樹這種數據結構,我們會發現第一種操作的的時間復雜度是O(1)。
Ngnix,workflow里面的定時器都是基于紅黑樹進行實現的。
最小堆
最小堆是一種特殊的完全二叉樹數據結構,其中每個父節點的值都小于或等于其子節點的值。這個性質使得最小堆的根節點始終是整個堆中的最小元素。
他的特點就在于:最小堆是一棵完全二叉樹,意味著除了最后一層外,其他層都是完全填充的,且最后一層的節點盡可能向左靠攏。而且任意一個子節點頁滿足最小堆的要求。
對于最小堆來說他的查找,插入和刪除的效率也是很高的,我們如果需要插入一個元素,基于最小堆的性質,肯定是往二叉樹最高層沿著最左側添加一個節點,然后考慮是否需要上升進行調整;刪除一個元素也是,先找到這個元素,然后將其與最后一個節點進行交換,然后調整堆結構,這個可以參考之前的一篇文章:堆數據結構詳解。
堆的這種理念其實跟定時器也是比較契合的,最小堆堆頂總是最小的那個數據,也就是我們最近需要處理的任務,同時,他的插入和刪除的也是比較高的。libev,libevent 里面的定時器是基于最小堆實現的。
時間輪
紅黑樹,最小堆是按觸發時間進行順序組織,而接下來需要介紹的時間輪是按執行順序進行組織的。
對于時間輪來說,它是基于鐘表的原理來進行實現的:
對于Linux內核,Kafka 都是基于時間輪進行實現的。
定時器觸發方式
對于服務端來說,驅動服務端業務邏輯的事件包括網絡事件、定時事件、以及信號事件;通常網絡事件和定時事件會進行協同處理;定時器觸發形式通常有兩種:
利用 IO 多路復用系統調用最后一個參數(超時),來觸發檢測定時器
理解定時器以后我們就很容易想到 IO 多路復用的最后一個參數timeout
,他的取值會存在以下三種情況:
- NULL/nullptr:select調用后進行阻塞等待,直到被監視的某個文件描述符上的某個事件就緒。
- 0:selec調用后t進行非阻塞等待,無論被監視的文件描述符上的事件是否就緒,select檢測后都會立即返回。
- 特定的時間值:select調用后在指定的時間內進行阻塞等待,如果被監視的文件描述符上一直沒有事件就緒,則在該時間后select進行超時返回。
我們首先的理解 recator 的本質,他是將 IO 轉化為對事件的管理,因為我們對于用戶端來說,并不知道 IO 什么時候就緒,客戶端何時發送數據,什么時候建立連接…,存在很多個不知道的情況,所以就會導致我們不知道什么時候去調用 accept/read/write
,而 IO 多路復用解決的就是這個問題,當監聽事件就緒就調用 accept
建立連接通路,真正的事件就緒就調用read/write
對數據進行讀寫,他本質上解決的就是一個調用時機的問題。
對于我們的定時器來說,是處理定時任務的,他其實本質上也是一個事件,如果時間到了,就去執行,其實跟 IO 多路復用的這種思想是很相似的,我們都是需要去異步處理它,所以在這兒我們會使用 IO 多路復用的最后一個參數。
利用 timerfd,將定時檢測作為 IO 多路復用當中的事件進行處理
timerfd 其實也是基于網絡 IO 的思想進行實現的,他主要是依靠以下兩個接口來進行實現的:
#include <sys/timerfd.h>int timerfd_create(int clockid, int flags);
timerfd_create 用于創建一個定時器文件描述符(timer file descriptor),它允許應用程序通過文件描述符來監控定時器事件,解析如下:
clockid
:指定定時器使用的時鐘源,常見選項包括:
CLOCK_REALTIME:系統實時時間(可受系統時間調整影響);
CLOCK_MONOTONIC:單調時鐘(不受系統時間調整影響,適合測量時間間隔);
CLOCK_BOOTTIME(Linux 2.6.39 后支持):類似 CLOCK_MONOTONIC,但包含系統休眠時間。flags
:控制文件描述符的行為,可選值:
TFD_NONBLOCK:設置文件描述符為非阻塞模式。
TFD_CLOEXEC:設置文件描述符為 close-on-exec(執行 exec 時自動關閉)。
返回值:
- 成功時返回一個文件描述符(fd),可用于后續的 timerfd_settime 和 read 操作。
- 失敗時返回 -1,并設置 errno(如 EINVAL 表示無效參數)。
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
timerfd_settime 用于設置或修改 timerfd 計時器的到期時間和間隔周期。它是 timerfd 系列系統調用的一部分,需要配合 timerfd_create 創建的定時器文件描述符使用,參數解析:
fd
:由 timerfd_create 創建的定時器文件描述符;flags
:控制標志位,可以是以下值之一:
0:使用相對時間(以當前時間為基準)
TFD_TIMER_ABSTIME:使用絕對時間(以 Epoch 時間為基準)new_value
:指向 itimerspec 結構的指針,指定新的定時器設置;
struct itimerspec {struct timespec it_interval; /* 定時器間隔周期 */struct timespec it_value; /* 首次到期時間 */
};struct timespec {time_t tv_sec; /* 秒 */long tv_nsec; /* 納秒 */
};
old_value
:指向 itimerspec 結構的指針,用于存儲之前的定時器設置(可為 NULL)。
返回值:
- 成功時返回 0
- 失敗時返回 -1 并設置 errno
定時器的實現
接下來我們通過紅黑樹的方式來實現一個定時器:
#ifndef __TIMER__
#define __TIMER__#include <map>
#include <functional>
#include <chrono>// TimerNode 結點,延時任務
class TimerNode
{
public:friend class Timer;TimerNode(uint64_t timeout, std::function<void()> cb): timeout_(timeout), callback_(std::move(cb)){}private:uint64_t timeout_;std::function<void()> callback_;
};class Timer
{
public:// 添加延時任務TimerNode* AddTimeout();// 刪除延時任務void DelTimeout();// 獲取延時時間int WaitTime();// 處理延時任務void HandleTimeout();private:std::multimap<uint64_t, TimerNode *> timer_map_; // 前面代表超時時間,后面代表超時任務
};#endif
添加延時任務、
static uint64_t GetCurrentTime()
{using namespace std::chrono;return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
}
// 添加延時任務
TimerNode *AddTimeout(uint64_t diff, std::function<void()> cb)
{// 根據 diff 創建延時任務auto node = new TimerNode(GetCurrentTime() + diff, std::move(cb));if (timer_map_.empty() || node->timeout_ < timer_map_.rbegin()->first){auto it = timer_map_.insert(std::make_pair(node->timeout_, std::move(node)));return it->second;}else{auto it = timer_map_.emplace_hint(timer_map_.crbegin().base(), std::make_pair(node->timeout_, std::move(node)));return it->second;}
}
- 我們的延時任務通過 timer_map_ 進行管理,添加延時任務其實就是將延時任務放進 timer_map_ 中進行管理,我們的每一個延時任務都需要被處理,被添加以后我們如何去進行處理就需要使用到回調函數,將回調函數作為 TimerNode 的第二個結點的原因也在這兒,我們可以更為直接的對延時任務進行處理;
- 對于延時任務,我們以當前時間+延時時間,表示延時的時間,在這兒進行添加的時候我們就需要注意,我們添加的延時任務不一定是按順序排布的,但是紅黑樹這個結構肯定是按順序進行排布的,所以添加的延時任務時我們就可以進行優化,如果對應的添加的延時任務的延時時間是小于紅黑樹最后一個節點的,我們調用 insert 函數插入,因為要重新進行排序,如果對應的添加的延時任務的延時時間是大于紅黑樹最后一個節點的,我們直接找到尾結點,調用 emplace_hint 函數直接構造一個節點即可,這樣是可以提高效率的。
- 最終返回值肯定當前的這個延時任務。
刪除延時任務
// 刪除延時任務
void DelTimeout(TimerNode *node)
{auto it = timer_map_.equal_range(node->timeout_);for (auto iter = it.first; iter != it.second; iter++){if (iter->second == node){timer_map_.erase(iter);break;}}
}
刪除延時任務應該注意的地方就在同一時間我們可能會插入多個延時任務,那么再刪除的時候就應該將這個時間點對應的已經處理的延時任務刪除,就使用到了 equal_range 這個接口,他用于在有序范圍內查找等于給定值的所有元素的范圍。它返回一個 std::pair,其中 first 指向第一個不小于給定值的元素,second 指向第一個大于給定值的元素。剛好適用于當前場景。
獲取延時時間
// 獲取延時時間
int WaitTime()
{auto iter = timer_map_.begin();if (iter == timer_map_.end()){return -1;}uint64_t diff = iter->first - GetCurrentTime();return diff > 0 ? diff : 0;
}
獲取延時時間其實就是獲取到當前延時任務的一個時間,這個就不多做解釋了。
處理延時任務
// 處理延時事件
void HandleTimeout()
{auto iter = timer_map_.begin();while (iter != timer_map_.end() && iter->first <= GetCurrentTime()){iter->second->callback_();iter = timer_map_.erase(iter);}
}
處理我們對應的延時任務,就是調用對應的回調函數,處理完成以后將這個延時任務刪除就可以了。
整體代碼如下:
#ifndef __TIMER__
#define __TIMER__#include <map>
#include <functional>
#include <chrono>// TimerNode 結點,延時任務
class TimerNode
{
public:friend class Timer;TimerNode(uint64_t timeout, std::function<void()> cb): timeout_(timeout), callback_(std::move(cb)){}private:// int id_;uint64_t timeout_; // 什么時候觸發std::function<void()> callback_; // 異步任務,回調函數
};class Timer
{
public:static uint64_t GetCurrentTime(){using namespace std::chrono;return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();}// 添加延時任務TimerNode *AddTimeout(uint64_t diff, std::function<void()> cb){// 根據 diff 創建延時任務auto node = new TimerNode(GetCurrentTime() + diff, std::move(cb));if (timer_map_.empty() || node->timeout_ < timer_map_.rbegin()->first){auto it = timer_map_.insert(std::make_pair(node->timeout_, std::move(node)));return it->second;}else{auto it = timer_map_.emplace_hint(timer_map_.crbegin().base(), std::make_pair(node->timeout_, std::move(node)));return it->second;}}// 刪除延時任務void DelTimeout(TimerNode *node){auto it = timer_map_.equal_range(node->timeout_);for (auto iter = it.first; iter != it.second; iter++){if (iter->second == node){timer_map_.erase(iter);break;}}}// 獲取延時時間int WaitTime(){auto iter = timer_map_.begin();if (iter == timer_map_.end()){return -1;}uint64_t diff = iter->first - GetCurrentTime();return diff > 0 ? diff : 0;}// 處理延時事件void HandleTimeout(){auto iter = timer_map_.begin();while (iter != timer_map_.end() && iter->first <= GetCurrentTime()){iter->second->callback_();iter = timer_map_.erase(iter);}}private:std::multimap<uint64_t, TimerNode *> timer_map_; // 前面代表超時時間,后面代表超時任務// std::unordered_map<int, TimerNode *> map_; // id與延時任務對應的關系Timer() = default;Timer(const Timer &) = delete;Timer &operator=(const Timer &) = delete;Timer(Timer &&) = delete;Timer &operator=(Timer &&) = delete;~Timer(){for (auto &pair : timer_map_){delete pair.second;}}
};#endif
當前定時器存在一個可以優化的點,就在于我們需要讓他能夠全局被進行使用,不希望每次不同對象去使用都去進行創建,在這里可以將他設置成為單例的,提供給全局進行使用,優化后代碼如下:
#ifndef __TIMER__
#define __TIMER__#include <map>
#include <functional>
#include <chrono>// TimerNode 結點,延時任務
class TimerNode
{
public:friend class Timer;TimerNode(uint64_t timeout, std::function<void()> cb): timeout_(timeout), callback_(std::move(cb)){}private:// int id_;uint64_t timeout_; // 什么時候觸發std::function<void()> callback_; // 異步任務,回調函數
};class Timer
{
public:// 單例模式static Timer* GetInstance(){static Timer instance;return &instance;}static uint64_t GetCurrentTime(){using namespace std::chrono;return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();}// 添加延時任務TimerNode *AddTimeout(uint64_t diff, std::function<void()> cb){// 根據 diff 創建延時任務auto node = new TimerNode(GetCurrentTime() + diff, std::move(cb));if (timer_map_.empty() || node->timeout_ < timer_map_.rbegin()->first){auto it = timer_map_.insert(std::make_pair(node->timeout_, std::move(node)));return it->second;}else{auto it = timer_map_.emplace_hint(timer_map_.crbegin().base(), std::make_pair(node->timeout_, std::move(node)));return it->second;}}// 刪除延時任務void DelTimeout(TimerNode *node){auto it = timer_map_.equal_range(node->timeout_);for (auto iter = it.first; iter != it.second; iter++){if (iter->second == node){timer_map_.erase(iter);break;}}}// 獲取延時時間int WaitTime(){auto iter = timer_map_.begin();if (iter == timer_map_.end()){return -1;}uint64_t diff = iter->first - GetCurrentTime();return diff > 0 ? diff : 0;}// 處理延時事件void HandleTimeout(){auto iter = timer_map_.begin();while (iter != timer_map_.end() && iter->first <= GetCurrentTime()){iter->second->callback_();iter = timer_map_.erase(iter);}}private:std::multimap<uint64_t, TimerNode *> timer_map_; // 前面代表超時時間,后面代表超時任務// std::unordered_map<int, TimerNode *> map_; // id與延時任務對應的關系Timer() = default;Timer(const Timer &) = delete;Timer &operator=(const Timer &) = delete;Timer(Timer &&) = delete;Timer &operator=(Timer &&) = delete;~Timer(){for (auto &pair : timer_map_){delete pair.second;}}
};#endif
這就是當前整個定時器的設計,他主要就是用來去處理延時任務的,當我們需要處理大量延時任務的時候,使用定時器就會比較合適。