提示:文章寫完后,目錄可以自動生成,如何生成可參考右邊的幫助文檔
文章目錄
- 1、用timerfd加epoll封裝定時器的優點
- 2、代碼實現
1、用timerfd加epoll封裝定時器的優點
定時器為什么需要timerfd
在設計定時器時,我們首先想到的就是設置一個定時任務觸發的時間,然后不斷判斷(死循環)當前時間是否大于等于定時任務觸發的時間,如果是,那么就處理定時任務。這就是最為簡單的設計,在我之前的博客中[定時器的簡單實現],就是這么實現的,但是這樣設計會存在諸多問題
- CPU資源浪費
使用死循環來檢查時間意味著CPU必須不斷地執行這段代碼,即使大部分時間都是在做無用的比較。這會導致CPU資源的浪費,尤其是在高性能的服務器或多任務環境中。 - 響應性下降
由于CPU忙于執行定時器的檢查,它可能無法及時響應其他重要的事件或任務,導致系統響應性下降。 - 不準確
依賴于系統的時鐘分辨率和調度器延遲,使用死循環檢查時間的方法可能無法實現精確的定時。例如,如果系統時鐘的分辨率是毫秒級,而你嘗試實現微秒級的定時,那么這種方法就無法滿足需求。 - 不適合長時間等待
如果定時任務觸發的時間間隔很長(例如幾小時或幾天),那么使用死循環來等待這段時間是非常低效的。
為解決上述問題,就產生了timerfd,當使用timerfd_create創建timerfd時,并設置了定時任務,當定時任務時間到達時,那么timerfd就變成了可讀,經常與 select/poll/epoll搭配使用
這里我們不需要輪詢這個timerfd,判斷timerfd是否有數據(是否可讀),因為這樣做也會帶來上述問題,因此我們需要將timerfd加入到select/poll/epoll中,讓它們輪詢,一般來說使用epoll更高效
- 統一的事件處理:epoll是Linux下多路復用IO接口select/poll的增強版本,它可以高效地處理大量的文件描述符和I/O事件。通過將timerfd的文件描述符加入epoll的監控集合中,可以將定時器超時事件與其他I/O事件進行統一處理,簡化了事件驅動編程的復雜性。
- 提高并發性能:在高并發的網絡服務器中,使用epoll可以監控多個套接字的I/O事件,而使用timerfd可以實現定時任務(如心跳檢測、超時處理等)。這種結合使用的方式可以提高系統的并發性能和吞吐量。
- 減少系統調用開銷:由于epoll采用I/O多路復用機制,并且只在有事件發生時才進行通知,因此可以減少不必要的系統調用開銷。同時,由于timerfd的精度較高,可以減少因輪詢而產生的額外開銷。
2、代碼實現
定時任務
//TimerEvent.h
#pragma once
#include <cstdint>
#include <functional>
#include <sys/time.h>
#include <memory>
class TimerEvent
{
public:using s_ptr = std::shared_ptr<TimerEvent>;template<typename F, typename... Args>TimerEvent(int interval, bool is_repeated, F&& f, Args&&... args):interval_(interval), is_repeated_(is_repeated){auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);task_ = task;}int64_t getArriveTime() const{return arrive_time_;}void setCancler(bool flag){is_cancled_ = flag;}bool isCancle(){return is_cancled_;}bool isRepeated(){return is_repeated_;}std::function<void()> getCallBack(){return task_;}//重新設定任務到達時間void resetArriveTime();//獲取當前時間static int64_t getNowMs();
private:int64_t arrive_time_;//ms 執行任務時毫秒級時間戳,達到對應的時間戳就執行對應的任務int64_t interval_;//ms 隔多少ms后執行bool is_repeated_{false};//是否為周期性的定時任務bool is_cancled_{false};//是否取消std::function<void()> task_;
};//TimerEvent.cpp
#include"TimerEvent.h"int64_t TimerEvent::getNowMs()
{timeval val;gettimeofday(&val, NULL);return val.tv_sec*1000 + val.tv_usec/1000;
}void TimerEvent::resetArriveTime()
{arrive_time_ = getNowMs() + interval_;
}
對timerfd的封裝
//Timer.h
#pragma once
#include <map>
#include <vector>
#include <iostream>
#include "TimerEvent.h"class Timer
{
public:Timer();~Timer();int getFd(){return fd_;}void addTimerEvent(TimerEvent::s_ptr event);void deleteTimerEvent(TimerEvent::s_ptr event);//時間到達就觸發void onTimer();std::vector<std::function<void()>> &getCallbacks(){return callbacks_;}//重新設置任務的到達時間void resetArriveTime();private:int fd_;std::multimap<int64_t, TimerEvent::s_ptr> pending_events_;std::vector<std::function<void()>> callbacks_;
};//Timer.cpp
#include <sys/timerfd.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include "Timer.h"Timer::Timer() : fd_(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK))
{
}Timer::~Timer()
{}void Timer::resetArriveTime()
{if (pending_events_.empty()){return;}int64_t now = TimerEvent::getNowMs();auto it = pending_events_.begin();int64_t inteval = 0;// 第一個任務的定時時間比當前時間大,則重新設置if (it->second->getArriveTime() > now){inteval = it->second->getArriveTime() - now;}else{// 第一個任務的定時時間比當前時間小或相等,說明第一個任務已經超時了,應該立馬執行該任務inteval = 100; // ms}timespec ts;memset(&ts, 0, sizeof(ts));ts.tv_sec = inteval / 1000;//秒ts.tv_nsec = (inteval % 1000) * 1000000;//納秒itimerspec value;memset(&value, 0, sizeof(value));value.it_value = ts;int result = timerfd_settime(fd_, 0, &value, NULL);if (result != 0){printf("timerfd_settime error, errno=%d, error=%s", errno, strerror(errno));}
}void Timer::addTimerEvent(TimerEvent::s_ptr event)
{bool is_reset_timerfd = false;if (pending_events_.empty()){is_reset_timerfd = true;}else{auto it = pending_events_.begin();// 當前需要插入的定時任務時間比已經存在的定時任務時間要早,那么就需要重新設定超時時間,防止任務延時if (it->first > event->getArriveTime()){is_reset_timerfd = true;}}pending_events_.emplace(event->getArriveTime(), event);if (is_reset_timerfd){resetArriveTime();}
}void Timer::deleteTimerEvent(TimerEvent::s_ptr event)
{event->setCancler(true);//pending_events_是multimap,key是時間,可能存在多個相同時間的event//將對應的event從pending_events_中刪除auto begin = pending_events_.lower_bound(event->getArriveTime());auto end = pending_events_.upper_bound(event->getArriveTime());auto it = begin;for(;it != end; ++it){if(it->second == event){break;}}if(it != end){pending_events_.erase(it);}}void Timer::onTimer()
{char buf[8];for(;;){if((read(fd_, buf, 8) == -1) && errno == EAGAIN){break;}}int64_t now = TimerEvent::getNowMs();std::vector<TimerEvent::s_ptr> tmps;std::vector<std::function<void()>>& callbacks_ = getCallbacks();auto it = pending_events_.begin();for(; it != pending_events_.end(); ++it){// 任務已經到時或者超時,并且沒有被取消,就需要執行if((it->first <= now) && !it->second->isCancle()){tmps.push_back(it->second);callbacks_.push_back(it->second->getCallBack());}else{break;// 因為定時任務是升序排的,只要第一個任務沒到時,后面的都沒到時}}//因為把任務已經保存好了,因此需要把m_pending_events中對應的定時任務刪除,防止下次又執行了pending_events_.erase(pending_events_.begin(), it);// 需要把重復的TimerEvent再次添加進去for(auto i = tmps.begin(); i != tmps.end(); ++i){if(!(*i)->isCancle()){//std::cout<<"重新添加"<<std::endl;(*i)->resetArriveTime();addTimerEvent(*i);}}resetArriveTime();
}
對epoll的封裝
//TimerPollPoller.h
#pragma once
#include <sys/epoll.h>
#include <cstring>
#include <unistd.h>
#include <fcntl.h>
#include <atomic>
#include <iostream>
#include "ThreadPool.h"
#include "Timer.h"class TimerPollPoller
{
public:TimerPollPoller(unsigned int num = std::thread::hardware_concurrency()):epollfd_(::epoll_create1(EPOLL_CLOEXEC)),thread_pool_(ThreadPool::instance()),stop_(true){timer_ = std::make_shared<Timer>();struct epoll_event event;memset(&event, 0, sizeof(event));event.data.ptr = reinterpret_cast<void*>(&timer_);event.events = EPOLLIN;::epoll_ctl(epollfd_, EPOLL_CTL_ADD, timer_->getFd(), &event);start();}~TimerPollPoller(){::close(epollfd_);stop();if(t.joinable()){std::cout << "主線程 join thread " << t.get_id() << std::endl;t.join();}}void start();void stop();void addTimerEvent(TimerEvent::s_ptr event);void cancelTimeEvent(TimerEvent::s_ptr event);void handleTimerfdInEpoll();
private:const int epollfd_;std::shared_ptr<Timer> timer_;std::thread t;//單獨起一個線程,進行輪詢epollThreadPool& thread_pool_;std::atomic<bool> stop_;
};//TimerPollPoller.cpp
#include "TimerPollPoller.h"void TimerPollPoller::start()
{t = std::move(std::thread(&TimerPollPoller::handleTimerfdInEpoll, this));
}
void TimerPollPoller::stop()
{stop_.store(true);
}
void TimerPollPoller::addTimerEvent(TimerEvent::s_ptr event)
{timer_->addTimerEvent(event);
}
void TimerPollPoller::cancelTimeEvent(TimerEvent::s_ptr event)
{timer_->deleteTimerEvent(event);
}
void TimerPollPoller::handleTimerfdInEpoll()
{struct epoll_event event;stop_.store(false);while(!stop_.load()){int numEvents = ::epoll_wait(epollfd_, &event, 1, 0);if(numEvents == 1){std::shared_ptr<Timer> timer_ptr = *reinterpret_cast<std::shared_ptr<Timer>*>(event.data.ptr);timer_ptr->onTimer();std::vector<std::function<void()>> callbacks = std::move(timer_ptr->getCallbacks());for(auto task:callbacks){thread_pool_.commit(task);}}}
}
處理任務的線程池
#pragma once
#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <functional>
class ThreadPool {
public:static ThreadPool& instance() {static ThreadPool ins;return ins;}using Task = std::packaged_task<void()>;~ThreadPool() {stop();}template <class F, class... Args>auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {using RetType = decltype(f(args...));if (stop_.load())return std::future<RetType>{};auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<RetType> ret = task->get_future();{std::lock_guard<std::mutex> cv_mt(cv_mt_);//將任務放進任務隊列中tasks_.emplace([task] { (*task)(); });}//喚醒一個線程cv_lock_.notify_one();return ret;}int idleThreadCount() {return thread_num_;}
private:ThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;ThreadPool(unsigned int num = std::thread::hardware_concurrency()): stop_(false) {{if (num < 1)thread_num_ = 1;elsethread_num_ = num;}start();}//啟動所有線程void start() {for (int i = 0; i < thread_num_; ++i) {pool_.emplace_back([this]() {while (!this->stop_.load()) {Task task;{std::unique_lock<std::mutex> cv_mt(cv_mt_);this->cv_lock_.wait(cv_mt, [this] {//stop_為true或者tasks_不為空(return 返回true),則進行下一步,否則阻塞在條件變量上return this->stop_.load() || !this->tasks_.empty();});if (this->tasks_.empty())return;task = std::move(this->tasks_.front());this->tasks_.pop();}this->thread_num_--;task();this->thread_num_++;}});}}void stop() {stop_.store(true);cv_lock_.notify_all();for (auto& td : pool_) {if (td.joinable()) {std::cout << "join thread " << td.get_id() << std::endl;td.join();}}}
private:std::mutex cv_mt_;std::condition_variable cv_lock_;std::atomic_bool stop_;std::atomic_int thread_num_;std::queue<Task> tasks_;std::vector<std::thread> pool_;
};
測試代碼
#include "TimerPollPoller.h"
#include <iostream>
void print()
{std::cout << "I love psy" << std::endl;
}
void print1()
{std::cout << "I love fl" << std::endl;
}int main()
{TimerPollPoller timerPollPoller;TimerEvent::s_ptr timer1 = std::make_shared<TimerEvent>(500, true, print);TimerEvent::s_ptr timer2 = std::make_shared<TimerEvent>(1000, true, print1);timerPollPoller.addTimerEvent(timer1);timerPollPoller.addTimerEvent(timer2);std::this_thread::sleep_for(std::chrono::seconds(2));timerPollPoller.cancelTimeEvent(timer1);std::this_thread::sleep_for(std::chrono::seconds(2));return 0;
}
makefile
PATH_SRC := .
PATH_BIN = bin
PATH_OBJ = objCXX := g++
CXXFLAGS := -g -O0 -std=c++11 -lpthread -Wall -Wno-deprecated -Wno-unused-but-set-variable
CXXFLAGS += -I./SRCS := $(wildcard $(PATH_SRC)/*.cpp)
OBJS := $(patsubst $(PATH_SRC)/%.cpp,$(PATH_OBJ)/%.o,$(SRCS)) TARGET := $(PATH_BIN)/main# 默認目標:生成可執行文件
all : $(TARGET)# 鏈接規則
$(TARGET): $(OBJS)$(CXX) $(CXXFLAGS) $(OBJS) -o $@ $(PATH_OBJ)/%.o: $(PATH_SRC)/%.cpp $(CXX) $(CXXFLAGS) -c $< -o $@ clean:rm -rf $(PATH_OBJ)/*.o $(TARGET).PHONY : clean
使用之間,在當前目錄下需要創建bin目錄和obj目錄,然后再進行make,就能在bin目錄下生產可執行程序main