协程,IO模式
1、協(xié)程(別人的模塊,達(dá)到單線程并發(fā)效果)
程序的運(yùn)行狀態(tài):
阻塞:
IO阻塞
非阻塞:
運(yùn)行
就緒
單線程實(shí)現(xiàn)并發(fā):
在應(yīng)用程序里控制多個(gè)任務(wù)的切換+保存狀態(tài)
可以把IO減下來,但是不可能降到無
優(yōu)點(diǎn):
應(yīng)用程序級別速度要遠(yuǎn)遠(yuǎn)高于操作系統(tǒng)的切換
缺點(diǎn):
多個(gè)任務(wù)一旦有一個(gè)阻塞沒有切,整個(gè)縣城都堵塞在原地
該線程內(nèi)的其他的任務(wù)都不能執(zhí)行
?
一旦引入?yún)f(xié)程,就需要檢測單線程下素有的IO行為,
實(shí)現(xiàn)遇到IO就切換,少一個(gè)都不行,因?yàn)橐坏┮粋€(gè)任務(wù)阻塞,整個(gè)線程就 阻塞,其他的任務(wù)即便是可以計(jì)算,但是也無法運(yùn)行。
2、用協(xié)程的目的
想要在單線程下實(shí)現(xiàn)并發(fā)
并發(fā)指的是多個(gè)任務(wù)看起來是同時(shí)運(yùn)行的
并發(fā)=切換+保存狀態(tài)
線程在進(jìn)程內(nèi),進(jìn)程在操作系統(tǒng)內(nèi),一切由操作系統(tǒng)控制
?
自己設(shè)置python程序讓線程形成切換,不用操作系統(tǒng)控制。
協(xié)程只有單線程下遇到IO切換才會提成效率,操作系統(tǒng)相對與協(xié)程要更慢點(diǎn)
?
import timedef func1():for i in range(1000000):i+1 def func2():for i in range(1000000):i+1 start = time.time() func1() func2() stop=time.time() print(stop-start)# 基于yield并發(fā) import time def func1():while True:print("func1")yield def func2():g=func1()for i in range(1000000):print("func2")i+1time.sleep(3)next(g)start=time.time() func2() stop=time.time() print(stop-start) 串行執(zhí)行:?
from gevent import monkey,spawn;monkey.patch_all()#monkey.patch_all()補(bǔ)丁 import timedef eat(name):print("%s eat 1" %name)time.sleep(3)print("%s eat 2" %name) def play(name):print("%s play 1" %name)time.sleep(1)print("%s play 2" %name)start=time.time() g1=spawn(eat,"yf")#自動運(yùn)行任務(wù) 模塊 g2=spawn(play,"fxc")g1.join()#通過補(bǔ)丁讓join識別其他IO g2.join() print(time.time() - start) print(g1) print(g2)from gevent import monkey,spawn;monkey.patch_all() from threading import current_thread import timedef eat(name):print("%s eat 1" %current_thread().name)#都是一個(gè)線程,前面有dummy假標(biāo)明time.sleep(3)print("%s eat 2" %current_thread().name) def play(name):print("%s play 1" %current_thread().name)time.sleep(1)print("%s play 2" %current_thread().name)start=time.time() g1=spawn(eat,"yf") g2=spawn(play,"fxc")g1.join() g2.join() print(time.time() - start) print(g1) print(g2) 與yield類似可以直接計(jì)算的 gevent并發(fā)的套接字通信:
from gevent import spawn,monkey;monkey.patch_all() from socket import * from threading import Threaddef talk(conn):while True:try:data = conn.recv(1024)if len(data)==0:breakconn.send(data.upper())except ConnectionResetError:breakconn.close()def server(ip,port,backlog=5):server = socket(AF_INET,SOCK_STREAM)server.bind((ip,port))server.listen(backlog)print("starting..")while True:conn,addr=server.accept()spawn(talk,conn) if __name__ == '__main__':g=spawn(server,"127.0.0.1",8080)g.join() 服務(wù)器: from threading import Thread,current_thread from socket import *def task():client = socket(AF_INET,SOCK_STREAM)client.connect(("127.0.0.1",8080))while True:msg="%s say hello" %current_thread().nameclient.send(msg.encode("utf-8"))data=client.recv(1024)print(data.decode("utf-8"))if __name__ == '__main__':for i in range(500):t=Thread(target=task)t.start() 客戶端:網(wǎng)絡(luò)IO:
????recvfrom:#等待客戶鏈接請求
????????wait data:等待客戶端產(chǎn)生數(shù)據(jù)——》客戶端OS--》網(wǎng)絡(luò)--》服務(wù)端操作系統(tǒng)緩存
????????copy data:由本地操作系統(tǒng)緩存中的數(shù)據(jù)拷貝到應(yīng)用程序的內(nèi)存中
?
????send:
????????copy data
conn.recv(1024) ==>OS
from socket import * import timeserver = socket() server.bind(("127.0.0.1",8080)) server.listen(5) server.setblocking(False)#是否有鏈接過來 conn_l=[] while True:try:print("總連接數(shù)[%s]" %len(conn_l))conn,addr=server.accept()conn_l.append(conn)except BlockingIOError:del_l=[]for conn in conn_l:try:data=conn.recv(1024)if len(data)==0:del_l.append(conn)continueconn.send(data.upper())except BlockingIOError:passexcept ConnectionResetError:del_l.append(conn)for conn in del_l:conn_l.remove(conn) 服務(wù)端: from socket import * import osclient=socket(AF_INET,SOCK_STREAM) client.connect(("127.0.0.1",8080)) while True:msg="%s say hello" %os.getpid()client.send(msg.encode("utf-8"))data=client.recv(1024)print(data.dacoda("utf-8")) 客戶端:?
轉(zhuǎn)載于:https://www.cnblogs.com/yf18767106368/p/9325944.html
總結(jié)
- 上一篇: 唐宇迪学习笔记9:逻辑回归与梯度下降策略
- 下一篇: nuxt中必须要知道的一点 关于 nux