Tornado 高并发源码分析之六---异步编程的几种实现方式
生活随笔
收集整理的這篇文章主要介紹了
Tornado 高并发源码分析之六---异步编程的几种实现方式
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
?
方式一:通過線程池或者進(jìn)程池 導(dǎo)入庫futures是python3自帶的庫,如果是python2,需要pip安裝future這個庫 備注:進(jìn)程池和線程池寫法相同 1 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor 2 from tornado.concurrent import run_on_executor 3 4 def doing(s): 5 print('xiumian--{}'.format(s)) 6 time.sleep(s) 7 return s 8 9 class MyMainHandler(RequestHandler): 10 executor = ProcessPoolExecutor(2) #新建一個進(jìn)程池,靜態(tài)變量,屬于類,所以全程只有這個幾個進(jìn)程,不需要關(guān)閉,如果放在__init__中,則屬于對象,每次請求都會新建pool,當(dāng)請求增多的時候,會導(dǎo)致今天變得非常多,這個方法不可取 11 12 @gen.coroutine 13 def get(self, *args, **kwargs): 14 print('開始{}'.format(self.pool_temp)) 15 a = yield self.executor.submit(doing, 20) 16 print('進(jìn)程 %s' % self.executor._processes) 17 self.write(str(a)) 18 print('執(zhí)行完畢{}'.format(a)) 19 20 @run_on_executor #tornado 另外一種寫法,需要在靜態(tài)變量中有executor的進(jìn)程池變量 21 def post(self, *args, **kwargs): 22 a = yield doing(20)?
?
方式二:Tornado + Celery + RabbitMQ 實現(xiàn) 使用Celery任務(wù)隊列,Celery 只是一個任務(wù)隊列,需要一個broker媒介,將耗時的任務(wù)傳遞給Celery任務(wù)隊列執(zhí)行,執(zhí)行完畢將結(jié)果通過broker媒介返回。官方推薦使用RabbitMQ作為消息傳遞,redis也可以 一、Celery 介紹: 1.1、注意: 1、當(dāng)使用RabbitMQ時,需要按照pika第三方庫,pika0.10.0存在bug,無法獲得回調(diào)信息,需要按照0.9.14版本即可 2、tornado-celery 庫比較舊,無法適應(yīng)Celery的最新版,會導(dǎo)致報無法導(dǎo)入task Producter包錯誤,只需要將celery版本按照在3.0.25就可以了 1.2、關(guān)于配置: 單個參數(shù)配置: 1 app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'? 多個參數(shù)配置: 1 app.conf.update( 2 CELERY_BROKER_URL = 'amqp://guest@localhost//', 3 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' 4 ) 從配置文件中獲取:(將配置參數(shù)寫在文件app.py中) 1 BROKER_URL='amqp://guest@localhost//' 2 CELERY_RESULT_BACKEND='redis://localhost:6379/0' 3 app.config_from_object('celeryconfig') 二、案例 2.1、啟動一個Celery 任務(wù)隊列,也就是消費者: 1 from celery import Celery 2 celery = Celery('tasks', broker='amqp://guest:guest@119.29.151.45:5672', backend='amqp') 使用RabbitMQ作為載體, 回調(diào)也是使用rabbit作為載體 3 4 @celery.task(name='doing') #異步任務(wù),需要命一個獨一無二的名字 5 def doing(s, b): 6 print('開始任務(wù)') 7 logging.warning('開始任務(wù)--{}'.format(s)) 8 time.sleep(s) 9 return s+b 命令行啟動任務(wù)隊列守護(hù)進(jìn)程,當(dāng)隊列中有任務(wù)時,自動執(zhí)行 (命令行可以放在supervisor中管理) --loglevel=info --concurrency=5 記錄等級,默認(rèn)是concurrency:指定工作進(jìn)程數(shù)量,默認(rèn)是CPU核心數(shù) 2.2、啟動任務(wù)生產(chǎn)者 1 import tcelery 2 tcelery.setup_nonblocking_producer() #設(shè)置為非阻塞生產(chǎn)者,否則無法獲取回調(diào)信息 3 4 class MyMainHandler(RequestHandler): 5 6 @web.asynchronous 7 @gen.coroutine 8 def get(self, *args, **kwargs): 9 print('begin') 10 result = yield gen.Task(sleep.apply_async, args=[10]) #使用yield 獲取異步返回值,會一直等待但是不阻塞其他請求 11 print('ok--{}'.format(result.result)) #返回值結(jié)果 12 13 # sleep.apply_async((10, ), callback=self.on_success) 14 # print('ok -- {}'.format(result.get(timeout=100)))#使用回調(diào)的方式獲取返回值,發(fā)送任務(wù)之后,請求結(jié)束,所以不能放在處理tornado的請求任務(wù)當(dāng)中,因為請求已經(jīng)結(jié)束了,與客戶端已經(jīng)斷開連接,無法再在獲取返回值的回調(diào)中繼續(xù)向客戶端返回數(shù)據(jù) 15 16 # result = sleep.delay(10) #delay方法只是對apply_async方法的封裝而已 17 # data = result.get(timeout=100) #使用get方法獲取返回值,會導(dǎo)致阻塞,相當(dāng)于同步執(zhí)行 18 19 20 def on_success(self, response): #回調(diào)函數(shù) 21 print ('Ok-- {}'.format(response))?
轉(zhuǎn)載于:https://www.cnblogs.com/hepingqingfeng/p/6655790.html
總結(jié)
以上是生活随笔為你收集整理的Tornado 高并发源码分析之六---异步编程的几种实现方式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ios 绘制字符串
- 下一篇: to_string作用