skynet源碼閱讀5--協程調度模型

注:為方便理解,本文貼出的代碼部分經過了縮減或展開,與實際skynet代碼可能會有所出入。
? ? 作為一個skynet actor,在啟動腳本被加載的過程中,總是要調用skynet.start和skynet.dispatch的,前者在skynet-os中做一些初始化工作,設置消息的Lua回調,后者則注冊針對某協議的解析回調。舉個例子:

 1 local skynet = require "skynet"
 2 
 3 local function hello()
 4     skynet.ret(skynet.pack("Hello, World!"))
 5     print("hello OK")
 6 end
 7 
 8 skynet.start(function()
 9     skynet.dispatch("lua", function(session, address, cmd, ...)
10         assert(cmd == "hello")
11         hello()
12     end)
13 end)

? ? 先是調用skynet.start注冊初始化回調,在其中調用skynet.dispatch注冊針對"lua"協議的解析回調。skynet的基本使用這里我們就不多說了,具體見官方文檔。下面我們就從skynet.start(見skynet.lua)開始,逐一分析流程。

1 function skynet.start(start_func)
2     c.callback(skynet.dispatch_message)
3     skynet.timeout(0, function()
4         skynet.init_service(start_func)
5     end)
6 end

這里的c來自于require "skynet.core",它是在lua-skynet.c中注冊的,如下:?

 1 int luaopen_skynet_core(lua_State *L) {
 2     luaL_checkversion(L);
 3 
 4     luaL_Reg l[] = {
 5         ...
 6         { "callback", _callback },
 7         { NULL, NULL },
 8     };
 9 
10     luaL_newlibtable(L, l);
11 
12     lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
13     struct skynet_context *ctx = lua_touserdata(L,-1);
14     if (ctx == NULL) {
15         return luaL_error(L, "Init skynet context first");
16     }
17 
18     luaL_setfuncs(L,l,1);
19 
20     return 1;
21 }

?? ? 可以看到,它注冊了幾個函數,并將skynet_context實例作為各函數的upvalue,方便調用時獲取。skynet.start中調用c.callback,對應的就是lua-skynet.c中的_callback函數,skynet.dispatch_message回調就是它的參數:

 1 static int _callback(lua_State *L) {
 2     struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
 3     int forward = lua_toboolean(L, 2);
 4     luaL_checktype(L,1,LUA_TFUNCTION);
 5     lua_settop(L,1);
 6     lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
 7 
 8     lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
 9     lua_State *gL = lua_tothread(L,-1);
10 
11     if (forward) {
12         skynet_callback(context, gL, forward_cb);
13     } else {
14         skynet_callback(context, gL, _cb);
15     }
16 
17     return 0;
18 }

? ? 可以看到,其以函數_cb為key,LUA回調(skynet.dispatch_message)作為value被注冊到全局注冊表中。skynet_callback(在skynet_server.c中)則設置函數指針_cb為C層面的消息處理函數:

1 void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
2     context->cb = cb;
3     context->cb_ud = ud;
4 }

? ? 先不關注skynet-os內部的線程調度細節,只需要知道,skynet-context接收到消息后會轉發給context->cb處理,也就是_cb函數。在_cb中,從全局表中取到關聯的LUA回調,將type, msg, sz, session, source壓棧調用:

 1 static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
 2     lua_State *L = ud;
 3     int trace = 1;
 4     int r;
 5     int top = lua_gettop(L);
 6     if (top == 0) {
 7         lua_pushcfunction(L, traceback);
 8         lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
 9     } else {
10         assert(top == 2);
11     }
12     lua_pushvalue(L,2);
13 
14     lua_pushinteger(L, type);
15     lua_pushlightuserdata(L, (void *)msg);
16     lua_pushinteger(L,sz);
17     lua_pushinteger(L, session);
18     lua_pushinteger(L, source);
19 
20     r = lua_pcall(L, 5, 0 , trace);
21 
22     if (r == LUA_OK) {
23         return 0;
24     }
25 }

? ? 此時調用流程正式轉到skynet.lua中的skynet.dispatch_message:?

1 function skynet.dispatch_message(...)
2     local succ, err = pcall(raw_dispatch_message,...)
3     while true do
4         local key,co = next(fork_queue)
5         fork_queue[key] = nil
6         pcall(suspend,co,coroutine_resume(co))
7     end
8 end

?? ?首先是將msg交由raw_dispatch_message作分發,然后開始處理fork_queue中緩存的fork協程:

? ? ? ? pcall(suspend, co, coroutine_resume(co))

? ? 這行代碼是我們今天關注的重點。在繼續之前,我假設你對lua的協程有一定的了解,了解coroutine.resume,coroutine.yield的基本用法。coroutine就是lua里的線程,它擁有自己的函數棧,但與我們平常接觸的大多數操作系統里的線程不同,是非搶占式的。skynet對lua的coroutine作了封裝(詳見lua-profile.c),主要是增加了starttime和totaltime的監測,最終還是交由lua的coroutine庫來處理的。既然這里分析到了fork_queue,那我們就先以skynet.fork為例,看看它作了什么:

1 function skynet.fork(func,...)
2     local args = table.pack(...)
3     local co = co_create(function()
4         func(table.unpack(args,1,args.n))
5     end)
6     table.insert(fork_queue, co)
7     return co
8 end

? ? skynet.fork做的事情很簡單,通過co_create創建一個coroutine并將其入隊fork_queue。看看co_create是如何創建協程的:

 1 local function co_create(f)
 2     local co = table.remove(coroutine_pool)
 3     if co == nil then
 4         co = coroutine.create(function(...)
 5             f(...)
 6             while true do
 7                 f = nil
 8                 coroutine_pool[#coroutine_pool+1] = co
 9                 f = coroutine_yield "EXIT"
10                 f(coroutine_yield())
11             end
12         end)
13     else
14         coroutine_resume(co, f)
15     end
16     return co
17 end

? ? 調用co_create時,如果coroutine_pool為空,它會創建一個新的co。co在第一次被resume時,會執行f,接著便進入一個使用和回收的無限循環。在這個循環中,先是收回co到coroutine_pool中,接著便yield "EXIT"到上一次的resume點A。當下一次被resume在點B喚醒時,會先將函數f傳遞過來,接著再次yield到點B,等待下一次在點D被resume喚醒時,傳遞需要的參數過來加以執行,完畢后回收,如此反復。這樣看來,co的執行似乎相當簡單。但是實際上要復雜一些,因為在執行f的過程中,可以再反復地yield和resume。下面我們舉個簡單的例子:?

1     skynet.fork(function()
2         print ("skynet fork: <1>")
3         skynet.sleep(10)
4         print ("skynet fork: <2>")
5     end)

?? ? 我們把skynet.sleep展開:?

1     skynet.fork(function()
2         print ("skynet fork: <1>")
3         coroutine_yield("SLEEP", COMMAND_TIMEOUT(10))
4         print ("skynet fork: <2>")
5     end)

?? ? 下面開始分析調用流程。fork-co入隊,主co在skynet.dispatch_message中分發消息后取出fork-co,調用resume開始進入fork-co的函數f執行,如下圖所示,如果fork-co是第一次執行,是走圈1,如果是復用,則走圈1*(如果是復用的話,調用co_create時,會先coroutine_resume(co,f)一次進入fork-co,將用戶函數傳遞給while循環中的coroutine_yield "EXIT"點之后,接著fork-co再次yield讓出,等待實際傳參的調用)。接著進入用戶函數,COMMAND_TIMEOUT會先向skynet-kernal發送TIMEOUT命令,如圈2所示。然后yield "SLEEP"到主co的resume點1之后繼續執行,如圈3所示,按圈4的指向,調用suspend進入"SLEEP"分支,記錄下TIMEOUT-session與fork-co的映射關系。此時主co回到skynet.dispatch_message中繼續下一個fork-co的處理。當TIMEOUT消息回來時,會由主co再次進入skynet.dispatch_message并調用raw_dispatch_message分發,這時通過session拿到之前映射的fork-co,再次resume,按照圈5的指向,會跳轉到fork-co的yield "SLEEP"點之后繼續向下處理。用戶函數處理完畢后,回到上層調用,即圈6所指,回收fork-co,接著yield "EXIT"到主co所在raw_dispatch_message中的resume點之后,如圈7所示。進入suspend后,無額外命令,raw_dispatch_message處理結束,繼續主co的消息處理流程。

? ? 由以上分析可以看到,實際的協程跳轉過程是比較復雜的,也更顯得小小的LUA在skynet中的精巧運用。為方便理解,順便貼出suspend的代碼(只列出了我們關注的幾個命令,并做了刪減):?

 1 function suspend(co, result, command, param, size)
 2     if command == "CALL" then
 3         session_id_coroutine[param] = co
 4     elseif command == "SLEEP" then
 5         session_id_coroutine[param] = co
 6         sleep_session[co] = param
 7     elseif command == "RETURN" then
 8         local co_session = session_coroutine_id[co]
 9         local co_address = session_coroutine_address[co]
10         session_response[co] = true
11         c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size)
12         return suspend(co, coroutine_resume(co, ret))
13     elseif command == "EXIT" then
14         -- coroutine exit
15         local address = session_coroutine_address[co]
16         session_coroutine_id[co] = nil
17         session_coroutine_address[co] = nil
18         session_response[co] = nil
19     elseif command == nil then
20         -- debug trace
21         return
22     end
23 end

? ? 看完skynet.fork,我們再回過頭來,看一看skynet.dispatch_message中消息分發raw_dispatch_message的具體細節:

 1 local function raw_dispatch_message(prototype, msg, sz, session, source)
 2     -- skynet.PTYPE_RESPONSE = 1, read skynet.h
 3     if prototype == 1 then
 4         local co = session_id_coroutine[session]
 5         session_id_coroutine[session] = nil
 6         suspend(co, coroutine_resume(co, true, msg, sz))
 7     else
 8         local p = proto[prototype]
 9         local f = p.dispatch
10         local co = co_create(f)
11         session_coroutine_id[co] = session
12         session_coroutine_address[co] = source
13         suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
14     end
15 end

? ? RESPONSE的處理先就不說了,在敘述skynet.sleep時已經有所討論。消息會根據它的prototype查找proto,接著調用co_create取得協程user-co,將message解包后resume到user-co進入用戶函數f。而后續的流程則與上文我們討論的fork是一樣的了。比如,在f中調用skynet.call時,會向目標發送消息,接著會yield到主co,回到這里的resume點,接著進入suspend的"CALL"分支,記錄session與user-co的映射關系。下一次response消息回來時,會查找到user-co并resume喚醒,在skynet.call后繼續執行,用戶函數f結束后進入上層調用,回收user-co,等待新的調用。

? ? 因為本篇我們關注的是協程調度模型,而非具體的處理細節,因此有空再對skynet.call,skynet.ret等作詳細的細節分析。

轉載于:https://www.cnblogs.com/Jackie-Snow/p/6687651.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/282390.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/282390.shtml
英文地址,請注明出處:http://en.pswp.cn/news/282390.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ASP.NET Core GRPC 和 Dubbo 互通

一.前言Dubbo 是比較流行的服務治理框架&#xff0c;國內不少大廠都在使用。以前的 Dubbo 使用的是私有協議&#xff0c;采集用的 hessian 序列化&#xff0c;對于多語言生態來說是極度的不友好。現在 Dubbo 發布了新版本 v3&#xff0c;推出了基于 gRPC 的新協議 Triple&#…

詳解C# 迭代器

[引用&#xff1a;https://www.cnblogs.com/yangecnu/archive/2012/03/17/2402432.html] 迭代器模式是設計模式中行為模式(behavioral pattern)的一個例子&#xff0c;他是一種簡化對象間通訊的模式&#xff0c;也是一種非常容易理解和使用的模式。簡單來說&#xff0c;迭代器模…

利用redis List隊列簡單實現秒殺 PHP代碼實現

一 生產者producer部分 --------------------------------producer 部分注釋------------------------------------------------------------ 用戶在頁面請求之后, 獲取到用戶uid , 跳轉到這個加入隊列的方法 (這里直接在producer中模擬了多個uid) 在方法內部判斷redis隊列長…

使用Filezilla 與 linux遠程服務器傳輸文件時,設置默認打開編輯器

1. 點擊編輯 2. 選擇設置&#xff0c;點擊文本編輯 3. 設置編輯器目錄 4. 確定作用&#xff1a; 這樣設置之后&#xff0c;可以實現在遠程站點欄直接下載并使用phpstorm編輯的作用 正常需要下載之后&#xff0c;再去本地相應下載目錄打開&#xff0c;然后再進行上傳文件&#x…

SDOI2017 新生舞會

01規劃 a1a2a3...ai/b1b2b2..bi最大 設一個k 使得 a1a2a3...ai/b1b2b3...bi>k 變換式子得到 a1a2a3...ai>(b1b2b3..bi)*k a1-b1*ka2-b2*ka3-b3*k...ai-bi*k>0 ai-bi*k即流量 最大費用流二分答案 來&#xff0c;上代碼&#xff1a; #include <cmath> #include &l…

在 .NET 中使用 FluentValidation 進行參數驗證

不用說&#xff0c;參數驗證很重要&#xff0c;無效的參數&#xff0c;可能會導致程序的異常。如果使用Web API或MVC頁面&#xff0c;那么可能習慣了自帶的規則驗證&#xff0c;我們的控制器很干凈&#xff1a;public class User {[Required]public string FirstName { get; se…

在win10系統下怎樣快速切換任務視圖

2019獨角獸企業重金招聘Python工程師標準>>> 切換窗口&#xff1a;Alt Tab 任務視圖&#xff1a;Win Tab (松開鍵盤界面不會消失) 切換任務視圖&#xff1a;Win Ctrl 左/右 創建新的虛擬桌面&#xff1a;Win Ctrl D 關閉當前虛擬桌面&#xff1a;Win Ctrl F4…

uwp應用在debug模式下運行正常,編譯為release版本的時候拋出異常

原因是在代碼中使用了dynamic關鍵字&#xff0c;導致release時.net native優化了代碼造成元數據丟失 所以在代碼中要盡量不用dynamic。轉載于:https://www.cnblogs.com/poison/p/7532142.html

Linux上搭建Samba,實現windows與Linux文件數據同步

一 環境介紹 1. 本地win10 2. Linux (centos7.4) 注&#xff1a;因為運營商方面禁止smb協議&#xff0c;導致無法在云服務器上使用smb&#xff0c;如果不是在虛擬機上操作&#xff0c;而是在云服務器上操作&#xff0c;建議還是使用 filezillaxshell組合 或者 使用finalshell等…

A5-1和DES兩個加密算法的學習

A5-1加密算法 1、基本原理 A5-1加密算法是一種流password&#xff0c;通過密鑰流對明文進行加密。然后用密鑰流進行對密文的解密操作。 這樣的算法主要用于GSM加密。也就是我們平時打電話的時候。通信數據發送到基站&#xff0c;基站發送到還有一個基站&#xff0c;基站發送到接…

從0到1簡易區塊鏈開發手冊V0.3-數據持久化與創世區塊

Author: brucefeng Email: brucefengbrucefeng.com 編程語言:Golang 1.BoltDB簡介 Bolt是一個純粹Key/Value模型的程序。該項目的目標是為不需要完整數據庫服務器&#xff08;如Postgres或MySQL&#xff09;的項目提供一個簡單&#xff0c;快速&#xff0c;可靠的數據庫。 Bolt…

ELK之elasticsearch5.6的安裝和head插件的安裝

這里選擇的elasticsearch為5.6的新版本&#xff0c;根據官方文檔有幾種暗裝方式&#xff1a; https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html 這里選擇rpm包安裝https://www.elastic.co/guide/en/elasticsearch/reference/curre…

Nginx 基礎(一)

一 、Nginx簡述 Nginx是一個開源、高性能、可靠的HTTP中間件、代理服務。二 、常見的HTTP服務 1. HTTPD-Apache基金會 2. IIS-微軟 3. GWS-Google 4. Nginx三、為什么選擇Nginx 原因一&#xff1a;IO多路復用epoll &#xff08;主要解決了并發性的問題&#xff09; 注1&#xf…

Ajax基本案例詳解之load的實現

Ajax的load實現&#xff1a; 看這篇之前建議大家去看看前面兩篇文章&#xff1a; 1.Ajax基本案例詳解之$.ajax的實現 2.Ajax基本案例詳解之$.get的實現 現在寫一下$.load()里面的主要內容&#xff1a; $("#semail").load("doindex.jsp","email1&q…

ASP.NET Core高性能服務器HTTP.SYS

如果我們只需要將ASP.NET CORE應用部署到Windows環境下&#xff0c;并且希望獲得更好的性能&#xff0c;那么我們選擇的服務器類型應該是HTTP.SYS。Windows環境下任何針對HTTP的網絡監聽器/服務器在性能上都無法與HTTP.SYS比肩。[本文節選《ASP.NET Core 6框架揭秘》第18章]一、…

神經網絡- receptive field

記錄一下感受野的理解&#xff1a; 在神經網絡中&#xff0c;感受野的定義是&#xff1a; 神經網絡的每一層輸出的特征圖&#xff08;Feature ap&#xff09;上的像素點在原圖像上映射的區域大小。 1. 神經網絡中&#xff0c;第一個卷積層的 感受野大小&#xff0c;就等于filt…

734. [網絡流24題] 方格取數問題 二分圖點權最大獨立集/最小割/最大流

問題描述&#xff1a;在一個有m*n 個方格的棋盤中&#xff0c;每個方格中有一個正整數。現要從方格中取數&#xff0c;使任意2 個數所在方格沒有公共邊&#xff0c;且取出的數的總和最大。試設計一個滿足要求的取數算法。編程任務&#xff1a;對于給定的方格棋盤&#xff0c;按…

Nginx 基礎 ( 二)

一、HTTP請求 http請求包括客戶端請求服務端 以及 服務端響應數據回客戶端&#xff0c;如下 請求&#xff1a;包括請求行、請求頭部、請求數據 響應&#xff1a;包括狀態行、消息報頭、響應正文 比如在Linux中curl請求網站獲取請求信息和響應信息 curl -v http://www.kugou.com…

《金融行業應用解決方案白皮書》發布,金融自主創新未來可期!

日前&#xff0c;以“聚勢賦能 行業共創”為主題的金融行業解決方案發布會在線上舉行。麒麟軟件發布《金融行業應用解決方案白皮書》&#xff0c;并發起成立“金融機具生態圈俱樂部”&#xff0c;助力金融行業用戶高質量發展。金融信息系統曾經被國外廠商壟斷金融信息系統作為國…

leetcode53 Maximum Subarray 最大連續子數組

題目要求 Find the contiguous subarray within an array (containing at least one number) which has the largest sum.For example, given the array [-2,1,-3,4,-1,2,1,-5,4], the contiguous subarray [4,-1,2,1] has the largest sum 6.即&#xff1a;尋找數列中的一個子…