日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

递归锁、信号量、GIL锁、基于多线程的socket通信和进程池线程池

發(fā)布時(shí)間:2023/12/13 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 递归锁、信号量、GIL锁、基于多线程的socket通信和进程池线程池 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

遞歸鎖、信號(hào)量、GIL鎖、基于多線程的socket通信和進(jìn)程池線程池

遞歸鎖

死鎖現(xiàn)象:是指兩個(gè)或兩個(gè)以上的進(jìn)程和線程因搶奪計(jì)算機(jī)資源而產(chǎn)生的一種互相等待的現(xiàn)象

from threading import Thread from threading import Lock import time lock_A = Lock() lock_B = Lock() class MyThread(Thread):def run(self):self.f1()self.f2()def f1(self):lock_A.acquire()print(f"{self.name}拿到了A鎖")lock_B.acquire()print(f"{self.name}拿到了B鎖")lock_B.release()lock_A.release()def f2(self):lock_B.acquire()print(f"{self.name}拿到了B鎖")time.sleep(0.1)lock_A.acquire()print(f"{self.name}拿到了A鎖")lock_A.release()lock_B.release() if __name__ == '__main__':for i in range(3):t = MyThread()t.start() # 結(jié)果: Thread-1拿到了A鎖 Thread-1拿到了B鎖 Thread-1拿到了B鎖 Thread-2拿到了A鎖

遞歸鎖:

遞歸鎖有一個(gè)計(jì)數(shù)的功能, 原數(shù)字為0,上一次鎖,計(jì)數(shù)+1,釋放一次鎖,計(jì)數(shù)-1,

只要遞歸鎖上面的數(shù)字不為零,其他線程就不能搶鎖.

from threading import Thread from threading import RLock import time lock_A = lock_B = RLock() class MyThread(Thread):def run(self):self.f1()self.f2()def f1(self):lock_A.acquire()print(f"{self.name}拿到了A鎖")lock_B.acquire()print(f"{self.name}拿到了B鎖")lock_B.release()lock_A.release()def f2(self):lock_B.acquire()print(f"{self.name}拿到了B鎖")time.sleep(0.1)lock_A.acquire()print(f"{self.name}拿到了A鎖")lock_A.release()lock_B.release() if __name__ == '__main__':for i in range(3):t = MyThread()t.start()

信號(hào)量

也是一種鎖, 控制并發(fā)數(shù)量

from threading import Thread,Semaphore,current_thread import time import random sem = Semaphore(5) def task():sem.acquire()print(f"{current_thread().name}廁所ing")time.sleep(random.randint(1,3))print(f"{current_thread().name}廁所ed")sem.release() if __name__ == '__main__':for i in range(20):t = Thread(target=task,)t.start()

GIL鎖

GIL鎖的定義:

全局解釋鎖,就是一把互斥鎖,將并發(fā)變成串行,同一時(shí)刻只能有一個(gè)線程使用解釋器資源,犧牲效率,保證解釋器的數(shù)據(jù)安全。

py文件在內(nèi)存中的執(zhí)行過程:

  • 當(dāng)執(zhí)行py文件時(shí),會(huì)在內(nèi)存中開啟一個(gè)進(jìn)程
  • 進(jìn)程中不光包括py文件還有python解釋器,py文件中的線程會(huì)將代碼交給解釋器,
  • 解釋器將python代碼轉(zhuǎn)化為C語(yǔ)言能識(shí)別的字節(jié)碼,然后再交給解釋器中的虛擬機(jī)將字節(jié)碼轉(zhuǎn)化為二進(jìn)制碼最后交給CPU執(zhí)行

當(dāng)線程1先拿到GIL鎖時(shí)線程2、線程3就只能等待,當(dāng)線程1在CPU執(zhí)行遇到阻塞或執(zhí)行一段時(shí)間后,線程1會(huì)被掛起,同時(shí)GIL鎖會(huì)被釋放,此時(shí)線程2或線程3就會(huì)拿到鎖進(jìn)入解釋器,同樣,當(dāng)在CPU執(zhí)行遇到阻塞或執(zhí)行一段時(shí)間后被掛起,同時(shí)GIL鎖會(huì)被釋放,此時(shí)最后一個(gè)線程就會(huì)進(jìn)入解釋器。

從上面可以看出,當(dāng)遇到單個(gè)進(jìn)程中含有多個(gè)線程時(shí),由于GIL鎖的存在,Cpython并不能利用多核進(jìn)行并行處理,但可以在單核實(shí)現(xiàn)并發(fā)。

但不同進(jìn)程之間的多線程是可以利用多核的。

GIL鎖的兩個(gè)作用:

1、保證解釋器里面的數(shù)據(jù)的安全;

2、強(qiáng)行加鎖,減輕開發(fā)負(fù)擔(dān)

問題:單進(jìn)程的多線程不能利用多核

如何判斷什么情況使用多線程并發(fā)與多進(jìn)程并發(fā)

對(duì)計(jì)算來說,cpu越多越好,但是對(duì)于I/O來說,再多的cpu也沒用

  當(dāng)然對(duì)運(yùn)行一個(gè)程序來說,隨著cpu的增多執(zhí)行效率肯定會(huì)有所提高(不管提高幅度多大,總會(huì)有所提高),這是因?yàn)橐粋€(gè)程序基本上不會(huì)是純計(jì)算或者純I/O,所以應(yīng)該相對(duì)的去看一個(gè)程序到底是計(jì)算密集型還是I/O密集型,如下:

#分析: 我們有四個(gè)任務(wù)需要處理,處理方式肯定是要達(dá)到并發(fā)的效果,解決方案可以是: 方案一:開啟四個(gè)進(jìn)程 方案二:一個(gè)進(jìn)程下,開啟四個(gè)線程#單核情況下,分析結(jié)果: 如果四個(gè)任務(wù)是計(jì)算密集型,沒有多核來并行計(jì)算,方案一徒增了創(chuàng)建進(jìn)程的開銷,方案二勝如果四個(gè)任務(wù)是I/O密集型,方案一創(chuàng)建進(jìn)程的開銷大,且進(jìn)程的切換速度遠(yuǎn)不如線程,方案二勝#多核情況下,分析結(jié)果:如果四個(gè)任務(wù)是計(jì)算密集型,多核意味著并行計(jì)算,在python中一個(gè)進(jìn)程中同一時(shí)刻只有一個(gè)線程執(zhí)行,可以利用多核,方案一勝如果四個(gè)任務(wù)是I/O密集型,再多的核也解決不了I/O問題,方案二勝#結(jié)論:現(xiàn)在的計(jì)算機(jī)基本上都是多核,python對(duì)于計(jì)算密集型的任務(wù)開多線程的效率并不能帶來多大性能上的提升,甚至不如串行(沒有大量切換),但是,對(duì)于IO密集型的任務(wù)效率還是有顯著提升的。

總結(jié):多核前提下,如果任務(wù)IO密集型,使用多線程并發(fā);如果任務(wù)計(jì)算密集型,使用多進(jìn)程并發(fā)。

驗(yàn)證Cpython的并發(fā)效率

  • 計(jì)算密集型: 單個(gè)進(jìn)程的多線程并發(fā) vs 多個(gè)進(jìn)程的并發(fā)并行
from threading import Thread from multiprocessing import Process import time import randomif __name__ == '__main__':# 多進(jìn)程的并發(fā),并行start_time = time.time()l1 = []for i in range(4):p = Process(target=task,)l1.append(p)p.start()for p in l1:p.join()print(f'執(zhí)行效率:{time.time()- start_time}') # 2.5342209339141846 from threading import Thread from multiprocessing import Process import time import randomif __name__ == '__main__': # 多線程的并發(fā)start_time = time.time()l1 = []for i in range(4):p = Thread(target=task,)l1.append(p)p.start()for p in l1:p.join()print(f'執(zhí)行效率:{time.time()- start_time}') # 5.262923240661621

? 總結(jié): 計(jì)算密集型: 多進(jìn)程的并發(fā)并行效率高.

  • IO密集型: 單個(gè)進(jìn)程的多線程并發(fā) vs 多個(gè)進(jìn)程的并發(fā)并行
from threading import Thread from multiprocessing import Process import time import randomdef task():count = 0time.sleep(random.randint(1,3))count += 1if __name__ == '__main__':# 多進(jìn)程的并發(fā),并行start_time = time.time()l1 = []for i in range(50):p = Process(target=task,)l1.append(p)p.start()for p in l1:p.join()print(f'執(zhí)行效率:{time.time()- start_time}') # 7.145753383636475 from threading import Thread from multiprocessing import Process import time import randomdef task():count = 0time.sleep(random.randint(1,3))count += 1if __name__ == '__main__':start_time = time.time()l1 = []for i in range(50):p = Thread(target=task,)l1.append(p)p.start()for p in l1:p.join()print(f'執(zhí)行效率:{time.time()- start_time}') # 3.0278055667877197

? 總結(jié):對(duì)于IO密集型: 單個(gè)進(jìn)程的多線程的并發(fā)效率高.

基于多線程的socket通信

客戶端:

import socketclient = socket.socket()client.connect(('127.0.0.1',8848))while 1:try:to_server_data = input('>>>').strip()client.send(to_server_data.encode('utf-8'))from_server_data = client.recv(1024)print(f'來自服務(wù)端的消息: {from_server_data.decode("utf-8")}')except Exception:break client.close()

服務(wù)端:

import socket from threading import Threaddef communicate(conn,addr):while 1:try:from_client_data = conn.recv(1024)print(f'來自客戶端{(lán)addr[1]}的消息: {from_client_data.decode("utf-8")}')to_client_data = input('>>>').strip()conn.send(to_client_data.encode('utf-8'))except Exception:breakconn.close()def _accept():server = socket.socket()server.bind(('127.0.0.1', 8848))server.listen(5)while 1:conn, addr = server.accept()t = Thread(target=communicate,args=(conn,addr))t.start()if __name__ == '__main__':_accept()

進(jìn)程池線程池

線程池: 一個(gè)容器,這個(gè)容器限制住你開啟線程的數(shù)量,比如4個(gè),第一次肯定只能并發(fā)的處理4個(gè)任務(wù),只要有任務(wù)完成,線程馬上就會(huì)接下一個(gè)任務(wù).

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor import os import time import random# print(os.cpu_count()) 獲取cpu數(shù)量 def task(n):print(f'{os.getpid()} 接客')time.sleep(random.randint(1,3))# 開啟進(jìn)程池 (并行(并行+并發(fā))) if __name__ == '__main__':p = ProcessPoolExecutor() # 進(jìn)程池,默認(rèn)不寫,開啟數(shù)量為cpu數(shù)量for i in range(20):p.submit(task,i)# 開啟線程池 (并發(fā))t = ThreadPoolExecutor() # 默認(rèn)不寫, cpu個(gè)數(shù)*5 線程數(shù)# t = ThreadPoolExecutor(100) # 100個(gè)線程for i in range(20):t.submit(task,i)

轉(zhuǎn)載于:https://www.cnblogs.com/lifangzheng/p/11415009.html

總結(jié)

以上是生活随笔為你收集整理的递归锁、信号量、GIL锁、基于多线程的socket通信和进程池线程池的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。