原文是由寫的,寫的真的很好,原文鏈接:純c協程框架NtyCo實現與原理-CSDN博客
1.為什么會有協程,協程解決了什么問題?
網絡IO優化
? ? ? ? 在CS,BS的開發模式下,服務器的吞吐量是一個受關注的參數,吞吐量等于1秒內業務處理的次數,那么這個業務處理其實是 由 網絡IO事件 + 業務處理時間 組成的。 業務不同處理時間也就不同,但是網絡IO時間是可以進行優化的。
? ? ? ? 也就是說,如何提升recv和send的性能?以epoll管理百萬長連接為例,測試IO同步操作與異步操作的性能差別。
? ? ? ? 對于響應式服務器來說,所有客戶端的操作都是源于這個大循環,對于服務器處理網絡IO,有兩種方式。第一種,IO同步;第二種,IO異步。
?IO同步 操作性能測試
? ? ? ? 對于IO同步操作來說,handle(sockfd) 函數內部實現如下
? ? ? ? 同步:檢測 IO 與 讀寫 IO 在同一個流程中
?測試出來,每一千個連接,耗時7.5秒左右。
優點:
? ? ? ? 1.sockfd 管理方便
? ? ? ? 2.代碼邏輯清晰
缺點:
? ? ? ? 1.服務器程序依賴 epoll_wait 的循環,響應速度慢。
? ? ? ? 2.程序性能差
IO異步 操作性能測試?
? ? ? ? 對于IO 異步操作來說,將任務push到線程池中,有其他線程進行讀寫。
? ? ? ? 異步:檢測 IO 與 讀寫 IO 不在同一個流程中
?????????IO操作與epoll_wait不在一個處理流程中,實現了解耦,這是IO異步操作,每一千個連接耗時2.5秒左右
優點:
? ? ? ? 1.子模塊好規劃
? ? ? ? 2.程序性能高
缺點:
? ? ? ? 1.管理fd麻煩,需要避免一個fd被多個線程操作的情況發生。
協程的誕生
對比項 | IO 同步操作 | IO 異步操作 | 協程 |
sockfd 管理? ? ?? | 管理方便 | 多個線程共同管理 | 管理方便 |
代碼邏輯 | 程序整體邏輯清晰 | 子模塊邏輯清晰 | 程序整體邏輯清晰 |
程序性能 | 響應時間長,性能差 | 響應時間短,性能好 | 響應時間短,性能好 |
?? ?從上面我們知道了IO同步操作,寫代碼邏輯清晰,但是效率低;而IO異步操作,fd管理復雜,但是效率高。由此,協程便出現了。
??協程:把兩者結合起來,以同步的編程方式,實現異步的性能。
????????即寫代碼的時候,同步;運行的邏輯,異步。
2.原語
yield()
? ? ? ? 讓出,將當前的執行流程讓出,讓出給調度器。
????????那么什么時候需要yield讓出呢?很明顯在recv之前,send之前,也就是在io之前,因為我們不知道io是否準備就緒了,所以我們先將fd加入epoll中,然后yield讓出,將執行流程給調度器運行。
schedule
? ? ? ??schedule調度器做什么事情呢?調度器就是io檢測,調度器就是不斷的調用epoll_wait,來檢測哪些fd準備就緒了,然后就恢復相應fd的執行流程執行現場
。注意schedule不是原語,schedule是調度器。
resume()
????????從上面我們得知恢復是被schedule恢復的,那么現在恢復到了原來流程的哪里呢?其實是恢復到了yield的下一條代碼處
。通常下面的代碼都會將fd從epoll中移除,然后執行recv或send操作,因為一旦被resume,就說明肯定是準備就緒的。
?如何實現yield和resume
- yield :從io操作流程切換到調度器流程(讓出)
- resume : 從調度器流程切換到io操作流程
?可以基于以下方法實現yield和resume:
? ? ? ? 1.setjmp/longjmp
#include <stdio.h>
#include <setjmp.h>jmp_buf env; // 定義一個jmp_buf類型的變量env,用于保存跳轉環境void func(int arg) {printf("func: %d\n", arg);longjmp(env, ++arg); // 使用longjmp函數跳轉回之前設置的環境,并傳遞增加后的參數值
}int main() {int ret = setjmp(env); // 調用setjmp函數,將當前環境保存到env中,并返回0if (ret == 0) { // 如果setjmp返回0,表示這是第一次調用setjmpfunc(ret);} else if (ret == 1) { // 如果setjmp返回1,表示這是通過longjmp跳轉回來的func(ret);} else if (ret == 2) { // 如果setjmp返回2,表示這是通過longjmp跳轉回來的func(ret);} else if (ret == 3) { // 如果setjmp返回3,表示這是通過longjmp跳轉回來的func(ret);}return 0;
}
? ? ? ? 2.ucontext
void func1(void) {while (count ++ < 30) {printf("1\n");//swapcontext(&ctx[0], &ctx[1]); // 注釋掉的代碼:交換上下文,從ctx[0]切換到ctx[1]swapcontext(&ctx[0], &main_ctx); // 實際執行的代碼:交換上下文,從ctx[0]切換到main_ctx,即主程序的上下文printf("4\n");}}
// coroutine2
void func2(void) {while (count ++ < 30) {printf("2\n");//swapcontext(&ctx[1], &ctx[2]);swapcontext(&ctx[1], &main_ctx); // 注釋掉的代碼:將當前上下文ctx[1]切換到上下文ctx[2]printf("5\n"); // 將當前上下文ctx[1]切換到主上下文main_ctx}
}// coroutine3
void func3(void) {while (count ++ < 30) {printf("3\n");//swapcontext(&ctx[2], &ctx[0]);swapcontext(&ctx[2], &main_ctx); // 注釋掉的代碼:將當前上下文ctx[2]切換到上下文ctx[0]printf("6\n"); // 將當前上下文ctx[2]切換到主上下文main_ctx}
}char stack1[2048] = {0}; // 定義三個棧,每個棧大小為2048字節,并初始化為0char stack2[2048] = {0};char stack3[2048] = {0};getcontext(&ctx[0]); // 獲取當前上下文并保存到ctx[0]ctx[0].uc_stack.ss_sp = stack1; // 設置ctx[0]的棧指針為stack1ctx[0].uc_stack.ss_size = sizeof(stack1); // 設置ctx[0]的棧大小為stack1的大小ctx[0].uc_link = &main_ctx; // 設置ctx[0]的鏈接上下文為main_ctx,當ctx[0]執行完畢后,會切換到main_ctxmakecontext(&ctx[0], func1, 0); // 創建一個新的上下文ctx[0],并指定其執行的函數為func1,參數個數為0getcontext(&ctx[1]);ctx[1].uc_stack.ss_sp = stack2;ctx[1].uc_stack.ss_size = sizeof(stack2);ctx[1].uc_link = &main_ctx;makecontext(&ctx[1], func2, 0);getcontext(&ctx[2]);ctx[2].uc_stack.ss_sp = stack3;ctx[2].uc_stack.ss_size = sizeof(stack3);ctx[2].uc_link = &main_ctx;makecontext(&ctx[2], func3, 0);printf("swapcontext\n");//int i = 30;while (count <= 30) { // schedulerswapcontext(&main_ctx, &ctx[count%3]);}
? ? ? ? 3.用匯編代碼自己實現切換
//new_ctx[%rdi]:即將運行協程的上下文寄存器列表; cur_ctx[%rsi]:正在運行協程的上下文寄存器列表
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);// yield讓出
void nty_coroutine_yield(nty_coroutine *co) {_switch(&co->sched->ctx, &co->ctx);
}// resume協程恢復執行
int nty_coroutine_resume(nty_coroutine *co) {//...nty_schedule * sched = nty_coroutine_get_sched();sched->curr_thread = co;_switch(&co->ctx, &co->sched->ctx);//...
}
? ? ? ? 如何從一個協程切換到另一個協程呢?我們只需要將當前協程的上下文從寄存器組中保存下來;將下一個要運行的協程的上下文放到寄存器組上去,即可實現協程的切換。
3.切換
寄存器介紹
下面介紹的都是x86_64的寄存器。
- %rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函數參數,依次對應第1參數,第2參數…(這里我們只需關注%rdi和%rsi)
- %rbx,%rbp,%r12,%r13,%14,%15 用作數據存儲,遵循被調用者使用規則,簡單說就是隨便用,調用子函數之前要備份它,以防他被修改
- new_ctx是一個指針,指向一塊內存,它現在存在%rid里面,同理cur_ctx存在%rsi里面
- %rsp代表棧頂,%rbp代表棧底,%eip代表cpu下一條待取指令的地址(這也就是為什么resume之后會接著運行代碼流程的原因)
//new_ctx[%rdi]:即將運行協程的上下文寄存器列表; cur_ctx[%rsi]:正在運行協程的上下文寄存器列表
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
?匯編實現切換
//寄存器 cpu上下文
typedef struct _nty_cpu_ctx {void *rsp;//棧頂void *rbp;//棧底void *eip;//CPU通過EIP寄存器讀取即將要執行的指令void *edi;void *esi;void *rbx;void *r1;void *r2;void *r3;void *r4;void *r5;
} nty_cpu_ctx;//new_ctx[%rdi]:即將運行協程的上下文寄存器列表; cur_ctx[%rsi]:正在運行協程的上下文寄存器列表
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
//默認x86_64
__asm__(
" .text \n"
" .p2align 4,,15 \n"
".globl _switch \n"
".globl __switch \n"
"_switch: \n"
"__switch: \n"
" movq %rsp, 0(%rsi) # save stack_pointer \n"
" movq %rbp, 8(%rsi) # save frame_pointer \n"
" movq (%rsp), %rax # save insn_pointer \n"
" movq %rax, 16(%rsi) # save eip \n"
" movq %rbx, 24(%rsi) # save rbx,r12-r15 \n"
" movq %r12, 32(%rsi) \n"
" movq %r13, 40(%rsi) \n"
" movq %r14, 48(%rsi) \n"
" movq %r15, 56(%rsi) \n"" movq 56(%rdi), %r15 \n"
" movq 48(%rdi), %r14 \n"
" movq 40(%rdi), %r13 \n"
" movq 32(%rdi), %r12 \n"
" movq 24(%rdi), %rbx # restore rbx,r12-r15 \n"
" movq 8(%rdi), %rbp # restore frame_pointer \n"
" movq 0(%rdi), %rsp # restore stack_pointer \n"
" movq 16(%rdi), %rax # restore insn_pointer \n"
" movq %rax, (%rsp) # restore eip \n"
" ret # 出棧,回到棧指針,執行eip指向的指令。\n"
);
?????????上下文切換,就是將 CPU 的寄存器暫時保存,再將即將運行的協程的上下文寄存器,分別mov 到相對應的寄存器上。此時上下文完成切換。
4.協程的運行流程
協程如何使用,協程的api
????????在網絡IO編程的時候,如果每次accept返回的時候,為新來的fd單獨分配一個線程,這一個fd對應一個線程,就不會存在多個線程共用一個fd的問題了,雖然這樣代碼邏輯清晰易讀,但是這是無稽之談,線程創建與線程調度的代價是很大的
? ? ? ? 但是如果把線程換成協程,線程API的思維來使用協程,那不就可行了嗎?
?
? ? ? ? ?NtyCo封裝了兩類接口
- 協程本身的api
-
//創建協程 int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg); //調度器運行 void nty_schedule_run(void);
- posix api的異步封裝協程api
-
//POSIX 異步封裝 API int nty_socket(int domain, int type, int protocol);int nty_accept(int fd, struct sockaddr *addr, socklen_t *len);ssize_t nty_recv(int fd, void *buf, size_t len, int flags);ssize_t nty_send(int fd, const void *buf, size_t len, int flags);int nty_close(int fd);int nty_connect(int fd, struct sockaddr *name, socklen_t len);ssize_t nty_recvfrom(int fd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);ssize_t nty_sendto(int fd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
協程工作流程
創建協程
int nty_coroutine_create(nty_coroutine **new_co,proc_coroutine func,void *arg);
- ?nty_coroutine **new_co:需要傳入空的協程對象,這個對象是由內部創建的,并且在函數返回的時候,會返回一個內部創建的協程對象。
- proc_coroutine func:協程的子過程。當協程被調度的時候,就會執行該函數
- void *arg : 需要傳入到新協程子過程的參數。
? ? ? ? ?協程不存在親屬關系,都是一致的調度關系,接受調度器的調度。調用 create API就會創建一個新協程,新協程就會加入到調度器的就緒隊列中。
?回調協程的子過程
????????在 create 協程后,何時回調子過程?何種方式回調子過程?我們知道CPU的EIP寄存器就是存儲cpu下一條指令的地址,我們可以把回調函數的地址存儲到 EIP 中。這樣在resume之后,就會執行協程的子過程了。
// eip 執行入口
static void _exec(void *lt) {nty_coroutine *co = (nty_coroutine *) lt;co->func(co->arg);
}
// 初始化協程棧
static void nty_coroutine_init(nty_coroutine *co) {void **stack = (void **) (co->stack + co->stack_size);stack[-3] = NULL;stack[-2] = (void *) co;//設置參數co->ctx.rsp = (void *) stack - (4 * sizeof(void *));co->ctx.rbp = (void *) stack - (3 * sizeof(void *));co->ctx.eip = (void *) _exec;//設置回調函數入口co->status = BIT(NTY_COROUTINE_STATUS_READY);
}
協程封裝posix api異步原理
? ? ? ? 在send與recv 調用的時候,如何實現異步操作??
?
在進行 IO 操作(recv,send)之前,先執行了 epoll_ctl 的 del 操作,將相應的 sockfd 從 epfd中刪除掉,在執行完 IO 操作(recv,send)再進行 epoll_ctl 的 add 的動作。這段代碼看起來似乎好像沒有什么作用。
??如果是在多個上下文中,這樣的做法就很有意義了。能夠保證 sockfd 只在一個上下文中能夠操作 IO 的。不會出現在多個上下文同時對一個 IO 進行操作的。協程的 IO 異步操作正式是采用此模式進行的。
// 創建協程recv接口
ssize_t nty_recv(int fd, void *buf, size_t len, int flags) {struct epoll_event ev;ev.events = POLLIN | POLLERR | POLLHUP;ev.data.fd = fd;//加入epoll,然后yieldnty_epoll_inner(&ev, 1, 1);//resumessize_t ret = recv(fd, buf, len, flags);return ret;
}
// 加入epoll,更改狀態,加入wait集合,然后yield與resume
static int nty_epoll_inner(struct epoll_event *ev, int ev_num, int timeout) {nty_schedule * sched = nty_coroutine_get_sched();nty_coroutine *co = sched->curr_thread;int i;for (i = 0; i < ev_num; i++) {epoll_ctl(sched->epfd, EPOLL_CTL_ADD, ev->data.fd, ev);co->events = ev->events;//加入wait集合,添加wait狀態nty_schedule_sched_wait(co, ev->data.fd, ev->events, timeout);}//yieldnty_coroutine_yield(co);for (i = 0; i < ev_num; i++) {epoll_ctl(sched->epfd, EPOLL_CTL_DEL, ev->data.fd, ev);//移除wait集合,移除wait狀態nty_schedule_desched_wait(ev->data.fd);}return ev_num;
}
一個簡單的使用案例
? ? ? ? 可以看到,我們編寫代碼只需以同步的編程方式,就能實現異步的性能了。
#include "nty_coroutine.h"
#include <arpa/inet.h>void server_reader(void *arg) {int fd = *(int *) arg;ssize_t ret;struct pollfd fds;fds.fd = fd;fds.events = POLLIN;while (1) {char buf[1024] = {0};ret = nty_recv(fd, buf, 1024, 0);if (ret > 0) {nty_send(fd, buf, strlen(buf), 0);}else if (ret == 0) {nty_close(fd);break;}}
}void server(void *arg) {unsigned short port = *(unsigned short *) arg;int fd = nty_socket(AF_INET, SOCK_STREAM, 0);if (fd < 0) return;struct sockaddr_in local, remote;local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;bind(fd, (struct sockaddr *) &local, sizeof(struct sockaddr_in));listen(fd, 128);while (1) {socklen_t len = sizeof(struct sockaddr_in);int cli_fd = nty_accept(fd, (struct sockaddr *) &remote, &len);printf("new client comming\n");nty_coroutine *read_co;nty_coroutine_create(&read_co, server_reader, &cli_fd);}
}int main(int argc, char *argv[]) {nty_coroutine *co = NULL;unsigned short port = 8080;nty_coroutine_create(&co, server, &port);nty_schedule_run(); //runreturn 0;
}
5.協程 與 調度器 結構體定義
協程定義
??一個協程會有哪些狀態呢?如果協程sleep了,那么就是睡眠狀態,如果協程剛創建出來,那它肯定是就緒狀態,如果協程在等待數據的到來,那就是等待狀態。這里這里定義協程的三個運行狀態{就緒,睡眠,等待}。
- 新創建的協程,加入就緒集合等待調度
- io未就緒的協程,加入等待集合等待epoll_wait
- 有sleep操作的協程,加入睡眠集合
- 就緒集合沒有設置優先級,所以在就緒集合里面的協程優先級一樣,那么就可以用隊列來存儲,先進先出
- 等待集合就是等待IO準備就緒,這個等待IO是有時間長短的,這里用紅黑樹來存儲
- 睡眠集合需要按照睡眠時間的長短進行喚醒,所以也用紅黑樹存儲,key為睡眠時長
?
我們描述了每一個協程有自己的上下文環境,需要保存 CPU 的寄存器 ctx;需要有子過程的回調函數 func;需要有子過程回調函數的參數 arg;需要定義自己的棧空stack;需要有自己棧空間的大小 stack_size;需要定義協程的創建時間birth;需要定義協程當前的運行狀態 status;需要定當前運行狀態的結點(ready_next, wait_node, sleep_node);需要定義協程 id;需要定義調度器的全局對象 sched。
typedef struct _nty_coroutine {//cpu ctxnty_cpu_ctx ctx;// funcproc_coroutine func;void *arg;// create timeuint64_t birth;//stackvoid *stack;size_t stack_size;size_t last_stack_size;//statusnty_coroutine_status status;//rootnty_schedule *sched;//co iduint64_t id;//fd eventint fd;uint16_t events;//sleep timeuint64_t sleep_usecs;//setRB_ENTRY(_nty_coroutine) sleep_node;RB_ENTRY(_nty_coroutine) wait_node;TAILQ_ENTRY(_nty_coroutine) ready_node;
} nty_coroutine;
調度器定義
???????????每個協程所需要使用的,而且不同的,就是協程的屬性,那么每個協程所需要的,且相同的,就是調度器的屬性。用來管理所有協程的屬性,作為調度器的屬性。調度器的屬性,需要有保存 CPU 的寄存器上下文 ctx,可以從協程運行狀態yield 到調度器運行的。從協程到調度器用 yield,從調度器到協程用 resume。
typedef struct _nty_schedule {// create timeuint64_t birth;//cpu ctxnty_cpu_ctx ctx;//stack_sizesize_t stack_size;//coroutine numint spawned_coroutines;//default_timeoutuint64_t default_timeout;//當前調度的協程struct _nty_coroutine *curr_thread;//頁大小int page_size;//epoll fdint epfd;//線程通知相關,暫未實現int eventfd;//eventsstruct epoll_event eventlist[NTY_CO_MAX_EVENTS];int num_new_events;//setnty_coroutine_queue ready;nty_coroutine_rbtree_sleep sleeping;nty_coroutine_rbtree_wait waiting;
} nty_schedule;
調度的策略
? ? ? ? 調度器的實現,有兩種方案
? ? ? ? ?1.生產者消費者模式
? ? ? ? ?2.多狀態運行
生產者消費者模式
? ? ? ??
多狀態運行
7.協程api 與 hook
需要封裝為異步的posix api 分析
所有對io的操作,我們都需要取重新封裝一遍。為什么不能用posix api,而是我們需要再去封裝一次呢?比如我們調用recv的時候,如果我們調用系統的,那么這個fd怎么yield到調度器上呢,所以我們需要在posix api的基礎上封裝,當然有些接口需要封裝,有些不需要。
就像下面的偽代碼一樣,從同步的recv變成異步的ney_recv
//偽代碼
ney_recv(){epoll add fd;yield();epoll del fd;recv(fd);
}
???????站在同步封裝成異步的角度,如果不需要判斷io是否就緒的這些api,則不需要封裝為異步的。
需要封裝的api,這些api在實現的時候,皆采用上面偽代碼的策略
1. accept()
2. connect()
3. recv()
4. read()
5. send()
6. write()
7. recvfrom()
8. sendto()
不需要封裝的api,這些api因為不會引起阻塞,所以不用封裝。
socket()
listen()
close()
fcntl()
setsockopt()
getsockopt()
hook
????????hook提供了兩個接口;1. dlsym()是針對系統的,系統原始的api。2. dlopen()是針對第三方的庫。
connect_f = dlsym(RTLD_NEXT, "connect");
#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <dlfcn.h>
#include<mysql/mysql.h>
//
// Created by 68725 on 2022/7/17.
//typedef int (*connect_t)(int, struct sockaddr *, socklen_t);connect_t connect_f;typedef ssize_t (*recv_t)(int, void *buf, size_t, int);recv_t recv_f;typedef ssize_t (*send_t)(int, const void *buf, size_t, int);send_t send_f;typedef ssize_t (*read_t)(int, void *buf, size_t);read_t read_f;typedef ssize_t (*write_t)(int, const void *buf, size_t);write_t write_f;int connect(int fd, struct sockaddr *name, socklen_t len) {printf("in connect\n");return connect_f(fd, name, len);
}ssize_t recv(int fd, void *buf, size_t len, int flags) {printf("in recv\n");return recv_f(fd, buf, len, flags);
}ssize_t send(int fd, const void *buf, size_t len, int flags) {printf("in send\n");return send_f(fd, buf, len, flags);
}
ssize_t read(int fd, void *buf, size_t len) {printf("in read\n");return read_f(fd, buf, len);
}ssize_t write(int fd, const void *buf, size_t len) {printf("in write\n");return write_f(fd, buf, len);
}static int init_hook() {connect_f = dlsym(RTLD_NEXT, "connect");recv_f = dlsym(RTLD_NEXT, "recv");send_f = dlsym(RTLD_NEXT, "send");read_f = dlsym(RTLD_NEXT, "read");write_f = dlsym(RTLD_NEXT, "write");
}void main() {init_hook();MYSQL *m_mysql = mysql_init(NULL);if (!m_mysql) {printf("mysql_init failed\n");return;}if (!mysql_real_connect(m_mysql, "192.168.109.1", "root", "123456", "cdb", 3306, NULL, 0)) {printf("mysql_real_connect failed\n");return;}else {printf("mysql_real_connect success\n");}
}
//gcc -o hook hook.c -lmysqlclient -I /usr/include/mysql/ -ldl
8.多核模式
解決協程多核的問題有兩種方式,多線程/多進程 與 CPU核心做親和性。
- 多進程(實現起來容易,對協程代碼本身不用去改)
- 多線程(復雜,需要對調度器進行加鎖)
那么做多線程對調度器進行加鎖,鎖放在哪呢?鎖放在調度器結構體里面,因為調度器是全局唯一的,那么要鎖哪里呢,很明顯,<取協程,恢復協程>,這里需要加鎖。
?