Python异常处理和进程线程
寫在前面
最壞的結果,不過是大器晚成;
?
一、異常處理
- 1.語法錯誤導致的異常
-?這種錯誤,根本過不了python解釋器的語法檢測,必須在程序運行前就修正;
- 2.邏輯上的異常
- 即邏輯錯誤,例如除零錯誤;
- 異常相關信息:異常的追蹤信息 + 異常類型 + 異常值
- 異常種類
1 ArithmeticError 2 AssertionError 3 AttributeError 4 BaseException 5 BufferError 6 BytesWarning 7 DeprecationWarning 8 EnvironmentError 9 EOFError 10 Exception 11 FloatingPointError 12 FutureWarning 13 GeneratorExit 14 ImportError 15 ImportWarning 16 IndentationError 17 IndexError 18 IOError 19 KeyboardInterrupt 20 KeyError 21 LookupError 22 MemoryError 23 NameError 24 NotImplementedError 25 OSError 26 OverflowError 27 PendingDeprecationWarning 28 ReferenceError 29 RuntimeError 30 RuntimeWarning 31 StandardError 32 StopIteration 33 SyntaxError 34 SyntaxWarning 35 SystemError 36 SystemExit 37 TabError 38 TypeError 39 UnboundLocalError 40 UnicodeDecodeError 41 UnicodeEncodeError 42 UnicodeError 43 UnicodeTranslateError 44 UnicodeWarning 45 UserWarning 46 ValueError 47 Warning 48 ZeroDivisionError 異常類別- 異常示例:
# IndexError: list index out of range list1 = [1,2] list1[7]# KeyError: 'k9' dict1 = {'k1':'v1','k2':'v2', } dict1['k9']# ValueError: invalid literal for int() with base 10: 'standby' str = 'standby' int(str)...- 3.異常處理
- try...except... (可以寫多個except)
- 從上向下匹配,匹配到了就不再匹配下面的except,類似于 iptables
try:# msg=input('>>:')# int(msg) #ValueErrorprint(x) #NameErrord={'a':1}d['b'] #KeyErrorl=[1,2]l[10] #IndexError1+'asdfsadfasdf' #TypeErrorexcept ValueError as e:print('ValueError: %s' % e) except NameError as e:print('NameError: %s' % e) except KeyError as e:print(e)print('=============>')- 萬能異常 Exception
try:# d={'a':1}# d['b'] #KeyErrorl=[1,2]l[10] #IndexError1+'asdfsadfasdf' #TypeErrorexcept Exception as e:print('捕獲到異常,異常類型:%s,異常值:%s' % (type(e),e))print('=============>')- try...except...else...finally... (finally 里面主要是做一些清理工作)
try:# 1+'asdfsadfasdf' #TypeErrorprint('aaaaaa')except Exception as e:print('捕獲到異常,異常類型:%s,異常值:%s' % (type(e), e)) else:print('沒有異常時發生會執行') finally:print('有沒有異常都會執行')- 4.異常擴展
- 主動拋出異常
try:raise TypeError('類型錯誤') except Exception as e:print(e)- 自定義異常類型
class standbyException(BaseException):def __init__(self,msg):self.msg=msgdef __str__(self):return self.msg try:raise standbyException('--->> 自定義異常類型') except standbyException as e:print(e)- 斷言 assert
# 待補充...
二、操作系統
- 0.參考:操作系統簡介
- 1.基礎概念
- 1.精簡的說的話,操作系統就是一個協調、管理和控制計算機硬件資源和軟件資源的控制程序;
- 2.操作系統管理硬件,提供系統調用(接口,比方說:文件);對資源請求進行有序調度處理;
- 3.操作系統處在用戶應用程序和計算機硬件之間,本質也是一個軟件
- 4.操作系統由操作系統的內核(運行于內核態,管理硬件資源)以及系統調用(運行于用戶態,為應用程序員寫的應用程序提供系統調用接口)兩部分組成,所以,單純的說操作系統是運行于內核態的,是不準確的。
# 1.隔離復雜度,提供簡單易用的接口 隱藏了丑陋的硬件調用接口,為應用程序員提供調用硬件資源的更好,更簡單,更清晰的模型(系統調用接口)。 應用程序員有了這些接口后,就不用再考慮操作硬件的細節,專心開發自己的應用程序即可。比如,磁盤資源的抽象是文件系統(C盤,D盤,E盤...下的目錄及文件), 有了文件的概念,我們直接打開文件,讀或者寫就可以了, 無需關心記錄是否應該使用修正的調頻記錄方式,以及當前電機的狀態等細節 # 2.將應用程序對硬件資源的競態請求變得有序化 例如:很多應用軟件其實是共享一套計算機硬件, 比方說有可能有三個應用程序同時需要申請打印機來輸出內容, 那么a程序競爭到了打印機資源就打印, 然后可能是b競爭到打印機資源,也可能是c,這就導致了無序; 打印機可能打印一段a的內容然后又去打印c..., 操作系統的一個功能就是將這種無序變得有序;- 2.相關概念
- 1.批處理
-?把一堆人的輸入攢成一大波輸入;把一堆人的輸出攢成一大波輸出;節省了機時;
- 2.多道程序設計
- 空間上的復用
-?將內存分為幾部分,每個部分放入一個程序,這樣,同一時間內存中就有了多道程序;
空間上的復用最大的問題是: 程序之間的內存必須分割,這種分割需要在硬件層面實現,由操作系統控制。 如果內存彼此不分割,則一個程序可以訪問另外一個程序的內存,1.首先喪失的是安全性: 比如你的qq程序可以訪問操作系統的內存,這意味著你的qq可以拿到操作系統的所有權限。2.其次喪失的是穩定性: 某個程序崩潰時有可能把別的程序的內存也給回收了, 比方說把操作系統的內存給回收了,則操作系統崩潰。- 時間上的復用
- 快速的上下文切換
當一個資源在時間上復用時,不同的程序或用戶輪流使用它,第一個程序獲取該資源使用結束后,在輪到第二個。。。第三個。。。例如:只有一個cpu,多個程序需要在該cpu上運行,操作系統先把cpu分給第一個程序; 在這個程序運行的足夠長的時間(時間長短由操作系統的算法說了算)或者遇到了I/O阻塞,操作系統則把cpu分配給下一個程序; 以此類推,直到第一個程序重新被分配到了cpu然后再次運行,由于cpu的切換速度很快,給用戶的感覺就是這些程序是同時運行的,或者說是并發的,或者說是偽并行的。 至于資源如何實現時間復用,或者說誰應該是下一個要運行的程序,以及一個任務需要運行多長時間,這些都是操作系統的工作。 當一個程序在等待I/O時,另一個程序可以使用cpu,如果內存中可以同時存放足夠多的作業,則cpu的利用率可以接近100%;- 3.分時操作系統
把處理機的運行時間分成很短的時間片,按時間片輪流把處理機分配給各聯機作業使用;分時操作系統,在多個程序之間切換,按時間片切換; 第三代計算機廣泛采用了必須的保護硬件(程序之間的內存彼此隔離)之后,分時系統才開始流行;?
1 操作系統的作用: 2 1.把硬件丑陋復雜的接口隱藏起來,給應用程序提供簡單易用的接口 3 2.管理、調度進程,并且把進程之間對硬件的競爭變得有序化 4 5 6 多道技術: 7 1.產生背景:為了實現單CPU下的并發效果 8 2.分為兩部分: 9 1.空間上的復用(內存里放入多個程序,必須實現硬件層面的隔離) 10 2.時間上的復用(復用的是CPU的時間片;快速切換) 11 什么情況下進行切換? 12 1.正在執行的任務遇到的阻塞(例如I/O) 13 2.正在執行的任務運行時間過長 14 15 進程:正在運行的一個過程/任務 16 由操作系統負責調度,然后由CPU負責執行 17 18 19 并發:偽并行,單核+多道實現 20 并行:只有多核才能實現真正的并行 21 22 23 同步:打電話 24 異步:發短信 25 26 27 進程的創建: 28 1.系統初始化的時候 29 2.與用戶交互:雙擊一個EXE 30 3.在執行一個進程的過程當中,調用了Popen/os.fork 31 4.批處理任務 32 33 34 系統調用: 35 Linux:fork 36 Windows:CreateProcess 37 38 39 Linux與Windows下的進程的區別: 40 1.linux下的進程有父子關系,Windows下的進程沒有這個關系; 41 2.Linux下創建一個新的進程,需要拷貝父進程的地址空間;Windows下從最開始創建進程,兩個進程之間就是不一樣的; 42 43 44 進程的三種狀態: 45 1.就緒 46 2.運行 47 3.阻塞 48 49 50 進程間快速切換,前提是在切換之前需要保存當前進程當前的狀態 51 yield 就有這種操作系統級別的 保存狀態 的功能 課上記的一些筆記,作為補充?
三、進程
- 0.參考:Cpython解釋器支持的進程與線程
- 1.進程的概念
-?進程:正在進行的一個過程或者說一個任務。而負責執行任務則是cpu;起源于操作系統,是操作系統最核心的概念;
- 程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程;
-?同一個程序執行兩次,那也是兩個進程,比如打開暴風影音,雖然都是同一個軟件,但是一個可以播放西游記,一個可以播放天龍八部;
An executing instance of a program is called a process.Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.程序并不能單獨運行,只有將程序裝載到內存中,系統為它分配資源才能運行,而這種執行的程序就稱之為進程。 程序和進程的區別就在于:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬于動態概念。在多道編程中,我們允許多個程序同時加載到內存中,在操作系統的調度下,可以實現并發地執行。這是這樣的設計,大大提高了CPU的利用率。 進程的出現讓每個用戶感覺到自己獨享CPU,因此,進程就是為了在CPU上實現多道編程而提出的。- 2.并發與并行
- 1.并發:是偽并行,即看起來是同時運行。單個cpu+多道技術就可以實現并發,(并行也屬于并發);
- 2.并行:同時運行,只有具備多個cpu才能實現真正意義上的并行;
-??單核下,可以利用多道技術;多個核,每個核也都可以利用多道技術(多道技術是針對單核而言的)
有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4;一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術;而一旦任務1的I/O結束了,操作系統會重新調用它; (需知進程的調度、分配給哪個cpu運行,由操作系統說了算),可能被分配給四個cpu中的任意一個去執行多道技術: 內存中同時存入多道(多個)程序,cpu從一個進程快速切換到另外一個, 使每個進程各自運行幾十或幾百毫秒;這樣,雖然在某一個瞬間,一個cpu只能執行一個任務, 但在1秒內,cpu卻可以運行多個進程,這就給人產生了并行的錯覺,即偽并發, 以此來區分多處理器操作系統的真正硬件并行(多個cpu共享同一個物理內存)
- 3.同步和異步
同步就是指一個進程在執行某個請求的時候,若該請求需要一段時間才能返回信息, 那么這個進程將會一直等待下去,直到收到返回信息才繼續執行下去;異步是指進程不需要一直等下去,而是繼續執行下面的操作,不管其他進程的狀態。 當有消息返回時系統會通知進程進行處理,這樣可以提高執行的效率。同步:打電話異步:發短息、MySQL主從復制
- 4.multiprocess簡單介紹
python中的多線程無法利用多核優勢; 如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing。1.multiprocessing模塊用來開啟子進程,并在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。2.multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock、Pool等組件。需要再次強調的一點是: 與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限于該進程內;- 5.Process介紹
- 1.參數介紹
Process([group [, target [, name [, args [, kwargs]]]]]) 由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號group 參數未使用,值始終為None target 表示調用對象,即子進程要執行的任務 args 表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs 表示調用對象的字典,kwargs={'name':'egon','age':18} name 為子進程的名稱- 2.方法介紹
p.start() 啟動進程,并調用該子進程中的p.run() p.run() 進程啟動時運行的方法,正是它去調用target指定的函數, 我們自定義類的類中一定要實現該方法 p.terminate() 強制終止進程p,不會進行任何清理操作, 如果p創建了子進程,該子進程就成了僵尸進程,使用該方法需要特別小心這種情況。 如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖p.is_alive() 如果p仍然運行,返回Truep.join([timeout]) hold住的是主進程 主線程等待p終止(強調:是主線程處于等的狀態,而p是處于運行的狀態)。 timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程, 而不能join住run開啟的進程- 3.屬性介紹
p.daemon 默認值為False,如果設為True,代表p為后臺運行的守護進程,當p的父進程終止時,p也隨之終止; 并且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置;p.name:進程的名稱p.pid:進程的pidp.exitcode 進程在運行時為None、如果為–N,表示被信號N結束(了解即可)p.authkey 進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。 這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性, 這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)- 4.注意
注意:在windows中Process()必須放到# if __name__ == '__main__':下Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() insideif __name__ == "__main__" since statements inside this if-statement will not get called upon import.由于Windows沒有fork,多處理模塊啟動一個新的Python進程并導入調用模塊。 如果在導入時調用Process(),那么這將啟動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的原因, 使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。- 6.進程的創建
1. 在UNIX中該系統調用是:fork; fork會創建一個與父進程一模一樣的副本, 二者有相同的存儲映像、同樣的環境字符串和同樣的打開文件; (在shell解釋器進程中,執行一個命令就會創建一個子進程)2. 在windows中該系統調用是:CreateProcess, CreateProcess既處理進程的創建,也負責把正確的程序裝入新進程。 關于創建的子進程,UNIX和windows1.相同的是:進程創建后,父進程和子進程有各自不同的地址空間(多道技術要求物理層面實現進程之間內存的隔離), 任何一個進程的在其地址空間中的修改都不會影響到另外一個進程;2.不同的是:在UNIX中,子進程的初始地址空間是父進程的一個副本, 提示:子進程和父進程是可以有只讀的共享內存區的。 但是對于windows系統來說,從一開始父進程與子進程的地址空間就是不同的。- 創建進程示例1:
# Windows上調用Process,可執行代碼一定要放到 __main__ 里 from multiprocessing import Process import time,randomdef func(name):print('%s is running...' % name)# time.sleep(random.randint(1,3))time.sleep(1)print('%s run end.' % name)if __name__ == '__main__':# p1 = Process(target=func,args=('standby',))p1 = Process(target=func,args=('standby',),name='sub-P1') # 指定進程的名字p1.start()print(p1.name)print('Parent running')time.sleep(1)print('Parent run end')--- sub-P1 Parent running standby is running... Parent run end standby run end.- 創建進程示例2:使用繼承的方式,必須在類中定義一個run()方法
# 繼承Process類 from multiprocessing import Process import time, randomclass Foo(Process):def __init__(self, name):super().__init__() # 調用父類Process的init方法進行初始化self.name = namedef run(self): # 必須定義run()方法print('%s is running...' % self.name)# time.sleep(random.randint(1,3))time.sleep(1)print('%s run end.' % self.name) if __name__ == '__main__':p1 = Foo('standby')p1.start() # start() 會自動調用 run()print('Parent running...')time.sleep(1)print('Parent run end.')--- Parent running... standby is running... Parent run end. standby run end.- 7.進程的狀態
?
就緒狀態:進程已經準備好,已分配到所需資源,只要分配到CPU就能夠立即運行;如果進程運行時間片使用完也會進入就緒狀態;執行狀態:進程處于就緒狀態被調度后,進程進入執行狀態;阻塞狀態:正在執行的進程由于某些事件(I/O請求,申請緩存區失敗)而暫時無法運行,進程受到阻塞; 在滿足請求時進入就緒狀態等待系統調用; 其實在兩種情況下會導致一個進程在邏輯上不能運行:1. 進程掛起是自身原因,遇到I/O阻塞,便要讓出CPU讓其他進程去執行,這樣保證CPU一直在工作2. 與進程無關,是操作系統層面,可能會因為一個進程占用時間過多,或者優先級等原因,而調用其他的進程去使用CPU。
- 8.Process對象的常用方法和屬性
- 1.daemon=True與join()
p.daemon=True 之后,如果在子進程p內再創建子進程就會報錯: 'daemonic processes are not allowed to have children' # 示例1:沒有join也不設置daemon from multiprocessing import Process def say(name):print('%s say hello.' % name)if __name__ == '__main__':p = Process(target=say,args=('standby',))p.start() # 創建新的進程需要時間,所以主線程先打印;但子進程也會打印,因為主線程需要等待子進程執行完畢,才結束,避免出現僵尸進程(沒有父進程的子進程)print('這是主線程')---結果--- 這是主線程 standby say hello.===================> # 示例2:設置daemon=True,沒有join from multiprocessing import Process def say(name):print('%s say hello.' % name)if __name__ == '__main__':p = Process(target=say,args=('standby',))p.daemon = True #一定要在p.start()前設置;設置p為守護進程,禁止p創建子進程;并且父進程結束,p跟著一起結束p.start()print('這是主線程')---結果--- 這是主線程===================> # 示例3:有join,沒有設置daemon=True from multiprocessing import Process def say(name):print('%s say hello.' % name)if __name__ == '__main__':p = Process(target=say,args=('standby',))p.start()p.join() # 阻塞了主線程,使得子進程先執行完畢,主線程才繼續往下執行print('這是主線程')---結果--- standby say hello. 這是主線程===================> # 示例4:設置daemon=True,有join from multiprocessing import Process def say(name):print('%s say hello.' % name)if __name__ == '__main__':p = Process(target=say,args=('standby',))p.daemon = True #把p設置成守護進程p.start()p.join()print('這是主線程') ---結果--- standby say hello. 這是主線程===================> ===================> ===================> # 示例5:子進程為什么會不打印? from multiprocessing import Process import time def say(name):print('%s say hello.' % name)time.sleep(2)print('%s say bye.' % name)if __name__ == '__main__':p = Process(target=say,args=('standby',))p.daemon = True #把p設置成守護進程p.start()p.join(0.001) # 主線程等待p的結束,等0.0001秒就不再等了;print('這是主線程')---結果--- 這是主線程===================> # 示例6:把主線程阻塞的時間改大一點,改為0.1s from multiprocessing import Process import time def say(name):print('%s say hello.' % name)time.sleep(2)print('%s say bye.' % name)if __name__ == '__main__':p = Process(target=say,args=('standby',))p.daemon = True #把p設置成守護進程p.start()p.join(0.1)print('這是主線程')---結果--- standby say hello. 這是主線程===================> # 示例7:把主線程阻塞的時間改大一點,改為2s from multiprocessing import Process import time def say(name):print('%s say hello.' % name)time.sleep(2)print('%s say bye.' % name)if __name__ == '__main__':p = Process(target=say,args=('standby',))p.daemon = True #把p設置成守護進程p.start()p.join(2)print('這是主線程')---結果--- standby say hello. 這是主線程===================> # 示例8:把主線程阻塞的時間改大一點,改為2.1s from multiprocessing import Process import time def say(name):print('%s say hello.' % name)time.sleep(2)print('%s say bye.' % name)if __name__ == '__main__':p = Process(target=say,args=('standby',))p.daemon = True #把p設置成守護進程p.start()p.join(2.1)print('這是主線程')---結果--- standby say hello. standby say bye. 這是主線程- 2.terminate() 與 is_alive()
# 示例1:查看terminate之后的狀態 from multiprocessing import Process def say(name):print('%s say hello.' % name)print('%s say bye.' % name)if __name__ == '__main__':p = Process(target=say,args=('standby',))print(p.is_alive())p.start()p.terminate()print(p.is_alive())print('這是主線程')print(p.is_alive())---結果--- False True 這是主線程 True========================> # 示例2:在主線程中sleep一下 from multiprocessing import Process import time def say(name):print('%s say hello.' % name)print('%s say bye.' % name)if __name__ == '__main__':p = Process(target=say,args=('standby',))print(p.is_alive())p.start()p.terminate() # 關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活;print(p.is_alive())print('這是主線程')time.sleep(0.01)print(p.is_alive())---結果--- False True 這是主線程 False- 3.name 和 pid
from multiprocessing import Process import os def say():print('Say hello,子進程id:%s' % os.getpid())if __name__ == '__main__':p = Process(target=say)p.start()print('子進程的名字是:%s,子進程id:%s' % (p.name,p.pid))print('這是主線程,主線程id:%s' % os.getpid())---結果--- 子進程的名字是:Process-1,子進程id:1612 這是主線程,主線程id:6004 Say hello,子進程id:1612?
四、線程
- 為什么要有線程?
有了進程為什么還要線程?進程有很多優點,它提供了多道編程, 讓我們感覺我們每個人都擁有自己的CPU和其他資源,可以提高計算機的利用率。 很多人就不理解了,既然進程這么優秀,為什么還要線程呢? 其實,仔細觀察就會發現進程還是有很多缺陷的,主要體現在兩點上:1.進程只能在一個時間干一件事,如果想同時干兩件事或多件事,進程就無能為力了。2.進程在執行的過程中如果阻塞,例如等待輸入,整個進程就會掛起; 即使進程中有些工作不依賴于輸入的數據,也將無法執行。- 線程示例:
進程與進程之間的資源是隔離的; 一個進程里的多個線程共享進程的資源;例子:編譯一個文檔,有三個功能,接收用戶輸入+格式化+定期保存1.用三個進程實現;但進程間數據是隔離的,這樣就需要維護三份資源數據;2.用1個進程掛三個線程實現,三個線程共享一份資源;- 線程是什么?
-?進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是cpu上的執行單位;
-?線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位;
-?一條線程指的是進程中一個單一順序的控制流,一個進程中可以并發多個線程,每條線程并行執行不同的任務;
-?進程之間是競爭關系,線程之間是協作關系;
A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).- 線程和進程的區別
# 1 Threads share the address space of the process that created it; Processes have their own address space.# 2 Threads have direct access to the data segment of its process; Processes have their own copy of the data segment of the parent process.# 3 Threads can directly communicate with other threads of its process; Processes must use interprocess communication to communicate with sibling processes.# 4 New threads are easily created; New processes require duplication of the parent process.# 5 Threads can exercise considerable control over threads of the same process; Processes can only exercise control over child processes.# 6 Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; Changes to the parent process does not affect child processes.- 創建進程的開銷要遠大于創建線程的開銷;
# 創建 500 個線程 import time from threading import Thread def work():a = 99999b = 101001010010101010str1 = 'axaxxchaxchnahxalx'str2 = 'axaxxcedw2312haxchnahxalx'str3 = '121212axaxxchaxchnahxalx'dic = {'k1':'v1','k2':'v2'}if __name__ == '__main__':start_time = time.time()t_l = []for i in range(500):t=Thread(target=work)t_l.append(t)t.start()for t in t_l:t.join()stop_time = time.time()print('Run time is %s' % (stop_time-start_time)) # Run time is 0.05900001525878906# ++++++++++++++++++++++++++++++++++# 創建 500 個進程 import time from multiprocessing import Process def work():a = 99999b = 101001010010101010str1 = 'axaxxchaxchnahxalx'str2 = 'axaxxcedw2312haxchnahxalx'str3 = '121212axaxxchaxchnahxalx'dic = {'k1':'v1','k2':'v2'}if __name__ == '__main__':start_time = time.time()p_l = []for i in range(500):p=Process(target=work)p_l.append(p)p.start()for p in p_l:p.join()stop_time = time.time()print('Run time is %s' % (stop_time-start_time)) # Run time is 19.552000045776367關于線程和協程更多參見: ...
五、多線程和多進程
- 多進程
- 模擬創建多個子進程,未阻塞的情況
from multiprocessing import Process import time,randomdef func(name):print('%s is running...' % name)time.sleep(random.randint(1,3))print('%s run end.' % name)if __name__ == '__main__':# p1 = Process(target=func,args=('standby',))p1 = Process(target=func,args=('進程1',),name='sub-P1')p2 = Process(target=func,args=('進程2',),name='sub-P2')p3 = Process(target=func,args=('進程3',),name='sub-P3')p4 = Process(target=func,args=('進程4',),name='sub-P4')sub_p_lits = [p1,p2,p3,p4]for p in sub_p_lits:p.start()print('Parent running')time.sleep(1)print('Parent run end')--- Parent running 進程2 is running... 進程4 is running... 進程3 is running... 進程1 is running... Parent run end 進程3 run end. 進程2 run end. 進程4 run end. 進程1 run end.- 模擬創建多個子進程,join阻塞的情況:
from multiprocessing import Process import time,randomdef func(name):print('%s is running...' % name)time.sleep(random.randint(1,3))print('%s run end.' % name)if __name__ == '__main__':# p1 = Process(target=func,args=('standby',))p1 = Process(target=func,args=('進程1',),name='sub-P1')p2 = Process(target=func,args=('進程2',),name='sub-P2')p3 = Process(target=func,args=('進程3',),name='sub-P3')p4 = Process(target=func,args=('進程4',),name='sub-P4')sub_p_lits = [p1,p2,p3,p4]for p in sub_p_lits:p.start()for p in sub_p_lits:p.join()print('Parent running')time.sleep(1)print('Parent run end')--- 進程2 is running... 進程1 is running... 進程3 is running... 進程4 is running... 進程2 run end. 進程3 run end. 進程4 run end. 進程1 run end. Parent running Parent run end-?socket + Process 實現并發處理多個客戶端連接
#!/usr/bin/python # -*- coding:utf-8 -*- # Server端from multiprocessing import Process from socket import * server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5)def talk(conn,addr):while True: #通訊循環try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:break if __name__ == '__main__':while True: #鏈接循環conn,addr=server.accept()print(addr)p=Process(target=talk,args=(conn,addr))p.start() #!/usr/bin/python # -*- coding:utf-8 -*- # Client端from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8')) # 多進程實現socket存在的問題: 每來一個客戶端,都在服務端開啟一個進程; 如果并發來一個萬個客戶端,要開啟一萬個進程嗎? 你自己嘗試著在你自己的機器上開啟一萬個,10萬個進程試一試。(導致機器死機)解決方法:進程池
五、進程池
- 進程池有啥用?
Pool可以提供指定數量的進程,供用戶調用; 當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求; 但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,就重用進程池中的進程。注意:是重用進程,而不是新建,Pool維護指定個數的進程,來循環執行很多的任務,進程的id都是不變的!- Pool
Pool([numprocess [,initializer [, initargs]]]):創建進程池numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值;- 常用方法
參考:http://www.cnblogs.com/congbo/archive/2012/08/23/2652490.html
p.apply(func [, args [, kwargs]]) 進程池中的工作進程執行func(*args,**kwargs),然后返回結果; 是同步的;p.apply_async(func [, args [, kwargs]]) 與apply用法一致,但它是非阻塞的且支持結果返回后進行回調; 是異步的; 主進程循環運行過程中不等待apply_async的返回結果; 在主進程結束后,即使子進程還未返回整個程序也會退出。 雖然 apply_async是非阻塞的,但其返回結果的get方法卻是阻塞的:如使用result.get()會阻塞主進程。p.close() 關閉進程池,即不再像進程池中提交任務。如果所有操作持續掛起,它們將在工作進程終止前完成;P.join() 主進程阻塞等待子進程的退出, join方法要在close或terminate之后使用;p.terminate() 結束工作進程,不再處理未處理的任務;- apply(),同步
1.apply本質上就是apply_async().get()2.apply_async().get()會返回結果,但同時也會阻塞,導致變為串行;- apply()示例
from multiprocessing import Pool import time def Foo(i):time.sleep(1)return i + 100 if __name__ == '__main__':start_time = time.time()pool = Pool(5)res_l = []for i in range(5):res = pool.apply(func=Foo, args=(i,))res_l.append(res)pool.close()for res in res_l:print(res)print('---> end')stop_time = time.time()print('Run time is: %s' % (stop_time-start_time))---結果--- 100 101 102 103 104 ---> end Run time is: 5.174000024795532- apply_async(),異步非阻塞
如果使用異步提交的任務,主進程需要使用join,等待進程池內任務都處理完,然后可以用get收集結果,
?否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了;
調用join之前,一定要先調用close() 函數,否則會出錯; close()執行后不會有新的進程加入到pool,join函數等待素有子進程結束;
-?apply_async() 示例1:沒有后面的join() 和 get(),則程序整體結束,進程池中的任務還沒來得及全部執行完也都跟著主進程一起結束了;
# 在主線程內沒有使用join()的情況 from multiprocessing import Pool import time def Foo(i):time.sleep(1)return i + 100 def Bar(arg):print('--->exec done:', arg) if __name__ == '__main__':start_time = time.time()pool = Pool(5)for i in range(5):pool.apply_async(func=Foo, args=(i,), callback=Bar)print('end')pool.close()stop_time = time.time()print('Run time is: %s' % (stop_time-start_time))---結果--- end Run time is: 0.08100008964538574- apply_async() 示例2:主線程里沒寫join(),但是使用get()獲取進程池中進程執行的結果
from multiprocessing import Pool import time def Foo(i):time.sleep(1)return i + 100 def Bar(arg):print('--->exec done:', arg) if __name__ == '__main__':start_time = time.time()pool = Pool(5)res_l = []for i in range(5):res = pool.apply_async(func=Foo, args=(i,))res_l.append(res)pool.close() # 關閉進程池,不再向進程池中提交任務;for res in res_l:print(res.get()) # 使用get()獲取進程池中進程的執行結果print('end')stop_time = time.time()print('Run time is: %s' % (stop_time-start_time))---結果--- 100 101 102 103 104 end Run time is: 1.2239999771118164- apply_async() 示例3:使用join()但不使用get()
# 在主線程內使用 join() 的情況 from multiprocessing import Pool import time def Foo(i):time.sleep(1)return i + 100 def Bar(arg):print('--->exec done:', arg) if __name__ == '__main__':start_time = time.time()pool = Pool(5)for i in range(5):pool.apply_async(func=Foo, args=(i,), callback=Bar)pool.close() # 關閉進程池,不再向進程池中提交任務;pool.join() # 進程池中進程執行完畢后再關閉,如果注釋那么程序直接關閉;print('end')stop_time = time.time()print('Run time is: %s' % (stop_time-start_time))---結果--- --->exec done: 100 --->exec done: 101 --->exec done: 102 --->exec done: 103 --->exec done: 104 end Run time is: 1.3329999446868896- 進程池Pool改寫socket并發通信,避免使用多進程的缺陷問題
# 進程池 Pool實現socket服務端from multiprocessing import Pool from socket import * import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8090)) server.listen(5)def talk(conn,addr):print('子進程id:%s' % os.getpid())while True: #通訊循環try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:break if __name__ == '__main__':print("cpu_count: %s" % os.cpu_count())pool=Pool()while True: #鏈接循環conn,addr=server.accept()print(addr)# pool.apply(talk,args=(conn,addr))pool.apply_async(talk,args=(conn,addr)) # socket客戶端from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8090))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8')) cpu_count: 4 <--- Pool()默認取這個值 ('127.0.0.1', 4944) 子進程id:2872 <---進程池中第一個進程的id ('127.0.0.1', 4945) 子進程id:1076 <---進程池中第二個進程的id ('127.0.0.1', 4948) 子進程id:5544 <---進程池中第三個進程的id ('127.0.0.1', 4951) 子進程id:5500 <---進程池中第四個進程的id ('127.0.0.1', 4952) ('127.0.0.1', 4953) 子進程id:2872 <=== 后面新來的連接復用原來的進程連接 子進程id:1076 <=== 如果進程池中4個進程都在用,則后面新來的連接將處在阻塞的狀態,一旦有進程釋放,新連接就會復用被釋放的進程id ('127.0.0.1', 4975) 子進程id:5544 ('127.0.0.1', 4982)- 回調函數應用
回調函數 是主進程在處理;誰有返回值就通知主進程,然后主進程去執行回調函數里的操作; - 不需要回調函數的場景:如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數;此種情況可使用get()獲取執行結果;- 需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數; 我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行), 這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。- 應用示例:爬蟲
from multiprocessing import Pool import requests import re import jsondef get_page(url,pattern): # 判斷并下載網頁,并返回給回調函數response=requests.get(url)if response.status_code == 200:return (response.text,pattern)else:print('response.status_code not 200.') def parse_page(info): # 作為回調函數,按照寫好的正則去解析網頁內容page_content,pattern=infores=re.findall(pattern,page_content)for item in res:dic={'index':item[0],'title':item[1],'actor':item[2].strip()[3:],'time':item[3][5:],'score':item[4]+item[5]}with open('maoyan.txt','a',encoding='utf-8') as f: # 按照定義好的字典,寫入到文件中f.write('%s\n' % json.dumps(dic)) if __name__ == '__main__':pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)url_dic={'http://maoyan.com/board':pattern1,}p=Pool()for url,pattern in url_dic.items():p.apply_async(get_page,args=(url,pattern),callback=parse_page)p.close()p.join() # 等待進程池的進程任務都執行完畢with open('maoyan.txt', mode='r', encoding='utf-8') as rf:item_list = rf.readlines()for item in item_list:res = json.loads(item)print(res) # 讀取文件內容,查看爬取的內容---maoyan.txt內容--- {"actor": "\u738b\u9719,\u4fdd\u5251\u950b,\u8463\u52c7", "index": "1", "score": "9.3", "time": "2017-06-30", "title": "\u8840\u6218\u6e58\u6c5f"} {"actor": "\u674e\u5fae\u6f2a,\u4ea6\u98ce", "index": "2", "score": "9.3", "time": "2017-06-16", "title": "\u91cd\u8fd4\u00b7\u72fc\u7fa4"} {"actor": "\u9ad8\u5f3a,\u4e8e\u6708\u4ed9,\u674e\u7389\u5cf0", "index": "3", "score": "9.2", "time": "2017-06-09", "title": "\u5fe0\u7231\u65e0\u8a00"} {"actor": "\u6768\u57f9,\u5c3c\u739b\u624e\u5806,\u65af\u6717\u5353\u560e", "index": "4", "score": "8.9", "time": "2017-06-20", "title": "\u5188\u4ec1\u6ce2\u9f50"} {"actor": "\u6234\u592b\u00b7\u5e15\u7279\u5c14,\u9c81\u59ae\u00b7\u739b\u62c9,\u5927\u536b\u00b7\u6587\u7ff0", "index": "5", "score": "8.8", "time": "2017-06-22", "title": "\u96c4\u72ee"} {"actor": "\u76d6\u5c14\u00b7\u52a0\u6735,\u514b\u91cc\u65af\u00b7\u6d3e\u6069,\u7f57\u5bbe\u00b7\u6000\u7279", "index": "6", "score": "8.6", "time": "2017-06-02", "title": "\u795e\u5947\u5973\u4fa0"} {"actor": "\u8521\u5353\u598d,\u5468\u67cf\u8c6a,\u949f\u6b23\u6f7c", "index": "7", "score": "8.5", "time": "2017-06-23", "title": "\u539f\u8c05\u4ed677\u6b21"} {"actor": "\u738b\u6653\u5f64,\u674e\u6654,\u6d2a\u6d77\u5929", "index": "8", "score": "8.1", "time": "2017-05-27", "title": "\u4e09\u53ea\u5c0f\u732a2"} {"actor": "\u590f\u96e8,\u95eb\u59ae,\u6f58\u658c\u9f99", "index": "9", "score": "8.0", "time": "2017-06-29", "title": "\u53cd\u8f6c\u4eba\u751f"} {"actor": "\u6768\u5e42,\u970d\u5efa\u534e,\u91d1\u58eb\u6770", "index": "10", "score": "7.9", "time": "2017-06-29", "title": "\u9006\u65f6\u8425\u6551"}---打印結果--- {'actor': '王霙,保劍鋒,董勇', 'index': '1', 'score': '9.3', 'time': '2017-06-30', 'title': '血戰湘江'} {'actor': '李微漪,亦風', 'index': '2', 'score': '9.3', 'time': '2017-06-16', 'title': '重返·狼群'} {'actor': '高強,于月仙,李玉峰', 'index': '3', 'score': '9.2', 'time': '2017-06-09', 'title': '忠愛無言'} {'actor': '楊培,尼瑪扎堆,斯朗卓嘎', 'index': '4', 'score': '8.9', 'time': '2017-06-20', 'title': '岡仁波齊'} {'actor': '戴夫·帕特爾,魯妮·瑪拉,大衛·文翰', 'index': '5', 'score': '8.8', 'time': '2017-06-22', 'title': '雄獅'} {'actor': '蓋爾·加朵,克里斯·派恩,羅賓·懷特', 'index': '6', 'score': '8.6', 'time': '2017-06-02', 'title': '神奇女俠'} {'actor': '蔡卓妍,周柏豪,鐘欣潼', 'index': '7', 'score': '8.5', 'time': '2017-06-23', 'title': '原諒他77次'} {'actor': '王曉彤,李曄,洪海天', 'index': '8', 'score': '8.1', 'time': '2017-05-27', 'title': '三只小豬2'} {'actor': '夏雨,閆妮,潘斌龍', 'index': '9', 'score': '8.0', 'time': '2017-06-29', 'title': '反轉人生'} {'actor': '楊冪,霍建華,金士杰', 'index': '10', 'score': '7.9', 'time': '2017-06-29', 'title': '逆時營救'}?
六、進程同步(鎖)
進程之間數據不共享,但是共享同一套文件系統; 所以訪問同一個文件,或同一個打印終端,是沒有問題的;1.共享同一打印終端
多個進程同時執行打印操作: 發現會有多行內容打印到一行的現象(多個進程共享并搶占同一個打印終端,亂了)- 示例:
# 共享同一個打印終端 import os,time from multiprocessing import Process class Logger(Process):def __init__(self):super().__init__()# super(Logger,self).__init__()def run(self):# time.sleep(1)print(self.name,'pid: %s' % os.getpid())if __name__ == '__main__':for i in range(100): # i5, 4核,起1W個進程就會死機,慎跑...l=Logger()l.start()2.共享同一個文件
共享同一個文件; 有的同學會想到,既然可以用文件共享數據,那么進程間通信用文件作為數據傳輸介質就可以了啊;可以,但是有問題:1.效率 2.需要自己加鎖處理- 示例:
#多進程共享一套文件系統 from multiprocessing import Process def write_to_file(file,mode,num):with open(file,mode=mode,encoding='utf-8') as wf:wf.write(num) if __name__ == '__main__':for i in range(50):p = Process(target=write_to_file,args=('a.txt','a',str(i)))p.start()3.鎖(互斥鎖)
加鎖的目的是為了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改;沒錯,速度是慢了,犧牲了速度而保證了數據安全;進程之間數據隔離,但是共享一套文件系統, 因而可以通過文件來實現進程直接的通信,但問題是必須自己加鎖處理- 模擬搶票程序
#文件db的內容為:{"count":1}#注意一定要用雙引號,不然json無法識別;- 沒有加鎖的搶票程序:由于沒有加鎖,所以存在多個進程同時去讀寫文件的情況,導致一張票被多個人搶到;
from multiprocessing import Process,Lock import json import time import random def work(dbfile,name,lock):with open(dbfile,encoding='utf-8') as f:dic=json.loads(f.read())if dic['count'] > 0:dic['count'] -= 1time.sleep(random.randint(1,3)) # 模擬網絡延遲with open(dbfile,'w',encoding='utf-8') as f:f.write(json.dumps(dic))print('\033[43m%s 搶票成功\033[0m' %name)else:print('\033[45m%s 搶票失敗\033[0m' %name)if __name__ == '__main__':lock=Lock() # 進程間不共享數據,所以需要把lock當做參數傳進入p_l=[]for i in range(100):p=Process(target=work,args=('a.txt','用戶%s' %i,lock))p_l.append(p)p.start()for p in p_l:p.join()print('主進程')---結果--- 用戶6 搶票成功 用戶19 搶票失敗 用戶0 搶票失敗 用戶3 搶票成功 用戶45 搶票成功 用戶38 搶票失敗 用戶10 搶票成功 用戶51 搶票失敗 用戶5 搶票失敗 用戶58 搶票失敗 用戶8 搶票失敗 用戶54 搶票失敗 用戶87 搶票失敗 用戶71 搶票失敗 用戶94 搶票失敗 用戶66 搶票失敗 用戶93 搶票失敗 用戶1 搶票成功 用戶61 搶票失敗 用戶31 搶票失敗 用戶70 搶票失敗 用戶13 搶票失敗 用戶77 搶票失敗 用戶92 搶票失敗 用戶7 搶票失敗 用戶34 搶票失敗 用戶44 搶票失敗 用戶23 搶票失敗 用戶29 搶票失敗 用戶33 搶票失敗 用戶41 搶票失敗 用戶82 搶票失敗 用戶86 搶票失敗 用戶39 搶票失敗 用戶43 搶票失敗 用戶90 搶票失敗 用戶17 搶票失敗 用戶28 搶票失敗 用戶14 搶票失敗 用戶67 搶票失敗 用戶48 搶票失敗 用戶37 搶票失敗 用戶24 搶票失敗 用戶63 搶票失敗 用戶46 搶票失敗 用戶25 搶票失敗 用戶74 搶票失敗 用戶47 搶票失敗 用戶80 搶票失敗 用戶57 搶票失敗 用戶11 搶票失敗 用戶30 搶票失敗 用戶96 搶票失敗 用戶73 搶票失敗 用戶91 搶票失敗 用戶22 搶票失敗 用戶20 搶票失敗 用戶89 搶票失敗 用戶83 搶票失敗 用戶98 搶票失敗 用戶53 搶票失敗 用戶88 搶票失敗 用戶79 搶票失敗 用戶78 搶票失敗 用戶49 搶票失敗 用戶64 搶票失敗 用戶95 搶票失敗 用戶18 搶票失敗 用戶97 搶票失敗 用戶59 搶票失敗 用戶72 搶票失敗 用戶42 搶票失敗 用戶21 搶票失敗 用戶32 搶票失敗 用戶4 搶票失敗 用戶27 搶票失敗 用戶65 搶票失敗 用戶62 搶票失敗 用戶99 搶票失敗 用戶55 搶票失敗 用戶81 搶票失敗 用戶15 搶票失敗 用戶40 搶票失敗 用戶69 搶票失敗 用戶85 搶票失敗 用戶16 搶票失敗 用戶50 搶票失敗 用戶26 搶票失敗 用戶60 搶票失敗 用戶75 搶票失敗 用戶35 搶票失敗 用戶68 搶票失敗 用戶36 搶票失敗 用戶52 搶票失敗 用戶84 搶票失敗 用戶76 搶票失敗 用戶12 搶票成功 用戶2 搶票成功 用戶9 搶票成功 用戶56 搶票成功 主進程- 加鎖的情況:犧牲執行速度,保護了數據安全性,只有一個用戶能搶到票;
from multiprocessing import Process,Lock import json import time import random def work(dbfile,name,lock):# lock.acquire()with lock:with open(dbfile,encoding='utf-8') as f:dic=json.loads(f.read())if dic['count'] > 0:dic['count']-=1time.sleep(random.randint(1,3)) #模擬網絡延遲with open(dbfile,'w',encoding='utf-8') as f:f.write(json.dumps(dic))print('\033[43m%s 搶票成功\033[0m' %name)else:print('\033[45m%s 搶票失敗\033[0m' %name)# lock.release() if __name__ == '__main__':start_time = time.time()lock=Lock() # 進程間不共享數據,所以需要把lock當做參數傳進入p_l=[]for i in range(100):p=Process(target=work,args=('a.txt','用戶%s' % i,lock))p_l.append(p)p.start()for p in p_l:p.join()stop_time = time.time()print('Run time: %s' % (stop_time-start_time))---結果--- 用戶19 搶票成功 用戶1 搶票失敗 用戶0 搶票失敗 用戶3 搶票失敗 用戶7 搶票失敗 用戶13 搶票失敗 用戶5 搶票失敗 用戶55 搶票失敗 用戶51 搶票失敗 用戶43 搶票失敗 用戶39 搶票失敗 用戶59 搶票失敗 用戶63 搶票失敗 用戶62 搶票失敗 用戶2 搶票失敗 用戶50 搶票失敗 用戶47 搶票失敗 用戶23 搶票失敗 用戶14 搶票失敗 用戶9 搶票失敗 用戶18 搶票失敗 用戶75 搶票失敗 用戶21 搶票失敗 用戶27 搶票失敗 用戶54 搶票失敗 用戶11 搶票失敗 用戶61 搶票失敗 用戶15 搶票失敗 用戶31 搶票失敗 用戶38 搶票失敗 用戶25 搶票失敗 用戶35 搶票失敗 用戶6 搶票失敗 用戶30 搶票失敗 用戶34 搶票失敗 用戶42 搶票失敗 用戶36 搶票失敗 用戶67 搶票失敗 用戶26 搶票失敗 用戶46 搶票失敗 用戶17 搶票失敗 用戶49 搶票失敗 用戶71 搶票失敗 用戶22 搶票失敗 用戶45 搶票失敗 用戶10 搶票失敗 用戶53 搶票失敗 用戶65 搶票失敗 用戶29 搶票失敗 用戶69 搶票失敗 用戶73 搶票失敗 用戶33 搶票失敗 用戶52 搶票失敗 用戶58 搶票失敗 用戶37 搶票失敗 用戶41 搶票失敗 用戶24 搶票失敗 用戶40 搶票失敗 用戶48 搶票失敗 用戶81 搶票失敗 用戶57 搶票失敗 用戶32 搶票失敗 用戶83 搶票失敗 用戶79 搶票失敗 用戶77 搶票失敗 用戶20 搶票失敗 用戶95 搶票失敗 用戶89 搶票失敗 用戶93 搶票失敗 用戶91 搶票失敗 用戶44 搶票失敗 用戶99 搶票失敗 用戶97 搶票失敗 用戶90 搶票失敗 用戶87 搶票失敗 用戶85 搶票失敗 用戶74 搶票失敗 用戶66 搶票失敗 用戶16 搶票失敗 用戶86 搶票失敗 用戶82 搶票失敗 用戶98 搶票失敗 用戶70 搶票失敗 用戶78 搶票失敗 用戶72 搶票失敗 用戶84 搶票失敗 用戶28 搶票失敗 用戶94 搶票失敗 用戶4 搶票失敗 用戶60 搶票失敗 用戶8 搶票失敗 用戶96 搶票失敗 用戶80 搶票失敗 用戶92 搶票失敗 用戶76 搶票失敗 用戶88 搶票失敗 用戶12 搶票失敗 用戶64 搶票失敗 用戶56 搶票失敗 用戶68 搶票失敗 Run time: 4.325000047683716?
七、進程間通信
1.操作同一個文件
- 需對文件加鎖:保護共享數據;
- 見上面搶票的例子;
2.IPC,InterProcess Communication
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的;- 1.隊列(推薦使用),隊列Queue是管道+鎖實現的;先進先出;
# Queue類 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞;# 屬性 maxsize是隊列中允許最大項數,省略則無大小限制;# 常用方法 q.put() 用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。 如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。 如果超時,會拋出Queue.Full異常。 如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。q.get() 可以從隊列讀取并且刪除一個元素。get方法也有兩個可選參數:blocked和timeout。 如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。 如果blocked為False,有兩種情況存在:如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.q.get_nowait(): 同 q.get(False)q.put_nowait(): 同 q.put(False)q.empty() 調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目;q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走;q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣; def put(self, obj, block=True, timeout=None): # 隊列滿的時候,q.put()會阻塞,直到隊列不滿passdef get(self, block=True, timeout=None): # 隊列空的時候,q,get()會阻塞,直到隊列有新元素pass- 示例1
from multiprocessing import Process,Queueq = Queue(3) q.put(1) q.put('liu') q.put(('a','b','c',)) print(q.full()) print(q.qsize()) print(q.get()) print(q.get()) print(q.get()) print(q.empty())--- True 3 1 liu ('a', 'b', 'c') True- 示例2:簡單的生產者消費者模型
from multiprocessing import Queue, Process import random, os, timedef getter(name, queue):print('Son process name: %s, pid: %s, ppid: %s' % (name, os.getpid(), os.getppid()))while True:try:value = queue.get(block=True, timeout=3)# block為True,就是如果隊列中無數據了。# |—————— 若timeout默認是None,那么會一直等待下去。# |—————— 若timeout設置了時間,那么會等待timeout秒后才會拋出Queue.Empty異常# block 為False,如果隊列中無數據,就拋出Queue.Empty異常print("Process getter get: %f" % value)except Exception as e:print('getter捕捉到異常:%s' % e)breakdef putter(name, queue):print('Son process name: %s, pid: %s, ppid: %s' % (name, os.getpid(), os.getppid()))for i in range(0, 10):time.sleep(1)value = random.randint(1,10)queue.put(value)# 放入數據 put(obj[, block[, timeout]])# 若block為True,如隊列是滿的:# |—————— 若timeout是默認None,那么就會一直等下去# |—————— 若timeout設置了等待時間,那么會等待timeout秒后,如果還是滿的,那么就拋出Queue.Full.# 若block是False,如果隊列滿了,直接拋出Queue.Fullprint("Process putter put: %f" % value)if __name__ == '__main__':queue = Queue()getter_process = Process(target=getter, args=("Getter", queue))putter_process = Process(target=putter, args=("Putter", queue))getter_process.start()putter_process.start()getter_process.join()putter_process.join()print('Main process, pid: %s' % os.getpid())---結果--- Son process name: Getter, pid: 3088, ppid: 5656 Son process name: Putter, pid: 2712, ppid: 5656 Process putter put: 8.000000 Process getter get: 8.000000 Process putter put: 6.000000 Process getter get: 6.000000 Process putter put: 6.000000 Process getter get: 6.000000 Process putter put: 8.000000 Process getter get: 8.000000 Process putter put: 3.000000 Process getter get: 3.000000 Process putter put: 6.000000 Process getter get: 6.000000 Process putter put: 1.000000 Process getter get: 1.000000 Process putter put: 7.000000 Process getter get: 7.000000 Process putter put: 8.000000 Process getter get: 8.000000 Process putter put: 6.000000 Process getter get: 6.000000 getter捕捉到異常: Main process, pid: 5656- 2.管道(不推薦)
tail -f access.log |grep '404' # Pipe函數 def Pipe(duplex=True):return Connection(), Connection()Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe. 在進程之間創建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象;強調一點:必須在產生Process對象之前產生管道;# 參數 dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發送。新建一個Pipe(duplex)的時候,如果duplex為True,那么創建的管道是雙向的;如果duplex為False,那么創建的管道是單向的。# 常用方法 conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError;conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象
- 管道Pipe建立的過程(Pipe的讀寫效率要高于Queue。)
進程間的Pipe基于fork機制建立。當主進程創建Pipe的時候,Pipe的兩個Connections連接的的都是主進程。當主進程創建子進程后,Connections也被拷貝了一份。此時有了4個Connections。此后,關閉主進程的一個Out Connection,關閉一個子進程的一個In Connection。那么就建立好了一個輸入在主進程,輸出在子進程的管道。- 示例1:基于管道實現的進程間通信
from multiprocessing import Pipe, Processdef son_process(pipe):_out_pipe, _in_pipe = pipe# 關閉fork過來的輸入端_in_pipe.close()while True:try:msg = _out_pipe.recv()print('子進程通過管道獲得消息:%s' % msg)except Exception as e:print('子進程中捕獲到異常:%s' % e)# 當out_pipe接受不到輸出的時候且輸入被關閉的時候,會拋出EORFError,可以捕獲并且退出子進程breakif __name__ == '__main__':out_pipe, in_pipe = Pipe(True)son_p = Process(target=son_process, args=((out_pipe, in_pipe),))son_p.start()# 等pipe被fork 后,關閉主進程的輸出端# 這樣,創建的Pipe一端連接著主進程的輸入,一端連接著子進程的輸出口out_pipe.close()for x in range(20):in_pipe.send(x)in_pipe.close()son_p.join()print("主進程也結束了")---結果--- 子進程通過管道獲得消息:0 子進程通過管道獲得消息:1 子進程通過管道獲得消息:2 子進程通過管道獲得消息:3 子進程通過管道獲得消息:4 子進程通過管道獲得消息:5 子進程通過管道獲得消息:6 子進程通過管道獲得消息:7 子進程通過管道獲得消息:8 子進程通過管道獲得消息:9 子進程通過管道獲得消息:10 子進程通過管道獲得消息:11 子進程通過管道獲得消息:12 子進程通過管道獲得消息:13 子進程通過管道獲得消息:14 子進程通過管道獲得消息:15 子進程通過管道獲得消息:16 子進程通過管道獲得消息:17 子進程通過管道獲得消息:18 子進程通過管道獲得消息:19 子進程中捕獲到異常: 主進程也結束了- 示例2:基于管道實現的進程間通信
from multiprocessing import Process,Pipe import time def consumer(p,name):left,right=pleft.close()while True:try:baozi=right.recv()print('%s 收到包子:%s' %(name,baozi))except EOFError:right.close()break def producer(seq,p):left,right=pright.close()for i in seq:left.send(i)time.sleep(1)else:left.close() if __name__ == '__main__':left,right=Pipe()c1=Process(target=consumer,args=((left,right),'c1'))c1.start()seq=(i for i in range(10))producer(seq,(left,right))right.close()left.close()c1.join()print('主進程')---結果--- c1 收到包子:0 c1 收到包子:1 c1 收到包子:2 c1 收到包子:3 c1 收到包子:4 c1 收到包子:5 c1 收到包子:6 c1 收到包子:7 c1 收到包子:8 c1 收到包子:9 主進程- 示例3:基于管道的雙向通信
from multiprocessing import Process,Pipedef adder(p,name):server,client=pclient.close()while True:try:x,y=server.recv()print('%s 通過管道收到了:%s 和 %s' % (name,x,y))except EOFError:server.close()breakres = x + yserver.send(res)print('%s 回應消息:%s' % (name, res)) if __name__ == '__main__':server,client=Pipe()s1=Process(target=adder,args=((server,client),'s1'))s1.start()server.close()client.send((10,20))print(client.recv())client.close()s1.join()print('主進程')---結果--- s1 通過管道收到了:10 和 20 30 s1 回應消息:30 主進程- 3.擴展:生產者消費者模型
- 1.主進程作為生產者,一個子進程作為消費者
# 生產者消費者模型 from multiprocessing import Process,Queue import time import randomdef consumer(q,name):while True:time.sleep(random.randint(1,3))try:res=q.get(block=True,timeout=5)print('\033[41m消費者%s拿到了%s\033[0m' %(name,res))except Exception as e:print('消費者捕獲異常:%s' % e)break def producer(seq,q,name):for item in seq:time.sleep(random.randint(1,3))q.put(item)print('\033[45m生產者%s生產了%s\033[0m' %(name,item))if __name__ == '__main__':q=Queue()c=Process(target=consumer,args=(q,'standby'),)c.start()seq=['包子%s' % i for i in range(10)] # 列表生成式producer(seq,q,'廚師')print('生產者已經生產完畢')---結果--- 生產者廚師生產了包子0 生產者廚師生產了包子1 消費者standby拿到了包子0 生產者廚師生產了包子2 消費者standby拿到了包子1 消費者standby拿到了包子2 生產者廚師生產了包子3 生產者廚師生產了包子4 消費者standby拿到了包子3 生產者廚師生產了包子5 消費者standby拿到了包子4 生產者廚師生產了包子6 消費者standby拿到了包子5 生產者廚師生產了包子7 消費者standby拿到了包子6 消費者standby拿到了包子7 生產者廚師生產了包子8 消費者standby拿到了包子8 生產者廚師生產了包子9 生產者已經生產完畢 消費者standby拿到了包子9 消費者捕獲異常:- 2.開兩個子進程,一個作為生產者,另一個作為消費者
from multiprocessing import Process,Queue import time import randomdef consumer(q,name):while True:time.sleep(random.randint(1,3))res=q.get()if res is None:breakprint('\033[41m消費者%s拿到了%s\033[0m' %(name,res)) def producer(seq,q,name):for item in seq:time.sleep(random.randint(1,3))q.put(item)print('\033[45m生產者%s生產了%s\033[0m' %(name,item))q.put(None)if __name__ == '__main__':q=Queue()c=Process(target=consumer,args=(q,'standby'),)c.start()seq=['包子%s' %i for i in range(10)]p=Process(target=producer,args=(seq,q,'廚師'))p.start()c.join()print('主進程')---結果--- 生產者廚師生產了包子0 消費者standby拿到了包子0 生產者廚師生產了包子1 消費者standby拿到了包子1 生產者廚師生產了包子2 生產者廚師生產了包子3 消費者standby拿到了包子2 生產者廚師生產了包子4 消費者standby拿到了包子3 生產者廚師生產了包子5 消費者standby拿到了包子4 生產者廚師生產了包子6 消費者standby拿到了包子5 生產者廚師生產了包子7 消費者standby拿到了包子6 生產者廚師生產了包子8 消費者standby拿到了包子7 生產者廚師生產了包子9 消費者standby拿到了包子8 消費者standby拿到了包子9 主進程- 3.JoinableQueue
創建隊列的另外一個類:JoinableQueue([maxsize]); 這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。 通知進程是使用共享的信號和條件變量來實現的。#參數介紹: maxsize是隊列中允許最大項數,省略則無大小限制;#方法介紹: JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:q.task_done(): 使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常; q.join(): 生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止;- JoinableQueue示例
from multiprocessing import Process,JoinableQueue import time import randomdef consumer(q,name):while True:time.sleep(random.randint(1,3))res=q.get()q.task_done()print('\033[41m消費者%s拿到了%s\033[0m' %(name,res)) def producer(seq,q,name):for item in seq:time.sleep(random.randint(1,3))q.put(item)print('\033[45m生產者%s生產了%s\033[0m' %(name,item))print('生產者生產完畢')q.join()print('消費者都消費完了')if __name__ == '__main__':q=JoinableQueue()c=Process(target=consumer,args=(q,'standby'),)c.daemon=True #設置守護進程,主進程結束c就結束c.start()seq=['包子%s' %i for i in range(10)]p=Process(target=producer,args=(seq,q,'廚師'))p.start()p.join() #主進程等待p結束,p等待c把數據都取完,c一旦取完數據,p.join就是不再阻塞,進# 而主進程結束,主進程結束會回收守護進程c,而且c此時也沒有存在的必要了print('主進程')---結果--- 生產者廚師生產了包子0 消費者standby拿到了包子0 生產者廚師生產了包子1 消費者standby拿到了包子1 生產者廚師生產了包子2 消費者standby拿到了包子2 生產者廚師生產了包子3 消費者standby拿到了包子3 生產者廚師生產了包子4 消費者standby拿到了包子4 生產者廚師生產了包子5 消費者standby拿到了包子5 生產者廚師生產了包子6 消費者standby拿到了包子6 生產者廚師生產了包子7 消費者standby拿到了包子7 生產者廚師生產了包子8 消費者standby拿到了包子8 生產者廚師生產了包子9 生產者生產完畢 消費者standby拿到了包子9 消費者都消費完了 主進程3.Manager共享內存
- 示例1:不加鎖的情況
# Manager實現共享內存,沒加鎖的例子 # 這種情況會出現同時有多個進程在寫同一個內存中所共享的數據,導致最后數據不對; from multiprocessing import Manager,Process def func(dic):dic['count'] -= 1if __name__ == '__main__':m = Manager()dic = m.dict({'count':100})obj_l = []for i in range(100):p = Process(target=func,args=(dic,))p.start()obj_l.append(p)for p in obj_l:p.join()print(dic)---結果--- {'count': 2}- 示例2:第一種加鎖的寫法
# Manager實現共享內存,加鎖實現的例子 # 第一種寫法: from multiprocessing import Manager,Process,Lock def func(dic,lock):lock.acquire()dic['count'] -= 1lock.release()if __name__ == '__main__':lock = Lock()m = Manager()dic = m.dict({'count':100})obj_l = []for i in range(100):p = Process(target=func,args=(dic,lock))p.start()obj_l.append(p)for p in obj_l:p.join()print(dic)---結果--- {'count': 0}- 示例3.第二種加鎖的寫法
from multiprocessing import Manager,Process,Lock def func(dic,lock):with lock:dic['count'] -= 1if __name__ == '__main__':lock = Lock()# m = Manager()with Manager() as m:dic = m.dict({'count':100})obj_l = []for i in range(100):p = Process(target=func,args=(dic,lock))p.start()obj_l.append(p)for p in obj_l:p.join()print(dic)---結果--- {'count': 0}?
八、paramiko模塊
Paramiko is a Python (2.6+, 3.3+) implementation of the SSHv2 protocol [1], providing both client and server functionality. While it leverages a Python C extension for low level cryptography (Cryptography), Paramiko itself is a pure Python interface around SSH networking concepts.-?paramiko是用python語言寫的一個模塊,遵循SSH2協議,支持以加密和認證的方式,進行遠程服務器的連接。
1 __all__ = [ 2 'Transport', 3 'SSHClient', 4 'MissingHostKeyPolicy', 5 'AutoAddPolicy', 6 'RejectPolicy', 7 'WarningPolicy', 8 'SecurityOptions', 9 'SubsystemHandler', 10 'Channel', 11 'PKey', 12 'RSAKey', 13 'DSSKey', 14 'Message', 15 'SSHException', 16 'AuthenticationException', 17 'PasswordRequiredException', 18 'BadAuthenticationType', 19 'ChannelException', 20 'BadHostKeyException', 21 'ProxyCommand', 22 'ProxyCommandFailure', 23 'SFTP', 24 'SFTPFile', 25 'SFTPHandle', 26 'SFTPClient', 27 'SFTPServer', 28 'SFTPError', 29 'SFTPAttributes', 30 'SFTPServerInterface', 31 'ServerInterface', 32 'BufferedFile', 33 'Agent', 34 'AgentKey', 35 'HostKeys', 36 'SSHConfig', 37 'util', 38 'io_sleep', 39 ] Paramiko提供的方法- 實例1:密碼驗證登錄主機,遠程執行命令
import paramiko IP = '10.0.0.9' PORT = 22 USER = 'root' PASSWORD = '123456.' ssh_conn = paramiko.SSHClient() ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try:ssh_conn.connect(IP,PORT,USER,PASSWORD,timeout=3) except Exception as e:print('連接失敗:%s' % e) while True:cmd = input('Input cmd, q/Q to exit.>>>\t').strip()if 'Q' == cmd.upper():print('Bye...')breakstdin,stdout,stderr = ssh_conn.exec_command(cmd)# print(stdin.read())res = stderr.read().decode('utf-8')if not res:res = stdout.read().decode('utf-8')print(res) ssh_conn.close()- 實例2:密碼驗證登錄主機,執行ftp上傳下載操作
# 下載到本地 import paramikot = paramiko.Transport(('10.0.0.9',22)) t.connect(username='root',password='123456.') sftp = paramiko.SFTPClient.from_transport(t) sftp.get(r'/data/pass.txt','1.txt') t.close()# 上傳到遠端服務器 import paramikot = paramiko.Transport(('10.0.0.9',22)) t.connect(username='root',password='123456.') sftp = paramiko.SFTPClient.from_transport(t) sftp.put(r'D:\soft\work\Python_17\day09\paramiko_demo.py','/data/paramiko_demo.py') # sftp.get(r'/data/pass.txt','1.txt') t.close()- 實例3:秘鑰驗證登錄遠程主機,執行命令
參見:Pool多進程示例
?
九、Python GIL
參考:http://www.dabeaz.com/python/UnderstandingGIL.pdf
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)-?GIL并不是Python的特性,Python完全可以不依賴于GIL;
GIL使得同一時刻統一進程中只有一個線程被執行;進程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優勢;
十、擴展
- 1.列表生成式
seq=['數字:%s' %i for i in range(10)] print(type(seq)) print(seq)---結果--- <class 'list'> ['數字:0', '數字:1', '數字:2', '數字:3', '數字:4', '數字:5', '數字:6', '數字:7', '數字:8', '數字:9']- 2.os.cpu_count()
?
十一、練習
要求:
題目:簡單主機批量管理工具
需求:
代碼實現:
1 # Config for configparser test. 2 3 ;[DEFAULT] 4 ;Admin = 'liulixin@hitedu.org' 5 ;Description = 'Traffic Server' 6 7 [QLS_GROUP] 8 Master = True 9 Slave1 = True 10 Slave2 = False 11 Slave3 = False 12 13 [Master] 14 ip = 10.0.0.9 15 port = 22 16 user = root 17 password = 123456. 18 enable = True 19 20 [Slave1] 21 ip = 10.0.0.8 22 port = 55336 23 user = root 24 password = slave1. 25 enable = True 26 27 [Slave2] 28 ip = 10.0.0.7 29 port = 3333 30 user = root 31 password = slave2. 32 enable = False 33 34 [Slave3] 35 ip = 10.0.0.6 36 port = 3307 37 user = root 38 password = slave3. 39 enable = True my.cnf配置文件程序代碼:
#!/usr/bin/python # -*- coding:utf-8 -*-import os import paramiko import configparser from multiprocessing import Pool file_path = os.path.abspath(__file__) config = configparser.ConfigParser() config.read('my.cnf',encoding='utf-8')info = ''' Command Syntax like this: 批量執行命令:batch_run -h h1,h2 -g web_clusters,db_servers -cmd "df -h" 批量拷貝文件:batch_scp -h h1,h2 -g web_clusters,db_servers -action put test.py /tmp/ 退出:q/Q '''def ssh_cmd(ip,port,user,password,cmd):ssh_conn = paramiko.SSHClient()ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())try:ssh_conn.connect(ip,int(port),user,password,timeout=3)except Exception as e:print('連接失敗:%s' % e)stdin, stdout, stderr = ssh_conn.exec_command(cmd)ret = []ret.append(ip)res = stderr.read().decode('utf-8')if not res:res = stdout.read().decode('utf-8')ret.append(res)return ret def ssh_cmd_callback(ret):print("This is callback func of %s" % ret[0])def scp_put(ip,port,user,password,file_to_put,remote_dir):try:trans = paramiko.Transport((ip,int(port)))trans.connect(username=user, password=password)sftp = paramiko.SFTPClient.from_transport(trans)sftp.put(localpath=os.path.join(os.path.dirname(file_path),file_to_put),\remotepath=r'%s/%s' % (remote_dir,file_to_put))trans.close()except Exception as e:print('scp_put err: %s' % e)def scp_get(ip,port,user,password,file_to_get, local_file_name):try:trans = paramiko.Transport((ip, int(port)))trans.connect(username=user, password=password)sftp = paramiko.SFTPClient.from_transport(trans)print(file_to_get,local_file_name)sftp.get(file_to_get,local_file_name)trans.close()except Exception as e:print('scp_get err: %s' % e)def output(res_list):for res in res_list:# print(res.get())print(res.get()[1])def get_target_ip_dict(cmd_list):target_ip_dict = {}if '-h' in cmd_list:sectionos_list = config.sections()sectionos_list.remove('QLS_GROUP')h_index = cmd_list.index('-h')input_host_str = cmd_list[h_index + 1]input_host_list = input_host_str.split(',')for host in input_host_list:if host.capitalize() in sectionos_list:enable = config.get(host.capitalize(), 'enable')if 'False' == enable:print('The %s is offline now, continue...' % host.capitalize())continueitem_dict = {'ip': None,'port': None,'user': None,'password': None,}item_dict['ip'] = config.get(host.capitalize(), 'ip')item_dict['port'] = config.get(host.capitalize(), 'port')item_dict['user'] = config.get(host.capitalize(), 'user')item_dict['password'] = config.get(host.capitalize(), 'password')if host.capitalize() not in target_ip_dict:target_ip_dict[host.capitalize()] = item_dictelse:print('No server: %s exist.' % host)if '-g' in cmd_list:sectionos_list = config.sections()g_index = cmd_list.index('-g')input_group_str = cmd_list[g_index + 1]input_group_list = input_group_str.split(',')for group in input_group_list:if group.upper() not in sectionos_list:print('No group: %s exist.' % group)else:available_tag_list = []for tag, value in config.items(group.upper()):if 'True' == value:available_tag_list.append(tag.capitalize())for tag in available_tag_list:if tag.capitalize() not in target_ip_dict:item_dict = {'ip': None,'port': None,'user': None,'password': None,}item_dict['ip'] = config.get(tag.capitalize(), 'ip')item_dict['port'] = config.get(tag.capitalize(), 'port')item_dict['user'] = config.get(tag.capitalize(), 'user')item_dict['password'] = config.get(tag.capitalize(), 'password')target_ip_dict[tag.capitalize()] = item_dictreturn target_ip_dictdef batch_run(cmd_list):target_ip_dict = get_target_ip_dict(cmd_list)cmd_index = cmd_list.index('-cmd')cmd_to_exec = ' '.join(cmd_list[cmd_index+1:])my_pool = Pool(len(target_ip_dict))res_list = []for host in target_ip_dict:res = my_pool.apply_async(func=ssh_cmd,args=(target_ip_dict[host]['ip'],\target_ip_dict[host]['port'],\target_ip_dict[host]['user'],\target_ip_dict[host]['password'],\cmd_to_exec.strip('"')),callback=ssh_cmd_callback)res_list.append(res)my_pool.close()my_pool.join()output(res_list)def batch_scp(cmd_list):target_ip_dict = get_target_ip_dict(cmd_list)action_index = cmd_list.index('-action')if 'PUT' != cmd_list[action_index+1].upper() and 'GET' != cmd_list[action_index+1].upper():print("Scp option invaild, just support put and get option.")return# -action put test.py /tmp/if 'PUT' == cmd_list[action_index+1].upper():file_to_put = cmd_list[action_index+2]remote_dir = cmd_list[-1]if os.path.exists(file_to_put):print('Start to put %s' % file_to_put)my_pool = Pool(len(target_ip_dict))res_list = []for host in target_ip_dict:res = my_pool.apply_async(func=scp_put, args=(target_ip_dict[host]['ip'],\target_ip_dict[host]['port'],\target_ip_dict[host]['user'],\target_ip_dict[host]['password'],\file_to_put,remote_dir))res_list.append(res)my_pool.close()my_pool.join()# output(res_list)print('End to put %s' % file_to_put)else:print('%s not exist.' % file_to_put)return# -action get /path/test.py local_file_nameif 'GET' == cmd_list[action_index + 1].upper():file_to_get = cmd_list[action_index + 2]local_file_name = cmd_list[-1]print('Start to get %s' % file_to_get)my_pool = Pool(len(target_ip_dict))res_list = []for host in target_ip_dict:res = my_pool.apply_async(func=scp_get, args=(target_ip_dict[host]['ip'],\target_ip_dict[host]['port'],\target_ip_dict[host]['user'],\target_ip_dict[host]['password'],\file_to_get, local_file_name))res_list.append(res)my_pool.close()my_pool.join()print('End to get %s' % file_to_get)def bye():print('Bye...')exit(0) cmd_option = {'batch_run':batch_run,'batch_scp':batch_scp, } if __name__ == '__main__':while True:print(info)cmd_input = input('請輸入命令>>>\t').strip()if 'Q' == cmd_input.upper():bye()cmd_list = cmd_input.split()if cmd_list[0] not in cmd_option:print('輸入無效')elif '-h' != cmd_list[1] and '-g' != cmd_list[1]:print(type(cmd_list[1]),cmd_list[1])print('目標主機/主機組無效')elif '-cmd' not in cmd_list and '-action' not in cmd_list:print('輸入的操作命令不符合語法規則')else:cmd_option[cmd_list[0]](cmd_list)
轉載于:https://www.cnblogs.com/standby/p/7091911.html
總結
以上是生活随笔為你收集整理的Python异常处理和进程线程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: css与网页制作
- 下一篇: 一个关于python装饰器参数的问题