日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

skynet源码阅读5--协程调度模型

發(fā)布時(shí)間:2023/12/9 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 skynet源码阅读5--协程调度模型 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

注:為方便理解,本文貼出的代碼部分經(jīng)過(guò)了縮減或展開(kāi),與實(shí)際skynet代碼可能會(huì)有所出入。
? ? 作為一個(gè)skynet actor,在啟動(dòng)腳本被加載的過(guò)程中,總是要調(diào)用skynet.start和skynet.dispatch的,前者在skynet-os中做一些初始化工作,設(shè)置消息的Lua回調(diào),后者則注冊(cè)針對(duì)某協(xié)議的解析回調(diào)。舉個(gè)例子:

1 local skynet = require "skynet" 2 3 local function hello() 4 skynet.ret(skynet.pack("Hello, World!")) 5 print("hello OK") 6 end 7 8 skynet.start(function() 9 skynet.dispatch("lua", function(session, address, cmd, ...) 10 assert(cmd == "hello") 11 hello() 12 end) 13 end)

? ? 先是調(diào)用skynet.start注冊(cè)初始化回調(diào),在其中調(diào)用skynet.dispatch注冊(cè)針對(duì)"lua"協(xié)議的解析回調(diào)。skynet的基本使用這里我們就不多說(shuō)了,具體見(jiàn)官方文檔。下面我們就從skynet.start(見(jiàn)skynet.lua)開(kāi)始,逐一分析流程。

1 function skynet.start(start_func) 2 c.callback(skynet.dispatch_message) 3 skynet.timeout(0, function() 4 skynet.init_service(start_func) 5 end) 6 end

這里的c來(lái)自于require "skynet.core",它是在lua-skynet.c中注冊(cè)的,如下:?

1 int luaopen_skynet_core(lua_State *L) { 2 luaL_checkversion(L); 3 4 luaL_Reg l[] = { 5 ... 6 { "callback", _callback }, 7 { NULL, NULL }, 8 }; 9 10 luaL_newlibtable(L, l); 11 12 lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context"); 13 struct skynet_context *ctx = lua_touserdata(L,-1); 14 if (ctx == NULL) { 15 return luaL_error(L, "Init skynet context first"); 16 } 17 18 luaL_setfuncs(L,l,1); 19 20 return 1; 21 }

?? ? 可以看到,它注冊(cè)了幾個(gè)函數(shù),并將skynet_context實(shí)例作為各函數(shù)的upvalue,方便調(diào)用時(shí)獲取。skynet.start中調(diào)用c.callback,對(duì)應(yīng)的就是lua-skynet.c中的_callback函數(shù),skynet.dispatch_message回調(diào)就是它的參數(shù):

1 static int _callback(lua_State *L) { 2 struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1)); 3 int forward = lua_toboolean(L, 2); 4 luaL_checktype(L,1,LUA_TFUNCTION); 5 lua_settop(L,1); 6 lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); 7 8 lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD); 9 lua_State *gL = lua_tothread(L,-1); 10 11 if (forward) { 12 skynet_callback(context, gL, forward_cb); 13 } else { 14 skynet_callback(context, gL, _cb); 15 } 16 17 return 0; 18 }

? ? 可以看到,其以函數(shù)_cb為key,LUA回調(diào)(skynet.dispatch_message)作為value被注冊(cè)到全局注冊(cè)表中。skynet_callback(在skynet_server.c中)則設(shè)置函數(shù)指針_cb為C層面的消息處理函數(shù):

1 void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) { 2 context->cb = cb; 3 context->cb_ud = ud; 4 }

? ? 先不關(guān)注skynet-os內(nèi)部的線程調(diào)度細(xì)節(jié),只需要知道,skynet-context接收到消息后會(huì)轉(zhuǎn)發(fā)給context->cb處理,也就是_cb函數(shù)。在_cb中,從全局表中取到關(guān)聯(lián)的LUA回調(diào),將type, msg, sz, session, source壓棧調(diào)用:

1 static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) { 2 lua_State *L = ud; 3 int trace = 1; 4 int r; 5 int top = lua_gettop(L); 6 if (top == 0) { 7 lua_pushcfunction(L, traceback); 8 lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); 9 } else { 10 assert(top == 2); 11 } 12 lua_pushvalue(L,2); 13 14 lua_pushinteger(L, type); 15 lua_pushlightuserdata(L, (void *)msg); 16 lua_pushinteger(L,sz); 17 lua_pushinteger(L, session); 18 lua_pushinteger(L, source); 19 20 r = lua_pcall(L, 5, 0 , trace); 21 22 if (r == LUA_OK) { 23 return 0; 24 } 25 }

? ? 此時(shí)調(diào)用流程正式轉(zhuǎn)到skynet.lua中的skynet.dispatch_message:?

1 function skynet.dispatch_message(...) 2 local succ, err = pcall(raw_dispatch_message,...) 3 while true do 4 local key,co = next(fork_queue) 5 fork_queue[key] = nil 6 pcall(suspend,co,coroutine_resume(co)) 7 end 8 end

?? ?首先是將msg交由raw_dispatch_message作分發(fā),然后開(kāi)始處理fork_queue中緩存的fork協(xié)程:

? ? ? ? pcall(suspend, co, coroutine_resume(co))

? ? 這行代碼是我們今天關(guān)注的重點(diǎn)。在繼續(xù)之前,我假設(shè)你對(duì)lua的協(xié)程有一定的了解,了解coroutine.resume,coroutine.yield的基本用法。coroutine就是lua里的線程,它擁有自己的函數(shù)棧,但與我們平常接觸的大多數(shù)操作系統(tǒng)里的線程不同,是非搶占式的。skynet對(duì)lua的coroutine作了封裝(詳見(jiàn)lua-profile.c),主要是增加了starttime和totaltime的監(jiān)測(cè),最終還是交由lua的coroutine庫(kù)來(lái)處理的。既然這里分析到了fork_queue,那我們就先以skynet.fork為例,看看它作了什么:

1 function skynet.fork(func,...) 2 local args = table.pack(...) 3 local co = co_create(function() 4 func(table.unpack(args,1,args.n)) 5 end) 6 table.insert(fork_queue, co) 7 return co 8 end

? ? skynet.fork做的事情很簡(jiǎn)單,通過(guò)co_create創(chuàng)建一個(gè)coroutine并將其入隊(duì)fork_queue。看看co_create是如何創(chuàng)建協(xié)程的:

1 local function co_create(f) 2 local co = table.remove(coroutine_pool) 3 if co == nil then 4 co = coroutine.create(function(...) 5 f(...) 6 while true do 7 f = nil 8 coroutine_pool[#coroutine_pool+1] = co 9 f = coroutine_yield "EXIT" 10 f(coroutine_yield()) 11 end 12 end) 13 else 14 coroutine_resume(co, f) 15 end 16 return co 17 end

? ? 調(diào)用co_create時(shí),如果coroutine_pool為空,它會(huì)創(chuàng)建一個(gè)新的co。co在第一次被resume時(shí),會(huì)執(zhí)行f,接著便進(jìn)入一個(gè)使用和回收的無(wú)限循環(huán)。在這個(gè)循環(huán)中,先是收回co到coroutine_pool中,接著便yield "EXIT"到上一次的resume點(diǎn)A。當(dāng)下一次被resume在點(diǎn)B喚醒時(shí),會(huì)先將函數(shù)f傳遞過(guò)來(lái),接著再次yield到點(diǎn)B,等待下一次在點(diǎn)D被resume喚醒時(shí),傳遞需要的參數(shù)過(guò)來(lái)加以執(zhí)行,完畢后回收,如此反復(fù)。這樣看來(lái),co的執(zhí)行似乎相當(dāng)簡(jiǎn)單。但是實(shí)際上要復(fù)雜一些,因?yàn)樵趫?zhí)行f的過(guò)程中,可以再反復(fù)地yield和resume。下面我們舉個(gè)簡(jiǎn)單的例子:?

1 skynet.fork(function() 2 print ("skynet fork: <1>") 3 skynet.sleep(10) 4 print ("skynet fork: <2>") 5 end)

?? ? 我們把skynet.sleep展開(kāi):?

1 skynet.fork(function() 2 print ("skynet fork: <1>") 3 coroutine_yield("SLEEP", COMMAND_TIMEOUT(10)) 4 print ("skynet fork: <2>") 5 end)

?? ? 下面開(kāi)始分析調(diào)用流程。fork-co入隊(duì),主co在skynet.dispatch_message中分發(fā)消息后取出fork-co,調(diào)用resume開(kāi)始進(jìn)入fork-co的函數(shù)f執(zhí)行,如下圖所示,如果fork-co是第一次執(zhí)行,是走圈1,如果是復(fù)用,則走圈1*(如果是復(fù)用的話,調(diào)用co_create時(shí),會(huì)先coroutine_resume(co,f)一次進(jìn)入fork-co,將用戶函數(shù)傳遞給while循環(huán)中的coroutine_yield "EXIT"點(diǎn)之后,接著fork-co再次yield讓出,等待實(shí)際傳參的調(diào)用)。接著進(jìn)入用戶函數(shù),COMMAND_TIMEOUT會(huì)先向skynet-kernal發(fā)送TIMEOUT命令,如圈2所示。然后yield "SLEEP"到主co的resume點(diǎn)1之后繼續(xù)執(zhí)行,如圈3所示,按圈4的指向,調(diào)用suspend進(jìn)入"SLEEP"分支,記錄下TIMEOUT-session與fork-co的映射關(guān)系。此時(shí)主co回到skynet.dispatch_message中繼續(xù)下一個(gè)fork-co的處理。當(dāng)TIMEOUT消息回來(lái)時(shí),會(huì)由主co再次進(jìn)入skynet.dispatch_message并調(diào)用raw_dispatch_message分發(fā),這時(shí)通過(guò)session拿到之前映射的fork-co,再次resume,按照圈5的指向,會(huì)跳轉(zhuǎn)到fork-co的yield "SLEEP"點(diǎn)之后繼續(xù)向下處理。用戶函數(shù)處理完畢后,回到上層調(diào)用,即圈6所指,回收f(shuō)ork-co,接著yield "EXIT"到主co所在raw_dispatch_message中的resume點(diǎn)之后,如圈7所示。進(jìn)入suspend后,無(wú)額外命令,raw_dispatch_message處理結(jié)束,繼續(xù)主co的消息處理流程。

? ? 由以上分析可以看到,實(shí)際的協(xié)程跳轉(zhuǎn)過(guò)程是比較復(fù)雜的,也更顯得小小的LUA在skynet中的精巧運(yùn)用。為方便理解,順便貼出suspend的代碼(只列出了我們關(guān)注的幾個(gè)命令,并做了刪減):?

1 function suspend(co, result, command, param, size) 2 if command == "CALL" then 3 session_id_coroutine[param] = co 4 elseif command == "SLEEP" then 5 session_id_coroutine[param] = co 6 sleep_session[co] = param 7 elseif command == "RETURN" then 8 local co_session = session_coroutine_id[co] 9 local co_address = session_coroutine_address[co] 10 session_response[co] = true 11 c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) 12 return suspend(co, coroutine_resume(co, ret)) 13 elseif command == "EXIT" then 14 -- coroutine exit 15 local address = session_coroutine_address[co] 16 session_coroutine_id[co] = nil 17 session_coroutine_address[co] = nil 18 session_response[co] = nil 19 elseif command == nil then 20 -- debug trace 21 return 22 end 23 end

? ? 看完skynet.fork,我們?cè)倩剡^(guò)頭來(lái),看一看skynet.dispatch_message中消息分發(fā)raw_dispatch_message的具體細(xì)節(jié):

1 local function raw_dispatch_message(prototype, msg, sz, session, source) 2 -- skynet.PTYPE_RESPONSE = 1, read skynet.h 3 if prototype == 1 then 4 local co = session_id_coroutine[session] 5 session_id_coroutine[session] = nil 6 suspend(co, coroutine_resume(co, true, msg, sz)) 7 else 8 local p = proto[prototype] 9 local f = p.dispatch 10 local co = co_create(f) 11 session_coroutine_id[co] = session 12 session_coroutine_address[co] = source 13 suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) 14 end 15 end

? ? RESPONSE的處理先就不說(shuō)了,在敘述skynet.sleep時(shí)已經(jīng)有所討論。消息會(huì)根據(jù)它的prototype查找proto,接著調(diào)用co_create取得協(xié)程user-co,將message解包后resume到user-co進(jìn)入用戶函數(shù)f。而后續(xù)的流程則與上文我們討論的fork是一樣的了。比如,在f中調(diào)用skynet.call時(shí),會(huì)向目標(biāo)發(fā)送消息,接著會(huì)yield到主co,回到這里的resume點(diǎn),接著進(jìn)入suspend的"CALL"分支,記錄session與user-co的映射關(guān)系。下一次response消息回來(lái)時(shí),會(huì)查找到user-co并resume喚醒,在skynet.call后繼續(xù)執(zhí)行,用戶函數(shù)f結(jié)束后進(jìn)入上層調(diào)用,回收user-co,等待新的調(diào)用。

? ? 因?yàn)楸酒覀冴P(guān)注的是協(xié)程調(diào)度模型,而非具體的處理細(xì)節(jié),因此有空再對(duì)skynet.call,skynet.ret等作詳細(xì)的細(xì)節(jié)分析。

轉(zhuǎn)載于:https://www.cnblogs.com/Jackie-Snow/p/6687651.html

總結(jié)

以上是生活随笔為你收集整理的skynet源码阅读5--协程调度模型的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。