日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python framework threads_Python - 多线程

發(fā)布時間:2024/9/27 python 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python framework threads_Python - 多线程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

當(dāng)我們寫的程序需要并發(fā)時,我們就需要用到 Python 中的一些并發(fā)庫,例如 asyncio、thread、 multiprocessing 等,本文主要介紹 Python 標(biāo)準(zhǔn)庫中的多線程庫 thread

threading 基本使用

使用多線程的優(yōu)勢在于

程序運行更快

適用于 IO 密集的場景

Python 標(biāo)準(zhǔn)庫提供了兩個模塊,_thread 和 threading ,threading 對 _thread 進(jìn)行了封裝,雖然 Python 有 GIL ,會在線程切換時消耗很多資源,但是在 IO 密集的場景下,Python 多線程還是很管用的

先看看 threading 的基本使用

import threading

def hello(*args, **kwargs):#定義一個 hello 函數(shù)

print('hello, world', args, kwargs)

實例化線程對象,target 參數(shù)為指定函數(shù),args 是傳遞的列表參數(shù),kwargs 是傳遞的字典參數(shù),通過 start 方法啟動一個線程

>>>t = threading.Thread(target=hello, args=[1, 2, 3], kwargs={'a': 'b'})

>>>t.start()

hello, world (1, 2, 3) {'a': 'b'}

threading 和 Thread 常用參數(shù)和方法

name 參數(shù)

導(dǎo)入 logging 庫,更加直觀的顯示線程的信息

import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s') #配置 logging

def hello():

logging.info('test')

>>>t = threading.Thread(target=hello, name='hello_thread')#指定線程名稱

2017-03-12 22:30:58,556 INFO [hello_thread] test

daemon

線程退出的時候,該線程的所有daemon 的子線程都會退出,而no-daemon 子線程不會退出

而線程退出會等待所有的 no-daemon 子線程退出

join 方法

子線程的 join 方法會阻塞主線程,直到子線程退出或者超時,如果不設(shè)置超時時間,就會一直等待子線程退出

import threading

from time import sleep

def worker():

sleep(3)

threads = [threading.Thread(target=worker) for _ in range(3)]

def main():

for t in threads:

t.start()

for t in threads:

t.join()

執(zhí)行 main 函數(shù)能夠感覺到是一直阻塞著的,直到子線程退出

>>>main()

enumerate 方法

列出當(dāng)前所有的線程

>>>threading.enumerate()

[<_mainthread started>,

local

線程共享內(nèi)存、狀態(tài)和資源,但是threading local 對象的屬性,只對當(dāng)前線程可見

>>>ctx = threading.local()

>>>ctx

>>>ctx.data = 'aaa'

def worker():

print(ctx.data)

>>>worker()

'aaa'

>>>threading.Thread(target=worker).start()

In [101]: Exception in thread Thread-2477:

Traceback (most recent call last):

File "/usr/local/opt/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 914, in _bootstrap_inner

self.run()

File "/usr/local/opt/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 862, in run

self._target(*self._args, **self._kwargs)

File "", line 2, in worker

print(ctx.data)

AttributeError: '_thread._local' object has no attribute 'data'

實例化 Thread 類

之前通過 target 參數(shù)的方式不是非常的優(yōu)雅,其實可以通過繼承 Thread 類并重寫 run 方法來編寫更加優(yōu)雅的代碼

class MyThread(threading.Thread):

def run(self):

print('hello, world')

>>>MyThread()

>>>MyThread().start()

hello, world

傳遞參數(shù)

通過重寫 __init__() 方法傳遞參數(shù)

class MyThread(threading.Thread):

def __init__(self, *args, **kwargs):

super().__init__()

self.args = args

self.kwargs = kwargs

def run(self):

print('hello, world', self.args, self.kwargs)

>>>t = MyThread(1, 2, 3, state='ok')

>>>t.start()

hello, world (1, 2, 3) {'state': 'ok'}

線程同步

在使用多個線程同時操作一個資源的情況下( 例如讀文件) ,我們需要控制同一時刻只有一個線程對資源進(jìn)行操作,這時候就需要一些同步機(jī)制,如 鎖、隊列、條件、事件等

Lock

我們可以通過 threading.Lock 來解決這個問題

Lock 對象一般有兩個操作,獲取 acquire 和 釋放 release

通過 acquire 方法 將 Lock 對象狀態(tài)設(shè)置為鎖定,如果是鎖定狀態(tài)則會阻塞,release 方法則將 Lock 對象解鎖

import threading

lock = threading.Lock()

>>>lock.acquire()

True

>>>lock.acquire(timeout=3)

False

>>>lock.release()

>>>lock.acquire(timeout=3)

True

一個抓取頁面的例子,通過使用鎖,實現(xiàn)了線程之間的同步

import requests

import threading

import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

lock = threading.Lock()

file = open('data', 'a')

urls = ['http://baidu.com', 'http://cn.bing.com/']

class FetchUrls(threading.Thread):

def __init__(self, url: str, file, lock: threading.Lock, name=None):

super().__init__()

self.url = url

self.file = file

self.lock = lock

if name is not None:

self.name = name

def run(self):

res = requests.get(self.url)

self.lock.acquire()

logging.info('Lock Acquire')

self.file.write(res.text)

logging.info('File Writed')

self.lock.release()

測試

>>>ts = [FetchUrls(url, file, lock, name=url) for url in urls]

>>>[t.start() for t in ts]

2017-03-13 14:00:05,142 INFO [http://baidu.com] Lock Acquire

2017-03-13 14:00:05,142 INFO [http://baidu.com] File Writed

2017-03-13 14:00:05,271 INFO [http://cn.bing.com/] Lock Acquire

2017-03-13 14:00:05,272 INFO [http://cn.bing.com/] File Writed

RLock

RLock 是一個可重用鎖,可以多次調(diào)用 acquire 而不阻塞,但是 release 時也要執(zhí)行和 acquire 一樣的次數(shù)

import threading

>>>rlock = threading.RLock()

>>>rlock.acquire()

True

>>>rlock.acquire()

True

>>>rlock.acquire()

True

>>>rlock.release()

>>>rlock.release()

>>>rlock.release()

>>>rlock.release()

Traceback (most recent call last):

File "", line 1, in

RuntimeError: cannot release un-acquired lock

Condition

如果多個線程使用 生產(chǎn)者 —> 消費者的模式,可以使用 Condition,生產(chǎn)者生產(chǎn)數(shù)據(jù)后,通過 notify/notify_all 通知給消費者消費數(shù)據(jù)

import threading

import random

import logging

import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Producer(threading.Thread):

def __init__(self, datas: list, cond: threading.Condition, name=None):

super().__init__()

self.cond = cond

self.datas = datas

if name is not None:

self.name = name

def run(self):

while True:

data = random.randint(1, 100)

logging.info(data)

self.datas.append(data)

self.cond.acquire()

self.cond.notify()

self.cond.release()

time.sleep(1)

"""

self.cond.acquire()

self.cond.notify()

self.cond.release()

等價于

with self.cond:

self.notify()

無論 notify 還是 wait 都需要先 acquire,然后再 release

一般使用 with 語句

"""

class Consumer(threading.Thread):

def __init__(self, datas: list, cond: threading.Condition, name=None):

super().__init__()

self.cond = cond

self.datas = datas

if name is not None:

self.name = name

def run(self):

while True:

self.cond.acquire()

while True:

data = self.datas.pop()

logging.info(data)

break

self.cond.wait() #消費者通過 wait 方法等待 生產(chǎn)者 notify

self.cond.release()

def runner():

datas = []

cond = threading.Condition()

t1 = Producer(datas, cond, name='producer')

t2 = Consumer(datas, cond, name='consumer')

t1.start()

t2.start()

t1.join()

t2.join()

測試

>>>runner()

2017-03-13 14:56:12,442 INFO [producer] 89

2017-03-13 14:56:12,442 INFO [consumer] 89

2017-03-13 14:56:13,445 INFO [producer] 85

2017-03-13 14:56:13,445 INFO [consumer] 85

2017-03-13 14:56:14,450 INFO [producer] 57

2017-03-13 14:56:14,450 INFO [consumer] 57

2017-03-13 14:56:15,454 INFO [producer] 65

2017-03-13 14:56:15,454 INFO [consumer] 65

2017-03-13 14:56:16,458 INFO [producer] 15

2017-03-13 14:56:16,459 INFO [consumer] 15

Event

Event 是一個簡單的機(jī)制,線程發(fā)出一個信號,其他線程等待

import threading

import logging

import time

import random

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class EventProducer(threading.Thread):

def __init__(self, datas: list, event: threading.Event, name=None):

super().__init__()

self.datas = datas

self.event = event

if name is not None:

self.name = name

def run(self):

while True:

data = random.randint(1, 100)

logging.info(data)

self.datas.append(data)

self.event.set()

time.sleep(1)

class EventConsumer(threading.Thread):

def __init__(self, datas: list, event: threading.Event, name=None):

super().__init__()

self.datas = datas

self.event = event

if name is not None:

self.name = name

def run(self):

while True:

self.event.wait() # wait 方法阻塞 消費者線程

try:

data = self.datas.pop()

logging.info(data)

except IndexError:

continue

def runner():

event = threading.Event()

datas = []

t1 = EventProducer(datas, event, name='EventProducer')

t2 = EventConsumer(datas, event, name='EventConsumer')

t1.start()

t2.start()

t1.join()

t2.join()

測試

>>>runner()

2017-03-13 16:18:54,251 INFO [EventProducer] 82

2017-03-13 16:18:54,251 INFO [EventConsumer] 82

2017-03-13 16:18:55,261 INFO [EventProducer] 37

2017-03-13 16:18:55,261 INFO [EventConsumer] 37

2017-03-13 16:18:56,270 INFO [EventProducer] 92

2017-03-13 16:18:56,271 INFO [EventConsumer] 92

Queue

之前的幾個 提供者 —> 消費者 的例子 一直用一個全局的列表來傳遞數(shù)據(jù),其實不是很科學(xué),不同線程傳遞數(shù)據(jù)應(yīng)該使用 Queue ,因為 Queue 本身也可以阻塞線程,使用 Queue 還可以省去同步

import queue

import threading

import logging

import random

from time import sleep

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class QueueProducer(threading.Thread):

def __init__(self, queue: queue.Queue(), name=None):

super().__init__()

self.queue = queue

if name is not None:

self.name = name

def run(self):

while True:

data = random.randint(1, 100)

logging.info(data)

self.queue.put(data)

sleep(1)

class QueueConsumer(threading.Thread):

def __init__(self, queue: queue.Queue, name=None):

super().__init__()

self.queue = queue

if name is not None:

self.name = name

def run(self):

while True:

data = self.queue.get()

logging.info(data)

def runner():

q = queue.Queue()

t1 = QueueProducer(q, name='QueueProducer')

t2 = QueueConsumer(q, name='QueueConsumer')

t1.start()

t2.start()

t1.join()

t2.join()

測試

>>>runner()

2017-03-13 16:34:49,401 INFO [QueueProducer] 82

2017-03-13 16:34:49,401 INFO [QueueConsumer] 82

2017-03-13 16:34:50,405 INFO [QueueProducer] 2

2017-03-13 16:34:50,405 INFO [QueueConsumer] 2

2017-03-13 16:34:51,406 INFO [QueueProducer] 74

GIL

提到 Python 多線程就一定要說說 GIL Global Interpreter Lock 全局解釋器鎖,由于 GIL 的存在,Python 的線程不能達(dá)到真正的并行,在 CPython (C語言實現(xiàn)的 Python) 中 線程使用的是操作系統(tǒng)原生的線程

CPython 中,一個解釋器有一條主線程,和若干條用戶程序的線程,由于 GIL 的存在,每一個進(jìn)程要執(zhí)行時,都要去獲取 GIL ,所以并不能有效的利用多核 CPU 實現(xiàn)多線程并行,也就是說,多個線程不能夠同時執(zhí)行

如果要實現(xiàn)真正的并行,就需要使用 multiprocessing 這個多進(jìn)程模塊了

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結(jié)

以上是生活随笔為你收集整理的python framework threads_Python - 多线程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。