日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > python >内容正文

python

python高性能写法_py 高性能低级,高级写法思考

發(fā)布時(shí)間:2023/12/10 python 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python高性能写法_py 高性能低级,高级写法思考 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

這里所謂的低級(jí),高級(jí)是指封裝抽象的程度。

低級(jí)指os.fork()

高級(jí)是指 multiprocessing包

一般根據(jù)業(yè)務(wù)需求,一個(gè)主進(jìn)程負(fù)責(zé)維護(hù)接收, 不同的子進(jìn)程處理不同的需求。根據(jù)各同需求組合

多進(jìn)程

多線程

多進(jìn)程+多線程

協(xié)程

也可基于uvloop事件啟動(dòng)方式

低級(jí)版

def main_process():

r = os.fork()

if r == 0 :

print("sub_process_buiness")

else:

print("main process buiness")

高級(jí)版

def sub_process_write_data():

pass

def sub_process_read_date():

pass

def main_handler():

from multiprocessing import Process

w1=Process(target=sub_process_write_data)

w1.start()

w1.join()

r1=Process(target=sub_process_read_data)

r1.start()

r1.join()

if __name__ == "__main__":

main_handler()

以下部分轉(zhuǎn)自

作者:曉可加油

鏈接:https://www.jianshu.com/p/414e89248285

來(lái)源:簡(jiǎn)書

著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

Num01-->進(jìn)程的創(chuàng)建-fork

Python的os模塊封裝了常見的系統(tǒng)調(diào)用,其中就包括fork,可以在Python程序中輕松創(chuàng)建子進(jìn)程。

import os

# 注意,fork函數(shù),只在Unix/Linux/Mac上運(yùn)行,windows不可以

pid = os.fork()

if pid == 0:

print('我是子進(jìn)程')

else:

print('我是父進(jìn)程')

以上代碼加以說(shuō)明如下:

程序執(zhí)行到os.fork()時(shí),操作系統(tǒng)會(huì)創(chuàng)建一個(gè)新的進(jìn)程(子進(jìn)程),然后復(fù)制父進(jìn)程的所有信息到子進(jìn)程中。

然后父進(jìn)程和子進(jìn)程都會(huì)從fork()函數(shù)中得到一個(gè)返回值,在子進(jìn)程中這個(gè)值一定是0,而父進(jìn)程中是子進(jìn)程的 id號(hào)。

在Unix/Linux操作系統(tǒng)中,提供了一個(gè)fork()系統(tǒng)函數(shù),它非常特殊。

普通的函數(shù)調(diào)用,調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因?yàn)椴僮飨到y(tǒng)自動(dòng)把當(dāng)前進(jìn)程(稱為父進(jìn)程)復(fù)制了一份(稱為子進(jìn)程),然后,分別在父進(jìn)程和子進(jìn)程內(nèi)返回。

子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的ID。

這樣做的理由是,一個(gè)父進(jìn)程可以fork出很多子進(jìn)程,所以,父進(jìn)程要記下每個(gè)子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getppid()就可以拿到父進(jìn)程的ID。

Num02-->多進(jìn)程修改全局變量

``

coding=utf-8

import os

import time

num = 0

注意,fork函數(shù),只在Unix/Linux/Mac上運(yùn)行,windows不可以

pid = os.fork()

if pid == 0:

num+=1

print('我是子進(jìn)程---num=%d'%num)

else:

time.sleep(1)

num+=1

print('我是父進(jìn)程---num=%d'%num)

在多進(jìn)程中個(gè),每個(gè)進(jìn)程中所有數(shù)據(jù)(包括全局變量)都各擁有一份,互不影響。

Num03-->多次fork問(wèn)題

Test01-->fork兩次產(chǎn)生四個(gè)進(jìn)程

coding=utf-8

import os

import time

注意,fork函數(shù),只在Unix/Linux/Mac上運(yùn)行,windows不可以

pid = os.fork()

if pid == 0:

print('我是第一次fork中的子進(jìn)程')

else:

print('我是第一次fork中的父進(jìn)程')

pid = os.fork()

if pid == 0:

print('我是第二次fork中的子進(jìn)程')

else:

print('我是第二次fork中的父進(jìn)程')

time.sleep(1)

Test02-->fork兩次產(chǎn)生三個(gè)進(jìn)程

! /usr/bin/env python3

-- coding:utf-8 --

import os

import time

def sing():

print('--我是第一次fork的子進(jìn)程--')

time.sleep(1)

def dance():

ppid = os.fork()

if ppid > 0:

print('--我是第二次fork的父進(jìn)程--')

time.sleep(1)

elif ppid == 0:

print('--我是第二次fork的子進(jìn)程--')

time.sleep(1)

def main():

pid = os.fork()

if pid > 0:

dance()

elif pid == 0:

sing()

if name == "main":

main()

Num04-->進(jìn)程的第一種創(chuàng)建方式-multiprocessing

multiprocessing模塊提供了一個(gè)Process類來(lái)代表一個(gè)進(jìn)程對(duì)象。

coding=utf-8

from multiprocessing import Process

import os

子進(jìn)程要執(zhí)行的代碼

def fun_proc(name):

print('子進(jìn)程運(yùn)行中,name= %s ,pid=%d' % (name, os.getpid()))

if name=='main':

print('父進(jìn)程 %d' % os.getpid())

p = Process(target=fun_proc, args=('我是子進(jìn)程',))

print('子進(jìn)程將要執(zhí)行')

p.start()

p.join()

print('子進(jìn)程已結(jié)束')

對(duì)以上代碼加以說(shuō)明:

1,用Process類創(chuàng)建子進(jìn)程時(shí),只需要傳入一個(gè)執(zhí)行函數(shù)和函數(shù)的參數(shù)(是一個(gè)元組)。

2,調(diào)用start()方式啟動(dòng)子進(jìn)程。

3,join()方法可以等待子進(jìn)程結(jié)束后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步。

Test01-->Process的語(yǔ)法如下:

Process([group [, target [, name [, args [, kwargs]]]]])

target:表示這個(gè)進(jìn)程實(shí)例所調(diào)用對(duì)象;

args:表示調(diào)用對(duì)象的位置參數(shù)元組;

kwargs:表示調(diào)用對(duì)象的關(guān)鍵字參數(shù)字典;

name:為當(dāng)前進(jìn)程實(shí)例的別名;

group:大多數(shù)情況下用不到,表示在哪個(gè)組;

Process類常用方法:

is_alive():判斷進(jìn)程實(shí)例是否還在執(zhí)行;

join([timeout]):是否等待進(jìn)程實(shí)例執(zhí)行結(jié)束,或等待多少秒;

start():啟動(dòng)進(jìn)程實(shí)例(創(chuàng)建子進(jìn)程);

run():如果沒(méi)有給定target參數(shù),對(duì)這個(gè)對(duì)象調(diào)用start()方法時(shí),就將執(zhí)行對(duì)象中的run()方法;

terminate():不管任務(wù)是否完成,立即終止;

Process類常用屬性:

name:當(dāng)前進(jìn)程實(shí)例別名,默認(rèn)為Process-N,N為從1開始遞增的整數(shù);

pid:當(dāng)前進(jìn)程實(shí)例的PID值;

Test02-->創(chuàng)建一個(gè)進(jìn)程對(duì)象

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @Date : 2017-04-25 16:36:47

# @Author : xiaoke

from multiprocessing import Process

import os

from time import sleep

# 子進(jìn)程要執(zhí)行的代碼

def fun_proc(name, age, **kwargs):

for i in range(5):

print('子進(jìn)程運(yùn)行中,name= %s,age=%d ,pid=%d...' % (name, age,os.getpid()))

print(kwargs)

sleep(1)

if __name__=='__main__':

print('父進(jìn)程 %d' % os.getpid())

p = Process(target=fun_proc, args=('我是子進(jìn)程',66), kwargs={"得分":666})

print('子進(jìn)程將要執(zhí)行')

p.start()

sleep(1)

# p.terminate()# 提前結(jié)束子進(jìn)程,不管子進(jìn)程的任務(wù)是否完成

p.join()

print('子進(jìn)程已結(jié)束')

# 結(jié)果如下:

# 父進(jìn)程 7744

# 子進(jìn)程將要執(zhí)行

# 子進(jìn)程運(yùn)行中,name= 我是子進(jìn)程,age=66 ,pid=8064...

# {'得分': 666}

# 子進(jìn)程運(yùn)行中,name= 我是子進(jìn)程,age=66 ,pid=8064...

# {'得分': 666}

# 子進(jìn)程運(yùn)行中,name= 我是子進(jìn)程,age=66 ,pid=8064...

# {'得分': 666}

# 子進(jìn)程運(yùn)行中,name= 我是子進(jìn)程,age=66 ,pid=8064...

# {'得分': 666}

# 子進(jìn)程運(yùn)行中,name= 我是子進(jìn)程,age=66 ,pid=8064...

# {'得分': 666}

# 子進(jìn)程已結(jié)束

Test03-->創(chuàng)建兩個(gè)進(jìn)程對(duì)象

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @Date : 2017-04-25 16:36:47

# @Author : xiaoke

#coding=utf-8

from multiprocessing import Process

import time

import os

def worker_1(interval):

print("worker_1,父進(jìn)程(%s),當(dāng)前進(jìn)程(%s)"%(os.getppid(),os.getpid()))

t_start = time.time()

time.sleep(interval) #程序?qū)?huì)被掛起interval秒

t_end = time.time()

print("worker_1,執(zhí)行時(shí)間為'%0.2f'秒"%(t_end - t_start))

def worker_2(interval):

print("worker_2,父進(jìn)程(%s),當(dāng)前進(jìn)程(%s)"%(os.getppid(),os.getpid()))

t_start = time.time()

time.sleep(interval)

t_end = time.time()

print("worker_2,執(zhí)行時(shí)間為'%0.2f'秒"%(t_end - t_start))

def main():

#輸出當(dāng)前程序的ID

print("進(jìn)程ID:%s"%os.getpid())

#創(chuàng)建兩個(gè)進(jìn)程對(duì)象,target指向這個(gè)進(jìn)程對(duì)象要執(zhí)行的對(duì)象名稱,

#如果不指定name參數(shù),默認(rèn)的進(jìn)程對(duì)象名稱為Process-N,N為一個(gè)遞增的整數(shù)

p1=Process(target=worker_1,args=(2,))

p2=Process(target=worker_2,name="xiaoke",args=(1,))

#使用"進(jìn)程對(duì)象名稱.start()"來(lái)創(chuàng)建并執(zhí)行一個(gè)子進(jìn)程,

#這兩個(gè)進(jìn)程對(duì)象在start后,就會(huì)分別去執(zhí)行worker_1和worker_2方法中的內(nèi)容

p1.start()

p2.start()

#同時(shí)父進(jìn)程仍然往下執(zhí)行,如果p2進(jìn)程還在執(zhí)行,將會(huì)返回True

print("p2.is_alive=%s"%p2.is_alive())

print("p1.is_alive=%s"%p1.is_alive())

#輸出p1和p2進(jìn)程的別名和pid

print("--p1進(jìn)程的別名和pid--")

print("p1.name=%s"%p1.name)

print("p1.pid=%s"%p1.pid)

print("--p2進(jìn)程的別名和pid--")

print("p2.name=%s"%p2.name)

print("p2.pid=%s"%p2.pid)

#join括號(hào)中不攜帶參數(shù),表示父進(jìn)程在這個(gè)位置要等待p1進(jìn)程執(zhí)行完成后,再繼續(xù)執(zhí)行下面的語(yǔ)句,一般用于進(jìn)程間的數(shù)據(jù)同步,

# 如果不寫這一句,下面的is_alive判斷將會(huì)是True,

#可以嘗試著將下面的這條語(yǔ)句改成p1.join(1),

#因?yàn)閜2需要2秒以上才可能執(zhí)行完成,父進(jìn)程等待1秒很可能,不能讓p1完全執(zhí)行完成,

#所以下面的print會(huì)輸出True,即p1仍然在執(zhí)行

print("--p1進(jìn)程是否執(zhí)行完畢??--")

p1.join()

print("p1.is_alive=%s"%p1.is_alive())

p2.join()

print("p2.is_alive=%s"%p2.is_alive())

if __name__ == '__main__':

main()

# 結(jié)果如下:

# 進(jìn)程ID:4004

# p2.is_alive=True

# p1.is_alive=True

# --p1進(jìn)程的別名和pid--

# p1.name=Process-1

# p1.pid=3352

# --p2進(jìn)程的別名和pid--

# p2.name=xiaoke

# p2.pid=6092

# --p1進(jìn)程是否執(zhí)行完畢??--

# worker_2,父進(jìn)程(4004),當(dāng)前進(jìn)程(6092)

# worker_2,執(zhí)行時(shí)間為'1.00'秒

# worker_1,父進(jìn)程(4004),當(dāng)前進(jìn)程(3352)

# worker_1,執(zhí)行時(shí)間為'2.00'秒

# p1.is_alive=False

# p2.is_alive=False

Num05-->進(jìn)程的第二種創(chuàng)建方式--自己創(chuàng)建一個(gè)類,繼承Process類

定義:創(chuàng)建新的進(jìn)程還可以使用類的方式。可以自定義一個(gè)類,繼承Process類。每次實(shí)例化這個(gè)類的時(shí)候,就等同于實(shí)例化這個(gè)進(jìn)程對(duì)象。

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @Date : 2017-04-25 16:36:47

# @Author : xiaoke

from multiprocessing import Process

import time

import os

#繼承Process類

class Process_Class(Process):

#因?yàn)镻rocess類本身也有__init__方法,這個(gè)子類相當(dāng)于重寫了這個(gè)方法,

#但這樣就會(huì)帶來(lái)一個(gè)問(wèn)題,我們并沒(méi)有完全的初始化一個(gè)Process類,

# 所以就不能使用從這個(gè)類繼承的一些方法和屬性,

#最好的方法就是將繼承類本身傳遞給Process.__init__方法,完成這些初始化操作

def __init__(self,interval):

Process.__init__(self)

# 傳遞進(jìn)來(lái)的屬性

self.interval = interval

#重寫了Process類的run()方法

def run(self):

print("子進(jìn)程(%s) 開始執(zhí)行,父進(jìn)程為(%s)"%(os.getpid(),os.getppid()))

t_start = time.time()

time.sleep(self.interval)

t_stop = time.time()

print("子進(jìn)程(%s)執(zhí)行結(jié)束,耗時(shí)%0.2f秒"%(os.getpid(),t_stop-t_start))

if __name__=="__main__":

t_start = time.time()

print("當(dāng)前程序進(jìn)程(%s)"%os.getpid())

p1 = Process_Class(2)

#對(duì)一個(gè)不包含target屬性的Process類執(zhí)行start()方法,就會(huì)運(yùn)行這個(gè)類中的run()方法,所以這里會(huì)執(zhí)行p1.run()

p1.start()

p1.join()

t_stop = time.time()

print("父進(jìn)程(%s)執(zhí)行結(jié)束,耗時(shí)%0.2f秒"%(os.getpid(),t_stop-t_start))

# 結(jié)果為:

# 當(dāng)前程序進(jìn)程(14736)

# 子進(jìn)程(4292) 開始執(zhí)行,父進(jìn)程為(14736)

# 子進(jìn)程(4292)執(zhí)行結(jié)束,耗時(shí)2.00秒

# 父進(jìn)程(14736)執(zhí)行結(jié)束,耗時(shí)2.11秒

Num06-->進(jìn)程池--Pool

當(dāng)需要?jiǎng)?chuàng)建的子進(jìn)程數(shù)量不多時(shí),可以直接利用multiprocessing中的Process動(dòng)態(tài)生成多個(gè)進(jìn)程。但如果是上百甚至上千個(gè)目標(biāo),手動(dòng)的去創(chuàng)建進(jìn)程的工作量巨大,此時(shí)就可以用到multiprocessing模塊提供的Pool方法。

初始化Pool時(shí),可以指定一個(gè)最大進(jìn)程數(shù),當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果池還沒(méi)有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來(lái)執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到指定的最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來(lái)執(zhí)行。

#采用非阻塞的方式

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @Time : 2017/4/27 8:35

# @Author : xiaoke

# @Site :

# @File : Test33.py

# @Software: PyCharm Community Edition

from multiprocessing import Pool

import os, time, random

def worker(num):

t_start = time.time()

print("子進(jìn)程-%s開始執(zhí)行,進(jìn)程號(hào)為%d" % (num, os.getpid()))

# random.random()隨機(jī)生成0~1之間的浮點(diǎn)數(shù)

time.sleep(random.random() * 2)

t_stop = time.time()

print("子進(jìn)程-%s執(zhí)行完畢,耗時(shí)%0.2f" % (num, t_stop - t_start))

def main():

# 定義一個(gè)進(jìn)程池,最大進(jìn)程數(shù)5

p = Pool(3)

for i in range(10):

# Pool.apply_async(要調(diào)用的目標(biāo),(傳遞給目標(biāo)的參數(shù)元祖,))

# 異步的方式,每次循環(huán)將會(huì)用空閑出來(lái)的子進(jìn)程去調(diào)用目標(biāo)

p.apply_async(worker, (i,))

m_start = time.time()

print("----start----")

p.close() # 關(guān)閉進(jìn)程池,關(guān)閉后p不再接收新的請(qǐng)求

p.join() # 等待p中所有子進(jìn)程執(zhí)行完成,必須放在close語(yǔ)句之后

print("-----end-----")

m_stop = time.time()

print("父進(jìn)程執(zhí)行完畢,耗時(shí)%0.2f" % (m_stop - m_start))

if __name__ == '__main__':

main()

# 結(jié)果如下:

# ----start----

# 子進(jìn)程-0開始執(zhí)行,進(jìn)程號(hào)為3360

# 子進(jìn)程-1開始執(zhí)行,進(jìn)程號(hào)為16084

# 子進(jìn)程-2開始執(zhí)行,進(jìn)程號(hào)為12580

# 子進(jìn)程-1執(zhí)行完畢,耗時(shí)0.05

# 子進(jìn)程-3開始執(zhí)行,進(jìn)程號(hào)為16084

# 子進(jìn)程-3執(zhí)行完畢,耗時(shí)0.80

# 子進(jìn)程-4開始執(zhí)行,進(jìn)程號(hào)為16084

# 子進(jìn)程-2執(zhí)行完畢,耗時(shí)1.63

# 子進(jìn)程-5開始執(zhí)行,進(jìn)程號(hào)為12580

# 子進(jìn)程-0執(zhí)行完畢,耗時(shí)1.80

# 子進(jìn)程-6開始執(zhí)行,進(jìn)程號(hào)為3360

# 子進(jìn)程-4執(zhí)行完畢,耗時(shí)1.55

# 子進(jìn)程-7開始執(zhí)行,進(jìn)程號(hào)為16084

# 子進(jìn)程-7執(zhí)行完畢,耗時(shí)0.07

# 子進(jìn)程-8開始執(zhí)行,進(jìn)程號(hào)為16084

# 子進(jìn)程-8執(zhí)行完畢,耗時(shí)0.05

# 子進(jìn)程-9開始執(zhí)行,進(jìn)程號(hào)為16084

# 子進(jìn)程-5執(zhí)行完畢,耗時(shí)1.01

# 子進(jìn)程-6執(zhí)行完畢,耗時(shí)1.10

# 子進(jìn)程-9執(zhí)行完畢,耗時(shí)1.57

# -----end-----

# 父進(jìn)程執(zhí)行完畢,耗時(shí)4.26

multiprocessing.Pool常用函數(shù)解析:

apply_async(func[, args[, kwds]]) :使用非阻塞方式調(diào)用func(并行執(zhí)行,堵塞方式必須等待上一個(gè)進(jìn)程退出才能執(zhí)行下一個(gè)進(jìn)程),args為傳遞給func的參數(shù)列表,kwds為傳遞給func的關(guān)鍵字參數(shù)列表;

apply(func[, args[, kwds]]):使用阻塞方式調(diào)用func

close():關(guān)閉Pool,使其不再接受新的任務(wù);

terminate():不管任務(wù)是否完成,立即終止;

join():主進(jìn)程阻塞,等待子進(jìn)程的退出, 必須在close或terminate之后使用;

采用apply阻塞的方式

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @Time : 2017/4/27 8:35

# @Author : xiaoke

# @Site :

# @File : Test33.py

# @Software: PyCharm Community Edition

from multiprocessing import Pool

import os, time, random

def worker(num):

t_start = time.time()

print("子進(jìn)程-%s開始執(zhí)行,進(jìn)程號(hào)為%d" % (num, os.getpid()))

# random.random()隨機(jī)生成0~1之間的浮點(diǎn)數(shù)

time.sleep(random.random() * 2)

t_stop = time.time()

print("子進(jìn)程-%s執(zhí)行完畢,耗時(shí)%0.2f" % (num, t_stop - t_start))

def main():

# 定義一個(gè)進(jìn)程池,最大進(jìn)程數(shù)5

p = Pool(3)

for i in range(10):

# Pool.apply_async(要調(diào)用的目標(biāo),(傳遞給目標(biāo)的參數(shù)元祖,))

# 異步的方式,每次循環(huán)將會(huì)用空閑出來(lái)的子進(jìn)程去調(diào)用目標(biāo)

p.apply(worker, (i,))

m_start = time.time()

print("----start----")

p.close() # 關(guān)閉進(jìn)程池,關(guān)閉后p不再接收新的請(qǐng)求

p.join() # 等待p中所有子進(jìn)程執(zhí)行完成,必須放在close語(yǔ)句之后

print("-----end-----")

m_stop = time.time()

print("父進(jìn)程執(zhí)行完畢,耗時(shí)%0.2f" % (m_stop - m_start))

if __name__ == '__main__':

main()

# 結(jié)果如下:

# 子進(jìn)程-0開始執(zhí)行,進(jìn)程號(hào)為4464

# 子進(jìn)程-0執(zhí)行完畢,耗時(shí)1.75

# 子進(jìn)程-1開始執(zhí)行,進(jìn)程號(hào)為11640

# 子進(jìn)程-1執(zhí)行完畢,耗時(shí)1.33

# 子進(jìn)程-2開始執(zhí)行,進(jìn)程號(hào)為8756

# 子進(jìn)程-2執(zhí)行完畢,耗時(shí)1.86

# 子進(jìn)程-3開始執(zhí)行,進(jìn)程號(hào)為4464

# 子進(jìn)程-3執(zhí)行完畢,耗時(shí)0.70

# 子進(jìn)程-4開始執(zhí)行,進(jìn)程號(hào)為11640

# 子進(jìn)程-4執(zhí)行完畢,耗時(shí)1.29

# 子進(jìn)程-5開始執(zhí)行,進(jìn)程號(hào)為8756

# 子進(jìn)程-5執(zhí)行完畢,耗時(shí)0.69

# 子進(jìn)程-6開始執(zhí)行,進(jìn)程號(hào)為4464

# 子進(jìn)程-6執(zhí)行完畢,耗時(shí)0.33

# 子進(jìn)程-7開始執(zhí)行,進(jìn)程號(hào)為11640

# 子進(jìn)程-7執(zhí)行完畢,耗時(shí)1.83

# 子進(jìn)程-8開始執(zhí)行,進(jìn)程號(hào)為8756

# 子進(jìn)程-8執(zhí)行完畢,耗時(shí)1.58

# 子進(jìn)程-9開始執(zhí)行,進(jìn)程號(hào)為4464

# 子進(jìn)程-9執(zhí)行完畢,耗時(shí)1.37

# ----start----

# -----end-----

# 父進(jìn)程執(zhí)行完畢,耗時(shí)0.08

Num07-->進(jìn)程間的通信--Queue

進(jìn)程(Process)之間有時(shí)間需要通信,操作系統(tǒng)提供了很多機(jī)制來(lái)實(shí)現(xiàn)進(jìn)程間的通信。如Queue、Pipes等。Queue本身是一個(gè)消息隊(duì)列。

Test01--> 先看一個(gè)簡(jiǎn)單的案例:

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @Time : 2017/4/27 8:35

# @Author : xiaoke

# @Site :

# @File : Test33.py

# @Software: PyCharm Community Edition

from multiprocessing import Queue

q = Queue(3) # 初始化一個(gè)Queue對(duì)象,最多可接收三條put消息

q.put("消息1")

q.put("消息2")

print(q.full()) # False

q.put("消息3")

print(q.full()) # True

# 因?yàn)橄⒘嘘?duì)已滿下面的try都會(huì)拋出異常,第一個(gè)try會(huì)等待3秒后再拋出異常,第二個(gè)Try會(huì)立刻拋出異常

try:

q.put("消息4", True, 3)

except:

print("消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:%s" % q.qsize())

try:

q.put_nowait("消息4")

except:

print("消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:%s" % q.qsize())

# 推薦的方式,先判斷消息列隊(duì)是否已滿,再寫入

if not q.full():

q.put_nowait("消息4")

# 讀取消息時(shí),先判斷消息列隊(duì)是否為空,再讀取

if not q.empty():

for i in range(q.qsize()):

# print("取出消息:%s" % q.get())

print("取出消息:%s" % q.get_nowait())

# 結(jié)果是:

# False

# True

# 消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:3

# 消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:3

# 取出消息:消息1

# 取出消息:消息2

# 取出消息:消息3

以上代碼加以說(shuō)明:

初始化Queue()對(duì)象時(shí)(例如:q=Queue()),若括號(hào)中沒(méi)有指定最大可接收的消息數(shù)量,或數(shù)量為負(fù)值,那么就代表可接受的消息數(shù)量沒(méi)有上限(直到內(nèi)存的盡頭);

Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量;

Queue.empty():如果隊(duì)列為空,返回True,反之False ;

Queue.full():如果隊(duì)列滿了,返回True,反之False;

Queue.get([block[, timeout]]):獲取隊(duì)列中的一條消息,然后將其從列隊(duì)中移除,block默認(rèn)值為True;

1)如果block使用默認(rèn)值,且沒(méi)有設(shè)置timeout(單位秒),消息列隊(duì)如果為空,此時(shí)程序?qū)⒈蛔枞?停在讀取狀態(tài)),直到從消息列隊(duì)讀到消息為止,如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒(méi)讀取到任何消息,則拋出"Queue.Empty"異常;

2)如果block值為False,消息列隊(duì)如果為空,則會(huì)立刻拋出"Queue.Empty"異常;

Queue.get_nowait():相當(dāng)Queue.get(False);

Queue.put(item,[block[, timeout]]):將item消息寫入隊(duì)列,block默認(rèn)值為True;

1)如果block使用默認(rèn)值,且沒(méi)有設(shè)置timeout(單位秒),消息列隊(duì)如果已經(jīng)沒(méi)有空間可寫入,此時(shí)程序?qū)⒈蛔枞?停在寫入狀態(tài)),直到從消息列隊(duì)騰出空間為止,如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒(méi)空間,則拋出"Queue.Full"異常;

2)如果block值為False,消息列隊(duì)如果沒(méi)有空間可寫入,則會(huì)立刻拋出"Queue.Full"異常;

Queue.put_nowait(item):相當(dāng)Queue.put(item, False);

Test02-->在父進(jìn)程創(chuàng)建兩個(gè)子進(jìn)程,一個(gè)往Queue里面寫數(shù)據(jù),一個(gè)從Queue里面讀數(shù)據(jù)。

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @Time : 2017/4/27 8:35

# @Author : xiaoke

# @Site :

# @File : Test33.py

# @Software: PyCharm Community Edition

from multiprocessing import Process, Queue

import os, time, random

# 寫數(shù)據(jù)進(jìn)程

def write(q):

for value in ['A', 'B', 'C', 'quit']:

print('Put %s to queue...' % value)

q.put(value)

time.sleep(random.random())

# 讀數(shù)據(jù)進(jìn)程

def read(q):

while True:

if not q.empty():

value = q.get(True)

print('Get %s from queue.' % value)

# 因?yàn)樽x進(jìn)程是一個(gè)死循環(huán),所以要設(shè)置一個(gè)標(biāo)記,用于退出

if value == "quit":

break

time.sleep(random.random())

if __name__ == '__main__':

# 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:

q = Queue()

pw = Process(target=write, args=(q,))

pr = Process(target=read, args=(q,))

# 啟動(dòng)子進(jìn)程pw,寫入:

pw.start()

# 等待pw結(jié)束:

pw.join()

# 啟動(dòng)子進(jìn)程pr,讀取:

pr.start()

pr.join()

print('所有數(shù)據(jù)都寫入并且讀完')

print("pw is alive:%s" % pw.is_alive())

print("pr is alive:%s" % pr.is_alive())

# 結(jié)果如下:

# Put A to queue...

# Put B to queue...

# Put C to queue...

# Put quit to queue...

# Get A from queue.

# Get B from queue.

# Get C from queue.

# Get quit from queue.

# 所有數(shù)據(jù)都寫入并且讀完

# pw is alive:False

# pr is alive:False

Test03-->進(jìn)程池Pool中的Queue來(lái)進(jìn)行進(jìn)程間的通信

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# @Author : xiaoke

# @Site :

# @File : Test33.py

# @Software: PyCharm Community Edition

from multiprocessing import Pool, Queue, Manager

import os

import time

import random

def task_write(q):

for s in ('hello', 'python', 'world', 'quit'):

q.put(s) # 向隊(duì)列中添加消息

print('%s 進(jìn)程向隊(duì)列中添加消息:%s' % (os.getpid(), s))

time.sleep(random.random() * 2)

print('%s 進(jìn)程要結(jié)束了' % os.getpid())

def task_read(q):

while True:

msg = q.get() # 阻塞式從隊(duì)列中收消息

print('%s 進(jìn)程從隊(duì)列中取出消息:%s' % (os.getpid(), msg))

if msg == "quit":

break

time.sleep(random.random() * 2)

print('%s 進(jìn)程要結(jié)束了' % os.getpid())

def main():

# 1.創(chuàng)建消息隊(duì)列對(duì)象

# q = Queue() #只能用于父子進(jìn)程

# Manger().Queue() 消息隊(duì)列可用于進(jìn)程池

q = Manager().Queue()

# 2.創(chuàng)建進(jìn)程池,里面放兩個(gè)進(jìn)程

my_pool = Pool(2)

# 3.添加任務(wù)

# 采用阻塞的方式

my_pool.apply(task_write, args=(q,))

my_pool.apply(task_read, args=(q,))

# 采用非阻塞的方式

# my_pool.apply_async(task_write, args=(q,))

# my_pool.apply_async(task_read, args=(q,))

# 4.關(guān)閉進(jìn)程池

my_pool.close()

# 5.等待所有進(jìn)程結(jié)束

my_pool.join()

if __name__ == "__main__":

main()

# 采用非阻塞的方式結(jié)果:

# 7380 進(jìn)程向隊(duì)列中添加消息:hello

# 11256 進(jìn)程從隊(duì)列中取出消息:hello

# 7380 進(jìn)程向隊(duì)列中添加消息:python

# 11256 進(jìn)程從隊(duì)列中取出消息:python

# 7380 進(jìn)程向隊(duì)列中添加消息:world

# 11256 進(jìn)程從隊(duì)列中取出消息:world

# 7380 進(jìn)程向隊(duì)列中添加消息:quit

# 11256 進(jìn)程從隊(duì)列中取出消息:quit

# 11256 進(jìn)程要結(jié)束了

# 7380 進(jìn)程要結(jié)束了

# 采用阻塞的方式結(jié)果:

# 96 進(jìn)程向隊(duì)列中添加消息:hello

# 96 進(jìn)程向隊(duì)列中添加消息:python

# 96 進(jìn)程向隊(duì)列中添加消息:world

# 96 進(jìn)程向隊(duì)列中添加消息:quit

# 96 進(jìn)程要結(jié)束了

# 12412 進(jìn)程從隊(duì)列中取出消息:hello

# 12412 進(jìn)程從隊(duì)列中取出消息:python

# 12412 進(jìn)程從隊(duì)列中取出消息:world

# 12412 進(jìn)程從隊(duì)列中取出消息:quit

# 12412 進(jìn)程要結(jié)束了

總結(jié)

以上是生活随笔為你收集整理的python高性能写法_py 高性能低级,高级写法思考的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。