http://blog.csdn.net/zhuxiaoping54532/article/details/56494313
處理大并發之一
對epoll的理解,epoll客戶端服務端代碼
序言:
該博客是一系列的博客,首先從最基礎的epoll說起,然后研究libevent源碼及使用方法,最后研究nginx和node.js,關于select,poll這里不做說明,只說明其相對于epoll的不足,其實select和poll我也沒用過,因為我選擇了epoll。
說起epoll,做過大并發的估計都不陌生,之前做了個STB的工具,用的就是epoll處理并發,測試1.5W的并發(非簡單的發消息,包括多個服務端和客戶端的處理)是通過的,epoll的功能強大,讓我有一定的體會,在nginx和node.js中利用的就是libevent,而libevent使用的就是epoll,如果系統不支持epoll,會選擇最佳的方式(select,poll或kqueue中的一種)。
基礎資料:
epoll名詞解釋:
看下百科名片是怎么解釋epoll的吧,(http://baike.baidu.com/view/1385104.htm)epoll是Linux內核為處理大批量句柄而作了改進的poll,是Linux下多路復用IO接口select/poll的增強版本,它能顯著減少程序在大量并發連接中只有少量活躍的情況下的系統CPU利用率。
關于具體的說明可以參考網上資料,我這里羅列下epoll相對于其他多路復用機制(select,poll)的優點吧:
epoll優點:
1.?支持一個進程打開大數目的socket描述符。
2.?IO效率不隨FD數目增加而線性下降,傳統的select/poll每次調用都會線性掃描全部的集合,導致效率呈現線性下降。
3.?使用mmap加速內核與用戶空間的消息傳遞。無論是select,poll還是epoll都需要內核把FD消息通知給用戶空間,如何避免不必要的內存拷貝就很重要,在這點上,epoll是通過內核于用戶空間mmap同一塊內存實現的。
select和poll的缺點:
1.?每次調用時要重復地從用戶態讀入參數。
2.?每次調用時要重復地掃描文件描述符。
3.?每次在調用開始時,要把當前進程放入各個文件描述符的等待隊列。在調用結束后,又把進程從各個等待隊列中刪除。
分析libevent的時候,發現一個博客介紹epoll的不錯,網址是:http://blog.csdn.net/sparkliang/article/details/4770655
epoll用的的數據結構和函數
數據結構?
typedef?union?epoll_data?{?
????????????????void?*ptr;?
????????????????int?fd;?
????????????????__uint32_t?u32;?
????????????????__uint64_t?u64;?
????????}?epoll_data_t;?
????????struct?epoll_event?{?
????????????????__uint32_t?events;??????/*?epoll?events?*/?
????????????????epoll_data_t?data;??????/*?user?data?variable?*/?
????????};?
結構體epoll_event?被用于注冊所感興趣的事件和回傳所發生待處理的事件.?
其中epoll_data?聯合體用來保存觸發事件的某個文件描述符相關的數據.?
例如一個client連接到服務器,服務器通過調用accept函數可以得到于這個client對應的socket文件描述符,可以把這文件描述符賦給epoll_data的fd字段以便后面的讀寫操作在這個文件描述符上進行。epoll_event?結構體的events字段是表示感興趣的事件和被觸發的事件可能的取值為:?
EPOLLIN?:表示對應的文件描述符可以讀;?
EPOLLOUT:表示對應的文件描述符可以寫;?
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀?
EPOLLERR:表示對應的文件描述符發生錯誤;?
EPOLLHUP:表示對應的文件描述符被掛斷;?
EPOLLET:表示對應的文件描述符有事件發生;?
ET和LT模式
LT(level?triggered)是缺省的工作方式,并且同時支持block和no-block?socket.在這種做法中,內核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的fd進行IO操作。如果你不作任何操作,內核還是會繼續通知你的,所以,這種模式編程出錯誤可能性要小一點。傳統的select/poll都是這種模型的代表。
ET?(edge-triggered)是高速工作方式,只支持no-block?socket。在這種模式下,當描述符從未就緒變為就緒時,內核通過epoll告訴你。然后它會假設你知道文件描述符已經就緒,并且不會再為那個文件描述符發送更多的就緒通知,直到你做了某些操作導致那個文件描述符不再為就緒狀態了(比如,你在發送,接收或者接收請求,或者發送接收的數據少于一定量時導致了一個EWOULDBLOCK?錯誤)。但是請注意,如果一直不對這個fd作IO操作(從而導致它再次變成未就緒),內核不會發送更多的通知(only?once),不過在TCP協議中,ET模式的加速效用仍需要更多的benchmark確認。
ET和LT的區別在于LT事件不會丟棄,而是只要讀buffer里面有數據可以讓用戶讀,則不斷的通知你。而ET則只在事件發生之時通知。可以簡單理解為LT是水平觸發,而ET則為邊緣觸發。
ET模式僅當狀態發生變化的時候才獲得通知,這里所謂的狀態的變化并不包括緩沖區中還有未處理的數據,也就是說,如果要采用ET模式,需要一直read/write直到出錯為止,很多人反映為什么采用ET模式只接收了一部分數據就再也得不到通知了,大多因為這樣;而LT模式是只要有數據沒有處理就會一直通知下去的.
函數
- 1.?int?epoll_create(int?size);??
- 2.?int?epoll_ctl(int?epfd,?int?op,?int?fd,?struct?epoll_event?*event);??
- 3.?int?epoll_wait(int?epfd,?struct?epoll_event?*events,int?maxevents,?int?timeout);??
詳解:
1.?int?epoll_create(int?size);
創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大。需要注意的是,當創建好epoll句柄后,它就是會占用一個fd值,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll后,必須調用close()關閉,否則可能導致fd被耗盡。
2.?int?epoll_ctl(int?epfd,?int?op,?int?fd,?struct?epoll_event?*event);
epoll的事件注冊函數,它不同與select()是在監聽事件時告訴內核要監聽什么類型的事件,而是在這里先注冊要監聽的事件類型。第一個參數是epoll_create()的返回值,第二個參數表示動作,用三個宏來表示:
EPOLL_CTL_ADD:注冊新的fd到epfd中;
EPOLL_CTL_MOD:修改已經注冊的fd的監聽事件;
EPOLL_CTL_DEL:從epfd中刪除一個fd;
第三個參數是需要監聽的fd,第四個參數是告訴內核需要監聽什么事
3.?int?epoll_wait(int?epfd,?struct?epoll_event?*events,int?maxevents,?int?timeout);
等待事件的產生,參數events用來從內核得到事件的集合,maxevents告之內核這個events有多大,這個?maxevents的值不能大于創建epoll_create()時的size,參數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。該函數返回需要處理的事件數目,如返回0表示已超時。
資料總結這么多,其實都是一些理論,關鍵還在于代碼,下面附上我使用epoll開發的服務端和客戶端,該代碼我會上傳到我的資源里,可以免費下載。鏈接:
如果代碼有bug,可以告訴我。
程序demo
首先對服務端和客戶端做下說明:
我想實現的是客戶端和服務端并發的程序,客戶端通過配置并發數,說明有多少個用戶去連接服務端。
客戶端會發送消息:"Client:?i?send?message?Hello?Server!”,其中i表示哪一個客戶端;收到消息:"Recv?Server?Msg?Content:%s\n"。
例如:
發送:Client:?1?send?message?"Hello?Server!"
接收:Recv?Derver?Msg?Content:Hello,?client?fd:?6
服務端收到后給客戶端回復消息:"Hello,?client?fd:?i",其中i表示服務端接收的fd,用戶區別是哪一個客戶端。接收客戶端消息:"Terminal?Received?Msg?Content:%s\n"
例如:
發送:Hello,?client?fd:?6
接收:Terminal?Received?Msg?Content:Client:?1?send?message?"Hello?Server!"
備注:這里在接收到消息后,直接打印出消息,如果需要對消息進行處理(如果消息處理比較占用時間,不能立即返回,可以將該消息放入一個隊列中,然后開啟一個線程從隊列中取消息進行處理,這樣的話不會因為消息處理而阻塞epoll)。libenent好像對這種有2中處理方式,一個就是回調,要求回調函數,不占用太多的時間,基本能立即返回,另一種好像也是一個隊列實現的,這個還需要研究。
服務端代碼說明:
服務端在綁定監聽后,開啟了一個線程,用于負責接收客戶端連接,加入到epoll中,這樣只要accept到客戶端的連接,就將其add?EPOLLIN到epoll中,然后進入循環調用epoll_wait,監聽到讀事件,接收數據,并將事件修改為EPOLLOUT;反之監聽到寫事件,發送數據,并將事件修改為EPOLLIN。
?
- cepollserver.h??
- #ifndef??C_EPOLL_SERVER_H??
- #define??C_EPOLL_SERVER_H??
- ??
- #include?<sys/epoll.h>??
- #include?<sys/socket.h>??
- #include?<netinet/in.h>??
- #include?<fcntl.h>??
- #include?<arpa/inet.h>??
- #include?<stdio.h>??
- #include?<stdlib.h>??
- #include?<iostream>??
- #include?<pthread.h>??
- ??
- #define?_MAX_SOCKFD_COUNT?65535??
- ??
- class?CEpollServer??
- {??
- ????????public:??
- ????????????????CEpollServer();??
- ????????????????~CEpollServer();??
- ??
- ????????????????bool?InitServer(const?char*?chIp,?int?iPort);??
- ????????????????void?Listen();??
- ????????????????static?void?ListenThread(?void*?lpVoid?);??
- ????????????????void?Run();??
- ??
- ????????private:??
- ????????????????int????????m_iEpollFd;??
- ????????????????int????????m_isock;??
- ????????????????pthread_t???????m_ListenThreadId;//?監聽線程句柄??
- ??
- };??
- ??
- #endif??
cepollserver.cpp
- #include?"cepollserver.h"??
- ??
- using?namespace?std;??
- ??
- CEpollServer::CEpollServer()??
- {??
- }??
- ??
- CEpollServer::~CEpollServer()??
- {??
- ????close(m_isock);??
- }??
- ??
- bool?CEpollServer::InitServer(const?char*?pIp,?int?iPort)??
- {??
- ????m_iEpollFd?=?epoll_create(_MAX_SOCKFD_COUNT);??
- ??
- ????//設置非阻塞模式??
- ????int?opts?=?O_NONBLOCK;??
- ????if(fcntl(m_iEpollFd,F_SETFL,opts)<0)??
- ????{??
- ????????printf("設置非阻塞模式失敗!\n");??
- ????????return?false;??
- ????}??
- ??
- ????m_isock?=?socket(AF_INET,SOCK_STREAM,0);??
- ????if?(?0?>?m_isock?)??
- ????{??
- ????????printf("socket?error!\n");??
- ????????return?false;??
- }??
- ??
- sockaddr_in?listen_addr;??
- ????listen_addr.sin_family=AF_INET;??
- ????listen_addr.sin_port=htons?(?iPort?);??
- ????listen_addr.sin_addr.s_addr=htonl(INADDR_ANY);??
- ????listen_addr.sin_addr.s_addr=inet_addr(pIp);??
- ??
- ????int?ireuseadd_on?=?1;//支持端口復用??
- ????setsockopt(m_isock,?SOL_SOCKET,?SO_REUSEADDR,?&ireuseadd_on,?sizeof(ireuseadd_on)?);??
- ??
- ????if?(?bind?(?m_isock,?(?sockaddr?*?)?&listen_addr,sizeof?(?listen_addr?)?)?!=0?)??
- ????{??
- ????????printf("bind?error\n");??
- ????????return?false;??
- ????}??
- ??
- ????if?(?listen?(?m_isock,?20)?<0?)??
- ????{??
- ????????printf("listen?error!\n");??
- ????????return?false;??
- ????}??
- ????else??
- ????{??
- ????????printf("服務端監聽中...\n");??
- ????}??
- ??
- ????//?監聽線程,此線程負責接收客戶端連接,加入到epoll中??
- ????if?(?pthread_create(?&m_ListenThreadId,?0,?(?void?*?(?*?)?(?void?*?)?)?ListenThread,?this?)?!=?0?)??
- ????{??
- ????????printf("Server?監聽線程創建失敗!!!");??
- ????????return?false;??
- ????}??
- }??
- //?監聽線程??
- void?CEpollServer::ListenThread(?void*?lpVoid?)??
- {??
- ????CEpollServer?*pTerminalServer?=?(CEpollServer*)lpVoid;??
- ????sockaddr_in?remote_addr;??
- ????int?len?=?sizeof?(remote_addr);??
- ????while?(?true?)??
- ????{??
- ????????int?client_socket?=?accept?(pTerminalServer->m_isock,?(?sockaddr?*?)?&remote_addr,(socklen_t*)&len?);??
- ????????if?(?client_socket?<?0?)??
- ????????{??
- ????????????printf("Server?Accept失敗!,?client_socket:?%d\n",?client_socket);??
- ????????????continue;??
- ????????}??
- ????????else??
- ????????{??
- ????????????struct?epoll_event????ev;??
- ????????????ev.events?=?EPOLLIN?|?EPOLLERR?|?EPOLLHUP;??
- ????????????ev.data.fd?=?client_socket;?????//記錄socket句柄??
- ????????????epoll_ctl(pTerminalServer->m_iEpollFd,?EPOLL_CTL_ADD,?client_socket,?&ev);??
- ????????}??
- ????}??
- }??
- ??
- void?CEpollServer::Run()??
- {??
- ????while?(?true?)??
- ????{??
- ????????struct?epoll_event????events[_MAX_SOCKFD_COUNT];??
- ????????int?nfds?=?epoll_wait(?m_iEpollFd,?events,??_MAX_SOCKFD_COUNT,?-1?);??
- ????????for?(int?i?=?0;?i?<?nfds;?i++)??
- ????????{??
- ????????????int?client_socket?=?events[i].data.fd;??
- ????????????char?buffer[1024];//每次收發的字節數小于1024字節??
- ????????????memset(buffer,?0,?1024);??
- ????????????if?(events[i].events?&?EPOLLIN)//監聽到讀事件,接收數據??
- ????????????{??
- ????????????????int?rev_size?=?recv(events[i].data.fd,buffer,?1024,0);??
- ????????????????if(?rev_size?<=?0?)??
- ????????????????{??
- ????????????????????cout?<<?"recv?error:?recv?size:?"?<<?rev_size?<<?endl;??
- ????????????????????struct?epoll_event?event_del;??
- ????????????????????event_del.data.fd?=?events[i].data.fd;??
- ????????????????????event_del.events?=?0;??
- ????????????????????epoll_ctl(m_iEpollFd,?EPOLL_CTL_DEL,?event_del.data.fd,?&event_del);??
- ????????????????}??
- ????????????????else??
- ????????????????{??
- ????????????????????printf("Terminal?Received?Msg?Content:%s\n",buffer);??
- ????????????????????struct?epoll_event????ev;??
- ????????????????????ev.events?=?EPOLLOUT?|?EPOLLERR?|?EPOLLHUP;??
- ????????????????????ev.data.fd?=?client_socket;?????//記錄socket句柄??
- ????????????????????epoll_ctl(m_iEpollFd,?EPOLL_CTL_MOD,?client_socket,?&ev);??
- ????????????????}??
- ????????????}??
- else?if(events[i].events?&?EPOLLOUT)//監聽到寫事件,發送數據??
- ????????????{??
- ????????????????char?sendbuff[1024];??
- ????????????????sprintf(sendbuff,?"Hello,?client?fd:?%d\n",?client_socket);??
- ????????????????int?sendsize?=?send(client_socket,?sendbuff,?strlen(sendbuff)+1,?MSG_NOSIGNAL);??
- ????????????????if(sendsize?<=?0)??
- ????????????????{??
- ????????????????????struct?epoll_event?event_del;??
- ????????????????????event_del.data.fd?=?events[i].data.fd;??
- ????????????????????event_del.events?=?0;??
- ????????????????????epoll_ctl(m_iEpollFd,?EPOLL_CTL_DEL,?event_del.data.fd,?&event_del);??
- ????????????????}??
- ????????????????else??
- ????????????????{??
- ????????????????????printf("Server?reply?msg?ok!?buffer:?%s\n",?sendbuff);??
- ????????????????????struct?epoll_event????ev;??
- ????????????????????ev.events?=?EPOLLIN?|?EPOLLERR?|?EPOLLHUP;??
- ????????????????????ev.data.fd?=?client_socket;?????//記錄socket句柄??
- ????????????????????epoll_ctl(m_iEpollFd,?EPOLL_CTL_MOD,?client_socket,?&ev);??
- ????????????????}??
- ????????????}??
- ????????????else??
- ????????????{??
- ????????????????cout?<<?"EPOLL?ERROR\n"?<<endl;??
- ????????????????epoll_ctl(m_iEpollFd,?EPOLL_CTL_DEL,?events[i].data.fd,?&events[i]);??
- ????????????}??
- ????????}??
- ????}??
- }??
main.cpp
- #include?<iostream>??
- #include?"cepollserver.h"??
- ??
- using?namespace?std;??
- ??
- int?main()??
- {??
- ????????CEpollServer??theApp;??
- ????????theApp.InitServer("127.0.0.1",?8000);??
- ????????theApp.Run();??
- ??
- ????????return?0;??
- }??
客戶端代碼:
說明:測試是兩個并發進行測試,每一個客戶端都是一個長連接。代碼中在連接服務器(ConnectToServer)時將用戶ID和socketid關聯起來。用戶ID和socketid是一一對應的關系。
cepollclient.h
- #ifndef?_DEFINE_EPOLLCLIENT_H_??
- #define?_DEFINE_EPOLLCLIENT_H_??
- #define?_MAX_SOCKFD_COUNT?65535??
- ??
- #include<iostream>??
- #include?<sys/epoll.h>??
- #include?<sys/socket.h>??
- #include?<netinet/in.h>??
- #include?<fcntl.h>??
- #include?<arpa/inet.h>??
- #include?<errno.h>??
- #include?<sys/ioctl.h>??
- #include?<sys/time.h>??
- #include?<string>??
- ??
- using?namespace?std;??
- ??
- /**?
- ?*?@brief?用戶狀態?
- ?*/??
- typedef?enum?_EPOLL_USER_STATUS_EM??
- {??
- ????????FREE?=?0,??
- ????????CONNECT_OK?=?1,//連接成功??
- ????????SEND_OK?=?2,//發送成功??
- ????????RECV_OK?=?3,//接收成功??
- }EPOLL_USER_STATUS_EM;??
- ??
- /*@brief?
- ?*@CEpollClient?class?用戶狀態結構體?
- ?*/??
- struct?UserStatus??
- {??
- ????????EPOLL_USER_STATUS_EM?iUserStatus;//用戶狀態??
- ????????int?iSockFd;//用戶狀態關聯的socketfd??
- ????????char?cSendbuff[1024];//發送的數據內容??
- ????????int?iBuffLen;//發送數據內容的長度??
- ????????unsigned?int?uEpollEvents;//Epoll?events??
- };??
- ??
- class?CEpollClient??
- {??
- ????????public:??
- ??
- ????????????????/**?
- ?????????????????*?@brief?
- ?????????????????*?函數名:CEpollClient?
- ?????????????????*?描述:構造函數?
- ?????????????????*?@param?[in]?iUserCount??
- ?????????????????*?@param?[in]?pIP?IP地址?
- ?????????????????*?@param?[in]?iPort?端口號?
- ?????????????????*?@return?無返回?
- ?????????????????*/??
- ????????????????CEpollClient(int?iUserCount,?const?char*?pIP,?int?iPort);??
- ??
- /**?
- ?????????????????*?@brief?
- ?????????????????*?函數名:CEpollClient?
- ?????????????????*?描述:析構函數?
- ?????????????????*?@return?無返回?
- ?????????????????*/??
- ????????????????~CEpollClient();??
- ??
- ????????????????/**?
- ?????????????????*?@brief?
- ?????????????????*?函數名:RunFun?
- ?????????????????*?描述:對外提供的接口,運行epoll類?
- ?????????????????*?@return?無返回值?
- ?????????????????*/??
- ????????????????int?RunFun();??
- ??
- ????????private:??
- ??
- ????????????????/**?
- ?????????????????*?@brief?
- ?????????????????*?函數名:ConnectToServer?
- ?????????????????*?描述:連接到服務器?
- ?????????????????*?@param?[in]?iUserId?用戶ID?
- ?????????????????*?@param?[in]?pServerIp?連接的服務器IP?
- ?????????????????*?@param?[in]?uServerPort?連接的服務器端口號?
- ?????????????????*?@return?成功返回socketfd,失敗返回的socketfd為-1?
- ?????????????????*/??
- ????????????????int?ConnectToServer(int?iUserId,const?char?*pServerIp,unsigned?short?uServerPort);??
- ??
- /**?
- ?????????????????*?@brief?
- ?????????????????*?函數名:SendToServerData?
- ?????????????????*?描述:給服務器發送用戶(iUserId)的數據?
- ?????????????????*?@param?[in]?iUserId?用戶ID?
- ?????????????????*?@return?成功返回發送數據長度?
- ?????????????????*/??
- ????????????????int?SendToServerData(int?iUserId);??
- ??
- ????????????????/**?
- ?????????????????*?@brief?
- ?????????????????*?函數名:RecvFromServer?
- ?????????????????*?描述:接收用戶回復消息?
- ?????????????????*?@param?[in]?iUserId?用戶ID?
- ?????????????????*?@param?[in]?pRecvBuff?接收的數據內容?
- ?????????????????*?@param?[in]?iBuffLen?接收的數據長度?
- ?????????????????*?@return?成功返回接收的數據長度,失敗返回長度為-1?
- ?????????????????*/??
- ????????????????int?RecvFromServer(int?iUserid,char?*pRecvBuff,int?iBuffLen);??
- ??
- ????????????????/**?
- ?????????????????*?@brief?
- ?????????????????*?函數名:CloseUser?
- ?????????????????*?描述:關閉用戶?
- ?????????????????*?@param?[in]?iUserId?用戶ID?
- ?????????????????*?@return?成功返回true?
- ?????????????????*/??
- ????????????????bool?CloseUser(int?iUserId);??
- ??
- /**?
- ?????????????????*?@brief?
- ?????????????????*?函數名:DelEpoll?
- ?????????????????*?描述:刪除epoll事件?
- ?????????????????*?@param?[in]?iSockFd?socket?FD?
- ?????????????????*?@return?成功返回true?
- ?????????????????*/??
- ????????????????bool?DelEpoll(int?iSockFd);??
- ????????private:??
- ??
- ????????????????int????m_iUserCount;//用戶數量;??
- ????????????????struct?UserStatus?*m_pAllUserStatus;//用戶狀態數組??
- ????????????????int????m_iEpollFd;//需要創建epollfd??
- ????????????????int????m_iSockFd_UserId[_MAX_SOCKFD_COUNT];//將用戶ID和socketid關聯起來??
- ????????????????int????m_iPort;//端口號??
- ????????????????char???m_ip[100];//IP地址??
- };??
- ??
- #endif??
cepollclient.cpp
- #include?"cepollclient.h"??
- ??
- CEpollClient::CEpollClient(int?iUserCount,?const?char*?pIP,?int?iPort)??
- {??
- ????strcpy(m_ip,?pIP);??
- ????m_iPort?=?iPort;??
- ????m_iUserCount?=?iUserCount;??
- ????m_iEpollFd?=?epoll_create(_MAX_SOCKFD_COUNT);??
- ????m_pAllUserStatus?=?(struct?UserStatus*)malloc(iUserCount*sizeof(struct?UserStatus));??
- ????for(int?iuserid=0;?iuserid<iUserCount?;?iuserid++)??
- ????{??
- ????????m_pAllUserStatus[iuserid].iUserStatus?=?FREE;??
- ????????sprintf(m_pAllUserStatus[iuserid].cSendbuff,?"Client:?%d?send?message?\"Hello?Server!\"\r\n",?iuserid);??
- ????????m_pAllUserStatus[iuserid].iBuffLen?=?strlen(m_pAllUserStatus[iuserid].cSendbuff)?+?1;??
- ????????m_pAllUserStatus[iuserid].iSockFd?=?-1;??
- ????}??
- ????memset(m_iSockFd_UserId,?0xFF,?sizeof(m_iSockFd_UserId));??
- }??
- ??
- CEpollClient::~CEpollClient()??
- {??
- ????free(m_pAllUserStatus);??
- }??
- int?CEpollClient::ConnectToServer(int?iUserId,const?char?*pServerIp,unsigned?short?uServerPort)??
- {??
- ????if(?(m_pAllUserStatus[iUserId].iSockFd?=?socket(AF_INET,SOCK_STREAM,0)?)?<?0?)??
- ????{??
- ????????cout?<<"[CEpollClient?error]:?init?socket?fail,?reason?is:"<<strerror(errno)<<",errno?is:"<<errno<<endl;??
- ????????m_pAllUserStatus[iUserId].iSockFd?=?-1;??
- ????????return??m_pAllUserStatus[iUserId].iSockFd;??
- ????}??
- ??
- ????struct?sockaddr_in?addr;??
- ????bzero(&addr,?sizeof(addr));??
- ????addr.sin_family?=?AF_INET;??
- ????addr.sin_port?=?htons(uServerPort);??
- ????addr.sin_addr.s_addr?=?inet_addr(pServerIp);??
- ??
- ????int?ireuseadd_on?=?1;//支持端口復用??
- ????setsockopt(m_pAllUserStatus[iUserId].iSockFd,?SOL_SOCKET,?SO_REUSEADDR,?&ireuseadd_on,?sizeof(ireuseadd_on));??
- ??
- ????unsigned?long?ul?=?1;??
- ????ioctl(m_pAllUserStatus[iUserId].iSockFd,?FIONBIO,?&ul);?//設置為非阻塞模式??
- ??
- ????connect(m_pAllUserStatus[iUserId].iSockFd,?(const?sockaddr*)&addr,?sizeof(addr));??
- ????m_pAllUserStatus[iUserId].iUserStatus?=?CONNECT_OK;??
- ????m_pAllUserStatus[iUserId].iSockFd?=?m_pAllUserStatus[iUserId].iSockFd;??
- ??
- ????return?m_pAllUserStatus[iUserId].iSockFd;??
- }??
- int?CEpollClient::SendToServerData(int?iUserId)??
- {??
- ????sleep(1);//此處控制發送頻率,避免狂打日志,正常使用中需要去掉??
- ????int?isendsize?=?-1;??
- ????if(?CONNECT_OK?==?m_pAllUserStatus[iUserId].iUserStatus?||?RECV_OK?==?m_pAllUserStatus[iUserId].iUserStatus)??
- ????{??
- ????????isendsize?=?send(m_pAllUserStatus[iUserId].iSockFd,?m_pAllUserStatus[iUserId].cSendbuff,?m_pAllUserStatus[iUserId??
- ].iBuffLen,?MSG_NOSIGNAL);??
- ????????if(isendsize?<?0)??
- ????????{??
- ????????????cout?<<"[CEpollClient?error]:?SendToServerData,?send?fail,?reason?is:"<<strerror(errno)<<",errno?is:"<<errno<??
- <endl;??
- ????????}??
- ????????else??
- ????????{??
- ????????????printf("[CEpollClient?info]:?iUserId:?%d?Send?Msg?Content:%s\n",?iUserId,?m_pAllUserStatus[iUserId].cSendbuff??
- );??
- ????????????m_pAllUserStatus[iUserId].iUserStatus?=?SEND_OK;??
- ????????}??
- ????}??
- ????return?isendsize;??
- }??
- int?CEpollClient::RecvFromServer(int?iUserId,char?*pRecvBuff,int?iBuffLen)??
- {??
- ????int?irecvsize?=?-1;??
- ????if(SEND_OK?==?m_pAllUserStatus[iUserId].iUserStatus)??
- ????{??
- ????????irecvsize?=?recv(m_pAllUserStatus[iUserId].iSockFd,?pRecvBuff,?iBuffLen,?0);??
- ????????if(0?>?irecvsize)??
- ????????{??
- ????????????cout?<<"[CEpollClient?error]:?iUserId:?"?<<?iUserId?<<?"RecvFromServer,?recv?fail,?reason?is:"<<strerror(errn??
- o)<<",errno?is:"<<errno<<endl;??
- ????????}??
- ????????else?if(0?==?irecvsize)??
- ????????{??
- ????????????cout?<<"[warning:]?iUserId:?"<<?iUserId?<<?"RecvFromServer,?STB收到數據為0,表示對方斷開連接,irecvsize:"<<ire??
- cvsize<<",iSockFd:"<<?m_pAllUserStatus[iUserId].iSockFd?<<?endl;??
- ????????}??
- ????????else??
- ????????{??
- ????????????printf("Recv?Server?Msg?Content:%s\n",?pRecvBuff);??
- ????????????m_pAllUserStatus[iUserId].iUserStatus?=?RECV_OK;??
- ????????}??
- ????}??
- ????return?irecvsize;??
- }??
- ??
- bool?CEpollClient::CloseUser(int?iUserId)??
- {??
- ????close(m_pAllUserStatus[iUserId].iSockFd);??
- ????m_pAllUserStatus[iUserId].iUserStatus?=?FREE;??
- ????m_pAllUserStatus[iUserId].iSockFd?=?-1;??
- ????return?true;??
- }??
- ??????
- int?CEpollClient::RunFun()??
- {??
- ????int?isocketfd?=?-1;??
- ????for(int?iuserid=0;?iuserid<m_iUserCount;?iuserid++)??
- ????{??
- ????????struct?epoll_event?event;??
- ????????isocketfd?=?ConnectToServer(iuserid,?m_ip,?m_iPort);??
- ????????if(isocketfd?<?0)??
- ????????????cout?<<"[CEpollClient?error]:?RunFun,?connect?fail"?<<endl;??
- ????????m_iSockFd_UserId[isocketfd]?=?iuserid;//將用戶ID和socketid關聯起來??
- ??
- ????????event.data.fd?=?isocketfd;??
- ????????event.events?=?EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP;??
- ??
- ????????m_pAllUserStatus[iuserid].uEpollEvents?=?event.events;??
- ????????epoll_ctl(m_iEpollFd,?EPOLL_CTL_ADD,?event.data.fd,?&event);??
- }??
- while(1)??
- ????{??
- ????????struct?epoll_event?events[_MAX_SOCKFD_COUNT];??
- ????????char?buffer[1024];??
- ????????memset(buffer,0,1024);??
- ????????int?nfds?=?epoll_wait(m_iEpollFd,?events,?_MAX_SOCKFD_COUNT,?100?);//等待epoll事件的產生??
- ????????for?(int?ifd=0;?ifd<nfds;?ifd++)//處理所發生的所有事件??
- ????????{??
- ????????????struct?epoll_event?event_nfds;??
- ????????????int?iclientsockfd?=?events[ifd].data.fd;??
- ????????????cout?<<?"events[ifd].data.fd:?"?<<?events[ifd].data.fd?<<?endl;??
- ????????????int?iuserid?=?m_iSockFd_UserId[iclientsockfd];//根據socketfd得到用戶ID??
- ????????????if(?events[ifd].events?&?EPOLLOUT?)??
- ????????????{??
- ????????????????int?iret?=?SendToServerData(iuserid);??
- ????????????????if(?0?<?iret?)??
- ????????????????{??
- ????????????????????event_nfds.events?=?EPOLLIN|EPOLLERR|EPOLLHUP;??
- ????????????????????event_nfds.data.fd?=?iclientsockfd;??
- ????????????????????epoll_ctl(m_iEpollFd,?EPOLL_CTL_MOD,?event_nfds.data.fd,?&event_nfds);??
- ????????????????}??
- ????????????????else??
- ????????????????{??
- ????????????????????cout?<<"[CEpollClient?error:]?EpollWait,?SendToServerData?fail,?send?iret:"<<iret<<",iuserid:"<<iuser??
- id<<",fd:"<<events[ifd].data.fd<<endl;??
- ????????????????????DelEpoll(events[ifd].data.fd);??
- ????????????????????CloseUser(iuserid);??
- ????????????????}??
- ????????????}??
- else?if(?events[ifd].events?&?EPOLLIN?)//監聽到讀事件,接收數據??
- ????????????{??
- ????????????????int?ilen?=?RecvFromServer(iuserid,?buffer,?1024);??
- ????????????????if(0?>?ilen)??
- ????????????????{??
- ????????????????????cout?<<"[CEpollClient?error]:?RunFun,?recv?fail,?reason?is:"<<strerror(errno)<<",errno?is:"<<errno<<e??
- ndl;??
- ????????????????????DelEpoll(events[ifd].data.fd);??
- ????????????????????CloseUser(iuserid);??
- ????????????????}??
- ????????????????else?if(0?==?ilen)??
- ????????????????{??
- ????????????????????cout?<<"[CEpollClient?warning:]?server?disconnect,ilen:"<<ilen<<",iuserid:"<<iuserid<<",fd:"<<events[??
- ifd].data.fd<<endl;??
- ????????????????????DelEpoll(events[ifd].data.fd);??
- ????????????????????CloseUser(iuserid);??
- ????????????????}??
- ????????????????else??
- ????????????????{??
- ????????????????????m_iSockFd_UserId[iclientsockfd]?=?iuserid;//將socketfd和用戶ID關聯起來??
- ????????????????????event_nfds.data.fd?=?iclientsockfd;??
- ????????????????????event_nfds.events?=?EPOLLOUT|EPOLLERR|EPOLLHUP;??
- ????????????????????epoll_ctl(m_iEpollFd,?EPOLL_CTL_MOD,?event_nfds.data.fd,?&event_nfds);??
- ????????????????}??
- ????????????}??
- ????????????else??
- ????????????{??
- ????????????????cout?<<"[CEpollClient?error:]?other?epoll?error"<<endl;??
- ????????????????DelEpoll(events[ifd].data.fd);??
- ????????????????CloseUser(iuserid);??
- ????????????}??
- ????????}??
- }??
- }??
- ??
- bool?CEpollClient::DelEpoll(int?iSockFd)??
- {??
- ????bool?bret?=?false;??
- ????struct?epoll_event?event_del;??
- ????if(0?<?iSockFd)??
- ????{??
- ????????event_del.data.fd?=?iSockFd;??
- ????????event_del.events?=?0;??
- ????????if(?0?==?epoll_ctl(m_iEpollFd,?EPOLL_CTL_DEL,?event_del.data.fd,?&event_del)?)??
- ????????{??
- ????????????bret?=?true;??
- ????????}??
- ????????else??
- ????????{??
- ????????????cout?<<"[SimulateStb?error:]?DelEpoll,epoll_ctl?error,iSockFd:"<<iSockFd<<endl;??
- ????????}??
- ????????m_iSockFd_UserId[iSockFd]?=?-1;??
- ????}??
- ????else??
- ????{??
- ????????bret?=?true;??
- ??
- ????}??
- ????return?bret;??
- }??
main.cpp
- #include?"cepollclient.h"??
- ??
- int?main(int?argc,?char?*argv[])??
- {??
- ????????CEpollClient?*pCEpollClient?=?new?CEpollClient(2,?"127.0.0.1",?8000);??
- ????????if(NULL?==?pCEpollClient)??
- ????????{??
- ????????????????cout<<"[epollclient?error]:main?init"<<"Init?CEpollClient?fail"<<endl;??
- ????????}??
- ??
- ????????pCEpollClient->RunFun();??
- ??
- ????????if(NULL?!=?pCEpollClient)??
- ????????{??
- ????????????????delete?pCEpollClient;??
- ????????????????pCEpollClient?=?NULL;??
- ????????}??
- ??
- ????????return?0;??
- }??
運行結果:
客戶端:
服務端:
如是轉載,請指明原出處:http://blog.csdn.net/feitianxuxue,謝謝合作!