python消息队列celery高可用_分布式消息队列-Celery
怎么能不恨呢,在我發現自己是惡鬼的時候,在我最絕望最虛弱的時候,這個世上最該跟我在一起的人卻用刀把我的心刺穿了
Celery 是 Distributed Task Queue,分布式任務隊列。分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作。它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。
安裝
Celery4.x 開始不再支持Windows平臺了。下面裝的是3.1.25。
安裝使用命令
pip3 install celery==3.1.25
查看是否安裝成功,使用命令即可:
celery --version
如果在win10下想要使用celery4.x的話,可以這么做:
pip3 install eventlet
運行worker的時候加上一個參數:
celery -A xxxx worker -l info -P eventlet
然后安裝redis(個人比較喜歡redis)
首先下載安裝redis windowns服務端
解壓后,在其目錄下執行命令
redis-server.exe
即可啟動redis數據庫
然后安裝python連接操作redis的庫
pip3 install redis==2.10.6
注意版本號
核心主件
celery通過五大模塊實現
Task
就是任務,有異步任務和定時任務
Broker
中間人,接收生產者發來的消息即Task,將任務存入隊列。任務的消費者是Worker。Celery本身不提供隊列服務,推薦用Redis或RabbitMQ實現隊列服務。
Worker
執行任務的單元,它實時監控消息隊列,如果有任務就獲取任務并執行它。
Beat
定時任務調度器,根據配置定時將任務發送給Broler。
Backend
用于存儲任務的執行結果。
組成關系
各個角色間的關系看下面這張圖理解一下:
初次使用
首先編寫一個文件 命名為task1.py
from celery import Celery
app = Celery('tasks',broker='redis://192.168.1.102:6379/0')
# redis://192.168.1.102:6379/0 是redis數據庫地址,無需賬號密碼驗證,也是ssrf在獲取內網系統權限的方式之一
@app.task
def add(x,y):
print('傳遞 {} + {} = {}'.format(x,y,x+y))
return x+y
然后啟動redis數據庫
接下來再task1文件夾執行命令
celery -A task1 worker --loglevel=info
就會看到消息隊列都啟動
到現在所有的隊列都啟動,可以向這個隊列添加任務等待處理
方法是再task1目錄下打開cmd窗口,進入python3交互界面
python3
from task1 import add
add.delay(6,12)
add.delay(6,6)
上面只是一個發送任務的調用,結果是拿不到的。上面也沒有接收返回值,這次把返回值保存到起來
修改task1內容
app = Celery('tasks',broker='redis://192.168.1.102:6379/0',backend='redis://192.168.1.102:6379/0')
然后要重啟Worker,IDLE也要重啟
然后這樣就能獲取結果了
t = add.delay(1, 1)
t.get()
# 還可以設置超時時間 t.get(timeout=5)
# 如果出錯,獲取錯誤結果,不觸發異常
# 使用命令t.get(propagate=False)
# t.traceback (打印異常詳細結果)
# 還可以獲取任務狀態
# t.ready() 返回True 或者False
在項目中使用Celery
可以把celery配置成一個應用,假設應用名字是CeleryPro,目錄格式如下:
CeleryPro
├─__init.py
├─celery.py
├─tasks.py
這里的連接文件命名必須為celery.py,其他名字隨意
celery文件
這個文件名必須是celery.py:
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('CeleryPro',
broker='redis://192.168.1.102:6379',
backend='redis://192.168.1.102:6379',
include=['CeleryPro.tasks'])
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
tasks文件
這個文件開始兩行就多了一個點,這里要導入上面的celery.py文件。后面只要寫各種任務加上裝飾器就可以了:
from __future__ import absolute_import, unicode_literals
from .celery import app
import time
@app.task
def add(x, y):
print("計算2個值的和: %s %s" % (x, y))
return x+y
@app.task
def upper(v):
for i in range(10):
time.sleep(1)
print(i)
return v.upper()
啟動worker
啟動的時候,-A 參數后面用應用名稱 CeleryPro 。你還需要cd到你CeleryPro的父級目錄上啟動,否則找不到:
啟動的姿勢
這里注意用的都是CeleryPro:
celery -A CeleryPro worker -loglevel=info # 前臺啟動不推薦
celery -A CeleryPro worker -l info # 前臺啟動簡寫
celery multi start w1 -A CeleryPro -l info # 推薦用后臺啟動
定時任務
主要修改 celery.py文件
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
app = Celery('CeleryPro',
broker='redis://192.168.1.102',
backend='redis://192.168.1.102',
include=['CeleryPro.tasks'])
app.conf.CELERYBEAT_SCHEDULE = {
'add every 10 seconds': {
'task': 'CeleryPro.tasks.add',
'schedule': timedelta(seconds=10),
# 可以用timedelta對象
# 'schedule': 10, # 也支持直接用數字表示秒數
'args': (1, 2)
},
'upper every 2 minutes': {
'task': 'CeleryPro.tasks.upper',
'schedule': crontab(minute='*/2'),
'args': ('abc', ),
},
}
# app.conf.CELERY_TIMEZONE = 'UTC'
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
)
if __name__ == '__main__':
app.start()
啟動使用命令
celery -A CeleryPro beat -l info
celery -A CeleryPro worker -l info
參數解析:
-l info 與--loglevel=info的作用是一樣的。
--beat 周期性的運行。即設置 心跳。
新例子# tasks.py
# coding:utf-8
from celery import Celery,platforms
app = Celery('tasks')
app.config_from_object('config')
platforms.C_FORCE_ROOT = True
@app.task
def add(x,y):
return x + y
和另一個文件
# config.py
# coding:utf-8
from __future__ import absolute_import
from celery.schedules import crontab
from datetime import timedelta
BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERYBEAT_SCHEDULE = {
'add-every-2-seconds': {
'task': 'tasks.add',
'schedule': timedelta(seconds=2),
'args': (16, 10),
},
}
CELERY_TIMEZONE = 'Asia/Shanghai'
然后打開三個cmd窗口,依次輸入:
celery -A tasks beat -l info
celery -A tasks worker -l info
celery -A tasks flower
然后訪問本地5555端口即可~
查看異步任務情況
Celery提供了一個工具flower,將各個任務的執行情況、各個worker的健康狀態進行監控并以可視化的方式展現
下實現的方式如下:
安裝flower:
pip3 install flower
啟動flower(默認會啟動一個webserver,端口為5555):
在另一個Terminal中:
celery -A task1 flower
這里的task1是上面創建的py文件
進入
http://localhost:5555
即可查看。
資料文檔
總結
以上是生活随笔為你收集整理的python消息队列celery高可用_分布式消息队列-Celery的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 京东白条会弄花征信吗
- 下一篇: 盘点罗杰斯的生平 履历丰富投资天才