日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python异步和进程_12.python进程\协程\异步IO

發(fā)布時(shí)間:2023/12/15 python 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python异步和进程_12.python进程\协程\异步IO 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

進(jìn)程

Python中的多線程無法利用多核優(yōu)勢 , 所以如果我們想要充分地使用多核CPU的資源 , 那么就只能靠多進(jìn)程了

multiprocessing模塊中提供了Process , Queue , Pipe , Lock , RLock , Event , Condition等組件 , 與threading模塊有很多相似之處

1.創(chuàng)建進(jìn)程

from multiprocessing importProcessimporttimedeffunc(name):

time.sleep(2)print('hello',name)if __name__ == '__main__':

p= Process(target=func,args=('derek',))

p.start()#p.join()

print('end...')

View Code

2.進(jìn)程間通訊

(1)Queue

不同進(jìn)程間內(nèi)存是不共享的,要想實(shí)現(xiàn)兩個(gè)進(jìn)程間的數(shù)據(jù)交換。進(jìn)程間通信有兩種主要形式 , 隊(duì)列和管道

from multiprocessing import Process, Queue #Queue是進(jìn)程排列

deff(test):

test.put('22') #通過創(chuàng)建的子進(jìn)程往隊(duì)列添加數(shù)據(jù),實(shí)線父子進(jìn)程交互

if __name__ == '__main__':

q= Queue() #父進(jìn)程

q.put("11")

p= Process(target=f, args=(q,)) #子進(jìn)程

p.start()

p.join()print("取到:",q.get_nowait())print("取到:",q.get_nowait())#父進(jìn)程在創(chuàng)建子進(jìn)程的時(shí)候就把q克隆一份給子進(jìn)程#通過pickle序列化、反序列化,來達(dá)到兩個(gè)進(jìn)程之間的交互

結(jié)果:

取到:11取到:22

Queue

(2)Pipe(管道)

The?Pipe()?function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).

from multiprocessing importProcess, Pipedeff(conn):

conn.send('11')

conn.send('22')print("from parent:",conn.recv())print("from parent:", conn.recv())

conn.close()if __name__ == '__main__':

parent_conn, child_conn= Pipe() #生成管道實(shí)例,可以互相send()和recv()

p= Process(target=f, args=(child_conn,))

p.start()print(parent_conn.recv()) #prints "11"

print(parent_conn.recv()) #prints "22"

parent_conn.send("33") #parent 發(fā)消息給 child

parent_conn.send("44")

p.join()

Pipe

3.Manager

進(jìn)程之間是相互獨(dú)立的 ,Queue和pipe只是實(shí)現(xiàn)了數(shù)據(jù)交互,并沒實(shí)現(xiàn)數(shù)據(jù)共享,Manager可以實(shí)現(xiàn)進(jìn)程間數(shù)據(jù)共享 。

Manager還支持進(jìn)程中的很多操作 , 比如Condition , Lock , Namespace , Queue , RLock , Semaphore等

from multiprocessing importProcess, Managerimportosdeff(d, l):

d[os.getpid()]=os.getpid()

l.append(os.getpid())print(l)if __name__ == '__main__':

with Manager() as manager:

d= manager.dict() #{} #生成一個(gè)字典,可在多個(gè)進(jìn)程間共享和傳遞

l= manager.list(range(5)) #生成一個(gè)列表,可在多個(gè)進(jìn)程間共享和傳遞

p_list =[]for i in range(2):

p= Process(target=f, args=(d, l))

p.start()

p_list.append(p)for res in p_list: #等待結(jié)果

res.join()print(d)print(l)

View Code

4.lock

from multiprocessing importProcess, Lockdeff(l, i):#l.acquire()

print('hello world', i)#l.release()

if __name__ == '__main__':

lock=Lock()for num in range(100):

Process(target=f, args=(lock, num)).start() #要把lock傳到函數(shù)的參數(shù)l

#lock防止在屏幕上打印的時(shí)候會亂

lock

5.進(jìn)程池

進(jìn)程池內(nèi)部維護(hù)一個(gè)進(jìn)程序列,當(dāng)使用時(shí),則去進(jìn)程池中獲取一個(gè)進(jìn)程,如果進(jìn)程池序列中沒有可供使用的進(jìn)程,那么程序就會等待,直到進(jìn)程池中有可用進(jìn)程為止。

進(jìn)程池中有以下幾個(gè)主要方法:

apply:從進(jìn)程池里取一個(gè)進(jìn)程并執(zhí)行

apply_async:apply的異步版本

terminate:立刻關(guān)閉線程池

join:主進(jìn)程等待所有子進(jìn)程執(zhí)行完畢,必須在close或terminate之后

close:等待所有進(jìn)程結(jié)束后,才關(guān)閉線程池

from multiprocessing importProcess, PoolimporttimeimportosdefFoo(i):

time.sleep(2)print("in process",os.getpid())return i + 100

defBar(arg):print('-->exec done:', arg,os.getpid())if __name__ == '__main__': #多進(jìn)程,必須加這一句(windows系統(tǒng))

pool = Pool(processes=3) #允許進(jìn)程池同時(shí)放入3個(gè)進(jìn)程

print("主進(jìn)程",os.getpid())for i in range(10):

pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回調(diào),執(zhí)行完Foo(),接著執(zhí)行Bar()

#pool.apply(func=Foo, args=(i,)) #串行

print('end')

pool.close()

pool.join()#進(jìn)程池中進(jìn)程執(zhí)行完畢后再關(guān)閉,如果注釋,那么程序直接關(guān)閉。必須先close(),再join()

Pool

協(xié)程

1.簡介

協(xié)程(Coroutine) : 是單線程下的并發(fā) , 又稱微線程 , 纖程 . 協(xié)程是一種用戶態(tài)的輕量級線程 , 即協(xié)程有用戶自己控制調(diào)度

協(xié)程擁有自己的寄存器上下文和棧。協(xié)程調(diào)度切換時(shí),將寄存器上下文和棧保存到其他地方,在切回來的時(shí)候,恢復(fù)先前保存的寄存器上下文和棧。

協(xié)程能保留上一次調(diào)用時(shí)的狀態(tài)(即所有局部狀態(tài)的一個(gè)特定組合),每次過程重入時(shí),就相當(dāng)于進(jìn)入上一次調(diào)用的狀態(tài)

使用協(xié)程的優(yōu)缺點(diǎn)

優(yōu)點(diǎn) :

協(xié)程的切換開銷更小 , 屬于程序級別的切換 , 更加輕量級

單線程內(nèi)就可以實(shí)現(xiàn)并發(fā)的效果 , 最大限度利用CPU

缺點(diǎn) :

協(xié)程的本質(zhì)是單線程下 , 無法利用多核 , 可以是一個(gè)程序開啟多個(gè)進(jìn)程 , 每個(gè)進(jìn)程內(nèi)開啟多個(gè)線程 , 每個(gè)線程內(nèi)開啟協(xié)程

協(xié)程指的是單個(gè)線程 , 因而一旦協(xié)程出現(xiàn)阻塞 將會阻塞整個(gè)線程

2.Greenlet

greenlet是一個(gè)用C實(shí)現(xiàn)的協(xié)程模塊,相比與python自帶的yield,它可以使你在任意函數(shù)之間隨意切換,而不需把這個(gè)函數(shù)先聲明為generator

手動切換

from greenlet importgreenletdeftest1():print(12)

gr2.switch()#到這里切換到gr2,執(zhí)行test2()

print(34)

gr2.switch()#切換到上次gr2運(yùn)行的位置

deftest2():print(56)

gr1.switch()#切換到上次gr1運(yùn)行的位置

print(78)

gr1= greenlet(test1) #啟動一個(gè)協(xié)程gr1

gr2 = greenlet(test2) #啟動一個(gè)協(xié)程gr2

gr1.switch()#開始運(yùn)行g(shù)r1

greenlet

3.Gevent

Gevent 是一個(gè)第三方庫,可以輕松通過gevent實(shí)現(xiàn)并發(fā)同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴(kuò)展模塊形式接入Python的輕量級協(xié)程。

(1)IO阻塞自動切換

importgeventdeffoo():print('Running in foo')

gevent.sleep(2)print('阻塞時(shí)間最長,最后運(yùn)行')defbar():print('running in bar')

gevent.sleep(1)print('foo()還在阻塞,這里第二個(gè)運(yùn)行')deffunc3():print("running in func3")

gevent.sleep(0)print("其它兩個(gè)還在IO阻塞先運(yùn)行")#創(chuàng)建協(xié)程實(shí)例

gevent.joinall([

gevent.spawn(foo),#生成,

gevent.spawn(bar),

gevent.spawn(func3),

])#遇到IO自動切換

結(jié)果:

Runninginfoo

runninginbar

runninginfunc3

其它兩個(gè)還在IO阻塞先運(yùn)行

foo()還在阻塞,這里第二個(gè)運(yùn)行

阻塞時(shí)間最長,最后運(yùn)行

Process finished with exit code 0

View Code

由于切換是在IO操作時(shí)自動完成,所以gevent需要修改Python自帶的一些標(biāo)準(zhǔn)庫,這一過程在啟動時(shí)通過monkey patch完成:

(2)爬蟲例子:

from urllib importrequestimportgevent,timefrom gevent importmonkey

monkey.patch_all()#作用:把當(dāng)前程序的所有的io操作給我單獨(dú)的做上標(biāo)記

deff(url):print('GET: %s' %url)

resp=request.urlopen(url)

data=resp.read()print('%d bytes received from %s.' %(len(data), url))#同步需要的時(shí)間

urls = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/']

time_start=time.time()for url inurls:

f(url)print("同步cost",time.time() -time_start)#下面是異步花費(fèi)的時(shí)間

async_time_start =time.time()

gevent.joinall([

gevent.spawn(f,'https://www.python.org/'),

gevent.spawn(f,'https://www.yahoo.com/'),

gevent.spawn(f,'https://github.com/'),

])print("異步cost",time.time() -async_time_start)

結(jié)果:

GET: https://www.python.org/

48954 bytes received from https://www.python.org/.

GET: https://www.yahoo.com/

491871 bytes received from https://www.yahoo.com/.

GET: https://github.com/

51595 bytes received from https://github.com/.

同步cost4.928282260894775GET: https://www.python.org/GET: https://www.yahoo.com/GET: https://github.com/

48954 bytes received from https://www.python.org/.494958 bytes received from https://www.yahoo.com/.51599 bytes received from https://github.com/.

異步cost1.4920852184295654

IO多路復(fù)用

詳解:http://www.cnblogs.com/alex3714/articles/5876749.html

selectors模塊

selectors基于select模塊實(shí)現(xiàn)IO多路復(fù)用,調(diào)用語句selectors.DefaultSelector(),特點(diǎn)是根據(jù)平臺自動選擇最佳IO多路復(fù)用機(jī)制,調(diào)用順序:epoll > poll > select

做一個(gè)socket servers

importselectorsimportsocket

sel= selectors.DefaultSelector() #根據(jù)平臺自動選擇最佳IO多路復(fù)用機(jī)制

defaccept(sock, mask):

conn, addr= sock.accept() #Should be ready

#print('accepted', conn, 'from', addr,mask)

conn.setblocking(False) #設(shè)置為非阻塞IO

sel.register(conn, selectors.EVENT_READ, read)#新連接注冊read回調(diào)函數(shù)

#將conn和read函數(shù)注冊到一起,當(dāng)conn有變化時(shí)執(zhí)行read函數(shù)

defread(conn, mask):

data= conn.recv(1024) #Should be ready

ifdata:print('echoing', repr(data), 'to', conn)

conn.send(data)#Hope it won't block

else:print('closing', conn)

sel.unregister(conn)

conn.close()

sock=socket.socket()

sock.bind(('localhost', 9999))

sock.listen(100)

sock.setblocking(False)#設(shè)置為非阻塞IO

sel.register(sock, selectors.EVENT_READ, accept)#將sock和accept函數(shù)注冊到一起,當(dāng)sock有變化時(shí)執(zhí)行accept函數(shù)

whileTrue:

events= sel.select() #默認(rèn)阻塞,有活動連接就返回活動的連接列表,監(jiān)聽[(key1,mask1),(key2),(mask2)]

for key, mask inevents:

callback= key.data #accept #1 key.data就是accept # 2 key.data就是read

callback(key.fileobj, mask) #key.fileobj= 文件句柄

#1 key.fileobj就是sock # 2 key.fileobj就是conn

server

client

importsocketimportsys

messages= [ b'This is the message.',

b'It will be sent',

b'in parts.',

]

server_address= ('localhost', 9999)#Create a TCP/IP socket

socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(5)]print(socks)#Connect the socket to the port where the server is listening

print('connecting to %s port %s' %server_address)for s insocks:

s.connect(server_address)for message inmessages:#Send messages on both sockets

for s insocks:print('%s: sending "%s"' %(s.getsockname(), message) )

s.send(message)#Read responses on both sockets

for s insocks:

data= s.recv(1024)print( '%s: received "%s"' %(s.getsockname(), data) )if notdata:print( 'closing socket', s.getsockname() )

mutlti conn socket client

總結(jié)

以上是生活随笔為你收集整理的python异步和进程_12.python进程\协程\异步IO的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。