日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

python redis 消息队列_Python的Flask框架应用调用Redis队列数据的方法

發(fā)布時間:2025/3/11 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python redis 消息队列_Python的Flask框架应用调用Redis队列数据的方法 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

任務(wù)異步化打開瀏覽器,輸入地址,按下回車,打開了頁面。于是一個HTTP請求(request)就由客戶端發(fā)送到服務(wù)器,服務(wù)器處理請求,返回響應(yīng)(response)內(nèi)容。

我們每天都在瀏覽網(wǎng)頁,發(fā)送大大小小的請求給服務(wù)器。有時候,服務(wù)器接到了請求,會發(fā)現(xiàn)他也需要給另外的服務(wù)器發(fā)送請求,或者服務(wù)器也需要做另外一些事情,于是最初們發(fā)送的請求就被阻塞了,也就是要等待服務(wù)器完成其他的事情。

更多的時候,服務(wù)器做的額外事情,并不需要客戶端等待,這時候就可以把這些額外的事情異步去做。從事異步任務(wù)的工具有很多。主要原理還是處理通知消息,針對通知消息通常采取是隊列結(jié)構(gòu)。生產(chǎn)和消費消息進行通信和業(yè)務(wù)實現(xiàn)。

生產(chǎn)消費與隊列上述異步任務(wù)的實現(xiàn),可以抽象為生產(chǎn)者消費模型。如同一個餐館,廚師在做飯,吃貨在吃飯。如果廚師做了很多,暫時賣不完,廚師就會休息;如果客戶很多,廚師馬不停蹄的忙碌,客戶則需要慢慢等待。實現(xiàn)生產(chǎn)者和消費者的方式用很多,下面使用Python標準庫Queue寫個小例子:

import random

import time

from Queue import Queue

from threading import Thread

queue = Queue(10)

class Producer(Thread):

def run(self):

while True:

elem = random.randrange(9)

queue.put(elem)

print "廚師 {} 做了 {} 飯 --- 還剩 {} 飯沒賣完".format(self.name, elem, queue.qsize())

time.sleep(random.random())

class Consumer(Thread):

def run(self):

while True:

elem = queue.get()

print "吃貨{} 吃了 {} 飯 --- 還有 {} 飯可以吃".format(self.name, elem, queue.qsize())

time.sleep(random.random())

def main():

for i in range(3):

p = Producer()

p.start()

for i in range(2):

c = Consumer()

c.start()

if __name__ == '__main__':

main()

大概輸出如下:

廚師 Thread-1 做了 1 飯 --- 還剩 1 飯沒賣完

廚師 Thread-2 做了 8 飯 --- 還剩 2 飯沒賣完

廚師 Thread-3 做了 3 飯 --- 還剩 3 飯沒賣完

吃貨Thread-4 吃了 1 飯 --- 還有 2 飯可以吃

吃貨Thread-5 吃了 8 飯 --- 還有 1 飯可以吃

吃貨Thread-4 吃了 3 飯 --- 還有 0 飯可以吃

廚師 Thread-1 做了 0 飯 --- 還剩 1 飯沒賣完

廚師 Thread-2 做了 0 飯 --- 還剩 2 飯沒賣完

廚師 Thread-1 做了 1 飯 --- 還剩 3 飯沒賣完

廚師 Thread-1 做了 1 飯 --- 還剩 4 飯沒賣完

吃貨Thread-4 吃了 0 飯 --- 還有 3 飯可以吃

廚師 Thread-3 做了 3 飯 --- 還剩 4 飯沒賣完

吃貨Thread-5 吃了 0 飯 --- 還有 3 飯可以吃

吃貨Thread-5 吃了 1 飯 --- 還有 2 飯可以吃

廚師 Thread-2 做了 8 飯 --- 還剩 3 飯沒賣完

廚師 Thread-2 做了 8 飯 --- 還剩 4 飯沒賣完

Redis 隊列Python內(nèi)置了一個好用的隊列結(jié)構(gòu)。我們也可以是用redis實現(xiàn)類似的操作。并做一個簡單的異步任務(wù)。

Redis提供了兩種方式來作消息隊列。一個是使用生產(chǎn)者消費模式模式,另外一個方法就是發(fā)布訂閱者模式。前者會讓一個或者多個客戶端監(jiān)聽消息隊列,一旦消息到達,消費者馬上消費,誰先搶到算誰的,如果隊列里沒有消息,則消費者繼續(xù)監(jiān)聽。后者也是一個或多個客戶端訂閱消息頻道,只要發(fā)布者發(fā)布消息,所有訂閱者都能收到消息,訂閱者都是ping的。

生產(chǎn)消費模式主要使用了redis提供的blpop獲取隊列數(shù)據(jù),如果隊列沒有數(shù)據(jù)則阻塞等待,也就是監(jiān)聽。

import redis

class Task(object):

def __init__(self):

self.rcon = redis.StrictRedis(host='localhost', db=5)

self.queue = 'task:prodcons:queue'

def listen_task(self):

while True:

task = self.rcon.blpop(self.queue, 0)[1]

print "Task get", task

if __name__ == '__main__':

print 'listen task queue'

Task().listen_task()

發(fā)布訂閱模式使用redis的pubsub功能,訂閱者訂閱頻道,發(fā)布者發(fā)布消息到頻道了,頻道就是一個消息隊列。

import redis

class Task(object):

def __init__(self):

self.rcon = redis.StrictRedis(host='localhost', db=5)

self.ps = self.rcon.pubsub()

self.ps.subscribe('task:pubsub:channel')

def listen_task(self):

for i in self.ps.listen():

if i['type'] == 'message':

print "Task get", i['data']

if __name__ == '__main__':

print 'listen task channel'

Task().listen_task()

Flask 入口我們分別實現(xiàn)了兩種異步任務(wù)的后端服務(wù),直接啟動他們,就能監(jiān)聽redis隊列或頻道的消息了。簡單的測試如下:

import redis

import random

import logging

from flask import Flask, redirect

app = Flask(__name__)

rcon = redis.StrictRedis(host='localhost', db=5)

prodcons_queue = 'task:prodcons:queue'

pubsub_channel = 'task:pubsub:channel'

@app.route('/')

def index():

html = """


Redis Message Queue


生產(chǎn)消費者模式



發(fā)布訂閱者模式

"""

return html

@app.route('/prodcons')

def prodcons():

elem = random.randrange(10)

rcon.lpush(prodcons_queue, elem)

logging.info("lpush {} -- {}".format(prodcons_queue, elem))

return redirect('/')

@app.route('/pubsub')

def pubsub():

ps = rcon.pubsub()

ps.subscribe(pubsub_channel)

elem = random.randrange(10)

rcon.publish(pubsub_channel, elem)

return redirect('/')

if __name__ == '__main__':

app.run(debug=True)

啟動腳本,使用

siege -c10 -r 5 http://127.0.0.1:5000/prodcons

siege -c10 -r 5 http://127.0.0.1:5000/pubsub

可以分別在監(jiān)聽的腳本輸入中看到異步消息。在異步的任務(wù)中,可以執(zhí)行一些耗時間的操作,當然目前這些做法并不知道異步的執(zhí)行結(jié)果,如果需要知道異步的執(zhí)行結(jié)果,可以考慮設(shè)計協(xié)程任務(wù)或者使用一些工具如RQ或者celery等。

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結(jié)

以上是生活随笔為你收集整理的python redis 消息队列_Python的Flask框架应用调用Redis队列数据的方法的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。