日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

并发编程——多线程

發(fā)布時(shí)間:2025/4/9 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 并发编程——多线程 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本節(jié)導(dǎo)讀:

  • 什么是線程
  • 線程與進(jìn)程的區(qū)別
  • 開啟線程的兩種方法
  • 多線程與多進(jìn)程的區(qū)別
  • thread對(duì)象的其他屬性
  • 守護(hù)線程
  • gil全局解釋器鎖
  • 死鎖現(xiàn)象與遞歸鎖
  • 信號(hào)量,event,定時(shí)器
  • 線程queue
  • 進(jìn)程池與線程池

?

一 什么是線程

  線程顧名思義,就是一條流水線工作的過程(流水線的工作需要電源,電源就相當(dāng)于cpu),而一條流水線必須屬于一個(gè)車間,一個(gè)車間的工作過程是一個(gè)進(jìn)程,車間負(fù)責(zé)把資源整合到一起,是一個(gè)資源單位,而一個(gè)車間內(nèi)至少有一條流水線。

所以,進(jìn)程只是用來把資源集中到一起(進(jìn)程只是一個(gè)資源單位,或者說資源集合),而線程才是cpu上的執(zhí)行單位。

多線程(即多個(gè)控制線程)的概念是,在一個(gè)進(jìn)程中存在多個(gè)線程,多個(gè)線程共享該進(jìn)程的地址空間,相當(dāng)于一個(gè)車間內(nèi)有多條流水線,都共用一個(gè)車間的資源。例如,北京地鐵與上海地鐵是不同的進(jìn)程,而北京地鐵里的13號(hào)線是一個(gè)線程,北京地鐵所有的線路共享北京地鐵所有的資源,比如所有的乘客可以被所有線路拉。

?

二 線程與進(jìn)程的區(qū)別

  • 同一個(gè)進(jìn)程內(nèi)的多個(gè)線程共享該進(jìn)程內(nèi)的地址資源
  • 創(chuàng)建線程的開銷要遠(yuǎn)小于創(chuàng)建進(jìn)程的開銷(創(chuàng)建一個(gè)進(jìn)程,就是創(chuàng)建一個(gè)車間,涉及到申請(qǐng)空間,而且在該空間內(nèi)建至少一條流水線,但創(chuàng)建線程,就只是在一個(gè)車間內(nèi)造一條流水線,無需申請(qǐng)空間,所以創(chuàng)建開銷小)

三 開啟線程的兩種方法

threading模塊介紹

multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面,有很大的相似性,因而不再詳細(xì)介紹

方式一

from threading import Thread import timedef sayhi(name):time.sleep(2)print('%s say hello' %name)if __name__ == '__main__':t=Thread(target=sayhi,args=('egon',))t.start()print('主線程') 實(shí)例Thread對(duì)象

方式二

from threading import Thread import timeclass Sayhi(Thread):def __init__(self,name):super().__init__()self.name=namedef run(self):time.sleep(2)print('%s say hello' % self.name)if __name__ == '__main__':t = Sayhi('egon')t.start()print('主線程') 繼承Thread類

?

四 多線程與多進(jìn)程的區(qū)別

開啟速度

在主進(jìn)程下開啟線程,幾乎是t.start ()的同時(shí)就將線程開啟了,說明開銷極小

在主進(jìn)程下開子進(jìn)程,p.start ()將開啟進(jìn)程的信號(hào)發(fā)給操作系統(tǒng)后,操作系統(tǒng)要申請(qǐng)內(nèi)存空間,讓好拷貝父進(jìn)程地址空間到子進(jìn)程,開銷遠(yuǎn)大于線程

pid

在主進(jìn)程下開啟多個(gè)線程,每個(gè)線程都跟主進(jìn)程的pid一樣

開多個(gè)進(jìn)程,每個(gè)進(jìn)程都有不同的pid

數(shù)據(jù)共享

進(jìn)程之間地址空間是隔離的

同一進(jìn)程內(nèi)開啟的多個(gè)線程是共享該進(jìn)程地址空間的

?

五 thread對(duì)象的其他屬性

Thread實(shí)例對(duì)象的方法# isAlive(): 返回線程是否活動(dòng)的。# getName(): 返回線程名。# setName(): 設(shè)置線程名。 threading模塊提供的一些方法:# threading.currentThread(): 返回當(dāng)前的線程變量。# threading.enumerate(): 返回一個(gè)包含正在運(yùn)行的線程的list。正在運(yùn)行指線程啟動(dòng)后、結(jié)束前,不包括啟動(dòng)前和終止后的線程。# threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。

?

六 守護(hù)線程

無論是進(jìn)程還是線程,都遵循:守護(hù)xxx會(huì)等待主xxx運(yùn)行完畢后被銷毀

需要強(qiáng)調(diào)的是:運(yùn)行完畢并非終止運(yùn)行

對(duì)主進(jìn)程來說,運(yùn)行完畢指的是主進(jìn)程代碼運(yùn)行完畢,

對(duì)主線程來說,運(yùn)行完畢指的是主線程所在的進(jìn)程內(nèi)所有非守護(hù)線程統(tǒng)統(tǒng)運(yùn)行完畢,主線程才算運(yùn)行完畢,

主進(jìn)程在其代碼結(jié)束后就已經(jīng)算運(yùn)行完畢了(守護(hù)進(jìn)程在此時(shí)就被回收),然后主進(jìn)程會(huì)一直等非守護(hù)的子進(jìn)程都運(yùn)行完畢后回收子進(jìn)程的資源(否則會(huì)產(chǎn)生僵尸進(jìn)程),才會(huì)結(jié)束,

主線程在其他非守護(hù)線程運(yùn)行完畢后才算運(yùn)行完畢(守護(hù)線程在此時(shí)就被回收)。因?yàn)橹骶€程的結(jié)束意味著進(jìn)程的結(jié)束,進(jìn)程整體的資源都將被回收,而進(jìn)程必須保證非守護(hù)線程都運(yùn)行完畢后才能結(jié)束。

?

七 gil全局解釋器鎖

首先需要明確的一點(diǎn)是GIL并不是Python的特性,它是在實(shí)現(xiàn)Python解析器(CPython)時(shí)所引入的一個(gè)概念。就好比C++是一套語言(語法)標(biāo)準(zhǔn),但是可以用不同的編譯器來編譯成可執(zhí)行代碼。>有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執(zhí)行環(huán)境來執(zhí)行。像其中的JPython就沒有GIL。然而因?yàn)镃Python是大部分環(huán)境下默認(rèn)的Python執(zhí)行環(huán)境。所以在很多人的概念里CPython就是Python,也就想當(dāng)然的把GIL歸結(jié)為Python語言的缺陷。所以這里要先明確一點(diǎn):GIL并不是Python的特性,Python完全可以不依賴于GIL

八 死鎖現(xiàn)象與遞歸鎖

死鎖: 是指兩個(gè)或兩個(gè)以上的進(jìn)程或線程在執(zhí)行過程中,因爭(zhēng)奪資源而造成的一種互相等待的現(xiàn)象,若無外力作用,它們都將無法推進(jìn)下去。此時(shí)稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖,這些永遠(yuǎn)在互相等待的進(jìn)程稱為死鎖進(jìn)程,如下就是死鎖

?

from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock()class MyThread(Thread):def run(self):self.func1()self.func2()def func1(self):mutexA.acquire()print('\033[41m%s 拿到A鎖\033[0m' %self.name)mutexB.acquire()print('\033[42m%s 拿到B鎖\033[0m' %self.name)mutexB.release()mutexA.release()def func2(self):mutexB.acquire()print('\033[43m%s 拿到B鎖\033[0m' %self.name)time.sleep(2)mutexA.acquire()print('\033[44m%s 拿到A鎖\033[0m' %self.name)mutexA.release()mutexB.release()if __name__ == '__main__':for i in range(10):t=MyThread()t.start() 死鎖

遞歸鎖

?

遞歸鎖,死鎖的解決方案,在Python中為了支持在同一線程中多次請(qǐng)求同一資源,python提供了可重入鎖RLock。

?

這個(gè)RLock內(nèi)部維護(hù)著一個(gè)Lock和一個(gè)counter變量,counter記錄了acquire的次數(shù),從而使得資源可以被多次require。直到一個(gè)線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會(huì)發(fā)生死鎖,二者的區(qū)別是:遞歸鎖可以連續(xù)acquire多次,而互斥鎖只能acquire一次

from threading import Thread,RLock import timemutexA=mutexB=RLock() #一個(gè)線程拿到鎖,counter加1,該線程內(nèi)又碰到加鎖的情況,則counter繼續(xù)加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止class MyThread(Thread):def run(self):self.func1()self.func2()def func1(self):mutexA.acquire()print('\033[41m%s 拿到A鎖\033[0m' %self.name)mutexB.acquire()print('\033[42m%s 拿到B鎖\033[0m' %self.name)mutexB.release()mutexA.release()def func2(self):mutexB.acquire()print('\033[43m%s 拿到B鎖\033[0m' %self.name)time.sleep(2)mutexA.acquire()print('\033[44m%s 拿到A鎖\033[0m' %self.name)mutexA.release()mutexB.release()if __name__ == '__main__':for i in range(10):t=MyThread()t.start() 遞歸鎖

?

?

九 信號(hào)量,event,定時(shí)器

?

信號(hào)量

  信號(hào)量也是一把鎖,可以指定信號(hào)量為5,對(duì)比互斥鎖同一時(shí)間只能有一個(gè)任務(wù)搶到鎖去執(zhí)行,信號(hào)量同一時(shí)間可以有5個(gè)任務(wù)拿到鎖去執(zhí)行,如果說互斥鎖是合租房屋的人去搶一個(gè)廁所,那么信號(hào)量就相當(dāng)于一群路人爭(zhēng)搶公共廁所,公共廁所有多個(gè)坑位,這意味著同一時(shí)間可以有多個(gè)人上公共廁所,但公共廁所容納的人數(shù)是一定的,這便是信號(hào)量的大小

?

from threading import Thread,Semaphore import threading import timedef func():sm.acquire()print('%s get sm' %threading.current_thread().getName())time.sleep(3)sm.release()if __name__ == '__main__':sm=Semaphore(5)for i in range(23):t=Thread(target=func)t.start()#Semaphore管理一個(gè)內(nèi)置的計(jì)數(shù)器,每當(dāng)調(diào)用acquire()時(shí)內(nèi)置計(jì)數(shù)器-1;調(diào)用release() 時(shí)內(nèi)置計(jì)數(shù)器+1;計(jì)數(shù)器不能小于0;當(dāng)計(jì)數(shù)器為0時(shí)acquire()將阻塞線程直到其他線程調(diào)用release()。 View Code

?

event

線程的一個(gè)關(guān)鍵特性是每個(gè)線程都是獨(dú)立運(yùn)行且狀態(tài)不可預(yù)測(cè)。如果程序中的其 他線程需要通過判斷某個(gè)線程的狀態(tài)來確定自己下一步的操作,這時(shí)線程同步問題就會(huì)變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對(duì)象。 對(duì)象包含一個(gè)可由線程設(shè)置的信號(hào)標(biāo)志,它允許線程等待某些事件的發(fā)生。在 初始情況下,Event對(duì)象中的信號(hào)標(biāo)志被設(shè)置為假。如果有線程等待一個(gè)Event對(duì)象, 而這個(gè)Event對(duì)象的標(biāo)志為假,那么這個(gè)線程將會(huì)被一直阻塞直至該標(biāo)志為真。一個(gè)線程如果將一個(gè)Event對(duì)象的信號(hào)標(biāo)志設(shè)置為真,它將喚醒所有等待這個(gè)Event對(duì)象的線程。如果一個(gè)線程等待一個(gè)已經(jīng)被設(shè)置為真的Event對(duì)象,那么它將忽略這個(gè)事件, 繼續(xù)執(zhí)行

from threading import Eventevent.isSet():返回event的狀態(tài)值;event.wait():如果 event.isSet()==False將阻塞線程;event.set(): 設(shè)置event的狀態(tài)值為True,所有阻塞池的線程激活進(jìn)入就緒狀態(tài), 等待操作系統(tǒng)調(diào)度;event.clear():恢復(fù)event的狀態(tài)值為False。

例如,有多個(gè)工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務(wù)正常才讓那些工作線程去連接MySQL服務(wù)器,如果連接不成功,都會(huì)去嘗試重新連接。那么我們就可以采用threading.Event機(jī)制來協(xié)調(diào)各個(gè)工作線程的連接操作

?

from threading import Thread,Event import threading import time,random def conn_mysql():count=1while not event.is_set():if count > 3:raise TimeoutError('鏈接超時(shí)')print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))event.wait(0.5)count+=1print('<%s>鏈接成功' %threading.current_thread().getName())def check_mysql():print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())time.sleep(random.randint(2,4))event.set() if __name__ == '__main__':event=Event()conn1=Thread(target=conn_mysql)conn2=Thread(target=conn_mysql)check=Thread(target=check_mysql)conn1.start()conn2.start()check.start()

定時(shí)器

定時(shí)器,指定n秒后執(zhí)行某操作

?

from threading import Timerdef hello():print("hello, world")t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed

十 線程queue

有三種不同的用法

class queue.Queue(maxsize=0) #隊(duì)列:先進(jìn)先出

import queueq=queue.Queue() q.put('first') q.put('second') q.put('third')print(q.get()) print(q.get()) print(q.get())''' 結(jié)果(先進(jìn)先出): first second third '''

class queue.LifoQueue(maxsize=0) #堆棧:last in fisrt out

import queueq=queue.LifoQueue() q.put('first') q.put('second') q.put('third')print(q.get()) print(q.get()) print(q.get())''' 結(jié)果(后進(jìn)先出): third second first '''

class queue.PriorityQueue(maxsize=0) #優(yōu)先級(jí)隊(duì)列:存儲(chǔ)數(shù)據(jù)時(shí)可設(shè)置優(yōu)先級(jí)的隊(duì)列

?

import queueq=queue.PriorityQueue() #put進(jìn)入一個(gè)元組,元組的第一個(gè)元素是優(yōu)先級(jí)(通常是數(shù)字,也可以是非數(shù)字之間的比較),數(shù)字越小優(yōu)先級(jí)越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c'))print(q.get()) print(q.get()) print(q.get())''' 結(jié)果(數(shù)字越小優(yōu)先級(jí)越高,優(yōu)先級(jí)高的優(yōu)先出隊(duì)): (10, 'b') (20, 'a') (30, 'c') '''

十一 進(jìn)程池與線程池

在剛開始學(xué)多進(jìn)程或多線程時(shí),我們迫不及待地基于多進(jìn)程或多線程實(shí)現(xiàn)并發(fā)的套接字通信,然而這種實(shí)現(xiàn)方式的致命缺陷是:服務(wù)的開啟的進(jìn)程數(shù)或線程數(shù)都會(huì)隨著并發(fā)的客戶端數(shù)目地增多而增多,這會(huì)對(duì)服務(wù)端主機(jī)帶來巨大的壓力,甚至于不堪重負(fù)而癱瘓,于是我們必須對(duì)服務(wù)端開啟的進(jìn)程數(shù)或線程數(shù)加以控制,讓機(jī)器在一個(gè)自己可以承受的范圍內(nèi)運(yùn)行,這就是進(jìn)程池或線程池的用途,例如進(jìn)程池,就是用來存放進(jìn)程的池子,本質(zhì)還是基于多進(jìn)程,只不過是對(duì)開啟進(jìn)程的數(shù)目加上了限制

介紹

官網(wǎng):https://docs.python.org/dev/library/concurrent.futures.htmlconcurrent.futures模塊提供了高度封裝的異步調(diào)用接口 ThreadPoolExecutor:線程池,提供異步調(diào)用 ProcessPoolExecutor: 進(jìn)程池,提供異步調(diào)用 Both implement the same interface, which is defined by the abstract Executor class.

基本方法

1、submit(fn, *args, **kwargs) 異步提交任務(wù)2、map(func, *iterables, timeout=None, chunksize=1) 取代for循環(huán)submit的操作3、shutdown(wait=True) 相當(dāng)于進(jìn)程池的pool.close()+pool.join()操作 wait=True,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù) wait=False,立即返回,并不會(huì)等待池內(nèi)的任務(wù)執(zhí)行完畢 但不管wait參數(shù)為何值,整個(gè)程序都會(huì)等到所有任務(wù)執(zhí)行完畢 submit和map必須在shutdown之前4、result(timeout=None) 取得結(jié)果5、add_done_callback(fn) 回調(diào)函數(shù)

進(jìn)程池

用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport os,time,random def task(n):print('%s is runing' %os.getpid())time.sleep(random.randint(1,3))return n**2if __name__ == '__main__':executor=ProcessPoolExecutor(max_workers=3)futures=[]for i in range(11):future=executor.submit(task,i)futures.append(future)executor.shutdown(True)print('+++>')for future in futures:print(future.result()) 進(jìn)程池

線程池

用法

把ProcessPoolExecutor換成ThreadPoolExecutor,其余用法全部相同

map方法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport os,time,random def task(n):print('%s is runing' %os.getpid())time.sleep(random.randint(1,3))return n**2if __name__ == '__main__':executor=ThreadPoolExecutor(max_workers=3)# for i in range(11):# future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit map

回調(diào)函數(shù)

可以為進(jìn)程池或線程池內(nèi)的每個(gè)進(jìn)程或線程綁定一個(gè)函數(shù),該函數(shù)在進(jìn)程或線程的任務(wù)執(zhí)行完畢后自動(dòng)觸發(fā),并接收任務(wù)的返回值當(dāng)作參數(shù),該函數(shù)稱為回調(diào)函數(shù)

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import osdef get_page(url):print('<進(jìn)程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def parse_page(res):res=res.result()print('<進(jìn)程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=ProcessPoolExecutor(3)for url in urls:p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個(gè)future對(duì)象obj,需要用obj.result()拿到結(jié)果 回調(diào)函數(shù)

?

?

轉(zhuǎn)載于:https://www.cnblogs.com/leiyiming/p/9367200.html

總結(jié)

以上是生活随笔為你收集整理的并发编程——多线程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。