日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控

發(fā)布時間:2024/10/8 python 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

@Author:Runsen

消息隊列

消息隊列讓應用程序在用戶請求之外異步執(zhí)行稱為任務的工作。如果應用程序需要在后臺執(zhí)行工作,它會將任務添加到任務隊列中。這些任務稍后由工作服務執(zhí)行。

Celery

Celery 是一個分布式任務隊列,可幫助在后臺異步執(zhí)行大量進程/消息并進行實時處理。芹菜由三個主要成分組成:

  • Celery 客戶端
  • 消息代理
  • Celery 工人

下圖顯示了組件之間如何交互的簡化圖。我們將使用 FastAPI 作為我們的 Celery 客戶端和 RabbitMQ 作為消息代理。

  • 將Celery 客戶將運行FastAPI應用程序,并會發(fā)出消息/后臺作業(yè)的RabbitMQ。

  • RabbitMQ 將作為消息代理來調解客戶端和工作線程之間的消息。

  • RabbitMQ 收到客戶端的消息后,會通過將消息發(fā)送給 celery worker 來啟動客戶端任務。

  • 一個celery 工人被認為是將實現(xiàn)在任何Web服務器的請求的異步后臺任務。可以有多個工人一次執(zhí)行/完成許多任務。

  • celery 將確保每個 worker 一次只執(zhí)行一個任務,并且每個任務只由一個 worker 分配。

FastAPI 是一個現(xiàn)代、快速(高性能)的 Web 框架,

安裝 fastapi包

pip install fastapi

安裝 celery 包

pip install celery

我們還需要安裝 ASGI 服務器來運行我們的 FastAPI 應用程序。

pip install uvicorn

在我們的本地機器上運行 RabbitMQ 的最簡單方法之一是使用 Docker。

Docker安裝查看:https://docs.docker.com/get-docker/

運行以下命令即可通過終端中的 docker 啟動 RabbitMQ 映像。

docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:3

如果沒有安裝RabbitMQ 映像,會直接下載安裝。

創(chuàng)建 Celery Worker 任務

現(xiàn)在,一旦消息代理RabbitMQ 運行,就可以創(chuàng)建Worker 程序。

這是因為 celery Worker 會偵聽消息代理以執(zhí)行排隊的任務。

在這一部分,我們將為 celery worker 創(chuàng)建任務。創(chuàng)建一個文件celery_worker.py:

from time import sleep from celery import Celery from celery.utils.log import get_task_logger #初始Celery celery = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672//') #創(chuàng)建記錄器-啟用以在任務記錄器上顯示消息 celery_log = get_task_logger(__name__) #創(chuàng)建訂單-與芹菜異步運行 #長時間運行任務的示例流程 @celery.task def create_order(name, quantity):# 每1個訂單5秒complete_time_per_item = 5# 根據(jù)訂購的物品數(shù)量不斷增加sleep(complete_time_per_item * quantity)celery_log.info(f"Order Complete!")return {"message": f"Hi {name}, Your order has completed!","order_quantity": quantity}

新建一個model.py

from pydantic import BaseModel #請求主體的訂單類模型 class Order(BaseModel):customer_name: strorder_quantity: int

然后,創(chuàng)建一個名為main.py的新文件:

from fastapi import FastAPI from celery_worker import create_order from model import Order # 創(chuàng)建FastAPI應用程序 app = FastAPI() # 創(chuàng)建訂單 @app.post('/order') def add_order(order: Order):# 使用delay=() 方法調用芹菜任務create_order.delay(order.customer_name, order.order_quantity)return {"message": "Order Received! Thank you for your patience."}

運行 FastAPI 應用程序:

uvicorn main:app --reload

訪問http://localhost:8000/docs

以查看 Swagger 文檔中正在運行的 FastAPI。

celery -A celery_worker.celery worker --loglevel=info

可以通過插入請求正文輸入來測試我們的端點。這是請求正文的示例輸入:

單擊“Execute”以從端點獲取響應,將看到以下結果:

有沒有什么有效的方法來監(jiān)控后臺任務?

我們可以使用Flower 監(jiān)控工具來監(jiān)控我們所有的Celery 工人。

pip install flower

然后,flower在我們的本地機器上運行服務器:

celery flower -A celery_worker.celery --broker:amqp://localhost//

訪問http://localhost:5555/。

Flower 服務器并單擊導航欄上的“Tasks”選項卡來監(jiān)控我們的 celery 工人。

總結

以上是生活随笔為你收集整理的Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。