timer 模塊
timer的定義,cyberrt中timer模塊用于設置定時器任務,字面意思,設置設置定時周期及出發頻次(周期 or oneshot),到達指定時間時間觸發callback
time wheel
時鐘節拍輪,常見的定時器設計,例如ucos中的定時器,Linux Crontab等,cyberrt也是采用了時鐘輪
時間輪(TimingWheel)簡單來說,是一個 存儲定時任務的循環隊列,隊列中的每個元素都可以放置一個定時任務列表(TimeBucket) 。TimeBucket 是一個list,鏈表中的每一項表示的都是定時任務(TimerTask)。
時鐘輪示意圖
類圖
@startuml
class Timer{
+explicit Timer(TimerOption opt)
+void Start()
+void Stop()
-TimerOption timer_opt_;
-TimingWheel* timing_wheel_ = nullptr;
-std::shared_ptr<TimerTask> task_;
}
class TimingWheel {- TimerBucket work_wheel_[WORK_WHEEL_SIZE]- TimerBucket assistant_wheel_[ASSISTANT_WHEEL_SIZE]- std::thread tick_thread_+ ~TimingWheel()+ void Start()+ void Shutdown()+ void Tick()+ void AddTask(const std::shared_ptr<TimerTask>& task) + void TickFunc()}
class TimerBucket {- std::mutex mutex_- std::list<std::weak_ptr<TimerTask>> task_list_+ void AddTask(const std::shared_ptr<TimerTask>& task)+ std::mutex& mutex()+ std::list<std::weak_ptr<TimerTask>>& task_list()
}struct TimerTask {+ TimerTask(uint64_t timer_id)+ uint64_t timer_id_+ std::function<void()> callback+ uint64_t interval_ms+ uint64_t remainder_interval_ms+ uint64_t next_fire_duration_ms+ int64_t accumulated_error_ns+ uint64_t last_execute_time_ns+ std::mutex mutex
}
class TimerOption {+ uint32_t period+ std::function<void()> callback+ bool oneshot+ TimerOption(uint32_t period, std::function<void()> callback, bool oneshot)+ TimerOption()
}note left of Timer外部主要調用類,定時器對象實體
end notenote left of TimerOption配置參數,主要作為入參構造timer,用于配置定時器
end notenote left of TimingWheel環形隊列,cyberrt內部實現為二級時鐘節拍輪,單例
end notenote left of TimerBucket環形隊列中的元素,為鏈表,存儲std::weak_ptr<TimerTask>,等價線程安全的list<std::weak_ptr<TimerTask>>
end noteTimer --> TimerOption : 入參依賴
Timer --> TimingWheel : 成員依賴
TimingWheel --> TimerBucket : 成員依賴
TimerBucket --> TimerTask : 成員依賴 @enduml
實現原理
timingwheel 中的tick線程用于統計時間時間,并獲取指向的時鐘節拍論里面的元素進行任務觸發,并將任務丟到cyberrt的攜程池里運行,timewheel的實現基本都大同小異,這里不細說。
bug
bug主要是call back的生命周期管理問題,在代碼中其實已經考慮了一部分(shared_ptr+weak_ptr)已經保證了一部分的懸空指針的問題,但是不完全。
bool Timer::InitTimerTask() {task_.reset(new TimerTask(timer_id_));task_->interval_ms = timer_opt_.period;task_->next_fire_duration_ms = task_->interval_ms;if (timer_opt_.oneshot) {std::weak_ptr<TimerTask> task_weak_ptr = task_;task_->callback = [callback = this->timer_opt_.callback, task_weak_ptr]() {auto task = task_weak_ptr.lock();if (task) {std::lock_guard<std::mutex> lg(task->mutex);callback();}};} else {std::weak_ptr<TimerTask> task_weak_ptr = task_;task_->callback = [callback = this->timer_opt_.callback, task_weak_ptr]() {auto task = task_weak_ptr.lock();if (!task) {return;}XXXX //省略TimingWheel::Instance()->AddTask(task);};}return true;
}void TimingWheel::Tick() {auto& bucket = work_wheel_[current_work_wheel_index_];{std::lock_guard<std::mutex> lock(bucket.mutex());auto ite = bucket.task_list().begin();while (ite != bucket.task_list().end()) {auto task = ite->lock();if (task) {ADEBUG << "index: " << current_work_wheel_index_<< " timer id: " << task->timer_id_;auto* callback =reinterpret_cast<std::function<void()>*>(&(task->callback));cyber::Async([this, callback] {if (this->running_) {(*callback)();}});}ite = bucket.task_list().erase(ite);}}
}
以上代碼,構建TimerTask添加到timing wheel中,task為share_ptr,所有權歸屬timer對象很合理,傳遞weak_ptr對象給timingWhee,timer對象釋放后,節拍輪里面的weak_ptr不再有效進行不必要的觸發,這也很正確,合理的考慮了timer對象持有的task和timingwheel中task的異步生命周期管理的問題。
但是在tick函數中,將觸發的task放到異步協程池運行的時候則出現問題了,未考慮異步生命周期的問題。
現在假設我們有這樣一個場景
創建了一個10ms的周期timer,經過第一個10ms時觸發了定時器并將回掉丟到協程池中并推出tick,假設當時協程池比較繁忙,有恰巧我們迅速釋放掉timer對象或者stop掉timer對象,you lose,程序boom了。
因為傳遞task這個share_ptr對象里的callback 成員函數進行了異步調用,tick函數內雖然正確獲取了task,退出該函數,task引用計數-1,于此同時我們stop了timer 對象s或者 析構了對象,time持有的task,引用再次-1歸0,tick函數內cyber::Async雖然正確獲取了task的callback,但此時,已經是懸空指針了。
修改
知道了原因,改起來也很方便,無非就是異步調用指針的管理而已。
void TimingWheel::Tick() {auto& bucket = work_wheel_[current_work_wheel_index_];{std::lock_guard<std::mutex> lock(bucket.mutex());auto ite = bucket.task_list().begin();while (ite != bucket.task_list().end()) {auto task = ite->lock();if (task) {ADEBUG << "index: " << current_work_wheel_index_<< " timer id: " << task->timer_id_;// auto* callback =// reinterpret_cast<std::function<void()>*>(&(task->callback));cyber::Async([this, weakTask = std::weak_ptr<TimerTask>(task)] {auto task = weakTask->lock();if (this->running_ && task ) {task->callback();}});}ite = bucket.task_list().erase(ite);}}
}