日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python编辑程序模型_python并发编程之IO模型

發(fā)布時間:2025/3/15 python 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python编辑程序模型_python并发编程之IO模型 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

了解新知識之前需要知道的一些知識

同步(synchronous):一個進程在執(zhí)行某個任務(wù)時,另外一個進程必須等待其執(zhí)行完畢,才能繼續(xù)執(zhí)行

#所謂同步,就是在發(fā)出一個功能調(diào)用時,在沒有得到結(jié)果之前,該調(diào)用就不會返回。按照這個定義,

其實絕大多數(shù)函數(shù)都是同步調(diào)用。但是一般而言,我們在說同步、異步的時候,

特指那些需要其他部件協(xié)作或者需要一定時間完成的任務(wù)。#舉例:#1. multiprocessing.Pool下的apply #發(fā)起同步調(diào)用后,就在原地等著任務(wù)結(jié)束,

根本不考慮任務(wù)是在計算還是在io阻塞,總之就是一股腦地等任務(wù)結(jié)束#2. concurrent.futures.ProcessPoolExecutor().submit(func,).result()#3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()

異步(asynchronous):

#異步的概念和同步相對。當一個異步功能調(diào)用發(fā)出后,調(diào)用者不能立刻得到結(jié)果。

當該異步功能完成后,通過狀態(tài)、通知或回調(diào)來通知調(diào)用者。如果異步功能用狀態(tài)來通知,

那么調(diào)用者就需要每隔一定時間檢查一次,效率就很低(有些初學(xué)多線程編程的人,總喜歡用一個循環(huán)去檢查某個變量的值,這其實是一 種很嚴重的錯誤)。

如果是使用通知的方式,效率則很高,因為異步功能幾乎不需要做額外的操作。至于回調(diào)函數(shù),其實和通知沒太多區(qū)別。#舉例:#1. multiprocessing.Pool().apply_async() #發(fā)起異步調(diào)用后,并不會等待任務(wù)結(jié)束才返回,相反,

會立即獲取一個臨時結(jié)果(并不是最終的結(jié)果,可能是封裝好的一個對象)。#2. concurrent.futures.ProcessPoolExecutor(3).submit(func,)#3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)

阻塞(blocking):

#阻塞調(diào)用是指調(diào)用結(jié)果返回之前,當前線程會被掛起(如遇到io操作)。函數(shù)只有在得到結(jié)果之后才會

將阻塞的線程激活。有人也許會把阻塞調(diào)用和同步調(diào)用等同起來,實際上他是不同的。對于同步調(diào)用來說,

很多時候當前線程還是激活的,只是從邏輯上當前函數(shù)沒有返回而已。#舉例:#1. 同步調(diào)用:apply一個累計1億次的任務(wù),該調(diào)用會一直等待,直到任務(wù)返回結(jié)果為止,

但并未阻塞住(即便是被搶走cpu的執(zhí)行權(quán)限,那也是處于就緒態(tài));#2. 阻塞調(diào)用:當socket工作在阻塞模式的時候,如果沒有數(shù)據(jù)的情況下調(diào)用recv函數(shù),

則當前線程就會被掛起,直到有數(shù)據(jù)為止。

非阻塞(non-blocking):

#非阻塞和阻塞的概念相對應(yīng),指在不能立刻得到結(jié)果之前也會立刻返回,同時該函數(shù)不會阻塞當前線程。

小結(jié):

#1. 同步與異步針對的是函數(shù)/任務(wù)的調(diào)用方式:同步就是當一個進程發(fā)起一個函數(shù)(任務(wù))調(diào)用的時候,一直等到函數(shù)(任務(wù))完成,而進程繼續(xù)處于激活狀態(tài)。而異步情況下是當一個進程發(fā)起一個函數(shù)(任務(wù))調(diào)用的時候,不會等函數(shù)返回,而是繼續(xù)往下執(zhí)行當,函數(shù)返回的時候通過狀態(tài)、通知、事件等方式通知進程任務(wù)完成。 #2. 阻塞與非阻塞針對的是進程或線程:阻塞是當請求不能滿足的時候就將進程掛起,而非阻塞則不會阻塞當前進程

一、IO模型介紹

IO發(fā)生時涉及的對象和步驟。對于一個網(wǎng)絡(luò)IO(network IO),它會涉及到兩個系統(tǒng)對象,一個是調(diào)用這個IO的process (or thread),另一個就是系統(tǒng)內(nèi)核(kernel)。當一個read操作發(fā)生時,該操作會經(jīng)歷兩個階段:

#1)等待數(shù)據(jù)準備 (Waiting for the data to be ready)

#2)將數(shù)據(jù)從內(nèi)核拷貝到進程中(Copying the data from the kernel to the process)

記住這兩點很重要,因為這些IO模型的區(qū)別就是在兩個階段上各有不同的情況。

二、阻塞IO ?(blocking IO)

阻塞IO(blocking IO)的特點:就是在IO執(zhí)行的兩個階段(等待數(shù)據(jù)和拷貝數(shù)據(jù)兩個階段)都被block了。

實際上,除非特別指定,幾乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。這給網(wǎng)絡(luò)編程帶來了一個很大的問題,如在調(diào)用recv(1024)的同時,線程將被阻塞,在此期間,線程將無法執(zhí)行任何運算或響應(yīng)任何的網(wǎng)絡(luò)請求。

1 from socket import *

2 server =socket(AF_INET,SOCK_STREAM)3 server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)4 server.bind(('127.0.0.1',8080))5 server.listen(5)6 print('start runnig...')7 whileTrue:8 conn,addr = server.accept() #IO操作 在這accept的時候不能干recv的活

9 print(addr)10 whileTrue:11 try:12 data = conn.recv(1024) #IO操作

13 conn.send(data.upper())14 exceptException:15 break

16 conn.close()17 server.close()18

19 #我們以前寫的這個就是阻塞的IO模型:一旦阻塞了就在那卡著

20 #直到等到數(shù)據(jù)已經(jīng)到了操作系統(tǒng),操作系統(tǒng)再從內(nèi)核拷貝給應(yīng)用程序

21 #阻塞IO在那兩個階段全都阻塞住了

服務(wù)端

1 from socket import *

2 client =socket(AF_INET,SOCK_STREAM)3 client.connect(('127.0.0.1',8080))4 whileTrue:5 cmd = input('>>:').strip()6 if not cmd:continue

7 client.send(cmd.encode('utf-8'))8 data = client.recv(1024)9 print('接受的是:%s'%data.decode('utf-8'))10 client.close()

客戶端

一個簡單的解決方案:

#在服務(wù)器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每個連接都擁有獨立的線程(或進程),

這樣任何一個連接的阻塞都不會影響其他的連接。

該方案的問題是:

#開啟多進程或都線程的方式,在遇到要同時響應(yīng)成百上千路的連接請求,

則無論多線程還是多進程都會嚴重占據(jù)系統(tǒng)資源,降低系統(tǒng)對外界響應(yīng)效率,

而且線程與進程本身也更容易進入假死狀態(tài)。

改進方案:

很多程序員可能會考慮使用“線程池”或“連接池”。

“線程池”旨在減少創(chuàng)建和銷毀線程的頻率,其維持一定合理數(shù)量的線程,并讓空閑的線程重新承擔(dān)新的執(zhí)行任務(wù)

。“連接池”維持連接的緩存池,盡量重用已有的連接、減少創(chuàng)建和關(guān)閉連接的頻率。這兩種技術(shù)都可以很好

的降低系統(tǒng)開銷,都被廣泛應(yīng)用很多大型系統(tǒng),如websphere、tomcat和各種數(shù)據(jù)庫等。

改進后方案其實也存在著問題:

#“線程池”和“連接池”技術(shù)也只是在一定程度上緩解了頻繁調(diào)用IO接口帶來的資源占用。而且,

所謂“池”始終有其上限,當請求大大超過上限時,“池”構(gòu)成的系統(tǒng)對外界的響應(yīng)并不比沒有池的時候效果

好多少。所以使用“池”必須考慮其面臨的響應(yīng)規(guī)模,并根據(jù)響應(yīng)規(guī)模調(diào)整“池”的大小。

對應(yīng)上例中的所面臨的可能同時出現(xiàn)的上千甚至上萬次的客戶端請求,“線程池”或“連接池”或許可以緩解部分壓力,但是不能解決所有問題。總之,多線程模型可以方便高效的解決小規(guī)模的服務(wù)請求,但面對大規(guī)模的服務(wù)請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個問題。

三、非阻塞IO ?(nonblocking IO)

多線程,多進程,進程池,線程池都可以實現(xiàn)并發(fā),但是仍然沒有解決IO問題

那么下面我們來了解一下非阻塞IO

從圖中可以看出,當用戶進程發(fā)出read操作時,如果kernel中的數(shù)據(jù)還沒有準備好,那么它并不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發(fā)起一個read操作后,并不需要等待,而是馬上就得到了一個結(jié)果。用戶進程判斷結(jié)果是一個error時,它就知道數(shù)據(jù)還沒有準備好,于是用戶就可以在本次到下次再發(fā)起read詢問的時間間隔內(nèi)做其他事情,或者直接再次發(fā)送read操作。一旦kernel中的數(shù)據(jù)準備好了,并且又再次收到了用戶進程的system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存(這一階段仍然是阻塞的),然后返回。

也就是說非阻塞的recvform系統(tǒng)調(diào)用調(diào)用之后,進程并沒有被阻塞,內(nèi)核馬上返回給進程,如果數(shù)據(jù)還沒準備好,此時會返回一個error。進程在返回之后,可以干點別的事情,然后再發(fā)起recvform系統(tǒng)調(diào)用。重復(fù)上面的過程,循環(huán)往復(fù)的進行recvform系統(tǒng)調(diào)用。這個過程通常被稱之為輪詢。輪詢檢查內(nèi)核數(shù)據(jù),直到數(shù)據(jù)準備好,再拷貝數(shù)據(jù)到進程,進行數(shù)據(jù)處理。需要注意,拷貝數(shù)據(jù)整個過程,進程仍然是屬于阻塞的狀態(tài)。

所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel數(shù)據(jù)準備好了沒有。

server.setblocking()#默認是True

server.setblocking(False) #False的話就成非阻塞了,這只是對于socket套接字來說的

所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問內(nèi)核數(shù)據(jù)準備好了沒有。

wait data 等數(shù)據(jù)的這個階段是不阻塞的

copy data 這個階段還是要阻塞的

服務(wù)端

1 #這種程序雖說解決了單線程并發(fā),但是大大的占用了cpu

2 from socket import *

3 importtime4 severt =socket(AF_INET,SOCK_STREAM)5 severt.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)6 severt.bind(('127.0.0.1',8080))7 severt.listen(5)8 severt.setblocking(False) #默認是True (如果是False,套接字里的一些阻塞操作都變成非阻塞的)

9 print('startting....')10 conn_l =[]11 del_l =[]12 whileTrue:13 try:14 print(conn_l)15 conn,addr = severt.accept() #收不到數(shù)據(jù)的時候才出異常

16 print(conn)17 conn_l.append(conn)18 except BlockingIOError: #吧收不到數(shù)據(jù)的那段時間利用起來(利用他收不到

19 #數(shù)據(jù)的時候,才干下面的for循環(huán))

20 for conn inconn_l:21 try:22 data = conn.recv(1024)23 conn.send(data.upper())24 exceptBlockingIOError:25 pass

26 except ConnectionResetError: #端開鏈接的錯誤(如果突然斷開鏈接,會報錯

27 #就先添加到列表里面去,完了吧鏈接給清除了)

28 del_l.append(conn)29 for obj indel_l:30 obj.close()31 conn_l.remove(obj)32 del_l.clear()

客戶端

1 from socket import *

2 client =socket(AF_INET,SOCK_STREAM)3 client.connect(('127.0.0.1',8080))4 whileTrue:5 cmd = input('>>:').strip()6 if not cmd:continue

7 client.send(cmd.encode('utf-8'))8 data = client.recv(1024)9 print(data.decode('utf-8'))

對服務(wù)端的說明:如果客戶端斷開連接的時候,就會發(fā)生ConnectionResetError

所以我們的處理一下這個異常。就如上邊的服務(wù)端所示

但是非阻塞IO模型絕不被推薦。

非阻塞IO模型優(yōu)點:能夠在等待任務(wù)完成的時間里干其他活了(包括提交其他任務(wù),也就是 “后臺” 可以有多個任務(wù)在“”同時“”執(zhí)行)。

非阻塞IO模型缺點:

1. 循環(huán)調(diào)用recv()將大幅度推高CPU占用率;這也是我們在代碼中留一句time.sleep(2)的原因,否則在低配主機下極容易出現(xiàn)卡機情況

2. 任務(wù)完成的響應(yīng)延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務(wù)可能在兩次輪詢之間的任意時間完成。這會導(dǎo)致整體數(shù)據(jù)吞吐量的降低。

四、多路復(fù)用IO ?(IO multiplexing)

當用戶進程調(diào)用了select,那么整個進程會被block,而同時,kernel會“監(jiān)視”所有select負責(zé)的socket,當任何一個socket中的數(shù)據(jù)準備好了,select就會返回。這個時候用戶進程再調(diào)用read操作,將數(shù)據(jù)從kernel拷貝到用戶進程。

這個圖和blocking IO的圖其實并沒有太大的不同,事實上還更差一些。因為這里需要使用兩個系統(tǒng)調(diào)用(select和recvfrom),而blocking IO只調(diào)用了一個系統(tǒng)調(diào)用(recvfrom)。但是,用select的優(yōu)勢在于它可以同時處理多個connection。

強調(diào):

1. 如果處理的連接數(shù)不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優(yōu)勢并不是對于單個連接能處理得更快,而是在于能處理更多的連接。

2.?在多路復(fù)用模型中,對于每一個socket,一般都設(shè)置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數(shù)block,而不是被socket IO給block。

結(jié)論: select的優(yōu)勢在于可以處理多個連接,不適用于單個連接

1 #服務(wù)端

2 from socket import *

3 importselect4

5 s=socket(AF_INET,SOCK_STREAM)6 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)7 s.bind(('127.0.0.1',8081))8 s.listen(5)9 s.setblocking(False) #設(shè)置socket的接口為非阻塞

10 read_l=[s,]11 whileTrue:12 r_l,w_l,x_l=select.select(read_l,[],[])13 print(r_l)14 for ready_obj inr_l:15 if ready_obj ==s:16 conn,addr=ready_obj.accept() #此時的ready_obj等于s

17 read_l.append(conn)18 else:19 try:20 data=ready_obj.recv(1024) #此時的ready_obj等于conn

21 if notdata:22 read_l.remove(ready_obj)23 continue

24 ready_obj.send(data.upper())25 exceptConnectionResetError:26 read_l.remove(ready_obj)27

28 #客戶端

29 from socket import *

30 c=socket(AF_INET,SOCK_STREAM)31 c.connect(('127.0.0.1',8081))32

33 whileTrue:34 msg=input('>>:')35 if not msg:continue

36 c.send(msg.encode('utf-8'))37 data=c.recv(1024)38 print(data.decode('utf-8'))39

40 select IO模型

select IO 模塊

select監(jiān)聽fd變化的過程分析:

#用戶進程創(chuàng)建socket對象,拷貝監(jiān)聽的fd到內(nèi)核空間,每一個fd會對應(yīng)一張系統(tǒng)文件表,

內(nèi)核空間的fd響應(yīng)到數(shù)據(jù)后,就會發(fā)送信號給用戶進程數(shù)據(jù)已到;#用戶進程再發(fā)送系統(tǒng)調(diào)用,比如(accept)將內(nèi)核空間的數(shù)據(jù)copy到用戶空間,

同時作為接受數(shù)據(jù)端內(nèi)核空間的數(shù)據(jù)清除,這樣重新監(jiān)聽時fd再有新的數(shù)據(jù)又可以響應(yīng)到了

(發(fā)送端因為基于TCP協(xié)議所以需要收到應(yīng)答后才會清除)。

select模塊的優(yōu)點

#相比其他模型,使用select() 的事件驅(qū)動模型只用單線程(進程)執(zhí)行,占用資源少,不消耗太多 CPU,

同時能夠為多客戶端提供服務(wù)。如果試圖建立一個簡單的事件驅(qū)動的服務(wù)器程序,這個模型有一定的參考價值。

select模塊的缺點

#首先select()接口并不是實現(xiàn)“事件驅(qū)動”的最好選擇。因為當需要探測的句柄值較大時,

select()接口本身需要消耗大量時間去輪詢各個句柄。很多操作系統(tǒng)提供了更為高效的接口,

如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。

如果需要實現(xiàn)更高效的服務(wù)器程序,類似epoll這樣的接口更被推薦。

遺憾的是不同的操作系統(tǒng)特供的epoll接口有很大差異,所以使用類似于epoll的接口實現(xiàn)

具有較好跨平臺能力的服務(wù)器會比較困難。#其次,該模型將事件探測和事件響應(yīng)夾雜在一起,一旦事件響應(yīng)的執(zhí)行體龐大,則對整個模型是災(zāi)難性的。

五、異步IO(asynchronous IO)

用戶進程發(fā)起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產(chǎn)生任何block。然后,kernel會等待數(shù)據(jù)準備完成,然后將數(shù)據(jù)拷貝到用戶內(nèi)存,當這一切都完成之后,kernel會給用戶進程發(fā)送一個signal,告訴它read操作完成了。

六、IO模型比較分析

經(jīng)過上面的介紹,會發(fā)現(xiàn)non-blocking IO和asynchronous IO的區(qū)別還是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,但是它仍然要求進程去主動的check,并且當數(shù)據(jù)準備完成以后,也需要進程主動的再次調(diào)用recvfrom來將數(shù)據(jù)拷貝到用戶內(nèi)存。而asynchronous IO則完全不同。它就像是用戶進程將整個IO操作交給了他人(kernel)完成,然后他人做完后發(fā)信號通知。在此期間,用戶進程不需要去檢查IO操作的狀態(tài),也不需要主動的去拷貝數(shù)據(jù)。

七、selsectors模塊

這三種IO多路復(fù)用模型在不同的平臺有著不同的支持,而epoll在windows下就不支持,好在我們有selectors模塊,幫我們默認選擇當前平臺下最合適的

1 #服務(wù)端

2 from socket import *

3 importselectors4

5 sel=selectors.DefaultSelector()6 defaccept(server_fileobj,mask):7 conn,addr=server_fileobj.accept()8 sel.register(conn,selectors.EVENT_READ,read)9

10 defread(conn,mask):11 try:12 data=conn.recv(1024)13 if notdata:14 print('closing',conn)15 sel.unregister(conn)16 conn.close()17 return

18 conn.send(data.upper()+b'_SB')19 exceptException:20 print('closing', conn)21 sel.unregister(conn)22 conn.close()23

24

25

26 server_fileobj=socket(AF_INET,SOCK_STREAM)27 server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)28 server_fileobj.bind(('127.0.0.1',8088))29 server_fileobj.listen(5)30 server_fileobj.setblocking(False) #設(shè)置socket的接口為非阻塞

31 sel.register(server_fileobj,selectors.EVENT_READ,accept) #相當于網(wǎng)select的讀列表里append了一個文件句柄server_fileobj,并且綁定了一個回調(diào)函數(shù)accept

32

33 whileTrue:34 events=sel.select() #檢測所有的fileobj,是否有完成wait data的

35 for sel_obj,mask inevents:36 callback=sel_obj.data #callback=accpet

37 callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

38

39 #客戶端

40 from socket import *

41 c=socket(AF_INET,SOCK_STREAM)42 c.connect(('127.0.0.1',8088))43

44 whileTrue:45 msg=input('>>:')46 if not msg:continue

47 c.send(msg.encode('utf-8'))48 data=c.recv(1024)49 print(data.decode('utf-8'))

selectors

總結(jié)

以上是生活随笔為你收集整理的python编辑程序模型_python并发编程之IO模型的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。