日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

python3练习题:并发编程(21-25)

發(fā)布時(shí)間:2025/10/17 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python3练习题:并发编程(21-25) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

關(guān)于python3中的并發(fā)編程,經(jīng)過(guò)這些天的學(xué)習(xí),歸納如下:

#practice21:多線程

  • 線程的定義
    • 方法一:直接Thread()構(gòu)造
    • 方法二:構(gòu)造Thread的子類
    #多線程的使用 from urllib.request import urlretrieve import csv from xml.etree.ElementTree import ElementTree,Element from threading import Threaddef download(sid,filename):'''下載csv文件'''url = 'http://quotes.money.163.com/service/chddata.html?code=%s&start=20150104&end=20160108' % str(sid)response = urlretrieve(url,filename)def convert(filename):'''csv文件轉(zhuǎn)換為xml文件'''with open(filename,'rt',encoding='GB2312')as rf:if rf:reader = csv.reader(rf)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('%s.xml' % filename,encoding='utf-8')def handle(sid):print("downloading %s :" % str(sid))download(sid,'demo%s.csv' % str(sid))print("converting %s :" % str(sid))convert('demo%s.csv' % str(sid))#方法一 threads = [] for i in range(1000001,1000010):#注意,args不能是(i),因?yàn)楸仨毷窃Mt = Thread(target=handle,args=(i,))threads.append(t)t.start()#線程等待 for t in threads:t.join()print("main thread")#方法二 class Mythread(Thread):def __init__(self,sid):Thread.__init__(self)self.sid = siddef run(self):handle(self.sid)print('*'*20) threads = [] for i in range(1000001,1000010):t = Mythread(i)threads.append(t)t.start()#線程等待 for t in threads:t.join()print("main thread")

    執(zhí)行結(jié)果:

    downloading 1000001 : downloading 1000002 : downloading 1000003 : downloading 1000004 : downloading 1000005 : downloading 1000006 : downloading 1000007 : downloading 1000008 : downloading 1000009 : converting 1000003 : converting 1000006 : converting 1000004 : converting 1000009 : converting 1000001 : converting 1000005 : converting 1000008 : converting 1000002 : converting 1000007 : main thread ******************** downloading 1000001 : downloading 1000002 : downloading 1000003 : downloading 1000004 : downloading 1000005 : downloading 1000006 : downloading 1000007 : downloading 1000008 : downloading 1000009 : converting 1000003 : converting 1000002 : converting 1000005 : converting 1000004 : converting 1000001 : converting 1000009 : converting 1000008 : converting 1000006 : converting 1000007 : main thread [Finished in 0.9s]

    #practice22:線程間通信

    • .Queue,該隊(duì)列是線程安全的;
    • 一個(gè)進(jìn)程內(nèi)的多個(gè)線程共用地址空間,這是線程間通信的基本依據(jù);
    • 本例采用生產(chǎn)者/消費(fèi)者模型,有多個(gè)生產(chǎn)者和一個(gè)消費(fèi)者,每個(gè)生產(chǎn)者占用一個(gè)線程
    • 消費(fèi)者只有一個(gè),故必須使用循環(huán)來(lái)處理生產(chǎn)者生產(chǎn)的數(shù)據(jù)
    from urllib.request import urlretrieve import csv from xml.etree.ElementTree import ElementTree,Element from threading import Thread from queue import Queueclass DownloadThread(Thread):'''下載線程'''def __init__(self,sid,queue):Thread.__init__(self)self.sid = sidself.filename = 'demo{}'.format(str(sid))self.queue = queuedef download(self,sid,filename):'''下載csv文件'''url = 'http://quotes.money.163.com/service/chddata.html?code=%s&start=20150104&end=20160108' % str(sid)response = urlretrieve(url,filename)def run(self):print("downloading %s :" % str(self.sid))self.download(self.sid,self.filename)self.queue.put(self.filename)class ConvertThread(Thread):'''轉(zhuǎn)換現(xiàn)場(chǎng)'''def __init__(self,queue):Thread.__init__(self)self.queue = queuedef convert(self,filename):'''csv文件轉(zhuǎn)換為xml文件'''with open(filename,'rt',encoding='GB2312')as rf:if rf:reader = csv.reader(rf)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('%s.xml' % filename,encoding='utf-8')def run(self):while True:filename = self.queue.get()if filename == False:breakprint("converting %s :" % str(filename))self.convert(filename)if __name__ == '__main__':#線程使用隊(duì)列通信q = Queue()threads = []#創(chuàng)建并開(kāi)啟全部線程,包括9個(gè)下載線程和一個(gè)轉(zhuǎn)換線程for i in range(1000001,1000010):t = DownloadThread(i,q)threads.append(t)t.start()ct = ConvertThread(q)ct.start()#等待下載線程完畢,通知轉(zhuǎn)換線程結(jié)束for i in threads:i.join()q.put(False)

    程序優(yōu)化:(三處)
    1. StringIO的使用替代文件
    2. sid的構(gòu)造
    3. 列表推導(dǎo)式構(gòu)造線程列表

    from urllib.request import urlretrieve import csv from xml.etree.ElementTree import ElementTree,Element from threading import Thread from queue import Queue from io import StringIO import requestsclass DownloadThread(Thread):'''下載線程'''def __init__(self,sid,queue):Thread.__init__(self)self.sid = sidself.filename = 'demo{}'.format(str(sid))self.queue = queuedef download(self,sid):'''下載csv文件'''#變化3:使用rjust來(lái)調(diào)整字符串,使得sid只輸入一到兩位即可url = 'http://quotes.money.163.com/service/chddata.html?code=1%s&start=20150104&end=20160108' % str(sid).rjust(6,'0')response = requests.get(url)#變化1:用類文件對(duì)象(內(nèi)存對(duì)象)來(lái)存儲(chǔ)csv字符串?dāng)?shù)據(jù),而非文件self.data = StringIO(response.text)def run(self):print("downloading %s :" % str(self.sid))self.download(self.sid)self.queue.put((self.sid,self.data))class ConvertThread(Thread):'''轉(zhuǎn)換現(xiàn)場(chǎng)'''def __init__(self,queue):Thread.__init__(self)self.queue = queuedef convert(self,sid,data):'''csv文件轉(zhuǎn)換為xml文件'''#變化1:csv模塊可直接使用stringio對(duì)象來(lái)獲取readerif data:reader = csv.reader(data)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('1%s.xml' % str(sid).rjust(6,'0'),encoding='utf-8')def run(self):while True:sid,data = self.queue.get()if data == False:breakprint("converting %s :" % str(sid))self.convert(sid,data)if __name__ == '__main__':q = Queue()#變化2:使用列表推導(dǎo)式代替for循環(huán),簡(jiǎn)化代碼threads = [DownloadThread(i,q) for i in range(1,10)]for thread in threads:thread.start()ct = ConvertThread(q)ct.start()for i in threads:i.join()q.put((100,False))

    感覺(jué)還不是很熟練,來(lái)個(gè)實(shí)例:程序設(shè)計(jì)要求如下:

    1、調(diào)用OTCBTC的API,獲取所有買家、賣家出價(jià)數(shù)據(jù)

    2、涉及的幣種有:EOS、ETH、BCH、NEO

    3、將獲取到的json數(shù)據(jù)轉(zhuǎn)換成xml格式并保存

    4、要求使用多線程

    from threading import Thread import requests from xml.etree.ElementTree import ElementTree,Element from queue import Queueclass DownloadThread(Thread):'''下載當(dāng)前某種貨幣的賣單與買單'''def __init__(self,coin_id,queue):Thread.__init__(self)self.coin_id = coin_idself.queue = queueself.url = "https://bb.otcbtc.com/api/v2/depth?market=%s&limit=1000"self.url %= coin_iddef download(self,url):'''下載json數(shù)據(jù),存儲(chǔ)為data'''response = requests.get(url)return response.json()def run(self):print('downloading %s' % self.coin_id)data = self.download(self.url)self.queue.put((self.coin_id,data))class ConvertThread(Thread):'''把請(qǐng)求響應(yīng)轉(zhuǎn)化為xml文件'''def __init__(self,queue):Thread.__init__(self)self.queue = queuedef setchildtree(self,superelement_tag,spec_dict):'''構(gòu)建asks tree或者bids tree. superelement_tag是子樹(shù)的根節(jié)點(diǎn)名,spec_dict是整個(gè)json字符串轉(zhuǎn)換后的python字典'''e =Element(superelement_tag)for list_item in spec_dict[superelement_tag]:e1 = Element('item')e.append(e1)e2_price = Element('price')e2_price.text = list_item[0]e1.append(e2_price)e2_volumn = Element('volumn')e2_volumn.text = list_item[1]e1.append(e2_volumn)return edef convert(self,coin_id,spec_dict):'''將請(qǐng)求響應(yīng)body的字典轉(zhuǎn)換為xml文件'''root = Element('data')e_timestamp = Element('timestamp')#必須在xml中把數(shù)字變成字符串!否則報(bào)錯(cuò):TypeError: cannot serialize 1530197213 (type int),序列化錯(cuò)誤!e_timestamp.text = str(spec_dict['timestamp'])root.append(e_timestamp)asks_childtree = self.setchildtree('asks',spec_dict)root.append(asks_childtree)bids_childtree = self.setchildtree('bids',spec_dict)root.append(bids_childtree)et = ElementTree(root)et.write('%s.xml' % coin_id)def run(self):while True:#獲取隊(duì)列中已經(jīng)下載好的數(shù)據(jù)coin_id,data = self.queue.get()#判斷隊(duì)列是否已經(jīng)收到哨符!if data == False:breakprint('converting %s' % coin_id)self.convert(coin_id,data)if __name__ == '__main__':queue = Queue()markets = ['eosbtc','ethbtc','bchbtc','neobtc']threads = [DownloadThread(market,queue) for market in markets]for thread in threads:thread.start()ct = ConvertThread(queue)ct.start()#等待所有下載線程完畢for thread in threads:thread.join()#添加終止convert線程的哨符queue.put(('xxx',False))

    #practice23:線程間事件通知

    1、 Event的使用
    + Event.wait與Event.set

    from threading import Event,Thread def f(e):print('hello')e.wait()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start()

    可以看出,e.wait方法相當(dāng)于阻塞函數(shù),阻塞程序繼續(xù)執(zhí)行,直到等到觸發(fā)信號(hào)e.set()

    從運(yùn)行框可以看出,程序并未執(zhí)行完。

    from threading import Event,Thread def f(e):print('hello')e.wait()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start() e.set()

    由于e.set(),線程被觸發(fā)繼續(xù)執(zhí)行,程序最后運(yùn)行完退出。

    • Event.clear()

    Event對(duì)象調(diào)用一對(duì)wait/set方法后就不能再次調(diào)用這對(duì)方法了,若想再次調(diào)用,必須先對(duì)Event對(duì)象調(diào)用clear方法!

    from threading import Event,Thread def f(e):while True:print('hello')e.wait()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start() e.set()

    由于e.set()使得線程內(nèi)的阻塞函數(shù)e.wait()失效,故循環(huán)無(wú)限往復(fù)

    from threading import Event,Thread import time def f(e):while True:print('hello')e.wait()e.clear()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start() e.set() time.sleep(1) print('*'*40) e.set()

    主線程與子線程共同維護(hù)Event對(duì)象e

    e.start()啟動(dòng)子線程,對(duì)應(yīng)輸出hello,然后開(kāi)始阻塞;主線程e.set()結(jié)束子線程的阻塞,e.clear()使得e.start()可以重新生效,輸出world與hello,然循環(huán)再次被e.wait()阻塞;

    等待一秒,e.set()使得阻塞再次被解除!

    2、 實(shí)例:

    要求:
    + 多線程下載股票csv數(shù)據(jù)(生產(chǎn)者)
    + 單線程轉(zhuǎn)換為xml文件(消費(fèi)者)
    + 單線程打包xml文件(每當(dāng)生成3個(gè)xml文件便打包為一個(gè)tar.gz包)

    import csv from xml.etree.ElementTree import ElementTree,Element from threading import Thread,Event from queue import Queue from io import StringIO import requests import os import tarfileclass DownloadThread(Thread):'''下載線程'''def __init__(self,sid,queue):Thread.__init__(self)self.sid = sidself.filename = 'demo{}'.format(str(sid))self.queue = queuedef download(self,sid):'''下載csv文件'''url = 'http://quotes.money.163.com/service/chddata.html?code=1%s&start=20150104&end=20160108' % str(sid).rjust(6,'0')response = requests.get(url)self.data = StringIO(response.text)def run(self):print("downloading %s :" % str(self.sid))self.download(self.sid)self.queue.put((self.sid,self.data))class ConvertThread(Thread):'''轉(zhuǎn)換線程'''def __init__(self,queue,cevent,tevent):'''轉(zhuǎn)換線程與打包線程共同維護(hù)兩個(gè)事件:轉(zhuǎn)換事件cevent與打包事件tevent'''Thread.__init__(self)self.queue = queueself.cevent = ceventself.tevent = tevent#生成xml文件的計(jì)數(shù)器self.count = 0def convert(self,sid,data):'''csv文件轉(zhuǎn)換為xml文件'''if data:reader = csv.reader(data)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('1%s.xml' % str(sid).rjust(6,'0'),encoding='utf-8')def run(self):while True:sid,data = self.queue.get()if data == False:#當(dāng)終止哨符發(fā)出后,可能最后的xml文件不足3個(gè),但也要打包#必須先設(shè)置終止信后,后開(kāi)始打包global tarstoptarstop = Trueself.tevent.set()breakprint("converting %s :" % str(sid))self.convert(sid,data)#每轉(zhuǎn)換一個(gè)xml文件,計(jì)數(shù)器加1self.count += 1if self.count == 3:self.count = 0#通知打包線程開(kāi)始打包self.tevent.set()#停止轉(zhuǎn)換,要想循環(huán)使用事件,clear()需要緊跟wait()#注意:必須先通知打包線程,再停止轉(zhuǎn)換,反過(guò)來(lái)不行self.cevent.wait()self.cevent.clear()class TarThread(Thread):'''打包線程'''def __init__(self,cevent,tevent):'''轉(zhuǎn)換線程與打包線程共同維護(hù)兩個(gè)事件:轉(zhuǎn)換事件cevent與打包事件tevent'''Thread.__init__(self)#tar包名稱的初始idself.count = 0self.cevent = ceventself.tevent = tevent#任何一個(gè)循環(huán)執(zhí)行的線程必須要有出口,設(shè)置為守護(hù)線程,主線程結(jié)束后,該線程自動(dòng)退出,可能未完成打包任務(wù)!經(jīng)測(cè)試不可行!# self.setDaemon(True)def tar(self):'''尋找當(dāng)前文件夾下xml文件,生成打包文件,同時(shí)將源文件刪除!'''self.count += 1filename = '%s.tar.gz' % str(self.count)with tarfile.open(filename,'w:gz') as tar:for file in os.listdir('.'):#注意函數(shù)名字:endswith不是endwithif file.endswith('.xml'):tar.add(file)os.remove(file)#如果當(dāng)前文件夾下沒(méi)有xml文件,但執(zhí)行上一步任然會(huì)生成tar包,需要把空的tar包刪除if not tar.members:os.remove(filename)def run(self):global tarstopwhile not tarstop and True:#阻塞等待打包命令,一旦阻塞被解除,執(zhí)行完動(dòng)作后應(yīng)當(dāng)立即調(diào)用clear(),使得下一次調(diào)用wait方法有效self.tevent.wait()self.tar()self.tevent.clear()#一旦打包完成,應(yīng)當(dāng)立即通知轉(zhuǎn)換線程繼續(xù)轉(zhuǎn)換self.cevent.set()if __name__ == '__main__':#定義線程安全隊(duì)列,用于下載與轉(zhuǎn)換線程間通信dcqueue = Queue()tarstop = False#定義轉(zhuǎn)換事件與打包事件cevent,tevent = Event(),Event()#定義下載、轉(zhuǎn)換、打包線程threads = [DownloadThread(i,dcqueue) for i in range(1,11)]ct = ConvertThread(dcqueue,cevent,tevent)tt = TarThread(cevent,tevent)#開(kāi)啟所有線程for thread in threads:thread.start()ct.start()tt.start()#等待下載線程執(zhí)行完畢,發(fā)出轉(zhuǎn)換線程的終止哨符for i in threads:i.join()dcqueue.put((100,False))

    不足之處:tar線程 最終的退出方式使用了全局變量,不太優(yōu)雅;守護(hù)線程感覺(jué)又不滿足條件


    #practice24:線程池

    concurrent.futures 函數(shù)庫(kù)有一個(gè) ThreadPoolExecutor 類,可以構(gòu)建多線程(異步執(zhí)行多個(gè)調(diào)用)。

    1、 多線程的使用方法

    from concurrent.futures import ThreadPoolExecutordef handle(a,b):print('hello world',str(a*b))return a*b #構(gòu)建多線程對(duì)象:executor executor = ThreadPoolExecutor(max_workers=3) #調(diào)用submit方法,提交任務(wù)給線程池,默認(rèn)一次submit使用一個(gè)線程 #線程執(zhí)行結(jié)果由Future對(duì)象保存 future = executor.submit(handle,3,4) #調(diào)用result方法提取結(jié)果,如果線程未結(jié)束,則阻塞起來(lái),直到有結(jié)果 result = future.result() print(result)#除了submit,還有更高效的提交任務(wù)方法map,返回迭代器,每次迭代返回函數(shù)的執(zhí)行結(jié)果,不是future對(duì)象 #使用3個(gè)線程,依次執(zhí)行handle(1,1) handle(2,2) handle(3,3) for result in executor.map(handle,[1,2,3],[1,2,3]):print(result)

    2、實(shí)例

    要求:
    1. 構(gòu)建echo TCP服務(wù)器,響應(yīng)客戶端的請(qǐng)求,即直接返回客戶端發(fā)來(lái)的數(shù)據(jù)。
    2. TCP服務(wù)器開(kāi)啟10個(gè)線程異步處理客戶端請(qǐng)求。
    3. 構(gòu)建echo客戶端,發(fā)送請(qǐng)求驗(yàn)證多線程。

    • 服務(wù)端
    import socket from concurrent.futures import ThreadPoolExecutorHOST = 'localhost' PORT = 12345def handle_request(conn):with conn as subsock:while True:data = subsock.recv(1024)if not data:breaksubsock.sendall(data)def server(address):pool = ThreadPoolExecutor(10)ip,port = addresswith socket.socket() as s:s.bind(address)s.listen(5)while True:conn,address = s.accept()print('Client ' + ip + ":" + str(port) + ' connected')pool.submit(handle_request,conn)server(('',12345))
    • 客戶端
    import socketdef run_sockets(addr):with socket.socket() as s:s.connect(addr)s.sendall(b'hello world')data = s.recv(1024)print(data)for i in range(7):run_sockets(('localhost',12345))

    【運(yùn)行結(jié)果】

    • 客戶端

    • 服務(wù)端

    先運(yùn)行服務(wù)端代碼,作為服務(wù)器是無(wú)限循環(huán),等待請(qǐng)求

    sendall,recv,accept無(wú)數(shù)據(jù)時(shí)都會(huì)阻塞

    必須先運(yùn)行服務(wù)器代碼,再運(yùn)行多個(gè)客戶端!


    #practice25:多進(jìn)程

    1、 多進(jìn)程的定義

    • 創(chuàng)建子進(jìn)程
    from multiprocessing import Processdef f(a,b):print(a*b)p = Process(target=f,args=(1,5)) p.start() print('main process') p.join() print('main1 process')

    • 與線程的區(qū)別

      雖然進(jìn)程與線程的很多方法相似,但最大的不同是,進(jìn)程之間占用不同的地址空間。所以不能使用之前線程共用全局變量的方法進(jìn)通信!


    2、 進(jìn)程間通信
    • 使用multiprocessing.Queue
    from multiprocessing import Process,Queue,Pipe#進(jìn)程安全Queue的基本使用 q = Queue()def f(q):print('hello')#當(dāng)隊(duì)列內(nèi)容為空,get操作會(huì)阻塞!,直到傳入dataprint(q.get())print('world')p = Process(target=f,args=(q,)) p.start() q.put('yes it is')

    • 使用multiprocessing.Pipe
    from multiprocessing import Process,Pipedef f(c):#無(wú)數(shù)據(jù)會(huì)阻塞在這里data = c.recv()print(data)c1,c2 = Pipe() p = Process(target=f,args=(c2,)) p.start() c1.send('hello world')

    3、 多進(jìn)程使用場(chǎng)景:cpu密集型操作

    from threading import Thread from multiprocessing import Processdef isarmstrong(n):'''求n是不是水仙花數(shù),返回bool結(jié)果(無(wú)須關(guān)注具體算法)'''a,t = [],nwhile t > 0:a.append(t % 10)t /= 10k = len(a)return sum(x * k for x in a) == ndef findarmstrong(a,b):'''在a-b間尋找水仙花樹(shù)'''result = [x for x in range(a,b) if isarmstrong(x)]print(result)def run_multithreads(*args):'''采用多線程處理尋找水仙花樹(shù)的任務(wù),args傳入的是多個(gè)查找范圍'''threads = [Thread(target=findarmstrong,args=(a,b)) for a,b in args]for thread in threads:thread.start()for thread in threads:thread.join()def run_multiprocess(*args):'''采用多線程處理尋找水仙花樹(shù)的任務(wù),args傳入的是多個(gè)查找范圍'''proceses = [Process(target=findarmstrong,args=(a,b)) for a,b in args]for process in proceses:process.start()for process in proceses:process.join()if __name__ == '__main__':import timestart = time.time()# run_multiprocess((200000,300000),(300000,400000))run_multithreads((200000,300000),(300000,400000))end = time.time()print(end-start)

    多進(jìn)程明顯比多線程快

    總結(jié)

    以上是生活随笔為你收集整理的python3练习题:并发编程(21-25)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。