python framework threads_Python - 多线程
當(dāng)我們寫的程序需要并發(fā)時,我們就需要用到 Python 中的一些并發(fā)庫,例如 asyncio、thread、 multiprocessing 等,本文主要介紹 Python 標(biāo)準(zhǔn)庫中的多線程庫 thread
threading 基本使用
使用多線程的優(yōu)勢在于
程序運行更快
適用于 IO 密集的場景
Python 標(biāo)準(zhǔn)庫提供了兩個模塊,_thread 和 threading ,threading 對 _thread 進(jìn)行了封裝,雖然 Python 有 GIL ,會在線程切換時消耗很多資源,但是在 IO 密集的場景下,Python 多線程還是很管用的
先看看 threading 的基本使用
import threading
def hello(*args, **kwargs):#定義一個 hello 函數(shù)
print('hello, world', args, kwargs)
實例化線程對象,target 參數(shù)為指定函數(shù),args 是傳遞的列表參數(shù),kwargs 是傳遞的字典參數(shù),通過 start 方法啟動一個線程
>>>t = threading.Thread(target=hello, args=[1, 2, 3], kwargs={'a': 'b'})
>>>t.start()
hello, world (1, 2, 3) {'a': 'b'}
threading 和 Thread 常用參數(shù)和方法
name 參數(shù)
導(dǎo)入 logging 庫,更加直觀的顯示線程的信息
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s') #配置 logging
def hello():
logging.info('test')
>>>t = threading.Thread(target=hello, name='hello_thread')#指定線程名稱
2017-03-12 22:30:58,556 INFO [hello_thread] test
daemon
線程退出的時候,該線程的所有daemon 的子線程都會退出,而no-daemon 子線程不會退出
而線程退出會等待所有的 no-daemon 子線程退出
join 方法
子線程的 join 方法會阻塞主線程,直到子線程退出或者超時,如果不設(shè)置超時時間,就會一直等待子線程退出
import threading
from time import sleep
def worker():
sleep(3)
threads = [threading.Thread(target=worker) for _ in range(3)]
def main():
for t in threads:
t.start()
for t in threads:
t.join()
執(zhí)行 main 函數(shù)能夠感覺到是一直阻塞著的,直到子線程退出
>>>main()
enumerate 方法
列出當(dāng)前所有的線程
>>>threading.enumerate()
[<_mainthread started>,
local
線程共享內(nèi)存、狀態(tài)和資源,但是threading local 對象的屬性,只對當(dāng)前線程可見
>>>ctx = threading.local()
>>>ctx
>>>ctx.data = 'aaa'
def worker():
print(ctx.data)
>>>worker()
'aaa'
>>>threading.Thread(target=worker).start()
In [101]: Exception in thread Thread-2477:
Traceback (most recent call last):
File "/usr/local/opt/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/local/opt/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "", line 2, in worker
print(ctx.data)
AttributeError: '_thread._local' object has no attribute 'data'
實例化 Thread 類
之前通過 target 參數(shù)的方式不是非常的優(yōu)雅,其實可以通過繼承 Thread 類并重寫 run 方法來編寫更加優(yōu)雅的代碼
class MyThread(threading.Thread):
def run(self):
print('hello, world')
>>>MyThread()
>>>MyThread().start()
hello, world
傳遞參數(shù)
通過重寫 __init__() 方法傳遞參數(shù)
class MyThread(threading.Thread):
def __init__(self, *args, **kwargs):
super().__init__()
self.args = args
self.kwargs = kwargs
def run(self):
print('hello, world', self.args, self.kwargs)
>>>t = MyThread(1, 2, 3, state='ok')
>>>t.start()
hello, world (1, 2, 3) {'state': 'ok'}
線程同步
在使用多個線程同時操作一個資源的情況下( 例如讀文件) ,我們需要控制同一時刻只有一個線程對資源進(jìn)行操作,這時候就需要一些同步機(jī)制,如 鎖、隊列、條件、事件等
Lock
我們可以通過 threading.Lock 來解決這個問題
Lock 對象一般有兩個操作,獲取 acquire 和 釋放 release
通過 acquire 方法 將 Lock 對象狀態(tài)設(shè)置為鎖定,如果是鎖定狀態(tài)則會阻塞,release 方法則將 Lock 對象解鎖
import threading
lock = threading.Lock()
>>>lock.acquire()
True
>>>lock.acquire(timeout=3)
False
>>>lock.release()
>>>lock.acquire(timeout=3)
True
一個抓取頁面的例子,通過使用鎖,實現(xiàn)了線程之間的同步
import requests
import threading
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
lock = threading.Lock()
file = open('data', 'a')
urls = ['http://baidu.com', 'http://cn.bing.com/']
class FetchUrls(threading.Thread):
def __init__(self, url: str, file, lock: threading.Lock, name=None):
super().__init__()
self.url = url
self.file = file
self.lock = lock
if name is not None:
self.name = name
def run(self):
res = requests.get(self.url)
self.lock.acquire()
logging.info('Lock Acquire')
self.file.write(res.text)
logging.info('File Writed')
self.lock.release()
測試
>>>ts = [FetchUrls(url, file, lock, name=url) for url in urls]
>>>[t.start() for t in ts]
2017-03-13 14:00:05,142 INFO [http://baidu.com] Lock Acquire
2017-03-13 14:00:05,142 INFO [http://baidu.com] File Writed
2017-03-13 14:00:05,271 INFO [http://cn.bing.com/] Lock Acquire
2017-03-13 14:00:05,272 INFO [http://cn.bing.com/] File Writed
RLock
RLock 是一個可重用鎖,可以多次調(diào)用 acquire 而不阻塞,但是 release 時也要執(zhí)行和 acquire 一樣的次數(shù)
import threading
>>>rlock = threading.RLock()
>>>rlock.acquire()
True
>>>rlock.acquire()
True
>>>rlock.acquire()
True
>>>rlock.release()
>>>rlock.release()
>>>rlock.release()
>>>rlock.release()
Traceback (most recent call last):
File "", line 1, in
RuntimeError: cannot release un-acquired lock
Condition
如果多個線程使用 生產(chǎn)者 —> 消費者的模式,可以使用 Condition,生產(chǎn)者生產(chǎn)數(shù)據(jù)后,通過 notify/notify_all 通知給消費者消費數(shù)據(jù)
import threading
import random
import logging
import time
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
class Producer(threading.Thread):
def __init__(self, datas: list, cond: threading.Condition, name=None):
super().__init__()
self.cond = cond
self.datas = datas
if name is not None:
self.name = name
def run(self):
while True:
data = random.randint(1, 100)
logging.info(data)
self.datas.append(data)
self.cond.acquire()
self.cond.notify()
self.cond.release()
time.sleep(1)
"""
self.cond.acquire()
self.cond.notify()
self.cond.release()
等價于
with self.cond:
self.notify()
無論 notify 還是 wait 都需要先 acquire,然后再 release
一般使用 with 語句
"""
class Consumer(threading.Thread):
def __init__(self, datas: list, cond: threading.Condition, name=None):
super().__init__()
self.cond = cond
self.datas = datas
if name is not None:
self.name = name
def run(self):
while True:
self.cond.acquire()
while True:
data = self.datas.pop()
logging.info(data)
break
self.cond.wait() #消費者通過 wait 方法等待 生產(chǎn)者 notify
self.cond.release()
def runner():
datas = []
cond = threading.Condition()
t1 = Producer(datas, cond, name='producer')
t2 = Consumer(datas, cond, name='consumer')
t1.start()
t2.start()
t1.join()
t2.join()
測試
>>>runner()
2017-03-13 14:56:12,442 INFO [producer] 89
2017-03-13 14:56:12,442 INFO [consumer] 89
2017-03-13 14:56:13,445 INFO [producer] 85
2017-03-13 14:56:13,445 INFO [consumer] 85
2017-03-13 14:56:14,450 INFO [producer] 57
2017-03-13 14:56:14,450 INFO [consumer] 57
2017-03-13 14:56:15,454 INFO [producer] 65
2017-03-13 14:56:15,454 INFO [consumer] 65
2017-03-13 14:56:16,458 INFO [producer] 15
2017-03-13 14:56:16,459 INFO [consumer] 15
Event
Event 是一個簡單的機(jī)制,線程發(fā)出一個信號,其他線程等待
import threading
import logging
import time
import random
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
class EventProducer(threading.Thread):
def __init__(self, datas: list, event: threading.Event, name=None):
super().__init__()
self.datas = datas
self.event = event
if name is not None:
self.name = name
def run(self):
while True:
data = random.randint(1, 100)
logging.info(data)
self.datas.append(data)
self.event.set()
time.sleep(1)
class EventConsumer(threading.Thread):
def __init__(self, datas: list, event: threading.Event, name=None):
super().__init__()
self.datas = datas
self.event = event
if name is not None:
self.name = name
def run(self):
while True:
self.event.wait() # wait 方法阻塞 消費者線程
try:
data = self.datas.pop()
logging.info(data)
except IndexError:
continue
def runner():
event = threading.Event()
datas = []
t1 = EventProducer(datas, event, name='EventProducer')
t2 = EventConsumer(datas, event, name='EventConsumer')
t1.start()
t2.start()
t1.join()
t2.join()
測試
>>>runner()
2017-03-13 16:18:54,251 INFO [EventProducer] 82
2017-03-13 16:18:54,251 INFO [EventConsumer] 82
2017-03-13 16:18:55,261 INFO [EventProducer] 37
2017-03-13 16:18:55,261 INFO [EventConsumer] 37
2017-03-13 16:18:56,270 INFO [EventProducer] 92
2017-03-13 16:18:56,271 INFO [EventConsumer] 92
Queue
之前的幾個 提供者 —> 消費者 的例子 一直用一個全局的列表來傳遞數(shù)據(jù),其實不是很科學(xué),不同線程傳遞數(shù)據(jù)應(yīng)該使用 Queue ,因為 Queue 本身也可以阻塞線程,使用 Queue 還可以省去同步
import queue
import threading
import logging
import random
from time import sleep
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
class QueueProducer(threading.Thread):
def __init__(self, queue: queue.Queue(), name=None):
super().__init__()
self.queue = queue
if name is not None:
self.name = name
def run(self):
while True:
data = random.randint(1, 100)
logging.info(data)
self.queue.put(data)
sleep(1)
class QueueConsumer(threading.Thread):
def __init__(self, queue: queue.Queue, name=None):
super().__init__()
self.queue = queue
if name is not None:
self.name = name
def run(self):
while True:
data = self.queue.get()
logging.info(data)
def runner():
q = queue.Queue()
t1 = QueueProducer(q, name='QueueProducer')
t2 = QueueConsumer(q, name='QueueConsumer')
t1.start()
t2.start()
t1.join()
t2.join()
測試
>>>runner()
2017-03-13 16:34:49,401 INFO [QueueProducer] 82
2017-03-13 16:34:49,401 INFO [QueueConsumer] 82
2017-03-13 16:34:50,405 INFO [QueueProducer] 2
2017-03-13 16:34:50,405 INFO [QueueConsumer] 2
2017-03-13 16:34:51,406 INFO [QueueProducer] 74
GIL
提到 Python 多線程就一定要說說 GIL Global Interpreter Lock 全局解釋器鎖,由于 GIL 的存在,Python 的線程不能達(dá)到真正的并行,在 CPython (C語言實現(xiàn)的 Python) 中 線程使用的是操作系統(tǒng)原生的線程
CPython 中,一個解釋器有一條主線程,和若干條用戶程序的線程,由于 GIL 的存在,每一個進(jìn)程要執(zhí)行時,都要去獲取 GIL ,所以并不能有效的利用多核 CPU 實現(xiàn)多線程并行,也就是說,多個線程不能夠同時執(zhí)行
如果要實現(xiàn)真正的并行,就需要使用 multiprocessing 這個多進(jìn)程模塊了
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的python framework threads_Python - 多线程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 斑马888t打印机墨盒安装_硒鼓?墨盒?
- 下一篇: kmeans python_k-mean