Python中的select、epoll详解
Python中的select、epoll詳解
文章目錄
- Python中的select、epoll詳解
- 一、select
- 1、相關(guān)概念
- 2.select的特性
- 1.那么單進程是如何實現(xiàn)多并發(fā)的呢???
- 2.select的原理
- 3.select 優(yōu)點
- 4.select 缺點
- 5.python select
- 6.select 示例:
- 二、poll
- 1.相關(guān)概念
- 2.poll的原理
- 3.代碼
- 三、epoll
- 1.相關(guān)概念
- 2、epoll原理
- 3.優(yōu)缺點
- select
- poll
- 3.epoll
- 4. FD劇增后帶來的IO效率問題
- 5.消息傳遞方式
- 4.在Python中調(diào)用epoll
- 5.例
- 四、web靜態(tài)服務(wù)器-epool
一、select
1、相關(guān)概念
- select是通過一個select()系統(tǒng)調(diào)用來監(jiān)視多個文件描述符的數(shù)組(在linux中一切事物皆文件,塊設(shè)備,socket連接等)
- 當(dāng)select()返回后,該數(shù)組中就緒的文件描述符便會被內(nèi)核修改標(biāo)志位(變成ready)
- 使得進程可以獲得這些文件描述符從而進行后續(xù)的讀寫操作
- select會不斷監(jiān)視網(wǎng)絡(luò)接口的某個目錄下有多少文件描述符變成ready狀態(tài)。在網(wǎng)絡(luò)接口中,過來一個連接就會建立一個’文件’
- 變成ready狀態(tài)后,select就可以操作這個文件描述符了。
為什么要用一個進程實現(xiàn)多并發(fā)而不采用多線程實現(xiàn)多并發(fā)呢?
答:因為一個進程實現(xiàn)多并發(fā)比多線程是實現(xiàn)多并發(fā)的效率還要高,因為啟動多線程會有很多的開銷,而且CPU要不斷的檢查每個線程的狀態(tài),確定哪個線程是否可以執(zhí)行。這個對系統(tǒng)來說也是有壓力的,用單進程的話就可以避免這種開銷和給系統(tǒng)帶來的壓力,
2.select的特性
1.那么單進程是如何實現(xiàn)多并發(fā)的呢???
答:
- 很巧妙的使用了生產(chǎn)者和消費者的模式(異步)
- 生產(chǎn)者和消費者可以實現(xiàn)非阻塞,一個socketserver通過select接收多個連接過來(之前的socket一個進程只能接收一個連接,當(dāng)接收新的連接的時候產(chǎn)生阻塞,因為這個socket進程要先和客戶端進行通信,二者是彼此互相阻塞等待的
- 這個時候如果再來一個連接,要等之前的那個連接斷了,這個才可以連進來。也就是說用基本的socket實現(xiàn)多進程是阻塞的。
- 為了解決這個問題采用每來一個連接產(chǎn)生一個線程,是不阻塞了,但是當(dāng)線程數(shù)量過多的時候,對于cpu來說開銷和壓力是比較大的。
- 對于單個socket來說,阻塞的時候大部分的時候都是在等待IO操作(網(wǎng)絡(luò)操作也屬于IO操作)。為了避免這種情況,就出現(xiàn)了異步。
- 客戶端發(fā)起一個連接,會在服務(wù)端注冊一個文件句柄,服務(wù)端會不斷輪詢這些文件句柄的列表
- 主進程和客戶端建立連接而沒有啟動線程,這個時候主進程和客戶端進行交互,其他的客戶端是無法連接主進程的
- 為了實現(xiàn)主進程既能和已連接的客戶端收發(fā)消息,又能和新的客戶端建立連接,就把輪詢變的非常快(死循環(huán))去刷客戶端連接進來的文件句柄的列表
- 只要客戶端發(fā)消息了,服務(wù)端讀取了消息之后,有另一個列表去接收給客戶端返回的消息,也不斷的去刷這個列表,刷出來后返回給客戶端,這樣和客戶端的這次通信就完成了,但是跟客戶端的連接還沒有斷,但是就進入了下一次的輪詢。】
2.select的原理
- 1.從用戶空間拷貝fd_set到內(nèi)核空間(fd_set 過大導(dǎo)致占用空間且慢);
- 2.注冊回調(diào)函數(shù)__pollwait;
- 3.遍歷所有fd,對全部指定設(shè)備做一次poll(這里的poll是一個文件操作,它有兩個參數(shù),一個是文件fd本身,一個是當(dāng)設(shè)備尚未就緒時調(diào)用的回調(diào)函數(shù)__pollwait,這個函數(shù)把設(shè)備自己特有的等待隊列傳給內(nèi)核,讓內(nèi)核把當(dāng)前的進程掛載到其中)(遍歷數(shù)組中所有 fd);
- 4.當(dāng)設(shè)備就緒時,設(shè)備就會喚醒在自己特有等待隊列中的【所有】節(jié)點,于是當(dāng)前進程就獲取到了完成的信號。poll文件操作返回的是一組標(biāo)準(zhǔn)的掩碼,其中的各個位指示當(dāng)前的不同的就緒狀態(tài)(全0為沒有任何事件觸發(fā)),根據(jù)mask可對fd_set賦值;
- 5.如果所有設(shè)備返回的掩碼都沒有顯示任何的事件觸發(fā),就去掉回調(diào)函數(shù)的函數(shù)指針,進入有限時的睡眠狀態(tài),再恢復(fù)和不斷做poll,再作有限時的睡眠,直到其中一個設(shè)備有事件觸發(fā)為止。
- 6.只要有事件觸發(fā),系統(tǒng)調(diào)用返回,將fd_set從內(nèi)核空間拷貝到用戶空間,回到用戶態(tài),用戶就可以對相關(guān)的fd作進一步的讀或者寫操作了。
3.select 優(yōu)點
- select目前幾乎在所有的平臺上支持,良好跨平臺性。
- 單進程實現(xiàn)監(jiān)視多個文件描述符,節(jié)省系統(tǒng)開銷
4.select 缺點
- 每次調(diào)用select,都需要把fd集合從用戶態(tài)拷貝到內(nèi)核態(tài),這個開銷在fd很多的時候會很大
- 單個進程能夠監(jiān)視的fd數(shù)量存在最大限制,在linux上默認(rèn)為1024(可以通過修改宏定義或者重新編譯內(nèi)核的方式提升這個限制)
- 并且由于select的fd是放在數(shù)組中,并且每次都要線性遍歷整個數(shù)組,當(dāng)fd很多的時候,開銷也很大
5.python select
調(diào)用select的函數(shù)為:
readable,writable,exceptional = select.select(rlist, wlist, xlist[, timeout])前三個參數(shù)都分別是三個列表,數(shù)組中的對象均為waitable object:均是整數(shù)的文件描述符(file descriptor)或者一個擁有返回文件描述符方法fileno()的對象;
- rlist: 等待讀就緒的list
- wlist: 等待寫就緒的list
- errlist: 等待“異常”的list
select方法用來監(jiān)視文件描述符,如果文件描述符發(fā)生變化,則獲取該描述符。
- 1、這三個list可以是一個空的list,但是接收3個空的list是依賴于系統(tǒng)的(在Linux上是可以接受的,但是在window上是不可以的)。
- 2、當(dāng) rlist 序列中的描述符發(fā)生可讀時(accetp和read),則獲取發(fā)生變化的描述符并添加到 readable 序列中
- 3、當(dāng) wlist 序列中含有描述符時,則將該序列中所有的描述符添加到 writable 序列中
- 4、當(dāng) errlist序列中的句柄發(fā)生錯誤時,則將該發(fā)生錯誤的句柄添加到 exceptional 序列中
- 5、當(dāng) 超時時間 未設(shè)置,則select會一直阻塞,直到監(jiān)聽的描述符發(fā)生變化當(dāng) 超時時間 = 1 時,那么如果監(jiān)聽的句柄均無任何變化,則select會阻塞 1 秒,之后返回三個空列表,如果監(jiān)聽的描述符(fd)有變化,則直接執(zhí)行。
- 6、在list中可以接受Ptython的的file對象(比如sys.stdin,或者會被open()和os.open()返回的object),socket object將會返回socket.socket()。也可以自定義類,只要有一個合適的fileno()的方法(需要真實返回一個文件描述符,而不是一個隨機的整數(shù))。
6.select 示例:
#coding:UTF8import select import socket import sys import Queue#創(chuàng)建一個TCP/IP 進程 server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.setblocking(0)#連接地址和端口 server_address = ('localhost',10000) print >>sys.stderr,'starting up on %s prot %s' % server_address server.bind(server_address)#最大允許鏈接數(shù) server.listen(5)inputs = [ server ] outputs = []message_queues = {}while inputs:print >>sys.stderr,'\nwaiting for the next event'readable,writable,exceptional = select.select(inputs,outputs,inputs)# Handle inputsfor s in readable:if s is server:# A "readable" server socket is ready to accept a connectionconnection, client_address = s.accept()print >>sys.stderr, 'new connection from', client_address#connection.setblocking(0)inputs.append(connection)# Give the connection a queue for data we want to sendmessage_queues[connection] = Queue.Queue()else:data = s.recv(1024)if data:# A readable client socket has dataprint >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())message_queues[s].put(data) #這個s相當(dāng)于connection# Add output channel for responseif s not in outputs:outputs.append(s)else:# Interpret empty result as closed connectionprint >>sys.stderr, 'closing', client_address, 'after reading no data'# Stop listening for input on the connectionif s in outputs:outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數(shù)據(jù)了,所以這時候如果這個客戶端的連接對象還在outputs列表中,就把它刪掉inputs.remove(s) #inputs中也刪除掉s.close() #把這個連接關(guān)閉掉# Remove message queuedel message_queues[s]# Handle outputsfor s in writable:try:next_msg = message_queues[s].get_nowait()except Queue.Empty:# No messages waiting so stop checking for writability.print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'outputs.remove(s)else:print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())s.send(next_msg.upper())# Handle "exceptional conditions"for s in exceptional:print >>sys.stderr, 'handling exceptional condition for', s.getpeername()# Stop listening for input on the connectioninputs.remove(s)if s in outputs:outputs.remove(s)s.close()# Remove message queuedel message_queues[s] import socket import queue from select import select SERVER_IP = ('127.0.0.1', 9999) # 保存客戶端發(fā)送過來的消息,將消息放入隊列中 message_queue = {} input_list = [] output_list = [] if __name__ == "__main__": server = socket.socket() server.bind(SERVER_IP) server.listen(10) # 設(shè)置為非阻塞 server.setblocking(False) # 初始化將服務(wù)端加入監(jiān)聽列表 input_list.append(server) while True: # 開始 select 監(jiān)聽,對input_list中的服務(wù)端server進行監(jiān)聽 stdinput, stdoutput, stderr = select(input_list, output_list, input_list) # 循環(huán)判斷是否有客戶端連接進來,當(dāng)有客戶端連接進來時select將觸發(fā) for obj in stdinput: # 判斷當(dāng)前觸發(fā)的是不是服務(wù)端對象, 當(dāng)觸發(fā)的對象是服務(wù)端對象時,說明有新客戶端連接進來了 if obj == server: # 接收客戶端的連接, 獲取客戶端對象和客戶端地址信息 conn, addr = server.accept() print("Client {0} connected! ".format(addr)) # 將客戶端對象也加入到監(jiān)聽的列表中, 當(dāng)客戶端發(fā)送消息時 select 將觸發(fā) input_list.append(conn) # 為連接的客戶端單獨創(chuàng)建一個消息隊列,用來保存客戶端發(fā)送的消息 message_queue[conn] = queue.Queue() else: # 由于客戶端連接進來時服務(wù)端接收客戶端連接請求,將客戶端加入到了監(jiān)聽列表中(input_list),客戶端發(fā)送消息將觸發(fā) # 所以判斷是否是客戶端對象觸發(fā) try: recv_data = obj.recv(1024) # 客戶端未斷開 if recv_data: print("received {0} from client {1}".format(recv_data.decode(), addr)) # 將收到的消息放入到各客戶端的消息隊列中 message_queue[obj].put(recv_data) # 將回復(fù)操作放到output列表中,讓select監(jiān)聽 if obj not in output_list: output_list.append(obj) except ConnectionResetError: # 客戶端斷開連接了,將客戶端的監(jiān)聽從input列表中移除 input_list.remove(obj) # 移除客戶端對象的消息隊列 del message_queue[obj] print("\n[input] Client {0} disconnected".format(addr)) # 如果現(xiàn)在沒有客戶端請求,也沒有客戶端發(fā)送消息時,開始對發(fā)送消息列表進行處理,是否需要發(fā)送消息 for sendobj in output_list: try: # 如果消息隊列中有消息,從消息隊列中獲取要發(fā)送的消息 if not message_queue[sendobj].empty(): # 從該客戶端對象的消息隊列中獲取要發(fā)送的消息 send_data = message_queue[sendobj].get() sendobj.sendall(send_data) else: # 將監(jiān)聽移除等待下一次客戶端發(fā)送消息 output_list.remove(sendobj) except ConnectionResetError: # 客戶端連接斷開了 del message_queue[sendobj] output_list.remove(sendobj) print("\n[output] Client {0} disconnected".format(addr))二、poll
1.相關(guān)概念
- poll在1986年誕生于System V Release3,它和select在本質(zhì)上沒有多大差別,但是poll沒有最大文件描述符數(shù)量的限制。
- poll和select同樣存在一個缺點就是,包含大量文件描述符的數(shù)組被整體復(fù)制于用戶態(tài)和內(nèi)核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨著文件描述符數(shù)量的增加而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調(diào)用select()和poll() 的時候?qū)⒃俅螆蟾孢@些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(fā)(Level Triggered)。
2.poll的原理
在Python中調(diào)用poll
select.poll(),返回一個poll的對象,支持注冊和注銷文件描述符。 poll.register(fd[, eventmask])注冊一個文件描述符注冊后,可以通過poll()方法來檢查是否有對應(yīng)的I/O事件發(fā)生。fd可以是i 個整數(shù),或者有返回整數(shù)的fileno()方法對象。如果File對象實現(xiàn)了fileno(),也可以當(dāng)作參數(shù)使用。
eventmask是一個你想去檢查的事件類型,它可以是常量POLLIN, POLLPRI和 POLLOUT的組合。如果缺省,默認(rèn)會去檢查所有的3種事件類型。
| POLLIN | 有數(shù)據(jù)讀取 |
| POLLPRT | 有數(shù)據(jù)緊急讀取 |
| POLLOUT | 準(zhǔn)備輸出:輸出不會阻塞 |
| POLLERR | 某些錯誤情況出現(xiàn) |
| POLLHUP | 掛起 |
| POLLNVAL | 無效請求:描述無法打開 |
3.代碼
#coding: utf-8 import select, socketresponse = b"hello world"serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) serversocket.bind(('localhost', 10000)) serversocket.listen(1) serversocket.setblocking(0)# poll = select.poll() poll.register(serversocket.fileno(), select.POLLIN)connections = {} while True:for fd, event in poll.poll():if event == select.POLLIN:if fd == serversocket.fileno():con, addr = serversocket.accept()poll.register(con.fileno(), select.POLLIN)connections[con.fileno()] = conelse:con = connections[fd]data = con.recv(1024)if data:poll.modify(con.fileno(), select.POLLOUT)elif event == select.POLLOUT:con = connections[fd]con.send(response)poll.unregister(con.fileno())con.close()三、epoll
1.相關(guān)概念
直到Linux2.6才出現(xiàn)了由內(nèi)核直接支持的實現(xiàn)方法,那就是epoll,它幾乎具備了之前所說的一切優(yōu)點,被公認(rèn)為Linux2.6下性能最好的多路I/O就緒通知方法。
epoll可以同時支持水平觸發(fā)和邊緣觸發(fā)(Edge Triggered,只告訴進程哪些文件描述符剛剛變?yōu)榫途w狀態(tài),它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發(fā)),理論上邊緣觸發(fā)的性能要更高一些,但是代碼實現(xiàn)相當(dāng)復(fù)雜。
- epoll同樣只告知那些就緒的文件描述符
- 當(dāng)我們調(diào)用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表 就緒描述符數(shù)量的值
- 你只需要去epoll指定的一個數(shù)組中依次取得相應(yīng)數(shù)量的文件描述符即可,這里也使用了內(nèi)存映射(mmap)技術(shù),這樣便徹底省掉了 這些文件描述符在系統(tǒng)調(diào)用時復(fù)制的開銷。
- 另一個本質(zhì)的改進在于epoll采用基于事件的就緒通知方式。
- 在select/poll中,進程只有在調(diào)用一定的方法后,內(nèi)核才對所有監(jiān)視的文件描 述符進行掃描
- 而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基于某個文件描述符就緒時,內(nèi)核會采用類似callback的回調(diào) 機制,迅速激活這個文件描述符,當(dāng)進程調(diào)用epoll_wait()時便得到通知。
2、epoll原理
調(diào)用epoll_create時,做了以下事情:
- 內(nèi)核幫我們在epoll文件系統(tǒng)里建了個file結(jié)點;
- 在內(nèi)核cache里建了個紅黑樹用于存儲以后epoll_ctl傳來的socket;
- 建立一個list鏈表,用于存儲準(zhǔn)備就緒的事件。
調(diào)用epoll_ctl時,做了以下事情:
- 把socket放到epoll文件系統(tǒng)里file對象對應(yīng)的紅黑樹上;
- 給內(nèi)核中斷處理程序注冊一個回調(diào)函數(shù),告訴內(nèi)核,如果這個句柄的中斷到了,就把它放到準(zhǔn)備就緒list鏈表里。
調(diào)用epoll_wait時,做了以下事情:
- 觀察list鏈表里有沒有數(shù)據(jù)。有數(shù)據(jù)就返回,沒有數(shù)據(jù)就sleep,等到timeout時間到后即使鏈表沒數(shù)據(jù)也返回。
- 通常情況下即使我們要監(jiān)控百萬計的句柄,大多一次也只返回很少量的準(zhǔn)備就緒句柄而已,所以,epoll_wait僅需要從內(nèi)核態(tài)copy少量的句柄到用戶態(tài)而已。
3.優(yōu)缺點
select
select本質(zhì)上是通過設(shè)置或者檢查存放fd標(biāo)志位的數(shù)據(jù)結(jié)構(gòu)來進行下一步處理。這樣所帶來的缺點是(從聲明、到系統(tǒng)調(diào)用、到掃描、到返回后掃描):
- 1).單個進程可監(jiān)視的fd數(shù)量被限制
- 2).需要維護一個用來存放大量fd的數(shù)據(jù)結(jié)構(gòu),這樣會使得用戶空間和內(nèi)核空間在傳遞該結(jié)構(gòu)時復(fù)制開銷大
- 3).對socket進行掃描時是線性掃描
- 4).用戶也需要對返回的 fd_set 進行遍歷
poll
poll本質(zhì)上和select沒有區(qū)別,它將用戶傳入的數(shù)組拷貝到內(nèi)核空間,然后查詢每個fd對應(yīng)的設(shè)備狀態(tài),如果設(shè)備就緒則在設(shè)備等待隊列中加入一項并繼續(xù)遍歷,如果遍歷完所有fd后沒有發(fā)現(xiàn)就緒設(shè)備,則掛起當(dāng)前進程,直到設(shè)備就緒或者主動超時,被喚醒后它又要再次遍歷fd。這個過程經(jīng)歷了多次無謂的遍歷。
- poll沒有最大連接數(shù)的限制,原因是poll是基于數(shù)組來存儲的,但是同樣有一個缺點:
- 大量的fd的數(shù)組被整體復(fù)制于用戶態(tài)和內(nèi)核地址空間之間,而不管這樣的復(fù)制是不是有意義。
- poll還有一個特點是“水平觸發(fā)”,如果報告了fd后,沒有被處理,那么下次poll時會再次報告該fd。
3.epoll
epoll支持水平觸發(fā)和邊緣觸發(fā),最大的特點在于邊緣觸發(fā),它只告訴進程哪些fd剛剛變?yōu)榫托钁B(tài),并且只會通知一次。在前面說到的復(fù)制問題上,epoll使用mmap減少復(fù)制開銷。還有一個特點是,epoll使用“事件”的就緒通知方式,通過epoll_ctl注冊fd,一旦該fd就緒,內(nèi)核就會采用類似callback的回調(diào)機制來激活該fd,epoll_wait便可以收到通知
- 1.支持一個進程所能打開的最大連接數(shù)
- 2.select 單個進程所能打開的最大連接數(shù)有FD_SETSIZE宏定義,其大小是32個整數(shù)的大小(在32位的機器上,大小就是3232,同理64位機器上FD_SETSIZE為3264),當(dāng)然我們可以對進行修改,然后重新編譯內(nèi)核,但是性能可能會受到影響,這需要進一步的測試。
- 3.epoll本質(zhì)上和select沒有區(qū)別,但是它沒有最大連接數(shù)的限制,原因是它是基于鏈表來存儲的
- 4.epoll 雖然連接數(shù)有上限,但是很大,1G內(nèi)存的機器上可以打開10萬左右的連接,2G內(nèi)存的機器可以打開20萬左右的連接
4. FD劇增后帶來的IO效率問題
- select 因為每次調(diào)用時都會對連接進行線性遍歷,所以隨著FD的增加會造成遍歷速度慢的“線性下降性能問題”。
- poll 同上
- epoll 因為epoll內(nèi)核中實現(xiàn)是根據(jù)每個fd上的callback函數(shù)來實現(xiàn)的,只有活躍的socket才會主動調(diào)用callback,所以在活躍socket較少的情況下,使用epoll沒有前面兩者的線性下降的性能問題,但是所有socket都很活躍的情況下,可能會有性能問題。
5.消息傳遞方式
- select 內(nèi)核需要將消息傳遞到用戶空間,都需要內(nèi)核拷貝動作。
- poll 同上
- epoll epoll通過內(nèi)核和用戶空間共享一塊內(nèi)存來實現(xiàn)的。
下面我們對上面的socket例子進行改造,看一下select的例子:
4.在Python中調(diào)用epoll
select.epoll([sizehint=-1])返回一個epoll對象。| EPOLLIN | 讀就緒 |
| EPOLLOUT | 寫就緒 |
| EPOLLPRI | 有數(shù)據(jù)緊急讀取 |
| EPOLLERR | assoc. fd有錯誤情況發(fā)生 |
| EPOLLHUP | assoc. fd發(fā)生掛起 |
| EPOLLRT | 設(shè)置邊緣觸發(fā)(ET)(默認(rèn)的是水平觸發(fā)) |
| EPOLLONESHOT | 設(shè)置為 one-short 行為,一個事件(event)被拉出后,對應(yīng)的fd在內(nèi)部被禁用 |
| EPOLLRDNORM | 和 EPOLLIN 相等 |
| EPOLLRDBAND | 優(yōu)先讀取的數(shù)據(jù)帶(data band) |
| EPOLLWRNORM | 和 EPOLLOUT 相等 |
| EPOLLWRBAND | 優(yōu)先寫的數(shù)據(jù)帶(data band) |
| EPOLLMSG 忽視 |
| epoll.close() | 關(guān)閉epoll對象的文件描述符。 |
| epoll.fileno | 返回control fd的文件描述符number。 |
| epoll.fromfd(fd) | 用給予的fd來創(chuàng)建一個epoll對象。 |
| epoll.register(fd[, eventmask]) | 在epoll對象中注冊一個文件描述符。(如果文件描述符已經(jīng)存在,將會引起一個IOError) |
| epoll.modify(fd, eventmask) | 修改一個已經(jīng)注冊的文件描述符。 |
| epoll.unregister(fd) | 注銷一個文件描述符。 |
| epoll.poll(timeout=-1[, maxevnets=-1]) | 等待事件,timeout(float)的單位是秒(second)。 |
5.例
#coding:Utf8 import socket, selectEOL1 = b'\n\n' EOL2 = b'\n\r\n' response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n' response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n' response += b'Hello, world!'serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) serversocket.bind(('localhost', 10000)) serversocket.listen(1) serversocket.setblocking(0)epoll = select.epoll() epoll.register(serversocket.fileno(), select.EPOLLIN)try:connections = {}; requests = {}; responses = {}while True:events = epoll.poll(1)for fileno, event in events:if fileno == serversocket.fileno():connection, address = serversocket.accept()connection.setblocking(0)epoll.register(connection.fileno(), select.EPOLLIN)connections[connection.fileno()] = connectionrequests[connection.fileno()] = b''responses[connection.fileno()] = responseelif event & select.EPOLLIN:requests[fileno] += connections[fileno].recv(1024)if EOL1 in requests[fileno] or EOL2 in requests[fileno]:epoll.modify(fileno, select.EPOLLOUT)print('-'*40 + '\n' + requests[fileno].decode()[:-2])elif event & select.EPOLLOUT:byteswritten = connections[fileno].send(responses[fileno])responses[fileno] = responses[fileno][byteswritten:]if len(responses[fileno]) == 0:epoll.modify(fileno, 0)connections[fileno].shutdown(socket.SHUT_RDWR)elif event & select.EPOLLHUP:epoll.unregister(fileno)connections[fileno].close()del connections[fileno] finally:epoll.unregister(serversocket.fileno())epoll.close()serversocket.close() #!/usr/bin/env python import select import socket response = b'' serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.bind(('0.0.0.0', 8080)) serversocket.listen(1) # 因為socket默認(rèn)是阻塞的,所以需要使用非阻塞(異步)模式。 serversocket.setblocking(0) # 創(chuàng)建一個epoll對象 epoll = select.epoll() # 在服務(wù)端socket上面注冊對讀event的關(guān)注。一個讀event隨時會觸發(fā)服務(wù)端socket去接收一個socket連接 epoll.register(serversocket.fileno(), select.EPOLLIN) try: # 字典connections映射文件描述符(整數(shù))到其相應(yīng)的網(wǎng)絡(luò)連接對象 connections = {} requests = {} responses = {} while True: # 查詢epoll對象,看是否有任何關(guān)注的event被觸發(fā)。參數(shù)“1”表示,我們會等待1秒來看是否有event發(fā)生。 # 如果有任何我們感興趣的event發(fā)生在這次查詢之前,這個查詢就會帶著這些event的列表立即返回 events = epoll.poll(1) # event作為一個序列(fileno,event code)的元組返回。fileno是文件描述符的代名詞,始終是一個整數(shù)。 for fileno, event in events: # 如果是服務(wù)端產(chǎn)生event,表示有一個新的連接進來 if fileno == serversocket.fileno(): connection, address = serversocket.accept() print('client connected:', address) # 設(shè)置新的socket為非阻塞模式 connection.setblocking(0) # 為新的socket注冊對讀(EPOLLIN)event的關(guān)注 epoll.register(connection.fileno(), select.EPOLLIN) connections[connection.fileno()] = connection # 初始化接收的數(shù)據(jù) requests[connection.fileno()] = b'' # 如果發(fā)生一個讀event,就讀取從客戶端發(fā)送過來的新數(shù)據(jù) elif event & select.EPOLLIN: print("------recvdata---------") # 接收客戶端發(fā)送過來的數(shù)據(jù) requests[fileno] += connections[fileno].recv(1024) # 如果客戶端退出,關(guān)閉客戶端連接,取消所有的讀和寫監(jiān)聽 if not requests[fileno]: connections[fileno].close() # 刪除connections字典中的監(jiān)聽對象 del connections[fileno] # 刪除接收數(shù)據(jù)字典對應(yīng)的句柄對象 del requests[connections[fileno]] print(connections, requests) epoll.modify(fileno, 0) else: # 一旦完成請求已收到,就注銷對讀event的關(guān)注,注冊對寫(EPOLLOUT)event的關(guān)注。寫event發(fā)生的時候,會回復(fù)數(shù)據(jù)給客戶端 epoll.modify(fileno, select.EPOLLOUT) # 打印完整的請求,證明雖然與客戶端的通信是交錯進行的,但數(shù)據(jù)可以作為一個整體來組裝和處理 print('-' * 40 + '\n' + requests[fileno].decode()) # 如果一個寫event在一個客戶端socket上面發(fā)生,它會接受新的數(shù)據(jù)以便發(fā)送到客戶端 elif event & select.EPOLLOUT: print("-------send data---------") # 每次發(fā)送一部分響應(yīng)數(shù)據(jù),直到完整的響應(yīng)數(shù)據(jù)都已經(jīng)發(fā)送給操作系統(tǒng)等待傳輸給客戶端 byteswritten = connections[fileno].send(requests[fileno]) requests[fileno] = requests[fileno][byteswritten:] if len(requests[fileno]) == 0: # 一旦完整的響應(yīng)數(shù)據(jù)發(fā)送完成,就不再關(guān)注寫event epoll.modify(fileno, select.EPOLLIN) # HUP(掛起)event表明客戶端socket已經(jīng)斷開(即關(guān)閉),所以服務(wù)端也需要關(guān)閉。 # 沒有必要注冊對HUP event的關(guān)注。在socket上面,它們總是會被epoll對象注冊 elif event & select.EPOLLHUP: print("end hup------") # 注銷對此socket連接的關(guān)注 epoll.unregister(fileno) # 關(guān)閉socket連接 connections[fileno].close() del connections[fileno] finally: # 打開的socket連接不需要關(guān)閉,因為Python會在程序結(jié)束的時候關(guān)閉。這里顯式關(guān)閉是一個好的代碼習(xí)慣 epoll.unregister(serversocket.fileno()) epoll.close() serversocket.close()四、web靜態(tài)服務(wù)器-epool
以下代碼,支持http的長連接,即使用了Content-Length
import socket import time import sys import re import selectclass WSGIServer(object):"""定義一個WSGI服務(wù)器的類"""def __init__(self, port, documents_root):# 1. 創(chuàng)建套接字self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 2. 綁定本地信息self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.server_socket.bind(("", port))# 3. 變?yōu)楸O(jiān)聽套接字self.server_socket.listen(128)self.documents_root = documents_root# 創(chuàng)建epoll對象self.epoll = select.epoll()# 將tcp服務(wù)器套接字加入到epoll中進行監(jiān)聽self.epoll.register(self.server_socket.fileno(), select.EPOLLIN|select.EPOLLET)# 創(chuàng)建添加的fd對應(yīng)的套接字self.fd_socket = dict()def run_forever(self):"""運行服務(wù)器"""# 等待對方鏈接while True:# epoll 進行 fd 掃描的地方 -- 未指定超時時間則為阻塞等待epoll_list = self.epoll.poll()# 對事件進行判斷for fd, event in epoll_list:# 如果是服務(wù)器套接字可以收數(shù)據(jù),那么意味著可以進行acceptif fd == self.server_socket.fileno():new_socket, new_addr = self.server_socket.accept()# 向 epoll 中注冊 連接 socket 的 可讀 事件self.epoll.register(new_socket.fileno(), select.EPOLLIN | select.EPOLLET)# 記錄這個信息self.fd_socket[new_socket.fileno()] = new_socket# 接收到數(shù)據(jù)elif event == select.EPOLLIN:request = self.fd_socket[fd].recv(1024).decode("utf-8")if request:self.deal_with_request(request, self.fd_socket[fd])else:# 在epoll中注銷客戶端的信息self.epoll.unregister(fd)# 關(guān)閉客戶端的文件句柄self.fd_socket[fd].close()# 在字典中刪除與已關(guān)閉客戶端相關(guān)的信息del self.fd_socket[fd]def deal_with_request(self, request, client_socket):"""為這個瀏覽器服務(wù)器"""if not request:returnrequest_lines = request.splitlines()for i, line in enumerate(request_lines):print(i, line)# 提取請求的文件(index.html)# GET /a/b/c/d/e/index.html HTTP/1.1ret = re.match(r"([^/]*)([^ ]+)", request_lines[0])if ret:print("正則提取數(shù)據(jù):", ret.group(1))print("正則提取數(shù)據(jù):", ret.group(2))file_name = ret.group(2)if file_name == "/":file_name = "/index.html"# 讀取文件數(shù)據(jù)try:f = open(self.documents_root+file_name, "rb")except:response_body = "file not found, 請輸入正確的url"response_header = "HTTP/1.1 404 not found\r\n"response_header += "Content-Type: text/html; charset=utf-8\r\n"response_header += "Content-Length: %d\r\n" % len(response_body)response_header += "\r\n"# 將header返回給瀏覽器client_socket.send(response_header.encode('utf-8'))# 將body返回給瀏覽器client_socket.send(response_body.encode("utf-8"))else:content = f.read()f.close()response_body = contentresponse_header = "HTTP/1.1 200 OK\r\n"response_header += "Content-Length: %d\r\n" % len(response_body)response_header += "\r\n"# 將數(shù)據(jù)返回給瀏覽器client_socket.send(response_header.encode("utf-8")+response_body)# 設(shè)置服務(wù)器服務(wù)靜態(tài)資源時的路徑 DOCUMENTS_ROOT = "./html"def main():"""控制web服務(wù)器整體"""# python3 xxxx.py 7890if len(sys.argv) == 2:port = sys.argv[1]if port.isdigit():port = int(port)else:print("運行方式如: python3 xxx.py 7890")returnprint("http服務(wù)器使用的port:%s" % port)http_server = WSGIServer(port, DOCUMENTS_ROOT)http_server.run_forever()if __name__ == "__main__":main()總結(jié)
以上是生活随笔為你收集整理的Python中的select、epoll详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python中的HTTP协议
- 下一篇: Python中的GIL和深浅拷贝