python异步io 队列_python 学习笔记九 队列,异步IO
queue (隊列)
隊列是為線程安全使用的。
1.先入先出
import queue
#測試定義類傳入隊列class Foo(object):
def __init__(self,n):
self.n=nnew = queue.Queue(maxsize=3)new.put(1)new.put(Foo(1),timeout=2) # 超時時間后,拋出隊列full異常new.put([1, 2, 3],timeout=2)
print(new.full()) #判斷隊列是否滿 True
#new.put("abc",timeout=1) #隊列已滿,再放報錯
print(new.qsize()) # 查看當前隊列長度
print(new.get())
print(new.get())
print(new.get())
print(new.empty()) #判斷隊列是否為空 True
#print(new.get_nowait()) #隊列已空,取不到數據報異常
2.后進先出
q =queue.LifoQueue() #指定使用LifoQueue
q.put(3)
q.put(2)
print(q.get_nowait())
print(q.get_nowait())
“""
2
3
"""
3.優先級隊列
存入一個元組,第一個為優先級,第二個為數據,第三個默認超時時間
import queuenew = queue.PriorityQueue(maxsize=3)new.put((1,[1,2,3]))new.put((10,"strings"))new.put((20,"strings"))
print(new.get_nowait())
print(new.get_nowait())
print(new.get_nowait())
“”“
(1, [1, 2, 3])
(10, 'strings')
(20, 'strings')
“”“
生成者消費者模型
通過queue.task_done 和 queue.join 實現
一對一
import threading, queue, time
#生產者消費者模型為了程序松耦合,
#生產者生產消息
def consumer(n):whileTrue:
print("\033[32;1m consumer [%s] \033[0m get task: %s" % (n, q.get()))
time.sleep(1) # 每秒吃一個
q.task_done() # q.get()1次通知隊列減少1
#消費者消費消息
def producter(n):
count= 1
whileTrue:
print("producter [%s] produced a new task : %s" %(n, count))
q.put(count)
count+= 1q.join() #消息阻塞 隊列為空,重新觸發
print("all task has been cosumed by consumers ...")
q = queue.Queue()
c1 = threading.Thread(target=consumer, args=[1, ])p1 = threading.Thread(target=producter, args=["p1", ])
c1.start()
p1.start()#result:
producter [p1] produced a new task : 1 #生產一個消息consumer [1] get task: 1 #消費一個消息,q.task_done() 通知隊列減少1個消息all task has been cosumed by consumers ... #q.join() 收到隊列為空,開始生產消息
producter [p1] produced anew task : 2consumer [1] get task: 2all task has been cosumed by consumers ...
producter [p1] produced anew task : 3consumer [1] get task: 3all task has been cosumed by consumers ...
producter [p1] produced anew task : 4consumer [1] get task: 4all task has been cosumed by consumers ...
producter [p1] produced anew task : 5consumer [1] get task: 5
一對多
def consumer(n):
while True:
print("\033[32;1m consumer [%s] \033[0m get task: %s" % (n, q.get()))
time.sleep(1) # 每秒吃一個
q.task_done() # get()1次通知隊列減少1
def producter(n):
count = 1
while True:
print("producter [%s] produced a new task : %s" % (n, count))
q.put(count)
count += 1
q.join() #消息阻塞 隊列為空重新觸發print("all task has been cosumed by consumers ...")q = queue.Queue()
c1 = threading.Thread(target=consumer, args=[1, ])
c2 = threading.Thread(target=consumer, args=[2, ])
c3 = threading.Thread(target=consumer, args=[3, ])p1 = threading.Thread(target=producter, args=["p1", ])
c1.start()
c2.start()
c3.start()
p1.start()
result:
producter [p1] produced a new task : 1
consumer [1]? get task: 1
all task has been cosumed by consumers ...
producter [p1] produced a new task : 2
consumer [2]? get task: 2
all task has been cosumed by consumers ...
producter [p1] produced a new task : 3
consumer [3]? get task: 3
all task has been cosumed by consumers ...
producter [p1] produced a new task : 4
consumer [1]? get task: 4
all task has been cosumed by consumers ...
producter [p1] produced a new task : 5
consumer [2]? get task: 5
all task has been cosumed by consumers ...
producter [p1] produced a new task : 6
consumer [3]? get task: 6
all task has been cosumed by consumers ...
producter [p1] produced a new task : 7
consumer [1]? get task: 7
all task has been cosumed by consumers ...
producter [p1] produced a new task : 8
consumer [2]? get task: 8
all task has been cosumed by consumers ...
producter [p1] produced a new task : 9
consumer [3]? get task: 9
all task has been cosumed by consumers ...
producter [p1] produced a new task : 10
consumer [1]? get task: 10
多對多
def consumer(n):whileTrue:
print("\033[32;1m consumer [%s] \033[0m get task: %s" % (n, q.get()))
time.sleep(1) # 每秒吃一個
q.task_done() #get()1次通知隊列減少1
def producter(n):
count= 1
whileTrue:
print("producter [%s] produced a new task : %s" %(n, count))
q.put(count)
count+= 1q.join() #消息阻塞 隊列為空重新觸發
print("all task has been cosumed by consumers ...")
q=queue.Queue()
c1= threading.Thread(target=consumer, args=[1, ])
c2= threading.Thread(target=consumer, args=[2, ])
c3= threading.Thread(target=consumer, args=[3, ])
p1= threading.Thread(target=producter, args=["p1", ])
p2= threading.Thread(target=producter, args=["p2", ])c1.start()
c2.start()
c3.start()
p1.start()
p2.start()
result:
producter [p1] produced a new task : 1
producter [p2] produced a new task : 1
consumer [1]? get task: 1
consumer [2]? get task: 1
all task has been cosumed by consumers ...
all task has been cosumed by consumers ...
producter [p1] produced a new task : 2
producter [p2] produced a new task : 2
consumer [3]? get task: 2
consumer [2]? get task: 2
all task has been cosumed by consumers ...
producter [p1] produced a new task : 3
consumer [1]? get task: 3
all task has been cosumed by consumers ...
producter [p2] produced a new task : 3
consumer [2]? get task: 3
all task has been cosumed by consumers ...
producter [p1] produced a new task : 4
consumer [3]? get task: 4
all task has been cosumed by consumers ...
all task has been cosumed by consumers ...
producter [p1] produced a new task : 5
producter [p2] produced a new task : 4
consumer [1]? get task: 5
consumer [2]? get task: 4
all task has been cosumed by consumers ...
all task has been cosumed by consumers ...
協程
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程。
協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當于進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
協程的好處:
無需線程上下文切換的開銷
無需原子操作鎖定及同步的開銷
方便切換控制流,簡化編程模型
高并發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用于高并發處理。
缺點:
無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序(可以通過生產者消費者模型解決)
使用yield模擬協程
#使用yield實現在單線程的情況下實現并發運算的效果
import time
def consumer(name):
print("%s 準備吃包子!"%name)whileTrue:
baozi= yield#yield接收返回值
print("包子[%s]來了,被[%s]吃了!" %(baozi,name)
def producer(name):
c.__next__()
d.__next__()
print("開始生產包子!")for i in range(5):
time.sleep(1)
print("做了2個包子!")
c.send(i) #發送給yield
d.send(i)if __name__ == '__main__':
c= consumer("c1")
d= consumer("c2")
p= producer()
result:
c1 準備吃包子! #實例化消費者,遇到yield函數凍結接收yield返回值
c2 準備吃包子!
開始生產包子!
做了2個包子!
包子[0]來了,被[c1]吃了! #函數繼續執行
包子[0]來了,被[c2]吃了!
做了2個包子!
包子[1]來了,被[c1]吃了!
包子[1]來了,被[c2]吃了!
做了2個包子!
包子[2]來了,被[c1]吃了!
包子[2]來了,被[c2]吃了!
做了2個包子!
包子[3]來了,被[c1]吃了!
包子[3]來了,被[c2]吃了!
做了2個包子!
包子[4]來了,被[c1]吃了!
包子[4]來了,被[c2]吃了!
grentlet
fromgreenlet import greenlet
def test1():
print(12)
gr2.switch() #手動切換
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch()
print(78)
gr1=greenlet(test1) #加入協程
gr2=greenlet(test2)
gr1.switch()"""12
56
34
78
"""
Gevent
import gevent
def foo():
print('\033[32;1m Running in foo\033[0m')
gevent.sleep(1)#阻塞1秒
print('\033[32;1m Explicit context switch to foo again\033[0m')
def bar():
print('\033[31;1m Explicit context to bar\033[0m')
gevent.sleep(1)
print('\033[31;1m Implicit context switch back to bar\033[0m')
def boom():
print('\033[33;1m just boom \033[0m')
gevent.sleep(1)
print('\033[33;1m boom shakashaka \033[0m')
gevent.joinall(
[ #將foo加入協程,協程間切換不會按照順序而是隨機切換
gevent.spawn(foo),
gevent.spawn(bar),
gevent.spawn(boom)
]
)
result:
Running in foo
Explicit context to bar
just boom
Explicit context switch to foo again
boom shakashaka
Implicit context switch back to bar
"""
gevent實現遇到io阻塞自動切換
fromgevent import monkey; monkey.patch_all()
import geventfromurllib.request import urlopen
def f(url):
print('GET: %s' %url)
resp=urlopen(url)
data=resp.read()
print('%d bytes received from %s.' %(len(data), url))
gevent.joinall([
gevent.spawn(f,'https://www.python.org/'),
gevent.spawn(f,'https://www.yahoo.com/'),
gevent.spawn(f,'https://github.com/'),
])
result:
"""
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
46958 bytes received from https://www.python.org/.
24121 bytes received from https://github.com/.
413706 bytes received from https://www.yahoo.com/.
"""
gevent 實現多線程socketserver
import sys,time,gevent,socketfromgevent import socket, monkey
#將所有遇到的阻塞變為非阻塞
monkey.patch_all()
def server(port):
s=socket.socket()
s.bind(("0.0.0.0", port))
s.listen(50)whileTrue:
cli, addr=s.accept()
#派生協程 執行handle_request函數 將客戶端socket連接傳參
gevent.spawn(handle_request, cli)
def handle_request(s):try:whileTrue:
data= s.recv(1024)
print("recv:", data.decode("utf8"))
s.send(data)ifnot data:
#如果客戶端斷開連接此處沒有效果,如果服務端斷開連接,通知服務端去掉該連接
s.shutdown(socket.SHUT_WR)
except Exceptionase:
print(e)finally:
s.close()if __name__ == "__main__":
server(8001)
import socket
HOST= 'localhost'# The remote host
PORT= 8001 # The same port asused by the server
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))whileTrue:
msg= bytes(input(">>:"),encoding="utf8")
s.sendall(msg)
data= s.recv(1024)
#print(data)
print('Received', repr(data))
s.close()
select、poll、epoll
select
select最早于1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,
使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。
select目前幾乎在所有的平臺上支持,其良好跨平臺支持也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。
select的一個缺點在于單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨著文件描述符數量的增大,其復制的開銷也線性增長。(解讀:在select/poll時代,服務器進程每次都把這100萬個連接告訴操作系統(從用戶態復制句柄數據結構到內核態),
讓操作系統內核去查詢這些套接字上是否有事件發生,輪詢完后,再將句柄數據復制到用戶態,讓服務器應用程序輪詢處理已發生的網絡事件,這一過程資源消耗較大,因此,select/poll一般只能處理幾千的并發連接。)
同時,由于網絡響應時間的延遲 使得大量TCP連接處于非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。
poll
poll在1986年誕生于System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制。
poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制于用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,
它的開銷隨著文件描述符數量的增加而線性增大。另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()
的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)。
epoll
直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。
epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,
這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。
epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表 就緒描述符數量的值,
你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了 這些文件描述符在系統調用時復制的開銷。
另一個本質的改進在于epoll采用基于事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描 述符進行掃描,而epoll事先通過epoll_ctl()
來注冊一個文件描述符,一旦基于某個文件描述符就緒時,內核會采用類似callback的回調 機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
select實現多線程socketserver實例:
#server端
importselectimportqueueimportsocketimportsys#生成服務端socket實例
server =socket.socket()#設置非阻塞 傳入bool類型
server.setblocking(0)#設置綁定的ip地址和端口
server_address = ('localhost', 10000)print(sys.stderr, 'starting up on %s port %s' %server_address)
server.bind(server_address)#監聽客戶端最大連接數
server.listen(5)#初始化讀取數據的監聽列表,最開始時希望從server這個套接字上讀取數據
inputs =[server, ]#初始化寫入數據的監聽列表,最開始并沒有客戶端連接進來,所以列表為空
outputs =[]#消息隊列用字典表示 鍵為客戶端socket對象,值為發送內容 可能有多個客戶端連接,發送多條信息,將消息先存入隊列而不是直接發送
message_queues ={}#inputs列表默認存放server 即服務端的socket對象用于等待客戶端接入
whileinputs:print(sys.stderr, '\nwaiting for the next event')#注:select能夠監控 f=open(),obj=socket(),sys.stdin,sys.stdout終端輸入輸出(所有帶fileno()方法的文件句柄)
#文件操作是python無法檢測的,windows也不支持終端輸入輸出的文件句柄(OSError:應用程序沒有調用 WSAStartup,或者 WSAStartup 失敗。)
readable, writeable, exceptional =select.select(inputs, outputs, inputs)#一旦客戶端連接,server的內容將改變,select檢測到server的變化,將其返回給readable
for s inreadable:#默認只有server,等待客戶端連接,但是有了client的socket對象后,等待的可能是客戶端發送的消息,這里需要判斷是socket還是消息
if s isserver:#創建客戶端socket連接 connection 服務端為客戶端生成的socket對象,client_address 客戶端地址
connection, client_address =s.accept()print(sys.stderr, 'new connection from', client_address)#客戶端socket設置非阻塞
connection.setblocking(0)#因為有讀操作發生,所以將此連接加入inputs
inputs.append(connection)#為每個連接創建一個queue隊列,數據并不是立即發送需要放入隊列,等待outputs隊列有數據才發送,同時確保每個連接接收到正確的數據。
message_queues[connection] =queue.Queue()#等待的將是客戶端發送的數據
else:#接收客戶端數據
data = s.recv(1024)ifdata:print(sys.stderr, 'received "%s" from %s' %(data, s.getpeername()))#將收到的數據放入隊列中
message_queues[s].put(data)if s not inoutputs:#將socket客戶端的連接加入outputs中,并且用來給客戶端返回數據。
outputs.append(s)else:#連接已經斷開
print(sys.stderr, 'closing', client_address, 'after reading no data')if s inoutputs:#因為連接已經斷開,無需再返回消息,這時候如果這個客戶端的連接對象還在outputs列表中,就把它刪掉。
outputs.remove(s)#連接已經斷開,在inputs中select也不用感知
inputs.remove(s)#關閉會話
s.close()#從字典中刪除服務端為客戶端建立連接的socket對象
delmessage_queues[s]#一旦有參數,將一直為客戶端返回數據
for s inwriteable:try:#讀取客戶端請求信息,采用非阻塞的方式get_nowait() 沒有讀取到數據拋出異常
next_msg =message_queues[s].get_nowait()except queue.Empty:#引發隊列空異常
print(sys.stderr, 'output queue for', s.getpeername(), 'is empty')#沒有讀取到數據,無需為客戶端返回消息,將其從outputs中刪除,否則 select將一直感知,并傳給writeable
outputs.remove(s)else:#沒有任何異常
print(sys.stderr, 'sending "%s" to %s' %(next_msg, s.getpeername()))#此處是服務端原樣返回接收到的信息
s.send(next_msg)#如果服務端或客戶端連接發生錯誤,exceptional將會有內容
for s inexceptional:print(sys.stderr, 'handling exceptional condition for', s.getpeername())#將客戶端連接刪除
inputs.remove(s)#如果還有數據未發完
if s inoutputs:#但是連接已經斷開,只好從outputs刪除
outputs.remove(s)#關閉會話
s.close()#刪除該客戶端連接隊列,無須在發送數據了。
del message_queues[s]
#client
importsocketimportsys#消息列表
messages = ['this is the message.','It will be sent','in parts.',
]#ip_port
server_address = ('localhost', 10000)#socket對象
socks =[socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET,socket.SOCK_STREAM),
]print(sys.stderr, 'connecting to %s port %s' %server_address)#發起連接
for s insocks:
s.connect(server_address)#發送消息
for message inmessages:for s insocks:print(sys.stderr, '%s: sending "%s"' %(s.getsockname(), message))#發送請求
s.send(bytes(message, "utf8"))for s insocks:try:#接收信息
data = s.recv(1024)print(sys.stderr, '%s: received "%s"' %(s.getsockname(), data))exceptException as e:print(e, 'closing socket', s.getsockname())#未收到回應,連接終止
s.close()
更多詳細內容:http://www.cnblogs.com/wupeiqi/articles/5040823.html
總結
以上是生活随笔為你收集整理的python异步io 队列_python 学习笔记九 队列,异步IO的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pokeplusIOS总是闪退怎么办?无
- 下一篇: 用python输入任意三条边长_如何用p