日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

多线程多进程解析:Python、os、sys、Queue、multiprocessing、threading

發(fā)布時(shí)間:2023/12/2 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 多线程多进程解析:Python、os、sys、Queue、multiprocessing、threading 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

當(dāng)涉及到操作系統(tǒng)的時(shí)候,免不了要使用os模塊,有時(shí)還要用到sys模塊。

設(shè)計(jì)到并行程序,一般開(kāi)單獨(dú)的進(jìn)程,而不是線程,原因是python解釋器的全局解釋器鎖GIL(global interpreter lock),本文最后會(huì)講到。使用進(jìn)程可以實(shí)現(xiàn)完全并行,無(wú)GIL的限制,可充分利用多cpu多核的環(huán)境。

?

os/sys模塊

1、os模塊

os.system() 函數(shù)可以啟動(dòng)一個(gè)進(jìn)程,執(zhí)行完之后返回狀態(tài)碼。

os.fork() 復(fù)制一個(gè)進(jìn)程,如果是子進(jìn)程返回0,如果是父進(jìn)程返回子進(jìn)程的pid,使用這個(gè)函數(shù)的時(shí)候,建議你學(xué)習(xí)一下linux編程的知識(shí)。

os.popen 以管道的方式創(chuàng)建進(jìn)程。

os.spawnl 也可以創(chuàng)建進(jìn)程,并能指定環(huán)境變量。

os.kill(pid, sig) 關(guān)閉一個(gè)進(jìn)程,pid是進(jìn)程號(hào),sig是信號(hào)。與fork配合使用,例如你剛才用fork創(chuàng)建了一個(gè)子進(jìn)程,它的pid是11990, 那么調(diào)用

os.kill( 11990, signal.CTRL_BREAK_EVENT)

就以ctrl+c的方式殺死了這個(gè)進(jìn)程。

os.wait() -> (pid, status)找到任一個(gè)僵死子進(jìn)程,或者等待任一個(gè)子進(jìn)程的SIGCHLD信號(hào)

os.waitpid(pid, options) -> (pid, status) 等待給定進(jìn)程結(jié)束

?

除了利用os模塊實(shí)現(xiàn)多進(jìn)程和通信,還有一個(gè)模塊multiprocessing封裝了很多創(chuàng)建進(jìn)程和進(jìn)程間通信的操作,發(fā)揮多核的威力。

附:

文件操作也是與操作系統(tǒng)相關(guān)的操作,也被封裝在os模塊。

文件操作的詳細(xì)內(nèi)容見(jiàn)??http://www.cnblogs.com/xinchrome/p/5011304.html

?

2、sys模塊

同樣是一個(gè)與系統(tǒng)相關(guān)的模塊,它們都表示了程序運(yùn)行的上下文環(huán)境。但是與os模塊不同的是,os模塊主要封裝系統(tǒng)操作,sys模塊主要封裝系統(tǒng)中的各種環(huán)境參數(shù)。

比如文件操作、進(jìn)程線程操作封裝在os模塊中;

標(biāo)準(zhǔn)輸入輸出stdout/stdin、命令行參數(shù)argv、環(huán)境變量path、平臺(tái)platform等參數(shù)封裝在sys模塊中;

不過(guò)sys中也含有一些進(jìn)程操作,比如sys.exit(n)和sys.exit('Unable to create first child.')

?

多進(jìn)程multiprocessing

?multiprocessing模塊的內(nèi)容:

multiprocessing.Process(target=run)類的實(shí)例表示一個(gè)進(jìn)程,具有字段 pid,方法 start() join()等

multiprocessing.Pool(processes=4) 類的實(shí)例表示一個(gè)進(jìn)程池

multiprocessing.Lock類的實(shí)例表示一個(gè)鎖,具有acquire()和release() 方法

multiprocessing.Semaphore(2) 信號(hào)量類的實(shí)例表示一個(gè)信號(hào)量,可以指定初始值,具有 acquire() 和 release() 方法

multiprocessing.Event() 表示一個(gè)信號(hào),用于實(shí)現(xiàn)多進(jìn)程等待某一個(gè)進(jìn)程的情況

進(jìn)程間要實(shí)現(xiàn)通信,除了鎖、信號(hào)量、事件,還有隊(duì)列multiprocessing.Queue。

?

import multiprocessingdef writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): try: print q.get(block = False) except: passif __name__ == "__main__":q = multiprocessing.Queue()writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() reader.join() writer.join()

?

multiprocessing.Queue是多進(jìn)程安全的隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。put方法用以插入數(shù)據(jù)到隊(duì)列中,put方法還有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會(huì)阻塞timeout指定的時(shí)間,直到該隊(duì)列有剩余的空間。如果超時(shí),會(huì)拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會(huì)立即拋出Queue.Full異常。

get方法可以從隊(duì)列讀取并且刪除一個(gè)元素。同樣,get方法有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時(shí)間內(nèi)沒(méi)有取到任何元素,會(huì)拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個(gè)值可用,則立即返回該值,否則,如果隊(duì)列為空,則立即拋出Queue.Empty異常。

?

?

進(jìn)程同步互斥實(shí)例:

?

import multiprocessing import time# 信號(hào)量實(shí)現(xiàn)同步,進(jìn)程間也可以使用信號(hào)量 def preq(s):print "a"s.release()s.release()def worker(s,i):s.acquire()s.acquire()print(multiprocessing.current_process().name + " acquire")time.sleep(i)print(multiprocessing.current_process().name + " release")s.release() if __name__ == "__main__":s = multiprocessing.Semaphore(0)for i in range(1):pre = multiprocessing.Process(target=preq, args=(s,))pre.start()p = multiprocessing.Process(target=worker, args=(s,1))p.start()# 鎖實(shí)現(xiàn)進(jìn)程間對(duì)文件的互斥訪問(wèn) def worker_with(lock, f):with lock:fs = open(f,"a+")fs.write('Lock acquired via with\n')fs.close()def worker_no_with(lock, f):lock.acquire()try:fs = open(f,"a+")fs.write('Lock acquired directly\n')fs.close()finally:lock.release() if __name__ == "__main__":f = "file.txt"lock = multiprocessing.Lock()w = multiprocessing.Process(target=worker_with, args=(lock, f))nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))w.start()nw.start()w.join()nw.join()# Event實(shí)現(xiàn)同步,Event類的實(shí)例表示一個(gè)信號(hào),進(jìn)程調(diào)用它的wait方法等待被喚醒,調(diào)用set方法喚醒所有正在等待的進(jìn)程 def wait_for_event(e):"""Wait for the event to be set before doing anything"""print ('wait_for_event: starting')e.wait()print ('wait_for_event: e.is_set()->' + str(e.is_set())) def wait_for_event_timeout(e, t):"""Wait t seconds and then timeout"""print ('wait_for_event_timeout: starting')e.wait(t)print ('wait_for_event_timeout: e.is_set()->' + str(e.is_set()))if __name__ == '__main__':e = multiprocessing.Event()w1 = multiprocessing.Process(name='block', target=wait_for_event,args=(e,))w1.start()w2 = multiprocessing.Process(name='non-block', target=wait_for_event_timeout, args=(e, 2))w2.start()time.sleep(3)e.set()print ('main: event is set')

?

Process類中定義的方法

| is_alive(self)
| Return whether process is alive
|
| join(self, timeout=None)
| Wait until child process terminates
|
| run(self)
| Method to be run in sub-process; can be overridden in sub-class
|
| start(self)
| Start child process
|
| terminate(self)
| Terminate process; sends SIGTERM signal or uses TerminateProcess()

以上來(lái)自于Python自帶幫助

?

進(jìn)程處理信號(hào)?signal

?利用signal模塊,進(jìn)程可以捕獲信號(hào),根據(jù)相應(yīng)的handler做處理。

信號(hào)(signal)-- 進(jìn)程之間通訊的方式,是一種軟件中斷。一個(gè)進(jìn)程一旦接收到信號(hào)就會(huì)打斷原來(lái)的程序執(zhí)行流程來(lái)處理信號(hào)。
幾個(gè)常用信號(hào):
? ? SIGINT 終止進(jìn)程 中斷進(jìn)程 (control+c)

? ? SIGQUIT 退出進(jìn)程

? ? SIGTERM 終止進(jìn)程: 軟件終止信號(hào) (命令行中輸入kill命令時(shí),向進(jìn)程發(fā)送的默認(rèn)信號(hào))?當(dāng)直接寫kill PID,默認(rèn)是向進(jìn)程發(fā)送SIGTERM

? ? SIGKILL 終止進(jìn)程:殺死進(jìn)程,捕捉這個(gè)信號(hào)會(huì)報(bào)錯(cuò),也就是進(jìn)程不能捕捉此信號(hào)(kill -9)
? ? SIGALRM 鬧鐘信號(hào)。Alarms信號(hào)是一個(gè)特殊信號(hào)類型,它可以讓程序要求系統(tǒng)經(jīng)過(guò)一段時(shí)間對(duì)自己發(fā)送通知。os 標(biāo)準(zhǔn)模塊中指出,它可用于避免無(wú)限制阻塞 I/O 操作或其它系統(tǒng)調(diào)用。

? ??SIGCHLD?子進(jìn)程退出時(shí)對(duì)父進(jìn)程發(fā)出的信號(hào),如果父進(jìn)程還沒(méi)有處理它,子進(jìn)程將會(huì)停留在僵死狀態(tài)等待其父進(jìn)程調(diào)用wait函數(shù),這個(gè)狀態(tài)下的子進(jìn)程就是僵死進(jìn)程

PS:常用信號(hào)簡(jiǎn)介:

ctrl-c 發(fā)送 SIGINT 信號(hào)給前臺(tái)進(jìn)程組中的所有進(jìn)程。常用于終止正在運(yùn)行的程序。
ctrl-z 發(fā)送 SIGTSTP 信號(hào)給前臺(tái)進(jìn)程組中的所有進(jìn)程,常用于掛起一個(gè)進(jìn)程。
ctrl-d 不是發(fā)送信號(hào),而是表示一個(gè)特殊的二進(jìn)制值,表示 EOF,也就是輸入流(例如普通文件或者stdin)的結(jié)束。
ctrl-\ 發(fā)送 SIGQUIT 信號(hào)給前臺(tái)進(jìn)程組中的所有進(jìn)程,終止前臺(tái)進(jìn)程并生成 core 文件。

?

常常會(huì)在python程序被關(guān)閉之前加一個(gè)鉤子,用atexit模塊以及signal模塊來(lái)實(shí)現(xiàn)

父進(jìn)程捕獲終止信號(hào)后,還需要向每個(gè)子進(jìn)程發(fā)送終止信號(hào)。

注意信號(hào)處理函數(shù)需要接受兩個(gè)參數(shù)。

?

#!/usr/bin python# 正常退出或者被ctl+c終止時(shí),進(jìn)程捕獲信號(hào),調(diào)用處理函數(shù)(鉤子) import atexit from signal import signal, SIGTERMdef test():print 'exit........' atexit.register(test) signal(SIGTERM, lambda signum, stack_frame: exit(1))while True:pass# 進(jìn)程發(fā)送信號(hào)終止其他進(jìn)程 import os import signal #發(fā)送信號(hào),16175是前面那個(gè)綁定信號(hào)處理函數(shù)的pid,需要自行修改 os.kill(16175,signal.SIGTERM) os.kill(16175,signal.SIGUSR1) # Linux編程范式:fork(),等待SIGCHLD信號(hào) import os import signal from time import sleep def onsigchld(a,b): print '收到子進(jìn)程結(jié)束信號(hào)' signal.signal(signal.SIGCHLD,onsigchld) pid = os.fork() if pid == 0: print '我是子進(jìn)程,pid是',os.getpid() sleep(2) else: print '我是父進(jìn)程,pid是',os.getpid() os.wait() #等待子進(jìn)程結(jié)束 # 鬧鐘信號(hào),用于告訴操作系統(tǒng)向自己發(fā)送信號(hào) import signal import timedef receive_alarm(signum, stack):print 'Alarm :', time.ctime()# Call receive_alarm in 2 seconds signal.signal(signal.SIGALRM, receive_alarm) signal.alarm(2)print 'Before:', time.ctime() time.sleep(10) print 'After :', time.ctime()

?

?

多線程threading & thread

  與進(jìn)程不同,線程要實(shí)現(xiàn)同步,直接用Python自帶的Queue模塊即可。Python的Queue模塊中提供了同步的、線程安全的隊(duì)列類,包括FIFO(先入先出)隊(duì)列Queue,LIFO(后入先出)隊(duì)列LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(yǔ),能夠在多線程中直接使用。可以使用隊(duì)列來(lái)實(shí)現(xiàn)線程間的同步。

? ? ?python多線程編程,一般使用thread和threading模塊。thread模塊想對(duì)較底層,threading模塊對(duì)thread模塊進(jìn)行了封裝,更便于使用。所有,通常多線程編程使用threading模塊。線程的創(chuàng)建一般有兩種:①將創(chuàng)建的函數(shù)傳遞進(jìn)threading.Thread()對(duì)象的target字段,可以是函數(shù)或者定義了__call__方法的類實(shí)例。②繼承threading.Thread類,通常重寫run()方法。

1 threading模塊的內(nèi)容

Thread類的實(shí)例可以引用一個(gè)線程,這是我們用的最多的一個(gè)類,你可以指定目標(biāo)線程函數(shù)執(zhí)行或者自定義繼承自它的子類都可以實(shí)現(xiàn)子線程功能;

Timer類是Thread類的子類,表示等待一段時(shí)間后才開(kāi)始運(yùn)行某段代碼,或者重復(fù)運(yùn)行某段代碼;

(Python自帶的)Queue類的實(shí)例是實(shí)現(xiàn)了多生產(chǎn)者(Producer)、多消費(fèi)者(Consumer)的隊(duì)列,支持鎖原語(yǔ),能夠在多個(gè)線程之間提供很好的同步支持;

Lock類的實(shí)例引用了一個(gè)鎖原語(yǔ),這個(gè)我們可以對(duì)全局變量互斥時(shí)使用,提供acquire和release方法;

RLock的實(shí)例表示可重入鎖,使單線程可以再次獲得已經(jīng)獲得的鎖;

Condition類的實(shí)例表示條件變量,能讓一個(gè)線程停下來(lái),等待其他線程滿足某個(gè)“條件”,除了提供acquire和release方法外,還提供了wait和notify方法,相當(dāng)于一個(gè)多功能信號(hào)量;

Event 類的實(shí)例表示通用的條件變量。多個(gè)線程可以等待某個(gè)事件發(fā)生,在事件發(fā)生后,所有的線程都被激活;

Semaphore類的實(shí)例表示信號(hào)量,為等待資源的線程提供一個(gè)類似隊(duì)列的結(jié)構(gòu),提供acquire和release方法,初始化的時(shí)候可以指定初值Semaphore(3);

BoundedSemaphore 與semaphore類似,但不允許超過(guò)初始值;

?

?threading.Thread類的內(nèi)容:

getName(self) 返回線程的名字

isAlive(self) 布爾標(biāo)志,表示這個(gè)線程是否還在運(yùn)行中

isDaemon(self) 返回線程的daemon標(biāo)志

join(self, timeout=None) 程序掛起,直到線程結(jié)束,如果給出timeout,則最多阻塞timeout秒

run(self) 定義線程的功能函數(shù)

setDaemon(self, daemonic) 把線程的daemon標(biāo)志設(shè)為daemonic

setName(self, name) 設(shè)置線程的名字

start(self) 開(kāi)始線程執(zhí)行

ps:th.join()方法可能不是很安全,如果th對(duì)應(yīng)的線程沒(méi)有被真正啟動(dòng),那么調(diào)用th.join()的線程將不會(huì)等待,而會(huì)繼續(xù)運(yùn)行下去。用信號(hào)量更好。

?

多線程舉例:

#coding:utf-8 import threading, time #最簡(jiǎn)單的啟動(dòng)線程的方式 def sayHi(): time.sleep(1) print 'Hi, linuxapp' th=threading.Thread(target=sayHi) th.start() th.join() # 使用threading.Thread(),設(shè)置線程類實(shí)例的target屬性,表示一個(gè)線程 def T():print threading.current_thread().getName()t1 = threading.Thread(target=T, name='tt11') t1.start() t1.join()# 通過(guò)threading.Thread類的子類實(shí)例表示線程,注意父類構(gòu)造方法__init__不能省略 class T2(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):print "in run() of T2 " + threading.current_thread().getName()# threading.Lock類的實(shí)例表示一個(gè)互斥鎖,一個(gè)資源被加鎖后,其他線程不能訪問(wèn) class T3(threading.Thread):def __init__(self):threading.Thread.__init__(self)self.counter = 0;self.mutex = threading.Lock()def run(self):time.sleep(1)if self.mutex.acquire():self.counter += 1print self.counterself.mutex.release()# 如果同一個(gè)線程需要多次獲得資源,如果不使用 mutex = threading.RLock() ,就會(huì)死鎖 class T4(threading.Thread):def __init__(self):threading.Thread.__init__(self)self.counter = 0self.mutex = threading.Lock()def run(self):time.sleep(1)if self.mutex.acquire():self.counter += 1if self.mutex.acquire():self.counter += 1self.mutex.release()self.mutex.release()def main():t = T3()t.start()if __name__ == '__main__':main()# threading.Condition類的實(shí)例表示一個(gè)條件變量,相當(dāng)于多功能信號(hào)量 condition = threading.Condition() products = 0 class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): global condition, products while True: if condition.acquire(): if products < 10: products += 1; print "Producer(%s):deliver one, now products:%s" %(self.name, products) condition.notify() else: print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products) condition.wait(); condition.release() time.sleep(2) class Consumer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): global condition, products while True: if condition.acquire(): if products > 1: products -= 1 print "Consumer(%s):consume one, now products:%s" %(self.name, products) condition.notify() else: print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products) condition.wait(); condition.release() time.sleep(2) if __name__ == "__main__": for p in range(0, 2): p = Producer() p.start() for c in range(0, 10): c = Consumer() c.start()# threading.Event類的實(shí)例表示一個(gè)信號(hào),如果信號(hào)signal為true,那么等待這個(gè)signal的所有線程都將可以運(yùn)行 class MyThread(threading.Thread): def __init__(self, signal): threading.Thread.__init__(self) self.singal = signal def run(self): print "I am %s,I will sleep ..."%self.name # 進(jìn)入等待狀態(tài) self.singal.wait() print "I am %s, I awake..." %self.name if __name__ == "__main__":# 初始 為 False singal = threading.Event() for t in range(0, 3): thread = MyThread(singal) thread.start() print "main thread sleep 3 seconds... " time.sleep(3) # 喚醒含有signal, 處于等待狀態(tài)的線程 singal.set()

?

python多線程的限制
python多線程有個(gè)討厭的限制,全局解釋器鎖(global interpreter lock),這個(gè)鎖的意思是任一時(shí)間只能有一個(gè)線程使用解釋器,跟單cpu跑多個(gè)程序也是一個(gè)意思,大家都是輪著用的,這叫“并發(fā)”,不是“并行”。手冊(cè)上的解釋是為了保證對(duì)象模型的正確性!這個(gè)鎖造成的困擾是如果有一個(gè)計(jì)算密集型的線程占著cpu,其他的線程都得等著,試想你的多個(gè)線程中有這么一個(gè)線程,多線程生生被搞成串行;當(dāng)然這個(gè)模塊也不是毫無(wú)用處,手冊(cè)上又說(shuō)了:當(dāng)用于IO密集型任務(wù)時(shí),IO期間線程會(huì)釋放解釋器,這樣別的線程就有機(jī)會(huì)使用解釋器了!所以是否使用這個(gè)模塊需要考慮面對(duì)的任務(wù)類型。

?

轉(zhuǎn)載自:http://www.cnblogs.com/xinchrome/p/5031497.html


總結(jié)

以上是生活随笔為你收集整理的多线程多进程解析:Python、os、sys、Queue、multiprocessing、threading的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。