Redis 采用事件驅動機制來處理大量的網絡IO。它并沒有使用 libevent 或者 libev 這樣的成熟開源方案,而是自己實現一個非常簡潔的事件驅動庫 ae_event。
事件機制
Redis中的事件驅動庫只關注網絡IO,以及定時器。
該事件庫處理下面兩類事件:
- 文件事件(file event):用于處理 Redis 服務器和客戶端之間的網絡IO。
- 時間事件(time eveat):Redis 服務器中的一些操作(比如serverCron函數)需要在給定的時間點執行,而時間事件就是處理這類定時操作的。
事件驅動庫的代碼主要是在src/ae.c
中實現的,其示意圖如下所示。
aeEventLoop
是整個事件驅動的核心,它管理著文件事件表和時間事件列表,不斷地循環處理著就緒的文件事件和到期的時間事件。
文件事件
Redis基于Reactor模式開發了自己的網絡事件處理器,也就是文件事件處理器。文件事件處理器使用IO多路復用技術,同時監聽多個套接字,并為套接字關聯不同的事件處理函數。當套接字的可讀或者可寫事件觸發時,就會調用相應的事件處理函數。
- 1. 為什么單線程的 Redis 能那么快?
Redis的瓶頸主要在IO而不是CPU,所以為了省開發量,在6.0版本前是單線程模型;其次,Redis 是單線程主要是指 Redis 的網絡 IO 和鍵值對讀寫是由一個線程來完成的,這也是 Redis 對外提供鍵值存儲服務的主要流程。(但 Redis 的其他功能,比如持久化、異步刪除、集群數據同步等,其實是由額外的線程執行的)。
Redis 采用了多路復用機制使其在網絡 IO 操作中能并發處理大量的客戶端請求,實現高吞吐率。
- 2. Redis事件響應框架ae_event及文件事件處理器
Redis并沒有使用 libevent 或者 libev 這樣的成熟開源方案,而是自己實現一個非常簡潔的事件驅動庫 ae_event。
Redis 使用的IO多路復用技術主要有:select
、epoll
、evport
和kqueue
等。每個IO多路復用函數庫在 Redis 源碼中都對應一個單獨的文件,比如ae_select.c
,ae_epoll.c
, ae_kqueue.c
等。Redis 會根據不同的操作系統,按照不同的優先級選擇多路復用技術。事件響應框架一般都采用該架構,比如 netty 和 libevent。
如下圖所示,文件事件處理器有四個組成部分,它們分別是套接字、I/O多路復用程序、文件事件分派器以及事件處理器。
文件事件是對套接字操作的抽象,每當一個套接字準備好執行 accept
、read
、write
和 close
等操作時,就會產生一個文件事件。因為 Redis 通常會連接多個套接字,所以多個文件事件有可能并發的出現。
I/O多路復用程序負責監聽多個套接字,并向文件事件派發器傳遞那些產生了事件的套接字。
盡管多個文件事件可能會并發地出現,但I/O多路復用程序總是會將所有產生的套接字都放到同一個隊列(也就是后文中描述的aeEventLoop的fired就緒事件表)里邊,然后文件事件處理器會以有序、同步、單個套接字的方式處理該隊列中的套接字,也就是處理就緒的文件事件。
所以,一次 Redis 客戶端與服務器進行連接并且發送命令的過程如上圖所示。
- 客戶端向服務端發起建立 socket 連接的請求,那么監聽套接字將產生 AE_READABLE 事件,觸發連接應答處理器執行。處理器會對客戶端的連接請求
- 進行應答,然后創建客戶端套接字,以及客戶端狀態,并將客戶端套接字的 AE_READABLE 事件與命令請求處理器關聯。
- 客戶端建立連接后,向服務器發送命令,那么客戶端套接字將產生 AE_READABLE 事件,觸發命令請求處理器執行,處理器讀取客戶端命令,然后傳遞給相關程序去執行。
- 執行命令獲得相應的命令回復,為了將命令回復傳遞給客戶端,服務器將客戶端套接字的 AE_WRITEABLE 事件與命令回復處理器關聯。當客戶端試圖讀取命令回復時,客戶端套接字產生 AE_WRITEABLE 事件,觸發命令回復處理器將命令回復全部寫入到套接字中。
- 3. Redis IO多路復用模型
PS:了解處理流程后,我們有必要深入看下Redis IO多路復用的模型,正好我看到極客時間中《Redis核心技術與實戰》中相關內容講的挺容易理解的,就轉過來了
在 Redis 只運行單線程的情況下,該機制允許內核中,同時存在多個監聽套接字和已連接套接字。內核會一直監聽這些套接字上的連接請求或數據請求。一旦有請求到達,就會交給 Redis 線程處理,這就實現了一個 Redis 線程處理多個 IO 流的效果。
下圖就是基于多路復用的 Redis IO 模型。圖中的多個 FD 就是剛才所說的多個套接字。Redis 網絡框架調用 epoll 機制,讓內核監聽這些套接字。此時,Redis 線程不會阻塞在某一個特定的監聽或已連接套接字上,也就是說,不會阻塞在某一個特定的客戶端請求處理上。正因為此,Redis 可以同時和多個客戶端連接并處理請求,從而提升并發性。
基于多路復用的Redis高性能IO模型為了在請求到達時能通知到 Redis 線程,select/epoll 提供了基于事件的回調機制,即針對不同事件的發生,調用相應的處理函數。那么,回調機制是怎么工作的呢?
其實,select/epoll 一旦監測到 FD 上有請求到達時,就會觸發相應的事件。這些事件會被放進一個事件隊列,Redis 單線程對該事件隊列不斷進行處理。這樣一來,Redis 無需一直輪詢是否有請求實際發生,這就可以避免造成 CPU 資源浪費。同時,Redis 在對事件隊列中的事件進行處理時,會調用相應的處理函數,這就實現了基于事件的回調。因為 Redis 一直在對事件隊列進行處理,所以能及時響應客戶端請求,提升 Redis 的響應性能。
為了方便你理解,我再以連接請求和讀數據請求為例,具體解釋一下。
這兩個請求分別對應 Accept 事件和 Read 事件,Redis 分別對這兩個事件注冊 accept 和 get 回調函數。當 Linux 內核監聽到有連接請求或讀數據請求時,就會觸發 Accept 事件和 Read 事件,此時,內核就會回調 Redis 相應的 accept 和 get 函數進行處理。
這就像病人去醫院瞧病。在醫生實際診斷前,每個病人(等同于請求)都需要先分診、測體溫、登記等。如果這些工作都由醫生來完成,醫生的工作效率就會很低。所以,醫院都設置了分診臺,分診臺會一直處理這些診斷前的工作(類似于 Linux 內核監聽請求),然后再轉交給醫生做實際診斷。這樣即使一個醫生(相當于 Redis 單線程),效率也能提升。
時間事件
Redis 的時間事件分為以下兩類:
- 定時事件:讓一段程序在指定的時間之后執行一次。
- 周期性事件:讓一段程序每隔指定時間就執行一次。
Redis 的時間事件的具體定義結構如下所示。
typedef struct aeTimeEvent {/* 全局唯一ID */long long id; /* time event identifier. *//* 秒精確的UNIX時間戳,記錄時間事件到達的時間*/long when_sec; /* seconds *//* 毫秒精確的UNIX時間戳,記錄時間事件到達的時間*/long when_ms; /* milliseconds *//* 時間處理器 */aeTimeProc *timeProc;/* 事件結束回調函數,析構一些資源*/aeEventFinalizerProc *finalizerProc;/* 私有數據 */void *clientData;/* 前驅節點 */struct aeTimeEvent *prev;/* 后繼節點 */struct aeTimeEvent *next;
} aeTimeEvent;
一個時間事件是定時事件還是周期性事件取決于時間處理器的返回值:
- 如果返回值是
AE_NOMORE
,那么這個事件是一個定時事件,該事件在達到后刪除,之后不會再重復。 - 如果返回值是非
AE_NOMORE
的值,那么這個事件為周期性事件,當一個時間事件到達后,服務器會根據時間處理器的返回值,對時間事件的 when 屬性進行更新,讓這個事件在一段時間后再次達到。
服務器所有的時間事件都放在一個無序鏈表中,每當時間事件執行器運行時,它就遍歷整個鏈表,查找所有已到達的時間事件,并調用相應的事件處理器。正常模式下的Redis服務器只使用serverCron一個時間事件,而在benchmark模式下,服務器也只使用兩個時間事件,所以不影響事件執行的性能。
aeEventLoop的具體實現
介紹完文件事件和時間事件,我們接下來看一下 aeEventLoop的具體實現;
創建事件管理器
Redis 服務端在其初始化函數 initServer中,會創建事件管理器aeEventLoop對象。
函數aeCreateEventLoop將創建一個事件管理器,主要是初始化 aeEventLoop的各個屬性值,比如events、fired、timeEventHead和apidata:
- 首先創建aeEventLoop對象。
- 初始化未就緒文件事件表、就緒文件事件表。events指針指向未就緒文件事件表、fired指針指向就緒文件事件表。表的內容在后面添加具體事件時進行初變更。
- 初始化時間事件列表,設置timeEventHead和timeEventNextId屬性。
- 調用aeApiCreate 函數創建epoll實例,并初始化 apidata。
aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;/* 創建事件狀態結構 */if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;/* 創建未就緒事件表、就緒事件表 */eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;/* 設置數組大小 */eventLoop->setsize = setsize;/* 初始化執行最近一次執行時間 */eventLoop->lastTime = time(NULL);/* 初始化時間事件結構 */eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;eventLoop->aftersleep = NULL;/* 將多路復用io與事件管理器關聯起來 */if (aeApiCreate(eventLoop) == -1) goto err;/* 初始化監聽事件 */for (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;return eventLoop;
err:.....
}
aeApiCreate 函數首先創建了aeApiState對象,初始化了epoll就緒事件表;然后調用epoll_create創建了epoll實例,最后將該aeApiState賦值給apidata屬性。
aeApiState對象中epfd存儲epoll的標識,events是一個epoll就緒事件數組,當有epoll事件發生時,所有發生的epoll事件和其描述符將存儲在這個數組中。這個就緒事件數組由應用層開辟空間、內核負責把所有發生的事件填充到該數組。
static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;/* 初始化epoll就緒事件表 */state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}/* 創建 epoll 實例 */state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}/* 事件管理器與epoll關聯 */eventLoop->apidata = state;return 0;
}
typedef struct aeApiState {/* epoll_event 實例描述符*/int epfd;/* 存儲epoll就緒事件表 */struct epoll_event *events;
} aeApiState;
創建文件事件
aeFileEvent是文件事件結構,對于每一個具體的事件,都有讀處理函數和寫處理函數等。Redis 調用aeCreateFileEvent函數針對不同的套接字的讀寫事件注冊對應的文件事件。
typedef struct aeFileEvent {/* 監聽事件類型掩碼,值可以是 AE_READABLE 或 AE_WRITABLE */int mask;/* 讀事件處理器 */aeFileProc *rfileProc;/* 寫事件處理器 */aeFileProc *wfileProc;/* 多路復用庫的私有數據 */void *clientData;
} aeFileEvent;
/* 使用typedef定義的處理器函數的函數類型 */
typedef void aeFileProc(struct aeEventLoop *eventLoop,
int fd, void *clientData, int mask);
比如說,Redis 進行主從復制時,從服務器需要主服務器建立連接,它會發起一個 socekt連接,然后調用aeCreateFileEvent函數針對發起的socket的讀寫事件注冊了對應的事件處理器,也就是syncWithMaster函數。
aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);
/* 符合aeFileProc的函數定義 */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {....}
aeCreateFileEvent的參數fd指的是具體的socket套接字,proc指fd產生事件時,具體的處理函數,clientData則是回調處理函數時需要傳入的數據。
aeCreateFileEvent主要做了三件事情:
- 以fd為索引,在events未就緒事件表中找到對應事件。
- 調用aeApiAddEvent函數,該事件注冊到具體的底層 I/O 多路復用中,本例為epoll。
- 填充事件的回調、參數、事件類型等參數。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{/* 取出 fd 對應的文件事件結構, fd 代表具體的 socket 套接字 */aeFileEvent *fe = &eventLoop->events[fd];/* 監聽指定 fd 的指定事件 */if (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;/* 置文件事件類型,以及事件的處理器 */fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;/* 私有數據 */fe->clientData = clientData;if (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}
如上文所說,Redis 基于的底層 I/O 多路復用庫有多套,所以aeApiAddEvent也有多套實現,下面的源碼是epoll下的實現。其核心操作就是調用epoll的epoll_ctl函數來向epoll注冊響應事件。
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* avoid valgrind warning *//* 如果 fd 沒有關聯任何事件,那么這是一個 ADD 操作。如果已經關聯了某個/某些事件,那么這是一個 MOD 操作。 */int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;/* 注冊事件到 epoll */ee.events = 0;mask |= eventLoop->events[fd].mask; /* Merge old events */if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;/* 調用epoll_ctl 系統調用,將事件加入epoll中 */if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}
事件處理
因為 Redis 中同時存在文件事件和時間事件兩個事件類型,所以服務器必須對這兩個事件進行調度,決定何時處理文件事件,何時處理時間事件,以及如何調度它們。
aeMain函數以一個無限循環不斷地調用aeProcessEvents函數來處理所有的事件。
void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {/* 如果有需要在事件處理前執行的函數,那么執行它 */if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);/* 開始處理事件*/aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);}
}
下面是aeProcessEvents的偽代碼,它會首先計算距離當前時間最近的時間事件,以此計算一個超時時間;然后調用aeApiPoll函數去等待底層的I/O多路復用事件就緒;aeApiPoll函數返回之后,會處理所有已經產生文件事件和已經達到的時間事件。
/* 偽代碼 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {/* 獲取到達時間距離當前時間最接近的時間事件*/time_event = aeSearchNearestTimer();/* 計算最接近的時間事件距離到達還有多少毫秒*/remaind_ms = time_event.when - unix_ts_now();/* 如果事件已經到達,那么remaind_ms為負數,將其設置為0 */if (remaind_ms < 0) remaind_ms = 0;/* 根據 remaind_ms 的值,創建 timeval 結構*/timeval = create_timeval_with_ms(remaind_ms);/* 阻塞并等待文件事件產生,最大阻塞時間由傳入的 timeval 結構決定,如果remaind_ms 的值為0,則aeApiPoll 調用后立刻返回,不阻塞*//* aeApiPoll調用epoll_wait函數,等待I/O事件*/aeApiPoll(timeval);/* 處理所有已經產生的文件事件*/processFileEvents();/* 處理所有已經到達的時間事件*/processTimeEvents();
}
與aeApiAddEvent類似,aeApiPoll也有多套實現,它其實就做了兩件事情,調用epoll_wait阻塞等待epoll的事件就緒,超時時間就是之前根據最快達到時間事件計算而來的超時時間;然后將就緒的epoll事件轉換到fired就緒事件。aeApiPoll就是上文所說的I/O多路復用程序。具體過程如下圖所示。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
{aeApiState *state = eventLoop->apidata;int retval, numevents = 0;// 調用epoll_wait函數,等待時間為最近達到時間事件的時間計算而來。retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);// 有至少一個事件就緒?if (retval > 0) {int j;/*為已就緒事件設置相應的模式,并加入到 eventLoop 的 fired 數組中*/numevents = retval;for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;if (e->events & EPOLLIN)mask |= AE_READABLE;if (e->events & EPOLLOUT)mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE;if (e->events & EPOLLHUP)mask |= AE_WRITABLE;/* 設置就緒事件表元素 */eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}// 返回已就緒事件個數return numevents;
}
processFileEvent是處理就緒文件事件的偽代碼,也是上文所述的文件事件分派器,它其實就是遍歷fired就緒事件表,然后根據對應的事件類型來調用事件中注冊的不同處理器,讀事件調用rfileProc,而寫事件調用wfileProc。
void processFileEvent(int numevents) {for (j = 0; j < numevents; j++) {/* 從已就緒數組中獲取事件 */aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int fired = 0;int invert = fe->mask & AE_BARRIER;/* 讀事件 */if (!invert && fe->mask & mask & AE_READABLE) {/* 調用讀處理函數 */fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;}/* 寫事件. */if (fe->mask & mask & AE_WRITABLE) {if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}if (invert && fe->mask & mask & AE_READABLE) {if (!fired || fe->wfileProc != fe->rfileProc) {fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}processed++;}}
}
而processTimeEvents是處理時間事件的函數,它會遍歷aeEventLoop的事件事件列表,如果時間事件到達就執行其timeProc函數,并根據函數的返回值是否等于AE_NOMORE來決定該時間事件是否是周期性事件,并修改器到達時間。
static int processTimeEvents(aeEventLoop *eventLoop) {int processed = 0;aeTimeEvent *te;long long maxId;time_t now = time(NULL);....eventLoop->lastTime = now;te = eventLoop->timeEventHead;maxId = eventLoop->timeEventNextId-1;/* 遍歷時間事件鏈表 */while(te) {long now_sec, now_ms;long long id;/* 刪除需要刪除的時間事件 */if (te->id == AE_DELETED_EVENT_ID) {aeTimeEvent *next = te->next;if (te->prev)te->prev->next = te->next;elseeventLoop->timeEventHead = te->next;if (te->next)te->next->prev = te->prev;if (te->finalizerProc)te->finalizerProc(eventLoop, te->clientData);zfree(te);te = next;continue;}/* id 大于最大maxId,是該循環周期生成的時間事件,不處理 */if (te->id > maxId) {te = te->next;continue;}aeGetTime(&now_sec, &now_ms);/* 事件已經到達,調用其timeProc函數*/if (now_sec > te->when_sec ||(now_sec == te->when_sec && now_ms >= te->when_ms)){int retval;id = te->id;retval = te->timeProc(eventLoop, id, te->clientData);processed++;/* 如果返回值不等于 AE_NOMORE,表示是一個周期性事件,修改其when_sec和when_ms屬性*/if (retval != AE_NOMORE) {aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);} else {/* 一次性事件,標記為需刪除,下次遍歷時會刪除*/te->id = AE_DELETED_EVENT_ID;}}te = te->next;}return processed;
}
刪除事件
當不在需要某個事件時,需要把事件刪除掉。例如: 如果fd同時監聽讀事件、寫事件。當不在需要監聽寫事件時,可以把該fd的寫事件刪除。
aeDeleteEventLoop函數的執行過程總結為以下幾個步驟
- 根據fd在未就緒表中查找到事件
- 取消該fd對應的相應事件標識符
- 調用aeApiFree函數,內核會將epoll監聽紅黑樹上的相應事件監聽取消。