gj12-2 协程和异步io
生活随笔
收集整理的這篇文章主要介紹了
gj12-2 协程和异步io
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
12.3 epoll+回調(diào)+事件循環(huán)方式url
import socket from urllib.parse import urlparse# 使用非阻塞io完成http請(qǐng)求def get_url(url):# 通過socket請(qǐng)求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = "/"# 建立socket連接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)client.setblocking(False)try:client.connect((host, 80)) # 阻塞不會(huì)消耗cpuexcept BlockingIOError as e:print(e)while True: # 不停的詢問連接是否建立好, 需要while循環(huán)不停的去檢查狀態(tài)try: # 嘗試不停發(fā)client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))breakexcept OSError as e:passdata = b""while True:try:d = client.recv(1024)except BlockingIOError as e:continueif d:data += delse:breakdata = data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)client.close()if __name__ == "__main__":get_url(http://www.baidu.com) 通過非阻塞io實(shí)現(xiàn)http請(qǐng)求select + 回調(diào) + 事件循環(huán)
并發(fā)性高, 使用單線程
import socket from urllib.parse import urlparse from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE# 自動(dòng)選擇IO復(fù)用的方法 selector = DefaultSelector() # 使用select完成http請(qǐng)求 urls = [] # 存放爬取的url stop = Falseclass Fetcher:def get_url(self, url):self.spider_url = urlurl = urlparse(url)self.host = url.netlocself.path = url.pathself.data = b""if self.path == "":self.path = "/"# 建立socket連接self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.client.setblocking(False)try:self.client.connect((self.host, 80)) # 阻塞不會(huì)消耗cpuexcept BlockingIOError as e:pass# 注冊(cè)selector.register(self.client.fileno(), EVENT_WRITE, self.connected)# 建立成功后發(fā)送請(qǐng)求def connected(self, key):selector.unregister(key.fd) #self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))selector.register(self.client.fileno(), EVENT_READ, self.readable)# 讀取def readable(self, key):d = self.client.recv(1024) # 準(zhǔn)備好了就會(huì)循環(huán)調(diào)用if d:self.data += delse:selector.unregister(key.fd)data = self.data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)self.client.close()urls.remove(self.spider_url) # 從 urls 列表里面去掉完成的urlif not urls:global stopstop = Truedef loop():# 事件循環(huán),不停的請(qǐng)求socket的狀態(tài)并調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)# 1. select本身是不支持register模式# 2. socket狀態(tài)變化以后的回調(diào)是由程序員完成的while not stop:ready = selector.select()for key, mask in ready:call_back = key.datacall_back(key)# 回調(diào)+事件循環(huán)+select(poll\epoll)if __name__ == "__main__":fetcher = Fetcher()import timestart_time = time.time()for url in range(20):url = "http://www.baidu.com/".format(url)urls.append(url)fetcher = Fetcher()fetcher.get_url(url)loop()print(time.time() - start_time)12.4 回調(diào)之痛
將代碼邏輯拆分成了幾段,維護(hù)性不高
如果回調(diào)函數(shù)執(zhí)行不正常該如何?
如果回調(diào)里面還要嵌套回調(diào)怎么辦?要嵌套很多層怎么辦?
如果嵌套了多層,其中某個(gè)環(huán)節(jié)出錯(cuò)了會(huì)造成什么后果?
如果有個(gè)數(shù)據(jù)需要被每個(gè)回調(diào)都處理怎么辦?
怎么使用當(dāng)前函數(shù)中的局部變量?
1.可讀性差
2.共享狀態(tài)管理困難
3.異常處理困難
12.5 協(xié)程是什么
C10M問題和協(xié)程
如何利用8核心CPU,64G內(nèi)存,在10gbps的網(wǎng)絡(luò)上保持1000萬并發(fā)連接
1.回調(diào)模式編碼復(fù)雜度高
2.同步編程的并發(fā)性不高
3.多線程編程需要線程間同步,lock
1.采用同步的方式去編寫異步的代碼
2.使用單線程去切換任務(wù):
?? 1.線程是由操作系統(tǒng)切換的,單線程切換意味著我們需要程序員自己去調(diào)度任務(wù)
?? 2.不在需要鎖,并發(fā)性高,如果單線程內(nèi)切換函數(shù),性能遠(yuǎn)高于線程切換,并發(fā)性更高
12.6 生成器進(jìn)階-send、close和throw方法
def gen_func():# 1. 可以產(chǎn)出值, 2. 可以接收值(調(diào)用方傳遞進(jìn)來的值)html = yield "http://lewen.com"print("inner:",html)yield 2yield 3return "lewen"if __name__ == "__main__":gen = gen_func()# 1.啟動(dòng)生成器方式有兩種, next(), send# 在調(diào)用send發(fā)送非none值之前,我們必須啟動(dòng)一次生成器,# 方式有兩種1. gen.send(None), 2. next(gen)url = gen.send(None)# print(url) # http://lewen.com# url = next(gen)# download urlhtml = "lewen"# gen.send(html) # inner: lewenprint(gen.send(html)) # send方法可以傳遞值進(jìn)入生成器內(nèi)部,同時(shí)還可以重啟生成器執(zhí)行到下一個(gè)yield位置"""inner: lewen2"""# print(next(gen))# print(next(gen))# print(next(gen))# print(next(gen)) gen_send def gen_func():# 1. 可以產(chǎn)出值, 2. 可以接收值(調(diào)用方傳遞進(jìn)來的值)try:yield "http://lewen.com"except Exception:pass# yield "http://projectsedu.com"yield 2yield 3return "lewen"if __name__ == "__main__":gen = gen_func()print(next(gen))gen.close() # 關(guān)閉了生成器print(next(gen)) # StopIteration# ---- http://lewen.com --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-8-7930c3f86cda> in <module>16 print(next(gen))17 gen.close() # 關(guān)閉了生成器 ---> 18 print(next(gen)) # StopIterationStopIteration: ----def gen_func():# 1. 可以產(chǎn)出值, 2. 可以接收值(調(diào)用方傳遞進(jìn)來的值)try:yield "http://projectsedu.com"except GeneratorExit:pass# yield "http://projectsedu.com"yield 2yield 3return "lewen"if __name__ == "__main__":gen = gen_func()print(next(gen))gen.close() # 關(guān)閉了生成器print(next(gen)) # StopIteration# GeneratorExit是繼承自BaseException, 并沒有繼承 Exception# --- http://projectsedu.com --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-9-a1ac8a75795f> in <module>15 gen = gen_func()16 print(next(gen)) ---> 17 gen.close() # 關(guān)閉了生成器18 print(next(gen)) # StopIteration19RuntimeError: generator ignored GeneratorExit ---def gen_func():# 1. 可以產(chǎn)出值, 2. 可以接收值(調(diào)用方傳遞進(jìn)來的值) # try: # yield "http://projectsedu.com" # except Exception: # passyield "http://projectsedu.com"yield 2yield 3return "lewen"if __name__ == "__main__":gen = gen_func()print(next(gen))gen.close() # 關(guān)閉了生成器print("lewen")# --- http://projectsedu.com lewen gen_close def gen_func():#1. 可以產(chǎn)出值, 2. 可以接收值(調(diào)用方傳遞進(jìn)來的值)try:yield "http://projectsedu.com"except Exception as e:passyield 2yield 3return "bobby"if __name__ == "__main__":gen = gen_func()print(next(gen))gen.throw(Exception, "download error")print(next(gen)) # --- http://projectsedu.com 3gen.throw(Exception, "download error") # --- --------------------------------------------------------------------------- Exception Traceback (most recent call last) <ipython-input-10-08e213416358> in <module> ----> 1 gen.throw(Exception, "download error")<ipython-input-7-bc909182a9a4> in gen_func()6 pass7 yield 2 ----> 8 yield 39 return "bobby"10Exception: download error gen_throw12.7 生成器進(jìn)階-yield from
# python3.3新加了yield from語(yǔ)法 from itertools import chainmy_list = [1, 2, 3] my_dict = {"lewen1": "http://projectsedu.com","lewen2": "http://www.imooc.com", }for value in chain(my_list, my_dict, range(5, 10)):print(value)# """ 1 2 3 lewen1 lewen2 5 6 7 8 9def my_chain(*args, **kwargs):for my_iterable in args:for value in my_iterable:yield valuefor value in my_chain(my_list, my_dict, range(5, 10)):print(value) # --- 1 2 3 lewen1 lewen2 5 6 7 8 9def my_chain(*args, **kwargs):for my_iterable in args:yield from my_iterable for value in my_chain(my_list, my_dict, range(5, 10)):print(value) # --- 1 2 3 lewen1 lewen2 5 6 7 8 9 chain def g1(iterable):yield iterabledef g2(iterable):yield from iterablefor value in g1(range(10)):print(value) for value in g2(range(10)):print(value)# """""" range(0, 10) 0 1 2 3 4 5 6 7 8 9 yield from iterable def g1(gen):yield from gendef main():g = g1()g.send(None)# 1. main 調(diào)用方 g1(委托生成器) gen 子生成器 # 1. yield from會(huì)在調(diào)用方與子生成器之間建立一個(gè)雙向通道final_result = {} def middle(key):while True:final_result[key] = yield from sales_sum(key)print(key+"銷量統(tǒng)計(jì)完成!!.") def sales_sum(pro_name):total = 0nums = []while True:x = yieldprint(pro_name+"銷量: ", x)if not x:breaktotal += xnums.append(x)return total, numsdef main():data_sets = {"lewen牌面膜": [1200, 1500, 3000],"lewen牌手機(jī)": [28,55,98,108 ],"lewen牌大衣": [280,560,778,70],}for key, data_set in data_sets.items():print("start key:", key)m = middle(key)m.send(None) # 預(yù)激middle協(xié)程for value in data_set:m.send(value) # 給協(xié)程傳遞每一組的值m.send(None)print("final_result:", final_result)if __name__ == '__main__':main()# """""" start key: lewen牌面膜 lewen牌面膜銷量: 1200 lewen牌面膜銷量: 1500 lewen牌面膜銷量: 3000 lewen牌面膜銷量: None lewen牌面膜銷量統(tǒng)計(jì)完成!!. start key: lewen牌手機(jī) lewen牌手機(jī)銷量: 28 lewen牌手機(jī)銷量: 55 lewen牌手機(jī)銷量: 98 lewen牌手機(jī)銷量: 108 lewen牌手機(jī)銷量: None lewen牌手機(jī)銷量統(tǒng)計(jì)完成!!. start key: lewen牌大衣 lewen牌大衣銷量: 280 lewen牌大衣銷量: 560 lewen牌大衣銷量: 778 lewen牌大衣銷量: 70 lewen牌大衣銷量: None lewen牌大衣銷量統(tǒng)計(jì)完成!!. final_result: {'lewen牌面膜': (5700, [1200, 1500, 3000]), 'lewen牌手機(jī)': (289, [28, 55, 98, 108]), 'lewen牌大衣': (1688, [280, 560, 778, 70])}"""def sales_sum(pro_name):total = 0nums = []while True:x = yield # 接受值print(pro_name+"銷量: ", x)if not x:breaktotal += xnums.append(x)return total, numsif __name__ == "__main__":my_gen = sales_sum("bobby牌手機(jī)")my_gen.send(None)my_gen.send(1200)my_gen.send(1500)my_gen.send(3000)try:my_gen.send(None) # 如果將代碼合并到middle,出現(xiàn)異常就需要自己去捕捉,except StopIteration as e: # 用 yield from ,就不用去 try 捕捉result = e.valueprint(result)# """""" bobby牌手機(jī)銷量: 1200 bobby牌手機(jī)銷量: 1500 bobby牌手機(jī)銷量: 3000 bobby牌手機(jī)銷量: None (5700, [1200, 1500, 3000]) yield from 例子 #pep380#1. RESULT = yield from EXPR可以簡(jiǎn)化成下面這樣 #一些說明 """ _i:子生成器,同時(shí)也是一個(gè)迭代器 _y:子生成器生產(chǎn)的值 _r:yield from 表達(dá)式最終的值 _s:調(diào)用方通過send()發(fā)送的值 _e:異常對(duì)象"""_i = iter(EXPR) # EXPR是一個(gè)可迭代對(duì)象,_i其實(shí)是子生成器; try:_y = next(_i) # 預(yù)激子生成器,把產(chǎn)出的第一個(gè)值存在_y中; except StopIteration as _e:_r = _e.value # 如果拋出了`StopIteration`異常,那么就將異常對(duì)象的`value`屬性保存到_r,這是最簡(jiǎn)單的情況的返回值; else:while 1: # 嘗試執(zhí)行這個(gè)循環(huán),委托生成器會(huì)阻塞;_s = yield _y # 生產(chǎn)子生成器的值,等待調(diào)用方`send()`值,發(fā)送過來的值將保存在_s中;try:_y = _i.send(_s) # 轉(zhuǎn)發(fā)_s,并且嘗試向下執(zhí)行;except StopIteration as _e:_r = _e.value # 如果子生成器拋出異常,那么就獲取異常對(duì)象的`value`屬性存到_r,退出循環(huán),恢復(fù)委托生成器的運(yùn)行;break RESULT = _r # _r就是整個(gè)yield from表達(dá)式返回的值。""" 1. 子生成器可能只是一個(gè)迭代器,并不是一個(gè)作為協(xié)程的生成器,所以它不支持.throw()和.close()方法; 2. 如果子生成器支持.throw()和.close()方法,但是在子生成器內(nèi)部,這兩個(gè)方法都會(huì)拋出異常; 3. 調(diào)用方讓子生成器自己拋出異常 4. 當(dāng)調(diào)用方使用next()或者.send(None)時(shí),都要在子生成器上調(diào)用next()函數(shù),當(dāng)調(diào)用方使用.send()發(fā)送非 None 值時(shí),才調(diào)用子生成器的.send()方法; """ _i = iter(EXPR) try:_y = next(_i) except StopIteration as _e:_r = _e.value else:while 1:try:_s = yield _yexcept GeneratorExit as _e:try:_m = _i.closeexcept AttributeError:passelse:_m()raise _eexcept BaseException as _e:_x = sys.exc_info()try:_m = _i.throwexcept AttributeError:raise _eelse:try:_y = _m(*_x)except StopIteration as _e:_r = _e.valuebreakelse:try:if _s is None:_y = next(_i)else:_y = _i.send(_s)except StopIteration as _e:_r = _e.valuebreak RESULT = _r""" 看完代碼,我們總結(jié)一下關(guān)鍵點(diǎn):1. 子生成器生產(chǎn)的值,都是直接傳給調(diào)用方的;調(diào)用方通過.send()發(fā)送的值都是直接傳遞給子生成器的;如果發(fā)送的是 None,會(huì)調(diào)用子生成器的__next__()方法,如果不是 None,會(huì)調(diào)用子生成器的.send()方法; 2. 子生成器退出的時(shí)候,最后的return EXPR,會(huì)觸發(fā)一個(gè)StopIteration(EXPR)異常; 3. yield from表達(dá)式的值,是子生成器終止時(shí),傳遞給StopIteration異常的第一個(gè)參數(shù); 4. 如果調(diào)用的時(shí)候出現(xiàn)StopIteration異常,委托生成器會(huì)恢復(fù)運(yùn)行,同時(shí)其他的異常會(huì)向上 "冒泡"; 5. 傳入委托生成器的異常里,除了GeneratorExit之外,其他的所有異常全部傳遞給子生成器的.throw()方法;如果調(diào)用.throw()的時(shí)候出現(xiàn)了StopIteration異常,那么就恢復(fù)委托生成器的運(yùn)行,其他的異常全部向上 "冒泡"; 6. 如果在委托生成器上調(diào)用.close()或傳入GeneratorExit異常,會(huì)調(diào)用子生成器的.close()方法,沒有的話就不調(diào)用。如果在調(diào)用.close()的時(shí)候拋出了異常,那么就向上 "冒泡",否則的話委托生成器會(huì)拋出GeneratorExit異常。""" yield from 解析12.8 async和await
async def downloader(url):return "lewen" async def download_url(url):# dosomethingshtml = await downloader(url)return html if __name__ == "__main__":coro = download_url("http://www.imooc.com")# next(None) # 不能這樣調(diào)用coro.send(None)--------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-3-879770ebad5e> in <module>7 if __name__ == "__main__":8 coro = download_url("http://www.imooc.com") ----> 9 coro.send(None)10StopIteration: lewen 用yield 可以實(shí)現(xiàn) 生成器和協(xié)程,但容易混淆,就引入了await關(guān)鍵字import types@types.coroutine def downloader(url):yield "lewen"async def download_url(url):# dosomethingshtml = await downloader(url)return htmlif __name__ == "__main__":coro = download_url("http://www.imooc.com")# next(None) # 不能這樣調(diào)用coro.send(None)12-9 生成器實(shí)現(xiàn)協(xié)程
# 生成器是可以暫停的函數(shù) import inspectdef gen_func():yield 1# value = yield from# 第一返回值給調(diào)用方, 第二調(diào)用方通過send方式返回值給genreturn "lewen"# 1. 用同步的方式編寫異步的代碼, 在適當(dāng)?shù)臅r(shí)候暫停函數(shù)并在適當(dāng)?shù)臅r(shí)候啟動(dòng)函數(shù)if __name__ == "__main__":gen = gen_func()print(inspect.getgeneratorstate(gen))next(gen)print(inspect.getgeneratorstate(gen))try:next(gen)except StopIteration:passprint(inspect.getgeneratorstate(gen))"""GEN_CREATEDGEN_SUSPENDEDGEN_CLOSED""" gen 狀態(tài) import socketdef get_socket_data():yield "lewen"def downloader(url):client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)client.setblocking(False)try:client.connect((host, 80)) # 阻塞不會(huì)消耗cpuexcept BlockingIOError as e:passselector.register(self.client.fileno(), EVENT_WRITE, self.connected)source = yield from get_socket_data()data = source.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)def download_html(html):html = yield from downloader()if __name__ == "__main__":# 協(xié)程的調(diào)度依然是 事件循環(huán)+協(xié)程模式 ,協(xié)程是單線程模式pass yield 實(shí)現(xiàn)協(xié)程-
總結(jié)
以上是生活随笔為你收集整理的gj12-2 协程和异步io的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: gj12-1 协程和异步io
- 下一篇: gj13 asyncio并发编程