python高性能写法_py 高性能低级,高级写法思考
這里所謂的低級(jí),高級(jí)是指封裝抽象的程度。
低級(jí)指os.fork()
高級(jí)是指 multiprocessing包
一般根據(jù)業(yè)務(wù)需求,一個(gè)主進(jìn)程負(fù)責(zé)維護(hù)接收, 不同的子進(jìn)程處理不同的需求。根據(jù)各同需求組合
多進(jìn)程
多線程
多進(jìn)程+多線程
協(xié)程
也可基于uvloop事件啟動(dòng)方式
低級(jí)版
def main_process():
r = os.fork()
if r == 0 :
print("sub_process_buiness")
else:
print("main process buiness")
高級(jí)版
def sub_process_write_data():
pass
def sub_process_read_date():
pass
def main_handler():
from multiprocessing import Process
w1=Process(target=sub_process_write_data)
w1.start()
w1.join()
r1=Process(target=sub_process_read_data)
r1.start()
r1.join()
if __name__ == "__main__":
main_handler()
以下部分轉(zhuǎn)自
作者:曉可加油
鏈接:https://www.jianshu.com/p/414e89248285
來(lái)源:簡(jiǎn)書
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
Num01-->進(jìn)程的創(chuàng)建-fork
Python的os模塊封裝了常見的系統(tǒng)調(diào)用,其中就包括fork,可以在Python程序中輕松創(chuàng)建子進(jìn)程。
import os
# 注意,fork函數(shù),只在Unix/Linux/Mac上運(yùn)行,windows不可以
pid = os.fork()
if pid == 0:
print('我是子進(jìn)程')
else:
print('我是父進(jìn)程')
以上代碼加以說(shuō)明如下:
程序執(zhí)行到os.fork()時(shí),操作系統(tǒng)會(huì)創(chuàng)建一個(gè)新的進(jìn)程(子進(jìn)程),然后復(fù)制父進(jìn)程的所有信息到子進(jìn)程中。
然后父進(jìn)程和子進(jìn)程都會(huì)從fork()函數(shù)中得到一個(gè)返回值,在子進(jìn)程中這個(gè)值一定是0,而父進(jìn)程中是子進(jìn)程的 id號(hào)。
在Unix/Linux操作系統(tǒng)中,提供了一個(gè)fork()系統(tǒng)函數(shù),它非常特殊。
普通的函數(shù)調(diào)用,調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因?yàn)椴僮飨到y(tǒng)自動(dòng)把當(dāng)前進(jìn)程(稱為父進(jìn)程)復(fù)制了一份(稱為子進(jìn)程),然后,分別在父進(jìn)程和子進(jìn)程內(nèi)返回。
子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的ID。
這樣做的理由是,一個(gè)父進(jìn)程可以fork出很多子進(jìn)程,所以,父進(jìn)程要記下每個(gè)子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getppid()就可以拿到父進(jìn)程的ID。
Num02-->多進(jìn)程修改全局變量
``
coding=utf-8
import os
import time
num = 0
注意,fork函數(shù),只在Unix/Linux/Mac上運(yùn)行,windows不可以
pid = os.fork()
if pid == 0:
num+=1
print('我是子進(jìn)程---num=%d'%num)
else:
time.sleep(1)
num+=1
print('我是父進(jìn)程---num=%d'%num)
在多進(jìn)程中個(gè),每個(gè)進(jìn)程中所有數(shù)據(jù)(包括全局變量)都各擁有一份,互不影響。
Num03-->多次fork問(wèn)題
Test01-->fork兩次產(chǎn)生四個(gè)進(jìn)程
coding=utf-8
import os
import time
注意,fork函數(shù),只在Unix/Linux/Mac上運(yùn)行,windows不可以
pid = os.fork()
if pid == 0:
print('我是第一次fork中的子進(jìn)程')
else:
print('我是第一次fork中的父進(jìn)程')
pid = os.fork()
if pid == 0:
print('我是第二次fork中的子進(jìn)程')
else:
print('我是第二次fork中的父進(jìn)程')
time.sleep(1)
Test02-->fork兩次產(chǎn)生三個(gè)進(jìn)程
! /usr/bin/env python3
-- coding:utf-8 --
import os
import time
def sing():
print('--我是第一次fork的子進(jìn)程--')
time.sleep(1)
def dance():
ppid = os.fork()
if ppid > 0:
print('--我是第二次fork的父進(jìn)程--')
time.sleep(1)
elif ppid == 0:
print('--我是第二次fork的子進(jìn)程--')
time.sleep(1)
def main():
pid = os.fork()
if pid > 0:
dance()
elif pid == 0:
sing()
if name == "main":
main()
Num04-->進(jìn)程的第一種創(chuàng)建方式-multiprocessing
multiprocessing模塊提供了一個(gè)Process類來(lái)代表一個(gè)進(jìn)程對(duì)象。
coding=utf-8
from multiprocessing import Process
import os
子進(jìn)程要執(zhí)行的代碼
def fun_proc(name):
print('子進(jìn)程運(yùn)行中,name= %s ,pid=%d' % (name, os.getpid()))
if name=='main':
print('父進(jìn)程 %d' % os.getpid())
p = Process(target=fun_proc, args=('我是子進(jìn)程',))
print('子進(jìn)程將要執(zhí)行')
p.start()
p.join()
print('子進(jìn)程已結(jié)束')
對(duì)以上代碼加以說(shuō)明:
1,用Process類創(chuàng)建子進(jìn)程時(shí),只需要傳入一個(gè)執(zhí)行函數(shù)和函數(shù)的參數(shù)(是一個(gè)元組)。
2,調(diào)用start()方式啟動(dòng)子進(jìn)程。
3,join()方法可以等待子進(jìn)程結(jié)束后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步。
Test01-->Process的語(yǔ)法如下:
Process([group [, target [, name [, args [, kwargs]]]]])
target:表示這個(gè)進(jìn)程實(shí)例所調(diào)用對(duì)象;
args:表示調(diào)用對(duì)象的位置參數(shù)元組;
kwargs:表示調(diào)用對(duì)象的關(guān)鍵字參數(shù)字典;
name:為當(dāng)前進(jìn)程實(shí)例的別名;
group:大多數(shù)情況下用不到,表示在哪個(gè)組;
Process類常用方法:
is_alive():判斷進(jìn)程實(shí)例是否還在執(zhí)行;
join([timeout]):是否等待進(jìn)程實(shí)例執(zhí)行結(jié)束,或等待多少秒;
start():啟動(dòng)進(jìn)程實(shí)例(創(chuàng)建子進(jìn)程);
run():如果沒(méi)有給定target參數(shù),對(duì)這個(gè)對(duì)象調(diào)用start()方法時(shí),就將執(zhí)行對(duì)象中的run()方法;
terminate():不管任務(wù)是否完成,立即終止;
Process類常用屬性:
name:當(dāng)前進(jìn)程實(shí)例別名,默認(rèn)為Process-N,N為從1開始遞增的整數(shù);
pid:當(dāng)前進(jìn)程實(shí)例的PID值;
Test02-->創(chuàng)建一個(gè)進(jìn)程對(duì)象
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2017-04-25 16:36:47
# @Author : xiaoke
from multiprocessing import Process
import os
from time import sleep
# 子進(jìn)程要執(zhí)行的代碼
def fun_proc(name, age, **kwargs):
for i in range(5):
print('子進(jìn)程運(yùn)行中,name= %s,age=%d ,pid=%d...' % (name, age,os.getpid()))
print(kwargs)
sleep(1)
if __name__=='__main__':
print('父進(jìn)程 %d' % os.getpid())
p = Process(target=fun_proc, args=('我是子進(jìn)程',66), kwargs={"得分":666})
print('子進(jìn)程將要執(zhí)行')
p.start()
sleep(1)
# p.terminate()# 提前結(jié)束子進(jìn)程,不管子進(jìn)程的任務(wù)是否完成
p.join()
print('子進(jìn)程已結(jié)束')
# 結(jié)果如下:
# 父進(jìn)程 7744
# 子進(jìn)程將要執(zhí)行
# 子進(jìn)程運(yùn)行中,name= 我是子進(jìn)程,age=66 ,pid=8064...
# {'得分': 666}
# 子進(jìn)程運(yùn)行中,name= 我是子進(jìn)程,age=66 ,pid=8064...
# {'得分': 666}
# 子進(jìn)程運(yùn)行中,name= 我是子進(jìn)程,age=66 ,pid=8064...
# {'得分': 666}
# 子進(jìn)程運(yùn)行中,name= 我是子進(jìn)程,age=66 ,pid=8064...
# {'得分': 666}
# 子進(jìn)程運(yùn)行中,name= 我是子進(jìn)程,age=66 ,pid=8064...
# {'得分': 666}
# 子進(jìn)程已結(jié)束
Test03-->創(chuàng)建兩個(gè)進(jìn)程對(duì)象
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2017-04-25 16:36:47
# @Author : xiaoke
#coding=utf-8
from multiprocessing import Process
import time
import os
def worker_1(interval):
print("worker_1,父進(jìn)程(%s),當(dāng)前進(jìn)程(%s)"%(os.getppid(),os.getpid()))
t_start = time.time()
time.sleep(interval) #程序?qū)?huì)被掛起interval秒
t_end = time.time()
print("worker_1,執(zhí)行時(shí)間為'%0.2f'秒"%(t_end - t_start))
def worker_2(interval):
print("worker_2,父進(jìn)程(%s),當(dāng)前進(jìn)程(%s)"%(os.getppid(),os.getpid()))
t_start = time.time()
time.sleep(interval)
t_end = time.time()
print("worker_2,執(zhí)行時(shí)間為'%0.2f'秒"%(t_end - t_start))
def main():
#輸出當(dāng)前程序的ID
print("進(jìn)程ID:%s"%os.getpid())
#創(chuàng)建兩個(gè)進(jìn)程對(duì)象,target指向這個(gè)進(jìn)程對(duì)象要執(zhí)行的對(duì)象名稱,
#如果不指定name參數(shù),默認(rèn)的進(jìn)程對(duì)象名稱為Process-N,N為一個(gè)遞增的整數(shù)
p1=Process(target=worker_1,args=(2,))
p2=Process(target=worker_2,name="xiaoke",args=(1,))
#使用"進(jìn)程對(duì)象名稱.start()"來(lái)創(chuàng)建并執(zhí)行一個(gè)子進(jìn)程,
#這兩個(gè)進(jìn)程對(duì)象在start后,就會(huì)分別去執(zhí)行worker_1和worker_2方法中的內(nèi)容
p1.start()
p2.start()
#同時(shí)父進(jìn)程仍然往下執(zhí)行,如果p2進(jìn)程還在執(zhí)行,將會(huì)返回True
print("p2.is_alive=%s"%p2.is_alive())
print("p1.is_alive=%s"%p1.is_alive())
#輸出p1和p2進(jìn)程的別名和pid
print("--p1進(jìn)程的別名和pid--")
print("p1.name=%s"%p1.name)
print("p1.pid=%s"%p1.pid)
print("--p2進(jìn)程的別名和pid--")
print("p2.name=%s"%p2.name)
print("p2.pid=%s"%p2.pid)
#join括號(hào)中不攜帶參數(shù),表示父進(jìn)程在這個(gè)位置要等待p1進(jìn)程執(zhí)行完成后,再繼續(xù)執(zhí)行下面的語(yǔ)句,一般用于進(jìn)程間的數(shù)據(jù)同步,
# 如果不寫這一句,下面的is_alive判斷將會(huì)是True,
#可以嘗試著將下面的這條語(yǔ)句改成p1.join(1),
#因?yàn)閜2需要2秒以上才可能執(zhí)行完成,父進(jìn)程等待1秒很可能,不能讓p1完全執(zhí)行完成,
#所以下面的print會(huì)輸出True,即p1仍然在執(zhí)行
print("--p1進(jìn)程是否執(zhí)行完畢??--")
p1.join()
print("p1.is_alive=%s"%p1.is_alive())
p2.join()
print("p2.is_alive=%s"%p2.is_alive())
if __name__ == '__main__':
main()
# 結(jié)果如下:
# 進(jìn)程ID:4004
# p2.is_alive=True
# p1.is_alive=True
# --p1進(jìn)程的別名和pid--
# p1.name=Process-1
# p1.pid=3352
# --p2進(jìn)程的別名和pid--
# p2.name=xiaoke
# p2.pid=6092
# --p1進(jìn)程是否執(zhí)行完畢??--
# worker_2,父進(jìn)程(4004),當(dāng)前進(jìn)程(6092)
# worker_2,執(zhí)行時(shí)間為'1.00'秒
# worker_1,父進(jìn)程(4004),當(dāng)前進(jìn)程(3352)
# worker_1,執(zhí)行時(shí)間為'2.00'秒
# p1.is_alive=False
# p2.is_alive=False
Num05-->進(jìn)程的第二種創(chuàng)建方式--自己創(chuàng)建一個(gè)類,繼承Process類
定義:創(chuàng)建新的進(jìn)程還可以使用類的方式。可以自定義一個(gè)類,繼承Process類。每次實(shí)例化這個(gè)類的時(shí)候,就等同于實(shí)例化這個(gè)進(jìn)程對(duì)象。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date : 2017-04-25 16:36:47
# @Author : xiaoke
from multiprocessing import Process
import time
import os
#繼承Process類
class Process_Class(Process):
#因?yàn)镻rocess類本身也有__init__方法,這個(gè)子類相當(dāng)于重寫了這個(gè)方法,
#但這樣就會(huì)帶來(lái)一個(gè)問(wèn)題,我們并沒(méi)有完全的初始化一個(gè)Process類,
# 所以就不能使用從這個(gè)類繼承的一些方法和屬性,
#最好的方法就是將繼承類本身傳遞給Process.__init__方法,完成這些初始化操作
def __init__(self,interval):
Process.__init__(self)
# 傳遞進(jìn)來(lái)的屬性
self.interval = interval
#重寫了Process類的run()方法
def run(self):
print("子進(jìn)程(%s) 開始執(zhí)行,父進(jìn)程為(%s)"%(os.getpid(),os.getppid()))
t_start = time.time()
time.sleep(self.interval)
t_stop = time.time()
print("子進(jìn)程(%s)執(zhí)行結(jié)束,耗時(shí)%0.2f秒"%(os.getpid(),t_stop-t_start))
if __name__=="__main__":
t_start = time.time()
print("當(dāng)前程序進(jìn)程(%s)"%os.getpid())
p1 = Process_Class(2)
#對(duì)一個(gè)不包含target屬性的Process類執(zhí)行start()方法,就會(huì)運(yùn)行這個(gè)類中的run()方法,所以這里會(huì)執(zhí)行p1.run()
p1.start()
p1.join()
t_stop = time.time()
print("父進(jìn)程(%s)執(zhí)行結(jié)束,耗時(shí)%0.2f秒"%(os.getpid(),t_stop-t_start))
# 結(jié)果為:
# 當(dāng)前程序進(jìn)程(14736)
# 子進(jìn)程(4292) 開始執(zhí)行,父進(jìn)程為(14736)
# 子進(jìn)程(4292)執(zhí)行結(jié)束,耗時(shí)2.00秒
# 父進(jìn)程(14736)執(zhí)行結(jié)束,耗時(shí)2.11秒
Num06-->進(jìn)程池--Pool
當(dāng)需要?jiǎng)?chuàng)建的子進(jìn)程數(shù)量不多時(shí),可以直接利用multiprocessing中的Process動(dòng)態(tài)生成多個(gè)進(jìn)程。但如果是上百甚至上千個(gè)目標(biāo),手動(dòng)的去創(chuàng)建進(jìn)程的工作量巨大,此時(shí)就可以用到multiprocessing模塊提供的Pool方法。
初始化Pool時(shí),可以指定一個(gè)最大進(jìn)程數(shù),當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果池還沒(méi)有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來(lái)執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到指定的最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來(lái)執(zhí)行。
#采用非阻塞的方式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/27 8:35
# @Author : xiaoke
# @Site :
# @File : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Pool
import os, time, random
def worker(num):
t_start = time.time()
print("子進(jìn)程-%s開始執(zhí)行,進(jìn)程號(hào)為%d" % (num, os.getpid()))
# random.random()隨機(jī)生成0~1之間的浮點(diǎn)數(shù)
time.sleep(random.random() * 2)
t_stop = time.time()
print("子進(jìn)程-%s執(zhí)行完畢,耗時(shí)%0.2f" % (num, t_stop - t_start))
def main():
# 定義一個(gè)進(jìn)程池,最大進(jìn)程數(shù)5
p = Pool(3)
for i in range(10):
# Pool.apply_async(要調(diào)用的目標(biāo),(傳遞給目標(biāo)的參數(shù)元祖,))
# 異步的方式,每次循環(huán)將會(huì)用空閑出來(lái)的子進(jìn)程去調(diào)用目標(biāo)
p.apply_async(worker, (i,))
m_start = time.time()
print("----start----")
p.close() # 關(guān)閉進(jìn)程池,關(guān)閉后p不再接收新的請(qǐng)求
p.join() # 等待p中所有子進(jìn)程執(zhí)行完成,必須放在close語(yǔ)句之后
print("-----end-----")
m_stop = time.time()
print("父進(jìn)程執(zhí)行完畢,耗時(shí)%0.2f" % (m_stop - m_start))
if __name__ == '__main__':
main()
# 結(jié)果如下:
# ----start----
# 子進(jìn)程-0開始執(zhí)行,進(jìn)程號(hào)為3360
# 子進(jìn)程-1開始執(zhí)行,進(jìn)程號(hào)為16084
# 子進(jìn)程-2開始執(zhí)行,進(jìn)程號(hào)為12580
# 子進(jìn)程-1執(zhí)行完畢,耗時(shí)0.05
# 子進(jìn)程-3開始執(zhí)行,進(jìn)程號(hào)為16084
# 子進(jìn)程-3執(zhí)行完畢,耗時(shí)0.80
# 子進(jìn)程-4開始執(zhí)行,進(jìn)程號(hào)為16084
# 子進(jìn)程-2執(zhí)行完畢,耗時(shí)1.63
# 子進(jìn)程-5開始執(zhí)行,進(jìn)程號(hào)為12580
# 子進(jìn)程-0執(zhí)行完畢,耗時(shí)1.80
# 子進(jìn)程-6開始執(zhí)行,進(jìn)程號(hào)為3360
# 子進(jìn)程-4執(zhí)行完畢,耗時(shí)1.55
# 子進(jìn)程-7開始執(zhí)行,進(jìn)程號(hào)為16084
# 子進(jìn)程-7執(zhí)行完畢,耗時(shí)0.07
# 子進(jìn)程-8開始執(zhí)行,進(jìn)程號(hào)為16084
# 子進(jìn)程-8執(zhí)行完畢,耗時(shí)0.05
# 子進(jìn)程-9開始執(zhí)行,進(jìn)程號(hào)為16084
# 子進(jìn)程-5執(zhí)行完畢,耗時(shí)1.01
# 子進(jìn)程-6執(zhí)行完畢,耗時(shí)1.10
# 子進(jìn)程-9執(zhí)行完畢,耗時(shí)1.57
# -----end-----
# 父進(jìn)程執(zhí)行完畢,耗時(shí)4.26
multiprocessing.Pool常用函數(shù)解析:
apply_async(func[, args[, kwds]]) :使用非阻塞方式調(diào)用func(并行執(zhí)行,堵塞方式必須等待上一個(gè)進(jìn)程退出才能執(zhí)行下一個(gè)進(jìn)程),args為傳遞給func的參數(shù)列表,kwds為傳遞給func的關(guān)鍵字參數(shù)列表;
apply(func[, args[, kwds]]):使用阻塞方式調(diào)用func
close():關(guān)閉Pool,使其不再接受新的任務(wù);
terminate():不管任務(wù)是否完成,立即終止;
join():主進(jìn)程阻塞,等待子進(jìn)程的退出, 必須在close或terminate之后使用;
采用apply阻塞的方式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/27 8:35
# @Author : xiaoke
# @Site :
# @File : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Pool
import os, time, random
def worker(num):
t_start = time.time()
print("子進(jìn)程-%s開始執(zhí)行,進(jìn)程號(hào)為%d" % (num, os.getpid()))
# random.random()隨機(jī)生成0~1之間的浮點(diǎn)數(shù)
time.sleep(random.random() * 2)
t_stop = time.time()
print("子進(jìn)程-%s執(zhí)行完畢,耗時(shí)%0.2f" % (num, t_stop - t_start))
def main():
# 定義一個(gè)進(jìn)程池,最大進(jìn)程數(shù)5
p = Pool(3)
for i in range(10):
# Pool.apply_async(要調(diào)用的目標(biāo),(傳遞給目標(biāo)的參數(shù)元祖,))
# 異步的方式,每次循環(huán)將會(huì)用空閑出來(lái)的子進(jìn)程去調(diào)用目標(biāo)
p.apply(worker, (i,))
m_start = time.time()
print("----start----")
p.close() # 關(guān)閉進(jìn)程池,關(guān)閉后p不再接收新的請(qǐng)求
p.join() # 等待p中所有子進(jìn)程執(zhí)行完成,必須放在close語(yǔ)句之后
print("-----end-----")
m_stop = time.time()
print("父進(jìn)程執(zhí)行完畢,耗時(shí)%0.2f" % (m_stop - m_start))
if __name__ == '__main__':
main()
# 結(jié)果如下:
# 子進(jìn)程-0開始執(zhí)行,進(jìn)程號(hào)為4464
# 子進(jìn)程-0執(zhí)行完畢,耗時(shí)1.75
# 子進(jìn)程-1開始執(zhí)行,進(jìn)程號(hào)為11640
# 子進(jìn)程-1執(zhí)行完畢,耗時(shí)1.33
# 子進(jìn)程-2開始執(zhí)行,進(jìn)程號(hào)為8756
# 子進(jìn)程-2執(zhí)行完畢,耗時(shí)1.86
# 子進(jìn)程-3開始執(zhí)行,進(jìn)程號(hào)為4464
# 子進(jìn)程-3執(zhí)行完畢,耗時(shí)0.70
# 子進(jìn)程-4開始執(zhí)行,進(jìn)程號(hào)為11640
# 子進(jìn)程-4執(zhí)行完畢,耗時(shí)1.29
# 子進(jìn)程-5開始執(zhí)行,進(jìn)程號(hào)為8756
# 子進(jìn)程-5執(zhí)行完畢,耗時(shí)0.69
# 子進(jìn)程-6開始執(zhí)行,進(jìn)程號(hào)為4464
# 子進(jìn)程-6執(zhí)行完畢,耗時(shí)0.33
# 子進(jìn)程-7開始執(zhí)行,進(jìn)程號(hào)為11640
# 子進(jìn)程-7執(zhí)行完畢,耗時(shí)1.83
# 子進(jìn)程-8開始執(zhí)行,進(jìn)程號(hào)為8756
# 子進(jìn)程-8執(zhí)行完畢,耗時(shí)1.58
# 子進(jìn)程-9開始執(zhí)行,進(jìn)程號(hào)為4464
# 子進(jìn)程-9執(zhí)行完畢,耗時(shí)1.37
# ----start----
# -----end-----
# 父進(jìn)程執(zhí)行完畢,耗時(shí)0.08
Num07-->進(jìn)程間的通信--Queue
進(jìn)程(Process)之間有時(shí)間需要通信,操作系統(tǒng)提供了很多機(jī)制來(lái)實(shí)現(xiàn)進(jìn)程間的通信。如Queue、Pipes等。Queue本身是一個(gè)消息隊(duì)列。
Test01--> 先看一個(gè)簡(jiǎn)單的案例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/27 8:35
# @Author : xiaoke
# @Site :
# @File : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Queue
q = Queue(3) # 初始化一個(gè)Queue對(duì)象,最多可接收三條put消息
q.put("消息1")
q.put("消息2")
print(q.full()) # False
q.put("消息3")
print(q.full()) # True
# 因?yàn)橄⒘嘘?duì)已滿下面的try都會(huì)拋出異常,第一個(gè)try會(huì)等待3秒后再拋出異常,第二個(gè)Try會(huì)立刻拋出異常
try:
q.put("消息4", True, 3)
except:
print("消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:%s" % q.qsize())
try:
q.put_nowait("消息4")
except:
print("消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:%s" % q.qsize())
# 推薦的方式,先判斷消息列隊(duì)是否已滿,再寫入
if not q.full():
q.put_nowait("消息4")
# 讀取消息時(shí),先判斷消息列隊(duì)是否為空,再讀取
if not q.empty():
for i in range(q.qsize()):
# print("取出消息:%s" % q.get())
print("取出消息:%s" % q.get_nowait())
# 結(jié)果是:
# False
# True
# 消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:3
# 消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:3
# 取出消息:消息1
# 取出消息:消息2
# 取出消息:消息3
以上代碼加以說(shuō)明:
初始化Queue()對(duì)象時(shí)(例如:q=Queue()),若括號(hào)中沒(méi)有指定最大可接收的消息數(shù)量,或數(shù)量為負(fù)值,那么就代表可接受的消息數(shù)量沒(méi)有上限(直到內(nèi)存的盡頭);
Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量;
Queue.empty():如果隊(duì)列為空,返回True,反之False ;
Queue.full():如果隊(duì)列滿了,返回True,反之False;
Queue.get([block[, timeout]]):獲取隊(duì)列中的一條消息,然后將其從列隊(duì)中移除,block默認(rèn)值為True;
1)如果block使用默認(rèn)值,且沒(méi)有設(shè)置timeout(單位秒),消息列隊(duì)如果為空,此時(shí)程序?qū)⒈蛔枞?停在讀取狀態(tài)),直到從消息列隊(duì)讀到消息為止,如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒(méi)讀取到任何消息,則拋出"Queue.Empty"異常;
2)如果block值為False,消息列隊(duì)如果為空,則會(huì)立刻拋出"Queue.Empty"異常;
Queue.get_nowait():相當(dāng)Queue.get(False);
Queue.put(item,[block[, timeout]]):將item消息寫入隊(duì)列,block默認(rèn)值為True;
1)如果block使用默認(rèn)值,且沒(méi)有設(shè)置timeout(單位秒),消息列隊(duì)如果已經(jīng)沒(méi)有空間可寫入,此時(shí)程序?qū)⒈蛔枞?停在寫入狀態(tài)),直到從消息列隊(duì)騰出空間為止,如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒(méi)空間,則拋出"Queue.Full"異常;
2)如果block值為False,消息列隊(duì)如果沒(méi)有空間可寫入,則會(huì)立刻拋出"Queue.Full"異常;
Queue.put_nowait(item):相當(dāng)Queue.put(item, False);
Test02-->在父進(jìn)程創(chuàng)建兩個(gè)子進(jìn)程,一個(gè)往Queue里面寫數(shù)據(jù),一個(gè)從Queue里面讀數(shù)據(jù)。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/27 8:35
# @Author : xiaoke
# @Site :
# @File : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Process, Queue
import os, time, random
# 寫數(shù)據(jù)進(jìn)程
def write(q):
for value in ['A', 'B', 'C', 'quit']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀數(shù)據(jù)進(jìn)程
def read(q):
while True:
if not q.empty():
value = q.get(True)
print('Get %s from queue.' % value)
# 因?yàn)樽x進(jìn)程是一個(gè)死循環(huán),所以要設(shè)置一個(gè)標(biāo)記,用于退出
if value == "quit":
break
time.sleep(random.random())
if __name__ == '__main__':
# 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動(dòng)子進(jìn)程pw,寫入:
pw.start()
# 等待pw結(jié)束:
pw.join()
# 啟動(dòng)子進(jìn)程pr,讀取:
pr.start()
pr.join()
print('所有數(shù)據(jù)都寫入并且讀完')
print("pw is alive:%s" % pw.is_alive())
print("pr is alive:%s" % pr.is_alive())
# 結(jié)果如下:
# Put A to queue...
# Put B to queue...
# Put C to queue...
# Put quit to queue...
# Get A from queue.
# Get B from queue.
# Get C from queue.
# Get quit from queue.
# 所有數(shù)據(jù)都寫入并且讀完
# pw is alive:False
# pr is alive:False
Test03-->進(jìn)程池Pool中的Queue來(lái)進(jìn)行進(jìn)程間的通信
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
# @Site :
# @File : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Pool, Queue, Manager
import os
import time
import random
def task_write(q):
for s in ('hello', 'python', 'world', 'quit'):
q.put(s) # 向隊(duì)列中添加消息
print('%s 進(jìn)程向隊(duì)列中添加消息:%s' % (os.getpid(), s))
time.sleep(random.random() * 2)
print('%s 進(jìn)程要結(jié)束了' % os.getpid())
def task_read(q):
while True:
msg = q.get() # 阻塞式從隊(duì)列中收消息
print('%s 進(jìn)程從隊(duì)列中取出消息:%s' % (os.getpid(), msg))
if msg == "quit":
break
time.sleep(random.random() * 2)
print('%s 進(jìn)程要結(jié)束了' % os.getpid())
def main():
# 1.創(chuàng)建消息隊(duì)列對(duì)象
# q = Queue() #只能用于父子進(jìn)程
# Manger().Queue() 消息隊(duì)列可用于進(jìn)程池
q = Manager().Queue()
# 2.創(chuàng)建進(jìn)程池,里面放兩個(gè)進(jìn)程
my_pool = Pool(2)
# 3.添加任務(wù)
# 采用阻塞的方式
my_pool.apply(task_write, args=(q,))
my_pool.apply(task_read, args=(q,))
# 采用非阻塞的方式
# my_pool.apply_async(task_write, args=(q,))
# my_pool.apply_async(task_read, args=(q,))
# 4.關(guān)閉進(jìn)程池
my_pool.close()
# 5.等待所有進(jìn)程結(jié)束
my_pool.join()
if __name__ == "__main__":
main()
# 采用非阻塞的方式結(jié)果:
# 7380 進(jìn)程向隊(duì)列中添加消息:hello
# 11256 進(jìn)程從隊(duì)列中取出消息:hello
# 7380 進(jìn)程向隊(duì)列中添加消息:python
# 11256 進(jìn)程從隊(duì)列中取出消息:python
# 7380 進(jìn)程向隊(duì)列中添加消息:world
# 11256 進(jìn)程從隊(duì)列中取出消息:world
# 7380 進(jìn)程向隊(duì)列中添加消息:quit
# 11256 進(jìn)程從隊(duì)列中取出消息:quit
# 11256 進(jìn)程要結(jié)束了
# 7380 進(jìn)程要結(jié)束了
# 采用阻塞的方式結(jié)果:
# 96 進(jìn)程向隊(duì)列中添加消息:hello
# 96 進(jìn)程向隊(duì)列中添加消息:python
# 96 進(jìn)程向隊(duì)列中添加消息:world
# 96 進(jìn)程向隊(duì)列中添加消息:quit
# 96 進(jìn)程要結(jié)束了
# 12412 進(jìn)程從隊(duì)列中取出消息:hello
# 12412 進(jìn)程從隊(duì)列中取出消息:python
# 12412 進(jìn)程從隊(duì)列中取出消息:world
# 12412 進(jìn)程從隊(duì)列中取出消息:quit
# 12412 進(jìn)程要結(jié)束了
總結(jié)
以上是生活随笔為你收集整理的python高性能写法_py 高性能低级,高级写法思考的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: RepairImages\superbo
- 下一篇: 实例15:python