日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python使用协程实现udp_python-socket和进程线程协程(代码展示)

發布時間:2025/3/15 python 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python使用协程实现udp_python-socket和进程线程协程(代码展示) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

#一、進程Process#在windows中使用需要注意#在Windows操作系統中由于沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。#所以必須把創建子進程的部分使用if __name__ =='__main__' 判斷保護起來,import 的時候 ,就不會遞歸運行了。#join:父進程等待子進程結束后才繼續執行自己后續的代碼

importtimeimportrandomfrom multiprocessing importProcessdeffunc(index):

time.sleep(random.randint(1, 3))print('第%s封郵件發送完畢' %index)if __name__ == '__main__':

p_lst=[]for i in range(10):

p= Process(target=func, args=(i,))

p.start()#先讓所有子進程都啟動

p_lst.append(p)for p in p_lst: #再進行join阻塞

p.join()print('10封郵件全部發送完畢')#守護進程#主進程創建守護進程#1:守護進程會在主進程代碼執行結束后就終止#2:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children#注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

#1,守護進程會在主進程代碼執行結束后就終止

importtimefrom multiprocessing importProcessdeffunc():print('子進程 start')

time.sleep(3) #睡3秒的時候主進程的代碼已經執行完畢了,所以子進程也會跟著結束

print('子進程end')if __name__ == '__main__':

p= Process(target=func)

p.daemon= True #daemon是Process的屬性

p.start()

time.sleep(2) #睡2秒的時候,執行了子進程

print('主進程')#結果:#子進程 start#主進程

#鎖#加鎖降低了程序的效率,讓原來能夠同時執行的代碼變成順序執行了,異步變同步的過程#保證了數據的安全

importtimeimportjsonfrom multiprocessing importProcessfrom multiprocessing import Lock #導入Lock類

defsearch(person):

with open('ticket') as f:

ticketinfo=json.load(f)print('%s查詢余票:' % person, ticketinfo['count'])defget_ticket(person):

with open('ticket') as f:

ticketinfo=json.load(f)

time.sleep(0.2) #模擬讀數據的網絡延遲

if ticketinfo['count'] >0:print('%s買到票了' %person)

ticketinfo['count'] -= 1time.sleep(0.2)

with open('ticket', 'w') as f:

json.dump(ticketinfo, f)else:print('%s沒買到票' %person)defticket(person, lock):

search(person)

lock.acquire()#誰獲得鑰匙 誰才能進入

get_ticket(person)

lock.release()#用完了,把鑰匙給下一個人

if __name__ == '__main__':

lock= Lock() #創建一個鎖對象

for i in range(5):

p= Process(target=ticket, args=('person%s' %i, lock))

p.start()#結果:#person1查詢余票: 3#person3查詢余票: 3#person0查詢余票: 3#person2查詢余票: 3#person4查詢余票: 3#person1買到票了#person3買到票了#person0買到票了#person2沒買到票#person4沒買到票

#1、信號量的實現機制:計數器 + 鎖實現的#信號量同步基于內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()#調用被阻塞。#互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據(Samphore相當于有幾把鑰匙,lock只能有一把鑰匙)

importtimeimportrandomfrom multiprocessing importProcessfrom multiprocessing importSemaphoredef changba(person, sem): #在唱吧 唱歌

sem.acquire() #第一次可以同時進來兩個人

print('%s走進唱吧' %person)

time.sleep(random.randint(3, 6)) #每個人唱歌的時間

print('%s走出唱吧' % person) #唱完走人

sem.release() #把鑰匙給下一個人

if __name__ == '__main__':

sem= Semaphore(2) #2把鑰匙

for i in range(5):

p= Process(target=changba, args=('person%s' %i, sem))

p.start()#事件 Event#事件主要提供了三個方法#set、wait、clear。#事件處理的機制:全局定義了一個“Flag”,#如果“Flag”值為False,那么當程序執行event.wait方法時就會阻塞,#如果“Flag”值為True,那么event.wait方法時便不再阻塞。

#阻塞事件 :wait()方法#wait是否阻塞是看event對象內部的Flag

#控制Flag的值:#set() 將Flag的值改成True#clear() 將Flag的值改成False#is_set() 判斷當前的Flag的值

#紅綠燈:

importtimeimportrandomfrom multiprocessing importProcessfrom multiprocessing importEventdef traffic_ligth(e): #紅綠燈

print('\033[31m紅燈亮\033[0m') #Flag 默認是False

whileTrue:if e.is_set(): #如果是綠燈

time.sleep(2) #2秒后

print('\033[31m紅燈亮\033[0m') #轉為紅燈

e.clear() #設置為False

else: #如果是紅燈

time.sleep(2) #2秒后

print('\033[32m綠燈亮\033[0m') #轉為綠燈

e.set() #設置為True

def car(e, i): #車

if note.is_set():print('car %s在等待' %i)

e.wait()print('car %s 通過了' %i)if __name__ == '__main__':

e=Event()

p= Process(target=traffic_ligth, args=(e,)) #紅綠燈進程

p.daemon =True

p.start()

p_lst=[]for i in range(10): #10輛車的進程

time.sleep(random.randrange(0, 3, 2))

p= Process(target=car, args=(e, i))

p.start()

p_lst.append(p)for p inp_lst: p.join()#二、進程通信#隊列(先進先出)#生產者消費者模型

importtimeimportrandomfrom multiprocessing importProcess, Queuedefconsumer(q, name):#消費者

whileTrue:

food= q.get() #在隊列中取值

if food is None: breaktime.sleep(random.uniform(0.3, 1)) #模擬吃消耗的時間

print('%s偷吃了%s,快打死他' %(name, food))defproducter(q, name, food):#生產者

for i in range(10):

time.sleep(random.uniform(0.5, 0.9)) #模擬生產時間

print('%s生產了%s,序號:%s' %(name, food, i))

q.put(food+ str(i)) #把值存入隊列中

if __name__ == '__main__':

q= Queue() #Queue隊列對象

c1 = Process(target=consumer, args=(q, '小明'))

c2= Process(target=consumer, args=(q, '小東'))

c1.start()

c2.start()

p1= Process(target=producter, args=(q, '張三', '面包'))

p2= Process(target=producter, args=(q, '李四', '可樂'))

p1.start()

p2.start()

p1.join()

p2.join()

q.put(None)#有幾個consumer進程就需要放幾個None,表示生產完畢(這就有點low了)

q.put(None)#JoinableQueue#JoinableQueue和Queue幾乎一樣,不同的是JoinableQueue隊列允許使用者告訴隊列某個數據已經處理了。通知進程是使用共享的信號和條件變量來實現的。#task_done():使用者使用此方法發出信號,表示q.get()返回的項目已經被處理#join():當隊列中有數據的時候,使用此方法會進入阻塞,直到放入隊列中所有的數據都被處理掉(都被task_done)才轉換成不阻塞

#解決剛才生產者消費者模型low的問題:

importtimeimportrandomfrom multiprocessing importProcess, JoinableQueuedefconsumer(jq, name):#消費者

whileTrue:

food= jq.get() #在隊列中取值

#if food is None:break

time.sleep(random.uniform(0.3, 1)) #模擬吃消耗的時間

print('%s偷吃了%s,快打死他' %(name, food))

jq.task_done()#向jq.join()發送一次信號,證明這個數據已經處理了

defproducter(jq, name, food):#生產者

for i in range(10):

time.sleep(random.uniform(0.5, 0.9)) #模擬生產時間

print('%s生產了%s,序號:%s' %(name, food, i))

jq.put(food+ str(i)) #把值存入隊列中

if __name__ == '__main__':

jq=JoinableQueue()

c1= Process(target=consumer, args=(jq, '小明'))

c2= Process(target=consumer, args=(jq, '小東'))

c1.daemon= True #把消費者設置為守護進程

c2.daemon =True

c1.start()

c2.start()

p1= Process(target=producter, args=(jq, '張三', '面包'))

p2= Process(target=producter, args=(jq, '李四', '可樂'))

p1.start()

p2.start()

p1.join()

p2.join()

jq.join()#數據全部被task_done后才不阻塞

#管道#Pipe([duplex]):在進程之間創建一條管道,并返回元組(left,right),其中left,right表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道#duplex:默認管道是全雙工的,如果將duplex改成False,left只能用于接收,right只能用于發送。#主要方法:#right.recv(): 接收left.send()發送的內容。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。#letf.send(): 通過連接發送內容。#close(): 關閉連接。

#pipe的端口管理不會隨著某一個進程的關閉就關閉#操作系統來管理進程對這些端口的使用,不使用的端口應該關閉它#一條管道,兩個進程,就有4個端口 每關閉一個端口計數-1,直到只剩下一個端口的時候 recv就會報錯#如果不關閉不使用的端口,在已經把數據發送完畢的情況下,那么接收端的recv就會一直掛起,等待接收數據,這個進程就一直不能關閉#因此不使用的端口就應該關閉它,讓recv拋出異常后對這個進程進行處理

from multiprocessing importProcess, Pipedefconsumer(left, right):

left.close()#若這里不close,則不會異常EOFError,數據接收完畢后,下面的right.recv()就會一直掛起

whileTrue:try:print(right.recv())exceptEOFError:break

if __name__ == '__main__':

left, right=Pipe()

Process(target=consumer, args=(left, right)).start()

right.close()for i in range(10):

left.send('Apple%s' %i)

left.close()#三、進程池#同步、apply

importosimporttimefrom multiprocessing importPooldeftest(num):

time.sleep(1)print('%s:%s' %(num, os.getpid()))return num * 2

if __name__ == '__main__':

p=Pool()for i in range(20):

res= p.apply(test, args=(i,)) #提交任務的方法 同步提交

print('-->', res) #res就是test的return的值

#異步、apply_async

importtimefrom multiprocessing importPooldeffunc(num):

time.sleep(1)print('做了%s件衣服' %num)if __name__ == '__main__':

p= Pool(4) #進程池中創建4個進程,不寫的話,默認值為你電腦的CUP數量

for i in range(50):

p.apply_async(func, args=(i,)) #異步提交func到一個子進程中執行,沒有返回值的情況

p.close() #關閉進程池,用戶不能再向這個池中提交任務了

p.join() #阻塞,直到進程池中所有的任務都被執行完

#注意:#異步提交且沒有返回值接收的情況下必須要用close()和join()#因為如果沒有close()和join(),主進程執行完畢后會立刻把子進程回收了,相當于子進程還沒來得及開啟#所以要join,讓子進程結束后再結束父進程,但是進程池中要使用join就必須先進行close

importtimeimportosfrom multiprocessing importPooldeftest(num):

time.sleep(1)print('%s:%s' %(num, os.getpid()))return num * 2

if __name__ == '__main__':

p=Pool()

res_lst=[]for i in range(20):

res= p.apply_async(test, args=(i,)) #提交任務的方法 異步提交

res_lst.append(res)for res inres_lst:print(res.get())#注意:#異步提交有返回值的情況下,res是一個對象代表的是這個任務的編號,需要用res.get()方法讓任務執行且把返回值返回給你。#get有阻塞效果,拿到子進程的返回值后才不阻塞,所以并不需要再使用close和join。

#map#map接收一個函數和一個可迭代對象,是異步提交的簡化版本,自帶close和join方法#可迭代對象的每一個值就是函數接收的實參,可迭代對象的長度就是創建的任務數量#map拿到返回值是所有結果組成的列表

importtimefrom multiprocessing importPooldeffunc(num):print('子進程:', num)#time.sleep(1)

returnnumif __name__ == '__main__':

p=Pool()

ret= p.map(func, range(10)) #ret是列表

for i inret:print('返回值:', i)#進程池的回調函數(同步提交apply沒有回調函數)

importosfrom multiprocessing importPooldeffunc(i):print('子進程:', os.getpid())returnidefcall_back(res):print('回調函數:', os.getpid())print('res--->', res)if __name__ == '__main__':

p=Pool()print('主進程:', os.getpid())

p.apply_async(func, args=(1,), callback=call_back) #callback關鍵字傳參,參數是回調函數

p.close()

p.join()#結果:#主進程: 4732#子進程: 10552#回調函數: 4732#res---> 1#

#從結果可以看出:#子進程func執行完畢之后才去執行callback回調函數#子進程func的返回值會作為回調函數的參數#回調函數是在主進程中執行的

總結

以上是生活随笔為你收集整理的python使用协程实现udp_python-socket和进程线程协程(代码展示)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。