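Multithreading and Multiprocessing in Python
Introduction
Python makes it quick to write programs, but its support for multithreading is weak, so in Python 2 multiprocessing is used more often. Python 3 introduced the concurrent package (concurrent.futures), which makes multithreaded and multiprocess development easier.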
Python GIL
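The execution of Python code is controlled by the Python interpreter. Several interpreters exist today; the best known are CPython, PyPy, and Jython. CPython is the most widely used of them. It is the original implementation and is written in C.
Operating systems support multiple threads executing at the same time. Python, however, was designed from the start so that Python code runs inside the interpreter's main loop, and CPython therefore introduced the Global Interpreter Lock (GIL) to control access to the interpreter. A Python thread must first acquire the GIL before it can run.
As a result, on both single-core and multi-core CPUs, only one thread is executed by the Python interpreter at any given moment, so threads cannot run Python code in parallel. This is the root cause of why Python multithreading is sometimes inefficient on multi-core CPUs.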
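As a small illustration of the GIL hand-off described above, CPython 3 exposes the interval after which the running thread is asked to release the GIL. This is only a sketch and assumes CPython 3.2 or later; `sys.getswitchinterval` does not exist in Python 2, which uses the tick-based `sys.getcheckinterval` instead.

```python
import sys

# Assumption: CPython 3.2+. The switch interval is the time slice (in seconds)
# after which the interpreter asks the running thread to give up the GIL
# so that another waiting thread gets a chance to run.
interval = sys.getswitchinterval()
print("switch interval: {} seconds".format(interval))
```

A shorter interval makes threads take turns more often, but it does not let two threads execute Python bytecode at the same time.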
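High-performance approaches in Python 2
There are three main approaches to multitasking in Python:
- Start multiple processes, each with a single thread, and run the tasks across the processes;
- Start a single process, start multiple threads inside it, and run the tasks across the threads;
- Start multiple processes and start multiple threads inside each of them to run even more tasks at once. This is too complex, does not work well in practice, and is used less often.
Using multiple processes
The package for multiprocessing is multiprocessing.
First, take a look at the Process class.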
    ''' from multiprocessing.process import Process, current_process, active_children '''

    class Process(object):
        '''
        Process objects represent activity that is run in a separate process

        The class is analagous to `threading.Thread`
        '''
        _Popen = None

        def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
            assert group is None, 'group argument must be None for now'
            count = _current_process._counter.next()
            self._identity = _current_process._identity + (count,)
            self._authkey = _current_process._authkey
            self._daemonic = _current_process._daemonic
            self._tempdir = _current_process._tempdir
            self._parent_pid = os.getpid()
            self._popen = None
            self._target = target
            self._args = tuple(args)
            self._kwargs = dict(kwargs)
            self._name = name or type(self).__name__ + '-' + \
                         ':'.join(str(i) for i in self._identity)

A simple example of using Process:
    from multiprocessing import Process

    def f(name):
        print 'hello', name

    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

Multithreading
The package for working with threads is threading.
First, a brief look at the Thread class.
    # Main class for threads

    class Thread(_Verbose):
        """A class that represents a thread of control.

        This class can be safely subclassed in a limited fashion.

        """
        __initialized = False
        # Need to store a reference to sys.exc_info for printing
        # out exceptions when a thread tries to use a global var. during interp.
        # shutdown and thus raises an exception about trying to perform some
        # operation on/with a NoneType
        __exc_info = _sys.exc_info
        # Keep sys.exc_clear too to clear the exception just before
        # allowing .join() to return.
        __exc_clear = _sys.exc_clear

        def __init__(self, group=None, target=None, name=None,
                     args=(), kwargs=None, verbose=None):
            """This constructor should always be called with keyword arguments. Arguments are:

            *group* should be None; reserved for future extension when a ThreadGroup
            class is implemented.

            *target* is the callable object to be invoked by the run()
            method. Defaults to None, meaning nothing is called.

            *name* is the thread name. By default, a unique name is constructed of
            the form "Thread-N" where N is a small decimal number.

            *args* is the argument tuple for the target invocation. Defaults to ().

            *kwargs* is a dictionary of keyword arguments for the target
            invocation. Defaults to {}.

            If a subclass overrides the constructor, it must make sure to invoke
            the base class constructor (Thread.__init__()) before doing anything
            else to the thread.

            """

A simple example
    #!/usr/bin/python
    from threading import Thread

    def count(n):
        print "begin count..." "\r\n"
        while n > 0:
            n -= 1
        print "done."

    def test_ThreadCount():
        t1 = Thread(target=count, args=(1000000,))
        print("start thread.")
        t1.start()
        print "join thread."
        t1.join()

    if __name__ == '__main__':
        test_ThreadCount()

Output:
    start thread.
    begin count...
    join thread.
    done.

Performance comparison of multiprocessing and multithreading
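The test code comes from another user. It uses timeit, which is part of the Python standard library, so no separate installation is needed.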
    #!/usr/bin/python
    from threading import Thread
    from multiprocessing import Process, Manager
    from timeit import timeit

    def count(n):
        while n > 0:
            n -= 1

    def test_normal():
        count(1000000)
        count(1000000)

    def test_Thread():
        t1 = Thread(target=count, args=(1000000,))
        t2 = Thread(target=count, args=(1000000,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()

    def test_Process():
        t1 = Process(target=count, args=(1000000,))
        t2 = Process(target=count, args=(1000000,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()

    if __name__ == '__main__':
        print "test_normal", timeit('test_normal()', 'from __main__ import test_normal', number=10)
        print "test_Thread", timeit('test_Thread()', 'from __main__ import test_Thread', number=10)
        print "test_Process", timeit('test_Process()', 'from __main__ import test_Process', number=10)

Output after running:
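    test_normal 1.0291161
    test_Thread 7.5084157
    test_Process 1.6441867

As the numbers show, calling the function directly is the fastest, Process comes second, and Thread is the slowest. This test, however, is purely computational. When slow operations such as I/O are involved, Process or Thread is still the way to go.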
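To illustrate the I/O case mentioned above, here is a minimal sketch in Python 3 (not part of the original benchmark). The function `fake_io` is a hypothetical stand-in for a slow I/O call; it uses `time.sleep`, which releases the GIL while waiting, so the threads can overlap their waits.

```python
import time
from threading import Thread


def fake_io(seconds):
    # Hypothetical stand-in for a slow I/O call.
    # time.sleep releases the GIL while the thread is waiting.
    time.sleep(seconds)


def run_sequential(n, seconds):
    # Perform the n waits one after another and return the elapsed time.
    start = time.perf_counter()
    for _ in range(n):
        fake_io(seconds)
    return time.perf_counter() - start


def run_threaded(n, seconds):
    # Perform the n waits in n threads and return the elapsed time.
    start = time.perf_counter()
    threads = [Thread(target=fake_io, args=(seconds,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


sequential = run_sequential(4, 0.2)
threaded = run_threaded(4, 0.2)
print("sequential: {:.2f}s".format(sequential))
print("threaded:   {:.2f}s".format(threaded))
```

Under these assumptions the threaded run should take roughly as long as a single wait, while the sequential run takes about four times as long. The exact timings depend on the machine.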
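The concurrent.futures package in Python 3
Developers coming from Java or C# are probably familiar with futures, which exist to support concurrency.
Python 3.2 ships the concurrent.futures package. On Python 2.7 the futures module has to be installed separately, which can be done with pip install futures.
The concurrent.futures module gives developers a high-level interface for running calls asynchronously. It is essentially an abstraction layer built on top of Python's threading and multiprocessing modules that is easier to use. The abstraction simplifies working with those modules, but it also gives up much of their flexibility.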
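To make the idea of a future concrete, here is a minimal sketch in Python 3. The function `square` is a hypothetical example task; `submit`, `result`, and `done` are part of the standard concurrent.futures API.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical example task.
    return x * x


with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules the call and immediately returns a Future.
    future = executor.submit(square, 7)
    # result() blocks until the call has finished, then returns its value.
    value = future.result()

print(value)          # 49
print(future.done())  # True
```

The Future acts as a handle to a result that may not exist yet, which is the same role it plays in Java and C#.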
The most important class here is Executor. Executor is an abstract class with two concrete implementations, ThreadPoolExecutor and ProcessPoolExecutor, which, as the names suggest, are execution pools of threads and of processes respectively.
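Because both pools share the Executor interface, code can be written against that interface and given either pool. The following Python 3 sketch shows this; `run_all` and `cube` are hypothetical names introduced only for illustration.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def cube(x):
    # Hypothetical example task.
    return x ** 3


def run_all(executor_cls, func, items):
    # Hypothetical helper: accepts any concrete Executor subclass,
    # because both pools provide the same submit()/map() interface.
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(func, items))


# Both pool classes derive from the abstract Executor base class.
print(issubclass(ThreadPoolExecutor, Executor))   # True
print(issubclass(ProcessPoolExecutor, Executor))  # True

thread_results = run_all(ThreadPoolExecutor, cube, [1, 2, 3])
print(thread_results)  # [1, 8, 27]
```

Passing ProcessPoolExecutor to `run_all` works the same way, provided the function and its arguments can be pickled and the call is made under an `if __name__ == "__main__":` guard.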
Here is the definition of ProcessPoolExecutor. By default, the maximum number of workers matches the number of CPUs.
    class ProcessPoolExecutor(_base.Executor):
        def __init__(self, max_workers=None):
            """Initializes a new ProcessPoolExecutor instance.

            Args:
                max_workers: The maximum number of processes that can be used to
                    execute the given calls. If None or not given then as many
                    worker processes will be created as the machine has processors.
            """
            _check_system_limits()

            if max_workers is None:
                self._max_workers = multiprocessing.cpu_count()
            else:
                if max_workers <= 0:
                    raise ValueError("max_workers must be greater than 0")

                self._max_workers = max_workers

Next, the definition of ThreadPoolExecutor. Because it is mostly used to overlap I/O (see also CompleteIO), its default maximum number of workers is five times the number of CPUs.
    class ThreadPoolExecutor(_base.Executor):
        def __init__(self, max_workers=None):
            """Initializes a new ThreadPoolExecutor instance.

            Args:
                max_workers: The maximum number of threads that can be used to
                    execute the given calls.
            """
            if max_workers is None:
                # Use this number because ThreadPoolExecutor is often
                # used to overlap I/O instead of CPU work.
                max_workers = (cpu_count() or 1) * 5
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers
            self._work_queue = queue.Queue()
            self._threads = set()
            self._shutdown = False
            self._shutdown_lock = threading.Lock()

Here is a simple example, adapted from another user's program:
    #!/usr/bin/python2
    import os
    import urllib

    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import ProcessPoolExecutor

    def downloader(url):
        # Download one URL into the current directory, 1 KB at a time.
        req = urllib.urlopen(url)
        if req is not None:
            print "begin down", url
        filename = os.path.basename(url)
        ext = os.path.splitext(url)[1]
        if not ext:
            raise RuntimeError("URL does not contain an extension")
        with open(filename, "wb") as file_handle:
            while True:
                chunk = req.read(1024)
                if not chunk:
                    break
                file_handle.write(chunk)
        msg = "Finished downloading {filename}".format(filename=filename)
        return msg

    def mainProcess(urls):
        # Download the URLs with a pool of worker processes.
        with ProcessPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(downloader, url) for url in urls]
            for future in as_completed(futures):
                print(future.result())

    def mainThread(urls):
        # Download the URLs with a pool of worker threads.
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(downloader, url) for url in urls]
            for future in as_completed(futures):
                print(future.result())

    if __name__ == "__main__":
        urls1 = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
                 "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
                 "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf"]
        urls2 = ["http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
                 "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
        mainProcess(urls1)
        mainThread(urls2)

Running it three times gives the following output:
    ----1
    begin down http://www.irs.gov/pub/irs-pdf/f1040ez.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040a.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040.pdf
    Finished downloading f1040ez.pdf
    Finished downloading f1040.pdf
    Finished downloading f1040a.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040es.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040sb.pdf
    Finished downloading f1040sb.pdf
    Finished downloading f1040es.pdf
    ----2
    begin down http://www.irs.gov/pub/irs-pdf/f1040.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040ez.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040a.pdf
    Finished downloading f1040ez.pdf
    Finished downloading f1040a.pdf
    Finished downloading f1040.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040es.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040sb.pdf
    Finished downloading f1040sb.pdf
    Finished downloading f1040es.pdf
    ----3
    begin down http://www.irs.gov/pub/irs-pdf/f1040.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040a.pdf
    Finished downloading f1040.pdf
    Finished downloading f1040a.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040ez.pdf
    Finished downloading f1040ez.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040sb.pdf
    begin down http://www.irs.gov/pub/irs-pdf/f1040es.pdf
    Finished downloading f1040sb.pdf
    Finished downloading f1040es.pdf
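The order of the "Finished downloading" lines differs between runs because as_completed yields futures in the order they finish, not the order they were submitted. The Python 3 sketch below contrasts this with Executor.map, which returns results in submission order. The task names and delays are made up for illustration, and `slow_echo` is a hypothetical stand-in for a download.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_echo(name, delay):
    # Hypothetical stand-in for a download: wait, then return the task name.
    time.sleep(delay)
    return name


# Made-up task names and delays, chosen so the finish order differs
# from the submission order.
jobs = [("slow", 0.3), ("fast", 0.1), ("medium", 0.2)]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_echo, name, delay) for name, delay in jobs]
    # as_completed yields each future as soon as it has finished.
    finished = [future.result() for future in as_completed(futures)]
    # map returns results in the order the inputs were given.
    in_order = list(executor.map(slow_echo, *zip(*jobs)))

print("as_completed:", finished)
print("map:         ", in_order)
```

With these delays, as_completed will usually report the fastest task first, while map always reports the tasks in the order they appear in `jobs`.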