python使用协程实现udp_python-socket和进程线程协程(代码展示)
#一、進程Process#在windows中使用需要注意#在Windows操作系統中由于沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。#所以必須把創建子進程的部分使用if __name__ =='__main__' 判斷保護起來,import 的時候 ,就不會遞歸運行了。#join:父進程等待子進程結束后才繼續執行自己后續的代碼
importtimeimportrandomfrom multiprocessing importProcessdeffunc(index):
time.sleep(random.randint(1, 3))print('第%s封郵件發送完畢' %index)if __name__ == '__main__':
p_lst=[]for i in range(10):
p= Process(target=func, args=(i,))
p.start()#先讓所有子進程都啟動
p_lst.append(p)for p in p_lst: #再進行join阻塞
p.join()print('10封郵件全部發送完畢')#守護進程#主進程創建守護進程#1:守護進程會在主進程代碼執行結束后就終止#2:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children#注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
#1,守護進程會在主進程代碼執行結束后就終止
importtimefrom multiprocessing importProcessdeffunc():print('子進程 start')
time.sleep(3) #睡3秒的時候主進程的代碼已經執行完畢了,所以子進程也會跟著結束
print('子進程end')if __name__ == '__main__':
p= Process(target=func)
p.daemon= True #daemon是Process的屬性
p.start()
time.sleep(2) #睡2秒的時候,執行了子進程
print('主進程')#結果:#子進程 start#主進程
#鎖#加鎖降低了程序的效率,讓原來能夠同時執行的代碼變成順序執行了,異步變同步的過程#保證了數據的安全
importtimeimportjsonfrom multiprocessing importProcessfrom multiprocessing import Lock #導入Lock類
defsearch(person):
with open('ticket') as f:
ticketinfo=json.load(f)print('%s查詢余票:' % person, ticketinfo['count'])defget_ticket(person):
with open('ticket') as f:
ticketinfo=json.load(f)
time.sleep(0.2) #模擬讀數據的網絡延遲
if ticketinfo['count'] >0:print('%s買到票了' %person)
ticketinfo['count'] -= 1time.sleep(0.2)
with open('ticket', 'w') as f:
json.dump(ticketinfo, f)else:print('%s沒買到票' %person)defticket(person, lock):
search(person)
lock.acquire()#誰獲得鑰匙 誰才能進入
get_ticket(person)
lock.release()#用完了,把鑰匙給下一個人
if __name__ == '__main__':
lock= Lock() #創建一個鎖對象
for i in range(5):
p= Process(target=ticket, args=('person%s' %i, lock))
p.start()#結果:#person1查詢余票: 3#person3查詢余票: 3#person0查詢余票: 3#person2查詢余票: 3#person4查詢余票: 3#person1買到票了#person3買到票了#person0買到票了#person2沒買到票#person4沒買到票
#1、信號量的實現機制:計數器 + 鎖實現的#信號量同步基于內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()#調用被阻塞。#互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據(Samphore相當于有幾把鑰匙,lock只能有一把鑰匙)
importtimeimportrandomfrom multiprocessing importProcessfrom multiprocessing importSemaphoredef changba(person, sem): #在唱吧 唱歌
sem.acquire() #第一次可以同時進來兩個人
print('%s走進唱吧' %person)
time.sleep(random.randint(3, 6)) #每個人唱歌的時間
print('%s走出唱吧' % person) #唱完走人
sem.release() #把鑰匙給下一個人
if __name__ == '__main__':
sem= Semaphore(2) #2把鑰匙
for i in range(5):
p= Process(target=changba, args=('person%s' %i, sem))
p.start()#事件 Event#事件主要提供了三個方法#set、wait、clear。#事件處理的機制:全局定義了一個“Flag”,#如果“Flag”值為False,那么當程序執行event.wait方法時就會阻塞,#如果“Flag”值為True,那么event.wait方法時便不再阻塞。
#阻塞事件 :wait()方法#wait是否阻塞是看event對象內部的Flag
#控制Flag的值:#set() 將Flag的值改成True#clear() 將Flag的值改成False#is_set() 判斷當前的Flag的值
#紅綠燈:
importtimeimportrandomfrom multiprocessing importProcessfrom multiprocessing importEventdef traffic_ligth(e): #紅綠燈
print('\033[31m紅燈亮\033[0m') #Flag 默認是False
whileTrue:if e.is_set(): #如果是綠燈
time.sleep(2) #2秒后
print('\033[31m紅燈亮\033[0m') #轉為紅燈
e.clear() #設置為False
else: #如果是紅燈
time.sleep(2) #2秒后
print('\033[32m綠燈亮\033[0m') #轉為綠燈
e.set() #設置為True
def car(e, i): #車
if note.is_set():print('car %s在等待' %i)
e.wait()print('car %s 通過了' %i)if __name__ == '__main__':
e=Event()
p= Process(target=traffic_ligth, args=(e,)) #紅綠燈進程
p.daemon =True
p.start()
p_lst=[]for i in range(10): #10輛車的進程
time.sleep(random.randrange(0, 3, 2))
p= Process(target=car, args=(e, i))
p.start()
p_lst.append(p)for p inp_lst: p.join()#二、進程通信#隊列(先進先出)#生產者消費者模型
importtimeimportrandomfrom multiprocessing importProcess, Queuedefconsumer(q, name):#消費者
whileTrue:
food= q.get() #在隊列中取值
if food is None: breaktime.sleep(random.uniform(0.3, 1)) #模擬吃消耗的時間
print('%s偷吃了%s,快打死他' %(name, food))defproducter(q, name, food):#生產者
for i in range(10):
time.sleep(random.uniform(0.5, 0.9)) #模擬生產時間
print('%s生產了%s,序號:%s' %(name, food, i))
q.put(food+ str(i)) #把值存入隊列中
if __name__ == '__main__':
q= Queue() #Queue隊列對象
c1 = Process(target=consumer, args=(q, '小明'))
c2= Process(target=consumer, args=(q, '小東'))
c1.start()
c2.start()
p1= Process(target=producter, args=(q, '張三', '面包'))
p2= Process(target=producter, args=(q, '李四', '可樂'))
p1.start()
p2.start()
p1.join()
p2.join()
q.put(None)#有幾個consumer進程就需要放幾個None,表示生產完畢(這就有點low了)
q.put(None)#JoinableQueue#JoinableQueue和Queue幾乎一樣,不同的是JoinableQueue隊列允許使用者告訴隊列某個數據已經處理了。通知進程是使用共享的信號和條件變量來實現的。#task_done():使用者使用此方法發出信號,表示q.get()返回的項目已經被處理#join():當隊列中有數據的時候,使用此方法會進入阻塞,直到放入隊列中所有的數據都被處理掉(都被task_done)才轉換成不阻塞
#解決剛才生產者消費者模型low的問題:
importtimeimportrandomfrom multiprocessing importProcess, JoinableQueuedefconsumer(jq, name):#消費者
whileTrue:
food= jq.get() #在隊列中取值
#if food is None:break
time.sleep(random.uniform(0.3, 1)) #模擬吃消耗的時間
print('%s偷吃了%s,快打死他' %(name, food))
jq.task_done()#向jq.join()發送一次信號,證明這個數據已經處理了
defproducter(jq, name, food):#生產者
for i in range(10):
time.sleep(random.uniform(0.5, 0.9)) #模擬生產時間
print('%s生產了%s,序號:%s' %(name, food, i))
jq.put(food+ str(i)) #把值存入隊列中
if __name__ == '__main__':
jq=JoinableQueue()
c1= Process(target=consumer, args=(jq, '小明'))
c2= Process(target=consumer, args=(jq, '小東'))
c1.daemon= True #把消費者設置為守護進程
c2.daemon =True
c1.start()
c2.start()
p1= Process(target=producter, args=(jq, '張三', '面包'))
p2= Process(target=producter, args=(jq, '李四', '可樂'))
p1.start()
p2.start()
p1.join()
p2.join()
jq.join()#數據全部被task_done后才不阻塞
#管道#Pipe([duplex]):在進程之間創建一條管道,并返回元組(left,right),其中left,right表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道#duplex:默認管道是全雙工的,如果將duplex改成False,left只能用于接收,right只能用于發送。#主要方法:#right.recv(): 接收left.send()發送的內容。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。#letf.send(): 通過連接發送內容。#close(): 關閉連接。
#pipe的端口管理不會隨著某一個進程的關閉就關閉#操作系統來管理進程對這些端口的使用,不使用的端口應該關閉它#一條管道,兩個進程,就有4個端口 每關閉一個端口計數-1,直到只剩下一個端口的時候 recv就會報錯#如果不關閉不使用的端口,在已經把數據發送完畢的情況下,那么接收端的recv就會一直掛起,等待接收數據,這個進程就一直不能關閉#因此不使用的端口就應該關閉它,讓recv拋出異常后對這個進程進行處理
from multiprocessing importProcess, Pipedefconsumer(left, right):
left.close()#若這里不close,則不會異常EOFError,數據接收完畢后,下面的right.recv()就會一直掛起
whileTrue:try:print(right.recv())exceptEOFError:break
if __name__ == '__main__':
left, right=Pipe()
Process(target=consumer, args=(left, right)).start()
right.close()for i in range(10):
left.send('Apple%s' %i)
left.close()#三、進程池#同步、apply
importosimporttimefrom multiprocessing importPooldeftest(num):
time.sleep(1)print('%s:%s' %(num, os.getpid()))return num * 2
if __name__ == '__main__':
p=Pool()for i in range(20):
res= p.apply(test, args=(i,)) #提交任務的方法 同步提交
print('-->', res) #res就是test的return的值
#異步、apply_async
importtimefrom multiprocessing importPooldeffunc(num):
time.sleep(1)print('做了%s件衣服' %num)if __name__ == '__main__':
p= Pool(4) #進程池中創建4個進程,不寫的話,默認值為你電腦的CUP數量
for i in range(50):
p.apply_async(func, args=(i,)) #異步提交func到一個子進程中執行,沒有返回值的情況
p.close() #關閉進程池,用戶不能再向這個池中提交任務了
p.join() #阻塞,直到進程池中所有的任務都被執行完
#注意:#異步提交且沒有返回值接收的情況下必須要用close()和join()#因為如果沒有close()和join(),主進程執行完畢后會立刻把子進程回收了,相當于子進程還沒來得及開啟#所以要join,讓子進程結束后再結束父進程,但是進程池中要使用join就必須先進行close
importtimeimportosfrom multiprocessing importPooldeftest(num):
time.sleep(1)print('%s:%s' %(num, os.getpid()))return num * 2
if __name__ == '__main__':
p=Pool()
res_lst=[]for i in range(20):
res= p.apply_async(test, args=(i,)) #提交任務的方法 異步提交
res_lst.append(res)for res inres_lst:print(res.get())#注意:#異步提交有返回值的情況下,res是一個對象代表的是這個任務的編號,需要用res.get()方法讓任務執行且把返回值返回給你。#get有阻塞效果,拿到子進程的返回值后才不阻塞,所以并不需要再使用close和join。
#map#map接收一個函數和一個可迭代對象,是異步提交的簡化版本,自帶close和join方法#可迭代對象的每一個值就是函數接收的實參,可迭代對象的長度就是創建的任務數量#map拿到返回值是所有結果組成的列表
importtimefrom multiprocessing importPooldeffunc(num):print('子進程:', num)#time.sleep(1)
returnnumif __name__ == '__main__':
p=Pool()
ret= p.map(func, range(10)) #ret是列表
for i inret:print('返回值:', i)#進程池的回調函數(同步提交apply沒有回調函數)
importosfrom multiprocessing importPooldeffunc(i):print('子進程:', os.getpid())returnidefcall_back(res):print('回調函數:', os.getpid())print('res--->', res)if __name__ == '__main__':
p=Pool()print('主進程:', os.getpid())
p.apply_async(func, args=(1,), callback=call_back) #callback關鍵字傳參,參數是回調函數
p.close()
p.join()#結果:#主進程: 4732#子進程: 10552#回調函數: 4732#res---> 1#
#從結果可以看出:#子進程func執行完畢之后才去執行callback回調函數#子進程func的返回值會作為回調函數的參數#回調函數是在主進程中執行的
總結
以上是生活随笔為你收集整理的python使用协程实现udp_python-socket和进程线程协程(代码展示)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RTX5 | 线程管理02 - 创建线程
- 下一篇: websocket python爬虫_p