python结束线程池正在运行的线程_python之线程与线程池
#進(jìn)程是資源分配的最小單位,線程是CPU調(diào)度的最小單位.每一個(gè)進(jìn)程中至少有一個(gè)線程。#傳統(tǒng)的不確切使用線程的程序稱為只含有一個(gè)線程或單線程程序,而可以使用線程的程序被稱為多線程程序,在程序中使用一個(gè)線程的方法#被稱為多線程#線程的模塊:#thread >> 實(shí)現(xiàn)線程的低級(jí)接口#threading>>> 可以提供高級(jí)方法
#同一進(jìn)程下的各個(gè)線程是可以共享該進(jìn)程的所有的資源的,各個(gè)線程之間是可以相互影響的
1.線程創(chuàng)建的兩種方式,與進(jìn)程創(chuàng)建的兩種方式基本
from threading importThreadfrom multiprocessing importProcessimporttimedeffucn1(n):
time.sleep(1)print('XXXXXXXXXXXXX',n)if __name__ == '__main__':#p = Process(target=fucn1,args=(1,))
t = Thread(target=fucn1,args=(1,))print(t1.isAlive())#返回線程是否活動(dòng)的
print(t1.getName())#返回線程名
t1.setName()#設(shè)置線程名
t.start()#開啟線程的速度非常快
t.join() #等待子線程運(yùn)行結(jié)束之后才進(jìn)行下面的代碼
print('主線程結(jié)束')
方式1
classgg(Thread):def __init__(self,n):
super().__init__()
self.n=ndefrun(self):print('xxx')print(self.n)if __name__ == '__main__':
t1= gg(66)
t1.start()
與進(jìn)程創(chuàng)建運(yùn)行相似
方式2
# threading模塊提供的一些方法:
# threading.currentThread(): 返回當(dāng)前的線程變量。
# threading.enumerate(): 返回一個(gè)包含正在運(yùn)行的線程的list。正在運(yùn)行指線程啟動(dòng)后、結(jié)束前,不包括啟動(dòng)前和終止后的線程。
# threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果
1.1
from threading importThreadfrom multiprocessing importProcessimporttimedefjisuan():for i in range(100000):
i+= 1
#print(i)
defpro():
p= Process(target=jisuan)
p1= Process(target=jisuan)
p.start()
p1.start()
p1.join()
p.join()defthreading():
t1= Thread(target=jisuan)
t2= Thread(target=jisuan)
t1.start()
t2.start()if __name__ == '__main__':
t1=time.time()
pro()
t2=time.time()
t3=time.time()
threading()
t4=time.time()print(t2-t1)#0.24815773963928223
print(t4-t3)#0.01202702522277832
線程與進(jìn)程之間的效率對(duì)比
1.2
from threading importThreadimportthreadingfrom multiprocessing importProcessimportosdefwork():importtime
time.sleep(3)print(threading.current_thread().getName())if __name__ == '__main__':#在主進(jìn)程下開啟線程
t=Thread(target=work)
t.start()print(threading.current_thread())#主線程對(duì)象
print(threading.current_thread().getName()) #主線程名稱
print(threading.current_thread().ident) #主線程ID
print(threading.get_ident()) #主線程ID
print(threading.enumerate()) #連同主線程在內(nèi)有兩個(gè)運(yùn)行的線程
print(threading.active_count())print('主線程/主進(jìn)程')#'''#打印結(jié)果:#<_mainthread started>#MainThread#14104#[<_mainthread started>, ]#主線程/主進(jìn)程#Thread-1#'''
一些不常用的方法
1.3
#一個(gè)主線程要等待所有的非守護(hù)線程結(jié)束才結(jié)束#(主線程的代碼執(zhí)行完之后主線程并沒有結(jié)束,而要等待所有的非守護(hù)進(jìn)程執(zhí)行完并返回結(jié)果后才結(jié)束)
#主進(jìn)程默認(rèn)是在執(zhí)行完代碼之后,相當(dāng)于結(jié)束了,并不關(guān)心所有的子進(jìn)程的執(zhí)行結(jié)果,只是關(guān)心所有的子進(jìn)程是否結(jié)束的的信號(hào),#接收到所有子進(jìn)程結(jié)束的信號(hào)之后,主進(jìn)程(程序)才結(jié)束
importtimefrom threading importThreaddeffunc():
time.sleep(3)print('任務(wù)1')deffunc1():
time.sleep(2)print('任務(wù)2')if __name__ == '__main__':
t1= Thread(target=func)
t2= Thread(target=func1)
t1.daemon=True
t1.start()
t2.start()print('主線程結(jié)束')#結(jié)果
'''主線程結(jié)束
任務(wù)2'''
守護(hù)進(jìn)程
1.4
from threading importThreadfrom multiprocessing importProcessimporttime
a= 100
deffucn1():globala
a-= 1 #等同于
temp =a
time.sleep(0.001)#驗(yàn)證關(guān)鍵點(diǎn)
temp = temp -1a=temp
time.sleep(5)if __name__ == '__main__':
gg=[]for i in range(100):
t= Thread(target=fucn1)
t.start()#開啟線程的速度非常快
gg.append(t)print(t.is_alive())
[tt.join()for tt ingg]print(a)print('主線程結(jié)束')#線程共享進(jìn)程的數(shù)據(jù),由于數(shù)據(jù)是共享的也會(huì)有數(shù)據(jù)的不安全的情況(數(shù)據(jù)混亂),#但是由于線程的創(chuàng)建的速度非常快,如果加上系統(tǒng)的線程不多的話,#效果不明顯#解決共享數(shù)據(jù)不安全: 加鎖 ,對(duì)取值和修改值的的操作開始加鎖(與多進(jìn)程加鎖一樣)
驗(yàn)證線程之間的數(shù)據(jù)是共享的,但是也存在數(shù)據(jù)的安全的問題
信號(hào)量,事件等與進(jìn)程的操作方法一樣#Semaphore管理一個(gè)內(nèi)置的計(jì)數(shù)器,#每當(dāng)調(diào)用acquire()時(shí)內(nèi)置計(jì)數(shù)器-1;#調(diào)用release() 時(shí)內(nèi)置計(jì)數(shù)器+1;#計(jì)數(shù)器不能小于0;當(dāng)計(jì)數(shù)器為0時(shí),acquire()將阻塞線程直到其他線程調(diào)用release()。#
#實(shí)例:(同時(shí)只有5個(gè)線程可以獲得semaphore,即可以限制最大連接數(shù)為5):## 基本代碼如下#def func(sm):#sm.acquire()## 賦值或修改的代碼#sm.release()#if __name__ == '__main__':#sm=Semaphore(5)#t = Thread(target=func)
#事件與進(jìn)程的一樣,#event.isSet() 查看等待的狀態(tài)不一樣#event.wait():如果 event.isSet()==False將阻塞線程;#event.set(): 設(shè)置event的狀態(tài)值為True,所有阻塞池的線程激活進(jìn)入就緒狀態(tài), 等待操作系統(tǒng)調(diào)度;#event.clear():恢復(fù)event的狀態(tài)值為False。
與進(jìn)程功能基本一樣的相關(guān)說明
線程隊(duì)列
#線程隊(duì)列#使用import queue,用法與進(jìn)程Queue一樣,直接引入,不用通過threading 模塊引入
#1>class queue.Queue(maxsize=0) #先進(jìn)先出
importqueue#q = queue.Queue(3)#創(chuàng)建一個(gè)容量為3的隊(duì)列#q.put(1)#q.put(2)#q.put(3)#在隊(duì)列塞滿三個(gè)元素后,如果繼續(xù)塞元素,就會(huì)進(jìn)入一個(gè)阻塞的狀態(tài)## 但是如果使用q.put_nowait()塞元素的話,到塞滿之后再塞的話,就會(huì)直接拋出隊(duì)列已滿的異常,## 不會(huì)進(jìn)入阻塞的狀態(tài),與q.get_nowait()相似#print(q.get())#1#print(q.get())#2#print(q.get())#3##按照添加的順序進(jìn)行輸出#print(q.get())#>>>>>>>>>取到第四個(gè)的時(shí)候,隊(duì)列已經(jīng)是空的了,如果使用這個(gè)的話,就會(huì)進(jìn)入## 阻塞的狀態(tài)#print(q.get_nowait())#>>>>>>但是如果取到第四個(gè)使用這個(gè)的話,不會(huì)進(jìn)入阻塞的狀態(tài),直接##拋出異常#
#
## 2>class queue.LifoQueue(maxsize=0) #先進(jìn)后出#q = queue.LifoQueue(3)#q.put(1)#q.put(2)#q.put(None)#### # 取值的時(shí)候輸出為#print(q.get())#None#print(q.get())#2#print(q.get())#1#
## 3>class queue.PriorityQueue(maxsize=0) #存儲(chǔ)數(shù)據(jù)時(shí)可設(shè)置優(yōu)先級(jí)的隊(duì)列#q = queue.PriorityQueue(4)## 在設(shè)置的時(shí)候,元組的方式進(jìn)行添加 如:(優(yōu)先級(jí),元素),## 優(yōu)先級(jí)通過使用數(shù)字來表示,數(shù)字越小優(yōu)先級(jí)越高##如果優(yōu)先級(jí)一樣,就會(huì)按照元素的ASCIll順序進(jìn)行輸出,相同優(yōu)先級(jí)的兩個(gè)元素能夠進(jìn)行比較(同優(yōu)先級(jí)的兩個(gè)元素必須是同種類型的)##字典類型的東西不能進(jìn)行比較#q.put((-10,1))#q.put((-10,3))#q.put((1,20))#q.put((2,'我'))#
##按照優(yōu)先級(jí)進(jìn)行輸出#print(q.get())#(-10, 1)#print(q.get())#(-10, 3)#print(q.get())#(1, 20)#print(q.get())#(2, '我')#
#
## 這三隊(duì)列是安全的,不存在多個(gè)線程搶占同一資源或數(shù)據(jù)的情況
線程三種隊(duì)列的使用
線程池
submit的使用
源代碼欣賞importthreadingimportosclassThreadPoolExecutor(_base.Executor):def __init__(self, max_workers=None, thread_name_prefix=''): #初始化方法,設(shè)置線程池的最大線程數(shù)量
if max_workers is None:#線程池的默認(rèn)設(shè)置
max_workers = (os.cpu_count() or 1) *5默認(rèn)設(shè)置的線程數(shù)是CPU核數(shù)的5倍if max_workers <=0:raise ValueError("max_workers must be greater than 0")
self._max_workers=max_workers
self._work_queue=queue.Queue()
self._threads=set()
self._shutdown=False
self._shutdown_lock= threading.Lock()#創(chuàng)建線程鎖
self._thread_name_prefix = (thread_name_prefix or("ThreadPoolExecutor-%d" %self._counter()))def submit(self, fn, *args, **kwargs):#創(chuàng)建一個(gè)線程,并異步提交任務(wù)
with self._shutdown_lock:ifself._shutdown:raise RuntimeError('cannot schedule new futures after shutdown')
f=_base.Future()
w=_WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()returnf
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):#調(diào)整線程池的數(shù)量
def weakref_cb(_, q=self._work_queue):
q.put(None)
num_threads=len(self._threads)if num_threads < self._max_workers: #創(chuàng)建線程的過程
thread_name = '%s_%d' % (self._thread_name_prefix orself,
num_threads)
t= threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon=True
t.start()
self._threads.add(t)
_threads_queues[t]=self._work_queuedef shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown=True
self._work_queue.put(None)ifwait:for t inself._threads:
t.join()
class ThreadPoolExecutor的源碼欣賞
1.1 submit的基本使用
#常用基本方法#class ThreadPoolExecutor():#def submit(self, fn, *args, **kwargs):#創(chuàng)建一個(gè)線程,并異步提交任務(wù)#pass#def shutdown(self,wait=True):#相當(dāng)于進(jìn)程池中的p.close() 和p.join()#pass##wait = True ,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù)##wait = False,立即返回,并不會(huì)等待池內(nèi)的任務(wù)執(zhí)行完畢## 但不管wait參數(shù)為何值,整個(gè)程序都會(huì)等到所有任務(wù)執(zhí)行完畢## submit和map必須在shutdown之前#通過
#import time#import threading#from concurrent.futures import ThreadPoolExecutor#def func(i):#time.sleep(2)#print('%s打印的:'%(threading.get_ident()),)#return i*i#
#tpool = ThreadPoolExecutor(max_workers= 5)#
#t_lst = []#for i in range(5):#t = tpool.submit(func,i)#異步提交任務(wù),與apply_async 相似,返回的也是一個(gè)結(jié)果對(duì)象#t_lst.append(t)#tpool.shutdown()#for a in t_lst:#print('>>',a.result())#獲取
線程池submit的基本使用
1.2 map的基本使用
1.2.1 源碼欣賞
def map(self, fn, *iterables, timeout=None, chunksize=1):if timeout is notNone:
end_time= timeout +time.time()
fs= [self.submit(fn, *args) for args in zip(*iterables)]def result_iterator():#生成器
try:
fs.reverse()whilefs:#Careful not to keep a reference to the popped future
if timeout isNone:yieldfs.pop().result()else:yield fs.pop().result(end_time -time.time())finally:for future infs:
future.cancel()return result_iterator()
map方法的源碼欣賞
1.2.2 map的基本使用(驗(yàn)證使用過程)
#簡單使用
from concurrent.futures importThreadPoolExecutorimportthreadingimportos,time,randomdeftask(n):print('%s is running'%(threading.get_ident()))#time.sleep(random.randint(1,2))
time.sleep(10)#測(cè)試for循環(huán)取值的時(shí)候,如果執(zhí)行的子線程還沒有執(zhí)行完的時(shí)候的情況
return n**2
if __name__ == '__main__':
t_pool= ThreadPoolExecutor(max_workers=3)
s= t_pool.map(task,range(1,5))#map取代for + sumbit
print(s) #.result_iterator at 0x000000C536C2B0F8>
for i ins :print(i)#print([i for i in s])#
print('主程序結(jié)束')'''#前面4個(gè)瞬間就出來
7252 is running
3084 is running
7824 is running
.result_iterator at 0x000000AF18AAA2B0>
7252 is running #延遲大概10s后后面4個(gè)瞬間出來
1
4
9
16#延遲10s后兩個(gè)瞬間出來
主程序結(jié)束'''
map的簡單使用
1.3 submit回調(diào)函數(shù)的應(yīng)用
from concurrent.futures importThreadPoolExecutor,ProcessPoolExecutorfrom multiprocessing importPoolimportrequestsimportjsonimportosdefget_page(url):print(' get %s' %(os.getpid(),url))
respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}defparse_page(res):
res=res.result()print(' parse %s' %(os.getpid(),res['url']))
parse_res='url: size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)if __name__ == '__main__':
urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']#p=Pool(3)
#for url in urls:
#p.apply_async(get_page,args=(url,),callback=pasrse_page)
#p.close()
#p.join()
p=ProcessPoolExecutor(3)for url inurls:
p.submit(get_page,url).add_done_callback(parse_page)#parse_page拿到的是一個(gè)future對(duì)象obj,需要用obj.result()拿到結(jié)果
回調(diào)函數(shù)的應(yīng)用
1.4 線程與進(jìn)程之間的性能測(cè)試
#進(jìn)程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優(yōu)勢(shì)
#現(xiàn)在的計(jì)算機(jī)基本上都是多核,python對(duì)于計(jì)算密集型的任務(wù)開多線程的效率并不能帶來多大性能上的提升,#甚至不如串行(沒有大量切換),但是,對(duì)于IO密集型的任務(wù)效率還是有顯著提升的。
from multiprocessing importProcessfrom threading importThreadimportthreadingimportos,timedefwork():
time.sleep(2)print('===>')if __name__ == '__main__':
l=[]print(os.cpu_count()) #本機(jī)為4核
start=time.time()for i in range(400):#p=Process(target=work) #耗時(shí)12s多,大部分時(shí)間耗費(fèi)在創(chuàng)建進(jìn)程上
p=Thread(target=work) #耗時(shí)2s多
l.append(p)
p.start()for p inl:
p.join()
stop=time.time()print('run time is %s' %(stop-start))## I/O密集型:多線程效率高
from multiprocessing importProcessfrom threading importThreadimportos,timedefwork():
res=0for i in range(100000000):
res*=iif __name__ == '__main__':
l=[]print(os.cpu_count()) #本機(jī)為4核
start=time.time()for i in range(4):
p=Process(target=work) #耗時(shí)5s多
p=Thread(target=work) #耗時(shí)18s多
l.append(p)
p.start()for p inl:
p.join()
stop=time.time()print('run time is %s' %(stop-start))#
## 計(jì)算密集型:多進(jìn)程效率高#多線程用于IO密集型,如socket,爬蟲,web#多進(jìn)程用于計(jì)算密集型,如金融分析
線程與進(jìn)程之家你的性能測(cè)試
1.5 線程的使用補(bǔ)充
#線程提供了一種便利的能夠同時(shí)處理多個(gè)請(qǐng)求的高效的服務(wù)器#多線程服務(wù)器基本有著同樣的體系結(jié)構(gòu), :主線程負(fù)責(zé)偵聽請(qǐng)求的線程#當(dāng)它收到一個(gè)請(qǐng)求的時(shí)候,一個(gè)新的工作者線程就會(huì)被建立起來,處理該客戶端#的請(qǐng)求,當(dāng)客戶端斷開連接時(shí)候,工作者線程會(huì)終止
#線程池被設(shè)計(jì)成一個(gè)線程同時(shí)只為一個(gè)客戶服務(wù),但是在服務(wù)結(jié)束之后#線程并不終止,線程池中的線程要么是事先全部建立起來,要么是在需要的時(shí)候被建立起來#在客戶端斷開連接的時(shí)候,線程并不終止,而是保持著,等待為更多的連接提供服務(wù)#
#線程池通常包含:#1.一個(gè)主要的偵聽線程來接收和分派客戶端的連接#2.一些工作者線程用來處理客戶端請(qǐng)求#3.一個(gè)線程管理系統(tǒng)用來處理那些意外終止的線程#
總結(jié)
以上是生活随笔為你收集整理的python结束线程池正在运行的线程_python之线程与线程池的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: windows ftp服务器_ftp客户
- 下一篇: python扩展库丰富吗_python扩