Python多线程原理与实现
Date: 2019-06-04
Author: Sun
Python多線程原理與實(shí)戰(zhàn)
目的:
(1)了解python線程執(zhí)行原理
(2)掌握多線程編程與線程同步
(3)了解線程池的使用
1 線程基本概念
1.1 線程是什么?
線程是指進(jìn)程內(nèi)的一個(gè)執(zhí)行單元,也是進(jìn)程內(nèi)的可調(diào)度實(shí)體.
與進(jìn)程的區(qū)別:
(1) 地址空間:進(jìn)程內(nèi)的一個(gè)執(zhí)行單元;進(jìn)程至少有一個(gè)線程;它們共享進(jìn)程的地址空間;而進(jìn)程有自己獨(dú)立的地址空間;
(2) 資源擁有:進(jìn)程是資源分配和擁有的單位,同一個(gè)進(jìn)程內(nèi)的線程共享進(jìn)程的資源
(3) 線程是CPU處理器調(diào)度的基本單位,但進(jìn)程不是.
(4) 二者均可并發(fā)執(zhí)行.
簡(jiǎn)而言之,一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線程.
線程的劃分尺度小于進(jìn)程,使得多線程程序的并發(fā)性高。
另外,進(jìn)程在執(zhí)行過程中擁有獨(dú)立的內(nèi)存單元,而多個(gè)線程共享內(nèi)存,從而極大地提高了程序的運(yùn)行效率。
1.2 線程和進(jìn)程關(guān)系?
? 進(jìn)程就是一個(gè)應(yīng)用程序在處理機(jī)上的一次執(zhí)行過程,它是一個(gè)動(dòng)態(tài)的概念,而線程是進(jìn)程中的一部分,進(jìn)程包含多個(gè)線程在運(yùn)行。
? 多線程可以共享全局變量,多進(jìn)程不能。多線程中,所有子線程的進(jìn)程號(hào)相同;多進(jìn)程中,不同的子進(jìn)程進(jìn)程號(hào)不同。
? 進(jìn)程是具有一定獨(dú)立功能的程序關(guān)于某個(gè)數(shù)據(jù)集合上的一次運(yùn)行活動(dòng),進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位.
? 線程是進(jìn)程的一個(gè)實(shí)體,是CPU調(diào)度和分派的基本單位,它是比進(jìn)程更小的能獨(dú)立運(yùn)行的基本單位.線程自己基本上不擁有系統(tǒng)資源,只擁有一點(diǎn)在運(yùn)行中必不可少的資源(如程序計(jì)數(shù)器,一組寄存器和棧),但是它可與同屬一個(gè)進(jìn)程的其他的線程共享進(jìn)程所擁有的全部資源.
? 一個(gè)線程可以創(chuàng)建和撤銷另一個(gè)線程;同一個(gè)進(jìn)程中的多個(gè)線程之間可以并發(fā)執(zhí)行.
?
2 Python線程模塊
? python主要是通過thread和threading這兩個(gè)模塊來實(shí)現(xiàn)多線程支持。python的thread模塊是比較底層的模塊,python的threading模塊是對(duì)thread做了一些封裝,可以更加方便的被使用。但是python(cpython)由于GIL的存在無法使用threading充分利用CPU資源,如果想充分發(fā)揮多核CPU的計(jì)算能力需要使用multiprocessing模塊(Windows下使用會(huì)有諸多問題)。
2.1 如何創(chuàng)建線程
? python3.x中已經(jīng)摒棄了Python2.x中采用函數(shù)式thread模塊中的start_new_thread()函數(shù)來產(chǎn)生新線程方式。
? python3.x中通過threading模塊創(chuàng)建新的線程有兩種方法:一種是通過threading.Thread(Target=executable Method)-即傳遞給Thread對(duì)象一個(gè)可執(zhí)行方法(或?qū)ο?#xff09;;第二種是繼承threading.Thread定義子類并重寫run()方法。第二種方法中,唯一必須重寫的方法是run().
(1)通過threading.Thread進(jìn)行創(chuàng)建多線程
import threading import time def target():print("the current threading %s is runing"%(threading.current_thread().name))time.sleep(1)print("the current threading %s is ended"%(threading.current_thread().name))print("the current threading %s is runing"%(threading.current_thread().name)) ## 屬于線程t的部分 t = threading.Thread(target=target) t.start() ## 屬于線程t的部分 t.join() # join是阻塞當(dāng)前線程(此處的當(dāng)前線程時(shí)主線程) 主線程直到Thread-1結(jié)束之后才結(jié)束 print("the current threading %s is ended"%(threading.current_thread().name))(2)通過繼承threading.Thread定義子類創(chuàng)建多線程
? 使用Threading模塊創(chuàng)建線程,直接從threading.Thread繼承,然后重寫__init__方法和run方法:
import threading import timeclass myThread(threading.Thread): # 繼承父類threading.Threaddef __init__(self, threadID, name, counter):threading.Thread.__init__(self)self.threadID = threadIDself.name = nameself.counter = counterdef run(self): # 把要執(zhí)行的代碼寫到run函數(shù)里面 線程在創(chuàng)建后會(huì)直接運(yùn)行run函數(shù)print("Starting " + self.name)print_time(self.name, self.counter, 5)print("Exiting " + self.name)def print_time(threadName, delay, counter):while counter:time.sleep(delay)print("%s process at: %s" % (threadName, time.ctime(time.time())))counter -= 1thread1 = myThread(1, "Thread-1", 1) # 創(chuàng)建新線程 thread2 = myThread(2, "Thread-2", 2)thread1.start() # 開啟線程 thread2.start()thread1.join() # 等待線程結(jié)束 thread2.join() print("Exiting Main Thread")通過以上案例可以知道,thread1和thread2執(zhí)行順序是亂序的。要使之有序,需要進(jìn)行線程同步
3 線程間同步
? 如果多個(gè)線程共同對(duì)某個(gè)數(shù)據(jù)修改,則可能出現(xiàn)不可預(yù)料的結(jié)果,為了保證數(shù)據(jù)的正確性,需要對(duì)多個(gè)線程進(jìn)行同步。
? 使用Thread對(duì)象的Lock和Rlock可以實(shí)現(xiàn)簡(jiǎn)單的線程同步,這兩個(gè)對(duì)象都有acquire方法和release方法,對(duì)于那些需要每次只允許一個(gè)線程操作的數(shù)據(jù),可以將其操作放到acquire和release方法之間。
? 需要注意的是,Python有一個(gè)GIL(Global Interpreter Lock)機(jī)制,任何線程在運(yùn)行之前必須獲取這個(gè)全局鎖才能執(zhí)行,每當(dāng)執(zhí)行完100條字節(jié)碼,全局鎖才會(huì)釋放,切換到其他線程執(zhí)行。
3.1 線程同步問題
多線程實(shí)現(xiàn)同步有四種方式:
鎖機(jī)制,信號(hào)量,條件判斷和同步隊(duì)列。
下面我主要關(guān)注兩種同步機(jī)制:鎖機(jī)制和同步隊(duì)列。
(1)鎖機(jī)制
threading的Lock類,用該類的acquire函數(shù)進(jìn)行加鎖,用realease函數(shù)進(jìn)行解鎖
import threading import time class myThread(threading.Thread):def __init__(self, threadID, name, counter):threading.Thread.__init__(self)self.threadID = threadIDself.name = nameself.counter = counterdef run(self):print("Starting " + self.name)# 獲得鎖,成功獲得鎖定后返回True# 可選的timeout參數(shù)不填時(shí)將一直阻塞直到獲得鎖定# 否則超時(shí)后將返回FalsethreadLock.acquire()print_time(self.name, self.counter, 5)# 釋放鎖threadLock.release() def print_time(threadName, delay, counter):while counter:time.sleep(delay)print("%s: %s" % (threadName, time.ctime(time.time())))counter -= 1 threadLock = threading.Lock() threads = [] thread1 = myThread(1, "Thread-1", 1) # 創(chuàng)建新線程 thread2 = myThread(2, "Thread-2", 2) thread1.start() # 開啟新線程 thread2.start() threads.append(thread1) # 添加線程到線程列表 threads.append(thread2) for t in threads: # 等待所有線程完成t.join() print("Exiting Main Thread")?
(2) 線程同步隊(duì)列queue
python2.x中提供的Queue, Python3.x中提供的是queue
見import queue.
Python的queue模塊中提供了同步的、線程安全的隊(duì)列類,包括FIFO(先入先出)隊(duì)列Queue,LIFO(后入先出)隊(duì)列LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都實(shí)現(xiàn)了鎖原語,能夠在多線程中直接使用。可以使用隊(duì)列來實(shí)現(xiàn)線程間的同步。
queue模塊中的常用方法:
- queue.qsize() 返回隊(duì)列的大小
- queue.empty() 如果隊(duì)列為空,返回True,反之False
- queue.full() 如果隊(duì)列滿了,返回True,反之False
- queue.full 與 maxsize 大小對(duì)應(yīng)
- queue.get([block[, timeout]])獲取隊(duì)列,timeout等待時(shí)間
- queue.get_nowait() 相當(dāng)Queue.get(False)
- queue.put(item) 寫入隊(duì)列,timeout等待時(shí)間
- queue.put_nowait(item) 相當(dāng)Queue.put(item, False)
- queue.task_done() 在完成一項(xiàng)工作之后,Queue.task_done()函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
- queue.join() 實(shí)際上意味著等到隊(duì)列為空,再執(zhí)行別的操作
案例1:
import queue import threading import timeexitFlag = 0class myThread(threading.Thread):def __init__(self, threadID, name, q):threading.Thread.__init__(self)self.threadID = threadIDself.name = nameself.q = qdef run(self):print("Starting " + self.name)process_data(self.name, self.q)print("Exiting " + self.name)def process_data(threadName, q):while not exitFlag:queueLock.acquire()if not workQueue.empty():data = q.get()queueLock.release()print("%s processing %s" % (threadName, data))else:queueLock.release()time.sleep(1)threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = queue.Queue(10) threads = [] threadID = 1# 創(chuàng)建新線程 for tName in threadList:thread = myThread(threadID, tName, workQueue)thread.start()threads.append(thread)threadID += 1# 填充隊(duì)列 queueLock.acquire() for word in nameList:workQueue.put(word) queueLock.release()# 等待隊(duì)列清空 while not workQueue.empty():pass# 通知線程是時(shí)候退出 exitFlag = 1# 等待所有線程完成 for t in threads:t.join() print("Exiting Main Thread")案例2:
import time import threading import queueclass Worker(threading.Thread):def __init__(self, name, queue):threading.Thread.__init__(self)self.queue = queueself.start() #執(zhí)行run()def run(self):#循環(huán),保證接著跑下一個(gè)任務(wù)while True:# 隊(duì)列為空則退出線程if self.queue.empty():break# 獲取一個(gè)隊(duì)列數(shù)據(jù)foo = self.queue.get()# 延時(shí)1S模擬你要做的事情time.sleep(1)# 打印print(self.getName() + " process " + str(foo))# 任務(wù)完成self.queue.task_done()# 隊(duì)列 queue = queue.Queue() # 加入100個(gè)任務(wù)隊(duì)列 for i in range(100):queue.put(i) # 開10個(gè)線程 for i in range(10):threadName = 'Thread' + str(i)Worker(threadName, queue) # 所有線程執(zhí)行完畢后關(guān)閉 queue.join()4. 多線程的生產(chǎn)者消費(fèi)者模式
# -*- coding: utf-8 -*- __author__ = 'sun' __date__ = '2019/6/04 19:40'from queue import Queue import random, threading, time# 生產(chǎn)者類 class Producer(threading.Thread):def __init__(self, name, queue):threading.Thread.__init__(self, name=name)self.data = queuedef run(self):for i in range(5):print("%s is producing %d to the queue!" % (self.getName(), i))self.data.put(i)time.sleep(random.randrange(10) / 5)print("%s finished!" % self.getName())# 消費(fèi)者類 class Consumer(threading.Thread):def __init__(self, name, queue):threading.Thread.__init__(self, name=name)self.data = queuedef run(self):for i in range(5):val = self.data.get()print("%s is consuming. %d in the queue is consumed!" % (self.getName(), val))time.sleep(random.randrange(10))print("%s finished!" % self.getName())def main():queue = Queue()producer = Producer('Producer', queue)consumer = Consumer('Consumer', queue)producer.start()consumer.start()producer.join()consumer.join()print('All threads finished!')if __name__ == '__main__':main()5 線程池
傳統(tǒng)多線程問題?
? 傳統(tǒng)多線程方案會(huì)使用“即時(shí)創(chuàng)建, 即時(shí)銷毀”的策略。盡管與創(chuàng)建進(jìn)程相比,創(chuàng)建線程的時(shí)間已經(jīng)大大的縮短,但是如果提交給線程的任務(wù)是執(zhí)行時(shí)間較短,而且執(zhí)行次數(shù)極其頻繁,那么服務(wù)器將處于不停的創(chuàng)建線程,銷毀線程的狀態(tài)。
? 一個(gè)線程的運(yùn)行時(shí)間可以分為3部分:線程的啟動(dòng)時(shí)間、線程體的運(yùn)行時(shí)間和線程的銷毀時(shí)間。在多線程處理的情景中,如果線程不能被重用,就意味著每次創(chuàng)建都需要經(jīng)過啟動(dòng)、銷毀和運(yùn)行3個(gè)過程。這必然會(huì)增加系統(tǒng)相應(yīng)的時(shí)間,降低了效率。
有沒有一種高效的解決方案呢? —— 線程池
線程池基本原理:
? 我們把任務(wù)放進(jìn)隊(duì)列中去,然后開N個(gè)線程,每個(gè)線程都去隊(duì)列中取一個(gè)任務(wù),執(zhí)行完了之后告訴系統(tǒng)說我執(zhí)行完了,然后接著去隊(duì)列中取下一個(gè)任務(wù),直至隊(duì)列中所有任務(wù)取空,退出線程。
使用線程池:
? 由于線程預(yù)先被創(chuàng)建并放入線程池中,同時(shí)處理完當(dāng)前任務(wù)之后并不銷毀而是被安排處理下一個(gè)任務(wù),因此能夠避免多次創(chuàng)建線程,從而節(jié)省線程創(chuàng)建和銷毀的開銷,能帶來更好的性能和系統(tǒng)穩(wěn)定性。
線程池要設(shè)置為多少?
服務(wù)器CPU核數(shù)有限,能夠同時(shí)并發(fā)的線程數(shù)有限,并不是開得越多越好,以及線程切換是有開銷的,如果線程切換過于頻繁,反而會(huì)使性能降低
線程執(zhí)行過程中,計(jì)算時(shí)間分為兩部分:
- CPU計(jì)算,占用CPU
- 不需要CPU計(jì)算,不占用CPU,等待IO返回,比如recv(), accept(), sleep()等操作,具體操作就是比如
訪問cache、RPC調(diào)用下游service、訪問DB,等需要網(wǎng)絡(luò)調(diào)用的操作
那么如果計(jì)算時(shí)間占50%, 等待時(shí)間50%,那么為了利用率達(dá)到最高,可以開2個(gè)線程:
假如工作時(shí)間是2秒, CPU計(jì)算完1秒后,線程等待IO的時(shí)候需要1秒,此時(shí)CPU空閑了,這時(shí)就可以切換到另外一個(gè)線程,讓CPU工作1秒后,線程等待IO需要1秒,此時(shí)CPU又可以切回去,第一個(gè)線程這時(shí)剛好完成了1秒的IO等待,可以讓CPU繼續(xù)工作,就這樣循環(huán)的在兩個(gè)線程之前切換操作。
那么如果計(jì)算時(shí)間占20%, 等待時(shí)間80%,那么為了利用率達(dá)到最高,可以開5個(gè)線程:
可以想象成完成任務(wù)需要5秒,CPU占用1秒,等待時(shí)間4秒,CPU在線程等待時(shí),可以同時(shí)再激活4個(gè)線程,這樣就把CPU和IO等待時(shí)間,最大化的重疊起來
抽象一下,計(jì)算線程數(shù)設(shè)置的公式就是:
N核服務(wù)器,通過執(zhí)行業(yè)務(wù)的單線程分析出本地計(jì)算時(shí)間為x,等待時(shí)間為y,則工作線程數(shù)(線程池線程數(shù))設(shè)置為 N*(x+y)/x,能讓CPU的利用率最大化。
由于有GIL的影響,python只能使用到1個(gè)核,所以這里設(shè)置N=1
6. python 進(jìn)行并發(fā)編程
? 在Python 2的時(shí)代,高性能的網(wǎng)絡(luò)編程主要是使用Twisted、Tornado和Gevent這三個(gè)庫,但是它們的異步代碼相互之間既不兼容也不能移植。 asyncio是Python 3.4版本引入的標(biāo)準(zhǔn)庫,直接內(nèi)置了對(duì)異步IO的支持。
? asyncio的編程模型就是一個(gè)消息循環(huán)。我們從asyncio模塊中直接獲取一個(gè)EventLoop的引用,然后把需要執(zhí)行的協(xié)程扔到EventLoop中執(zhí)行,就實(shí)現(xiàn)了異步IO。
? Python的在3.4中引入了協(xié)程的概念,可是這個(gè)還是以生成器對(duì)象為基礎(chǔ)。
? Python 3.5添加了async和await這兩個(gè)關(guān)鍵字,分別用來替換asyncio.coroutine和yield from。
? python3.5則確定了協(xié)程的語法。下面將簡(jiǎn)單介紹asyncio的使用。實(shí)現(xiàn)協(xié)程的不僅僅是asyncio,tornado和gevent都實(shí)現(xiàn)了類似的功能。
(1)協(xié)程定義
用asyncio實(shí)現(xiàn)Hello world代碼如下:
import asyncio@asyncio.coroutine def hello():print("Hello world!")# 異步調(diào)用asyncio.sleep(1):r = yield from asyncio.sleep(1)print("Hello again!") # 獲取EventLoop: loop = asyncio.get_event_loop() # 執(zhí)行coroutine loop.run_until_complete(hello()) loop.close()? @asyncio.coroutine把一個(gè)generator標(biāo)記為coroutine類型,然后,我們就把這個(gè)coroutine扔到EventLoop中執(zhí)行。 hello()會(huì)首先打印出Hello world!,然后,yield from語法可以讓我們方便地調(diào)用另一個(gè)generator。由于asyncio.sleep()也是一個(gè)coroutine,所以線程不會(huì)等待asyncio.sleep(),而是直接中斷并執(zhí)行下一個(gè)消息循環(huán)。當(dāng)asyncio.sleep()返回時(shí),線程就可以從yield from拿到返回值(此處是None),然后接著執(zhí)行下一行語句。
? 把a(bǔ)syncio.sleep(1)看成是一個(gè)耗時(shí)1秒的IO操作,在此期間,主線程并未等待,而是去執(zhí)行EventLoop中其他可以執(zhí)行的coroutine了,因此可以實(shí)現(xiàn)并發(fā)執(zhí)行。
我們用Task封裝兩個(gè)coroutine試試:
import threading import asyncio@asyncio.coroutine def hello():print('Hello world! (%s)' % threading.currentThread())yield from asyncio.sleep(1)print('Hello again! (%s)' % threading.currentThread())loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()觀察執(zhí)行過程:
Hello world! (<_MainThread(MainThread, started 140735195337472)>) Hello world! (<_MainThread(MainThread, started 140735195337472)>) (暫停約1秒) Hello again! (<_MainThread(MainThread, started 140735195337472)>) Hello again! (<_MainThread(MainThread, started 140735195337472)>)由打印的當(dāng)前線程名稱可以看出,兩個(gè)coroutine是由同一個(gè)線程并發(fā)執(zhí)行的。
如果把a(bǔ)syncio.sleep()換成真正的IO操作,則多個(gè)coroutine就可以由一個(gè)線程并發(fā)執(zhí)行。
asyncio案例實(shí)戰(zhàn)
我們用asyncio的異步網(wǎng)絡(luò)連接來獲取sina、sohu和163的網(wǎng)站首頁:
async_wget.py
import asyncio@asyncio.coroutine def wget(host):print('wget %s...' % host)connect = asyncio.open_connection(host, 80)reader, writer = yield from connectheader = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % hostwriter.write(header.encode('utf-8'))yield from writer.drain()while True:line = yield from reader.readline()if line == b'\r\n':breakprint('%s header > %s' % (host, line.decode('utf-8').rstrip()))# Ignore the body, close the socketwriter.close()loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()結(jié)果信息如下:
wget www.sohu.com... wget www.sina.com.cn... wget www.163.com... (等待一段時(shí)間) (打印出sohu的header) www.sohu.com header > HTTP/1.1 200 OK www.sohu.com header > Content-Type: text/html ... (打印出sina的header) www.sina.com.cn header > HTTP/1.1 200 OK www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT ... (打印出163的header) www.163.com header > HTTP/1.0 302 Moved Temporarily www.163.com header > Server: Cdn Cache Server V2.0 ...可見3個(gè)連接由一個(gè)線程通過coroutine并發(fā)完成。
小結(jié)
asyncio提供了完善的異步IO支持;
異步操作需要在coroutine中通過yield from完成;
多個(gè)coroutine可以封裝成一組Task然后并發(fā)執(zhí)行。
轉(zhuǎn)載于:https://www.cnblogs.com/sunBinary/p/10976929.html
總結(jié)
以上是生活随笔為你收集整理的Python多线程原理与实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis底层数据结构简述
- 下一篇: python将某个列表按元素值分成多个子