日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

java openresty 调用_玩转 OpenResty 协程 API

發(fā)布時(shí)間:2025/3/15 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java openresty 调用_玩转 OpenResty 协程 API 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

注意:本文中列出的所有代碼只是 Proof Of Concept,基本上都沒有進(jìn)行錯(cuò)誤處理。另外對(duì)于一些邊際情況,也可能沒有考慮清楚。所以對(duì)于直接復(fù)制文中代碼到項(xiàng)目中所造成的一切后果,請(qǐng)自負(fù)責(zé)任。

OK,言歸正題。OpenResty 提供了以 ngx.thread.*,coroutine.* 和 ngx.semaphore 等一系列協(xié)程 API。雖然受限于 Nginx 的請(qǐng)求處理方式,表現(xiàn)力不如通用語言的協(xié)程 API 那么強(qiáng)大。但是開開腦洞,還是可以玩出一些花樣來的。

借助這些 API,讓我們嘗試模擬下其他編程平臺(tái)里面的調(diào)度方式。

模擬 Java 里面的 Future

Java 里的 Future 可以讓我們創(chuàng)建一個(gè)任務(wù),然后在需要的時(shí)候才去 get 任務(wù)的返回值。另外 Future 還有超時(shí)功能。

我們可以啟用一個(gè)協(xié)程來完成具體的任務(wù),再加一個(gè)定時(shí)結(jié)束的協(xié)程,用于實(shí)現(xiàn)超時(shí)。

像這樣:

local function task()

ngx.sleep(3)

ngx.say("Done")

end

local task_thread = ngx.thread.spawn(task)

local timeout_thread = ngx.thread.spawn(function(timeout)

ngx.sleep(timeout)

error("timeout")

end, 2)

local ok, res = ngx.thread.wait(task_thread, timeout_thread)

if not ok then

if res == "timeout" then

ngx.thread.kill(task_thread)

ngx.say("task cancelled by timeout")

return

end

ngx.say("task failed, result: ", res)

end

ngx.thread.kill(timeout_thread)

注意一點(diǎn),在某一協(xié)程退出之后,我們需要 kill 掉另外一個(gè)協(xié)程。因?yàn)槿绻麤]有調(diào)用 ngx.exit 之類的方法顯式退出的話,一直到所有協(xié)程退出為止,當(dāng)前階段都不會(huì)結(jié)束。

引用文檔里相關(guān)的內(nèi)容:

By default, the corresponding Nginx handler (e.g., rewrite_by_lua handler) will not terminate until

both the "entry thread" and all the user "light threads" terminates,

a "light thread" (either the "entry thread" or a user "light thread" aborts by calling ngx.exit, ngx.exec, ngx.redirect, or ngx.req.set_uri(uri, true), or

the "entry thread" terminates with a Lua error.

模擬 Javascript 里面的 Promise.race/all

Promise.race/all 可以接收多個(gè) Promise,然后打包成一個(gè)新的 Promise 返回。引用相關(guān)的文檔:

The Promise.race(iterable) method returns a promise that resolves or rejects as soon as one of the promises in the iterable resolves or rejects, with the value or reason from that promise.

The Promise.all(iterable) method returns a promise that resolves when all of the promises in the iterable argument have resolved, or rejects with the reason of the first passed promise that rejects.

這里 reject 等價(jià)于協(xié)程運(yùn)行中拋出 error,而 resolve 相對(duì)于協(xié)程返回了結(jié)果。這兩個(gè) API 對(duì)于 reject 的處理是一致的,都是有任一出錯(cuò)則立刻返回異常結(jié)果。對(duì)于正常結(jié)果,race 會(huì)在第一個(gè)結(jié)果出來時(shí)返回,而 all 則會(huì)在所有結(jié)果都出來后返回。

值得注意的是,Javascript 原生的 Promise 暫時(shí)沒有 cancell 的功能。所以即使其中一個(gè) Promise reject 了,其他 Promise 依然會(huì)繼續(xù)運(yùn)行。對(duì)此我們也照搬過來。

Promise.race 的實(shí)現(xiàn):

local function apple()

ngx.sleep(0.1)

--error("apple lost")

return "apple done"

end

local function banana()

ngx.sleep(0.2)

return "banana done"

end

local function carrot()

ngx.sleep(0.3)

return "carrot done"

end

local function race(...)

local functions = {...}

local threads = {}

for _, f in ipairs(functions) do

local th, err = ngx.thread.spawn(f)

if not th then

-- Promise.race 沒有實(shí)現(xiàn) cancell 接口,

-- 所以我偷下懶,不管已經(jīng)創(chuàng)建的協(xié)程了

return nil, err

end

table.insert(threads, th)

end

local ok, res = ngx.thread.wait(unpack(threads))

if not ok then

return nil, res

end

return res

end

local res, err = race(apple, banana, carrot)

ngx.say("res: ", res, " err: ", err)

ngx.exit(ngx.OK)

Promise.all 的實(shí)現(xiàn):

local function all(...)

local functions = {...}

local threads = {}

for _, f in ipairs(functions) do

local th, err = ngx.thread.spawn(f)

if not th then

return nil, err

end

table.insert(threads, th)

end

local res_group = {}

for _ = 1, #threads do

local ok, res = ngx.thread.wait(unpack(threads))

if not ok then

return nil, res

end

table.insert(res_group, res)

end

return res_group

end

模擬 Go 里面的 channel (僅部分實(shí)現(xiàn))

再進(jìn)一步,試試模擬下 Go 里面的 channel。

我們需要實(shí)現(xiàn)如下的語義:

當(dāng)數(shù)據(jù)沒有被消費(fèi)時(shí),生產(chǎn)者會(huì)在發(fā)送數(shù)據(jù)之后中斷運(yùn)行。

當(dāng)數(shù)據(jù)沒有被生產(chǎn)時(shí),消費(fèi)者會(huì)在接收數(shù)據(jù)之前中斷運(yùn)行。

當(dāng)存在等待消費(fèi)者接收數(shù)據(jù)的生產(chǎn)者時(shí),其他生產(chǎn)者會(huì)在發(fā)送數(shù)據(jù)之前中斷運(yùn)行。

這次要用到 ngx.semaphore。

local semaphore = require "ngx.semaphore"

local Chan = {

new = function(self)

local chan_attrs = {

_read_sema = semaphore:new(),

_write_sema = semaphore:new(),

_exclude_sema = semaphore:new(),

_buffer = nil,

_waiting_thread_num = 0,

}

return setmetatable(chan_attrs, {__index = self})

end,

send = function(self, value, timeout)

timeout = timeout or 60

while self._buffer do

self._waiting_thread_num = self._waiting_thread_num + 1

self._exclude_sema:wait(timeout)

self._waiting_thread_num = self._waiting_thread_num - 1

end

self._buffer = value

self._read_sema:post()

self._write_sema:wait(timeout)

end,

receive = function(self, timeout)

timeout = timeout or 60

self._read_sema:wait(timeout)

local value = self._buffer

self._buffer = nil

self._write_sema:post()

if self._waiting_thread_num > 0 then

self._exclude_sema:post()

end

return value

end,

}

local chan = Chan:new()

-- 以下是使用方法

local function worker_a(ch)

for i = 1, 10 do

ngx.sleep(math.random() / 10)

ch:send(i, 1)

end

end

local function worker_c(ch)

for i = 11, 20 do

ngx.sleep(math.random() / 10)

ch:send(i, 1)

end

end

local function worker_d(ch)

for i = 21, 30 do

ngx.sleep(math.random() / 10)

ch:send(i, 1)

end

end

local function worker_b(ch)

for _ = 1, 20 do

ngx.sleep(math.random() / 10)

local v = ch:receive(1)

ngx.say("recv ", v)

end

end

local function worker_e(ch)

for _ = 1, 10 do

ngx.sleep(math.random() / 10)

local v = ch:receive(1)

ngx.say("recv ", v)

end

end

ngx.thread.spawn(worker_a, chan)

ngx.thread.spawn(worker_b, chan)

ngx.thread.spawn(worker_c, chan)

ngx.thread.spawn(worker_d, chan)

ngx.thread.spawn(worker_e, chan)

模擬 Buffered channel 也是可行的。

local ok, new_tab = pcall(require, "table.new")

if not ok then

new_tab = function (_, _) return {} end

end

local BufferedChan = {

new = function(self, buffer_size)

if not buffer_size or buffer_size <= 0 then

error("Invalid buffer_size " .. (buffer_size or "nil") .. " given")

end

local chan_attrs = {

_read_sema = semaphore:new(),

_write_sema = semaphore:new(),

_waiting_thread_num = 0,

_buffer_size = buffer_size,

}

chan_attrs._buffer = new_tab(buffer_size, 0)

return setmetatable(chan_attrs, {__index = self})

end,

send = function (self, value, timeout)

timeout = timeout or 60

while #self._buffer >= self._buffer_size do

self._waiting_thread_num = self._waiting_thread_num + 1

self._write_sema:wait(timeout)

self._waiting_thread_num = self._waiting_thread_num - 1

end

table.insert(self._buffer, value)

self._read_sema:post()

end,

receive = function(self, timeout)

timeout = timeout or 60

self._read_sema:wait(timeout)

local value = table.remove(self._buffer)

if self._waiting_thread_num > 0 then

self._write_sema:post()

end

return value

end,

}

local chan = BufferedChan:new(2)

-- ...

當(dāng)然上面的山寨貨還是有很多問題的。比如它缺少至關(guān)重要的 select 支持,另外也沒有實(shí)現(xiàn) close 相關(guān)的特性。

總結(jié)

以上是生活随笔為你收集整理的java openresty 调用_玩转 OpenResty 协程 API的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。