日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python 分布式框架_python分布式框架rq的使用

發布時間:2024/9/19 python 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 分布式框架_python分布式框架rq的使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RedisQueue是一款輕量級的分布式異步任務隊列調度框架,基于redis數據庫作為broker,生產端將任務job存儲到redis數據庫中,消費端監聽隊列并取出任務執行。

1.基礎架構

rq框架使用前需要安裝rq庫,使用pip安裝即可:pip install rq

1.1 生產者

生產者將任務發送到指定redis的指定隊列中,job.py:

import requests

from rq import Queue

# 具體業務執行代碼

def count_words_at_url(url):

resp = requests.get(url)

return len(resp.text.split())

conn = redis.Redis(host='127.0.0.1', password='123456', port=8888, db=10) # 指定redis數據庫

q = Queue("high", connection=conn) # 指定隊列

job_urls = ['http://baidu.com', 'http://qq.com', 'http://sohu.com']

for url in job_urls:

q.enqueue_call(count_words_at_url, args=(url,)) # 發送任務

1.2 消費者

消費者采用多進程架構,監聽指定隊列從中取出任務消息,并調用執行業務處理函數,worker.py:

import redis

from multiprocessing import Pool

from rq import Worker, Queue, Connection

conn = redis.Redis(host='127.0.0.1', password='123456', port=8888, db=10) # 同一個redis連接

# worker邏輯,無需修改

def worker(listen):

with Connection(conn):

worker = Worker(map(Queue, listen))

worker.work()

if __name__ == "__main__":

listen = ['high', 'default', 'low'] # 監聽的隊列,可自定義修改

try:

cpu_num = 4

p = Pool(cpu_num)

for i in range(cpu_num):

p.apply_async(worker, args=(listen,)) # 開啟worker

p.close()

p.join()

except Exception as e:

print(e)

2.基于redis集群

若是基于redis集群,可以用如下方式獲取redis連接:

from rq.cli.helpers import get_redis_from_config

settings = {'SENTINEL': {

'INSTANCES': [('redis1.cloud.bz', 8080),

('redis2.cloud.bz', 8080),

('redis3.cloud.bz', 8080)],

'SOCKET_TIMEOUT': None,

'PASSWORD': '',

'DB': 10,

'MASTER_NAME': 'sentinel-127.0.0.1-8080'

}

}

conn = get_redis_from_config(settings)

總結

以上是生活随笔為你收集整理的python 分布式框架_python分布式框架rq的使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。