日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java openresty 调用_玩转 OpenResty 协程 API

發布時間:2025/3/15 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java openresty 调用_玩转 OpenResty 协程 API 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

注意:本文中列出的所有代碼只是 Proof Of Concept,基本上都沒有進行錯誤處理。另外對于一些邊際情況,也可能沒有考慮清楚。所以對于直接復制文中代碼到項目中所造成的一切后果,請自負責任。

OK,言歸正題。OpenResty 提供了以 ngx.thread.*,coroutine.* 和 ngx.semaphore 等一系列協程 API。雖然受限于 Nginx 的請求處理方式,表現力不如通用語言的協程 API 那么強大。但是開開腦洞,還是可以玩出一些花樣來的。

借助這些 API,讓我們嘗試模擬下其他編程平臺里面的調度方式。

模擬 Java 里面的 Future

Java 里的 Future 可以讓我們創建一個任務,然后在需要的時候才去 get 任務的返回值。另外 Future 還有超時功能。

我們可以啟用一個協程來完成具體的任務,再加一個定時結束的協程,用于實現超時。

像這樣:

local function task()

ngx.sleep(3)

ngx.say("Done")

end

local task_thread = ngx.thread.spawn(task)

local timeout_thread = ngx.thread.spawn(function(timeout)

ngx.sleep(timeout)

error("timeout")

end, 2)

local ok, res = ngx.thread.wait(task_thread, timeout_thread)

if not ok then

if res == "timeout" then

ngx.thread.kill(task_thread)

ngx.say("task cancelled by timeout")

return

end

ngx.say("task failed, result: ", res)

end

ngx.thread.kill(timeout_thread)

注意一點,在某一協程退出之后,我們需要 kill 掉另外一個協程。因為如果沒有調用 ngx.exit 之類的方法顯式退出的話,一直到所有協程退出為止,當前階段都不會結束。

引用文檔里相關的內容:

By default, the corresponding Nginx handler (e.g., rewrite_by_lua handler) will not terminate until

both the "entry thread" and all the user "light threads" terminates,

a "light thread" (either the "entry thread" or a user "light thread" aborts by calling ngx.exit, ngx.exec, ngx.redirect, or ngx.req.set_uri(uri, true), or

the "entry thread" terminates with a Lua error.

模擬 Javascript 里面的 Promise.race/all

Promise.race/all 可以接收多個 Promise,然后打包成一個新的 Promise 返回。引用相關的文檔:

The Promise.race(iterable) method returns a promise that resolves or rejects as soon as one of the promises in the iterable resolves or rejects, with the value or reason from that promise.

The Promise.all(iterable) method returns a promise that resolves when all of the promises in the iterable argument have resolved, or rejects with the reason of the first passed promise that rejects.

這里 reject 等價于協程運行中拋出 error,而 resolve 相對于協程返回了結果。這兩個 API 對于 reject 的處理是一致的,都是有任一出錯則立刻返回異常結果。對于正常結果,race 會在第一個結果出來時返回,而 all 則會在所有結果都出來后返回。

值得注意的是,Javascript 原生的 Promise 暫時沒有 cancell 的功能。所以即使其中一個 Promise reject 了,其他 Promise 依然會繼續運行。對此我們也照搬過來。

Promise.race 的實現:

local function apple()

ngx.sleep(0.1)

--error("apple lost")

return "apple done"

end

local function banana()

ngx.sleep(0.2)

return "banana done"

end

local function carrot()

ngx.sleep(0.3)

return "carrot done"

end

local function race(...)

local functions = {...}

local threads = {}

for _, f in ipairs(functions) do

local th, err = ngx.thread.spawn(f)

if not th then

-- Promise.race 沒有實現 cancell 接口,

-- 所以我偷下懶,不管已經創建的協程了

return nil, err

end

table.insert(threads, th)

end

local ok, res = ngx.thread.wait(unpack(threads))

if not ok then

return nil, res

end

return res

end

local res, err = race(apple, banana, carrot)

ngx.say("res: ", res, " err: ", err)

ngx.exit(ngx.OK)

Promise.all 的實現:

local function all(...)

local functions = {...}

local threads = {}

for _, f in ipairs(functions) do

local th, err = ngx.thread.spawn(f)

if not th then

return nil, err

end

table.insert(threads, th)

end

local res_group = {}

for _ = 1, #threads do

local ok, res = ngx.thread.wait(unpack(threads))

if not ok then

return nil, res

end

table.insert(res_group, res)

end

return res_group

end

模擬 Go 里面的 channel (僅部分實現)

再進一步,試試模擬下 Go 里面的 channel。

我們需要實現如下的語義:

當數據沒有被消費時,生產者會在發送數據之后中斷運行。

當數據沒有被生產時,消費者會在接收數據之前中斷運行。

當存在等待消費者接收數據的生產者時,其他生產者會在發送數據之前中斷運行。

這次要用到 ngx.semaphore。

local semaphore = require "ngx.semaphore"

local Chan = {

new = function(self)

local chan_attrs = {

_read_sema = semaphore:new(),

_write_sema = semaphore:new(),

_exclude_sema = semaphore:new(),

_buffer = nil,

_waiting_thread_num = 0,

}

return setmetatable(chan_attrs, {__index = self})

end,

send = function(self, value, timeout)

timeout = timeout or 60

while self._buffer do

self._waiting_thread_num = self._waiting_thread_num + 1

self._exclude_sema:wait(timeout)

self._waiting_thread_num = self._waiting_thread_num - 1

end

self._buffer = value

self._read_sema:post()

self._write_sema:wait(timeout)

end,

receive = function(self, timeout)

timeout = timeout or 60

self._read_sema:wait(timeout)

local value = self._buffer

self._buffer = nil

self._write_sema:post()

if self._waiting_thread_num > 0 then

self._exclude_sema:post()

end

return value

end,

}

local chan = Chan:new()

-- 以下是使用方法

local function worker_a(ch)

for i = 1, 10 do

ngx.sleep(math.random() / 10)

ch:send(i, 1)

end

end

local function worker_c(ch)

for i = 11, 20 do

ngx.sleep(math.random() / 10)

ch:send(i, 1)

end

end

local function worker_d(ch)

for i = 21, 30 do

ngx.sleep(math.random() / 10)

ch:send(i, 1)

end

end

local function worker_b(ch)

for _ = 1, 20 do

ngx.sleep(math.random() / 10)

local v = ch:receive(1)

ngx.say("recv ", v)

end

end

local function worker_e(ch)

for _ = 1, 10 do

ngx.sleep(math.random() / 10)

local v = ch:receive(1)

ngx.say("recv ", v)

end

end

ngx.thread.spawn(worker_a, chan)

ngx.thread.spawn(worker_b, chan)

ngx.thread.spawn(worker_c, chan)

ngx.thread.spawn(worker_d, chan)

ngx.thread.spawn(worker_e, chan)

模擬 Buffered channel 也是可行的。

local ok, new_tab = pcall(require, "table.new")

if not ok then

new_tab = function (_, _) return {} end

end

local BufferedChan = {

new = function(self, buffer_size)

if not buffer_size or buffer_size <= 0 then

error("Invalid buffer_size " .. (buffer_size or "nil") .. " given")

end

local chan_attrs = {

_read_sema = semaphore:new(),

_write_sema = semaphore:new(),

_waiting_thread_num = 0,

_buffer_size = buffer_size,

}

chan_attrs._buffer = new_tab(buffer_size, 0)

return setmetatable(chan_attrs, {__index = self})

end,

send = function (self, value, timeout)

timeout = timeout or 60

while #self._buffer >= self._buffer_size do

self._waiting_thread_num = self._waiting_thread_num + 1

self._write_sema:wait(timeout)

self._waiting_thread_num = self._waiting_thread_num - 1

end

table.insert(self._buffer, value)

self._read_sema:post()

end,

receive = function(self, timeout)

timeout = timeout or 60

self._read_sema:wait(timeout)

local value = table.remove(self._buffer)

if self._waiting_thread_num > 0 then

self._write_sema:post()

end

return value

end,

}

local chan = BufferedChan:new(2)

-- ...

當然上面的山寨貨還是有很多問題的。比如它缺少至關重要的 select 支持,另外也沒有實現 close 相關的特性。

總結

以上是生活随笔為你收集整理的java openresty 调用_玩转 OpenResty 协程 API的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。