日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

多线程多进程解析:Python、os、sys、Queue、multiprocessing、threading

發布時間:2023/12/2 51 豆豆
生活随笔 收集整理的這篇文章主要介紹了 多线程多进程解析:Python、os、sys、Queue、multiprocessing、threading 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

當涉及到操作系統的時候,免不了要使用os模塊,有時還要用到sys模塊。

設計到并行程序,一般開單獨的進程,而不是線程,原因是python解釋器的全局解釋器鎖GIL(global interpreter lock),本文最后會講到。使用進程可以實現完全并行,無GIL的限制,可充分利用多cpu多核的環境。

?

os/sys模塊

1、os模塊

os.system() 函數可以啟動一個進程,執行完之后返回狀態碼。

os.fork() 復制一個進程,如果是子進程返回0,如果是父進程返回子進程的pid,使用這個函數的時候,建議你學習一下linux編程的知識。

os.popen 以管道的方式創建進程。

os.spawnl 也可以創建進程,并能指定環境變量。

os.kill(pid, sig) 關閉一個進程,pid是進程號,sig是信號。與fork配合使用,例如你剛才用fork創建了一個子進程,它的pid是11990, 那么調用

os.kill( 11990, signal.CTRL_BREAK_EVENT)

就以ctrl+c的方式殺死了這個進程。

os.wait() -> (pid, status)找到任一個僵死子進程,或者等待任一個子進程的SIGCHLD信號

os.waitpid(pid, options) -> (pid, status) 等待給定進程結束

?

除了利用os模塊實現多進程和通信,還有一個模塊multiprocessing封裝了很多創建進程和進程間通信的操作,發揮多核的威力。

附:

文件操作也是與操作系統相關的操作,也被封裝在os模塊。

文件操作的詳細內容見??http://www.cnblogs.com/xinchrome/p/5011304.html

?

2、sys模塊

同樣是一個與系統相關的模塊,它們都表示了程序運行的上下文環境。但是與os模塊不同的是,os模塊主要封裝系統操作,sys模塊主要封裝系統中的各種環境參數。

比如文件操作、進程線程操作封裝在os模塊中;

標準輸入輸出stdout/stdin、命令行參數argv、環境變量path、平臺platform等參數封裝在sys模塊中;

不過sys中也含有一些進程操作,比如sys.exit(n)和sys.exit('Unable to create first child.')

?

多進程multiprocessing

?multiprocessing模塊的內容:

multiprocessing.Process(target=run)類的實例表示一個進程,具有字段 pid,方法 start() join()等

multiprocessing.Pool(processes=4) 類的實例表示一個進程池

multiprocessing.Lock類的實例表示一個鎖,具有acquire()和release() 方法

multiprocessing.Semaphore(2) 信號量類的實例表示一個信號量,可以指定初始值,具有 acquire() 和 release() 方法

multiprocessing.Event() 表示一個信號,用于實現多進程等待某一個進程的情況

進程間要實現通信,除了鎖、信號量、事件,還有隊列multiprocessing.Queue。

?

import multiprocessingdef writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): try: print q.get(block = False) except: passif __name__ == "__main__":q = multiprocessing.Queue()writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() reader.join() writer.join()

?

multiprocessing.Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。

get方法可以從隊列讀取并且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常。

?

?

進程同步互斥實例:

?

import multiprocessing import time# 信號量實現同步,進程間也可以使用信號量 def preq(s):print "a"s.release()s.release()def worker(s,i):s.acquire()s.acquire()print(multiprocessing.current_process().name + " acquire")time.sleep(i)print(multiprocessing.current_process().name + " release")s.release() if __name__ == "__main__":s = multiprocessing.Semaphore(0)for i in range(1):pre = multiprocessing.Process(target=preq, args=(s,))pre.start()p = multiprocessing.Process(target=worker, args=(s,1))p.start()# 鎖實現進程間對文件的互斥訪問 def worker_with(lock, f):with lock:fs = open(f,"a+")fs.write('Lock acquired via with\n')fs.close()def worker_no_with(lock, f):lock.acquire()try:fs = open(f,"a+")fs.write('Lock acquired directly\n')fs.close()finally:lock.release() if __name__ == "__main__":f = "file.txt"lock = multiprocessing.Lock()w = multiprocessing.Process(target=worker_with, args=(lock, f))nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))w.start()nw.start()w.join()nw.join()# Event實現同步,Event類的實例表示一個信號,進程調用它的wait方法等待被喚醒,調用set方法喚醒所有正在等待的進程 def wait_for_event(e):"""Wait for the event to be set before doing anything"""print ('wait_for_event: starting')e.wait()print ('wait_for_event: e.is_set()->' + str(e.is_set())) def wait_for_event_timeout(e, t):"""Wait t seconds and then timeout"""print ('wait_for_event_timeout: starting')e.wait(t)print ('wait_for_event_timeout: e.is_set()->' + str(e.is_set()))if __name__ == '__main__':e = multiprocessing.Event()w1 = multiprocessing.Process(name='block', target=wait_for_event,args=(e,))w1.start()w2 = multiprocessing.Process(name='non-block', target=wait_for_event_timeout, args=(e, 2))w2.start()time.sleep(3)e.set()print ('main: event is set')

?

Process類中定義的方法

| is_alive(self)
| Return whether process is alive
|
| join(self, timeout=None)
| Wait until child process terminates
|
| run(self)
| Method to be run in sub-process; can be overridden in sub-class
|
| start(self)
| Start child process
|
| terminate(self)
| Terminate process; sends SIGTERM signal or uses TerminateProcess()

以上來自于Python自帶幫助

?

進程處理信號?signal

?利用signal模塊,進程可以捕獲信號,根據相應的handler做處理。

信號(signal)-- 進程之間通訊的方式,是一種軟件中斷。一個進程一旦接收到信號就會打斷原來的程序執行流程來處理信號。
幾個常用信號:
? ? SIGINT 終止進程 中斷進程 (control+c)

? ? SIGQUIT 退出進程

? ? SIGTERM 終止進程: 軟件終止信號 (命令行中輸入kill命令時,向進程發送的默認信號)?當直接寫kill PID,默認是向進程發送SIGTERM

? ? SIGKILL 終止進程:殺死進程,捕捉這個信號會報錯,也就是進程不能捕捉此信號(kill -9)
? ? SIGALRM 鬧鐘信號。Alarms信號是一個特殊信號類型,它可以讓程序要求系統經過一段時間對自己發送通知。os 標準模塊中指出,它可用于避免無限制阻塞 I/O 操作或其它系統調用。

? ??SIGCHLD?子進程退出時對父進程發出的信號,如果父進程還沒有處理它,子進程將會停留在僵死狀態等待其父進程調用wait函數,這個狀態下的子進程就是僵死進程

PS:常用信號簡介:

ctrl-c 發送 SIGINT 信號給前臺進程組中的所有進程。常用于終止正在運行的程序。
ctrl-z 發送 SIGTSTP 信號給前臺進程組中的所有進程,常用于掛起一個進程。
ctrl-d 不是發送信號,而是表示一個特殊的二進制值,表示 EOF,也就是輸入流(例如普通文件或者stdin)的結束。
ctrl-\ 發送 SIGQUIT 信號給前臺進程組中的所有進程,終止前臺進程并生成 core 文件。

?

常常會在python程序被關閉之前加一個鉤子,用atexit模塊以及signal模塊來實現

父進程捕獲終止信號后,還需要向每個子進程發送終止信號。

注意信號處理函數需要接受兩個參數。

?

#!/usr/bin python# 正常退出或者被ctl+c終止時,進程捕獲信號,調用處理函數(鉤子) import atexit from signal import signal, SIGTERMdef test():print 'exit........' atexit.register(test) signal(SIGTERM, lambda signum, stack_frame: exit(1))while True:pass# 進程發送信號終止其他進程 import os import signal #發送信號,16175是前面那個綁定信號處理函數的pid,需要自行修改 os.kill(16175,signal.SIGTERM) os.kill(16175,signal.SIGUSR1) # Linux編程范式:fork(),等待SIGCHLD信號 import os import signal from time import sleep def onsigchld(a,b): print '收到子進程結束信號' signal.signal(signal.SIGCHLD,onsigchld) pid = os.fork() if pid == 0: print '我是子進程,pid是',os.getpid() sleep(2) else: print '我是父進程,pid是',os.getpid() os.wait() #等待子進程結束 # 鬧鐘信號,用于告訴操作系統向自己發送信號 import signal import timedef receive_alarm(signum, stack):print 'Alarm :', time.ctime()# Call receive_alarm in 2 seconds signal.signal(signal.SIGALRM, receive_alarm) signal.alarm(2)print 'Before:', time.ctime() time.sleep(10) print 'After :', time.ctime()

?

?

多線程threading & thread

  與進程不同,線程要實現同步,直接用Python自帶的Queue模塊即可。Python的Queue模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(后入先出)隊列LifoQueue,和優先級隊列PriorityQueue。這些隊列都實現了鎖原語,能夠在多線程中直接使用。可以使用隊列來實現線程間的同步。

? ? ?python多線程編程,一般使用thread和threading模塊。thread模塊想對較底層,threading模塊對thread模塊進行了封裝,更便于使用。所有,通常多線程編程使用threading模塊。線程的創建一般有兩種:①將創建的函數傳遞進threading.Thread()對象的target字段,可以是函數或者定義了__call__方法的類實例。②繼承threading.Thread類,通常重寫run()方法。

1 threading模塊的內容

Thread類的實例可以引用一個線程,這是我們用的最多的一個類,你可以指定目標線程函數執行或者自定義繼承自它的子類都可以實現子線程功能;

Timer類是Thread類的子類,表示等待一段時間后才開始運行某段代碼,或者重復運行某段代碼;

(Python自帶的)Queue類的實例是實現了多生產者(Producer)、多消費者(Consumer)的隊列,支持鎖原語,能夠在多個線程之間提供很好的同步支持;

Lock類的實例引用了一個鎖原語,這個我們可以對全局變量互斥時使用,提供acquire和release方法;

RLock的實例表示可重入鎖,使單線程可以再次獲得已經獲得的鎖;

Condition類的實例表示條件變量,能讓一個線程停下來,等待其他線程滿足某個“條件”,除了提供acquire和release方法外,還提供了wait和notify方法,相當于一個多功能信號量;

Event 類的實例表示通用的條件變量。多個線程可以等待某個事件發生,在事件發生后,所有的線程都被激活;

Semaphore類的實例表示信號量,為等待資源的線程提供一個類似隊列的結構,提供acquire和release方法,初始化的時候可以指定初值Semaphore(3);

BoundedSemaphore 與semaphore類似,但不允許超過初始值;

?

?threading.Thread類的內容:

getName(self) 返回線程的名字

isAlive(self) 布爾標志,表示這個線程是否還在運行中

isDaemon(self) 返回線程的daemon標志

join(self, timeout=None) 程序掛起,直到線程結束,如果給出timeout,則最多阻塞timeout秒

run(self) 定義線程的功能函數

setDaemon(self, daemonic) 把線程的daemon標志設為daemonic

setName(self, name) 設置線程的名字

start(self) 開始線程執行

ps:th.join()方法可能不是很安全,如果th對應的線程沒有被真正啟動,那么調用th.join()的線程將不會等待,而會繼續運行下去。用信號量更好。

?

多線程舉例:

#coding:utf-8 import threading, time #最簡單的啟動線程的方式 def sayHi(): time.sleep(1) print 'Hi, linuxapp' th=threading.Thread(target=sayHi) th.start() th.join() # 使用threading.Thread(),設置線程類實例的target屬性,表示一個線程 def T():print threading.current_thread().getName()t1 = threading.Thread(target=T, name='tt11') t1.start() t1.join()# 通過threading.Thread類的子類實例表示線程,注意父類構造方法__init__不能省略 class T2(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):print "in run() of T2 " + threading.current_thread().getName()# threading.Lock類的實例表示一個互斥鎖,一個資源被加鎖后,其他線程不能訪問 class T3(threading.Thread):def __init__(self):threading.Thread.__init__(self)self.counter = 0;self.mutex = threading.Lock()def run(self):time.sleep(1)if self.mutex.acquire():self.counter += 1print self.counterself.mutex.release()# 如果同一個線程需要多次獲得資源,如果不使用 mutex = threading.RLock() ,就會死鎖 class T4(threading.Thread):def __init__(self):threading.Thread.__init__(self)self.counter = 0self.mutex = threading.Lock()def run(self):time.sleep(1)if self.mutex.acquire():self.counter += 1if self.mutex.acquire():self.counter += 1self.mutex.release()self.mutex.release()def main():t = T3()t.start()if __name__ == '__main__':main()# threading.Condition類的實例表示一個條件變量,相當于多功能信號量 condition = threading.Condition() products = 0 class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): global condition, products while True: if condition.acquire(): if products < 10: products += 1; print "Producer(%s):deliver one, now products:%s" %(self.name, products) condition.notify() else: print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products) condition.wait(); condition.release() time.sleep(2) class Consumer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): global condition, products while True: if condition.acquire(): if products > 1: products -= 1 print "Consumer(%s):consume one, now products:%s" %(self.name, products) condition.notify() else: print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products) condition.wait(); condition.release() time.sleep(2) if __name__ == "__main__": for p in range(0, 2): p = Producer() p.start() for c in range(0, 10): c = Consumer() c.start()# threading.Event類的實例表示一個信號,如果信號signal為true,那么等待這個signal的所有線程都將可以運行 class MyThread(threading.Thread): def __init__(self, signal): threading.Thread.__init__(self) self.singal = signal def run(self): print "I am %s,I will sleep ..."%self.name # 進入等待狀態 self.singal.wait() print "I am %s, I awake..." %self.name if __name__ == "__main__":# 初始 為 False singal = threading.Event() for t in range(0, 3): thread = MyThread(singal) thread.start() print "main thread sleep 3 seconds... " time.sleep(3) # 喚醒含有signal, 處于等待狀態的線程 singal.set()

?

python多線程的限制
python多線程有個討厭的限制,全局解釋器鎖(global interpreter lock),這個鎖的意思是任一時間只能有一個線程使用解釋器,跟單cpu跑多個程序也是一個意思,大家都是輪著用的,這叫“并發”,不是“并行”。手冊上的解釋是為了保證對象模型的正確性!這個鎖造成的困擾是如果有一個計算密集型的線程占著cpu,其他的線程都得等著,試想你的多個線程中有這么一個線程,多線程生生被搞成串行;當然這個模塊也不是毫無用處,手冊上又說了:當用于IO密集型任務時,IO期間線程會釋放解釋器,這樣別的線程就有機會使用解釋器了!所以是否使用這個模塊需要考慮面對的任務類型。

?

轉載自:http://www.cnblogs.com/xinchrome/p/5031497.html


總結

以上是生活随笔為你收集整理的多线程多进程解析:Python、os、sys、Queue、multiprocessing、threading的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。