文章目錄
- 業務拆解
- 事件驅動的 reactor
- 總流程圖
- C 代碼實現
- 準備工作
- 編寫頭文件 reactor.h
- 準備頭文件
- 準備宏定義
- 聲明三大模塊函數和基礎的內存變量長度
- 定義全局變量
- 定義 EPOLL 實例事件處理的函數與釋放資源的函數
- 注冊服務器監聽套接字的函數
- accept_cb 模塊
- read_cb 模塊
- send_cb 模塊
- 服務器代碼
- 代碼運行效果
- 總結
推薦一個零聲教育學習教程,個人覺得老師講得不錯,分享給大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK等技術內容,點擊立即學習: https://github.com/0voice 鏈接。
業務拆解
在上一篇 文章 里,我們使用 EPOLL
機制去搭建服務器,能夠低成本高效率的執行 “多路復用網絡 I/O 高并發”。在本篇文章里,我們將要把上一篇問文章的代碼切分開來,使之模塊化,更好地滿足業務特色化需求。比如,最基礎的網絡 I 任務,就是讀取信息,如果業務有特殊需求,我們要特殊處理所讀取的信息,我們就要專門在源代碼的基礎上額外寫多幾個業務讀函數;再比如,最基礎的網絡 O 任務,是發送特定資源信息,如果業務有特殊需求,我們要特殊處理將要發送的信息,比如以 HTTP 報文形式發送給用戶,使得用戶可以在瀏覽器上看到我們所發的內容。
總之,我們要實現一個服務器底座,一個可根據業務內容做簡單擴展的服務器代碼,降低開發難度。我們稱之為 “事件驅動的 reactor”。
事件驅動的 reactor
reactor 顧名思義就是反應器的原理,不同的信號就會有不同的反應,回顧上一篇 文章 的 EPOLL 服務器,主要有三類網絡 I/O 任務
- 輸入任務1:監聽套接字
sockfd
監聽到來訪 IP,讀取連接信息,分配套接字clientfd
,以負責對應網絡 I/O。 - 輸入任務2:客戶端發來信息,
epoll
調用操作系統內核通知進程處理讀事件。 - 輸出任務:沿著對應套接字
clientfd
向客戶端發送信息。
也就是說我們要把代碼分成三個模塊,實現三種因事件信號而異的反應,綜合起來就是一個反應器 reactor。我們在每一個模塊中預留一個地方給各自的業務代碼函數。這三個模塊我們分別記成
- accept_cb 模塊,
- read_cb 模塊,
- send_cb 模塊
總流程圖
為了簡化表達,避免像上一篇 文章 那樣寫的那么復雜。我們也分模塊來寫流程圖,各個模塊再給出它自己的流程圖。我們先給總體的流程圖。
C 代碼實現
準備工作
編寫頭文件 reactor.h
此文件內定義了特殊的結構體,記錄了若干個套接字每次 I/O 的內容
#ifndef __SERVER_H__
#define __SERVER_H__#define BUFFER_LENGTH 1024typedef int (*RCALLBACK)(int fd);struct conn {int fd;// 申請緩沖區的大小 1024 --> 2048 ...... 逐步遞增 KB,在 accept_cb 函數中用 malloc 建立,在 recv_cb 和 send_cb 函數中用 realloc 函數調整其大小char *rbuffer; // 邊沿模式下,我們是不能夠假設我們要讀取總量多少的內容,只能是全部讀取ssize_t rlength;ssize_t rcap; // 讀緩沖區容量char *wbuffer; // 邊沿模式下,我們是不能夠假設我們要讀取總量多少的內容,只能是全部寫入ssize_t wlength;ssize_t wcap; // 寫緩沖區容量RCALLBACK send_callback;union {RCALLBACK recv_callback;RCALLBACK accept_callback;} r_action;};#endif
其中以下代碼結構是頭文件為了保護定義而專門設置的,避免同一個頭文件重復定義
#ifndef __SERVER_H__
#define __SERVER_H__// 定義類型#endif
所定義的 struct conn
類型中有一個成員值得注意,那就是 r_action
,它是一個聯合體而非結構體,它有一詞多義性,我們知道 RCALLBACK
是一個回調函數,
typedef int (*RCALLBACK)(int fd);
回調函數可以是任何 同 (返回類型、傳入參數類型)的函數。而在類型 struct conn
中的成員 r_action
就好比 “面向對象編程中的多態”,成員都叫同一個名字,但是具體的定義是不一樣的。下文的主函數(服務器代碼)中會出現兩行類似的代碼,就是來自于回調函數的使用,它們分別是(讀者可以通篇閱讀完這篇文章后再回來)
1、conn_list[sockfd].r_action.accept_callback = accept_cb;
2、conn_list[fd].r_action.recv_callback = recv_cb;
3、conn_list[fd].send_callback = send_cb;
4、for (int j = 0; j < MAX_PORTS; j++) {if (connfd == listen_fds[j]) {conn_list[connfd].r_action.accept_callback(connfd);is_listener = 1;break;}}
5、if (conn_list[connfd].r_action.recv_callback(connfd) >= 0) { // 返回0表示成功printf("[%ld] RECV: %s\n",conn_list[connfd].rlength, conn_list[connfd].rbuffer);} else {// 連接在 recv 函數處早已釋放continue; // 跳過后續處理}
6、conn_list[connfd].send_callback(connfd);
準備頭文件
#include <errno.h> // 這是全局變量 errno,用于健壯的讀取功能
#include <stdio.h>
#include <stdlib.h> // 動態內存分配
#include <sys/socket.h> // 創建和管理套接字。綁定地址、監聽連接和接受連接。發送和接收數據。設置和獲取套接字選項。 socket()、connect()、sendto()、recvfrom()、accept()
#include <netinet/in.h> // 提供了結構體 sockaddr_in
#include <string.h> // strerror 函數
#include <fcntl.h> // 用于更改套接字的模式,比如非阻塞模式
#include <unistd.h> // close 函數,關閉套接字
#include <sys/types.h> // ssize_t 是一個有符號的整數類型
#include <sys/epoll.h> // EPOLL 高并發機制
#include <sys/time.h> // timeval 類型,用于表述等待時間#include "reactor.h" // 底層數據結構,回調函數、業務端函數的統一聲明
此處注意到我們是導入了剛剛所寫的頭文件 “reactor.h”。
準備宏定義
這個代碼可建立一百多萬個連接,下一篇文章里我將介紹百萬連接的方法。為了釋放這百萬連接的代碼潛力,我們要定義這三個宏。
#define CONNECTION_SIZE 1048576 // 1024 * 1024,即我們要測試 1 M 的連接數#define MAX_PORTS 20 // 該服務器占用本地 20 個端口,用以建立網絡 I/O ,更好地實現百萬并發// 這是一個宏操作,是兩個時間戳的相減,表示 “計時”
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
MAX_PORTS
是用來支持百萬連接的,CONNECTION_SIZE
是指定最大的連接數。TIME_SUB_MS(tv1, tv2)
宏操作是用來測試服務器的性能,用來計算運行的時長
聲明三大模塊函數和基礎的內存變量長度
// 聲明函數,先寫主函數的代碼,適用于底層鏈接的
int accept_cb(int fd);
int recv_cb(int fd );
int send_cb(int fd);#define BUFFER_LENGTH 1024
無論是網絡輸入還是輸出,每次操作的字節數上限都是 BUFFER_LENGTH
個,慢慢的逐步地有條不紊地接收發送信息,而非一次過把所有內容都吞下。
定義全局變量
當我們定義了某個全局變量,我們所定義所有函數都可以在不傳入該變量的前提下對其進行改變,無通過效仿一般函數的傳入參數前外加取址符,以求在內存層次改變變量本身。
全局變量并不占用線程棧的內存空間,而是存儲在 “靜態數據區” 之中。
int epfd = 0; // epoll 事件文件套接字,它申請為一個全局變量
struct timeval begin; // 聲明一個全局時間戳變量// 本 EPOLL 實例一次最多建立 1 M = 1024*1024 個網絡 I/O 事件(百萬并發);這又被稱之為總體事件集
struct conn conn_list[CONNECTION_SIZE] = {0};
// 這是一個全局變量
// 在函數中對全局變量賦值后,函數執行完畢后全局變量的值會保持修改后的值。全局變量的特性決定了它的值在程序運行期間會持久存在,不會因為函數執行結束而重置。
如果讀者不知道靜態數據區的大小是多少,可以通過以下方法查看(我是用 Linux 系統編程的)
qiming@qiming:~/share/CTASK/TCP_test$ ulimit -a
real-time non-blocking time (microseconds, -R) unlimited
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 15051
max locked memory (kbytes, -l) 496096
max memory size (kbytes, -m) unlimited
open files (-n) 1024
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 15051
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
我們注意到
data seg size (kbytes, -d) unlimited
- data seg size:表示程序數據段的最大大小,單位為 KB。數據段包括全局變量和靜態變量,這些變量存儲在全局數據區中。
這說明靜態數據區沒有特別的限制,僅由操作系統的內存所限制。故而我們不必擔心 conn_list
這個一百萬多元的超大數組會超規模占用空間。
另外,epfd
將會是下文的 “EPOLL 實例”,begin
是全局的開端時間戳。
定義 EPOLL 實例事件處理的函數與釋放資源的函數
EPOLL 實例事件處理的函數如下。
// 該函數可以實現對文件描述符的 EPOLL 事件注冊、修改或刪除,flag 參數是用來區分情況的
int set_event(int fd, int event, int flag) {// fd 是目標文件描述符或套接字;// event 是事件類型,比如讀事件 EPOLLIN;// flag 是用來區分是注冊還是修改的;1 表示是注冊;0 表示修改;2 表示刪除if (flag == 1) { // add 1struct epoll_event ev;ev.events = event;ev.data.fd = fd;if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {// 老板感興趣的事件是,前臺小姐姐的工作情況;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注冊入 EPOLL 之中// (即一個套接字 fd 對應一個就緒狀態表 0/1)perror("epoll_ctl failed");close(fd);return -1;}} else if (flag == 2) { // delete 2struct epoll_event ev;ev.events = event;ev.data.fd = fd;if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev) < 0) {// 老板感興趣的事件是,前臺小姐姐的工作情況;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注冊入 EPOLL 之中// (即一個套接字 fd 對應一個就緒狀態表 0/1)perror("epoll_ctl failed");return -1;}} else if (flag == 0) { // modify 0struct epoll_event ev;ev.events = event;ev.data.fd = fd;// EPOLL_CTL_MOD:修改已經注冊到 epoll 實例中的文件描述符 fd 的監視事件。if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {// 老板感興趣的事件是,前臺小姐姐的工作情況;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注冊入 EPOLL 之中// (即一個套接字 fd 對應一個就緒狀態表 0/1)perror("epoll_ctl failed");close(fd);return -1;}} else {printf("Param Error: flag =0, 1, 2\n");}}
我們注意到,該函數有一個 int flag
參數,這其實是一個狀態機,用來標明選擇處理方式。用來區分是注冊還是修改的,抑或刪除;1 表示是注冊;0 表示修改;2 表示刪除。
釋放資源的函數如下。
// 只在連接關閉時釋放資源:事件 event 代號 0 通常表示沒有事件發生
void close_connection(int fd) {set_event(fd, 0, 2);close(fd);if (conn_list[fd].rbuffer) {free(conn_list[fd].rbuffer);conn_list[fd].rbuffer = NULL; // 防止重復釋放}if (conn_list[fd].wbuffer) {free(conn_list[fd].wbuffer);conn_list[fd].wbuffer = NULL;}memset(&conn_list[fd], 0, sizeof(struct conn));
}
緩沖區生命周期必須與連接生命周期一致,在連接關閉前不要釋放緩沖區,在連接關閉后確保完全釋放。
注冊服務器監聽套接字的函數
占用設備的端口,設置監聽套接字。
// 創建監聽套接字 sockfd,并且綁定本機 IP 和端口 port
// (遠程IP, 遠程PORT, 本地IP, 本地PORT, 協議) 與 網絡 I/O 一對一成型一個 sockfd
int init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0servaddr.sin_port = htons(port); // 0-1023, // 設置端口復用// 當服務器主動關閉 TCP 連接時,會進入 TIME_WAIT 狀態(通常持續 2MSL,約 1-4 分鐘)。在此期間,操作系統會保留該端口綁定記錄,防止延遲到達的數據包干擾新連接。// 問題:服務器崩潰或重啟后嘗試重新綁定端口時,會因 TIME_WAIT 狀態導致 bind() 失敗(錯誤:Address already in use) // 以下處理措施:能避免再次啟用服務器程序時,系統的宕機int reuse = 1;if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {// 如果未設置 SO_REUSEADDR,導致端口被占用后無法立即重用(TIME_WAIT 狀態)printf("setsockopt failed: %s\n", strerror(errno));return -1;}// setsockopt 是一個用于設置套接字選項的系統調用函數。// 第二項參數 level:指定選項所在的協議級別。常見的值包括:SOL_SOCKET:表示套接字級別的選項。IPPROTO_TCP:表示 TCP 協議級別的選項。IPPROTO_IP:表示 IP 協議級別的選項。IPPROTO_IPV6:表示 IPv6 協議級別的選項。// 第三項參數 optname:指定要設置的選項名稱。不同的協議級別有不同的選項名稱。例如:在 SOL_SOCKET 級別,常見的選項包括 SO_REUSEADDR、SO_KEEPALIVE、SO_LINGER 等。在 IPPROTO_TCP 級別,常見的選項包括 TCP_NODELAY 等。// 第四項參數 optval:指向包含選項值的內存區域。選項值的類型和大小取決于 optname// 第五項參數 optlen:指定 optval 的長度(以字節為單位)。if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {printf("bind failed: %s\n", strerror(errno)); // strerror 函數定義在 <string.h> 中,而 errno 是 <errno.h> 的全局變量return -1;}// 一次監聽 10 個來訪 IP:PORT// printf("listen finshed: %d\n", sockfd); // 3 if (listen(sockfd, 10) < 0) {// 將套接字設置為被動模式:套接字從主動連接模式(用于客戶端)轉換為被動監聽模式(用于服務器)。我們可以把這個 socket 想象成公司的前臺小姐。// 5:是監聽隊列的最大長度,表示系統可以為該套接字排隊的最大未完成連接數。當新的連接請求到達時,如果隊列已滿,新的連接請求將被拒絕。// 返回值:成功,返回 0。失敗,返回 -1,并設置 errno 以指示錯誤原因。printf("listen finshed: %d\n", sockfd); return -1;}return sockfd;}
accept_cb 模塊
首先給出的是,針對已經在 EPOLL 實例中注冊的套接字,初始化其在全局變量 conn_list 對應位置上的內存配置。
// 針對已經注冊的 clientfd (當然也有可能注冊不成功,要注意處理失敗) 在事件總集的對應 fd 上的綜合情況進行初始化
int event_register(int fd, int event) {if (fd < 0) return -1; // 這里是用來應對 accept 函數調用失敗的錯誤conn_list[fd].fd = fd;conn_list[fd].r_action.recv_callback = recv_cb;conn_list[fd].send_callback = send_cb;conn_list[fd].rbuffer = NULL; // 重置讀緩沖區conn_list[fd].rlength = 0;conn_list[fd].rcap = 0;conn_list[fd].wbuffer = NULL; // 確保初始化為NULLconn_list[fd].wcap = 0;conn_list[fd].wlength = 0;set_event(fd, event, 1); // 標志 1 表示當前是對 fd 的事件注冊而非修改
}
緊接著,當服務器的監聽套接字監聽到了來訪 IP 時,分配出新的套接字以對接接下來對該 IP 的 I/O 任務。
// listenfd(sockfd) --> EPOLLIN --> accept_cb 根據情況使用回調函數————監控套接字 sockfd 的讀事件是 accept 注冊 clientfd
int accept_cb(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);// 這兩個變量是專門用來注冊 clientfd 的int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);// accept 會因為無輸入而阻塞(因為 sockfd 是默認的阻塞模式),本設計就是防止其阻塞(利用條件判斷繞開阻塞)if (clientfd < 0) {printf("accept errno: %d --> %s\n", errno, strerror(errno));return -1;}// 動態分配讀入內存的空間conn_list[clientfd].rbuffer = malloc(BUFFER_LENGTH); // 先行分配1024字節的內存,它是讀取內容最終歸宿if (conn_list[clientfd].rbuffer) {memset(conn_list[clientfd].rbuffer, 0, BUFFER_LENGTH); // 初始化為0}conn_list[clientfd].rlength = 0;conn_list[clientfd].rcap = BUFFER_LENGTH; // 記錄容量if (conn_list[clientfd].rbuffer == NULL) {perror("Malloc ReadBuffer Error");return -1;}// 使用 `EPOLLET` 必須配合非阻塞套接字,否則可能阻塞線程int flags = fcntl(clientfd, F_GETFL, 0); // F_GETFL 是獲取標志的命令fcntl(clientfd, F_SETFL, flags | O_NONBLOCK); // 要注意 “|” 是按位或操作,能進行掩碼疊加;F_SETFL 是設置文件狀態標志,在原來的基礎上增加非阻塞功能// 函數 fcntl() 作用:讀取 clientfd 當前的所有文件狀態標志// 返回值:包含位掩碼的整數,表示當前所有設置的標志// O_RDONLY:只讀模式 (0);O_WRONLY:只寫模式 (1);O_RDWR:讀寫模式 (2);// O_NONBLOCK:非阻塞模式 (04000);O_APPEND:追加模式 (02000)event_register(clientfd, EPOLLIN | EPOLLET); // | EPOLLET 是 clientfd 使用邊沿觸發模式// 這是用于說明情況的if ((clientfd % 1000) == 0) { // 為了使得打印不過分密集,每一千個 I/O 就打印一次struct timeval current;gettimeofday(¤t, NULL); // 獲取當前時間戳,定義在 <sys/time.h> 頭文件中int time_used = TIME_SUB_MS(current, begin); // 獲取時間差memcpy(&begin, ¤t, sizeof(struct timeval));printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);}return 0;
}
read_cb 模塊
這個函數是有相當多細節的。首先,我們用于計數的變量 count
所用的數字類型是 ssize_t
類型的(在頭文件 <sys/types.h> 中定義的),是一個ssize_t 是一個有符號的整數類型,在 64 位操作系統中,范圍是 [-263,263-1],是一個相當大的數。這說明這個服務器是有被用戶上傳視頻這種大型數據的能力的。
// 根據情況使用回調函數————普通套接字 clientfd 的讀事件是 recv_cb 讀取內存
// 在邊沿模式下,該函數要循環執行
int recv_cb(int fd) {ssize_t count=0;char buffer[BUFFER_LENGTH] = {0}; //struct conn* c = &conn_list[fd]; // 使用局部變量簡化代碼if (conn_list[fd].rbuffer != NULL) { // 連接復用,二次收信息的時候free(conn_list[fd].rbuffer);conn_list[fd].rbuffer = NULL; // 防止重復釋放}conn_list[fd].rlength =0;conn_list[fd].rcap = BUFFER_LENGTH; while (1) {count = recv(fd, buffer, BUFFER_LENGTH, 0); // 讀取固定長度的內容,0 代表以阻塞模式讀取數據// 因為 clientfd 不是阻塞模式, recv 不會因無輸入而阻斷,而是會立即返回 -1// 函數 recv: 讀取固定字節的內容。// 返回值 > 0:表示成功接收了數據,返回值表示實際接收到的字節數。// 返回值 == 0:表示對端已經關閉了連接(TCP連接的正常關閉)。這是TCP協議的對端關閉連接的標志。// 返回值 == -1:表示無信息可讀。并設置錯誤碼為 EAGAIN 或 EWOULDBLOCK。這表示當前沒有數據可讀,但連接仍然有效。// 當調用 recv 時,會觸發用戶態到內核態的切換,內核負責從套接字接收緩沖區復制數據到用戶提供的緩沖區,內核會更新接收緩沖區的狀態(如移除已讀取數據)if (count == 0) { // disconnectclose_connection(fd); // 連接斷開必須要清理套接字資源,前面已經清理過一次了,避免在次釋放printf("client disconnect: %d\n", fd);// 當客戶端主動發來斷開連接的請求時,return -1;} else if (count < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) {// 無數據可讀,退出循環return 0;}else {printf("recv error: %s\n", strerror(errno));close_connection(fd);return -2;}} else {// 使用結構體中的長度和容量字段ssize_t new_length = conn_list[fd].rlength + count;// 確保緩沖區存在,這是下文能成功使用 memcpy 的原因if (!conn_list[fd].rbuffer) {conn_list[fd].rbuffer = malloc(BUFFER_LENGTH);if (!conn_list[fd].rbuffer) {perror("malloc failed");close_connection(fd);return -3;}conn_list[fd].rcap = BUFFER_LENGTH;conn_list[fd].rlength = 0;}// 動態擴容if (new_length > conn_list[fd].rcap) {ssize_t new_cap = conn_list[fd].rcap * 2;char *new_buf = realloc(conn_list[fd].rbuffer, new_cap);if (!new_buf) {perror("realloc failed");close_connection(fd);return -3;}conn_list[fd].rbuffer = new_buf;conn_list[fd].rcap = new_cap;}// 復制數據memcpy(conn_list[fd].rbuffer + conn_list[fd].rlength, buffer, count); // 我在這個地方多次出錯,memcpy 用的不好,我在此處爆棧了,conn_list[fd].rlength = new_length;// 安全打印(指定長度)// printf("[%zd]RECV: %.*s\n", count, (int)count, buffer); // 這是特殊的占位符用法}memset(buffer,0,BUFFER_LENGTH); // 重置讀取的緩沖區,他是固定長度的,用于接收 fd 里面的字節,每次只讀取一點點}return 0;
}
這個函數先是對初始化 conn_list
的讀緩沖區和讀緩沖區的長度和容量進行初始化,而后采用邊沿讀取的方法,無限循化地讀取,直至讀完。邊沿觸發模式下,事件只在狀態發生變化時通知一次,不會因為緩沖區中持續有數據而反復觸發。這大大減少了epoll_wait
的調用次數,降低了內核與用戶態之間的上下文切換開銷,從而顯著提高了性能。邊沿觸發模式通常與非阻塞I/O搭配使用。在非阻塞模式下,程序會盡可能多地讀取或寫入數據,直到遇到EAGAIN
或EWOULDBLOCK
錯誤為止。這種模式下,邊沿觸發能夠更好地發揮其優勢。
- 當用戶遠程斷開連接時,會自動給服務器發送信息,觸發 EPOLL 的讀事件
event
,并且使用recv
函數后,返回值是0
。 - 當文件讀取完畢后,由于
clientfd
被設置成非阻塞模式,recv 不會因無輸入而阻斷,而是會立即返回 -1,對應的錯誤是EAGAIN
和EWOULDBLOCK
。但這是正常現象。換句話說,在阻塞模式下,是不存在這個錯誤的。 - 我們在讀取 I/O 文件內容時,是使用了動態內存擴充的方法,這使得我們的服務器可以接受超大型的數據。
send_cb 模塊
在處理網絡輸出(即 EPOLLOUT
事件)時,套接字 clientfd
的模式被函數set_event
調整為阻塞模式,可保證發送到內核緩沖區(即套接字對應的 I/O 文件上),但不保證完整傳輸到對端(受網絡狀況影響),因而還需要循環發送。我們還需設置超時處理,保證可以全部發送。
// 掛起等待,寫事件就緒,處理 send 函數的 Bug
void wait_for_socket_writable(int sockfd) {// 需提前創建epoll實例并注冊事件int epoll_fd_1 = epoll_create1(0);struct epoll_event ev;ev.events = EPOLLOUT; // 監聽可寫ev.data.fd = sockfd;epoll_ctl(epoll_fd_1, EPOLL_CTL_ADD, sockfd, &ev);// 等待事件觸發,無限阻塞epoll_wait(epoll_fd_1, &ev, 1, -1);close(epoll_fd_1); // 添加關閉
}// 確保數據全部發送
// 即使使用阻塞 sockfd 也必須循環發送!因為阻塞模式只保證發送到內核緩沖區,不保證完整傳輸到對端(受網絡狀況影響)。
ssize_t send_all(int sockfd, const void *buf, size_t len, int flags) {ssize_t total_sent = 0; // 已發送字節數const char *ptr = (const char *)buf; // 移動指針指向未發送數據// 即使使用阻塞 sockfd 也必須循環發送!因為阻塞模式只保證發送到內核緩沖區,不保證完整傳輸到對端(受網絡狀況影響)。while (total_sent < len) {// 嘗試發送剩余數據ssize_t n = send(sockfd, ptr, len - total_sent, flags);if (n < 0) {// 錯誤處理(重點!)if (errno == EINTR) continue; // 信號中斷:重試if (errno == EAGAIN || errno == EWOULDBLOCK) {// 阻塞模式:等待可寫(需配合 select/poll/epoll)wait_for_socket_writable(sockfd);continue;}return -1; // 其他錯誤(如連接斷開)} else if (n == 0) {printf("client disconnect: %d\n", sockfd);close_connection(sockfd);return total_sent; // 連接關閉(部分發送)}// 更新狀態total_sent += n;ptr += n; // 移動指針到未發送數據位置}return total_sent; // 返回實際發送的字節數(應等于 len)
}
綜合以上兩個函數,我們還指示了業務處理位置。
// 根據情況使用回調函數————普通套接字 clientfd 的輸出事件是 send_cb 發送內容
// 在邊沿模式下,該函數要循環執行
int send_cb(int fd) {if (conn_list[fd].wbuffer != NULL) { // 連接復用,二次發送信息的時候,寫緩沖區的初始化free(conn_list[fd].wbuffer); conn_list[fd].wbuffer = NULL; // 防止重復釋放}conn_list[fd].wlength =0;/////////////////////////////////////// 響應操作的業務端(開始) //////////////////////////////////////////////////////////////////////////////// 響應操作的業務端(結束) /////////////////////////////////////////ssize_t count = 0;if (conn_list[fd].wlength != 0) {count = send_all(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);}printf("SEND: %zd\n", count);return count;
}
服務器代碼
結合前面的流程圖,我們可以給出這個服務器的代碼
int main() {unsigned short port = 2000; // 端口 portepfd = epoll_create(1); // epfd 已經被聲明為一個全局變量int listen_fds[MAX_PORTS] = {0}; // 存儲所有監聽socketint i = 0;for (i = 0; i < MAX_PORTS; i++) {int sockfd = init_server(port + i);listen_fds[i] = sockfd;conn_list[sockfd].fd = sockfd; // 使用fd作為索引conn_list[sockfd].r_action.accept_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);}gettimeofday(&begin, NULL); // 獲取最初的時間戳,begin 是全局變量while (1) { // mainloop 服務器的根本struct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, 5); // 5 指代等待 5 毫秒,如果 I/O 響應滿員的話,立即返回// 該通知函數是需要調用系統內核的,是需要花費成本的,如果只是通知某些內容沒有讀完而調用,是極其得不償失的,這是我們需要用到邊沿觸發的原因,只是編程難度更大了// 我們要重視操作系統內核的調動,系統 I/O 并不全體現在代碼之上,而代碼要考慮操作系統內核可能出現的情況;編寫代碼要看到代碼之外的東西int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;int is_listener = 0; // 狀態機-重置// 當連接異常斷開時未清理資源,添加錯誤處理:if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP) { // 要注意 “|” 是按位或操作,能進行掩碼疊加;“&” 是按位與操作// EPOLLHUP 表示對應的文件描述符被掛斷。EPOLLERR 表示對應的文件描述符發生錯誤。close_connection(connfd);continue;}// 檢查是否為監聽套接字for (int j = 0; j < MAX_PORTS; j++) {if (connfd == listen_fds[j]) {conn_list[connfd].r_action.accept_callback(connfd);is_listener = 1;break;}}if (is_listener) continue; // 狀態機-進入下一個環節// 處理讀事件if (events[i].events & EPOLLIN) {// 這是監控到了除 sockfd 以外的套接字// ET 邊沿模式需循環讀取,原因是要保證網絡 I/O 所有內容都被讀取!if (conn_list[connfd].recv_callback(connfd) >= 0) { // 返回0表示成功printf("[%ld] RECV: %s\n",conn_list[connfd].rlength, conn_list[connfd].rbuffer);} else {// 連接在 recv 函數處早已釋放continue; // 跳過后續處理}/////////////////////// 讀操作的業務端(start) ////////////////////////// 至此,客戶端的請求報文全部寫完了,寫入了 rbuffer 之中。我們要利用這個內存去執行業務操作// 實現 HTTP 請求,在代碼里面,該函數只是形式上存在,并不是重點// http_request(&conn_list[fd]);// WebSocket 協議的請求// ws_request(&conn_list[fd]);/////////////////////// 讀操作的業務端(start) ////////////////////////set_event(connfd, EPOLLOUT, 0);} else if (events[i].events & EPOLLOUT) {// 這里必須注意 EPOLLOUT 不是邊沿事件觸發模式,我們通常只把響應報文的 header 寫入 wbuffer 中,長度是固定的,因此水平出發即可。// 至于那大段大段的文件資源傳輸,則是通過文件描述符之間操作完成,跳過緩沖區讀寫// send 會因無輸輸出而阻斷// 函數 send: 發送固定字節的內容。// 返回值 > 0:表示成功發送了數據,返回值表示實際發送的字節數。// 返回值 == -1:表示發送操作失敗。錯誤原因可以通過 errno 獲取conn_list[connfd].send_callback(connfd);// 如果是為了測試百萬并發,則用這個,不要把 fd 關閉set_event(connfd, EPOLLIN | EPOLLET, 0); // 這次輸出發送事件結束了;改為 “邊沿讀寫模式”,執行 epoll_ctl 函數,系統內核會直接把 fd 加載到 epoll 的就緒集之中} else {printf("Unknown event on clientfd: %d, errno:%d\n", connfd, errno);close_connection(connfd);}}}}
我們注意到,對于模塊函數 accept_cb
的使用,我們借助了 “狀態機” 的思路,即代碼中的 is_listener
,讓情況得以分類。
代碼運行效果
代碼編譯
qiming@qiming:~/share/CTASK/TCP_test$ gcc -o reactor reactor.c
程序執行,一開始沒鏈接的時候,程序并沒有掛起,而是作無意義的 while 死循環
,原因是代碼中的 epoll_wait(epfd, events, 1024, 5)
并非阻塞運行,而是等待 5 毫秒后運行下一行代碼。
qiming@qiming:~/share/CTASK/TCP_test$ ./reactor
NetAssist 遠程連接
發送信息(我們并沒發送什么東西,只是象征性的設置,讀者可自行設置,輸出)
qiming@qiming:~/share/CTASK/TCP_test$ ./reactor
[13244] RECV: The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows no intention of putting them off. In contrast, the man on the right exhibits a desire for delaying his task and holds that he doesn't go cracking until the deadline.
As the thought-provoking pictures intend to mirror, the idle boy as mentioned above is not likely to finish his assignments in the end, any more than a man can attain great achievement if he is accustomed to putting tasks off. For one thing, the habits shape the turns of our attitudes and behaviors of how to deal with our plans and tasks. In the course of pursuing our goals, we may face a great deal of temp-
tations and adversities, and a good habit of self-discipline can direct us to make right decisions and refrain from indulging us with entertainment. For another, if we fail to be self-disciplined, we will be deprived of the opportunities for forging the brilliant traits, like mental maturity, fortitude, creativity and so on, which will render us considerable autonomy and lead us to be successful.
Personally, we can't underscore the significance of the good habits too much. It is advisable that we should concentrate on our daily tasks through strict time management and enjoy the course of striving.
The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows no intention of putting them off. In contrast, the man on the right exhibits a desire for delaying his task and holds that he doesn't go cracking until the deadline.
As the thought-provoking pictures intend to mirror, the idle boy as mentioned above is not likely to finish his assignments in the end, any more than a man can attain great achievement if he is accustomed to putting tasks off. For one thing, the habits shape the turns of our attitudes and behaviors of how to deal with our plans and tasks. In the course of pursuing our goals, we may face a great deal of temp-
tations and adversities, and a good habit of self-discipline can direct us to make right decisions and refrain from indulging us with entertainment. For another, if we fail to be self-disciplined, we will be deprived of the opportunities for forging the brilliant traits, like mental maturity, fortitude, creativity and so on, which will render us considerable autonomy and lead us to be successful.
Personally, we can't underscore the significance of the good habits too much. It is advisable that we should concentrate on our daily tasks through strict time management and enjoy the course of striving.The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows n
我發送了若干段考研英語一的大作文(同一篇文章多次復制粘貼),模擬客戶端發送的內容非常多,多到超過了一開始設置的讀緩沖區容量 BUFFER_LENGTH
,即 1 KB
的內容。我們注意到
[13244] RECV: The drawings ... 略
共發送了 1 萬多個字節的內容,說明我們的動態內存擴充的代碼是有效的。
而且接受完信息后,還可以發送信息,說明事件轉化的設計也是有效的(在輸出的最末端,我們可在命令行處注意到)。
SEND: 0
這樣一來,我們這個服務器是可以完成一個準網絡 I/O 事務的。完整的網絡 I/O 事務是
總結
本篇文章,對上一篇 文章 的 EPOLL 服務器作出模塊化的升級,讓代碼更有組織度的同時,還有以下的特性
- 定義了
conn
類型,它能記錄對應網絡 I/O 文件的輸入輸出內容,而且還記錄了對應套接字的輸入和輸出行為。類似于 “多態” 的概念,同類型的兩個變量內同一成員名字有不同含義。讓命名變得簡單了許多。 - 對任何套接字(無論是否為監聽套接字)設置了根據事件信號而觸發的網絡 I/O 行為。這就是 “Reactor” 這個名字的由來。
- 對于 recv_cb 函數設計了可動態擴充讀緩沖區的功能,能夠接收超大型數據。
- 函數
set_event
能夠有效的進行事件轉化,使之同一個套接字的事件能夠在EPOLLIN | EPOLLET
和EPOLLOUT
之間來回轉換,剛好適應了網絡 I/O 的事務模式。
在下一篇文章里,我們針對這個事件驅動 reactor 式的服務器(還是這份代碼)測試其百萬并發的能力。