【網絡編程】事件驅動 reactor 式的服務器(EPOLL機制)

文章目錄

  • 業務拆解
    • 事件驅動的 reactor
    • 總流程圖
  • C 代碼實現
    • 準備工作
      • 編寫頭文件 reactor.h
      • 準備頭文件
      • 準備宏定義
      • 聲明三大模塊函數和基礎的內存變量長度
      • 定義全局變量
      • 定義 EPOLL 實例事件處理的函數與釋放資源的函數
      • 注冊服務器監聽套接字的函數
    • accept_cb 模塊
    • read_cb 模塊
    • send_cb 模塊
    • 服務器代碼
  • 代碼運行效果
  • 總結

推薦一個零聲教育學習教程,個人覺得老師講得不錯,分享給大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK等技術內容,點擊立即學習: https://github.com/0voice 鏈接。

業務拆解

在上一篇 文章 里,我們使用 EPOLL 機制去搭建服務器,能夠低成本高效率的執行 “多路復用網絡 I/O 高并發”。在本篇文章里,我們將要把上一篇問文章的代碼切分開來,使之模塊化,更好地滿足業務特色化需求。比如,最基礎的網絡 I 任務,就是讀取信息,如果業務有特殊需求,我們要特殊處理所讀取的信息,我們就要專門在源代碼的基礎上額外寫多幾個業務讀函數;再比如,最基礎的網絡 O 任務,是發送特定資源信息,如果業務有特殊需求,我們要特殊處理將要發送的信息,比如以 HTTP 報文形式發送給用戶,使得用戶可以在瀏覽器上看到我們所發的內容。

總之,我們要實現一個服務器底座,一個可根據業務內容做簡單擴展的服務器代碼,降低開發難度。我們稱之為 “事件驅動的 reactor”。

事件驅動的 reactor

reactor 顧名思義就是反應器的原理,不同的信號就會有不同的反應,回顧上一篇 文章 的 EPOLL 服務器,主要有三類網絡 I/O 任務

  • 輸入任務1:監聽套接字 sockfd 監聽到來訪 IP,讀取連接信息,分配套接字 clientfd,以負責對應網絡 I/O。
  • 輸入任務2:客戶端發來信息,epoll 調用操作系統內核通知進程處理讀事件。
  • 輸出任務:沿著對應套接字 clientfd 向客戶端發送信息。

也就是說我們要把代碼分成三個模塊,實現三種因事件信號而異的反應,綜合起來就是一個反應器 reactor。我們在每一個模塊中預留一個地方給各自的業務代碼函數。這三個模塊我們分別記成

  1. accept_cb 模塊,
  2. read_cb 模塊,
  3. send_cb 模塊

總流程圖

為了簡化表達,避免像上一篇 文章 那樣寫的那么復雜。我們也分模塊來寫流程圖,各個模塊再給出它自己的流程圖。我們先給總體的流程圖。

Created with Rapha?l 2.3.0開始init_server 占用若干個端口建立服務器套接字 sockfds,使這些 sockfds 處于監聽狀態創建 epoll 實例 epfd(被一個套接字所表示),作為總集創建長度固定的 struct epoll_event 事件數組,作為 epfd 內核中就緒鏈表的謄寫白紙把若干個監聽套接字 sockfds 的關注事件類型設置成 EPOLLIN,以 epoll_event 形式加入實例 epfd調用 epoll_wait 函數, epoll 實例同時監視服務器的監聽套接字 sockfd 以及所注冊所有的來訪 IP 連接的套接字 clientfd,并且統計響應數量 nready對這 nready 個內核響應的套接字-事件,我們逐個來處理這 nready 個套接字-事件是否還沒有處理完?當前所檢查的套接字是否正常(沒出現錯誤 EPOLLERR )?當前所檢查的套接字不是監控套接字嗎?當前所檢查的 套接字-事件 不是 EPOLLIN 事件嗎?當前所檢查的 套接字-事件 不是 EPOLLOUT 事件嗎?關閉套接字,釋放資源,快進到下一個套接字-事件已處理的 套接字-事件 計數加 1send_cb 模塊,發送內容給客戶端recv_cb 模塊,ET 邊沿模式需循環讀取 I/O 文件accept_cb 模塊,注冊新的套接字-事件入 EPOLL 實例關閉套接字,釋放資源,快進到下一個套接字-事件yesnoyesnoyesnoyesnoyesno

C 代碼實現

準備工作

編寫頭文件 reactor.h

此文件內定義了特殊的結構體,記錄了若干個套接字每次 I/O 的內容

#ifndef __SERVER_H__
#define __SERVER_H__#define BUFFER_LENGTH		1024typedef int (*RCALLBACK)(int fd);struct conn {int fd;//	申請緩沖區的大小 1024 --> 2048 ...... 逐步遞增 KB,在 accept_cb 函數中用 malloc 建立,在 recv_cb 和 send_cb 函數中用 realloc 函數調整其大小char *rbuffer;	//	邊沿模式下,我們是不能夠假設我們要讀取總量多少的內容,只能是全部讀取ssize_t rlength;ssize_t rcap;  // 讀緩沖區容量char *wbuffer;	//	邊沿模式下,我們是不能夠假設我們要讀取總量多少的內容,只能是全部寫入ssize_t wlength;ssize_t wcap;  // 寫緩沖區容量RCALLBACK send_callback;union {RCALLBACK recv_callback;RCALLBACK accept_callback;} r_action;};#endif

其中以下代碼結構是頭文件為了保護定義而專門設置的,避免同一個頭文件重復定義

#ifndef __SERVER_H__
#define __SERVER_H__// 定義類型#endif

所定義的 struct conn 類型中有一個成員值得注意,那就是 r_action,它是一個聯合體而非結構體,它有一詞多義性,我們知道 RCALLBACK 是一個回調函數,

typedef int (*RCALLBACK)(int fd);

回調函數可以是任何 同 (返回類型、傳入參數類型)的函數。而在類型 struct conn 中的成員 r_action 就好比 “面向對象編程中的多態”,成員都叫同一個名字,但是具體的定義是不一樣的。下文的主函數(服務器代碼)中會出現兩行類似的代碼,就是來自于回調函數的使用,它們分別是(讀者可以通篇閱讀完這篇文章后再回來)

1、conn_list[sockfd].r_action.accept_callback = accept_cb;
2、conn_list[fd].r_action.recv_callback = recv_cb;
3、conn_list[fd].send_callback = send_cb;
4for (int j = 0; j < MAX_PORTS; j++) {if (connfd == listen_fds[j]) {conn_list[connfd].r_action.accept_callback(connfd);is_listener = 1;break;}}
5if (conn_list[connfd].r_action.recv_callback(connfd) >= 0) { // 返回0表示成功printf("[%ld] RECV: %s\n",conn_list[connfd].rlength, conn_list[connfd].rbuffer);} else {//	連接在 recv 函數處早已釋放continue; // 跳過后續處理}
6、conn_list[connfd].send_callback(connfd);

準備頭文件

#include <errno.h>				// 這是全局變量 errno,用于健壯的讀取功能
#include <stdio.h>	
#include <stdlib.h>				// 動態內存分配
#include <sys/socket.h>			// 創建和管理套接字。綁定地址、監聽連接和接受連接。發送和接收數據。設置和獲取套接字選項。 socket()、connect()、sendto()、recvfrom()、accept()
#include <netinet/in.h>			// 提供了結構體 sockaddr_in
#include <string.h>				// strerror 函數
#include <fcntl.h>				// 用于更改套接字的模式,比如非阻塞模式
#include <unistd.h>				// close 函數,關閉套接字
#include <sys/types.h>			// ssize_t 是一個有符號的整數類型
#include <sys/epoll.h>			// EPOLL 高并發機制
#include <sys/time.h>			// timeval 類型,用于表述等待時間#include "reactor.h"				// 底層數據結構,回調函數、業務端函數的統一聲明

此處注意到我們是導入了剛剛所寫的頭文件 “reactor.h”。

準備宏定義

這個代碼可建立一百多萬個連接,下一篇文章里我將介紹百萬連接的方法。為了釋放這百萬連接的代碼潛力,我們要定義這三個宏。

#define CONNECTION_SIZE			1048576 	// 	1024 * 1024,即我們要測試 1 M 的連接數#define MAX_PORTS				20			//	該服務器占用本地 20 個端口,用以建立網絡 I/O ,更好地實現百萬并發//	這是一個宏操作,是兩個時間戳的相減,表示 “計時”
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)	

MAX_PORTS 是用來支持百萬連接的,CONNECTION_SIZE 是指定最大的連接數。TIME_SUB_MS(tv1, tv2) 宏操作是用來測試服務器的性能,用來計算運行的時長

聲明三大模塊函數和基礎的內存變量長度

// 聲明函數,先寫主函數的代碼,適用于底層鏈接的
int accept_cb(int fd);
int recv_cb(int fd );
int send_cb(int fd);#define BUFFER_LENGTH		1024

無論是網絡輸入還是輸出,每次操作的字節數上限都是 BUFFER_LENGTH 個,慢慢的逐步地有條不紊地接收發送信息,而非一次過把所有內容都吞下。

定義全局變量

當我們定義了某個全局變量,我們所定義所有函數都可以在不傳入該變量的前提下對其進行改變,無通過效仿一般函數的傳入參數前外加取址符,以求在內存層次改變變量本身。

全局變量并不占用線程棧的內存空間,而是存儲在 “靜態數據區” 之中。

int epfd = 0;			//  epoll 事件文件套接字,它申請為一個全局變量
struct timeval begin;	//	聲明一個全局時間戳變量//	本 EPOLL 實例一次最多建立 1 M = 1024*1024 個網絡 I/O 事件(百萬并發);這又被稱之為總體事件集
struct conn conn_list[CONNECTION_SIZE] = {0};	
// 	這是一個全局變量
//	在函數中對全局變量賦值后,函數執行完畢后全局變量的值會保持修改后的值。全局變量的特性決定了它的值在程序運行期間會持久存在,不會因為函數執行結束而重置。

如果讀者不知道靜態數據區的大小是多少,可以通過以下方法查看(我是用 Linux 系統編程的)

qiming@qiming:~/share/CTASK/TCP_test$ ulimit -a
real-time non-blocking time  (microseconds, -R) unlimited
core file size              (blocks, -c) 0
data seg size               (kbytes, -d) unlimited
scheduling priority                 (-e) 0
file size                   (blocks, -f) unlimited
pending signals                     (-i) 15051
max locked memory           (kbytes, -l) 496096
max memory size             (kbytes, -m) unlimited
open files                          (-n) 1024
pipe size                (512 bytes, -p) 8
POSIX message queues         (bytes, -q) 819200
real-time priority                  (-r) 0
stack size                  (kbytes, -s) 8192
cpu time                   (seconds, -t) unlimited
max user processes                  (-u) 15051
virtual memory              (kbytes, -v) unlimited
file locks                          (-x) unlimited

我們注意到

data seg size               (kbytes, -d) unlimited
  • data seg size:表示程序數據段的最大大小,單位為 KB。數據段包括全局變量和靜態變量,這些變量存儲在全局數據區中。

這說明靜態數據區沒有特別的限制,僅由操作系統的內存所限制。故而我們不必擔心 conn_list 這個一百萬多元的超大數組會超規模占用空間。

另外,epfd 將會是下文的 “EPOLL 實例”,begin 是全局的開端時間戳。

定義 EPOLL 實例事件處理的函數與釋放資源的函數

EPOLL 實例事件處理的函數如下。

//	該函數可以實現對文件描述符的 EPOLL 事件注冊、修改或刪除,flag 參數是用來區分情況的
int set_event(int fd, int event, int flag) {//	fd 是目標文件描述符或套接字;//	event 是事件類型,比如讀事件 EPOLLIN;//	flag 是用來區分是注冊還是修改的;1 表示是注冊;0 表示修改;2 表示刪除if (flag == 1) {  // add 1struct epoll_event ev;ev.events = event;ev.data.fd = fd;if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {// 老板感興趣的事件是,前臺小姐姐的工作情況;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注冊入 EPOLL 之中// (即一個套接字 fd 對應一個就緒狀態表 0/1)perror("epoll_ctl failed");close(fd);return -1;}} else if (flag == 2) {	// delete 2struct epoll_event ev;ev.events = event;ev.data.fd = fd;if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev) < 0) {// 老板感興趣的事件是,前臺小姐姐的工作情況;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注冊入 EPOLL 之中// (即一個套接字 fd 對應一個就緒狀態表 0/1)perror("epoll_ctl failed");return -1;}} else if (flag == 0) {  // modify 0struct epoll_event ev;ev.events = event;ev.data.fd = fd;//	EPOLL_CTL_MOD:修改已經注冊到 epoll 實例中的文件描述符 fd 的監視事件。if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {// 老板感興趣的事件是,前臺小姐姐的工作情況;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注冊入 EPOLL 之中// (即一個套接字 fd 對應一個就緒狀態表 0/1)perror("epoll_ctl failed");close(fd);return -1;}} else {printf("Param Error: flag =0, 1, 2\n");}}

我們注意到,該函數有一個 int flag 參數,這其實是一個狀態機,用來標明選擇處理方式。用來區分是注冊還是修改的,抑或刪除;1 表示是注冊;0 表示修改;2 表示刪除。

釋放資源的函數如下。

//	只在連接關閉時釋放資源:事件 event 代號 0 通常表示沒有事件發生
void close_connection(int fd) {set_event(fd, 0, 2);close(fd);if (conn_list[fd].rbuffer) {free(conn_list[fd].rbuffer);conn_list[fd].rbuffer = NULL; // 防止重復釋放}if (conn_list[fd].wbuffer) {free(conn_list[fd].wbuffer);conn_list[fd].wbuffer = NULL;}memset(&conn_list[fd], 0, sizeof(struct conn));
}

緩沖區生命周期必須與連接生命周期一致,在連接關閉前不要釋放緩沖區,在連接關閉后確保完全釋放。

注冊服務器監聽套接字的函數

占用設備的端口,設置監聽套接字。

//	創建監聽套接字 sockfd,并且綁定本機 IP 和端口 port
// (遠程IP, 遠程PORT, 本地IP, 本地PORT, 協議) 與 網絡 I/O 一對一成型一個 sockfd 
int init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0servaddr.sin_port = htons(port); // 0-1023, // 設置端口復用// 當服務器主動關閉 TCP 連接時,會進入 TIME_WAIT 狀態(通常持續 2MSL,約 1-4 分鐘)。在此期間,操作系統會保留該端口綁定記錄,防止延遲到達的數據包干擾新連接。// 問題:服務器崩潰或重啟后嘗試重新綁定端口時,會因 TIME_WAIT 狀態導致 bind() 失敗(錯誤:Address already in use) // 以下處理措施:能避免再次啟用服務器程序時,系統的宕機int reuse = 1;if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {//  如果未設置 SO_REUSEADDR,導致端口被占用后無法立即重用(TIME_WAIT 狀態)printf("setsockopt failed: %s\n", strerror(errno));return -1;}// setsockopt 是一個用于設置套接字選項的系統調用函數。// 第二項參數 level:指定選項所在的協議級別。常見的值包括:SOL_SOCKET:表示套接字級別的選項。IPPROTO_TCP:表示 TCP 協議級別的選項。IPPROTO_IP:表示 IP 協議級別的選項。IPPROTO_IPV6:表示 IPv6 協議級別的選項。// 第三項參數 optname:指定要設置的選項名稱。不同的協議級別有不同的選項名稱。例如:在 SOL_SOCKET 級別,常見的選項包括 SO_REUSEADDR、SO_KEEPALIVE、SO_LINGER 等。在 IPPROTO_TCP 級別,常見的選項包括 TCP_NODELAY 等。// 第四項參數 optval:指向包含選項值的內存區域。選項值的類型和大小取決于 optname// 第五項參數 optlen:指定 optval 的長度(以字節為單位)。if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {printf("bind failed: %s\n", strerror(errno));	//	strerror 函數定義在 <string.h> 中,而 errno 是 <errno.h> 的全局變量return -1;}//	一次監聽 10 個來訪 IP:PORT//	printf("listen finshed: %d\n", sockfd); // 3 if (listen(sockfd, 10) < 0) {// 將套接字設置為被動模式:套接字從主動連接模式(用于客戶端)轉換為被動監聽模式(用于服務器)。我們可以把這個 socket 想象成公司的前臺小姐。// 5:是監聽隊列的最大長度,表示系統可以為該套接字排隊的最大未完成連接數。當新的連接請求到達時,如果隊列已滿,新的連接請求將被拒絕。// 返回值:成功,返回 0。失敗,返回 -1,并設置 errno 以指示錯誤原因。printf("listen finshed: %d\n", sockfd); return -1;}return sockfd;}

accept_cb 模塊

首先給出的是,針對已經在 EPOLL 實例中注冊的套接字,初始化其在全局變量 conn_list 對應位置上的內存配置。

//	針對已經注冊的 clientfd	(當然也有可能注冊不成功,要注意處理失敗) 在事件總集的對應 fd 上的綜合情況進行初始化
int event_register(int fd, int event) {if (fd < 0) return -1;								//	這里是用來應對 accept 函數調用失敗的錯誤conn_list[fd].fd = fd;conn_list[fd].r_action.recv_callback = recv_cb;conn_list[fd].send_callback = send_cb;conn_list[fd].rbuffer = NULL;		//	重置讀緩沖區conn_list[fd].rlength = 0;conn_list[fd].rcap = 0;conn_list[fd].wbuffer = NULL; // 確保初始化為NULLconn_list[fd].wcap = 0;conn_list[fd].wlength = 0;set_event(fd, event, 1);							//  標志 1 表示當前是對 fd 的事件注冊而非修改
}

緊接著,當服務器的監聽套接字監聽到了來訪 IP 時,分配出新的套接字以對接接下來對該 IP 的 I/O 任務。

// listenfd(sockfd) --> EPOLLIN --> accept_cb	根據情況使用回調函數————監控套接字 sockfd 的讀事件是 accept 注冊 clientfd
int accept_cb(int fd) {struct sockaddr_in  clientaddr;socklen_t len = sizeof(clientaddr);//	這兩個變量是專門用來注冊 clientfd 的int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);// accept 會因為無輸入而阻塞(因為 sockfd 是默認的阻塞模式),本設計就是防止其阻塞(利用條件判斷繞開阻塞)if (clientfd < 0) {printf("accept errno: %d --> %s\n", errno, strerror(errno));return -1;}// 動態分配讀入內存的空間conn_list[clientfd].rbuffer = malloc(BUFFER_LENGTH); // 先行分配1024字節的內存,它是讀取內容最終歸宿if (conn_list[clientfd].rbuffer) {memset(conn_list[clientfd].rbuffer, 0, BUFFER_LENGTH); // 初始化為0}conn_list[clientfd].rlength = 0;conn_list[clientfd].rcap = BUFFER_LENGTH;  //  記錄容量if (conn_list[clientfd].rbuffer == NULL) {perror("Malloc ReadBuffer Error");return -1;}//  使用 `EPOLLET` 必須配合非阻塞套接字,否則可能阻塞線程int flags = fcntl(clientfd, F_GETFL, 0);        // F_GETFL 是獲取標志的命令fcntl(clientfd, F_SETFL, flags | O_NONBLOCK);   // 要注意 “|” 是按位或操作,能進行掩碼疊加;F_SETFL 是設置文件狀態標志,在原來的基礎上增加非阻塞功能//  函數 fcntl() 作用:讀取 clientfd 當前的所有文件狀態標志//  返回值:包含位掩碼的整數,表示當前所有設置的標志//          O_RDONLY:只讀模式 (0);O_WRONLY:只寫模式 (1);O_RDWR:讀寫模式 (2);//          O_NONBLOCK:非阻塞模式 (04000);O_APPEND:追加模式 (02000)event_register(clientfd, EPOLLIN | EPOLLET);  // | EPOLLET 是 clientfd 使用邊沿觸發模式//	這是用于說明情況的if ((clientfd % 1000) == 0) {		//	為了使得打印不過分密集,每一千個 I/O 就打印一次struct timeval current;gettimeofday(&current, NULL);	//	獲取當前時間戳,定義在 <sys/time.h> 頭文件中int time_used = TIME_SUB_MS(current, begin);	//	獲取時間差memcpy(&begin, &current, sizeof(struct timeval));printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);}return 0;
}

read_cb 模塊

這個函數是有相當多細節的。首先,我們用于計數的變量 count 所用的數字類型是 ssize_t 類型的(在頭文件 <sys/types.h> 中定義的),是一個ssize_t 是一個有符號的整數類型,在 64 位操作系統中,范圍是 [-263,263-1],是一個相當大的數。這說明這個服務器是有被用戶上傳視頻這種大型數據的能力的。

//	根據情況使用回調函數————普通套接字 clientfd 的讀事件是 recv_cb  讀取內存
//	在邊沿模式下,該函數要循環執行
int recv_cb(int fd) {ssize_t count=0;char buffer[BUFFER_LENGTH] = {0}; 		//struct conn* c = &conn_list[fd];  // 使用局部變量簡化代碼if (conn_list[fd].rbuffer != NULL) {	//	連接復用,二次收信息的時候free(conn_list[fd].rbuffer);conn_list[fd].rbuffer = NULL; // 防止重復釋放}conn_list[fd].rlength =0;conn_list[fd].rcap = BUFFER_LENGTH; while (1) {count = recv(fd, buffer, BUFFER_LENGTH, 0);	//	讀取固定長度的內容,0 代表以阻塞模式讀取數據// 因為 clientfd 不是阻塞模式, recv 不會因無輸入而阻斷,而是會立即返回 -1// 函數 recv: 讀取固定字節的內容。// 返回值 > 0:表示成功接收了數據,返回值表示實際接收到的字節數。// 返回值 == 0:表示對端已經關閉了連接(TCP連接的正常關閉)。這是TCP協議的對端關閉連接的標志。// 返回值 == -1:表示無信息可讀。并設置錯誤碼為 EAGAIN 或 EWOULDBLOCK。這表示當前沒有數據可讀,但連接仍然有效。// 當調用 recv 時,會觸發用戶態到內核態的切換,內核負責從套接字接收緩沖區復制數據到用戶提供的緩沖區,內核會更新接收緩沖區的狀態(如移除已讀取數據)if (count == 0) { 	// disconnectclose_connection(fd); // 連接斷開必須要清理套接字資源,前面已經清理過一次了,避免在次釋放printf("client disconnect: %d\n", fd);//	當客戶端主動發來斷開連接的請求時,return -1;} else if (count < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) {// 無數據可讀,退出循環return 0;}else {printf("recv error: %s\n", strerror(errno));close_connection(fd);return -2;}} else {// 使用結構體中的長度和容量字段ssize_t new_length = conn_list[fd].rlength + count;// 確保緩沖區存在,這是下文能成功使用 memcpy 的原因if (!conn_list[fd].rbuffer) {conn_list[fd].rbuffer = malloc(BUFFER_LENGTH);if (!conn_list[fd].rbuffer) {perror("malloc failed");close_connection(fd);return -3;}conn_list[fd].rcap = BUFFER_LENGTH;conn_list[fd].rlength = 0;}// 動態擴容if (new_length > conn_list[fd].rcap) {ssize_t new_cap = conn_list[fd].rcap * 2;char *new_buf = realloc(conn_list[fd].rbuffer, new_cap);if (!new_buf) {perror("realloc failed");close_connection(fd);return -3;}conn_list[fd].rbuffer = new_buf;conn_list[fd].rcap = new_cap;}// 復制數據memcpy(conn_list[fd].rbuffer + conn_list[fd].rlength, buffer, count);	//	我在這個地方多次出錯,memcpy 用的不好,我在此處爆棧了,conn_list[fd].rlength = new_length;// 安全打印(指定長度)// printf("[%zd]RECV: %.*s\n", count, (int)count, buffer);	//	這是特殊的占位符用法}memset(buffer,0,BUFFER_LENGTH); 	// 重置讀取的緩沖區,他是固定長度的,用于接收 fd 里面的字節,每次只讀取一點點}return 0;
}

這個函數先是對初始化 conn_list 的讀緩沖區和讀緩沖區的長度和容量進行初始化,而后采用邊沿讀取的方法,無限循化地讀取,直至讀完。邊沿觸發模式下,事件只在狀態發生變化時通知一次,不會因為緩沖區中持續有數據而反復觸發。這大大減少了epoll_wait的調用次數,降低了內核與用戶態之間的上下文切換開銷,從而顯著提高了性能。邊沿觸發模式通常與非阻塞I/O搭配使用。在非阻塞模式下,程序會盡可能多地讀取或寫入數據,直到遇到EAGAINEWOULDBLOCK錯誤為止。這種模式下,邊沿觸發能夠更好地發揮其優勢。

  • 當用戶遠程斷開連接時,會自動給服務器發送信息,觸發 EPOLL 的讀事件 event,并且使用 recv 函數后,返回值是 0
  • 當文件讀取完畢后,由于 clientfd 被設置成非阻塞模式,recv 不會因無輸入而阻斷,而是會立即返回 -1,對應的錯誤是 EAGAINEWOULDBLOCK 。但這是正常現象。換句話說,在阻塞模式下,是不存在這個錯誤的。
  • 我們在讀取 I/O 文件內容時,是使用了動態內存擴充的方法,這使得我們的服務器可以接受超大型的數據。

send_cb 模塊

在處理網絡輸出(即 EPOLLOUT 事件)時,套接字 clientfd 的模式被函數set_event 調整為阻塞模式,可保證發送到內核緩沖區(即套接字對應的 I/O 文件上),但不保證完整傳輸到對端(受網絡狀況影響),因而還需要循環發送。我們還需設置超時處理,保證可以全部發送。

//	掛起等待,寫事件就緒,處理 send 函數的 Bug
void wait_for_socket_writable(int sockfd) {// 需提前創建epoll實例并注冊事件int epoll_fd_1 = epoll_create1(0);struct epoll_event ev;ev.events = EPOLLOUT;  // 監聽可寫ev.data.fd = sockfd;epoll_ctl(epoll_fd_1, EPOLL_CTL_ADD, sockfd, &ev);// 等待事件觸發,無限阻塞epoll_wait(epoll_fd_1, &ev, 1, -1);close(epoll_fd_1); // 添加關閉
}//	確保數據全部發送
//	即使使用阻塞 sockfd 也必須循環發送!因為阻塞模式只保證發送到內核緩沖區,不保證完整傳輸到對端(受網絡狀況影響)。
ssize_t send_all(int sockfd, const void *buf, size_t len, int flags) {ssize_t total_sent = 0;      // 已發送字節數const char *ptr = (const char *)buf;  // 移動指針指向未發送數據//	即使使用阻塞 sockfd 也必須循環發送!因為阻塞模式只保證發送到內核緩沖區,不保證完整傳輸到對端(受網絡狀況影響)。while (total_sent < len) {// 嘗試發送剩余數據ssize_t n = send(sockfd, ptr, len - total_sent, flags);if (n < 0) {// 錯誤處理(重點!)if (errno == EINTR) continue;   // 信號中斷:重試if (errno == EAGAIN || errno == EWOULDBLOCK) {// 阻塞模式:等待可寫(需配合 select/poll/epoll)wait_for_socket_writable(sockfd);continue;}return -1;  // 其他錯誤(如連接斷開)} else if (n == 0) {printf("client disconnect: %d\n", sockfd);close_connection(sockfd);return total_sent; // 連接關閉(部分發送)}// 更新狀態total_sent += n;ptr += n;  // 移動指針到未發送數據位置}return total_sent;  // 返回實際發送的字節數(應等于 len)
}

綜合以上兩個函數,我們還指示了業務處理位置。

//	根據情況使用回調函數————普通套接字 clientfd 的輸出事件是 send_cb  發送內容
//	在邊沿模式下,該函數要循環執行
int send_cb(int fd) {if (conn_list[fd].wbuffer != NULL) {	//	連接復用,二次發送信息的時候,寫緩沖區的初始化free(conn_list[fd].wbuffer);	conn_list[fd].wbuffer = NULL; // 防止重復釋放}conn_list[fd].wlength =0;///////////////////////////////////////		響應操作的業務端(開始)	 ////////////////////////////////////////////////////////////////////////////////		響應操作的業務端(結束)	 /////////////////////////////////////////ssize_t count = 0;if (conn_list[fd].wlength != 0) {count = send_all(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);}printf("SEND: %zd\n", count);return count;
}

服務器代碼

結合前面的流程圖,我們可以給出這個服務器的代碼

int main() {unsigned short port = 2000;	// 	端口 portepfd = epoll_create(1);		//	epfd 已經被聲明為一個全局變量int listen_fds[MAX_PORTS] = {0}; // 存儲所有監聽socketint i = 0;for (i = 0; i < MAX_PORTS; i++) {int sockfd = init_server(port + i);listen_fds[i] = sockfd;conn_list[sockfd].fd = sockfd;  //  使用fd作為索引conn_list[sockfd].r_action.accept_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);}gettimeofday(&begin, NULL);		//	獲取最初的時間戳,begin 是全局變量while (1) { // mainloop	服務器的根本struct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, 5);	//	5 指代等待 5 毫秒,如果 I/O 響應滿員的話,立即返回//	該通知函數是需要調用系統內核的,是需要花費成本的,如果只是通知某些內容沒有讀完而調用,是極其得不償失的,這是我們需要用到邊沿觸發的原因,只是編程難度更大了//	我們要重視操作系統內核的調動,系統 I/O 并不全體現在代碼之上,而代碼要考慮操作系統內核可能出現的情況;編寫代碼要看到代碼之外的東西int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;int is_listener = 0;	//	狀態機-重置// 當連接異常斷開時未清理資源,添加錯誤處理:if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP) {	// 要注意 “|” 是按位或操作,能進行掩碼疊加;“&” 是按位與操作//  EPOLLHUP 表示對應的文件描述符被掛斷。EPOLLERR 表示對應的文件描述符發生錯誤。close_connection(connfd);continue;}// 檢查是否為監聽套接字for (int j = 0; j < MAX_PORTS; j++) {if (connfd == listen_fds[j]) {conn_list[connfd].r_action.accept_callback(connfd);is_listener = 1;break;}}if (is_listener) continue;	//	狀態機-進入下一個環節// 處理讀事件if (events[i].events & EPOLLIN) {// 這是監控到了除 sockfd 以外的套接字// ET 邊沿模式需循環讀取,原因是要保證網絡 I/O 所有內容都被讀取!if (conn_list[connfd].recv_callback(connfd) >= 0) { // 返回0表示成功printf("[%ld] RECV: %s\n",conn_list[connfd].rlength, conn_list[connfd].rbuffer);} else {//	連接在 recv 函數處早已釋放continue; // 跳過后續處理}///////////////////////		讀操作的業務端(start)     //////////////////////////	至此,客戶端的請求報文全部寫完了,寫入了 rbuffer 之中。我們要利用這個內存去執行業務操作//	實現 HTTP 請求,在代碼里面,該函數只是形式上存在,并不是重點//	http_request(&conn_list[fd]);//	WebSocket 協議的請求// 	ws_request(&conn_list[fd]);///////////////////////		讀操作的業務端(start)     ////////////////////////set_event(connfd, EPOLLOUT, 0);} else if (events[i].events & EPOLLOUT) {//	這里必須注意 EPOLLOUT 不是邊沿事件觸發模式,我們通常只把響應報文的 header 寫入 wbuffer 中,長度是固定的,因此水平出發即可。//	至于那大段大段的文件資源傳輸,則是通過文件描述符之間操作完成,跳過緩沖區讀寫// send 會因無輸輸出而阻斷// 函數 send: 發送固定字節的內容。// 返回值 > 0:表示成功發送了數據,返回值表示實際發送的字節數。// 返回值 == -1:表示發送操作失敗。錯誤原因可以通過 errno 獲取conn_list[connfd].send_callback(connfd);//	如果是為了測試百萬并發,則用這個,不要把 fd 關閉set_event(connfd, EPOLLIN | EPOLLET, 0);	// 這次輸出發送事件結束了;改為 “邊沿讀寫模式”,執行 epoll_ctl 函數,系統內核會直接把 fd 加載到 epoll 的就緒集之中}	else {printf("Unknown event on clientfd: %d, errno:%d\n", connfd, errno);close_connection(connfd);}}}}

我們注意到,對于模塊函數 accept_cb 的使用,我們借助了 “狀態機” 的思路,即代碼中的 is_listener,讓情況得以分類。

代碼運行效果

代碼編譯

qiming@qiming:~/share/CTASK/TCP_test$ gcc -o reactor reactor.c

程序執行,一開始沒鏈接的時候,程序并沒有掛起,而是作無意義的 while 死循環,原因是代碼中的 epoll_wait(epfd, events, 1024, 5) 并非阻塞運行,而是等待 5 毫秒后運行下一行代碼。

qiming@qiming:~/share/CTASK/TCP_test$ ./reactor

NetAssist 遠程連接
在這里插入圖片描述
發送信息(我們并沒發送什么東西,只是象征性的設置,讀者可自行設置,輸出)

qiming@qiming:~/share/CTASK/TCP_test$ ./reactor
[13244] RECV: The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows no intention of putting them off. In contrast, the man on the right exhibits a desire for delaying his task and holds that he doesn't go cracking until the deadline.
As the thought-provoking pictures intend to mirror, the idle boy as mentioned above is not likely to finish his assignments in the end, any more than a man can attain great achievement if he is accustomed to putting tasks off. For one thing, the habits shape the turns of our attitudes and behaviors of how to deal with our plans and tasks. In the course of pursuing our goals, we may face a great deal of temp-
tations and adversities, and a good habit of self-discipline can direct us to make right decisions and refrain from indulging us with entertainment. For another, if we fail to be self-disciplined, we will be deprived of the opportunities for forging the brilliant traits, like mental maturity, fortitude, creativity and so on, which will render us considerable autonomy and lead us to be successful.
Personally, we can't underscore the significance of the good habits too much. It is advisable that we should concentrate on our daily tasks through strict time management and enjoy the course of striving.
The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows no intention of putting them off. In contrast, the man on the right exhibits a desire for delaying his task and holds that he doesn't go cracking until the deadline.
As the thought-provoking pictures intend to mirror, the idle boy as mentioned above is not likely to finish his assignments in the end, any more than a man can attain great achievement if he is accustomed to putting tasks off. For one thing, the habits shape the turns of our attitudes and behaviors of how to deal with our plans and tasks. In the course of pursuing our goals, we may face a great deal of temp-
tations and adversities, and a good habit of self-discipline can direct us to make right decisions and refrain from indulging us with entertainment. For another, if we fail to be self-disciplined, we will be deprived of the opportunities for forging the brilliant traits, like mental maturity, fortitude, creativity and so on, which will render us considerable autonomy and lead us to be successful.
Personally, we can't underscore the significance of the good habits too much. It is advisable that we should concentrate on our daily tasks through strict time management and enjoy the course of striving.The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows n

我發送了若干段考研英語一的大作文(同一篇文章多次復制粘貼),模擬客戶端發送的內容非常多,多到超過了一開始設置的讀緩沖區容量 BUFFER_LENGTH,即 1 KB 的內容。我們注意到

[13244] RECV: The drawings ... 略

共發送了 1 萬多個字節的內容,說明我們的動態內存擴充的代碼是有效的。

而且接受完信息后,還可以發送信息,說明事件轉化的設計也是有效的(在輸出的最末端,我們可在命令行處注意到)。

SEND: 0

這樣一來,我們這個服務器是可以完成一個準網絡 I/O 事務的。完整的網絡 I/O 事務是

客戶端服務器第一步,網絡請求第二步,網絡響應第三步,客戶端處理響應客戶端服務器

總結

本篇文章,對上一篇 文章 的 EPOLL 服務器作出模塊化的升級,讓代碼更有組織度的同時,還有以下的特性

  1. 定義了 conn 類型,它能記錄對應網絡 I/O 文件的輸入輸出內容,而且還記錄了對應套接字的輸入和輸出行為。類似于 “多態” 的概念,同類型的兩個變量內同一成員名字有不同含義。讓命名變得簡單了許多。
  2. 對任何套接字(無論是否為監聽套接字)設置了根據事件信號而觸發的網絡 I/O 行為。這就是 “Reactor” 這個名字的由來。
  3. 對于 recv_cb 函數設計了可動態擴充讀緩沖區的功能,能夠接收超大型數據。
  4. 函數 set_event 能夠有效的進行事件轉化,使之同一個套接字的事件能夠在 EPOLLIN | EPOLLETEPOLLOUT 之間來回轉換,剛好適應了網絡 I/O 的事務模式。

在下一篇文章里,我們針對這個事件驅動 reactor 式的服務器(還是這份代碼)測試其百萬并發的能力。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/87439.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/87439.shtml
英文地址,請注明出處:http://en.pswp.cn/web/87439.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何做好云服務器密碼管理

一、設置強密碼 強密碼就像是給云服務器上了一把“超級鎖”。專家建議&#xff0c;一個強密碼應該包含大寫字母、小寫字母、數字和特殊字符&#xff0c;長度至少在 12 位以上。比如說&#xff0c;“Abc12345678”就比簡單的“123456”要安全得多。有數據顯示&#xff0c;簡單密…

《新消費模式與消費者權益保護研討會》課題研討會在北京順利召開

近期&#xff0c;《新消費模式與消費者權益保護研討會》課題研討會在北京召開。來自市場監管、政法、宏觀管理等部門專家參會&#xff0c;聚焦《消費者權益保護法》《關于以新業態新模式引領新型消費加快發展的意見》等文件精神&#xff0c;探討激發市場主體活力、促進新型消費…

Gradio全解13——MCP協議詳解(6)——MCP服務器構建、測試與示例大全

Gradio全解13——MCP協議詳解&#xff08;6&#xff09;——MCP服務器構建、測試與示例大全第13章 MCP協議詳解13.6 MCP服務器構建、測試與示例大全13.6.1 開發MCP天氣服務器1. 天氣服務器概述2. 安裝Node.js并設置環境3. 構建服務器13.6.2 安裝Claude for Desktop1. 安裝Claud…

Windows 11 24H2 專業版/家庭版安裝教程(2025年6月更新版)- U盤啟動盤制作+詳細步驟

準備U盤啟動盤? 下載個叫「Rufus」的免費小工具&#xff08;百度搜就行&#xff09;。插入一個至少8GB的空U盤&#xff08;U盤會被清空&#xff0c;提前備份資料&#xff01;&#xff09;。打開Rufus&#xff0c;選你的U盤&#xff0c;ISO文件選你下載的那個 zh-cn_windows_1…

mac電腦wireshark快速實現http接口抓包

wireshark介紹 Wireshark 是一款功能強大的網絡協議分析工具&#xff0c;可以用來抓取網絡中的數據包&#xff0c;包括 HTTP 請求和響應。 wireshark安裝 安裝下載官網 https://www.wireshark.org/download.html&#xff0c;根據個人電腦環境下載安裝wireshark使用 1配置網卡2選…

Softhub軟件下載站實戰開發(十二):軟件管理編輯頁面實現

文章目錄 Softhub軟件下載站實戰開發&#xff08;十二&#xff09;&#xff1a;軟件管理編輯頁面實現?功能概述 &#x1f4cb;編輯頁面實現 &#x1f6e0;?1. 頁面結構設計2. aieEditor集成 &#x1f31f;初始化配置編輯器功能 3. 大整數處理 &#x1f522;4. 封面圖片上傳 &…

微服務外聯Feign調用:第三方API調用的負載均衡與容災實戰

01Feign 簡介 Feign 是 Spring Cloud Netflix 中的 聲明式 HTTP 客戶端&#xff0c;它如同一位貼心的信使&#xff0c;幫我們化繁為簡&#xff0c;讓服務間的調用變得輕松又高效。 Feign 的核心優勢在于&#xff1a;。 ? 聲明式調用&#xff1a;開發者只需定義接口和注解&a…

k8s pod調度基礎

目錄 一&#xff1a;replication controller和replicaset 1&#xff1a;replication controller replication controller的使用示例。 2&#xff1a;標簽與標簽選擇器 &#xff08;1&#xff09;標簽 &#xff08;2&#xff09;標簽選擇器 &#xff08;3&#xff09;標簽…

學習者的Python項目靈感

一、實用工具類 - 文件批量重命名工具 用 os 模塊實現按規則&#xff08;如添加日期、序號、替換關鍵詞&#xff09;批量重命名文件&#xff0c;適合處理大量圖片/文檔。 - 簡易待辦事項管理器&#xff08;To-Do List&#xff09; 用 tkinter 或 PyQt 做GUI界面&#xff0c;…

gRPC服務發現

基于 etcd 實現的服務發現&#xff0c;按照非規范化的 etcd key 實現&#xff0c;詳細見代碼注釋。 package discoveryimport ("context""encoding/json""fmt""go.etcd.io/etcd/api/v3/mvccpb"clientv3 "go.etcd.io/etcd/client/…

基于Linux的Spark本地模式環境搭建實驗指南

一、實驗目的 掌握Spark本地模式的安裝與配置方法驗證Spark本地環境是否搭建成功了解Spark基本操作和運行原理 二、實驗環境準備 操作系統&#xff1a;Linux&#xff08;推薦ubuntu&#xff09;Java環境&#xff1a;JDK 1.8或以上版本內存&#xff1a;至少4GB&#xff08;推…

數學建模_時間序列

什么是時間序列時間序列預測方法/模型條件&#xff1a;非白噪音平穩平穩性評估不平穩變成平穩然后用ARIMA模型確定p,qAR模型(ARMA特例)MA模型(ARMA特例)ARMA模型(普適)灰色模型神經網絡/LSTM組合預測模型向量數據預測結果和為1的情況什么是時間序列 省略具體圖形例子 時間序列…

linux用rpm包升級sudo包為sudo-1.9.17-2版本

rpm下載地址&#xff1a; https://www.sudo.ws/dist/packages/1.9.17p1/ 備注&#xff1a;其他壓縮包下載地址&#xff1a;https://www.sudo.ws/download.html sudo-1.9.17-2.el7.x86_64.rpm 檢查一下&#xff0c;本地sudo版本&#xff0c;執行&#xff1a;sudo -V 或者sudo -…

【開源項目】一款真正可修改視頻MD5工具視頻質量不損失

文章目錄 視頻MD5修改工具 ???? 目錄? 功能特點?? 系統要求??? 設計架構?? 技術原理?? 核心代碼1. 視頻MD5修改核心邏輯2. 前端異步處理代碼3. 錯誤處理與日志記錄?? 安裝方法方法一:直接下載方法二:使用本地服務器?? 使用教程基本使用步驟高級使用技巧??…

Day05: Python 中的并發和并行(1)

理解 Python 中的線程和進程 理解線程和進程是實現在 Python 中并發和并行的基礎。這種知識使你能夠編寫能夠看似同時執行多個任務的程序&#xff0c;從而提高性能和響應能力。本課程將深入探討線程和進程的核心概念、它們的區別&#xff0c;以及它們如何為更高級的并發技術奠…

Spring Boot 集成 MinIO 實現分布式文件存儲與管理

Spring Boot 集成 MinIO 實現分布式文件存儲與管理 一、MinIO 簡介 MinIO 是一個高性能的分布式對象存儲服務器&#xff0c;兼容 Amazon S3 API。它具有以下特點&#xff1a; 輕量級且易于部署高性能&#xff08;讀寫速度可達每秒數GB&#xff09;支持數據加密和訪問控制提供…

從小白入門,基于Cursor開發一個前端小程序之Cursor 編程實踐與案例分析

Cursor 編程實踐與案例分析 Cursor 編程實踐與案例分析 1. 什么是 Cursor&#xff1f; Cursor 是一款面向開發者的 AI 編程助手&#xff0c;集成于本地 IDE&#xff0c;支持自然語言與代碼的無縫協作。它不僅能自動補全、重構、查找代碼&#xff0c;還能理解業務上下文&#…

一、如何用MATLAB畫一個三角形 代碼

一、如何用MATLAB畫一個三角形 代碼在MATLAB中繪制三角形可以通過指定三個頂點的坐標并使用 fill 或 patch 函數實現。以下是詳細代碼示例&#xff1a;方法1&#xff1a;使用 fill 函數&#xff08;簡單填充&#xff09;% 定義三角形的三個頂點坐標 (x, y) x [0, 1, 0.5]; % …

Postman自動化測試提取相應body體中的參數

文章目錄Postman自動化測試提取相應body體中的參數1. 示例響應 Body 參數2. 提取響應 Body 參數Postman自動化測試提取相應body體中的參數 上一篇的文中介紹了使用postman自動化測試時從響應的header中提取token參數&#xff0c;很多同學私信問如何從響應體body中提取參數。 有…

vue-39(為復雜 Vue 組件編寫單元測試)

實際練習:為復雜 Vue 組件編寫單元測試 單元測試對于確保復雜 Vue 組件的可靠性和可維護性至關重要。通過隔離和測試代碼的各個單元,您可以在開發過程的早期發現并修復錯誤,從而構建更健壯和可預測的應用程序。本課程重點介紹為復雜 Vue 組件編寫單元測試的實用方面,建立在…