日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python结束线程池正在运行的线程_python之线程与线程池

發(fā)布時(shí)間:2025/3/20 python 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python结束线程池正在运行的线程_python之线程与线程池 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

#進(jìn)程是資源分配的最小單位,線程是CPU調(diào)度的最小單位.每一個(gè)進(jìn)程中至少有一個(gè)線程。#傳統(tǒng)的不確切使用線程的程序稱為只含有一個(gè)線程或單線程程序,而可以使用線程的程序被稱為多線程程序,在程序中使用一個(gè)線程的方法#被稱為多線程#線程的模塊:#thread >> 實(shí)現(xiàn)線程的低級(jí)接口#threading>>> 可以提供高級(jí)方法

#同一進(jìn)程下的各個(gè)線程是可以共享該進(jìn)程的所有的資源的,各個(gè)線程之間是可以相互影響的

1.線程創(chuàng)建的兩種方式,與進(jìn)程創(chuàng)建的兩種方式基本

from threading importThreadfrom multiprocessing importProcessimporttimedeffucn1(n):

time.sleep(1)print('XXXXXXXXXXXXX',n)if __name__ == '__main__':#p = Process(target=fucn1,args=(1,))

t = Thread(target=fucn1,args=(1,))print(t1.isAlive())#返回線程是否活動(dòng)的

print(t1.getName())#返回線程名

t1.setName()#設(shè)置線程名

t.start()#開啟線程的速度非常快

t.join() #等待子線程運(yùn)行結(jié)束之后才進(jìn)行下面的代碼

print('主線程結(jié)束')

方式1

classgg(Thread):def __init__(self,n):

super().__init__()

self.n=ndefrun(self):print('xxx')print(self.n)if __name__ == '__main__':

t1= gg(66)

t1.start()

與進(jìn)程創(chuàng)建運(yùn)行相似

方式2

# threading模塊提供的一些方法:

# threading.currentThread(): 返回當(dāng)前的線程變量。

# threading.enumerate(): 返回一個(gè)包含正在運(yùn)行的線程的list。正在運(yùn)行指線程啟動(dòng)后、結(jié)束前,不包括啟動(dòng)前和終止后的線程。

# threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果

1.1

from threading importThreadfrom multiprocessing importProcessimporttimedefjisuan():for i in range(100000):

i+= 1

#print(i)

defpro():

p= Process(target=jisuan)

p1= Process(target=jisuan)

p.start()

p1.start()

p1.join()

p.join()defthreading():

t1= Thread(target=jisuan)

t2= Thread(target=jisuan)

t1.start()

t2.start()if __name__ == '__main__':

t1=time.time()

pro()

t2=time.time()

t3=time.time()

threading()

t4=time.time()print(t2-t1)#0.24815773963928223

print(t4-t3)#0.01202702522277832

線程與進(jìn)程之間的效率對(duì)比

1.2

from threading importThreadimportthreadingfrom multiprocessing importProcessimportosdefwork():importtime

time.sleep(3)print(threading.current_thread().getName())if __name__ == '__main__':#在主進(jìn)程下開啟線程

t=Thread(target=work)

t.start()print(threading.current_thread())#主線程對(duì)象

print(threading.current_thread().getName()) #主線程名稱

print(threading.current_thread().ident) #主線程ID

print(threading.get_ident()) #主線程ID

print(threading.enumerate()) #連同主線程在內(nèi)有兩個(gè)運(yùn)行的線程

print(threading.active_count())print('主線程/主進(jìn)程')#'''#打印結(jié)果:#<_mainthread started>#MainThread#14104#[<_mainthread started>, ]#主線程/主進(jìn)程#Thread-1#'''

一些不常用的方法

1.3

#一個(gè)主線程要等待所有的非守護(hù)線程結(jié)束才結(jié)束#(主線程的代碼執(zhí)行完之后主線程并沒有結(jié)束,而要等待所有的非守護(hù)進(jìn)程執(zhí)行完并返回結(jié)果后才結(jié)束)

#主進(jìn)程默認(rèn)是在執(zhí)行完代碼之后,相當(dāng)于結(jié)束了,并不關(guān)心所有的子進(jìn)程的執(zhí)行結(jié)果,只是關(guān)心所有的子進(jìn)程是否結(jié)束的的信號(hào),#接收到所有子進(jìn)程結(jié)束的信號(hào)之后,主進(jìn)程(程序)才結(jié)束

importtimefrom threading importThreaddeffunc():

time.sleep(3)print('任務(wù)1')deffunc1():

time.sleep(2)print('任務(wù)2')if __name__ == '__main__':

t1= Thread(target=func)

t2= Thread(target=func1)

t1.daemon=True

t1.start()

t2.start()print('主線程結(jié)束')#結(jié)果

'''主線程結(jié)束

任務(wù)2'''

守護(hù)進(jìn)程

1.4

from threading importThreadfrom multiprocessing importProcessimporttime

a= 100

deffucn1():globala

a-= 1 #等同于

temp =a

time.sleep(0.001)#驗(yàn)證關(guān)鍵點(diǎn)

temp = temp -1a=temp

time.sleep(5)if __name__ == '__main__':

gg=[]for i in range(100):

t= Thread(target=fucn1)

t.start()#開啟線程的速度非常快

gg.append(t)print(t.is_alive())

[tt.join()for tt ingg]print(a)print('主線程結(jié)束')#線程共享進(jìn)程的數(shù)據(jù),由于數(shù)據(jù)是共享的也會(huì)有數(shù)據(jù)的不安全的情況(數(shù)據(jù)混亂),#但是由于線程的創(chuàng)建的速度非常快,如果加上系統(tǒng)的線程不多的話,#效果不明顯#解決共享數(shù)據(jù)不安全: 加鎖 ,對(duì)取值和修改值的的操作開始加鎖(與多進(jìn)程加鎖一樣)

驗(yàn)證線程之間的數(shù)據(jù)是共享的,但是也存在數(shù)據(jù)的安全的問題

信號(hào)量,事件等與進(jìn)程的操作方法一樣#Semaphore管理一個(gè)內(nèi)置的計(jì)數(shù)器,#每當(dāng)調(diào)用acquire()時(shí)內(nèi)置計(jì)數(shù)器-1;#調(diào)用release() 時(shí)內(nèi)置計(jì)數(shù)器+1;#計(jì)數(shù)器不能小于0;當(dāng)計(jì)數(shù)器為0時(shí),acquire()將阻塞線程直到其他線程調(diào)用release()。#

#實(shí)例:(同時(shí)只有5個(gè)線程可以獲得semaphore,即可以限制最大連接數(shù)為5):## 基本代碼如下#def func(sm):#sm.acquire()## 賦值或修改的代碼#sm.release()#if __name__ == '__main__':#sm=Semaphore(5)#t = Thread(target=func)

#事件與進(jìn)程的一樣,#event.isSet() 查看等待的狀態(tài)不一樣#event.wait():如果 event.isSet()==False將阻塞線程;#event.set(): 設(shè)置event的狀態(tài)值為True,所有阻塞池的線程激活進(jìn)入就緒狀態(tài), 等待操作系統(tǒng)調(diào)度;#event.clear():恢復(fù)event的狀態(tài)值為False。

與進(jìn)程功能基本一樣的相關(guān)說明

線程隊(duì)列

#線程隊(duì)列#使用import queue,用法與進(jìn)程Queue一樣,直接引入,不用通過threading 模塊引入

#1>class queue.Queue(maxsize=0) #先進(jìn)先出

importqueue#q = queue.Queue(3)#創(chuàng)建一個(gè)容量為3的隊(duì)列#q.put(1)#q.put(2)#q.put(3)#在隊(duì)列塞滿三個(gè)元素后,如果繼續(xù)塞元素,就會(huì)進(jìn)入一個(gè)阻塞的狀態(tài)## 但是如果使用q.put_nowait()塞元素的話,到塞滿之后再塞的話,就會(huì)直接拋出隊(duì)列已滿的異常,## 不會(huì)進(jìn)入阻塞的狀態(tài),與q.get_nowait()相似#print(q.get())#1#print(q.get())#2#print(q.get())#3##按照添加的順序進(jìn)行輸出#print(q.get())#>>>>>>>>>取到第四個(gè)的時(shí)候,隊(duì)列已經(jīng)是空的了,如果使用這個(gè)的話,就會(huì)進(jìn)入## 阻塞的狀態(tài)#print(q.get_nowait())#>>>>>>但是如果取到第四個(gè)使用這個(gè)的話,不會(huì)進(jìn)入阻塞的狀態(tài),直接##拋出異常#

#

## 2>class queue.LifoQueue(maxsize=0) #先進(jìn)后出#q = queue.LifoQueue(3)#q.put(1)#q.put(2)#q.put(None)#### # 取值的時(shí)候輸出為#print(q.get())#None#print(q.get())#2#print(q.get())#1#

## 3>class queue.PriorityQueue(maxsize=0) #存儲(chǔ)數(shù)據(jù)時(shí)可設(shè)置優(yōu)先級(jí)的隊(duì)列#q = queue.PriorityQueue(4)## 在設(shè)置的時(shí)候,元組的方式進(jìn)行添加 如:(優(yōu)先級(jí),元素),## 優(yōu)先級(jí)通過使用數(shù)字來表示,數(shù)字越小優(yōu)先級(jí)越高##如果優(yōu)先級(jí)一樣,就會(huì)按照元素的ASCIll順序進(jìn)行輸出,相同優(yōu)先級(jí)的兩個(gè)元素能夠進(jìn)行比較(同優(yōu)先級(jí)的兩個(gè)元素必須是同種類型的)##字典類型的東西不能進(jìn)行比較#q.put((-10,1))#q.put((-10,3))#q.put((1,20))#q.put((2,'我'))#

##按照優(yōu)先級(jí)進(jìn)行輸出#print(q.get())#(-10, 1)#print(q.get())#(-10, 3)#print(q.get())#(1, 20)#print(q.get())#(2, '我')#

#

## 這三隊(duì)列是安全的,不存在多個(gè)線程搶占同一資源或數(shù)據(jù)的情況

線程三種隊(duì)列的使用

線程池

submit的使用

源代碼欣賞importthreadingimportosclassThreadPoolExecutor(_base.Executor):def __init__(self, max_workers=None, thread_name_prefix=''): #初始化方法,設(shè)置線程池的最大線程數(shù)量

if max_workers is None:#線程池的默認(rèn)設(shè)置

max_workers = (os.cpu_count() or 1) *5默認(rèn)設(shè)置的線程數(shù)是CPU核數(shù)的5倍if max_workers <=0:raise ValueError("max_workers must be greater than 0")

self._max_workers=max_workers

self._work_queue=queue.Queue()

self._threads=set()

self._shutdown=False

self._shutdown_lock= threading.Lock()#創(chuàng)建線程鎖

self._thread_name_prefix = (thread_name_prefix or("ThreadPoolExecutor-%d" %self._counter()))def submit(self, fn, *args, **kwargs):#創(chuàng)建一個(gè)線程,并異步提交任務(wù)

with self._shutdown_lock:ifself._shutdown:raise RuntimeError('cannot schedule new futures after shutdown')

f=_base.Future()

w=_WorkItem(f, fn, args, kwargs)

self._work_queue.put(w)

self._adjust_thread_count()returnf

submit.__doc__ = _base.Executor.submit.__doc__

def _adjust_thread_count(self):#調(diào)整線程池的數(shù)量

def weakref_cb(_, q=self._work_queue):

q.put(None)

num_threads=len(self._threads)if num_threads < self._max_workers: #創(chuàng)建線程的過程

thread_name = '%s_%d' % (self._thread_name_prefix orself,

num_threads)

t= threading.Thread(name=thread_name, target=_worker,

args=(weakref.ref(self, weakref_cb),

self._work_queue))

t.daemon=True

t.start()

self._threads.add(t)

_threads_queues[t]=self._work_queuedef shutdown(self, wait=True):

with self._shutdown_lock:

self._shutdown=True

self._work_queue.put(None)ifwait:for t inself._threads:

t.join()

class ThreadPoolExecutor的源碼欣賞

1.1 submit的基本使用

#常用基本方法#class ThreadPoolExecutor():#def submit(self, fn, *args, **kwargs):#創(chuàng)建一個(gè)線程,并異步提交任務(wù)#pass#def shutdown(self,wait=True):#相當(dāng)于進(jìn)程池中的p.close() 和p.join()#pass##wait = True ,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù)##wait = False,立即返回,并不會(huì)等待池內(nèi)的任務(wù)執(zhí)行完畢## 但不管wait參數(shù)為何值,整個(gè)程序都會(huì)等到所有任務(wù)執(zhí)行完畢## submit和map必須在shutdown之前#通過

#import time#import threading#from concurrent.futures import ThreadPoolExecutor#def func(i):#time.sleep(2)#print('%s打印的:'%(threading.get_ident()),)#return i*i#

#tpool = ThreadPoolExecutor(max_workers= 5)#

#t_lst = []#for i in range(5):#t = tpool.submit(func,i)#異步提交任務(wù),與apply_async 相似,返回的也是一個(gè)結(jié)果對(duì)象#t_lst.append(t)#tpool.shutdown()#for a in t_lst:#print('>>',a.result())#獲取

線程池submit的基本使用

1.2 map的基本使用

1.2.1 源碼欣賞

def map(self, fn, *iterables, timeout=None, chunksize=1):if timeout is notNone:

end_time= timeout +time.time()

fs= [self.submit(fn, *args) for args in zip(*iterables)]def result_iterator():#生成器

try:

fs.reverse()whilefs:#Careful not to keep a reference to the popped future

if timeout isNone:yieldfs.pop().result()else:yield fs.pop().result(end_time -time.time())finally:for future infs:

future.cancel()return result_iterator()

map方法的源碼欣賞

1.2.2 map的基本使用(驗(yàn)證使用過程)

#簡單使用

from concurrent.futures importThreadPoolExecutorimportthreadingimportos,time,randomdeftask(n):print('%s is running'%(threading.get_ident()))#time.sleep(random.randint(1,2))

time.sleep(10)#測(cè)試for循環(huán)取值的時(shí)候,如果執(zhí)行的子線程還沒有執(zhí)行完的時(shí)候的情況

return n**2

if __name__ == '__main__':

t_pool= ThreadPoolExecutor(max_workers=3)

s= t_pool.map(task,range(1,5))#map取代for + sumbit

print(s) #.result_iterator at 0x000000C536C2B0F8>

for i ins :print(i)#print([i for i in s])#

print('主程序結(jié)束')'''#前面4個(gè)瞬間就出來

7252 is running

3084 is running

7824 is running

.result_iterator at 0x000000AF18AAA2B0>

7252 is running #延遲大概10s后后面4個(gè)瞬間出來

1

4

9

16#延遲10s后兩個(gè)瞬間出來

主程序結(jié)束'''

map的簡單使用

1.3 submit回調(diào)函數(shù)的應(yīng)用

from concurrent.futures importThreadPoolExecutor,ProcessPoolExecutorfrom multiprocessing importPoolimportrequestsimportjsonimportosdefget_page(url):print(' get %s' %(os.getpid(),url))

respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}defparse_page(res):

res=res.result()print(' parse %s' %(os.getpid(),res['url']))

parse_res='url: size:[%s]\n' %(res['url'],len(res['text']))

with open('db.txt','a') as f:

f.write(parse_res)if __name__ == '__main__':

urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']#p=Pool(3)

#for url in urls:

#p.apply_async(get_page,args=(url,),callback=pasrse_page)

#p.close()

#p.join()

p=ProcessPoolExecutor(3)for url inurls:

p.submit(get_page,url).add_done_callback(parse_page)#parse_page拿到的是一個(gè)future對(duì)象obj,需要用obj.result()拿到結(jié)果

回調(diào)函數(shù)的應(yīng)用

1.4 線程與進(jìn)程之間的性能測(cè)試

#進(jìn)程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優(yōu)勢(shì)

#現(xiàn)在的計(jì)算機(jī)基本上都是多核,python對(duì)于計(jì)算密集型的任務(wù)開多線程的效率并不能帶來多大性能上的提升,#甚至不如串行(沒有大量切換),但是,對(duì)于IO密集型的任務(wù)效率還是有顯著提升的。

from multiprocessing importProcessfrom threading importThreadimportthreadingimportos,timedefwork():

time.sleep(2)print('===>')if __name__ == '__main__':

l=[]print(os.cpu_count()) #本機(jī)為4核

start=time.time()for i in range(400):#p=Process(target=work) #耗時(shí)12s多,大部分時(shí)間耗費(fèi)在創(chuàng)建進(jìn)程上

p=Thread(target=work) #耗時(shí)2s多

l.append(p)

p.start()for p inl:

p.join()

stop=time.time()print('run time is %s' %(stop-start))## I/O密集型:多線程效率高

from multiprocessing importProcessfrom threading importThreadimportos,timedefwork():

res=0for i in range(100000000):

res*=iif __name__ == '__main__':

l=[]print(os.cpu_count()) #本機(jī)為4核

start=time.time()for i in range(4):

p=Process(target=work) #耗時(shí)5s多

p=Thread(target=work) #耗時(shí)18s多

l.append(p)

p.start()for p inl:

p.join()

stop=time.time()print('run time is %s' %(stop-start))#

## 計(jì)算密集型:多進(jìn)程效率高#多線程用于IO密集型,如socket,爬蟲,web#多進(jìn)程用于計(jì)算密集型,如金融分析

線程與進(jìn)程之家你的性能測(cè)試

1.5 線程的使用補(bǔ)充

#線程提供了一種便利的能夠同時(shí)處理多個(gè)請(qǐng)求的高效的服務(wù)器#多線程服務(wù)器基本有著同樣的體系結(jié)構(gòu), :主線程負(fù)責(zé)偵聽請(qǐng)求的線程#當(dāng)它收到一個(gè)請(qǐng)求的時(shí)候,一個(gè)新的工作者線程就會(huì)被建立起來,處理該客戶端#的請(qǐng)求,當(dāng)客戶端斷開連接時(shí)候,工作者線程會(huì)終止

#線程池被設(shè)計(jì)成一個(gè)線程同時(shí)只為一個(gè)客戶服務(wù),但是在服務(wù)結(jié)束之后#線程并不終止,線程池中的線程要么是事先全部建立起來,要么是在需要的時(shí)候被建立起來#在客戶端斷開連接的時(shí)候,線程并不終止,而是保持著,等待為更多的連接提供服務(wù)#

#線程池通常包含:#1.一個(gè)主要的偵聽線程來接收和分派客戶端的連接#2.一些工作者線程用來處理客戶端請(qǐng)求#3.一個(gè)線程管理系統(tǒng)用來處理那些意外終止的線程#

總結(jié)

以上是生活随笔為你收集整理的python结束线程池正在运行的线程_python之线程与线程池的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 日韩精品视频一区二区在线观看 | 国产精品久热 | 激情狠狠 | 97精品人妻麻豆一区二区 | 亚洲色图婷婷 | 干一干操一操 | 少妇一级免费 | 国产图区 | 高清乱码免费看污 | 在线你懂 | 99久久精品免费看国产交换 | 日本偷偷操 | 国产a∨精品一区二区三区仙踪林 | 精品乱码一区内射人妻无码 | 午夜一区二区三区免费 | 蜜桃精品久久久久久久免费影院 | 天海翼av在线播放 | 丁香久久综合 | 精精国产xxxx视频在线播放 | 国产女人水真多18毛片18精品 | 国产农村妇女精品久久久 | 国产精品传媒一区二区 | 日韩黄色免费视频 | 欧美成人高潮一二区在线看 | 怨女1988国语版在线观看高清 | 亚洲天堂影视 | 午夜在线免费观看视频 | 亚洲高清在线观看 | 国产午夜伦鲁鲁 | 日韩色视频在线观看 | 天天射干 | 日本电影一区二区三区 | 成人精品免费看 | 久久97人妻无码一区二区三区 | 福利片在线观看 | 色九九视频 | av在线看片 | 人人爽人人爽人人 | 日韩精品国产一区二区 | 日本色悠悠 | 亚洲第一女人av | 清纯唯美亚洲色图 | 亚洲一区二区电影网 | missav在线| 欧美午夜精品一区二区三区电影 | 超碰黄色 | 91欧美激情一区二区三区 | 粉色午夜视频 | 国产v亚洲v天堂无码久久久 | 狠狠夜夜 | 久久久久一区二区精码av少妇 | 欧美一级视频免费 | 欧美大片一区二区 | 久久久久在线观看 | 欧美精品久久久久久久久 | 乳色吐息在线看 | 波多野结衣福利 | 国产搞逼视频 | 精品国产AV色欲天媒传媒 | 深夜视频免费在线观看 | 成人一级视频在线观看 | 久久久久一级 | 中文字幕人妻丝袜乱一区三区 | 极品人妻videosss人妻 | 456亚洲影院 | 欧美一区二区三区成人精品 | 456亚洲影视 | 91精品国产综合久久久密臀九色 | 欧美少妇网 | 免费精品在线视频 | 寡妇av| 国产人妻精品一区二区三区不卡 | 一区二区三区精品国产 | 国产在线精品一区二区三区 | 天天色一色| 国产乱码久久久久 | 九热在线视频 | 真实新婚偷拍xxxxx | 91亚洲精品久久久蜜桃 | 日本一区二区三区视频在线观看 | 国产精品男女 | 草逼视频网 | 一区二区三区四区欧美 | 日本黄色录像片 | 午夜免费网站 | www.国产视频| 久久久久亚洲av成人无码电影 | 欧美国产一区二区 | 色哟哟日韩精品 | 天天av网 | 亚洲理论电影在线观看 | 久草视频2 | 97超碰人| 老熟女高潮一区二区三区 | 日本作爱视频 | 欧美午夜三级 | 人人插人人 | 亚洲精品久久久久久久久久久 | 中文字幕乱伦视频 |