python3 进程
1.開進(jìn)程的兩種方式:
1. 使用內(nèi)置的進(jìn)程
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/17from multiprocessing import Process import osdef get_id(name):print(name,"Main process:",os.getppid(),"current process;", os.getpid())P1 = Process(target=get_id, args=('andy',)) P2 = Process(target = get_id, args=("Jack", ))if __name__ == "__main__":P2.start()P1.start()print("主進(jìn)程")?2. 自定義進(jìn)程類:
from multiprocessing import Process import osclass Custom_Process(Process):def __init__(self, name):super().__init__()self.name = namedef run(self):print(self.name, "Main process:", os.getppid(), "current process;", os.getpid())if __name__ == "__main__":P1 = Custom_Process('andy')P2 = Custom_Process("jack")P1.start()P2.start()print("主進(jìn)程")?事實(shí)上在調(diào)用P1.start時(shí),系統(tǒng)調(diào)用了Process類的run方法,在我們直接調(diào)用Process類時(shí),
我們需要指定target(即要進(jìn)行的操作,參數(shù)args),那么定制后我們重寫了run方法,即重寫的
run方法。
在Custom_Process類中我用到了
super().__init__()?這是重寫父類的方法之一,另一種方法是:
Parent.__init__(self)在這里就是:Process.__init__()
關(guān)于super().__init__()事實(shí)上并不是調(diào)用父類,而是尋找繼承順序中的下一個(gè)
具體可以參考:Python’s super() considered super!
?
下面是一個(gè)應(yīng)用進(jìn)程的例子,之前在寫 cs模型 ? ?時(shí)有:
server.listen(5)# 設(shè)置可以接受的連接數(shù)量?雖然這里可以接受5個(gè)鏈接,但事實(shí)上由于功能上并未實(shí)現(xiàn)
所以每次只有一個(gè)鏈接可以正常進(jìn)行通信,其他的鏈接都必須
等到之前的鏈接完成才行。
下面著手改進(jìn):
server
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/16import socket,json, struct, subprocess from multiprocessing import ProcessBUFF_SIZE = 1024 IP_PORT = ("127.0.0.1", 8081)server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)# 重用端口 server.bind(IP_PORT) server.listen(5)# 設(shè)置可以接受的連接數(shù)量def communicate(conn, client_addr):while True:# 內(nèi)層循環(huán)為通信循環(huán)msg = conn.recv(BUFF_SIZE)if not msg:breakpipes = subprocess.Popen(msg.decode("utf-8"),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)error = pipes.stderr.read()if error:print("Error:",error)response_msg = errorelse:response_msg = pipes.stdout.read()header = {'data_size':len(response_msg)}# 數(shù)據(jù)長度header_json = json.dumps(header)#序列化header_json_byte = bytes(header_json,encoding="utf-8")conn.send(struct.pack('i',len(header_json_byte)))#先發(fā)送報(bào)頭長度,僅包含數(shù)據(jù)長度, 這里的i指int類型conn.send(header_json_byte)# 再發(fā)送報(bào)頭conn.sendall(response_msg)# 正式的信息print("Request from:",client_addr, "Command:",msg)conn.close() if __name__ == "__main__":while True:# 外層循環(huán)為鏈接循環(huán)conn, client_addr = server.accept()p = Process(target=communicate, args=(conn, client_addr))p.start()server.close()?client未變:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/16import socket, json, structBUFF_SIZE = 1024 IP_PORT = ("127.0.0.1", 8081)client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(IP_PORT)while True:msg = input(">>:").strip().encode("utf-8")if not msg:breakclient.send(msg)header = client.recv(4)print("Header:",struct.unpack("i", header))header_length = struct.unpack('i', header)[0]print("Header_length:", header_length)header_json = json.loads(client.recv(header_length).decode("utf-8"))data_size = header_json['data_size']print("Data_size:",data_size)recv_size = 0recv_data = b''while recv_size < data_size:recv_data += client.recv(BUFF_SIZE)recv_size += len(recv_data)print(recv_data.decode("gbk"))client.close()?看下運(yùn)行結(jié)果,這里只開了兩個(gè)客戶端,5個(gè)同樣的道理:
?
2.LOCK 互斥鎖:
import os, time from multiprocessing import Process, Lockdef work(mutex):mutex.acquire()print("%d is working..." % os.getpid())time.sleep(2)print("%d is done!" % os.getpid())mutex.release()if __name__ == "__main__":mutex = Lock()p1 = Process(target=work, args=(mutex,))p2 = Process(target=work, args=(mutex,))p1.start()p2.start()模擬 搶票系統(tǒng):所有人都可以查看到還剩下多票,但是只有部分人能搶到票。
import json, random, time, os from multiprocessing import Process , Lockdef search():dic = json.load(open('db.txt',))print("%s查詢,車票剩余%s" % (os.getpid(),dic['count']))def get_ticket():dic = json.load(open('db.txt',))if dic['count'] > 0:dic['count'] -= 1time.sleep(random.randint(1,4))json.dump(dic,open('db.txt', 'w'))print('%s 購買成功' % os.getpid())print("車票剩下%s" % dic["count"])else:print("%s搶票失敗 " % os.getpid())def task(mutex):search()mutex.acquire()get_ticket()mutex.release()if __name__ == "__main__":mutex = Lock()for i in range(10):p = Process(target=task, args=(mutex,))p.start()?
?
?3.Join
1.join方法的作用是阻塞主進(jìn)程(擋住,無法執(zhí)行join以后的語句),專注執(zhí)行多線程。
2.多線程多join的情況下,依次執(zhí)行各線程的join方法,前頭一個(gè)結(jié)束了才能執(zhí)行后面一個(gè)。
3.無參數(shù),則等待到該線程結(jié)束,才開始執(zhí)行下一個(gè)線程的join。
4.設(shè)置參數(shù)后,則等待該線程這么長時(shí)間就不管它了(而該線程并沒有結(jié)束)。不管的意思就是可以執(zhí)行后面的主進(jìn)程了。
看例子:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import os, time from multiprocessing import Process, Lockdef work(mutex, t):mutex.acquire()print("%s Running at %s\n" % (os.getpid(),time.strftime("%H:%M:%S")))time.sleep(t)mutex.release()print("%s Stop at %s\n" % (os.getpid(),time.strftime("%H:%M:%S")))if __name__ == "__main__":print("Main Process Running at %s\n" % time.strftime("%H:%M:%S"))mutex = Lock()p1 = Process(target=work, args=(mutex,5))p2 = Process(target=work, args=(mutex,3))p1.start()p2.start()p1.join()print("Join1 finish at %s!\n" % time.strftime("%H:%M:%S"))p2.join()print("Join2 finish at %s!\n" % time.strftime("%H:%M:%S"))print("Main Process Stop at %s\n" % time.strftime("%H:%M:%S"))?此時(shí)沒有指定join的時(shí)長,所以,第一個(gè)進(jìn)程執(zhí)行完了,第一個(gè)join也相應(yīng)的結(jié)束了,
然后第二個(gè)進(jìn)程執(zhí)行完了,第二個(gè)join也結(jié)束了。
?
當(dāng)指定時(shí)間后分兩種情況,當(dāng)join的時(shí)間比進(jìn)程需要執(zhí)行的時(shí)間短時(shí),它就不再等待該進(jìn)行,直接執(zhí)行
將join()修改為p2.join(2)
將p2.join()修改為p2.join(2)
?
可以看到,進(jìn)程4768還未執(zhí)行完時(shí),join1等待2秒后直接不管它了,執(zhí)行了后面的打印語句
接著執(zhí)行了join2,等待2秒后,主進(jìn)程自己結(jié)束了自己(這里應(yīng)該是打印語句的原因,事實(shí)上并未直接的結(jié)束)
此時(shí)4768仍在運(yùn)行,直到自己結(jié)束。然后才是進(jìn)程11108
?
如果我將時(shí)間設(shè)置得比它需要的時(shí)間還長呢,那么它應(yīng)該在進(jìn)程運(yùn)行完時(shí)也結(jié)束
將P1.join()修改為p1.join(6)
將p2.join()修改為p2.join(4)
?可以看到Join1,join2都是在兩個(gè)進(jìn)行結(jié)束后自己結(jié)束了,并沒有等待設(shè)定的時(shí)間長度。
?
4.Daemon 守護(hù)進(jìn)程
守護(hù)進(jìn)程的作用:
一:守護(hù)進(jìn)程會在主進(jìn)程代碼執(zhí)行結(jié)束后就終止
二:守護(hù)進(jìn)程內(nèi)無法再開啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
下面看例子:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19from multiprocessing import Process import timedef work():print("Running...")time.sleep(3)print("Finish!")if __name__ == "__main__":p = Process(target=work,)#p.daemon = Truep.start()print("Main finish!")運(yùn)行結(jié)果:
?
將#p.daemon=True注釋掉,再運(yùn)行:
可以看到,主進(jìn)程結(jié)束了,子進(jìn)程也結(jié)束了, 并不會等待它運(yùn)行完。
?
守護(hù)進(jìn)程為什么在主進(jìn)程結(jié)束后就結(jié)束了呢?
首先,我們要明白守護(hù)進(jìn)程的作用:守護(hù)主進(jìn)程的一些功能,當(dāng)主進(jìn)程執(zhí)行完了,
也就是說它的功能已經(jīng)全部執(zhí)行完了,那么,守護(hù)進(jìn)程也就沒有繼續(xù)守護(hù)下去的
必要了,所以一旦主進(jìn)程結(jié)束了,守護(hù)進(jìn)程也就結(jié)束了。
?
?5.Semaphore 信號量
Semaphore制對共享資源的訪問數(shù)量,比如可以同時(shí)運(yùn)行的子進(jìn)程數(shù)量:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import multiprocessing import timedef worker(s):s.acquire()print(multiprocessing.current_process().name + "acquire");time.sleep(2)print(multiprocessing.current_process().name + "release\n");s.release()if __name__ == "__main__":s = multiprocessing.Semaphore(2)for i in range(5):p = multiprocessing.Process(target = worker, args=(s,))p.start()如上, 只有釋放一個(gè)進(jìn)程才有新的進(jìn)程進(jìn)來
?
將信號量改成大于等于進(jìn)程數(shù):
s = multiprocessing.Semaphore(5)可以看到,所有進(jìn)程一下全部啟動(dòng)了。
?
進(jìn)程間通信有一個(gè)人種方式,一種是隊(duì)列,一種是管道
6.隊(duì)列
?下面演示在一個(gè)進(jìn)程中往隊(duì)列中傳入數(shù)據(jù),用另一個(gè)進(jìn)程取出來:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import random,os from multiprocessing import Queue,Processdef put_q(q):print("Put...")for i in range(5):n = random.randint(1,5)print(n)q.put(n)def get_q(q):print("\nGet...")while True:if not q.empty():print("%s" % os.getpid(),q.get())else:breakif __name__ == "__main__":q = Queue(8)p1 = Process(target=put_q,args=(q,))p2 = Process(target=get_q,args=(q,))p1.start()p1.join() # 防止進(jìn)程2先啟動(dòng),隊(duì)列為空p2.start()?
這樣就實(shí)現(xiàn)了進(jìn)程間的通信
?
7.管道
Pipe方法返回(conn1, conn2)代表一個(gè)管道的兩個(gè)端。Pipe方法有duplex參數(shù), 如果duplex參數(shù)為True(默認(rèn)值),那么這個(gè)管道是全雙工模式, 也就是說conn1和conn2均可收發(fā)。duplex為False,conn1只負(fù)責(zé)接受消息,conn2只負(fù)責(zé)發(fā)送消息。 send和recv方法分別是發(fā)送和接受消息的方法。例如,在全雙工模式下, 可以調(diào)用conn1.send發(fā)送消息,conn1.recv接收消息。如果沒有消息可接收, recv方法會一直阻塞。如果管道已經(jīng)被關(guān)閉,那么recv方法會拋出EOFError。 事實(shí)上,管道的應(yīng)用與上面的隊(duì)列基本一致,對上面的代碼稍作修改: #!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import random,os from multiprocessing import Pipe,Processdef send_p(p):print("send...")for i in range(5):n = random.randint(1,5)print(n)p.send(n)def receive_p(p):print("\nReceive...")while True:print("%s" % os.getpid(),p.recv())if __name__ == "__main__":p = Pipe()p1 = Process(target=send_p,args=(p[0],))p2 = Process(target=receive_p,args=(p[1],))p1.start()p1.join() p2.start()運(yùn)行:
?
8.Pool 進(jìn)程池
?Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請求提交到pool中時(shí),
如果池還沒有滿,那么就會創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請求;
但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請求就會等待,
直到池中有進(jìn)程結(jié)束,才會創(chuàng)建新的進(jìn)程來它。
看例子:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/20import time from multiprocessing import Pool, Processdef work(msg):print(msg, 'is working\n')time.sleep(2)print(msg,'finish!\n')if __name__ == "__main__":pro = Process()pool = Pool(processes=3)for i in range(1,6):msg = "process %s" % ipool.apply_async(work,(msg,))pool.close()pool.join()# 阻塞主進(jìn)程,等待子進(jìn)程執(zhí)行完?運(yùn)行:
指定進(jìn)程池只有3個(gè)進(jìn)程,所以第四個(gè)進(jìn)程只有前面結(jié)束一個(gè)進(jìn)程時(shí)才能開始。
需要說明的是 pool.apply_async()是非阻塞的,pool.apply()則是阻塞的。看區(qū)別:
修改:
pool.apply(work,(msg,))?再次運(yùn)行:
可以看到,子進(jìn)程只能結(jié)束一個(gè)后都會運(yùn)行下一個(gè)進(jìn)程
?回調(diào)函數(shù):
回調(diào)函數(shù)指:進(jìn)程池中任何一個(gè)任務(wù)一旦處理完了,就立即告知主進(jìn)程:
我好了額,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個(gè)函數(shù)去處理該結(jié)果。
?
對上面的例子進(jìn)行修改:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/20import time, os from multiprocessing import Pool, Processdef work(msg):print(msg, 'is working\n')time.sleep(2)print(msg,'finish!\n')return msgdef plus(msg):if msg:msg = msg + '*plus*'print(msg)if __name__ == "__main__":pro = Process()pool = Pool(processes=3)for i in range(1,6):msg = "process %s" % ipool.apply_async(work,(msg,), callback=plus)# 回調(diào)函數(shù)pool.close()pool.join()?運(yùn)行:
可以看到一個(gè)進(jìn)程結(jié)果后,在開啟一個(gè)新的進(jìn)程到進(jìn)程池后,
主進(jìn)程又調(diào)用一個(gè)回調(diào)函數(shù)對該進(jìn)程的結(jié)果進(jìn)行了二次處理。
?
補(bǔ)充:
對于計(jì)算機(jī)來說,也并不能無限開啟進(jìn)程,通常比較好的情況是
進(jìn)程數(shù)等于計(jì)算機(jī)核數(shù)是比較好的,否則開多了可能會起到反作用
那么要怎么查看自己的計(jì)算機(jī)是幾核的呢?
?
posted on 2017-09-17 13:49 Andy_963 閱讀(...) 評論(...) 編輯 收藏轉(zhuǎn)載于:https://www.cnblogs.com/Andy963/p/7535378.html
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的python3 进程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 最全mysql的复制和读写分离
- 下一篇: luogu p1459 三值的排序