日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python3练习题:并发编程(21-25)

發布時間:2025/10/17 python 10 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python3练习题:并发编程(21-25) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

關于python3中的并發編程,經過這些天的學習,歸納如下:

#practice21:多線程

  • 線程的定義
    • 方法一:直接Thread()構造
    • 方法二:構造Thread的子類
    #多線程的使用 from urllib.request import urlretrieve import csv from xml.etree.ElementTree import ElementTree,Element from threading import Threaddef download(sid,filename):'''下載csv文件'''url = 'http://quotes.money.163.com/service/chddata.html?code=%s&start=20150104&end=20160108' % str(sid)response = urlretrieve(url,filename)def convert(filename):'''csv文件轉換為xml文件'''with open(filename,'rt',encoding='GB2312')as rf:if rf:reader = csv.reader(rf)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('%s.xml' % filename,encoding='utf-8')def handle(sid):print("downloading %s :" % str(sid))download(sid,'demo%s.csv' % str(sid))print("converting %s :" % str(sid))convert('demo%s.csv' % str(sid))#方法一 threads = [] for i in range(1000001,1000010):#注意,args不能是(i),因為必須是元組t = Thread(target=handle,args=(i,))threads.append(t)t.start()#線程等待 for t in threads:t.join()print("main thread")#方法二 class Mythread(Thread):def __init__(self,sid):Thread.__init__(self)self.sid = siddef run(self):handle(self.sid)print('*'*20) threads = [] for i in range(1000001,1000010):t = Mythread(i)threads.append(t)t.start()#線程等待 for t in threads:t.join()print("main thread")

    執行結果:

    downloading 1000001 : downloading 1000002 : downloading 1000003 : downloading 1000004 : downloading 1000005 : downloading 1000006 : downloading 1000007 : downloading 1000008 : downloading 1000009 : converting 1000003 : converting 1000006 : converting 1000004 : converting 1000009 : converting 1000001 : converting 1000005 : converting 1000008 : converting 1000002 : converting 1000007 : main thread ******************** downloading 1000001 : downloading 1000002 : downloading 1000003 : downloading 1000004 : downloading 1000005 : downloading 1000006 : downloading 1000007 : downloading 1000008 : downloading 1000009 : converting 1000003 : converting 1000002 : converting 1000005 : converting 1000004 : converting 1000001 : converting 1000009 : converting 1000008 : converting 1000006 : converting 1000007 : main thread [Finished in 0.9s]

    #practice22:線程間通信

    • .Queue,該隊列是線程安全的;
    • 一個進程內的多個線程共用地址空間,這是線程間通信的基本依據;
    • 本例采用生產者/消費者模型,有多個生產者和一個消費者,每個生產者占用一個線程
    • 消費者只有一個,故必須使用循環來處理生產者生產的數據
    from urllib.request import urlretrieve import csv from xml.etree.ElementTree import ElementTree,Element from threading import Thread from queue import Queueclass DownloadThread(Thread):'''下載線程'''def __init__(self,sid,queue):Thread.__init__(self)self.sid = sidself.filename = 'demo{}'.format(str(sid))self.queue = queuedef download(self,sid,filename):'''下載csv文件'''url = 'http://quotes.money.163.com/service/chddata.html?code=%s&start=20150104&end=20160108' % str(sid)response = urlretrieve(url,filename)def run(self):print("downloading %s :" % str(self.sid))self.download(self.sid,self.filename)self.queue.put(self.filename)class ConvertThread(Thread):'''轉換現場'''def __init__(self,queue):Thread.__init__(self)self.queue = queuedef convert(self,filename):'''csv文件轉換為xml文件'''with open(filename,'rt',encoding='GB2312')as rf:if rf:reader = csv.reader(rf)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('%s.xml' % filename,encoding='utf-8')def run(self):while True:filename = self.queue.get()if filename == False:breakprint("converting %s :" % str(filename))self.convert(filename)if __name__ == '__main__':#線程使用隊列通信q = Queue()threads = []#創建并開啟全部線程,包括9個下載線程和一個轉換線程for i in range(1000001,1000010):t = DownloadThread(i,q)threads.append(t)t.start()ct = ConvertThread(q)ct.start()#等待下載線程完畢,通知轉換線程結束for i in threads:i.join()q.put(False)

    程序優化:(三處)
    1. StringIO的使用替代文件
    2. sid的構造
    3. 列表推導式構造線程列表

    from urllib.request import urlretrieve import csv from xml.etree.ElementTree import ElementTree,Element from threading import Thread from queue import Queue from io import StringIO import requestsclass DownloadThread(Thread):'''下載線程'''def __init__(self,sid,queue):Thread.__init__(self)self.sid = sidself.filename = 'demo{}'.format(str(sid))self.queue = queuedef download(self,sid):'''下載csv文件'''#變化3:使用rjust來調整字符串,使得sid只輸入一到兩位即可url = 'http://quotes.money.163.com/service/chddata.html?code=1%s&start=20150104&end=20160108' % str(sid).rjust(6,'0')response = requests.get(url)#變化1:用類文件對象(內存對象)來存儲csv字符串數據,而非文件self.data = StringIO(response.text)def run(self):print("downloading %s :" % str(self.sid))self.download(self.sid)self.queue.put((self.sid,self.data))class ConvertThread(Thread):'''轉換現場'''def __init__(self,queue):Thread.__init__(self)self.queue = queuedef convert(self,sid,data):'''csv文件轉換為xml文件'''#變化1:csv模塊可直接使用stringio對象來獲取readerif data:reader = csv.reader(data)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('1%s.xml' % str(sid).rjust(6,'0'),encoding='utf-8')def run(self):while True:sid,data = self.queue.get()if data == False:breakprint("converting %s :" % str(sid))self.convert(sid,data)if __name__ == '__main__':q = Queue()#變化2:使用列表推導式代替for循環,簡化代碼threads = [DownloadThread(i,q) for i in range(1,10)]for thread in threads:thread.start()ct = ConvertThread(q)ct.start()for i in threads:i.join()q.put((100,False))

    感覺還不是很熟練,來個實例:程序設計要求如下:

    1、調用OTCBTC的API,獲取所有買家、賣家出價數據

    2、涉及的幣種有:EOS、ETH、BCH、NEO

    3、將獲取到的json數據轉換成xml格式并保存

    4、要求使用多線程

    from threading import Thread import requests from xml.etree.ElementTree import ElementTree,Element from queue import Queueclass DownloadThread(Thread):'''下載當前某種貨幣的賣單與買單'''def __init__(self,coin_id,queue):Thread.__init__(self)self.coin_id = coin_idself.queue = queueself.url = "https://bb.otcbtc.com/api/v2/depth?market=%s&limit=1000"self.url %= coin_iddef download(self,url):'''下載json數據,存儲為data'''response = requests.get(url)return response.json()def run(self):print('downloading %s' % self.coin_id)data = self.download(self.url)self.queue.put((self.coin_id,data))class ConvertThread(Thread):'''把請求響應轉化為xml文件'''def __init__(self,queue):Thread.__init__(self)self.queue = queuedef setchildtree(self,superelement_tag,spec_dict):'''構建asks tree或者bids tree. superelement_tag是子樹的根節點名,spec_dict是整個json字符串轉換后的python字典'''e =Element(superelement_tag)for list_item in spec_dict[superelement_tag]:e1 = Element('item')e.append(e1)e2_price = Element('price')e2_price.text = list_item[0]e1.append(e2_price)e2_volumn = Element('volumn')e2_volumn.text = list_item[1]e1.append(e2_volumn)return edef convert(self,coin_id,spec_dict):'''將請求響應body的字典轉換為xml文件'''root = Element('data')e_timestamp = Element('timestamp')#必須在xml中把數字變成字符串!否則報錯:TypeError: cannot serialize 1530197213 (type int),序列化錯誤!e_timestamp.text = str(spec_dict['timestamp'])root.append(e_timestamp)asks_childtree = self.setchildtree('asks',spec_dict)root.append(asks_childtree)bids_childtree = self.setchildtree('bids',spec_dict)root.append(bids_childtree)et = ElementTree(root)et.write('%s.xml' % coin_id)def run(self):while True:#獲取隊列中已經下載好的數據coin_id,data = self.queue.get()#判斷隊列是否已經收到哨符!if data == False:breakprint('converting %s' % coin_id)self.convert(coin_id,data)if __name__ == '__main__':queue = Queue()markets = ['eosbtc','ethbtc','bchbtc','neobtc']threads = [DownloadThread(market,queue) for market in markets]for thread in threads:thread.start()ct = ConvertThread(queue)ct.start()#等待所有下載線程完畢for thread in threads:thread.join()#添加終止convert線程的哨符queue.put(('xxx',False))

    #practice23:線程間事件通知

    1、 Event的使用
    + Event.wait與Event.set

    from threading import Event,Thread def f(e):print('hello')e.wait()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start()

    可以看出,e.wait方法相當于阻塞函數,阻塞程序繼續執行,直到等到觸發信號e.set()

    從運行框可以看出,程序并未執行完。

    from threading import Event,Thread def f(e):print('hello')e.wait()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start() e.set()

    由于e.set(),線程被觸發繼續執行,程序最后運行完退出。

    • Event.clear()

    Event對象調用一對wait/set方法后就不能再次調用這對方法了,若想再次調用,必須先對Event對象調用clear方法!

    from threading import Event,Thread def f(e):while True:print('hello')e.wait()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start() e.set()

    由于e.set()使得線程內的阻塞函數e.wait()失效,故循環無限往復

    from threading import Event,Thread import time def f(e):while True:print('hello')e.wait()e.clear()print('world')e = Event() t = Thread(target=f,args=(e,)) t.start() e.set() time.sleep(1) print('*'*40) e.set()

    主線程與子線程共同維護Event對象e

    e.start()啟動子線程,對應輸出hello,然后開始阻塞;主線程e.set()結束子線程的阻塞,e.clear()使得e.start()可以重新生效,輸出world與hello,然循環再次被e.wait()阻塞;

    等待一秒,e.set()使得阻塞再次被解除!

    2、 實例:

    要求:
    + 多線程下載股票csv數據(生產者)
    + 單線程轉換為xml文件(消費者)
    + 單線程打包xml文件(每當生成3個xml文件便打包為一個tar.gz包)

    import csv from xml.etree.ElementTree import ElementTree,Element from threading import Thread,Event from queue import Queue from io import StringIO import requests import os import tarfileclass DownloadThread(Thread):'''下載線程'''def __init__(self,sid,queue):Thread.__init__(self)self.sid = sidself.filename = 'demo{}'.format(str(sid))self.queue = queuedef download(self,sid):'''下載csv文件'''url = 'http://quotes.money.163.com/service/chddata.html?code=1%s&start=20150104&end=20160108' % str(sid).rjust(6,'0')response = requests.get(url)self.data = StringIO(response.text)def run(self):print("downloading %s :" % str(self.sid))self.download(self.sid)self.queue.put((self.sid,self.data))class ConvertThread(Thread):'''轉換線程'''def __init__(self,queue,cevent,tevent):'''轉換線程與打包線程共同維護兩個事件:轉換事件cevent與打包事件tevent'''Thread.__init__(self)self.queue = queueself.cevent = ceventself.tevent = tevent#生成xml文件的計數器self.count = 0def convert(self,sid,data):'''csv文件轉換為xml文件'''if data:reader = csv.reader(data)header = next(reader)root = Element('data')for row in reader:line = Element('row')root.append(line)for key,value in zip(header,row):e = Element(key)e.text = valueline.append(e)et = ElementTree(root)et.write('1%s.xml' % str(sid).rjust(6,'0'),encoding='utf-8')def run(self):while True:sid,data = self.queue.get()if data == False:#當終止哨符發出后,可能最后的xml文件不足3個,但也要打包#必須先設置終止信后,后開始打包global tarstoptarstop = Trueself.tevent.set()breakprint("converting %s :" % str(sid))self.convert(sid,data)#每轉換一個xml文件,計數器加1self.count += 1if self.count == 3:self.count = 0#通知打包線程開始打包self.tevent.set()#停止轉換,要想循環使用事件,clear()需要緊跟wait()#注意:必須先通知打包線程,再停止轉換,反過來不行self.cevent.wait()self.cevent.clear()class TarThread(Thread):'''打包線程'''def __init__(self,cevent,tevent):'''轉換線程與打包線程共同維護兩個事件:轉換事件cevent與打包事件tevent'''Thread.__init__(self)#tar包名稱的初始idself.count = 0self.cevent = ceventself.tevent = tevent#任何一個循環執行的線程必須要有出口,設置為守護線程,主線程結束后,該線程自動退出,可能未完成打包任務!經測試不可行!# self.setDaemon(True)def tar(self):'''尋找當前文件夾下xml文件,生成打包文件,同時將源文件刪除!'''self.count += 1filename = '%s.tar.gz' % str(self.count)with tarfile.open(filename,'w:gz') as tar:for file in os.listdir('.'):#注意函數名字:endswith不是endwithif file.endswith('.xml'):tar.add(file)os.remove(file)#如果當前文件夾下沒有xml文件,但執行上一步任然會生成tar包,需要把空的tar包刪除if not tar.members:os.remove(filename)def run(self):global tarstopwhile not tarstop and True:#阻塞等待打包命令,一旦阻塞被解除,執行完動作后應當立即調用clear(),使得下一次調用wait方法有效self.tevent.wait()self.tar()self.tevent.clear()#一旦打包完成,應當立即通知轉換線程繼續轉換self.cevent.set()if __name__ == '__main__':#定義線程安全隊列,用于下載與轉換線程間通信dcqueue = Queue()tarstop = False#定義轉換事件與打包事件cevent,tevent = Event(),Event()#定義下載、轉換、打包線程threads = [DownloadThread(i,dcqueue) for i in range(1,11)]ct = ConvertThread(dcqueue,cevent,tevent)tt = TarThread(cevent,tevent)#開啟所有線程for thread in threads:thread.start()ct.start()tt.start()#等待下載線程執行完畢,發出轉換線程的終止哨符for i in threads:i.join()dcqueue.put((100,False))

    不足之處:tar線程 最終的退出方式使用了全局變量,不太優雅;守護線程感覺又不滿足條件


    #practice24:線程池

    concurrent.futures 函數庫有一個 ThreadPoolExecutor 類,可以構建多線程(異步執行多個調用)。

    1、 多線程的使用方法

    from concurrent.futures import ThreadPoolExecutordef handle(a,b):print('hello world',str(a*b))return a*b #構建多線程對象:executor executor = ThreadPoolExecutor(max_workers=3) #調用submit方法,提交任務給線程池,默認一次submit使用一個線程 #線程執行結果由Future對象保存 future = executor.submit(handle,3,4) #調用result方法提取結果,如果線程未結束,則阻塞起來,直到有結果 result = future.result() print(result)#除了submit,還有更高效的提交任務方法map,返回迭代器,每次迭代返回函數的執行結果,不是future對象 #使用3個線程,依次執行handle(1,1) handle(2,2) handle(3,3) for result in executor.map(handle,[1,2,3],[1,2,3]):print(result)

    2、實例

    要求:
    1. 構建echo TCP服務器,響應客戶端的請求,即直接返回客戶端發來的數據。
    2. TCP服務器開啟10個線程異步處理客戶端請求。
    3. 構建echo客戶端,發送請求驗證多線程。

    • 服務端
    import socket from concurrent.futures import ThreadPoolExecutorHOST = 'localhost' PORT = 12345def handle_request(conn):with conn as subsock:while True:data = subsock.recv(1024)if not data:breaksubsock.sendall(data)def server(address):pool = ThreadPoolExecutor(10)ip,port = addresswith socket.socket() as s:s.bind(address)s.listen(5)while True:conn,address = s.accept()print('Client ' + ip + ":" + str(port) + ' connected')pool.submit(handle_request,conn)server(('',12345))
    • 客戶端
    import socketdef run_sockets(addr):with socket.socket() as s:s.connect(addr)s.sendall(b'hello world')data = s.recv(1024)print(data)for i in range(7):run_sockets(('localhost',12345))

    【運行結果】

    • 客戶端

    • 服務端

    先運行服務端代碼,作為服務器是無限循環,等待請求

    sendall,recv,accept無數據時都會阻塞

    必須先運行服務器代碼,再運行多個客戶端!


    #practice25:多進程

    1、 多進程的定義

    • 創建子進程
    from multiprocessing import Processdef f(a,b):print(a*b)p = Process(target=f,args=(1,5)) p.start() print('main process') p.join() print('main1 process')

    • 與線程的區別

      雖然進程與線程的很多方法相似,但最大的不同是,進程之間占用不同的地址空間。所以不能使用之前線程共用全局變量的方法進通信!


    2、 進程間通信
    • 使用multiprocessing.Queue
    from multiprocessing import Process,Queue,Pipe#進程安全Queue的基本使用 q = Queue()def f(q):print('hello')#當隊列內容為空,get操作會阻塞!,直到傳入dataprint(q.get())print('world')p = Process(target=f,args=(q,)) p.start() q.put('yes it is')

    • 使用multiprocessing.Pipe
    from multiprocessing import Process,Pipedef f(c):#無數據會阻塞在這里data = c.recv()print(data)c1,c2 = Pipe() p = Process(target=f,args=(c2,)) p.start() c1.send('hello world')

    3、 多進程使用場景:cpu密集型操作

    from threading import Thread from multiprocessing import Processdef isarmstrong(n):'''求n是不是水仙花數,返回bool結果(無須關注具體算法)'''a,t = [],nwhile t > 0:a.append(t % 10)t /= 10k = len(a)return sum(x * k for x in a) == ndef findarmstrong(a,b):'''在a-b間尋找水仙花樹'''result = [x for x in range(a,b) if isarmstrong(x)]print(result)def run_multithreads(*args):'''采用多線程處理尋找水仙花樹的任務,args傳入的是多個查找范圍'''threads = [Thread(target=findarmstrong,args=(a,b)) for a,b in args]for thread in threads:thread.start()for thread in threads:thread.join()def run_multiprocess(*args):'''采用多線程處理尋找水仙花樹的任務,args傳入的是多個查找范圍'''proceses = [Process(target=findarmstrong,args=(a,b)) for a,b in args]for process in proceses:process.start()for process in proceses:process.join()if __name__ == '__main__':import timestart = time.time()# run_multiprocess((200000,300000),(300000,400000))run_multithreads((200000,300000),(300000,400000))end = time.time()print(end-start)

    多進程明顯比多線程快

    總結

    以上是生活随笔為你收集整理的python3练习题:并发编程(21-25)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。