python创建线程_多种方法实现 python 线程池
最近在做一個爬蟲相關的項目,單線程的整站爬蟲,耗時真的不是一般的巨大,運行一次也是心累,,,所以,要想實現整站爬蟲,多線程是不可避免的,那么python多線程又應該怎樣實現呢?這里主要要幾個問題(關于python多線程的GIL問題就不再說了,網上太多了)。
一、 既然多線程可以縮短程序運行時間,那么,是不是線程數量越多越好呢?
顯然,并不是,每一個線程的從生成到消亡也是需要時間和資源的,太多的線程會占用過多的系統資源(內存開銷,cpu開銷),而且生成太多的線程時間也是可觀的,很可能會得不償失,這里給出一個最佳線程數量的計算方式:
最佳線程數的獲取:
1、通過用戶慢慢遞增來進行性能壓測,觀察QPS(即每秒的響應請求數,也即是最大吞吐能力。),響應時間
2、根據公式計算:服務器端最佳線程數量=((線程等待時間+線程cpu時間)/線程cpu時間) * cpu數量
3、單用戶壓測,查看CPU的消耗,然后直接乘以百分比,再進行壓測,一般這個值的附近應該就是最佳線程數量。
二、為什么要使用線程池?
對于任務數量不斷增加的程序,每有一個任務就生成一個線程,最終會導致線程數量的失控,例如,整站爬蟲,假設初始只有一個鏈接a,那么,這個時候只啟動一個線程,運行之后,得到這個鏈接對應頁面上的b,c,d,,,等等新的鏈接,作為新任務,這個時候,就要為這些新的鏈接生成新的線程,線程數量暴漲。在之后的運行中,線程數量還會不停的增加,完全無法控制。所以,對于任務數量不端增加的程序,固定線程數量的線程池是必要的。
三、如何實現線程池?
這里,我分別介紹三種實現方式:
1、過去:
使用threadpool模塊,這是個python的第三方模塊,支持python2和python3,具體使用方式如下:
#! /usr/bin/env python#-*- coding: utf-8 -*-
importthreadpoolimporttimedefsayhello (a):print("hello:"+a)
time.sleep(2)defmain():globalresult
seed=["a","b","c"]
start=time.time()
task_pool=threadpool.ThreadPool(5)
requests=threadpool.makeRequests(sayhello,seed)for req inrequests:
task_pool.putRequest(req)
task_pool.wait()
end=time.time()
time_m= end-startprint("time:"+str(time_m))
start1=time.time()for each inseed:
sayhello(each)
end1=time.time()print("time1:"+str(end1-start1))if __name__ == '__main__':
main()
運行結果如下:
threadpool是一個比較老的模塊了,現在雖然還有一些人在用,但已經不再是主流了,關于python多線程,現在已經開始步入未來(future模塊)了
2、未來:
使用concurrent.futures模塊,這個模塊是python3中自帶的模塊,但是,python2.7以上版本也可以安裝使用,具體使用方式如下:
#! /usr/bin/env python#-*- coding: utf-8 -*-
from concurrent.futures importThreadPoolExecutorimporttimedefsayhello(a):print("hello:"+a)
time.sleep(2)defmain():
seed=["a","b","c"]
start1=time.time()for each inseed:
sayhello(each)
end1=time.time()print("time1:"+str(end1-start1))
start2=time.time()
with ThreadPoolExecutor(3) as executor:for each inseed:
executor.submit(sayhello,each)
end2=time.time()print("time2:"+str(end2-start2))
start3=time.time()
with ThreadPoolExecutor(3) as executor1:
executor1.map(sayhello,seed)
end3=time.time()print("time3:"+str(end3-start3))if __name__ == '__main__':
main()
運行結果如下:
注意到一點:
concurrent.futures.ThreadPoolExecutor,在提交任務的時候,有兩種方式,一種是submit()函數,另一種是map()函數,兩者的主要區別在于:
2.1、map可以保證輸出的順序, submit輸出的順序是亂的
2.2、如果你要提交的任務的函數是一樣的,就可以簡化成map。但是假如提交的任務函數是不一樣的,或者執行的過程之可能出現異常(使用map執行過程中發現問題會直接拋出錯誤)就要用到submit()
2.3、submit和map的參數是不同的,submit每次都需要提交一個目標函數和對應的參數,map只需要提交一次目標函數,目標函數的參數放在一個迭代器(列表,字典)里就可以。
3.現在?
這里要考慮一個問題,以上兩種線程池的實現都是封裝好的,任務只能在線程池初始化的時候添加一次,那么,假設我現在有這樣一個需求,需要在線程池運行時,再往里面添加新的任務(注意,是新任務,不是新線程),那么要怎么辦?
其實有兩種方式:
3.1、重寫threadpool或者future的函數:
這個方法需要閱讀源模塊的源碼,必須搞清楚源模塊線程池的實現機制才能正確的根據自己的需要重寫其中的方法。
3.2、自己構建一個線程池:
這個方法就需要對線程池的有一個清晰的了解了,附上我自己構建的一個線程池:
#! /usr/bin/env python#-*- coding: utf-8 -*-
importthreadingimportQueueimporthashlibimportloggingfrom utils.progress importPrintProgressfrom utils.save importSaveToSqliteclassThreadPool(object):def __init__(self, thread_num, args):
self.args=args
self.work_queue=Queue.Queue()
self.save_queue=Queue.Queue()
self.threads=[]
self.running=0
self.failure=0
self.success=0
self.tasks={}
self.thread_name=threading.current_thread().getName()
self.__init_thread_pool(thread_num)#線程池初始化
def __init_thread_pool(self, thread_num):#下載線程
for i inrange(thread_num):
self.threads.append(WorkThread(self))#打印進度信息線程
self.threads.append(PrintProgress(self))#保存線程
self.threads.append(SaveToSqlite(self, self.args.dbfile))#添加下載任務
defadd_task(self, func, url, deep):#記錄任務,判斷是否已經下載過
url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest()if not url_hash inself.tasks:
self.tasks[url_hash]=url
self.work_queue.put((func, url, deep))
logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8")))#獲取下載任務
defget_task(self):#從隊列里取元素,如果block=True,則一直阻塞到有可用元素為止。
task = self.work_queue.get(block=False)returntaskdeftask_done(self):#表示隊列中的某個元素已經執行完畢。
self.work_queue.task_done()#開始任務
defstart_task(self):for item inself.threads:
item.start()
logging.debug("Work start")defincrease_success(self):
self.success+= 1
defincrease_failure(self):
self.failure+= 1
defincrease_running(self):
self.running+= 1
defdecrease_running(self):
self.running-= 1
defget_running(self):returnself.running#打印執行信息
defget_progress_info(self):
progress_info={}
progress_info['work_queue_number'] =self.work_queue.qsize()
progress_info['tasks_number'] =len(self.tasks)
progress_info['save_queue_number'] =self.save_queue.qsize()
progress_info['success'] =self.success
progress_info['failure'] =self.failurereturnprogress_infodefadd_save_task(self, url, html):
self.save_queue.put((url, html))defget_save_task(self):
save_task= self.save_queue.get(block=False)returnsave_taskdefwait_all_complete(self):for item inself.threads:ifitem.isAlive():#join函數的意義,只有當前執行join函數的線程結束,程序才能接著執行下去
item.join()#WorkThread 繼承自threading.Thread
classWorkThread(threading.Thread):#這里的thread_pool就是上面的ThreadPool類
def __init__(self, thread_pool):
threading.Thread.__init__(self)
self.thread_pool=thread_pool#定義線程功能方法,即,當thread_1,...,thread_n,調用start()之后,執行的操作。
defrun(self):print(threading.current_thread().getName())whileTrue:try:#get_task()獲取從工作隊列里獲取當前正在下載的線程,格式為func,url,deep
do, url, deep =self.thread_pool.get_task()
self.thread_pool.increase_running()#判斷deep,是否獲取新的鏈接
flag_get_new_link =Trueif deep >=self.thread_pool.args.deep:
flag_get_new_link=False#此處do為工作隊列傳過來的func,返回值為一個頁面內容和這個頁面上所有的新鏈接
html, new_link =do(url, self.thread_pool.args, flag_get_new_link)if html == '':
self.thread_pool.increase_failure()else:
self.thread_pool.increase_success()#html添加到待保存隊列
self.thread_pool.add_save_task(url, html)#添加新任務,即,將新頁面上的不重復的鏈接加入工作隊列。
ifnew_link:for url innew_link:
self.thread_pool.add_task(do, url, deep+ 1)
self.thread_pool.decrease_running()#self.thread_pool.task_done()
exceptQueue.Empty:if self.thread_pool.get_running() <=0:break
exceptException, e:
self.thread_pool.decrease_running()#print str(e)
break
總結
以上是生活随笔為你收集整理的python创建线程_多种方法实现 python 线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mininet编程实现交换机规则的插入、
- 下一篇: websocket python爬虫_p