轉載:http://blog.csdn.net/huangyimo/article/details/46806193
這篇文章介紹下libevent在socket異步編程中的應用。在一些對性能要求較高的網絡應用程序中,為了防止程序阻塞在socket I/O操作上造成程序性能的下降,需要使用異步編程,即程序準備好讀寫的函數(或接口)并向系統注冊,然后在需要的時候只向系統提交讀寫的請求之后就繼續做自己的事情,實際的讀寫操作由系統在合適的時候調用我們程序注冊的接口進行。異步編程會給一些程序猿帶來一些理解和編寫上的困難,因為我們通常寫的一些簡單的程序都是順序執行的,而異步編程將程序的執行順序打亂了,有些代碼什么情況下執行往往不是太清晰,因此也使得編程的復雜度大大增加。
Note:這里系統這個詞使用的不準確,實際上可以是自己封裝的異步調用機制,更常見的是一些可用的庫,比如libevent,ACE等
想了解libevent的工作原理可以自行查詢資料,網上相關的介紹一大堆,也可以自己閱讀源碼進行分析,本文僅從使用的角度做一個簡單的介紹,看如何快速的將libevent引入我們的程序中。任何應用都免不了需要承載其功能的底層OS,libevent也不例外,其內部是通過封裝操作系統的IO復用機制實現的,在linux系統上可能是epoll、kqueu之類的,取決于具體的OS所支持的IO復用方式,在我的系統上是epoll,因此可以理解為libevent提供了一個比epoll更為友好的操作接口,將程序猿從網絡IO處理的細節中解放出來,使其可以專注于目標問題的處理上。
首先,安裝libevent到任意目錄下
wget http://monkey.org/~provos/libevent-1.4.13-stable.tar.gz tar –xzvf libevent-1.4.13-stable.tar.gz cd libevent-1.4.13-stable ./configure --prefix=/home/mydir/libevent make && make install
現在假定我們要設計一個服務器程序,用于接收客戶端的數據,并將接收的數據回寫給客戶端。下面來構造該程序,由于本僅僅是展示一個Demo,因此程序中將不對錯誤進行處理,假設所有的調用都成功
2?#define?PORT 25341 3?#define?BACKLOG 5 4?#define?MEM_SIZE 1024 5? 6?struct?event_base*?base; 7? 8?int?main(int?argc,?char*?argv[]) 9?{ 10?????struct?sockaddr_in my_addr; 11?????int?sock; 12? 13???? sock?=?socket(AF_INET, SOCK_STREAM,?0);? 14?????int?yes?=?1; 15???? setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,?&yes,?sizeof(int)); 16???? memset(&my_addr,?0,?sizeof(my_addr)); 17???? my_addr.sin_family?=?AF_INET; 18???? my_addr.sin_port?=?htons(PORT); 19???? my_addr.sin_addr.s_addr?=?INADDR_ANY; 20???? bind(sock, (struct?sockaddr*)&my_addr,?sizeof(struct?sockaddr)); 21???? listen(sock, BACKLOG); 22? 23?????struct?event?listen_ev; 24?????base?=?event_base_new(); 25???? event_set(&listen_ev, sock, EV_READ|EV_PERSIST, on_accept, NULL); 26???? event_base_set(base,?&listen_ev); 27???? event_add(&listen_ev, NULL); 28???? event_base_dispatch(base); 29? 30?????return?0; 31?}
第13行說明創建的是一個TCP socket。第15行是服務器程序的通常做法,設置了該選項后,在父子進程模型中,當子進程為客戶服務的時候如果父進程退出,可以重新啟動程序完成服務的無縫升級,否則在所有父子進程完全退出前再啟動程序會在該端口上綁定失敗,也即不能完成無縫升級的操作(更多信息可以參考該函數說明或Steven先生的<網絡編程>)。第24行用于創建一個事件處理的全局變量,可以理解為這是一個負責集中處理各種出入IO事件的總管家,它負責接收和派發所有輸入輸出IO事件的信息,這里調用的是函數event_base_new(), 很多程序里這里用的是event_init(),區別就是前者是線程安全的、而后者是非線程安全的,后者在其官方說明中已經被標志為過時的函數、且建議用前者代替,libevent中還有很多類似的函數,比如建議用event_base_dispatch代替event_dispatch,用event_assign代替event_set和event_base_set等,關于libevent接口的詳細說明見其官方說明libevent_doc. 第25行說明在listen_en這個事件監聽sock這個描述字的讀操作,當讀消息到達是調用on_accept函數,EV_PERSIST參數告訴系統持續的監聽sock上的讀事件,如果不加該參數,每次要監聽該事件時就要重復的調用26行的event_add函數,從前面的代碼可知,sock這個描述字是bind到本地的socket端口上,因此其對應的可讀事件自然就是來自客戶端的連接到達,我們就可以調用accept無阻塞的返回客戶的連接了。第26行將listen_ev注冊到base這個事件中,相當于告訴處理IO的管家請留意我的listen_ev上的事件。第27行相當于告訴處理IO的管家,當有我的事件到達時你發給我(調用on_accept函數),至此對listen_ev的初始化完畢。第28行正式啟動libevent的事件處理機制,使系統運行起來,運行程序的話會發現event_base_dispatch是一個無限循環。
下面是on_accept函數的內容
1: void on_accept(int sock, short event, void* arg) 2: { 3: struct sockaddr_in cli_addr; 4: int newfd, sin_size; 5: // read_ev must allocate from heap memory, otherwise the program would crash from segmant fault 6: struct event* read_ev = (struct event*)malloc(sizeof(struct event));; 7: sin_size = sizeof(struct sockaddr_in); 8: newfd = accept(sock, (struct sockaddr*)&cli_addr, &sin_size); 9: event_set(read_ev, newfd, EV_READ|EV_PERSIST, on_read, read_ev); 10: event_base_set(base, read_ev); 11: event_add(read_ev, NULL); 12: }
第9-12與前面main函數的24-26相同,即在代表客戶的描述字newfd上監聽可讀事件,當有數據到達是調用on_read函數。這里有亮點需要注意,一是read_ev需要從堆里malloc出來,如果是在棧上分配,那么當函數返回時變量占用的內存會被釋放,因此事件主循環event_base_dispatch會訪問無效的內存而導致進程崩潰(即crash);第二個要注意的是第9行read_ev作為參數傳遞給了on_read函數。
下面是on_read函數的內容
1: void on_read(int sock, short event, void* arg) 2: { 3: struct event* write_ev; 4: int size; 5: char* buffer = (char*)malloc(MEM_SIZE); 6: bzero(buffer, MEM_SIZE); 7: size = recv(sock, buffer, MEM_SIZE, 0); 8: printf("receive data:%s, size:%d\n", buffer, size); 9: if (size == 0) { 10: event_del((struct event*)arg); 11: free((struct event*)arg); 12: close(sock); 13: return; 14: } 15: write_ev = (struct event*) malloc(sizeof(struct event));; 16: event_set(write_ev, sock, EV_WRITE, on_write, buffer); 17: event_base_set(base, write_ev); 18: event_add(write_ev, NULL); 19: }
第9行,當從socket讀返回0標志對方已經關閉了連接,因此這個時候就沒必要繼續監聽該套接口上的事件,由于EV_READ在on_accept函數里是用EV_PERSIST參數注冊的,因此要顯示的調用event_del函數取消對該事件的監聽。第18-21行與on_accept函數的6-11行類似,當可寫時調用on_write函數,注意第19行將buffer作為參數傳遞給了on_write。這段程序還有比較嚴重的問題,后面進行說明。
on_write函數的實現
1?void?on_write(int?sock,?short?event,?void*?arg) 2?{ 3?????char*?buffer?=?(char*)arg; 4???? send(sock, buffer, strlen(buffer),?0);? 5? 6???? free(buffer)
7?}
on_write函數中向客戶端回寫數據,然后釋放on_read函數中malloc出來的buffer。在很多書合編程指導中都很強調資源的所有權,經常要求誰分配資源、就由誰釋放資源,這樣對資源的管理指責就更明確,不容易出問題,但是通過該例子我們發現在異步編程中資源的分配與釋放往往是由不同的所有者操作的,因此也是比較容易出問題的地方。
其實在on_read函數中從socket讀取數據后程序就可以直接調用write/send接口向客戶回寫數據了,因為寫事件已經滿足,不存在異步不異步的問題,這里進行on_write的異步操作僅僅是為了說明異步編程中資源的管理與釋放的問題,另外一方面,直接調用write/send函數向客戶端寫數據可能導致程序較長時間阻塞在IO操作上,比如socket的輸出緩沖區已滿,則write/send操作阻塞到有可用的緩沖區之后才能進行實際的寫操作,而通過向寫事件注冊on_accept函數,那么libevent會在合適的時間調用我們的callback函數,(比如對于會引起IO阻塞的情況比如socket輸出緩沖區滿,則由libevent設計算法來處理,如此當回調on_accept函數時我們在調用IO操作就不會發生真正的IO之外的阻塞)。注:前面括號中是我個人認為一個庫應該實現的功能,至于libevent是不是實現這樣的功能并不清楚也無意深究。
再來看看前面提到的on_read函數中存在的問題,首先write_ev是動態分配的內存,但是沒有釋放,因此存在內存泄漏,另外,on_read中進行malloc操作,那么當多次調用該函數的時候就會造成內存的多次泄漏。這里的解決方法是對socket的描述字可以封裝一個結構體來保護讀、寫的事件以及數據緩沖區,整理后的完整代碼如下
#include?<sys/socket.h> #include?<sys/types.h> #include?<netinet/in.h> #include?<stdio.h> #include?<event.h> #define?PORT??????? 25341 #define?BACKLOG???? 5 #define?MEM_SIZE??? 1024 struct?event_base*?base; struct?sock_ev { ????struct?event*?read_ev; ????struct?event*?write_ev; ????char*?buffer; }; void?release_sock_event(struct?sock_ev*?ev) { ??? event_del(ev->read_ev); ??? free(ev->read_ev); ??? free(ev->write_ev); ??? free(ev->buffer); ??? free(ev); } void?on_write(int?sock,?short?event,?void*?arg) { ????char*?buffer?=?(char*)arg; ??? send(sock, buffer, strlen(buffer),?0); ??? free(buffer); } void?on_read(int?sock,?short?event,?void*?arg) { ????struct?event*?write_ev; ????int?size; ????struct?sock_ev*?ev?=?(struct?sock_ev*)arg; ??? ev->buffer?=?(char*)malloc(MEM_SIZE); ??? bzero(ev->buffer, MEM_SIZE); ??? size?=?recv(sock, ev->buffer, MEM_SIZE,?0); ??? printf("receive data:%s, size:%d\n", ev->buffer, size); ????if?(size?==?0) { ??????? release_sock_event(ev); ??????? close(sock); ????????return; ??? } ??? event_set(ev->write_ev, sock, EV_WRITE, on_write, ev->buffer); ??? event_base_set(base, ev->write_ev); ??? event_add(ev->write_ev, NULL); } void?on_accept(int?sock,?short?event,?void*?arg) { ????struct?sockaddr_in cli_addr; ????int?newfd, sin_size; ????struct?sock_ev*?ev?=?(struct?sock_ev*)malloc(sizeof(struct?sock_ev)); ??? ev->read_ev?=?(struct?event*)malloc(sizeof(struct?event)); ??? ev->write_ev?=?(struct?event*)malloc(sizeof(struct?event)); ??? sin_size?=?sizeof(struct?sockaddr_in); ??? newfd?=?accept(sock, (struct?sockaddr*)&cli_addr,?&sin_size); ??? event_set(ev->read_ev, newfd, EV_READ|EV_PERSIST, on_read, ev); ??? event_base_set(base, ev->read_ev); ??? event_add(ev->read_ev, NULL); } int?main(int?argc,?char*?argv[]) { ????struct?sockaddr_in my_addr; ????int?sock; ??? sock?=?socket(AF_INET, SOCK_STREAM,?0); ????int?yes?=?1; ??? setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,?&yes,?sizeof(int)); ??? memset(&my_addr,?0,?sizeof(my_addr)); ??? my_addr.sin_family?=?AF_INET; ??? my_addr.sin_port?=?htons(PORT); ??? my_addr.sin_addr.s_addr?=?INADDR_ANY; ??? bind(sock, (struct?sockaddr*)&my_addr,?sizeof(struct?sockaddr)); ??? listen(sock, BACKLOG); ????struct?event?listen_ev; ????base?=?event_base_new(); ??? event_set(&listen_ev, sock, EV_READ|EV_PERSIST, on_accept, NULL); ??? event_base_set(base,?&listen_ev); ??? event_add(&listen_ev, NULL); ??? event_base_dispatch(base); ????return?0
程序編譯的時候要加 -levent 連接選項,以連接libevent的共享庫,但是執行的時候依然爆出如下錯誤:error while loading shared libraries: libevent-1.4.so.2: cannot open shared object file: No such file or directory,?這個是程序找不到共享庫的位置,通過執行echo $LD_LIBRARY_PATH可以看到系統庫的環境變量里沒有我們安裝的路徑,即由--prefix制定的路徑,執行export LD_LIBRARY_PATH=/home/mydir/libevent/lib/:$LD_LIBRARY_PATH將該路徑加入系統環境變量里,再執行程序就可以了。