日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

Python异步非阻塞IO多路复用Select/Poll/Epoll使用

發(fā)布時(shí)間:2024/7/23 python 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python异步非阻塞IO多路复用Select/Poll/Epoll使用 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

來源:http://www.haiyun.me/archives/1056.html

有許多封裝好的異步非阻塞IO多路復(fù)用框架,底層在linux基于最新的epoll實(shí)現(xiàn),為了更好的使用,了解其底層原理還是有必要的。
下面記錄下分別基于Select/Poll/Epoll的echo server實(shí)現(xiàn)。
Python Select Server,可監(jiān)控事件數(shù)量有限制:

#!/usr/bin/python # -*- coding: utf-8 -*- import select import socket</span> import Queueserver = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.setblocking(False) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1) server_address= ('192.168.1.5',8080) server.bind(server_address) server.listen(10)#select輪詢等待讀socket集合 inputs = [server] #select輪詢等待寫socket集合 outputs = [] message_queues = {} #select超時(shí)時(shí)間 timeout = 20while True:print "等待活動(dòng)連接......"readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)if not (readable or writable or exceptional) :print "select超時(shí)無活動(dòng)連接,重新select...... "continue; #循環(huán)可讀事件for s in readable :#如果是server監(jiān)聽的socketif s is server:#同意連接connection, client_address = s.accept()print "新連接: ", client_addressconnection.setblocking(0)#將連接加入到select可讀事件隊(duì)列inputs.append(connection)#新建連接為key的字典,寫回讀取到的消息message_queues[connection] = Queue.Queue()else:#不是本機(jī)監(jiān)聽就是客戶端發(fā)來的消息data = s.recv(1024)if data :print "收到數(shù)據(jù):" , data , "客戶端:",s.getpeername()message_queues[s].put(data)if s not in outputs:#將讀取到的socket加入到可寫事件隊(duì)列outputs.append(s)else:#空白消息,關(guān)閉連接print "關(guān)閉連接:", client_addressif s in outputs :outputs.remove(s)inputs.remove(s)s.close()del message_queues[s]for s in writable:try:msg = message_queues[s].get_nowait()except Queue.Empty:print "連接:" , s.getpeername() , '消息隊(duì)列為空'outputs.remove(s)else:print "發(fā)送數(shù)據(jù):" , msg , "到", s.getpeername()s.send(msg)for s in exceptional:print "異常連接:", s.getpeername()inputs.remove(s)if s in outputs:outputs.remove(s)s.close()del message_queues[s] Python Poll Server,Select升級版,無可監(jiān)控事件數(shù)量限制,還是要輪詢所有事件:

#!/usr/bin/python # -*- coding: utf-8 -*- import socket import select import Queueserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ("192.168.1.5", 8080) server.bind(server_address) server.listen(5) print "服務(wù)器啟動(dòng)成功,監(jiān)聽IP:" , server_address message_queues = {} #超時(shí),毫秒 timeout = 5000 #監(jiān)聽哪些事件 READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR) READ_WRITE = (READ_ONLY|select.POLLOUT) #新建輪詢事件對象 poller = select.poll() #注冊本機(jī)監(jiān)聽socket到等待可讀事件事件集合 poller.register(server,READ_ONLY) #文件描述符到socket映射 fd_to_socket = {server.fileno():server,} while True:print "等待活動(dòng)連接......"#輪詢注冊的事件集合events = poller.poll(timeout)if not events:print "poll超時(shí),無活動(dòng)連接,重新poll......"continueprint "有" , len(events), "個(gè)新事件,開始處理......"for fd ,flag in events:s = fd_to_socket[fd]#可讀事件if flag & (select.POLLIN | select.POLLPRI) :if s is server :#如果socket是監(jiān)聽的server代表有新連接connection , client_address = s.accept()print "新連接:" , client_addressconnection.setblocking(False)fd_to_socket[connection.fileno()] = connection#加入到等待讀事件集合poller.register(connection,READ_ONLY)message_queues[connection] = Queue.Queue()else :#接收客戶端發(fā)送的數(shù)據(jù)data = s.recv(1024)if data:print "收到數(shù)據(jù):" , data , "客戶端:" , s.getpeername()message_queues[s].put(data)#修改讀取到消息的連接到等待寫事件集合poller.modify(s,READ_WRITE)else :# Close the connectionprint " closing" , s.getpeername()# Stop listening for input on the connectionpoller.unregister(s)s.close()del message_queues[s]#連接關(guān)閉事件elif flag & select.POLLHUP :print " Closing ", s.getpeername() ,"(HUP)"poller.unregister(s)s.close()#可寫事件elif flag & select.POLLOUT :try:msg = message_queues[s].get_nowait()except Queue.Empty:print s.getpeername() , " queue empty"poller.modify(s,READ_ONLY)else :print "發(fā)送數(shù)據(jù):" , data , "客戶端:" , s.getpeername()s.send(msg)#異常事件elif flag & select.POLLERR:print " exception on" , s.getpeername()poller.unregister(s)s.close()del message_queues[s]</span>Python Epoll Server,基于回調(diào)的事件通知模式,輕松管理大量連接:
#!/usr/bin/python # -*- coding: utf-8 -*- import socket, select import Queueserversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ("192.168.1.5", 8080) serversocket.bind(server_address) serversocket.listen(1) print "服務(wù)器啟動(dòng)成功,監(jiān)聽IP:" , server_address serversocket.setblocking(0) timeout = 10 #新建epoll事件對象,后續(xù)要監(jiān)控的事件添加到其中 epoll = select.epoll() #添加服務(wù)器監(jiān)聽fd到等待讀事件集合 epoll.register(serversocket.fileno(), select.EPOLLIN) message_queues = {}fd_to_socket = {serversocket.fileno():serversocket,} while True:print "等待活動(dòng)連接......"#輪詢注冊的事件集合events = epoll.poll(timeout)if not events:print "epoll超時(shí)無活動(dòng)連接,重新輪詢......"continueprint "有" , len(events), "個(gè)新事件,開始處理......"for fd, event in events:socket = fd_to_socket[fd]#可讀事件if event & select.EPOLLIN:#如果活動(dòng)socket為服務(wù)器所監(jiān)聽,有新連接if socket == serversocket:connection, address = serversocket.accept()print "新連接:" , addressconnection.setblocking(0)#注冊新連接fd到待讀事件集合epoll.register(connection.fileno(), select.EPOLLIN)fd_to_socket[connection.fileno()] = connectionmessage_queues[connection] = Queue.Queue()#否則為客戶端發(fā)送的數(shù)據(jù)else:data = socket.recv(1024)if data:print "收到數(shù)據(jù):" , data , "客戶端:" , socket.getpeername()message_queues[socket].put(data)#修改讀取到消息的連接到等待寫事件集合epoll.modify(fd, select.EPOLLOUT)#可寫事件elif event & select.EPOLLOUT:try:msg = message_queues[socket].get_nowait()except Queue.Empty:print socket.getpeername() , " queue empty"epoll.modify(fd, select.EPOLLIN)else :print "發(fā)送數(shù)據(jù):" , data , "客戶端:" , socket.getpeername()socket.send(msg)#關(guān)閉事件elif event & select.EPOLLHUP:epoll.unregister(fd)fd_to_socket[fd].close()del fd_to_socket[fd] epoll.unregister(serversocket.fileno()) epoll.close() serversocket.close()

總結(jié)

以上是生活随笔為你收集整理的Python异步非阻塞IO多路复用Select/Poll/Epoll使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。