日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

上下文管理、线程池、redis订阅和发布

發布時間:2024/10/12 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 上下文管理、线程池、redis订阅和发布 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一:上下文管理:

對于一些對象在使用之后,需要關閉操作的。比如說:socket、mysql數據庫連接、文件句柄等。

都可以用上下文來管理。

語法結構:

1 Typical usage: 2 3 @contextmanager 4 def some_generator(<arguments>): 5 <setup> 6 try: 7 yield <value> 8 finally: 9 <cleanup> 10 11 This makes this: 12 13 with some_generator(<arguments>) as <variable>: 14 <body>

?

code:

1 import socket 2 import contextlib 3 4 5 @contextlib.contextmanager 6 def sock_server(host,port): 7 sk=socket.socket() 8 sk.bind((host,port)) 9 sk.listen(4) 10 try: 11 yield sk 12 finally: 13 sk.close() 14 15 with sock_server("127.0.0.1",22) as soc: 16 print(soc)

執行順序:

解釋:python從上到下依次解釋:

1、當到with的時候,執行with內socket_server("127.0.0.1",22),跳到

2、被contextlib.contextmanager裝飾的函數。

3、依次執行函數socket_server到yield 并把sk返回給第4步的sco變量

4、然后執行with下面的代碼塊,執行print語句。

5、當with語句的代碼塊執行完。跳到第3步的yeild。

6、執行finally語句里的代碼塊。

二:線程池(threadpool)

自己版本:

1 #!/bin/env python 2 #author:evil_liu 3 #date:2016-7-21 4 #description: thread pool 5 6 import threading 7 import time 8 import queue 9 10 class Thread_Poll: 11 ''' 12 功能:該類主要實現多線程,以及線程復用。 13 ''' 14 def __init__(self,task_num,max_size): 15 ''' 16 功能:該函數是初始化線程池對象。 17 :param task_num: 任務數量。 18 :param max_size: 線程數量。 19 :return:無。 20 ''' 21 self.task_num=task_num 22 self.max_size=max_size 23 self.q=queue.Queue(task_num)#設置任務隊列的。 24 self.thread_list=[] 25 self.res_q=queue.Queue()#設置結果隊列。 26 27 def run(self,func,i,call_back=None): 28 ''' 29 功能:該函數是線程池運行主函數。 30 :param func: 傳入任務主函數。 31 :param *args: 任務函數參數,需要是元組形式。 32 :param call_back: 回調函數。 33 :return: 無。 34 ''' 35 if len(self.thread_list)<self.max_size:#如果目前線程數小于我們定義的線程的個數,進行創建。 36 self.creat_thread() 37 misson=(func,i,call_back)#往任務隊列放任務。 38 self.q.put(misson) 39 40 def creat_thread(self): 41 ''' 42 功能:該函數主要是創建線程,并調用call方法。 43 :return: 無。 44 ''' 45 t=threading.Thread(target=self.call)#創建線程 46 t.start() 47 48 def call(self): 49 ''' 50 功能:該函數是線程循環執行任務函數。 51 :return: 無。 52 ''' 53 cur_thread=threading.currentThread 54 self.thread_list.append(cur_thread) 55 event=self.q.get() 56 while True: 57 func,args,cal_ba=event#獲取任務函數。 58 try: 59 res=func(*args)#執行任務函數。注意參數形式是元組形式。 60 flag="OK" 61 except Exception as e: 62 print(e) 63 res=False 64 flag="fail" 65 self.res(res,flag)#調用回調函數,將執行結果返回到隊列中。 66 try: 67 event=self.q.get(timeout=2)#如果任務隊列為空,獲取任務超時2s超過2s線程停止執行任務,并退出。 68 except Exception: 69 self.thread_list.remove(cur_thread) 70 break 71 def res(self,res,status): 72 ''' 73 功能:該方法主要是將執行結果方法隊列中。 74 :param res: 任務函數的執行結果。 75 :param status: 執行任務函數的結果,成功還是失敗。 76 :return: 無。 77 ''' 78 da_res=(res,status) 79 self.res_q.put(da_res) 80 81 def task(x,y): 82 ''' 83 功能:該函數主要需要執行函數。 84 :param x: 參數。 85 :return: 返回值1,表示執行成功。 86 ''' 87 print(x) 88 return x+y 89 def wri_fil(x): 90 ''' 91 功能:該函數主要講結果隊列中的結果寫入文件中。 92 :param x: 任務長度。 93 :return: 無。 94 ''' 95 while True:#將執行結果,從隊列中獲取結果并將結果寫入文件中。 96 time.sleep(1) 97 if pool.res_q.qsize()==x:#當隊列當前的長度等于任務執行次數,表示任務執行完成。 98 with open('1.txt','w') as f1: 99 for i in range(pool.res_q.qsize()): 100 try: 101 data=pool.res_q.get(timeout=2) 102 f1.write('mission result:%s,status:%s\n'%data) 103 except Exception: 104 break 105 break 106 else: 107 continue 108 if __name__ == '__main__': 109 pool=Thread_Poll(10,5)#初始化線程池對象。 110 for i in range(10):#循環任務。 111 pool.run(task,(1,2)) 112 wri_fil(10)

?

老師版本:注意老師在創建線程的時候,如果此時任務隊列中沒有任務的時候,不會創建其他線程。在線程執行完任務之后,將線程加入空閑線程的列表中,然后讓當前線程去隊列里獲取任務,利用queue里的get()方法阻塞的作用的,如果一直阻塞的話,

然后表示空閑的列表中的加入的線程 一直有,此時表示創建線程數已經滿足任務需求,如果不阻塞則空閑線程列表里沒有空余線程。而是獲取任務,執行任務。

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import queue 5 import threading 6 import contextlib 7 import time 8 9 StopEvent = object() 10 11 12 class ThreadPool(object): 13 14 def __init__(self, max_num, max_task_num = None): 15 if max_task_num: 16 self.q = queue.Queue(max_task_num) 17 else: 18 self.q = queue.Queue() 19 self.max_num = max_num 20 self.cancel = False 21 self.terminal = False 22 self.generate_list = [] 23 self.free_list = [] 24 25 def run(self, func, args, callback=None): 26 """ 27 線程池執行一個任務 28 :param func: 任務函數 29 :param args: 任務函數所需參數 30 :param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數) 31 :return: 如果線程池已經終止,則返回True否則None 32 """ 33 if self.cancel: 34 return 35 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 36 self.generate_thread() 37 w = (func, args, callback,) 38 self.q.put(w) 39 40 def generate_thread(self): 41 """ 42 創建一個線程 43 """ 44 t = threading.Thread(target=self.call) 45 t.start() 46 47 def call(self): 48 """ 49 循環去獲取任務函數并執行任務函數 50 """ 51 current_thread = threading.currentThread() 52 self.generate_list.append(current_thread) 53 54 event = self.q.get() 55 while event != StopEvent: 56 57 func, arguments, callback = event 58 try: 59 result = func(*arguments) 60 success = True 61 except Exception as e: 62 success = False 63 result = None 64 65 if callback is not None: 66 try: 67 callback(success, result) 68 except Exception as e: 69 pass 70 71 with self.worker_state(self.free_list, current_thread): 72 if self.terminal: 73 event = StopEvent 74 else: 75 event = self.q.get() 76 else: 77 78 self.generate_list.remove(current_thread) 79 80 def close(self): 81 """ 82 執行完所有的任務后,所有線程停止 83 """ 84 self.cancel = True 85 full_size = len(self.generate_list) 86 while full_size: 87 self.q.put(StopEvent) 88 full_size -= 1 89 90 def terminate(self): 91 """ 92 無論是否還有任務,終止線程 93 """ 94 self.terminal = True 95 96 while self.generate_list: 97 self.q.put(StopEvent) 98 99 self.q.queue.clear() 100 101 @contextlib.contextmanager 102 def worker_state(self, state_list, worker_thread): 103 """ 104 用于記錄線程中正在等待的線程數 105 """ 106 state_list.append(worker_thread) 107 try: 108 yield 109 finally: 110 state_list.remove(worker_thread) 111 112 113 114 # How to use 115 116 117 pool = ThreadPool(5) 118 119 def callback(status, result): 120 # status, execute action status 121 # result, execute action return value 122 pass 123 124 125 def action(i): 126 print(i) 127 128 for i in range(30): 129 ret = pool.run(action, (i,), callback) 130 131 time.sleep(5) 132 print(len(pool.generate_list), len(pool.free_list)) 133 print(len(pool.generate_list), len(pool.free_list)) 134 # pool.close() 135 # pool.terminate()

?

轉載于:https://www.cnblogs.com/evilliu/p/5724933.html

總結

以上是生活随笔為你收集整理的上下文管理、线程池、redis订阅和发布的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。