python3练习题:并发编程(21-25)
關(guān)于python3中的并發(fā)編程,經(jīng)過(guò)這些天的學(xué)習(xí),歸納如下:
#practice21:多線程
- 方法一:直接Thread()構(gòu)造
- 方法二:構(gòu)造Thread的子類
執(zhí)行結(jié)果:
downloading 1000001 : downloading 1000002 : downloading 1000003 : downloading 1000004 : downloading 1000005 : downloading 1000006 : downloading 1000007 : downloading 1000008 : downloading 1000009 : converting 1000003 : converting 1000006 : converting 1000004 : converting 1000009 : converting 1000001 : converting 1000005 : converting 1000008 : converting 1000002 : converting 1000007 : main thread ******************** downloading 1000001 : downloading 1000002 : downloading 1000003 : downloading 1000004 : downloading 1000005 : downloading 1000006 : downloading 1000007 : downloading 1000008 : downloading 1000009 : converting 1000003 : converting 1000002 : converting 1000005 : converting 1000004 : converting 1000001 : converting 1000009 : converting 1000008 : converting 1000006 : converting 1000007 : main thread [Finished in 0.9s]#practice22:線程間通信
- .Queue,該隊(duì)列是線程安全的;
- 一個(gè)進(jìn)程內(nèi)的多個(gè)線程共用地址空間,這是線程間通信的基本依據(jù);
- 本例采用生產(chǎn)者/消費(fèi)者模型,有多個(gè)生產(chǎn)者和一個(gè)消費(fèi)者,每個(gè)生產(chǎn)者占用一個(gè)線程
- 消費(fèi)者只有一個(gè),故必須使用循環(huán)來(lái)處理生產(chǎn)者生產(chǎn)的數(shù)據(jù)
程序優(yōu)化:(三處)
1. StringIO的使用替代文件
2. sid的構(gòu)造
3. 列表推導(dǎo)式構(gòu)造線程列表
感覺(jué)還不是很熟練,來(lái)個(gè)實(shí)例:程序設(shè)計(jì)要求如下:
1、調(diào)用OTCBTC的API,獲取所有買家、賣家出價(jià)數(shù)據(jù)
2、涉及的幣種有:EOS、ETH、BCH、NEO
3、將獲取到的json數(shù)據(jù)轉(zhuǎn)換成xml格式并保存
4、要求使用多線程
from threading import Thread import requests from xml.etree.ElementTree import ElementTree,Element from queue import Queueclass DownloadThread(Thread):'''下載當(dāng)前某種貨幣的賣單與買單'''def __init__(self,coin_id,queue):Thread.__init__(self)self.coin_id = coin_idself.queue = queueself.url = "https://bb.otcbtc.com/api/v2/depth?market=%s&limit=1000"self.url %= coin_iddef download(self,url):'''下載json數(shù)據(jù),存儲(chǔ)為data'''response = requests.get(url)return response.json()def run(self):print('downloading %s' % self.coin_id)data = self.download(self.url)self.queue.put((self.coin_id,data))class ConvertThread(Thread):'''把請(qǐng)求響應(yīng)轉(zhuǎn)化為xml文件'''def __init__(self,queue):Thread.__init__(self)self.queue = queuedef setchildtree(self,superelement_tag,spec_dict):'''構(gòu)建asks tree或者bids tree. superelement_tag是子樹(shù)的根節(jié)點(diǎn)名,spec_dict是整個(gè)json字符串轉(zhuǎn)換后的python字典'''e =Element(superelement_tag)for list_item in spec_dict[superelement_tag]:e1 = Element('item')e.append(e1)e2_price = Element('price')e2_price.text = list_item[0]e1.append(e2_price)e2_volumn = Element('volumn')e2_volumn.text = list_item[1]e1.append(e2_volumn)return edef convert(self,coin_id,spec_dict):'''將請(qǐng)求響應(yīng)body的字典轉(zhuǎn)換為xml文件'''root = Element('data')e_timestamp = Element('timestamp')#必須在xml中把數(shù)字變成字符串!否則報(bào)錯(cuò):TypeError: cannot serialize 1530197213 (type int),序列化錯(cuò)誤!e_timestamp.text = str(spec_dict['timestamp'])root.append(e_timestamp)asks_childtree = self.setchildtree('asks',spec_dict)root.append(asks_childtree)bids_childtree = self.setchildtree('bids',spec_dict)root.append(bids_childtree)et = ElementTree(root)et.write('%s.xml' % coin_id)def run(self):while True:#獲取隊(duì)列中已經(jīng)下載好的數(shù)據(jù)coin_id,data = self.queue.get()#判斷隊(duì)列是否已經(jīng)收到哨符!if data == False:breakprint('converting %s' % coin_id)self.convert(coin_id,data)if __name__ == '__main__':queue = Queue()markets = ['eosbtc','ethbtc','bchbtc','neobtc']threads = [DownloadThread(market,queue) for market in markets]for thread in threads:thread.start()ct = ConvertThread(queue)ct.start()#等待所有下載線程完畢for thread in threads:thread.join()#添加終止convert線程的哨符queue.put(('xxx',False))#practice23:線程間事件通知
1、 Event的使用
+ Event.wait與Event.set
可以看出,e.wait方法相當(dāng)于阻塞函數(shù),阻塞程序繼續(xù)執(zhí)行,直到等到觸發(fā)信號(hào)e.set()
從運(yùn)行框可以看出,程序并未執(zhí)行完。
from threading import Event,Thread def f(e):print('hello')e.wait()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start() e.set()由于e.set(),線程被觸發(fā)繼續(xù)執(zhí)行,程序最后運(yùn)行完退出。
- Event.clear()
Event對(duì)象調(diào)用一對(duì)wait/set方法后就不能再次調(diào)用這對(duì)方法了,若想再次調(diào)用,必須先對(duì)Event對(duì)象調(diào)用clear方法!
from threading import Event,Thread def f(e):while True:print('hello')e.wait()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start() e.set()由于e.set()使得線程內(nèi)的阻塞函數(shù)e.wait()失效,故循環(huán)無(wú)限往復(fù)
from threading import Event,Thread import time def f(e):while True:print('hello')e.wait()e.clear()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start() e.set() time.sleep(1) print('*'*40) e.set()主線程與子線程共同維護(hù)Event對(duì)象e
e.start()啟動(dòng)子線程,對(duì)應(yīng)輸出hello,然后開(kāi)始阻塞;主線程e.set()結(jié)束子線程的阻塞,e.clear()使得e.start()可以重新生效,輸出world與hello,然循環(huán)再次被e.wait()阻塞;
等待一秒,e.set()使得阻塞再次被解除!
2、 實(shí)例:
要求:
+ 多線程下載股票csv數(shù)據(jù)(生產(chǎn)者)
+ 單線程轉(zhuǎn)換為xml文件(消費(fèi)者)
+ 單線程打包xml文件(每當(dāng)生成3個(gè)xml文件便打包為一個(gè)tar.gz包)
不足之處:tar線程 最終的退出方式使用了全局變量,不太優(yōu)雅;守護(hù)線程感覺(jué)又不滿足條件
#practice24:線程池
concurrent.futures 函數(shù)庫(kù)有一個(gè) ThreadPoolExecutor 類,可以構(gòu)建多線程(異步執(zhí)行多個(gè)調(diào)用)。
1、 多線程的使用方法
from concurrent.futures import ThreadPoolExecutordef handle(a,b):print('hello world',str(a*b))return a*b #構(gòu)建多線程對(duì)象:executor executor = ThreadPoolExecutor(max_workers=3) #調(diào)用submit方法,提交任務(wù)給線程池,默認(rèn)一次submit使用一個(gè)線程 #線程執(zhí)行結(jié)果由Future對(duì)象保存 future = executor.submit(handle,3,4) #調(diào)用result方法提取結(jié)果,如果線程未結(jié)束,則阻塞起來(lái),直到有結(jié)果 result = future.result() print(result)#除了submit,還有更高效的提交任務(wù)方法map,返回迭代器,每次迭代返回函數(shù)的執(zhí)行結(jié)果,不是future對(duì)象 #使用3個(gè)線程,依次執(zhí)行handle(1,1) handle(2,2) handle(3,3) for result in executor.map(handle,[1,2,3],[1,2,3]):print(result)2、實(shí)例
要求:
1. 構(gòu)建echo TCP服務(wù)器,響應(yīng)客戶端的請(qǐng)求,即直接返回客戶端發(fā)來(lái)的數(shù)據(jù)。
2. TCP服務(wù)器開(kāi)啟10個(gè)線程異步處理客戶端請(qǐng)求。
3. 構(gòu)建echo客戶端,發(fā)送請(qǐng)求驗(yàn)證多線程。
- 服務(wù)端
- 客戶端
【運(yùn)行結(jié)果】
- 客戶端
- 服務(wù)端
先運(yùn)行服務(wù)端代碼,作為服務(wù)器是無(wú)限循環(huán),等待請(qǐng)求
sendall,recv,accept無(wú)數(shù)據(jù)時(shí)都會(huì)阻塞
必須先運(yùn)行服務(wù)器代碼,再運(yùn)行多個(gè)客戶端!
#practice25:多進(jìn)程
1、 多進(jìn)程的定義
- 創(chuàng)建子進(jìn)程
- 與線程的區(qū)別
雖然進(jìn)程與線程的很多方法相似,但最大的不同是,進(jìn)程之間占用不同的地址空間。所以不能使用之前線程共用全局變量的方法進(jìn)通信!
2、 進(jìn)程間通信
- 使用multiprocessing.Queue
- 使用multiprocessing.Pipe
3、 多進(jìn)程使用場(chǎng)景:cpu密集型操作
from threading import Thread from multiprocessing import Processdef isarmstrong(n):'''求n是不是水仙花數(shù),返回bool結(jié)果(無(wú)須關(guān)注具體算法)'''a,t = [],nwhile t > 0:a.append(t % 10)t /= 10k = len(a)return sum(x * k for x in a) == ndef findarmstrong(a,b):'''在a-b間尋找水仙花樹(shù)'''result = [x for x in range(a,b) if isarmstrong(x)]print(result)def run_multithreads(*args):'''采用多線程處理尋找水仙花樹(shù)的任務(wù),args傳入的是多個(gè)查找范圍'''threads = [Thread(target=findarmstrong,args=(a,b)) for a,b in args]for thread in threads:thread.start()for thread in threads:thread.join()def run_multiprocess(*args):'''采用多線程處理尋找水仙花樹(shù)的任務(wù),args傳入的是多個(gè)查找范圍'''proceses = [Process(target=findarmstrong,args=(a,b)) for a,b in args]for process in proceses:process.start()for process in proceses:process.join()if __name__ == '__main__':import timestart = time.time()# run_multiprocess((200000,300000),(300000,400000))run_multithreads((200000,300000),(300000,400000))end = time.time()print(end-start)多進(jìn)程明顯比多線程快
總結(jié)
以上是生活随笔為你收集整理的python3练习题:并发编程(21-25)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: python3练习题:11-20
- 下一篇: 每天5分钟玩转python3算法:二分查