? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第一章
1,前言
Libevent是一個輕量級的開源高性能網絡庫,使用者眾多,研究者更甚,相關文章也不少。寫這一系列文章的用意在于,一則分享心得;二則對libevent代碼和設計思想做系統的、更深層次的分析,寫出來,也可供后來者參考。
附帶一句:Libevent是用c語言編寫的(MS大牛們都偏愛c語言哪),而且幾乎是無處不函數指針,學習其源代碼也需要相當的c語言基礎。
?
2,libevent簡介
上來當然要先夸獎啦,Libevent 有幾個顯著的亮點:
? => 事件驅動(event-driven),高性能;
? => 輕量級,專注于網絡,不如ACE那么臃腫龐大;
? => 源代碼相當精煉、易讀;
? => 跨平臺,支持Windows、Linux、*BSD和Mac Os;
? => 支持多種I/O多路復用技術, epoll、poll、dev/poll、select和kqueue等;
? => 支持I/O,定時器和信號等事件;
? => 注冊事件優先級;
Libevent已經被廣泛的應用,作為底層的網絡庫;比如memcached、Vomit、Nylon、Netchat等等。
Libevent當前的最新穩定版是1.4.13;這也是本文參照的版本。
?
3,學習的好處
學習libevent有助于提升程序設計功力,除了網絡程序設計方面外,Libevent的代碼里有很多有用的設計技巧和基礎數據結構,比如信息隱藏、函數指針、c語言的多態支持、鏈表和堆等等,都有助于提升自身的程序功力。
程序設計不止要了解框架,很多細節之處恰恰也是事關整個系統成敗的關鍵。只對libevent本身的框架大概了解,那或許僅僅是一知半解,不深入代碼分析,就難以了解其設計的精巧之處,也就難以為自己所用。
事實上Libevent本身就是一個典型的Reactor模型,理解Reactor模式是理解libevent的基石;因此下一節將介紹典型的事件驅動設計模式——Reactor模式。
?
參考資料:Libevent:?http://monkey.org/~provos/libevent/
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第二章
前面講到,整個libevent本身就是一個Reactor,因此本節將專門對Reactor模式進行必要的介紹,并列出libevnet中的幾個重要組件和Reactor的對應關系,在后面的章節中可能還會提到本節介紹的基本概念。
1,Reactor的事件處理機制
首先來回想一下普通函數調用的機制:程序調用某函數?函數執行,程序等待?函數將結果和控制權返回給程序?程序繼續處理。
Reactor釋義“反應堆”,是一種事件驅動機制。和普通函數調用的不同之處在于:應用程序不是主動的調用某個API完成處理,而是恰恰相反,Reactor逆置了事件處理流程,應用程序需要提供相應的接口并注冊到Reactor上,如果相應的時間發生,Reactor將主動調用應用程序注冊的接口,這些接口又稱為“回調函數”。使用Libevent也是想Libevent框架注冊相應的事件和回調函數;當這些時間發聲時,Libevent會調用這些回調函數處理相應的事件(I/O讀寫、定時和信號)。
? ? 用“好萊塢原則”來形容Reactor再合適不過了:不要打電話給我們,我們會打電話通知你。
? ? 舉個例子:你去應聘某xx公司,面試結束后。
“普通函數調用機制”公司HR比較懶,不會記你的聯系方式,那怎么辦呢,你只能面試完后自己打電話去問結果;有沒有被錄取啊,還是被據了;
“Reactor”公司HR就記下了你的聯系方式,結果出來后會主動打電話通知你:有沒有被錄取啊,還是被據了;你不用自己打電話去問結果,事實上也不能,你沒有HR的留聯系方式。
?
2 ,Reactor模式的優點
Reactor模式是編寫高性能網絡服務器的必備技術之一,它具有如下的優點
??? 1)響應快,不必為單個同步時間所阻塞,雖然Reactor本身依然是同步的;
??? 2)編程相對簡單,可以最大程度的避免復雜的多線程及同步問題,并且避免了多線程/進程的切換開銷;
??? 3)可擴展性,可以方便的通過增加Reactor實例個數來充分利用CPU資源;
??? 4)可復用性,reactor框架本身與具體事件處理邏輯無關,具有很高的復用性;
3 ,Reactor模式框架
?使用Reactor模型,必備的幾個組件:事件源、Reactor框架、多路復用機制和事件處理程序,先來看看Reactor模型的整體框架,接下來再對每個組件做逐一說明。
?
1) 事件源
Linux上是文件描述符,Windows上就是Socket或者Handle了,這里統一稱為“句柄集”;程序在指定的句柄上注冊關心的事件,比如I/O事件。
2) event demultiplexer——事件多路分發機制
由操作系統提供的I/O多路復用機制,比如select和epoll。
??? 程序首先將其關心的句柄(事件源)及其事件注冊到event demultiplexer上;
當有事件到達時,event demultiplexer會發出通知“在已經注冊的句柄集中,一個或多個句柄的事件已經就緒”;
??? 程序收到通知后,就可以在非阻塞的情況下對事件進行處理了。
對應到libevent中,依然是select、poll、epoll等,但是libevent使用結構體eventop進行了封裝,以統一的接口來支持這些I/O多路復用機制,達到了對外隱藏底層系統機制的目的。
3) Reactor——反應器
??? Reactor,是事件管理的接口,內部使用event demultiplexer注冊、注銷事件;并運行事件循環,當有事件進入“就緒”狀態時,調用注冊事件的回調函數處理事件。
對應到libevent中,就是event_base結構體。
一個典型的Reactor聲明方式
1 class Reactor
2 {
3 public:
4 int register_handler(Event_Handler *pHandler, int event);
5 int remove_handler(Event_Handler *pHandler, int event);
6 void handle_events(timeval *ptv);
7 // ...
8 };
?
4) Event Handler——事件處理程序
??? 事件處理程序提供了一組接口,每個接口對應了一種類型的事件,供Reactor在相應的事件發生時調用,執行相應的事件處理。通常它會綁定一個有效的句柄。
對應到libevent中,就是event結構體。
下面是兩種典型的Event Handler類聲明方式,二者互有優缺點。
1 class Event_Handler
2 {
3 public:
4 virtual void handle_read() = 0;
5 virtual void handle_write() = 0;
6 virtual void handle_timeout() = 0;
7 virtual void handle_close() = 0;
8 virtual HANDLE get_handle() = 0;
9 // ...
10 };
11 class Event_Handler
12 {
13 public:
14 // events maybe read/write/timeout/close .etc
15 virtual void handle_events(int events) = 0;
16 virtual HANDLE get_handle() = 0;
17 // ...
18 };
?
4 ,Reactor事件處理流程
前面說過Reactor將事件流“逆置”了,那么使用Reactor模式后,事件控制流是什么樣子呢?
可以參見下面的序列圖。
?
5 ,小結
上面講到了Reactor的基本概念、框架和處理流程,對Reactor有個基本清晰的了解后,再來對比看libevent就會更容易理解了,接下來就正式進入到libevent的代碼世界了,加油!
?
參考資料:
Pattern-Oriented Software Architecture, Patterns for Concurrent and Networked Objects, Volume 2
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第三章
1 ,前言
學習源代碼該從哪里入手?我覺得從程序的基本使用場景和代碼的整體處理流程入手是個不錯的方法,至少從個人的經驗上講,用此方法分析libevent是比較有效的。
2 ,基本應用場景
基本應用場景也是使用libevnet的基本流程,下面來考慮一個最簡單的場景,使用livevent設置定時器,應用程序只需要執行下面幾個簡單的步驟即可。
1)首先初始化libevent庫,并保存返回的指針
1 struct event_base * base = event_init();
實際上這一步相當于初始化一個Reactor實例;在初始化libevent后,就可以注冊事件了。
?
2)初始化事件event,設置回調函數和關注的事件
1 evtimer_set(&ev, timer_cb, NULL);
事實上這等價于調用
1 event_set(&ev, -1, 0, timer_cb, NULL);
event_set的函數原型是:
1 void event_set(struct event *ev, int fd, short event, void (*cb)(int, short, void *), void *arg)
ev:執行要初始化的event對象;
fd:該event綁定的“句柄”,對于信號事件,它就是關注的信號;
event:在該fd上關注的事件類型,它可以是EV_READ, EV_WRITE, EV_SIGNAL;
cb:這是一個函數指針,當fd上的事件event發生時,調用該函數執行處理,它有三個參數,調用時由event_base負責傳入,按順序,實際上就是event_set時的fd, event和arg;
arg:傳遞給cb函數指針的參數;
由于定時事件不需要fd,并且定時事件是根據添加時(event_add)的超時值設定的,因此這里event也不需要設置。
這一步相當于初始化一個event handler,在libevent中事件類型保存在event結構體中。
注意:libevent并不會管理event事件集合,這需要應用程序自行管理;
?
3)設置event從屬的event_base
1 event_base_set(base, &ev);
這一步相當于指明event要注冊到哪個event_base實例上;
?
4)是正式的添加事件的時候了
1 event_add(&ev, timeout);
基本信息都已設置完成,只要簡單的調用event_add()函數即可完成,其中timeout是定時值;
這一步相當于調用Reactor::register_handler()函數注冊事件。
?
5)程序進入無限循環,等待就緒事件并執行事件處理
1 event_base_dispatch(base);
?
3 ,實例代碼
上面例子的程序代碼如下所示
1 struct event ev;
2 struct timeval tv;
3 void time_cb(int fd, short event, void *argc)
4 {
5 printf("timer wakeup/n");
6 event_add(&ev, &tv); // reschedule timer
7 }
8 int main()
9 {
10 struct event_base *base = event_init();
11 tv.tv_sec = 10; // 10s period
12 tv.tv_usec = 0;
13 evtimer_set(&ev, time_cb, NULL);
14 event_add(&ev, &tv);
15 event_base_dispatch(base);
16 }
?
4 ,事件處理流程
當應用程序向libevent注冊一個事件后,libevent內部是怎么樣進行處理的呢?下面的圖就給出了這一基本流程。
? ? 1)首先應用程序準備并初始化event,設置好事件類型和回調函數;這對應于前面第步驟2和3;
? ? 2)向libevent添加該事件event。對于定時事件,libevent使用一個小根堆管理,key為超時時間;對于Signal和I/O事件,libevent將其放入到等待鏈表(wait list)中,這是一個雙向鏈表結構;
? ? 3)程序調用event_base_dispatch()系列函數進入無限循環,等待事件,以select()函數為例;每次循環前libevent會檢查定時事件的最小超時時間tv,根據tv設置select()的最大等待時間,以便于后面及時處理超 ? ? ? ? 時事件;
當select()返回后,首先檢查超時事件,然后檢查I/O事件;
Libevent將所有的就緒事件,放入到激活鏈表中;
然后對激活鏈表中的事件,調用事件的回調函數執行事件處理;
?
5 ,小結
本節介紹了libevent的簡單實用場景,并旋風般的介紹了libevent的事件處理流程,讀者應該對libevent有了基本的印象,下面將會詳細介紹libevent的事件管理框架(Reactor模式中的Reactor框架)做詳細的介紹,在此之前會對源代碼文件做簡單的分類。
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第四章
libevent源代碼文件組織
1 ,前言
詳細分析源代碼之前,如果能對其代碼文件的基本結構有個大概的認識和分類,對于代碼的分析將是大有裨益的。本節內容不多,我想并不是說它不重要!
2 ,源代碼組織結構
Libevent的源代碼雖然都在一層文件夾下面,但是其代碼分類還是相當清晰的,主要可分為頭文件、內部使用的頭文件、輔助功能函數、日志、libevent框架、對系統I/O多路復用機制的封裝、信號管理、定時事件管理、緩沖區管理、基本數據結構和基于libevent的兩個實用庫等幾個部分,有些部分可能就是一個源文件。
源代碼中的test部分就不在我們關注的范疇了。
1)頭文件
主要就是event.h:事件宏定義、接口函數聲明,主要結構體event的聲明;
2)內部頭文件
xxx-internal.h:內部數據結構和函數,對外不可見,以達到信息隱藏的目的;
3)libevent框架
event.c:event整體框架的代碼實現;
4)對系統I/O多路復用機制的封裝
epoll.c:對epoll的封裝;
select.c:對select的封裝;
devpoll.c:對dev/poll的封裝;
kqueue.c:對kqueue的封裝;
5)定時事件管理
min-heap.h:其實就是一個以時間作為key的小根堆結構;
6)信號管理
signal.c:對信號事件的處理;
7)輔助功能函數
evutil.h 和evutil.c:一些輔助功能函數,包括創建socket pair和一些時間操作函數:加、減和比較等。
8)日志
log.h和log.c:log日志函數
9)緩沖區管理
evbuffer.c和buffer.c:libevent對緩沖區的封裝;
10)基本數據結構
compat/sys下的兩個源文件:queue.h是libevent基本數據結構的實現,包括鏈表,雙向鏈表,隊列等;_libevent_time.h:一些用于時間操作的結構體定義、函數和宏定義;
11)實用網絡庫
http和evdns:是基于libevent實現的http服務器和異步dns查詢庫;
?
3 ,小結
本節介紹了libevent的組織和分類,下面將會詳細介紹libevent的核心部分event結構。
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第五章 ? ? ?libevent的核心:事件event
?
對事件處理流程有了高層的認識后,本節將詳細介紹libevent的核心結構event,以及libevent對event的管理。
1 ,libevent的核心-event
? Libevent是基于事件驅動(event-driven)的,從名字也可以看到event是整個庫的核心。event就是Reactor框架中的事件處理程序組件;它提供了函數接口,供Reactor在事件發生時調用,以執行相應的事件處理,通常它會綁定一個有效的句柄。
首先給出event結構體的聲明,它位于event.h文件中:
1 struct event {
2 TAILQ_ENTRY (event) ev_next;
3 TAILQ_ENTRY (event) ev_active_next;
4 TAILQ_ENTRY (event) ev_signal_next;
5 unsigned int min_heap_idx; /* for managing timeouts */
6 struct event_base *ev_base;
7 int ev_fd;
8 short ev_events;
9 short ev_ncalls;
10 short *ev_pncalls; /* Allows deletes in callback */
11 struct timeval ev_timeout;
12 int ev_pri; /* smaller numbers are higher priority */
13 void (*ev_callback)(int, short, void *arg);
14 void *ev_arg;
15 int ev_res; /* result passed to event callback */
16 int ev_flags;
17 };
?
下面簡單解釋一下結構體中各字段的含義。
1)ev_events:event關注的事件類型,它可以是以下3種類型:
I/O事件: EV_WRITE和EV_READ
定時事件:EV_TIMEOUT
信號:??? EV_SIGNAL
輔助選項:EV_PERSIST,表明是一個永久事件
Libevent中的定義為:
1 #define EV_TIMEOUT 0x01
2 #define EV_READ 0x02
3 #define EV_WRITE 0x04
4 #define EV_SIGNAL 0x08
5 #define EV_PERSIST 0x10 /* Persistant event */
?
可以看出事件類型可以使用“|”運算符進行組合,需要說明的是,信號和I/O事件不能同時設置;
還可以看出libevent使用event結構體將這3種事件的處理統一起來;
2)ev_next,ev_active_next和ev_signal_next都是雙向鏈表節點指針;它們是libevent對不同事件類型和在不同的時期,對事件的管理時使用到的字段。
libevent使用雙向鏈表保存所有注冊的I/O和Signal事件,ev_next就是該I/O事件在鏈表中的位置;稱此鏈表為“已注冊事件鏈表”;
同樣ev_signal_next就是signal事件在signal事件鏈表中的位置;
ev_active_next:libevent將所有的激活事件放入到鏈表active list中,然后遍歷active list執行調度,ev_active_next就指明了event在active list中的位置;
3)min_heap_idx和ev_timeout,如果是timeout事件,它們是event在小根堆中的索引和超時值,libevent使用小根堆來管理定時事件,這將在后面定時事件處理時專門講解
4)ev_base該事件所屬的反應堆實例,這是一個event_base結構體,下一節將會詳細講解;
5)ev_fd,對于I/O事件,是綁定的文件描述符;對于signal事件,是綁定的信號;
6)ev_callback,event的回調函數,被ev_base調用,執行事件處理程序,這是一個函數指針,原型為:
1 void (*ev_callback)(int fd, short events, void *arg)
其中參數fd對應于ev_fd;events對應于ev_events;arg對應于ev_arg;
?
7)ev_arg:void*,表明可以是任意類型的數據,在設置event時指定;
8)eb_flags:libevent用于標記event信息的字段,表明其當前的狀態,可能的值有:
1 #define EVLIST_TIMEOUT 0x01 // event在time堆中
2 #define EVLIST_INSERTED 0x02 // event在已注冊事件鏈表中
3 #define EVLIST_SIGNAL 0x04 // 未見使用
4 #define EVLIST_ACTIVE 0x08 // event在激活鏈表中
5 #define EVLIST_INTERNAL 0x10 // 內部使用標記
6 #define EVLIST_INIT 0x80 // event已被初始化
?
9)ev_ncalls:事件就緒執行時,調用ev_callback的次數,通常為1;
10)ev_pncalls:指針,通常指向ev_ncalls或者為NULL;
11)ev_res:記錄了當前激活事件的類型;
?
2 ,libevent對event的管理
從event結構體中的3個鏈表節點指針和一個堆索引出發,大體上也能窺出libevent對event的管理方法了,可以參見下面的示意圖:
?
每次當有事件event轉變為就緒狀態時,libevent就會把它移入到active event list[priority]中,其中priority是event的優先級;
接著libevent會根據自己的調度策略選擇就緒事件,調用其cb_callback()函數執行事件處理;并根據就緒的句柄和事件類型填充cb_callback函數的參數。
?
3 ,事件設置的接口函數
要向libevent添加一個事件,需要首先設置event對象,這通過調用libevent提供的函數有:event_set(), event_base_set(), event_priority_set()來完成;下面分別進行講解。
1 void event_set(struct event *ev, int fd, short events, void (*callback)(int, short, void *), void *arg)
1.設置事件ev綁定的文件描述符或者信號,對于定時事件,設為-1即可;
2.設置事件類型,比如EV_READ|EV_PERSIST, EV_WRITE, EV_SIGNAL等;
3.設置事件的回調函數以及參數arg;
4.初始化其它字段,比如缺省的event_base和優先級;
?
1 int event_base_set(struct event_base *base, struct event *ev)
設置event ev將要注冊到的event_base;
libevent有一個全局event_base指針current_base,默認情況下事件ev將被注冊到current_base上,使用該函數可以指定不同的event_base;
如果一個進程中存在多個libevent實例,則必須要調用該函數為event設置不同的event_base;
?
1 int event_priority_set(struct event *ev, int pri)
設置event ev的優先級,沒什么可說的,注意的一點就是:當ev正處于就緒狀態時,不能設置,返回-1。
?
4 ,小結
?本節講述了libevent的核心event結構,以及libevent支持的事件類型和libevent對event的管理模型;接下來將會描述libevent的事件處理框架,以及其中使用的重要的結構體event_base;
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第六章 ??初見事件處理框架
?
前面已經對libevent的事件處理框架和event結構體做了描述,現在是時候剖析libevent對事件的詳細處理流程了,本節將分析libevent的事件處理框架event_base和libevent注冊、刪除事件的具體流程,可結合前一節libevent對event的管理。
1 ,事件處理框架-event_base
回想Reactor模式的幾個基本組件,本節講解的部分對應于Reactor框架組件。在libevent中,這就表現為event_base結構體,結構體聲明如下,它位于event-internal.h文件中:
1 struct event_base {
2 const struct eventop *evsel;
3 void *evbase;
4 int event_count; /* counts number of total events */
5 int event_count_active; /* counts number of active events */
6 int event_gotterm; /* Set to terminate loop */
7 int event_break; /* Set to terminate loop immediately */
8 /* active event management */
9 struct event_list **activequeues;
10 int nactivequeues;
11 /* signal handling info */
12 struct evsignal_info sig;
13 struct event_list eventqueue;
14 struct timeval event_tv;
15 struct min_heap timeheap;
16 struct timeval tv_cache;
17 };
?
下面詳細解釋一下結構體中各字段的含義。
1)evsel和evbase這兩個字段的設置可能會讓人有些迷惑,這里你可以把evsel和evbase看作是類和靜態函數的關系,
比如添加事件時的調用行為:evsel->add(evbase, ev),實際執行操作的是evbase;這相當于class::add(instance, ev),instance就是class的一個對象實例。
evsel指向了全局變量static const struct eventop *eventops[]中的一個;
前面也說過,libevent將系統提供的I/O demultiplex機制統一封裝成了eventop結構;因此eventops[]包含了select、poll、kequeue和epoll等等其中的若干個全局實例對象。
evbase實際上是一個eventop實例對象;
先來看看eventop結構體,它的成員是一系列的函數指針, 在event-internal.h文件中:
1 struct eventop {
2 const char *name;
3 void *(*init)(struct event_base *); // 初始化
4 int (*add)(void *, struct event *); // 注冊事件
5 int (*del)(void *, struct event *); // 刪除事件
6 int (*dispatch)(struct event_base *, void *, struct timeval *); // 事件分發
7 void (*dealloc)(struct event_base *, void *); // 注銷,釋放資源
8 /* set if we need to reinitialize the event base */
9 int need_reinit;
10 };
?
也就是說,在libevent中,每種I/O demultiplex機制的實現都必須提供這五個函數接口,來完成自身的初始化、銷毀釋放;對事件的注冊、注銷和分發。
比如對于epoll,libevent實現了5個對應的接口函數,并在初始化時并將eventop的5個函數指針指向這5個函數,那么程序就可以使用epoll作為I/O demultiplex機制了,這個在后面會再次提到。
2)activequeues是一個二級指針,前面講過libevent支持事件優先級,因此你可以把它看作是數組,其中的元素activequeues[priority]是一個鏈表,鏈表的每個節點指向一個優先級為priority的就緒事件event。
?
3)eventqueue,鏈表,保存了所有的注冊事件event的指針。
4)sig是由來管理信號的結構體,將在后面信號處理時專門講解;
5)timeheap是管理定時事件的小根堆,將在后面定時事件處理時專門講解;
6)event_tv和tv_cache是libevent用于時間管理的變量,將在后面講到;
其它各個變量都能因名知意,就不再啰嗦了。
?
2 ,創建和初始化event_base
創建一個event_base對象也既是創建了一個新的libevent實例,程序需要通過調用event_init()(內部調用event_base_new函數執行具體操作)函數來創建,該函數同時還對新生成的libevent實例進行了初始化。
該函數首先為event_base實例申請空間,然后初始化timer mini-heap,選擇并初始化合適的系統I/O 的demultiplexer機制,初始化各事件鏈表;
函數還檢測了系統的時間設置,為后面的時間管理打下基礎。
?
3 ,接口函數
前面提到Reactor框架的作用就是提供事件的注冊、注銷接口;根據系統提供的事件多路分發機制執行事件循環,當有事件進入“就緒”狀態時,調用注冊事件的回調函數來處理事件。
Libevent中對應的接口函數主要就是:
1 int event_add(struct event *ev, const struct timeval *timeout);
2 int event_del(struct event *ev);
3 int event_base_loop(struct event_base *base, int loops);
4 void event_active(struct event *event, int res, short events);
5 void event_process_active(struct event_base *base);
?
本節將按介紹事件注冊和刪除的代碼流程,libevent的事件循環框架將在下一節再具體描述。
對于定時事件,這些函數將調用timer heap管理接口執行插入和刪除操作;對于I/O和Signal事件將調用eventopadd和delete接口函數執行插入和刪除操作(eventop會對Signal事件調用Signal處理接口執行操作);這些組件將在后面的內容描述。
?
1)注冊事件
函數原型:
1 int event_add(struct event *ev, const struct timeval *tv)
參數:
? ? ev:指向要注冊的事件;
? ? tv:超時時間;
函數將ev注冊到ev->ev_base上,事件類型由ev->ev_events指明,如果注冊成功,ev將被插入到已注冊鏈表中;如果tv不是NULL,則會同時注冊定時事件,將ev添加到timer堆上;
如果其中有一步操作失敗,那么函數保證沒有事件會被注冊,可以講這相當于一個原子操作。這個函數也體現了libevent細節之處的巧妙設計,且仔細看程序代碼,部分有省略,注釋直接附在代碼中。
1 int event_add(struct event *ev, const struct timeval *tv)
2 {
3 struct event_base *base = ev->ev_base; // 要注冊到的event_base
4 const struct eventop *evsel = base->evsel;
5 void *evbase = base->evbase; // base使用的系統I/O策略
6 // 新的timer事件,調用timer heap接口在堆上預留一個位置
7 // 注:這樣能保證該操作的原子性:
8 // 向系統I/O機制注冊可能會失敗,而當在堆上預留成功后,
9 // 定時事件的添加將肯定不會失敗;
10 // 而預留位置的可能結果是堆擴充,但是內部元素并不會改變
11 if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
12 if (min_heap_reserve(&base->timeheap,
13 1 + min_heap_size(&base->timeheap)) == -1)
14 return (-1); /* ENOMEM == errno */
15 }
16 // 如果事件ev不在已注冊或者激活鏈表中,則調用evbase注冊事件
17 if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
18 !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
19 res = evsel->add(evbase, ev);
20 if (res != -1) // 注冊成功,插入event到已注冊鏈表中
21 event_queue_insert(base, ev, EVLIST_INSERTED);
22 }
23 // 準備添加定時事件
24 if (res != -1 && tv != NULL) {
25 struct timeval now;
26 // EVLIST_TIMEOUT表明event已經在定時器堆中了,刪除舊的
27 if (ev->ev_flags & EVLIST_TIMEOUT)
28 event_queue_remove(base, ev, EVLIST_TIMEOUT);
29 // 如果事件已經是就緒狀態則從激活鏈表中刪除
30 if ((ev->ev_flags & EVLIST_ACTIVE) &&
31 (ev->ev_res & EV_TIMEOUT)) {
32 // 將ev_callback調用次數設置為0
33 if (ev->ev_ncalls && ev->ev_pncalls) {
34 *ev->ev_pncalls = 0;
35 }
36 event_queue_remove(base, ev, EVLIST_ACTIVE);
37 }
38 // 計算時間,并插入到timer小根堆中
39 gettime(base, &now);
40 evutil_timeradd(&now, tv, &ev->ev_timeout);
41 event_queue_insert(base, ev, EVLIST_TIMEOUT);
42 }
43 return (res);
44 }
45
46 event_queue_insert()負責將事件插入到對應的鏈表中,下面是程序代碼;
47 event_queue_remove()負責將事件從對應的鏈表中刪除,這里就不再重復貼代碼了;
48 void event_queue_insert(struct event_base *base, struct event *ev, int queue)
49 {
50 // ev可能已經在激活列表中了,避免重復插入
51 if (ev->ev_flags & queue) {
52 if (queue & EVLIST_ACTIVE)
53 return;
54 }
55 // ...
56 ev->ev_flags |= queue; // 記錄queue標記
57 switch (queue) {
58 case EVLIST_INSERTED: // I/O或Signal事件,加入已注冊事件鏈表
59 TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
60 break;
61 case EVLIST_ACTIVE: // 就緒事件,加入激活鏈表
62 base->event_count_active++;
63 TAILQ_INSERT_TAIL(base->activequeues[ev->ev_pri], ev, ev_active_next);
64 break;
65 case EVLIST_TIMEOUT: // 定時事件,加入堆
66 min_heap_push(&base->timeheap, ev);
67 break;
68 }
69 }
?
2)刪除事件:
函數原型為:
1 int event_del(struct event *ev);
函數將刪除事件ev,對于I/O事件,從I/O 的demultiplexer上將事件注銷;對于Signal事件,將從Signal事件鏈表中刪除;對于定時事件,將從堆上刪除;
同樣刪除事件的操作則不一定是原子的,比如刪除時間事件之后,有可能從系統I/O機制中注銷會失敗。
1 int event_del(struct event *ev)
2 {
3 struct event_base *base;
4 const struct eventop *evsel;
5 void *evbase;
6 // ev_base為NULL,表明ev沒有被注冊
7 if (ev->ev_base == NULL)
8 return (-1);
9 // 取得ev注冊的event_base和eventop指針
10 base = ev->ev_base;
11 evsel = base->evsel;
12 evbase = base->evbase;
13 // 將ev_callback調用次數設置為
14 if (ev->ev_ncalls && ev->ev_pncalls) {
15 *ev->ev_pncalls = 0;
16 }
17
18 // 從對應的鏈表中刪除
19 if (ev->ev_flags & EVLIST_TIMEOUT)
20 event_queue_remove(base, ev, EVLIST_TIMEOUT);
21 if (ev->ev_flags & EVLIST_ACTIVE)
22 event_queue_remove(base, ev, EVLIST_ACTIVE);
23 if (ev->ev_flags & EVLIST_INSERTED) {
24 event_queue_remove(base, ev, EVLIST_INSERTED);
25 // EVLIST_INSERTED表明是I/O或者Signal事件,
26 // 需要調用I/O demultiplexer注銷事件
27 return (evsel->del(evbase, ev));
28 }
29 return (0);
30 }
?
4 ,小節
分析了event_base這一重要結構體,初步看到了libevent對系統的I/O demultiplex機制的封裝event_op結構,并結合源代碼分析了事件的注冊和刪除處理,下面將會接著分析事件管理框架中的主事件循環部分。
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第七章 ?事件主循環
?
? 現在我們已經初步了解了libevent的Reactor組件——event_base和事件管理框架,接下來就是libevent事件處理的中心部分——事件主循環,根據系統提供的事件多路分發機制執行事件循環,對已注冊的就緒事件,調用注冊事件的回調函數來處理事件。
1 ,階段性的勝利
Libevent的事件主循環主要是通過event_base_loop ()函數完成的,其主要操作如下面的流程圖所示,event_base_loop所作的就是持續執行下面的循環。
?
清楚了event_base_loop所作的主要操作,就可以對比源代碼看個究竟了,代碼結構還是相當清晰的。
1 int event_base_loop(struct event_base *base, int flags)
2 {
3 const struct eventop *evsel = base->evsel;
4 void *evbase = base->evbase;
5 struct timeval tv;
6 struct timeval *tv_p;
7 int res, done;
8 // 清空時間緩存
9 base->tv_cache.tv_sec = 0;
10 // evsignal_base是全局變量,在處理signal時,用于指名signal所屬的event_base實例
11 if (base->sig.ev_signal_added)
12 evsignal_base = base;
13 done = 0;
14 while (!done) { // 事件主循環
15 // 查看是否需要跳出循環,程序可以調用event_loopexit_cb()設置event_gotterm標記
16 // 調用event_base_loopbreak()設置event_break標記
17 if (base->event_gotterm) {
18 base->event_gotterm = 0;
19 break;
20 }
21 if (base->event_break) {
22 base->event_break = 0;
23 break;
24 }
25 // 校正系統時間,如果系統使用的是非MONOTONIC時間,用戶可能會向后調整了系統時間
26 // 在timeout_correct函數里,比較last wait time和當前時間,如果當前時間< last wait time
27 // 表明時間有問題,這是需要更新timer_heap中所有定時事件的超時時間。
28 timeout_correct(base, &tv);
29
30 // 根據timer heap中事件的最小超時時間,計算系統I/O demultiplexer的最大等待時間
31 tv_p = &tv;
32 if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
33 timeout_next(base, &tv_p);
34 } else {
35 // 依然有未處理的就緒時間,就讓I/O demultiplexer立即返回,不必等待
36 // 下面會提到,在libevent中,低優先級的就緒事件可能不能立即被處理
37 evutil_timerclear(&tv);
38 }
39 // 如果當前沒有注冊事件,就退出
40 if (!event_haveevents(base)) {
41 event_debug(("%s: no events registered.", __func__));
42 return (1);
43 }
44 // 更新last wait time,并清空time cache
45 gettime(base, &base->event_tv);
46 base->tv_cache.tv_sec = 0;
47 // 調用系統I/O demultiplexer等待就緒I/O events,可能是epoll_wait,或者select等;
48 // 在evsel->dispatch()中,會把就緒signal event、I/O event插入到激活鏈表中
49 res = evsel->dispatch(base, evbase, tv_p);
50 if (res == -1)
51 return (-1);
52 // 將time cache賦值為當前系統時間
53 gettime(base, &base->tv_cache);
54 // 檢查heap中的timer events,將就緒的timer event從heap上刪除,并插入到激活鏈表中
55 timeout_process(base);
56 // 調用event_process_active()處理激活鏈表中的就緒event,調用其回調函數執行事件處理
57 // 該函數會尋找最高優先級(priority值越小優先級越高)的激活事件鏈表,
58 // 然后處理鏈表中的所有就緒事件;
59 // 因此低優先級的就緒事件可能得不到及時處理;
60 if (base->event_count_active) {
61 event_process_active(base);
62 if (!base->event_count_active && (flags & EVLOOP_ONCE))
63 done = 1;
64 } else if (flags & EVLOOP_NONBLOCK)
65 done = 1;
66 }
67 // 循環結束,清空時間緩存
68 base->tv_cache.tv_sec = 0;
69 event_debug(("%s: asked to terminate loop.", __func__));
70 return (0);
71 }
?
3 ,I/O和Timer事件的統一
? ? Libevent將Timer和Signal事件都統一到了系統的I/O 的demultiplex機制中了,相信讀者從上面的流程和代碼中也能窺出一斑了,下面就再啰嗦一次了。
???? 首先將Timer事件融合到系統I/O多路復用機制中,還是相當清晰的,因為系統的I/O機制像select()和epoll_wait()都允許程序制定一個最大等待時間(也稱為最大超時時間)timeout,即使沒有I/O事件發生,它們也保證能在timeout時間內返回。
? ? ?那么根據所有Timer事件的最小超時時間來設置系統I/O的timeout時間;當系統I/O返回時,再激活所有就緒的Timer事件就可以了,這樣就能將Timer事件完美的融合到系統的I/O機制中了。
???? 這是在Reactor和Proactor模式(主動器模式,比如Windows上的IOCP)中處理Timer事件的經典方法了,ACE采用的也是這種方法,大家可以參考POSA vol2書中的Reactor模式一節。
???? 堆是一種經典的數據結構,向堆中插入、刪除元素時間復雜度都是O(lgN),N為堆中元素的個數,而獲取最小key值(小根堆)的復雜度為O(1);因此變成了管理Timer事件的絕佳人選(當然是非唯一的),libevent就是采用的堆結構。
?
4 ,I/O和Signal事件的統一
? ? ?Signal是異步事件的經典事例,將Signal事件統一到系統的I/O多路復用中就不像Timer事件那么自然了,Signal事件的出現對于進程來講是完全隨機的,進程不能只是測試一個變量來判別是否發生了一個信號,而是必須告訴內核“在此信號發生時,請執行如下的操作”。
? ? ?如果當Signal發生時,并不立即調用event的callback函數處理信號,而是設法通知系統的I/O機制,讓其返回,然后再統一和I/O事件以及Timer一起處理,不就可以了嘛。是的,這也是libevent中使用的方法。
???? 問題的核心在于,當Signal發生時,如何通知系統的I/O多路復用機制,這里先買個小關子,放到信號處理一節再詳細說明,我想讀者肯定也能想出通知的方法,比如使用pipe。
?
5 ,小節
介紹了libevent的事件主循環,描述了libevent是如何處理就緒的I/O事件、定時器和信號事件,以及如何將它們無縫的融合到一起。
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第八章 ? ?集成信號處理
?
現在我們已經了解了libevent的基本框架:事件管理框架和事件主循環。上節提到了libevent中I/O事件和Signal以及Timer事件的集成,這一節將分析如何將Signal集成到事件主循環的框架中。
1 ,集成策略——使用socket pair
前一節已經做了足夠多的介紹了,基本方法就是采用“消息機制”。在libevent中這是通過socket pair完成的,下面就來詳細分析一下。
Socket pair就是一個socket對,包含兩個socket,一個讀socket,一個寫socket。工作方式如下圖所示:
?
創建一個socket pair并不是復雜的操作,可以參見下面的流程圖,清晰起見,其中忽略了一些錯誤處理和檢查。
?
Libevent提供了輔助函數evutil_socketpair()來創建一個socket pair,可以結合上面的創建流程來分析該函數。
?
2 ,集成到事件主循環——通知event_base
? ? ? Socket pair創建好了,可是libevent的事件主循環還是不知道Signal是否發生了啊,看來我們還差了最后一步,那就是:為socket pair的讀socket在libevent的event_base實例上注冊一個persist的讀事件。
??? ? 這樣當向寫socket寫入數據時,讀socket就會得到通知,觸發讀事件,從而event_base就能相應的得到通知了。
? ? ? 前面提到過,Libevent會在事件主循環中檢查標記,來確定是否有觸發的signal,如果標記被設置就處理這些signal,這段代碼在各個具體的I/O機制中,以Epoll為例,在epoll_dispatch()函數中,代碼片段如 ? ? ? ? ??下:
1 res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); 2 if (res == -1) { 3 if (errno != EINTR) { 4 event_warn("epoll_wait"); 5 return (-1); 6 } 7 evsignal_process(base);// 處理signal事件 8 return (0); 9 } else if (base->sig.evsignal_caught) { 10 evsignal_process(base);// 處理signal事件 11 }
?
完整的處理框架如下所示:
?
注1:libevent中,初始化階段并不注冊讀socket的讀事件,而是在注冊信號階段才會測試并注冊;
注2:libevent中,檢查I/O事件是在各系統I/O機制的dispatch()函數中完成的,該dispatch()函數在event_base_loop()函數中被調用;
?
3 ,evsignal_info結構體
Libevent中Signal事件的管理是通過結構體evsignal_info完成的,結構體位于evsignal.h文件中,定義如下:
1 struct evsignal_info { 2 struct event ev_signal; 3 int ev_signal_pair[2]; 4 int ev_signal_added; 5 volatile sig_atomic_t evsignal_caught; 6 struct event_list evsigevents[NSIG]; 7 sig_atomic_t evsigcaught[NSIG]; 8 #ifdef HAVE_SIGACTION 9 struct sigaction **sh_old; 10 #else 11 ev_sighandler_t **sh_old; 12 #endif 13 int sh_old_max; 14 };
?
下面詳細介紹一下個字段的含義和作用:
1)ev_signal, 為socket pair的讀socket向event_base注冊讀事件時使用的event結構體;
2)ev_signal_pair,socket pair對,作用見第一節的介紹;
3)ev_signal_added,記錄ev_signal事件是否已經注冊了;
4)evsignal_caught,是否有信號發生的標記;是volatile類型,因為它會在另外的線程中被修改;
5)evsigvents[NSIG],數組,evsigevents[signo]表示注冊到信號signo的事件鏈表;
6)evsigcaught[NSIG],具體記錄每個信號觸發的次數,evsigcaught[signo]是記錄信號signo被觸發的次數;
7)sh_old記錄了原來的signal處理函數指針,當信號signo注冊的event被清空時,需要重新設置其處理函數;
??? evsignal_info的初始化包括,創建socket pair,設置ev_signal事件(但并沒有注冊,而是等到有信號注冊時才檢查并注冊),并將所有標記置零,初始化信號的注冊事件鏈表指針等。
?
4 ,注冊、注銷signal事件
注冊signal事件是通過evsignal_add(struct event *ev)函數完成的,libevent對所有的信號注冊同一個處理函數evsignal_handler(),該函數將在下一段介紹,注冊過程如下:
1 取得ev要注冊到的信號signo;
2 如果信號signo未被注冊,那么就為signo注冊信號處理函數evsignal_handler();
3 如果事件ev_signal還沒喲注冊,就注冊ev_signal事件;
4 將事件ev添加到signo的event鏈表中;
從signo上注銷一個已注冊的signal事件就更簡單了,直接從其已注冊事件的鏈表中移除即可。如果事件鏈表已空,那么就恢復舊的處理函數;
下面的講解都以signal()函數為例,sigaction()函數的處理和signal()相似。
處理函數evsignal_handler()函數做的事情很簡單,就是記錄信號的發生次數,并通知event_base有信號觸發,需要處理:
1 static void evsignal_handler(int sig) 2 { 3 int save_errno = errno; // 不覆蓋原來的錯誤代碼 4 if (evsignal_base == NULL) { 5 event_warn("%s: received signal %d, but have no base configured", __func__, sig); 6 return; 7 } 8 // 記錄信號sig的觸發次數,并設置event觸發標記 9 evsignal_base->sig.evsigcaught[sig]++; 10 evsignal_base->sig.evsignal_caught = 1; 11 #ifndef HAVE_SIGACTION 12 signal(sig, evsignal_handler); // 重新注冊信號 13 #endif 14 // 向寫socket寫一個字節數據,觸發event_base的I/O事件,從而通知其有信號觸發,需要處理 15 send(evsignal_base->sig.ev_signal_pair[0], "a", 1, 0); 16 errno = save_errno; // 錯誤代碼 17 }
?
5,小節
本節介紹了libevent對signal事件的具體處理框架,包括事件注冊、刪除和socket pair通知機制,以及是如何將Signal事件集成到事件主循環之中的。
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第九章 ??集成定時器事件
?
現在再來詳細分析libevent中I/O事件和Timer事件的集成,與Signal相比,Timer事件的集成會直觀和簡單很多。Libevent對堆的調整操作做了一些優化,本節還會描述這些優化方法。
1,集成到事件主循環
? ? ?因為系統的I/O機制像select()和epoll_wait()都允許程序制定一個最大等待時間(也稱為最大超時時間)timeout,即使沒有I/O事件發生,它們也保證能在timeout時間內返回。
? ? ?那么根據所有Timer事件的最小超時時間來設置系統I/O的timeout時間;當系統I/O返回時,再激活所有就緒的Timer事件就可以了,這樣就能將Timer事件完美的融合到系統的I/O機制中了。
? ? ?具體的代碼在源文件event.c的event_base_loop()中,現在就對比代碼來看看這一處理方法:
1 if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) { 2 // 根據Timer事件計算evsel->dispatch的最大等待時間 3 timeout_next(base, &tv_p); 4 } else { 5 // 如果還有活動事件,就不要等待,讓evsel->dispatch立即返回 6 evutil_timerclear(&tv); 7 } 8 // ... 9 // 調用select() or epoll_wait() 等待就緒I/O事件 10 res = evsel->dispatch(base, evbase, tv_p); 11 // ... 12 // 處理超時事件,將超時事件插入到激活鏈表中 13 timeout_process(base);
?
?timeout_next()函數根據堆中具有最小超時值的事件和當前時間來計算等待時間,下面看看代碼:
1 static int timeout_next(struct event_base *base, struct timeval **tv_p) 2 { 3 struct timeval now; 4 struct event *ev; 5 struct timeval *tv = *tv_p; 6 // 堆的首元素具有最小的超時值 7 if ((ev = min_heap_top(&base->timeheap)) == NULL) { 8 // 如果沒有定時事件,將等待時間設置為NULL,表示一直阻塞直到有I/O事件發生 9 *tv_p = NULL; 10 return (0); 11 } 12 // 取得當前時間 13 gettime(base, &now); 14 // 如果超時時間<=當前值,不能等待,需要立即返回 15 if (evutil_timercmp(&ev->ev_timeout, &now, <=)) { 16 evutil_timerclear(tv); 17 return (0); 18 } 19 // 計算等待的時間=當前時間-最小的超時時間 20 evutil_timersub(&ev->ev_timeout, &now, tv); 21 return (0); 22 }
?
2, Timer小根堆
? ? ? Libevent使用堆來管理Timer事件,其key值就是事件的超時時間,源代碼位于文件min_heap.h中。
所有的數據結構書中都有關于堆的詳細介紹,向堆中插入、刪除元素時間復雜度都是O(lgN),N為堆中元素的個數,而獲取最小key值(小根堆)的復雜度為O(1)。堆是一個完全二叉樹,基本存儲方式是一個數組。
????? Libevent實現的堆還是比較輕巧的,雖然我不喜歡這種編碼方式(搞一些復雜的表達式)。輕巧到什么地方呢,就以插入元素為例,來對比說明,下面偽代碼中的size表示當前堆的元素個數:
典型的代碼邏輯如下:
1 Heap[size++] = new; // 先放到數組末尾,元素個數+1 2 // 下面就是shift_up()的代碼邏輯,不斷的將new向上調整 3 _child = size; 4 while(_child>0) // 循環 5 { 6 _parent = (_child-1)/2; // 計算parent 7 if(Heap[_parent].key < Heap[_child].key) 8 break; // 調整結束,跳出循環 9 swap(_parent, _child); // 交換parent和child 10 }
?
? ? ?而libevent的heap代碼對這一過程做了優化,在插入新元素時,只是為新元素預留了一個位置hole(初始時hole位于數組尾部),但并不立刻將新元素插入到hole上,而是不斷向上調整hole的值,將父節點向下調整,最后確認hole就是新元素的所在位置時,才會真正的將新元素插入到hole上,因此在調整過程中就比上面的代碼少了一次賦值的操作,代碼邏輯是:
???? 下面就是shift_up()的代碼邏輯,不斷的將new的“預留位置”向上調整
1 // 下面就是shift_up()的代碼邏輯,不斷的將new的“預留位置”向上調整 2 _hole = size; // _hole就是為new預留的位置,但并不立刻將new放上 3 while(_hole>0) // 循環 4 { 5 _parent = (_hole-1)/2; // 計算parent 6 if(Heap[_parent].key < new.key) 7 break; // 調整結束,跳出循環 8 Heap[_hole] = Heap[_parent]; // 將parent向下調整 9 _hole = _parent; // 將_hole調整到_parent 10 } 11 Heap[_hole] = new; // 調整結束,將new插入到_hole指示的位置 12 size++; // 元素個數+1
?
由于每次調整都少做一次賦值操作,在調整路徑比較長時,調整效率會比第一種有所提高。libevent中的min_heap_shift_up_()函數就是上面邏輯的具體實現,對應的向下調整函數是min_heap_shift_down_()。
舉個例子,向一個小根堆3, 5, 8, 7, 12中插入新元素2,使用第一中典型的代碼邏輯,其調整過程如下圖所示:
?
使用libevent中的堆調整邏輯,調整過程如下圖所示:
?
對于刪除和元素修改操作,也遵從相同的邏輯,就不再羅嗦了。
?
3, 小節
通過設置系統I/O機制的wait時間,從而簡捷的集成Timer事件;主要分析了libevent對堆調整操作的優化。
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第十章 ??支持I/O多路復用技術
?
Libevent的核心是事件驅動、同步非阻塞,為了達到這一目標,必須采用系統提供的I/O多路復用技術,而這些在Windows、Linux、Unix等不同平臺上卻各有不同,如何能提供優雅而統一的支持方式,是首要關鍵的問題,這其實不難,本節就來分析一下。
1, 統一的關鍵
?Libevent支持多種I/O多路復用技術的關鍵就在于結構體eventop,這個結構體前面也曾提到過,它的成員是一系列的函數指針, 定義在event-internal.h文件中:
1 struct eventop { 2 const char *name; 3 void *(*init)(struct event_base *); // 初始化 4 int (*add)(void *, struct event *); // 注冊事件 5 int (*del)(void *, struct event *); // 刪除事件 6 int (*dispatch)(struct event_base *, void *, struct timeval *); // 事件分發 7 void (*dealloc)(struct event_base *, void *); // 注銷,釋放資源 8 /* set if we need to reinitialize the event base */ 9 int need_reinit; 10 };
?
在libevent中,每種I/O demultiplex機制的實現都必須提供這五個函數接口,來完成自身的初始化、銷毀釋放;對事件的注冊、注銷和分發。
比如對于epoll,libevent實現了5個對應的接口函數,并在初始化時并將eventop的5個函數指針指向這5個函數,那么程序就可以使用epoll作為I/O demultiplex機制了。
?
2, 設置I/O demultiplex機制
Libevent把所有支持的I/O demultiplex機制存儲在一個全局靜態數組eventops中,并在初始化時選擇使用何種機制,數組內容根據優先級順序聲明如下:
1 /* In order of preference */ 2 static const struct eventop *eventops[] = { 3 #ifdef HAVE_EVENT_PORTS 4 &evportops, 5 #endif 6 #ifdef HAVE_WORKING_KQUEUE 7 &kqops, 8 #endif 9 #ifdef HAVE_EPOLL 10 &epollops, 11 #endif 12 #ifdef HAVE_DEVPOLL 13 &devpollops, 14 #endif 15 #ifdef HAVE_POLL 16 &pollops, 17 #endif 18 #ifdef HAVE_SELECT 19 &selectops, 20 #endif 21 #ifdef WIN32 22 &win32ops, 23 #endif 24 NULL 25 };
?
?然后libevent根據系統配置和編譯選項決定使用哪一種I/O demultiplex機制,這段代碼在函數event_base_new()中:
1 base->evbase = NULL; 2 for (i = 0; eventops[i] && !base->evbase; i++) { 3 base->evsel = eventops[i]; 4 base->evbase = base->evsel->init(base); 5 }
?
可以看出,libevent在編譯階段選擇系統的I/O demultiplex機制,而不支持在運行階段根據配置再次選擇。
?以Linux下面的epoll為例,實現在源文件epoll.c中,eventops對象epollops定義如下:
1 const struct eventop epollops = { 2 "epoll", 3 epoll_init, 4 epoll_add, 5 epoll_del, 6 epoll_dispatch, 7 epoll_dealloc, 8 1 /* need reinit */ 9 };
?
變量epollops中的函數指針具體聲明如下,注意到其返回值和參數都和eventop中的定義嚴格一致,這是函數指針的語法限制。
1 static void *epoll_init (struct event_base *); 2 static int epoll_add (void *, struct event *); 3 static int epoll_del (void *, struct event *); 4 static int epoll_dispatch(struct event_base *, void *, struct timeval *); 5 static void epoll_dealloc (struct event_base *, void *);
?
那么如果選擇的是epoll,那么調用結構體eventop的init和dispatch函數指針時,實際調用的函數就是epoll的初始化函數epoll_init()和事件分發函數epoll_dispatch()了;
關于epoll的具體用法這里就不多說了,可以參見介紹epoll的文章(本人的哈哈):http://blog.csdn.net/sparkliang/archive/2009/11/05/4770655.aspx
?
C++語言提供了虛函數來實現多態,在C語言中,這是通過函數指針實現的。對于各類函數指針的詳細說明可以參見文章:http://blog.csdn.net/sparkliang/archive/2009/06/09/4254115.aspx
同樣的,上面epollops以及epoll的各種函數都直接定義在了epoll.c源文件中,對外都是不可見的。對于libevent的使用者而言,完全不會知道它們的存在,對epoll的使用也是通過eventop來完成的,達到了信息隱藏的目的。
?
3, 小節
支持多種I/O demultiplex機制的方法其實挺簡單的,借助于函數指針就OK了。通過對源代碼的分析也可以看出,Libevent是在編譯階段選擇系統的I/O demultiplex機制的,而不支持在運行階段根據配置再次選擇。
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第十一章 ??時間管理
?
為了支持定時器,Libevent必須和系統時間打交道,這一部分的內容也比較簡單,主要涉及到時間的加減輔助函數、時間緩存、時間校正和定時器堆的時間值調整等。下面就結合源代碼來分析一下。
1, 初始化檢測
Libevent在初始化時會檢測系統時間的類型,通過調用函數detect_monotonic()完成,它通過調用clock_gettime()來檢測系統是否支持monotonic時鐘類型:
1 static void detect_monotonic(void) 2 { 3 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 4 struct timespec ts; 5 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) 6 use_monotonic = 1; // 系統支持monotonic時間 7 #endif 8 }
?
Monotonic時間指示的是系統從boot后到現在所經過的時間,如果系統支持Monotonic時間就將全局變量use_monotonic設置為1,設置use_monotonic到底有什么用,這個在后面說到時間校正時就能看出來了。
?
2, 時間緩存
?結構體event_base中的tv_cache,用來記錄時間緩存。這個還要從函數gettime()說起,先來看看該函數的代碼:
1 static int gettime(struct event_base *base, struct timeval *tp) 2 { 3 // 如果tv_cache時間緩存已設置,就直接使用 4 if (base->tv_cache.tv_sec) { 5 *tp = base->tv_cache; 6 return (0); 7 } 8 // 如果支持monotonic,就用clock_gettime獲取monotonic時間 9 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 10 if (use_monotonic) { 11 struct timespec ts; 12 if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) 13 return (-1); 14 tp->tv_sec = ts.tv_sec; 15 tp->tv_usec = ts.tv_nsec / 1000; 16 return (0); 17 } 18 #endif 19 // 否則只能取得系統當前時間 20 return (evutil_gettimeofday(tp, NULL)); 21 }
?
? ? ?如果tv_cache已經設置,那么就直接使用緩存的時間;否則需要再次執行系統調用獲取系統時間。
???? 函數evutil_gettimeofday()用來獲取當前系統時間,在Linux下其實就是系統調用gettimeofday();Windows沒有提供函數gettimeofday,而是通過調用_ftime()來完成的。
???? 在每次系統事件循環中,時間緩存tv_cache將會被相應的清空和設置,再次來看看下面event_base_loop的主要代碼邏輯:
1 int event_base_loop(struct event_base *base, int flags) 2 { 3 // 清空時間緩存 4 base->tv_cache.tv_sec = 0; 5 while(!done){ 6 timeout_correct(base, &tv); // 時間校正 7 // 更新event_tv到tv_cache指示的時間或者當前時間(第一次) 8 // event_tv <--- tv_cache 9 gettime(base, &base->event_tv); 10 // 清空時間緩存-- 時間點1 11 base->tv_cache.tv_sec = 0; 12 // 等待I/O事件就緒 13 res = evsel->dispatch(base, evbase, tv_p); 14 // 緩存tv_cache存儲了當前時間的值-- 時間點2 15 // tv_cache <--- now 16 gettime(base, &base->tv_cache); 17 // .. 處理就緒事件 18 } 19 // 退出時也要清空時間緩存 20 base->tv_cache.tv_sec = 0; 21 return (0); 22 }
?
? ? ?時間event_tv指示了dispatch()上次返回,也就是I/O事件就緒時的時間,第一次進入循環時,由于tv_cache被清空,因此gettime()執行系統調用獲取當前系統時間;而后將會更新為tv_cache指示的時間。
???? 時間tv_cache在dispatch()返回后被設置為當前系統時間,因此它緩存了本次I/O事件就緒時的時間(event_tv)。
? ? ?從代碼邏輯里可以看出event_tv取得的是tv_cache上一次的值,因此event_tv應該小于tv_cache的值。
???? 設置時間緩存的優點是不必每次獲取時間都執行系統調用,這是個相對費時的操作;在上面標注的時間點2到時間點1的這段時間(處理就緒事件時),調用gettime()取得的都是tv_cache緩存的時間。
?
3, 時間校正
? ? ?如果系統支持monotonic時間,該時間是系統從boot后到現在所經過的時間,因此不需要執行校正。
? ? ?根據前面的代碼邏輯,如果系統不支持monotonic時間,用戶可能會手動的調整時間,如果時間被向前調整了(MS前面第7部分講成了向后調整,要改正),比如從5點調整到了3點,那么在時間點2取得的值可能 ? ? ? ?會小于上次的時間,這就需要調整了,下面來看看校正的具體代碼,由函數timeout_correct()完成:
1 static void timeout_correct(struct event_base *base, struct timeval *tv) 2 { 3 struct event **pev; 4 unsigned int size; 5 struct timeval off; 6 if (use_monotonic) // monotonic時間就直接返回,無需調整 7 return; 8 gettime(base, tv); // tv <---tv_cache 9 // 根據前面的分析可以知道event_tv應該小于tv_cache 10 // 如果tv < event_tv表明用戶向前調整時間了,需要校正時間 11 if (evutil_timercmp(tv, &base->event_tv, >=)) { 12 base->event_tv = *tv; 13 return; 14 } 15 // 計算時間差值 16 evutil_timersub(&base->event_tv, tv, &off); 17 // 調整定時事件小根堆 18 pev = base->timeheap.p; 19 size = base->timeheap.n; 20 for (; size-- > 0; ++pev) { 21 struct timeval *ev_tv = &(**pev).ev_timeout; 22 evutil_timersub(ev_tv, &off, ev_tv); 23 } 24 base->event_tv = *tv; // 更新event_tv為tv_cache 25 }
?
在調整小根堆時,因為所有定時事件的時間值都會被減去相同的值,因此雖然堆中元素的時間鍵值改變了,但是相對關系并沒有改變,不會改變堆的整體結構。因此只需要遍歷堆中的所有元素,將每個元素的時間鍵值減去相同的值即可完成調整,不需要重新調整堆的結構。
當然調整完后,要將event_tv值重新設置為tv_cache值了。
?
4, 小節
主要分析了一下libevent對系統時間的處理,時間緩存、時間校正和定時堆的時間值調整等,邏輯還是很簡單的,時間的加減、設置等輔助函數則非常簡單,主要在頭文件evutil.h中,就不再多說了
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第十二章 ? ?讓libevent支持多線程
?
Libevent本身不是多線程安全的,在多核的時代,如何能充分利用CPU的能力呢,這一節來說說如何在多線程環境中使用libevent,跟源代碼并沒有太大的關系,純粹是使用上的技巧。
1, 錯誤使用示例
在多核的CPU上只使用一個線程始終是對不起CPU的處理能力啊,那好吧,那就多創建幾個線程,比如下面的簡單服務器場景
? ? ?1> 主線程創建工作線程1;
? ? ?2> 接著主線程監聽在端口上,等待新的連接;
? ? ?3> 在線程1中執行event事件循環,等待事件到來;
? ? ?4> 新連接到來,主線程調用libevent接口event_add將新連接注冊到libevent上;
?
上面的邏輯看起來沒什么錯誤,在很多服務器設計中都可能用到主線程和工作線程的模式….
可是就在線程1注冊事件時,主線程很可能也在操作事件,比如刪除,修改,通過libevent的源代碼也能看到,沒有同步保護機制,問題麻煩了,看起來不能這樣做啊,難道只能使用單線程不成!?
?
2, 支持多線程的幾種模式
Libevent并不是線程安全的,但這不代表libevent不支持多線程模式,其實方法在前面已經將signal事件處理時就接觸到了,那就是消息通知機制。
一句話,“你發消息通知我,然后再由我在合適的時間來處理”;
說到這就再多說幾句,再打個比方,把你自己比作一個工作線程,而你的頭是主線程,你有一個消息信箱來接收別人發給你的消息,當時頭有個新任務要指派給你。
?
2.1 暴力搶占
那么第一節中使用的多線程方法相當下面的流程:
? ? ?1> 當時你正在做事,比如在寫文檔;
? ? ?2> 你的頭找到了一個任務,要指派給你,比如幫他搞個PPT,哈;
? ? ?3> 頭命令你馬上搞PPT,你這是不得不停止手頭的工作,把PPT搞定了再接著寫文檔;
?
2.2 純粹的消息通知機制
那么基于純粹的消息通知機制的多線程方式就像下面這樣:
? ? ?1> 當時你正在寫文檔;
? ? ?2> 你的頭找到了一個任務,要指派給你,幫他搞個PPT;
? ? ?3> 頭發個消息到你信箱,有個PPT要幫他搞定,這時你并不鳥他;
? ? ?4> 你寫好文檔,接著檢查消息發現頭有個PPT要你搞定,你開始搞PPT;
?
第一種的好處是消息可以立即得到處理,但是很方法很粗暴,你必須立即處理這個消息,所以你必須處理好切換問題,省得把文檔上的內容不小心寫到PPT里。在操作系統的進程通信中,消息隊列(消息信箱)都是操作系統維護的,你不必關心。
第二種的優點是通過消息通知,切換問題省心了,不過消息是不能立即處理的(基于消息通知機制,這個總是難免的),而且所有的內容都通過消息發送,比如PPT的格式、內容等等信息,這無疑增加了通信開銷。
?
2.3 消息通知+同步層
有個折中機制可以減少消息通信的開銷,就是提取一個同步層,還拿上面的例子來說,你把工作安排都存放在一個工作隊列中,而且你能夠保證“任何人把新任務扔到這個隊列”,“自己取出當前第一個任務”等這些操作都能夠保證不會把隊列搞亂(其實就是個加鎖的隊列容器)。
再來看看處理過程和上面有什么不同:
? ? ?1> 當時你正在寫文檔;
? ? ?2> 你的頭找到了一個任務,要指派給你,幫他搞個PPT;
? ? ?3> 頭有個PPT要你搞定,他把任務push到你的工作隊列中,包括了PPT的格式、內容等信息;
? ? ?4> 頭發個消息(一個字節)到你信箱,有個PPT要幫他搞定,這時你并不鳥他;
? ? ?5> 你寫好文檔,發現有新消息(這預示著有新任務來了),檢查工作隊列知道頭有個PPT要你搞定,你開始搞PPT;
…
工作隊列其實就是一個加鎖的容器(隊列、鏈表等等),這個很容易實現實現;而消息通知僅需要一個字節,具體的任務都push到了在工作隊列中,因此想比2.2減少了不少通信開銷。
多線程編程有很多陷阱,線程間資源的同步互斥不是一兩句能說得清的,而且出現bug很難跟蹤調試;這也有很多的經驗和教訓,因此如果讓我選擇,在絕大多數情況下都會選擇機制3作為實現多線程的方法。
?
3 , 例子——memcached
Memcached中的網絡部分就是基于libevent完成的,其中的多線程模型就是典型的消息通知+同步層機制。下面的圖足夠說明其多線程模型了,其中有詳細的文字說明。
?
4, 小節
本節更是libevent的使用方面的技巧,討論了一下如何讓libevent支持多線程,以及幾種支持多線程的機制,和memcached使用libevent的多線程模型
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?第十三章 ?libevent?信號處理注意點
?
前面講到了?libevent?實現多線程的方法,然而在多線程的環境中注冊信號事件,還是有一些情況需要小心處理,那就是不能在多個?libevent?實例上注冊信號事件。依然冠名追加到?libevent?系列。
以?2?個線程為例,做簡單的場景分析。
?
?
1> 首先是創建并初始化線程?1?的?libevent?實例?base1?,線程?1?的?libevent?實例?base2?;
2 >在?base1?上注冊?SIGALRM?信號;在?base2?上注冊?SIGINT?信號;
3 >假設當前?base1?和?base2?上都沒有注冊其他的事件;
4 >線程?1?和?2?都進入?event_base_loop?事件循環:
1 event_base_loop(base1) event_base_loop(base2) 2 3 { { 4 5 if (base2->sig.ev_signal_added) if (base2->sig.ev_signal_added) 6 7 evsignal_base = base1; evsignal_base = base2; 8 9 while(!done) while(!done) 10 11 { { 12 13 … … 14 15 evsel->dispatch(…); evsel->dispatch(…); 16 17 … … 18 19 } } 20 21 } }
?
5> 假設線程?1?先進入?event_base_loop?,并設置?evsignal_base = base1?;并等待;
6> 接著線程?2?也進入?event_base_loop?,并設置?evsignal_base = base2?;并等待;
??于是?evsignal_base?就指向了?base2?;
7> 信號?ALARM?觸發,調用服務例程:
1 static void evsignal_handler(int sig) 2 3 { 4 5 ... 6 7 evsignal_base->sig.evsigcaught[sig]++; 8 9 evsignal_base->sig.evsignal_caught = 1; 10 11 /* Wake up our notification mechanism */ 12 13 send(evsignal_base->sig.ev_signal_pair[0], "a", 1, 0); 14 15 ... 16 17 }
?
于是?base2?得到通知?ALARM?信號發生了,而實際上?ALARM?是注冊在?base1?上的,?base2?上的?ALARM?注冊?event?是空的,于是處理函數將不能得到調用;
因此在?libevent?中,如果需要處理信號,只能將信號注冊到一個?libevent?實例上。
memcached?就沒有使用?libevent?提供的?signal?接口,而是直接使用系統提供的原生?API?,看起來這樣更簡潔。
?