日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > python >内容正文

python

python使用协程实现udp_python-socket和进程线程协程(代码展示)

發(fā)布時(shí)間:2025/3/15 python 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python使用协程实现udp_python-socket和进程线程协程(代码展示) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

#一、進(jìn)程Process#在windows中使用需要注意#在Windows操作系統(tǒng)中由于沒(méi)有fork(linux操作系統(tǒng)中創(chuàng)建進(jìn)程的機(jī)制),在創(chuàng)建子進(jìn)程的時(shí)候會(huì)自動(dòng) import 啟動(dòng)它的這個(gè)文件,而在 import 的時(shí)候又執(zhí)行了整個(gè)文件。因此如果將process()直接寫(xiě)在文件中就會(huì)無(wú)限遞歸創(chuàng)建子進(jìn)程報(bào)錯(cuò)。#所以必須把創(chuàng)建子進(jìn)程的部分使用if __name__ =='__main__' 判斷保護(hù)起來(lái),import 的時(shí)候 ,就不會(huì)遞歸運(yùn)行了。#join:父進(jìn)程等待子進(jìn)程結(jié)束后才繼續(xù)執(zhí)行自己后續(xù)的代碼

importtimeimportrandomfrom multiprocessing importProcessdeffunc(index):

time.sleep(random.randint(1, 3))print('第%s封郵件發(fā)送完畢' %index)if __name__ == '__main__':

p_lst=[]for i in range(10):

p= Process(target=func, args=(i,))

p.start()#先讓所有子進(jìn)程都啟動(dòng)

p_lst.append(p)for p in p_lst: #再進(jìn)行join阻塞

p.join()print('10封郵件全部發(fā)送完畢')#守護(hù)進(jìn)程#主進(jìn)程創(chuàng)建守護(hù)進(jìn)程#1:守護(hù)進(jìn)程會(huì)在主進(jìn)程代碼執(zhí)行結(jié)束后就終止#2:守護(hù)進(jìn)程內(nèi)無(wú)法再開(kāi)啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children#注意:進(jìn)程之間是互相獨(dú)立的,主進(jìn)程代碼運(yùn)行結(jié)束,守護(hù)進(jìn)程隨即終止

#1,守護(hù)進(jìn)程會(huì)在主進(jìn)程代碼執(zhí)行結(jié)束后就終止

importtimefrom multiprocessing importProcessdeffunc():print('子進(jìn)程 start')

time.sleep(3) #睡3秒的時(shí)候主進(jìn)程的代碼已經(jīng)執(zhí)行完畢了,所以子進(jìn)程也會(huì)跟著結(jié)束

print('子進(jìn)程end')if __name__ == '__main__':

p= Process(target=func)

p.daemon= True #daemon是Process的屬性

p.start()

time.sleep(2) #睡2秒的時(shí)候,執(zhí)行了子進(jìn)程

print('主進(jìn)程')#結(jié)果:#子進(jìn)程 start#主進(jìn)程

#鎖#加鎖降低了程序的效率,讓原來(lái)能夠同時(shí)執(zhí)行的代碼變成順序執(zhí)行了,異步變同步的過(guò)程#保證了數(shù)據(jù)的安全

importtimeimportjsonfrom multiprocessing importProcessfrom multiprocessing import Lock #導(dǎo)入Lock類(lèi)

defsearch(person):

with open('ticket') as f:

ticketinfo=json.load(f)print('%s查詢(xún)余票:' % person, ticketinfo['count'])defget_ticket(person):

with open('ticket') as f:

ticketinfo=json.load(f)

time.sleep(0.2) #模擬讀數(shù)據(jù)的網(wǎng)絡(luò)延遲

if ticketinfo['count'] >0:print('%s買(mǎi)到票了' %person)

ticketinfo['count'] -= 1time.sleep(0.2)

with open('ticket', 'w') as f:

json.dump(ticketinfo, f)else:print('%s沒(méi)買(mǎi)到票' %person)defticket(person, lock):

search(person)

lock.acquire()#誰(shuí)獲得鑰匙 誰(shuí)才能進(jìn)入

get_ticket(person)

lock.release()#用完了,把鑰匙給下一個(gè)人

if __name__ == '__main__':

lock= Lock() #創(chuàng)建一個(gè)鎖對(duì)象

for i in range(5):

p= Process(target=ticket, args=('person%s' %i, lock))

p.start()#結(jié)果:#person1查詢(xún)余票: 3#person3查詢(xún)余票: 3#person0查詢(xún)余票: 3#person2查詢(xún)余票: 3#person4查詢(xún)余票: 3#person1買(mǎi)到票了#person3買(mǎi)到票了#person0買(mǎi)到票了#person2沒(méi)買(mǎi)到票#person4沒(méi)買(mǎi)到票

#1、信號(hào)量的實(shí)現(xiàn)機(jī)制:計(jì)數(shù)器 + 鎖實(shí)現(xiàn)的#信號(hào)量同步基于內(nèi)部計(jì)數(shù)器,每調(diào)用一次acquire(),計(jì)數(shù)器減1;每調(diào)用一次release(),計(jì)數(shù)器加1.當(dāng)計(jì)數(shù)器為0時(shí),acquire()#調(diào)用被阻塞。#互斥鎖同時(shí)只允許一個(gè)線(xiàn)程更改數(shù)據(jù),而信號(hào)量Semaphore是同時(shí)允許一定數(shù)量的線(xiàn)程更改數(shù)據(jù)(Samphore相當(dāng)于有幾把鑰匙,lock只能有一把鑰匙)

importtimeimportrandomfrom multiprocessing importProcessfrom multiprocessing importSemaphoredef changba(person, sem): #在唱吧 唱歌

sem.acquire() #第一次可以同時(shí)進(jìn)來(lái)兩個(gè)人

print('%s走進(jìn)唱吧' %person)

time.sleep(random.randint(3, 6)) #每個(gè)人唱歌的時(shí)間

print('%s走出唱吧' % person) #唱完走人

sem.release() #把鑰匙給下一個(gè)人

if __name__ == '__main__':

sem= Semaphore(2) #2把鑰匙

for i in range(5):

p= Process(target=changba, args=('person%s' %i, sem))

p.start()#事件 Event#事件主要提供了三個(gè)方法#set、wait、clear。#事件處理的機(jī)制:全局定義了一個(gè)“Flag”,#如果“Flag”值為False,那么當(dāng)程序執(zhí)行event.wait方法時(shí)就會(huì)阻塞,#如果“Flag”值為T(mén)rue,那么event.wait方法時(shí)便不再阻塞。

#阻塞事件 :wait()方法#wait是否阻塞是看event對(duì)象內(nèi)部的Flag

#控制Flag的值:#set() 將Flag的值改成True#clear() 將Flag的值改成False#is_set() 判斷當(dāng)前的Flag的值

#紅綠燈:

importtimeimportrandomfrom multiprocessing importProcessfrom multiprocessing importEventdef traffic_ligth(e): #紅綠燈

print('\033[31m紅燈亮\033[0m') #Flag 默認(rèn)是False

whileTrue:if e.is_set(): #如果是綠燈

time.sleep(2) #2秒后

print('\033[31m紅燈亮\033[0m') #轉(zhuǎn)為紅燈

e.clear() #設(shè)置為False

else: #如果是紅燈

time.sleep(2) #2秒后

print('\033[32m綠燈亮\033[0m') #轉(zhuǎn)為綠燈

e.set() #設(shè)置為T(mén)rue

def car(e, i): #車(chē)

if note.is_set():print('car %s在等待' %i)

e.wait()print('car %s 通過(guò)了' %i)if __name__ == '__main__':

e=Event()

p= Process(target=traffic_ligth, args=(e,)) #紅綠燈進(jìn)程

p.daemon =True

p.start()

p_lst=[]for i in range(10): #10輛車(chē)的進(jìn)程

time.sleep(random.randrange(0, 3, 2))

p= Process(target=car, args=(e, i))

p.start()

p_lst.append(p)for p inp_lst: p.join()#二、進(jìn)程通信#隊(duì)列(先進(jìn)先出)#生產(chǎn)者消費(fèi)者模型

importtimeimportrandomfrom multiprocessing importProcess, Queuedefconsumer(q, name):#消費(fèi)者

whileTrue:

food= q.get() #在隊(duì)列中取值

if food is None: breaktime.sleep(random.uniform(0.3, 1)) #模擬吃消耗的時(shí)間

print('%s偷吃了%s,快打死他' %(name, food))defproducter(q, name, food):#生產(chǎn)者

for i in range(10):

time.sleep(random.uniform(0.5, 0.9)) #模擬生產(chǎn)時(shí)間

print('%s生產(chǎn)了%s,序號(hào):%s' %(name, food, i))

q.put(food+ str(i)) #把值存入隊(duì)列中

if __name__ == '__main__':

q= Queue() #Queue隊(duì)列對(duì)象

c1 = Process(target=consumer, args=(q, '小明'))

c2= Process(target=consumer, args=(q, '小東'))

c1.start()

c2.start()

p1= Process(target=producter, args=(q, '張三', '面包'))

p2= Process(target=producter, args=(q, '李四', '可樂(lè)'))

p1.start()

p2.start()

p1.join()

p2.join()

q.put(None)#有幾個(gè)consumer進(jìn)程就需要放幾個(gè)None,表示生產(chǎn)完畢(這就有點(diǎn)low了)

q.put(None)#JoinableQueue#JoinableQueue和Queue幾乎一樣,不同的是JoinableQueue隊(duì)列允許使用者告訴隊(duì)列某個(gè)數(shù)據(jù)已經(jīng)處理了。通知進(jìn)程是使用共享的信號(hào)和條件變量來(lái)實(shí)現(xiàn)的。#task_done():使用者使用此方法發(fā)出信號(hào),表示q.get()返回的項(xiàng)目已經(jīng)被處理#join():當(dāng)隊(duì)列中有數(shù)據(jù)的時(shí)候,使用此方法會(huì)進(jìn)入阻塞,直到放入隊(duì)列中所有的數(shù)據(jù)都被處理掉(都被task_done)才轉(zhuǎn)換成不阻塞

#解決剛才生產(chǎn)者消費(fèi)者模型low的問(wèn)題:

importtimeimportrandomfrom multiprocessing importProcess, JoinableQueuedefconsumer(jq, name):#消費(fèi)者

whileTrue:

food= jq.get() #在隊(duì)列中取值

#if food is None:break

time.sleep(random.uniform(0.3, 1)) #模擬吃消耗的時(shí)間

print('%s偷吃了%s,快打死他' %(name, food))

jq.task_done()#向jq.join()發(fā)送一次信號(hào),證明這個(gè)數(shù)據(jù)已經(jīng)處理了

defproducter(jq, name, food):#生產(chǎn)者

for i in range(10):

time.sleep(random.uniform(0.5, 0.9)) #模擬生產(chǎn)時(shí)間

print('%s生產(chǎn)了%s,序號(hào):%s' %(name, food, i))

jq.put(food+ str(i)) #把值存入隊(duì)列中

if __name__ == '__main__':

jq=JoinableQueue()

c1= Process(target=consumer, args=(jq, '小明'))

c2= Process(target=consumer, args=(jq, '小東'))

c1.daemon= True #把消費(fèi)者設(shè)置為守護(hù)進(jìn)程

c2.daemon =True

c1.start()

c2.start()

p1= Process(target=producter, args=(jq, '張三', '面包'))

p2= Process(target=producter, args=(jq, '李四', '可樂(lè)'))

p1.start()

p2.start()

p1.join()

p2.join()

jq.join()#數(shù)據(jù)全部被task_done后才不阻塞

#管道#Pipe([duplex]):在進(jìn)程之間創(chuàng)建一條管道,并返回元組(left,right),其中l(wèi)eft,right表示管道兩端的連接對(duì)象,強(qiáng)調(diào)一點(diǎn):必須在產(chǎn)生Process對(duì)象之前產(chǎn)生管道#duplex:默認(rèn)管道是全雙工的,如果將duplex改成False,left只能用于接收,right只能用于發(fā)送。#主要方法:#right.recv(): 接收l(shuí)eft.send()發(fā)送的內(nèi)容。如果沒(méi)有消息可接收,recv方法會(huì)一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會(huì)拋出EOFError。#letf.send(): 通過(guò)連接發(fā)送內(nèi)容。#close(): 關(guān)閉連接。

#pipe的端口管理不會(huì)隨著某一個(gè)進(jìn)程的關(guān)閉就關(guān)閉#操作系統(tǒng)來(lái)管理進(jìn)程對(duì)這些端口的使用,不使用的端口應(yīng)該關(guān)閉它#一條管道,兩個(gè)進(jìn)程,就有4個(gè)端口 每關(guān)閉一個(gè)端口計(jì)數(shù)-1,直到只剩下一個(gè)端口的時(shí)候 recv就會(huì)報(bào)錯(cuò)#如果不關(guān)閉不使用的端口,在已經(jīng)把數(shù)據(jù)發(fā)送完畢的情況下,那么接收端的recv就會(huì)一直掛起,等待接收數(shù)據(jù),這個(gè)進(jìn)程就一直不能關(guān)閉#因此不使用的端口就應(yīng)該關(guān)閉它,讓recv拋出異常后對(duì)這個(gè)進(jìn)程進(jìn)行處理

from multiprocessing importProcess, Pipedefconsumer(left, right):

left.close()#若這里不close,則不會(huì)異常EOFError,數(shù)據(jù)接收完畢后,下面的right.recv()就會(huì)一直掛起

whileTrue:try:print(right.recv())exceptEOFError:break

if __name__ == '__main__':

left, right=Pipe()

Process(target=consumer, args=(left, right)).start()

right.close()for i in range(10):

left.send('Apple%s' %i)

left.close()#三、進(jìn)程池#同步、apply

importosimporttimefrom multiprocessing importPooldeftest(num):

time.sleep(1)print('%s:%s' %(num, os.getpid()))return num * 2

if __name__ == '__main__':

p=Pool()for i in range(20):

res= p.apply(test, args=(i,)) #提交任務(wù)的方法 同步提交

print('-->', res) #res就是test的return的值

#異步、apply_async

importtimefrom multiprocessing importPooldeffunc(num):

time.sleep(1)print('做了%s件衣服' %num)if __name__ == '__main__':

p= Pool(4) #進(jìn)程池中創(chuàng)建4個(gè)進(jìn)程,不寫(xiě)的話(huà),默認(rèn)值為你電腦的CUP數(shù)量

for i in range(50):

p.apply_async(func, args=(i,)) #異步提交func到一個(gè)子進(jìn)程中執(zhí)行,沒(méi)有返回值的情況

p.close() #關(guān)閉進(jìn)程池,用戶(hù)不能再向這個(gè)池中提交任務(wù)了

p.join() #阻塞,直到進(jìn)程池中所有的任務(wù)都被執(zhí)行完

#注意:#異步提交且沒(méi)有返回值接收的情況下必須要用close()和join()#因?yàn)槿绻麤](méi)有close()和join(),主進(jìn)程執(zhí)行完畢后會(huì)立刻把子進(jìn)程回收了,相當(dāng)于子進(jìn)程還沒(méi)來(lái)得及開(kāi)啟#所以要join,讓子進(jìn)程結(jié)束后再結(jié)束父進(jìn)程,但是進(jìn)程池中要使用join就必須先進(jìn)行close

importtimeimportosfrom multiprocessing importPooldeftest(num):

time.sleep(1)print('%s:%s' %(num, os.getpid()))return num * 2

if __name__ == '__main__':

p=Pool()

res_lst=[]for i in range(20):

res= p.apply_async(test, args=(i,)) #提交任務(wù)的方法 異步提交

res_lst.append(res)for res inres_lst:print(res.get())#注意:#異步提交有返回值的情況下,res是一個(gè)對(duì)象代表的是這個(gè)任務(wù)的編號(hào),需要用res.get()方法讓任務(wù)執(zhí)行且把返回值返回給你。#get有阻塞效果,拿到子進(jìn)程的返回值后才不阻塞,所以并不需要再使用close和join。

#map#map接收一個(gè)函數(shù)和一個(gè)可迭代對(duì)象,是異步提交的簡(jiǎn)化版本,自帶close和join方法#可迭代對(duì)象的每一個(gè)值就是函數(shù)接收的實(shí)參,可迭代對(duì)象的長(zhǎng)度就是創(chuàng)建的任務(wù)數(shù)量#map拿到返回值是所有結(jié)果組成的列表

importtimefrom multiprocessing importPooldeffunc(num):print('子進(jìn)程:', num)#time.sleep(1)

returnnumif __name__ == '__main__':

p=Pool()

ret= p.map(func, range(10)) #ret是列表

for i inret:print('返回值:', i)#進(jìn)程池的回調(diào)函數(shù)(同步提交apply沒(méi)有回調(diào)函數(shù))

importosfrom multiprocessing importPooldeffunc(i):print('子進(jìn)程:', os.getpid())returnidefcall_back(res):print('回調(diào)函數(shù):', os.getpid())print('res--->', res)if __name__ == '__main__':

p=Pool()print('主進(jìn)程:', os.getpid())

p.apply_async(func, args=(1,), callback=call_back) #callback關(guān)鍵字傳參,參數(shù)是回調(diào)函數(shù)

p.close()

p.join()#結(jié)果:#主進(jìn)程: 4732#子進(jìn)程: 10552#回調(diào)函數(shù): 4732#res---> 1#

#從結(jié)果可以看出:#子進(jìn)程func執(zhí)行完畢之后才去執(zhí)行callback回調(diào)函數(shù)#子進(jìn)程func的返回值會(huì)作為回調(diào)函數(shù)的參數(shù)#回調(diào)函數(shù)是在主進(jìn)程中執(zhí)行的

總結(jié)

以上是生活随笔為你收集整理的python使用协程实现udp_python-socket和进程线程协程(代码展示)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。