2019獨角獸企業重金招聘Python工程師標準>>>
????協程,又被稱為用戶級線程,是在應用層被調度,可以減少因為調用系統調用而阻塞的線程切換的時間.目前有很多協程的實現,由于微信內部大量使用了其直研的的libco協程庫,所以我選擇了騰訊開源的libco協程庫進行研究,學習協程的基本思想.
1,基本原理
? ? 協程實質上可以看成是子程序、函數。一個線程上面可以運行多個協程,但是同一時間只能運行一個協程,協程在線程上的切換,是由于遇到阻塞的操作,或者主動讓出線程使用權。比如,有10個協程,當前線程正在運行協程1,然后協程1執行一個recv的阻塞操作,協程的調度器能夠檢測到這個操作,將協程1切換出去,將協程2調度進來執行。如果沒有協程的調度器,此時協程1將會由于調用recv這個系統調用且數據未到達而阻塞,進行休眠,此時操作系統將會發生線程切換,調度其他線程執行,而線程切換非常耗時,高達幾十微秒(同事測試是20us),即便新執行的線程是用戶任務相關的,用戶任務也會多了幾十微秒的線程切換的消耗。而如果使用協程,協程之間的切換只需要幾百納秒(同事測試為0.35us,即350納秒),耗時很少。這就是協程發揮優勢的地方。
? ? 下面講解libco的源碼部分,有一篇文章:C++開源協程庫libco-原理與應用.pdf,非常深入的講解了libco的原理,而且不枯燥,十分推薦讀者先看看這篇文章。
? ? 由于libco是非對稱的協程機制,如果從當前協程A切換到協程B,而協程B又沒有切換到下一個協程,在協程B執行結束之后,會返回到協程A執行。
2,libco基本框架
? ? libco中的基本框架如下(引自C/C++協程庫libco:微信怎樣漂亮地完成異步化改造):
協程接口層實現了協程的基本源語。co_create、co_resume等簡單接口負責協程創建于恢復。co_cond_signal類接口可以在協程間創建一個協程信號量,可用于協程間的同步通信。
系統函數Hook層負責主要負責系統中同步API到異步執行的轉換。對于常用的同步網絡接口,Hook層會把本次網絡請求注冊為異步事件,然后等待事件驅動層的喚醒執行。
事件驅動層實現了一個簡單高效的異步網路框架,里面包含了異步網絡框架所需要的事件與超時回調。對于來源于同步系統函數Hook層的請求,事件注冊與回調實質上是協程的讓出與恢復執行。
本文通過講解接口層的幾個主要函數,使讀者對libco協程的框架和原理有一個大概的認識,下一篇文章將會講解libco如何處理事件循環等。
下面我們從幾個主要的協程函數一一分析。
3,主要函數源碼解析
- co_create?????首先來開一下協程創建的函數,源碼如下:
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{if( !co_get_curr_thread_env() ) {co_init_curr_thread_env();}stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );*ppco = co;return 0;
}
void co_init_curr_thread_env()
{pid_t pid = GetPid(); g_arrCoEnvPerThread[ pid ] = (stCoRoutineEnv_t*)calloc( 1,sizeof(stCoRoutineEnv_t) );stCoRoutineEnv_t *env = g_arrCoEnvPerThread[ pid ];env->iCallStackSize = 0;struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );self->cIsMain = 1;env->pending_co = NULL;env->occupy_co = NULL;coctx_init( &self->ctx );env->pCallStack[ env->iCallStackSize++ ] = self;stCoEpoll_t *ev = AllocEpoll();SetEpoll( env,ev );
}
????? ? co_create()的第一行判斷是當前線程初始化環境變量的判斷,如果沒進行環境初始化,那么調用co_init_curr_thread_env() 進行環境初始化,會生成當前環境g_arrCoEnvPerThread[ GetPid() ]的第一個協程 env->pCallStack,其?cIsMain 標志位 1,iCallStackSize表示協程層數,目前只有1層,AllocEpoll()函數中初始化當前環境env的 pstActiveList,pstTimeoutList 這兩個列表,這兩個列表分別記錄了活動協程和超時協程。環境初始化操作在一個線程中只會進行一次。在初始化完成之后,會調用co_create_env()創建一個新的協程,新協程的結構體中的env這個域始終指向當前協程環境g_arrCoEnvPerThread[ GetPid() ]。新協程創建之后,并沒有做什么操作。
- co_resume
co_resume()函數是切換協程的函數,也可以稱為是啟動協程的函數。co_resume()函數的第一行是獲取當前線程的協程環境env,第二行獲取當前正在執行的協程,也即馬上要被切換出去的協程。接下來判斷待切換的協程co是否已經被切換過,如果沒有,那么為co準備上下文,cStart字段設置為1。這里為co準備的上下文,就是在coctx_make()函數里面,這個函數將函數指針CoRoutineFunc賦值給co->ctx的reg[0],將來上下文切換的時候,就能切換到reg[0]所指向的地址去執行.準備好co的上下文之后,然后將待切換的協程co入棧,置于協程環境env的協程棧的頂端,表明當前最新的協程是co。注意,這并不是說協程棧中只有棧頂才是co,可能棧中某些位置也存了co。最后,調用co_swap(),該函數將協程上下文環境切換為co的上下文環境,并進入co指定的函數內執行,之前被切換出去的協程被掛起,直到co主動yield,讓出cpu,才會恢復被切換出去的協程執行.注意,這里的所有的協程都是在當前協程執行的,也就是說,所有的協程都是串行執行的,調用co_resume()之后,執行上下文就跳到co的代碼空間中去了。因為co_swap()要等co主動讓出cpu才會返回,而co的協程內部可能會resume新的協程繼續執行下去,所以co_swap()函數調用可能要等到很長時間才能返回。void co_resume( stCoRoutine_t *co ) {stCoRoutineEnv_t *env = co->env;stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];if( !co->cStart ){coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );co->cStart = 1;}env->pCallStack[ env->iCallStackSize++ ] = co;co_swap( lpCurrRoutine, co ); }
在co_swap()函數代碼中,由于libco不是共享棧的模式,即pending_co->cIsShareStack為0,所以執行了if分支,接下來執行coctx_swap(),這是一段匯編源碼,內容就是從curr的上下文跳轉到pending_co的上下文中執行,通過回調CoRoutineFunc()函數實現,此時當前線程的cpu已經開始執行pending_co協程中的代碼,直到pending_co主動讓出cpu,才接著執行coctx_swap()下面的代碼,由于update_occupy_co為NULL,下面的if語句沒有執行,所以相當于coctx_swap()下面沒有代碼,直接返回到curr協程中.void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co) {stCoRoutineEnv_t* env = co_get_curr_thread_env();//get curr stack spchar c;curr->stack_sp= &c;if (!pending_co->cIsShareStack){env->pending_co = NULL;env->occupy_co = NULL;}else {env->pending_co = pending_co;//get last occupy co on the same stack memstCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;//set pending co to occupy thest stack mem;pending_co->stack_mem->occupy_co = pending_co;env->occupy_co = occupy_co;if (occupy_co && occupy_co != pending_co){save_stack_buffer(occupy_co);}}//swap contextcoctx_swap(&(curr->ctx),&(pending_co->ctx) );//stack buffer may be overwrite, so get again;stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();stCoRoutine_t* update_occupy_co = curr_env->occupy_co;stCoRoutine_t* update_pending_co = curr_env->pending_co;if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co){//resume stack bufferif (update_pending_co->save_buffer && update_pending_co->save_size > 0){memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);}} }
- co_yield
co_yield()與co_yield_ct()的功能是一樣的,都是使得當前協程讓出cpu.
co_yield_env()函數中的第二行獲取當前執行的協程,也即當前協程環境的協程棧的棧頂,函數的第一行獲取協程棧的次頂,也即上一次被切換的協程last,從這里也可以看出,libco的協程讓出cpu,只能讓給上一次被切換出去的協程.最后一行是co_swap()函數,前面講到,該函數會進入last協程的上下文去執行代碼,也就是回到上次co_resume()函數內部的co_swap()的地方,繼續往下走.void co_yield_env( stCoRoutineEnv_t *env ) {stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];env->iCallStackSize--;co_swap( curr, last); }
當協程正常結束的時候,會繼續執行CoRoutineFunc()函數,將協程的cEnd設置為1,表示已經結束,并執行一次co_yield_env(),讓出cpu,切換回上一次被讓出的協程繼續執行.
這里有一點我之前不太理解,懷疑會發生棧溢出的地方,那就是在調用co_yield_env(),進入co_swap()之后,調用coctx_swap(),切換到上一次的last協程的上下文,那么當前協程的co_swap()函數里面的變量,都是在棧空間上面的,切換到last協程的上下文之后,那些變量依然在棧空間上面,不會被銷毀,直到回到了main函數的協程,還是沒有被銷毀。其實這是個誤區,這些變量其實不是在棧空間上面,而是在CPU的通用寄存器里面,當調用coctx_swap()之后,這些寄存器變量就會保存到當前協程的棧空間中去,其實是我們之前co_create()函數malloc出來的一片堆空間。這是因為cpu的工作寄存器數量較多,而局部變量較少,而co_swap()函數的變量都是局部變量,直接存放在cpu的工作寄存器中,而coctx_swap()的作用就是將CPU的各個通用寄存器保存到coctx_t結構的regs[1] ~ regs[6]的位置,然后將last協程的coctx_t結構的regs[1]~regs[6]的內容加載到當前的通用寄存器中,并將執行cpu的執行順序切換到last協程中去執行。 - co_release
co_release()的功能比較簡單,就是釋放資源void co_release( stCoRoutine_t *co ) {if( co->cEnd ){free( co );} }
- co_self
co_self()函數是獲取當前正在執行的協程,只要獲取到當前協程環境的線程棧頂的協程即可。stCoRoutine_t *co_self() {return GetCurrThreadCo(); } stCoRoutine_t *GetCurrThreadCo( ) {stCoRoutineEnv_t *env = co_get_curr_thread_env();if( !env ) return 0;return GetCurrCo(env); } stCoRoutine_t *GetCurrCo( stCoRoutineEnv_t *env ) {return env->pCallStack[ env->iCallStackSize - 1 ]; }
- co_enable_hook_sys
libco封裝了系統調用,在系統調用,比如send/recv/condition_wait等函數前面加了一層hook,有了這層hook就可以在系統調用的時候不讓線程阻塞而產生線程切換,co_enable_hook_sys()函數允許協程hook,當然也可以不允許hook,直接使用原生的系統調用。void co_enable_hook_sys() {stCoRoutine_t *co = GetCurrThreadCo();if( co ){co->cEnableSysHook = 1;} }
?