Python学习:线程池原理及实现
傳統(tǒng)多線程方案會使用“即時(shí)創(chuàng)建, 即時(shí)銷毀”的策略。盡管與創(chuàng)建進(jìn)程相比,創(chuàng)建線程的時(shí)間已經(jīng)大大的縮短,但是如果提交給線程的任務(wù)是執(zhí)行時(shí)間較短,而且執(zhí)行次數(shù)極其頻繁,那么服務(wù)器將處于不停的創(chuàng)建線程,銷毀線程的狀態(tài)。
一個(gè)線程的運(yùn)行時(shí)間可以分為3部分:線程的啟動時(shí)間、線程體的運(yùn)行時(shí)間和線程的銷毀時(shí)間。在多線程處理的情景中,如果線程不能被重用,就意味著每次創(chuàng)建都需要經(jīng)過啟動、銷毀和運(yùn)行3個(gè)過程。這必然會增加系統(tǒng)相應(yīng)的時(shí)間,降低了效率。
1.使用線程池:
由于線程預(yù)先被創(chuàng)建并放入線程池中,同時(shí)處理完當(dāng)前任務(wù)之后并不銷毀而是被安排處理下一個(gè)任務(wù),因此能夠避免多次創(chuàng)建線程,從而節(jié)省線程創(chuàng)建和銷毀的開銷,能帶來更好的性能和系統(tǒng)穩(wěn)定性。
2.線程池模型
這里使用創(chuàng)建Thread()實(shí)例來實(shí)現(xiàn),下面會再用繼承threading.Thread()的類來實(shí)現(xiàn)
# 創(chuàng)建隊(duì)列實(shí)例, 用于存儲任務(wù) queue = Queue()# 定義需要線程池執(zhí)行的任務(wù) def do_job():while True:i = queue.get()time.sleep(1)print 'index %s, curent: %s' % (i, threading.current_thread())queue.task_done()if __name__ == '__main__':# 創(chuàng)建包括3個(gè)線程的線程池for i in range(3):t = Thread(target=do_job)t.daemon=True # 設(shè)置線程daemon 主線程退出,daemon線程也會推出,即時(shí)正在運(yùn)行t.start()# 模擬創(chuàng)建線程池3秒后塞進(jìn)10個(gè)任務(wù)到隊(duì)列time.sleep(3)for i in range(10):queue.put(i)queue.join()daemon說明:
如果某個(gè)子線程的daemon屬性為False,主線程結(jié)束時(shí)會檢測該子線程是否結(jié)束,如果該子線程還在運(yùn)行,則主線程會等待它完成后再退出;
如果某個(gè)子線程的daemon屬性為True,主線程運(yùn)行結(jié)束時(shí)不對這個(gè)子線程進(jìn)行檢查而直接退出,同時(shí)所有daemon值為True的子線程將隨主線程一起結(jié)束,而不論是否運(yùn)行完成。
daemon=True 說明線程是守護(hù)線程,守護(hù)線程外部沒法觸發(fā)它的退出,所以主線程退出就直接讓子線程跟隨退出
queue.task_done() 說明:
queue.join()的作用是讓主程序阻塞等待隊(duì)列完成,就結(jié)束退出,但是怎么讓主程序知道隊(duì)列已經(jīng)全部取出并且完成呢?queue.get() 只能讓主程序知道隊(duì)列取完了,但不代表隊(duì)列里的任務(wù)都完成,所以程序需要調(diào)用queue.task_done() 告訴主程序,又一個(gè)任務(wù)完成了,直到全部任務(wù)完成,主程序退出
輸出結(jié)果
index 1, curent: <Thread(Thread-2, started daemon 139652180764416)> index 0, curent: <Thread(Thread-1, started daemon 139652189157120)> index 2, curent: <Thread(Thread-3, started daemon 139652172371712)> index 4, curent: <Thread(Thread-1, started daemon 139652189157120)> index 3, curent: <Thread(Thread-2, started daemon 139652180764416)> index 5, curent: <Thread(Thread-3, started daemon 139652172371712)> index 6, curent: <Thread(Thread-1, started daemon 139652189157120)> index 7, curent: <Thread(Thread-2, started daemon 139652180764416)> index 8, curent: <Thread(Thread-3, started daemon 139652172371712)> index 9, curent: <Thread(Thread-1, started daemon 139652189157120)> finish可以看到所有任務(wù)都是在這幾個(gè)線程中完成Thread-(1-3)
3.線程池原理
線程池基本原理: 我們把任務(wù)放進(jìn)隊(duì)列中去,然后開N個(gè)線程,每個(gè)線程都去隊(duì)列中取一個(gè)任務(wù),執(zhí)行完了之后告訴系統(tǒng)說我執(zhí)行完了,然后接著去隊(duì)列中取下一個(gè)任務(wù),直至隊(duì)列中所有任務(wù)取空,退出線程。
上面這個(gè)例子生成一個(gè)有3個(gè)線程的線程池,每個(gè)線程都無限循環(huán)阻塞讀取Queue隊(duì)列的任務(wù)所有任務(wù)都只會讓這3個(gè)預(yù)生成的線程來處理。
具體工作描述如下:
- 創(chuàng)建Queue.Queue()實(shí)例,然后對它填充數(shù)據(jù)或任務(wù)
- 生成守護(hù)線程池,把線程設(shè)置成了daemon守護(hù)線程
- 每個(gè)線程無限循環(huán)阻塞讀取queue隊(duì)列的項(xiàng)目item,并處理
- 每次完成一次工作后,使用queue.task_done()函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號
- 主線程設(shè)置queue.join()阻塞,直到任務(wù)隊(duì)列已經(jīng)清空了,解除阻塞,向下執(zhí)行
這個(gè)模式下有幾個(gè)注意的點(diǎn):
-
將線程池的線程設(shè)置成daemon守護(hù)進(jìn)程,意味著主線程退出時(shí),守護(hù)線程也會自動退出,如果使用默認(rèn)
-
daemon=False的話, 非daemon的線程會阻塞主線程的退出,所以即使queue隊(duì)列的任務(wù)已經(jīng)完成
線程池依然阻塞無限循環(huán)等待任務(wù),使得主線程也不會退出。 -
當(dāng)主線程使用了queue.join()的時(shí)候,說明主線程會阻塞直到queue已經(jīng)是清空的,而主線程怎么知道queue已經(jīng)是清空的呢?就是通過每次線程queue.get()后并處理任務(wù)后,發(fā)送queue.task_done()信號,queue的數(shù)據(jù)就會減1,直到queue的數(shù)據(jù)是空的,queue.join()解除阻塞,向下執(zhí)行。
-
這個(gè)模式主要是以隊(duì)列queue的任務(wù)來做主導(dǎo)的,做完任務(wù)就退出,由于線程池是daemon的,所以主退出線程池所有線程都會退出。 有別于我們平時(shí)可能以隊(duì)列主導(dǎo)thread.join()阻塞,這種線程完成之前阻塞主線程。看需求使用哪個(gè)join():
如果是想做完一定數(shù)量任務(wù)的隊(duì)列就結(jié)束,使用queue.join(),比如爬取指定數(shù)量的網(wǎng)頁
如果是想線程做完任務(wù)就結(jié)束,使用thread.join()
4.示例:使用線程池寫web服務(wù)器
''' 學(xué)習(xí)中遇到問題沒人解答?小編創(chuàng)建了一個(gè)Python學(xué)習(xí)交流群:711312441 尋找有志同道合的小伙伴,互幫互助,群里還有不錯(cuò)的視頻學(xué)習(xí)教程和PDF電子書! ''' import socket import threading from threading import Thread import threading import sys import time import random from Queue import Queuehost = '' port = 8888 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((host, port)) s.listen(3)class ThreadPoolManger():"""線程池管理器"""def __init__(self, thread_num):# 初始化參數(shù)self.work_queue = Queue()self.thread_num = thread_numself.__init_threading_pool(self.thread_num)def __init_threading_pool(self, thread_num):# 初始化線程池,創(chuàng)建指定數(shù)量的線程池for i in range(thread_num):thread = ThreadManger(self.work_queue)thread.start()def add_job(self, func, *args):# 將任務(wù)放入隊(duì)列,等待線程池阻塞讀取,參數(shù)是被執(zhí)行的函數(shù)和函數(shù)的參數(shù)self.work_queue.put((func, args))class ThreadManger(Thread):"""定義線程類,繼承threading.Thread"""def __init__(self, work_queue):Thread.__init__(self)self.work_queue = work_queueself.daemon = Truedef run(self):# 啟動線程while True:target, args = self.work_queue.get()target(*args)self.work_queue.task_done()# 創(chuàng)建一個(gè)有4個(gè)線程的線程池 thread_pool = ThreadPoolManger(4)# 處理http請求,這里簡單返回200 hello world def handle_request(conn_socket):recv_data = conn_socket.recv(1024)reply = 'HTTP/1.1 200 OK \r\n\r\n'reply += 'hello world'print 'thread %s is running ' % threading.current_thread().nameconn_socket.send(reply)conn_socket.close()# 循環(huán)等待接收客戶端請求 while True:# 阻塞等待請求conn_socket, addr = s.accept()# 一旦有請求了,把socket扔到我們指定處理函數(shù)handle_request處理,等待線程池分配線程處理thread_pool.add_job(handle_request, *(conn_socket, ))s.close()運(yùn)行進(jìn)程
[master][/data/web/advance_python/socket]$ python sock_s_threading_pool.py # 查看線程池狀況 [master][/data/web/advance_python/socket]$ ps -eLf|grep sock_s_threading_pool lisa+ 27488 23705 27488 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py lisa+ 27488 23705 27489 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py lisa+ 27488 23705 27490 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py lisa+ 27488 23705 27491 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py lisa+ 27488 23705 27492 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py# 跟我們預(yù)期一樣一共有5個(gè)線程,一個(gè)主線程,4個(gè)線程池線程這個(gè)線程池web服務(wù)器編寫框架包括下面幾個(gè)組成部分及步驟:
- 定義線程池管理器ThreadPoolManger,用于創(chuàng)建并管理線程池,提供add_job()接口,給線程池加任務(wù)
- 定義工作線程ThreadManger, 定義run()方法,負(fù)責(zé)無限循環(huán)工作隊(duì)列,并完成隊(duì)列任務(wù)
- 定義socket監(jiān)聽請求s.accept() 和處理請求 handle_requests() 任務(wù)。
- 初始化一個(gè)4個(gè)線程的線程池,都阻塞等待這讀取隊(duì)列queue的任務(wù)
- 當(dāng)socket.accept()有請求,則把conn_socket做為參數(shù),handle_request方法,丟給線程池,等待線程池分配線程處理
5.GIL 對多線程的影響
因?yàn)镻ython的線程雖然是真正的線程,但解釋器執(zhí)行代碼時(shí),有一個(gè)GIL鎖:Global Interpreter Lock,任何Python線程執(zhí)行前,必須先獲得GIL鎖,然后,每執(zhí)行100條字節(jié)碼,解釋器就自動釋放GIL鎖,讓別的線程有機(jī)會執(zhí)行。這個(gè)GIL全局鎖實(shí)際上把所有線程的執(zhí)行代碼都給上了鎖,所以,多線程在Python中只能交替執(zhí)行,即使100個(gè)線程跑在100核CPU上,也只能用到1個(gè)核。
但是對于IO密集型的任務(wù),多線程還是起到很大效率提升,這是協(xié)同式多任務(wù)
當(dāng)一項(xiàng)任務(wù)比如網(wǎng)絡(luò) I/O啟動,而在長的或不確定的時(shí)間,沒有運(yùn)行任何 Python 代碼的需要,一個(gè)線程便會讓出GIL,從而其他線程可以獲取 GIL 而運(yùn)行 Python。這種禮貌行為稱為協(xié)同式多任務(wù)處理,它允許并發(fā);多個(gè)線程同時(shí)等待不同事件。
兩個(gè)線程在同一時(shí)刻只能有一個(gè)執(zhí)行 Python ,但一旦線程開始連接,它就會放棄 GIL ,這樣其他線程就可以運(yùn)行。這意味著兩個(gè)線程可以并發(fā)等待套接字連接,這是一件好事。在同樣的時(shí)間內(nèi)它們可以做更多的工作。
6.線程池要設(shè)置為多少?
服務(wù)器CPU核數(shù)有限,能夠同時(shí)并發(fā)的線程數(shù)有限,并不是開得越多越好,以及線程切換是有開銷的,如果線程切換過于頻繁,反而會使性能降低
線程執(zhí)行過程中,計(jì)算時(shí)間分為兩部分:
- CPU計(jì)算,占用CPU
- 不需要CPU計(jì)算,不占用CPU,等待IO返回,比如recv(), accept(), sleep()等操作,具體操作就是比如
- 訪問cache、RPC調(diào)用下游service、訪問DB,等需要網(wǎng)絡(luò)調(diào)用的操作
那么如果計(jì)算時(shí)間占50%, 等待時(shí)間50%,那么為了利用率達(dá)到最高,可以開2個(gè)線程:
假如工作時(shí)間是2秒, CPU計(jì)算完1秒后,線程等待IO的時(shí)候需要1秒,此時(shí)CPU空閑了,這時(shí)就可以切換到另外一個(gè)線程,讓CPU工作1秒后,線程等待IO需要1秒,此時(shí)CPU又可以切回去,第一個(gè)線程這時(shí)剛好完成了1秒的IO等待,可以讓CPU繼續(xù)工作,就這樣循環(huán)的在兩個(gè)線程之前切換操作。
那么如果計(jì)算時(shí)間占20%, 等待時(shí)間80%,那么為了利用率達(dá)到最高,可以開5個(gè)線程:
可以想象成完成任務(wù)需要5秒,CPU占用1秒,等待時(shí)間4秒,CPU在線程等待時(shí),可以同時(shí)再激活4個(gè)線程,這樣就把CPU和IO等待時(shí)間,最大化的重疊起來
抽象一下,計(jì)算線程數(shù)設(shè)置的公式就是:
N核服務(wù)器,通過執(zhí)行業(yè)務(wù)的單線程分析出本地計(jì)算時(shí)間為x,等待時(shí)間為y,則工作線程數(shù)(線程池線程數(shù))設(shè)置為 N*(x+y)/x,能讓CPU的利用率最大化。
由于有GIL的影響,python只能使用到1個(gè)核,所以這里設(shè)置N=1
總結(jié)
以上是生活随笔為你收集整理的Python学习:线程池原理及实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: YCOJ黑熊过河
- 下一篇: Python周立功CAN接口卡接口库函数