redis延迟队列 如何确保成功消费_千万级延时任务队列如何实现,看美图开源的-LMSTFY...
導讀:Task是web開發(fā)中一個經(jīng)典場景,我們時常需要延時任務,或者定時任務,通常都需要任務隊列。常見的任務隊列如celery,lmstfy是美圖開源的任務隊列。本文作者詳細剖析了lmstfy的架構實現(xiàn),干貨滿滿,適合技術人員閱讀。
lmstfy(Let Me Schedule Task For You) 是美圖架構基礎服務團隊在 2018 年初基于 Redis 實現(xiàn)的簡單任務隊列(Task Queue)服務,目前在美圖多個線上產(chǎn)品使用接近兩年的時間。主要提供以下特性:
任務具備延時、自動重試、優(yōu)先級以及過期等功能
通過 HTTP restful API 提供服務
具備橫向擴展能力
豐富的業(yè)務和性能指標
Github 項目地址: https://github.com/meitu/lmstfy
使用場景
任務隊列跟消息隊列在使用場景上最大的區(qū)別是: 任務之間是沒有順序約束而消息要求順序(FIFO),且可能會對任務的狀態(tài)更新而消息一般只會消費不會更新。 類似 Kafka 利用消息 FIFO 和不需要更新(不需要對消息做索引)的特性來設計消息存儲,將消息讀寫變成磁盤的順序讀寫來實現(xiàn)比較好的性能。而任務隊列需要能夠任務狀態(tài)進行更新則需要對每個消息進行索引,如果把兩者放到一起實現(xiàn)則很難實現(xiàn)在功能和性能上兼得。在美圖內(nèi)部選型上,如果是異步消息模型一般會選擇消息隊列,比如類似日志上報,搶購等。而對于需要延時/定時下發(fā)或者修改狀態(tài)任務則是使用任務隊列。
比如在以下幾種場景會使用任務隊列:
定時任務,如每天早上 8 點開始推送消息,定期刪除過期數(shù)據(jù)等
任務流,如自動創(chuàng)建 Redis 流程由資源創(chuàng)建,資源配置,DNS 修改等部分組成,使用任務隊列可以簡化整體的設計和重試流程
重試任務,典型場景如離線圖片處理
目標與調(diào)研
在自研任務隊列之前,我們基于以下幾個要求作為約束調(diào)研了現(xiàn)有一些開源方案:
任務支持延時/優(yōu)先級任務和自動重試
高可用,服務不能有單點以及保證數(shù)據(jù)不丟失
可擴展,主要是容量和性能需要可擴展
第一種方案是 Redis 作者開源的分布式內(nèi)存隊列 disque(https://github.com/antirez/disque)。disque 采用和 Redis Cluster 類似無中心設計,所有節(jié)點都可以寫入并復制到其他節(jié)點。不管是從功能上、設計還是可靠性都是比較好的選擇。我們在 2017 年也引入 disque 在部分業(yè)務使用過一段時間,后面遇到 bug 在內(nèi)部修復后想反饋到社區(qū),發(fā)現(xiàn) Redis 作者決定不再維護這個項目(要把 disque 功能作為 redis module 來維護,應該是會伴隨 Redis 6 發(fā)布)。最終我們也放棄了 disque 方案,將數(shù)據(jù)遷移到我們自研任務隊列服務。
第二種方案是 2007 年就開源的 beanstalkd(https://github.com/beanstalkd/beanstalkd),現(xiàn)在仍然還是在維護狀態(tài)。beanstalkd 是類 memcached 協(xié)議全內(nèi)存任務隊列,斷電或者重啟時通過 WAL 文件來恢復數(shù)據(jù)。但 benstalkd 不支持復制功能,服務存在單點問題且數(shù)據(jù)可靠性也無法滿足。當時也有考慮基于 beanstalkd 去做二次開發(fā),但看完代碼之后覺得需要改造的點不只是復制,還有類似內(nèi)存控制等等,所以沒有選擇 beanstalkd 二次開發(fā)的方案。
也考慮過類似基于 kafka/rocketmq 等消息隊列作為存儲的方案,最后從存儲設計模型和團隊技術棧等原因決定選擇基于 redis 作為存儲來實現(xiàn)任務隊列的功能。舉個例子,假設以 Kafka 這種消息隊列存儲來實現(xiàn)延時功能,每個隊列的時間都需要創(chuàng)建一個單獨的 topic(如: Q1-1s, Q1-2s..)。這種設計在延時時間比較固定的場景下問題不太大,但如果是延時時間變化比較大會導致 topic 數(shù)目過多,會把磁盤從順序讀寫會變成隨機讀寫從導致性能衰減,同時也會帶來其他類似重啟或者恢復時間過長的問題。
設計和實現(xiàn)
整體設計
lmstfy 是 HTTP 協(xié)議的無狀態(tài)服務,可以通過 4/L7 的 LB 來接入。內(nèi)部主要由四個模塊組成:
Pump Thread: 每秒輪詢 Redis 將到期的任務遷移到就緒隊列(ready queue)
Metric Collector, 定時收集隊列相關統(tǒng)計數(shù)據(jù)到實例再通過 prometheus exporter 暴露給監(jiān)控系統(tǒng)
Token Manager,用來管理 namespace 和 token 的模塊,namespace 是用來做業(yè)務隔離的單位
Producer/Consumer,用來處理用戶的任務和消費請求
Default Pool 除了用來存儲業(yè)務數(shù)據(jù),namespace/token 這類元數(shù)據(jù)也會默認存儲到 Default 這個 Redis 池子里面
基礎概念
namespace - 用來隔離業(yè)務,每個業(yè)務是獨立的 namespace
queue - 隊列名稱,用區(qū)分同一業(yè)務不同消息類型
job - 業(yè)務定義的業(yè)務,主要包含以下幾個屬性:
id: 任務 ID,全局唯一
delay: 任務延時下發(fā)時間, 單位是秒
tries: 任務最大重試次數(shù),tries = N 表示任務會最多下發(fā) N 次
ttl(time to live): 任務最長有效期,超過之后任務自動消失
ttr(time to run): 任務預期執(zhí)行時間,超過 ttr 則認為任務消費失敗,觸發(fā)任務自動重試
數(shù)據(jù)存儲
lmstfy 的 redis 存儲由四部分組成:
timer(sorted set) - 用來實現(xiàn)延遲任務的排序,再由后臺線程定期將到期的任務寫入到 Ready Queue 里面
ready queue (list) - 無延時或者已到期任務的隊列
deadletter (list) - 消費失敗(重試次數(shù)到達上限)的任務,可以手動重新放回隊列
job pool(string) - 存儲消息內(nèi)容的池子
支持延遲的任務隊列本質(zhì)上是兩個數(shù)據(jù)結構的結合: FIFO 和 sorted set。sorted set 用來實現(xiàn)延時的部分,將任務按照到期時間戳升序存儲,然后定期將到期的任務遷移至 FIFO(ready queue)。任務的具體內(nèi)容只會存儲一份在 job pool 里面,其他的像 ready queue,timer,deadletter 只是存儲 job id,這樣可以節(jié)省一些內(nèi)存空間。
以下是整體設計:
任務寫入
任務在寫入時會先產(chǎn)生一個 job id,目前 job id (16bytes) 包含寫入時間戳、 隨機數(shù)和延遲秒數(shù), 然后寫入 key 為 j:{namespace}/{queue}/{ID} 的任務到任務池 (pool) 里面。之后根據(jù)延時時間來決定這個 job id 應該到 ready queue 還是 timer 里面:
delay = 0,表示不需要延時則直接寫到 ready queue(list)
delay = n(n > 0),表示需要延時,將延時加上當前系統(tǒng)時間作為絕對時間戳寫到 timer(sorted set)
timer 的實現(xiàn)是利用 zset 根據(jù)絕對時間戳進行排序,再由旁路線程定期輪詢將到期的任務通過 redis lua script 來將數(shù)據(jù)原子地轉移到 ready queue 里面。
任務消費
之前提到任務在消費失敗之后預期能夠重試,所以必須知道什么時候可認為任務消費失敗?業(yè)務在消費時需要攜帶 ttr(time to run) 參數(shù),用來表示業(yè)務預期任務最長執(zhí)行時間,如果在 ttr 時間內(nèi)沒有收到業(yè)務主動回復 ACK 消息則會認為任務失敗(類似 tcp 的重傳 timer)。
消費時從 ready queue 中 (B)RPOP 出任務的 job id,然后根據(jù) job id 從 pool 中將任務內(nèi)容發(fā)送給消費者。同時對 tries 減1,根據(jù)消費的 ttr(time to run) 參數(shù), 將任務放入 timer 中。如果 tries 為零, 在 ttr 時間到期后該 job id 會被放入 dead letter 隊列中(表示任務執(zhí)行失敗)。
同步任務模型
lmstfy 除了可以用來實現(xiàn)異步和延時任務模型之外,因為 namespace 下面的隊列是動態(tài)創(chuàng)建且 job id 全局唯一,還可以用來實現(xiàn)同步任務模型 (producer 等到任務執(zhí)行成功之后返回)。大概如下:
producer 寫入任務之后拿到 job id, 然后監(jiān)聽(consume)以 job id 為名的隊列
consumer 消費任務成功后,寫回復消息到同樣以 job id 為名的隊列中
producer 如果規(guī)定時間內(nèi)能讀到回復消息則認為消費成功,等待超時則認為任務失敗
如何實現(xiàn)橫向擴展
lmstfy 本身是無狀態(tài)的服務可以很簡單的實現(xiàn)橫向擴展,這里的橫向擴展主要是存儲(目前只支持 Redis)的橫向擴展。設計也比較簡單,主要通過通過 namespace 對應的 token 路由來實現(xiàn), 比如我們當前配置兩組 Redis 資源: default 和 meipai:
[Pool][Pool.default]Addr = "1.1.1.1:6379"[Pool.meipai]Addr = "2.2.2.2:6389"在創(chuàng)建 namespace 時可以指定資源池,token 里面會攜帶資源池名字作為前綴。比指定美拍資源池,那么 token 類似: meipai:01DT8EZ1N6XT ,后續(xù)在處理請求時就可以根據(jù) token 里面攜帶的資源池名稱來進行路由數(shù)據(jù)。不過這種設計實現(xiàn)隊列級別的擴展,如果單隊列存儲消息量超過 Redis 內(nèi)存上限則需要其他手段來解決(后面會支持磁盤類型存儲)。
如何使用
# 創(chuàng)建 namespace 和 token, 注意這里使用管理端口$ ./scripts/token-cli -c -n test_ns -p default -D "test ns apply by @hulk" 127.0.0.1:7778{ "token": "01DT9323JACNBQ9JESV80G0000"}# 寫入內(nèi)容為 value 的任務$ curl -XPUT -d "value" -i "http://127.0.0.1:7777/api/test_ns/q1?tries=3&delay=1&token=01DT931XGSPKNB7E2XFKPY3ZPB"{"job_id":"01DT9323JACNBQ9JESV80G0000總結
以上是生活随笔為你收集整理的redis延迟队列 如何确保成功消费_千万级延时任务队列如何实现,看美图开源的-LMSTFY...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis数据库价格_Caffeine和
- 下一篇: 8s pod 查看 的yaml_Kube