日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python3 进程

發(fā)布時(shí)間:2025/7/25 python 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python3 进程 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
python3 進(jìn)程

1.開進(jìn)程的兩種方式:

1. 使用內(nèi)置的進(jìn)程

#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/17from multiprocessing import Process import osdef get_id(name):print(name,"Main process:",os.getppid(),"current process;", os.getpid())P1 = Process(target=get_id, args=('andy',)) P2 = Process(target = get_id, args=("Jack", ))if __name__ == "__main__":P2.start()P1.start()print("主進(jìn)程")

?2. 自定義進(jìn)程類:

from multiprocessing import Process import osclass Custom_Process(Process):def __init__(self, name):super().__init__()self.name = namedef run(self):print(self.name, "Main process:", os.getppid(), "current process;", os.getpid())if __name__ == "__main__":P1 = Custom_Process('andy')P2 = Custom_Process("jack")P1.start()P2.start()print("主進(jìn)程")

?事實(shí)上在調(diào)用P1.start時(shí),系統(tǒng)調(diào)用了Process類的run方法,在我們直接調(diào)用Process類時(shí),

我們需要指定target(即要進(jìn)行的操作,參數(shù)args),那么定制后我們重寫了run方法,即重寫的

run方法。

在Custom_Process類中我用到了

super().__init__()

?這是重寫父類的方法之一,另一種方法是:

Parent.__init__(self)

在這里就是:Process.__init__()

關(guān)于super().__init__()事實(shí)上并不是調(diào)用父類,而是尋找繼承順序中的下一個(gè)

具體可以參考:Python’s super() considered super!

?

下面是一個(gè)應(yīng)用進(jìn)程的例子,之前在寫 cs模型 ? ?時(shí)有:

server.listen(5)# 設(shè)置可以接受的連接數(shù)量

?雖然這里可以接受5個(gè)鏈接,但事實(shí)上由于功能上并未實(shí)現(xiàn)

所以每次只有一個(gè)鏈接可以正常進(jìn)行通信,其他的鏈接都必須

等到之前的鏈接完成才行。

下面著手改進(jìn):

server

#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/16import socket,json, struct, subprocess from multiprocessing import ProcessBUFF_SIZE = 1024 IP_PORT = ("127.0.0.1", 8081)server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)# 重用端口 server.bind(IP_PORT) server.listen(5)# 設(shè)置可以接受的連接數(shù)量def communicate(conn, client_addr):while True:# 內(nèi)層循環(huán)為通信循環(huán)msg = conn.recv(BUFF_SIZE)if not msg:breakpipes = subprocess.Popen(msg.decode("utf-8"),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)error = pipes.stderr.read()if error:print("Error:",error)response_msg = errorelse:response_msg = pipes.stdout.read()header = {'data_size':len(response_msg)}# 數(shù)據(jù)長度header_json = json.dumps(header)#序列化header_json_byte = bytes(header_json,encoding="utf-8")conn.send(struct.pack('i',len(header_json_byte)))#先發(fā)送報(bào)頭長度,僅包含數(shù)據(jù)長度, 這里的i指int類型conn.send(header_json_byte)# 再發(fā)送報(bào)頭conn.sendall(response_msg)# 正式的信息print("Request from:",client_addr, "Command:",msg)conn.close() if __name__ == "__main__":while True:# 外層循環(huán)為鏈接循環(huán)conn, client_addr = server.accept()p = Process(target=communicate, args=(conn, client_addr))p.start()server.close()

?client未變:

#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/16import socket, json, structBUFF_SIZE = 1024 IP_PORT = ("127.0.0.1", 8081)client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(IP_PORT)while True:msg = input(">>:").strip().encode("utf-8")if not msg:breakclient.send(msg)header = client.recv(4)print("Header:",struct.unpack("i", header))header_length = struct.unpack('i', header)[0]print("Header_length:", header_length)header_json = json.loads(client.recv(header_length).decode("utf-8"))data_size = header_json['data_size']print("Data_size:",data_size)recv_size = 0recv_data = b''while recv_size < data_size:recv_data += client.recv(BUFF_SIZE)recv_size += len(recv_data)print(recv_data.decode("gbk"))client.close()

?看下運(yùn)行結(jié)果,這里只開了兩個(gè)客戶端,5個(gè)同樣的道理:

?

2.LOCK 互斥鎖:

import os, time from multiprocessing import Process, Lockdef work(mutex):mutex.acquire()print("%d is working..." % os.getpid())time.sleep(2)print("%d is done!" % os.getpid())mutex.release()if __name__ == "__main__":mutex = Lock()p1 = Process(target=work, args=(mutex,))p2 = Process(target=work, args=(mutex,))p1.start()p2.start()

模擬 搶票系統(tǒng):所有人都可以查看到還剩下多票,但是只有部分人能搶到票。

import json, random, time, os from multiprocessing import Process , Lockdef search():dic = json.load(open('db.txt',))print("%s查詢,車票剩余%s" % (os.getpid(),dic['count']))def get_ticket():dic = json.load(open('db.txt',))if dic['count'] > 0:dic['count'] -= 1time.sleep(random.randint(1,4))json.dump(dic,open('db.txt', 'w'))print('%s 購買成功' % os.getpid())print("車票剩下%s" % dic["count"])else:print("%s搶票失敗 " % os.getpid())def task(mutex):search()mutex.acquire()get_ticket()mutex.release()if __name__ == "__main__":mutex = Lock()for i in range(10):p = Process(target=task, args=(mutex,))p.start()

?

?

?3.Join

1.join方法的作用是阻塞主進(jìn)程(擋住,無法執(zhí)行join以后的語句),專注執(zhí)行多線程。

2.多線程多join的情況下,依次執(zhí)行各線程的join方法,前頭一個(gè)結(jié)束了才能執(zhí)行后面一個(gè)。

3.無參數(shù),則等待到該線程結(jié)束,才開始執(zhí)行下一個(gè)線程的join。

4.設(shè)置參數(shù)后,則等待該線程這么長時(shí)間就不管它了(而該線程并沒有結(jié)束)。不管的意思就是可以執(zhí)行后面的主進(jìn)程了。

看例子:

#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import os, time from multiprocessing import Process, Lockdef work(mutex, t):mutex.acquire()print("%s Running at %s\n" % (os.getpid(),time.strftime("%H:%M:%S")))time.sleep(t)mutex.release()print("%s Stop at %s\n" % (os.getpid(),time.strftime("%H:%M:%S")))if __name__ == "__main__":print("Main Process Running at %s\n" % time.strftime("%H:%M:%S"))mutex = Lock()p1 = Process(target=work, args=(mutex,5))p2 = Process(target=work, args=(mutex,3))p1.start()p2.start()p1.join()print("Join1 finish at %s!\n" % time.strftime("%H:%M:%S"))p2.join()print("Join2 finish at %s!\n" % time.strftime("%H:%M:%S"))print("Main Process Stop at %s\n" % time.strftime("%H:%M:%S"))

?此時(shí)沒有指定join的時(shí)長,所以,第一個(gè)進(jìn)程執(zhí)行完了,第一個(gè)join也相應(yīng)的結(jié)束了,

然后第二個(gè)進(jìn)程執(zhí)行完了,第二個(gè)join也結(jié)束了。

?

當(dāng)指定時(shí)間后分兩種情況,當(dāng)join的時(shí)間比進(jìn)程需要執(zhí)行的時(shí)間短時(shí),它就不再等待該進(jìn)行,直接執(zhí)行

將join()修改為p2.join(2)

將p2.join()修改為p2.join(2)

?

可以看到,進(jìn)程4768還未執(zhí)行完時(shí),join1等待2秒后直接不管它了,執(zhí)行了后面的打印語句

接著執(zhí)行了join2,等待2秒后,主進(jìn)程自己結(jié)束了自己(這里應(yīng)該是打印語句的原因,事實(shí)上并未直接的結(jié)束)

此時(shí)4768仍在運(yùn)行,直到自己結(jié)束。然后才是進(jìn)程11108

?

如果我將時(shí)間設(shè)置得比它需要的時(shí)間還長呢,那么它應(yīng)該在進(jìn)程運(yùn)行完時(shí)也結(jié)束

將P1.join()修改為p1.join(6)

將p2.join()修改為p2.join(4)

?可以看到Join1,join2都是在兩個(gè)進(jìn)行結(jié)束后自己結(jié)束了,并沒有等待設(shè)定的時(shí)間長度。

?

4.Daemon 守護(hù)進(jìn)程

守護(hù)進(jìn)程的作用:

一:守護(hù)進(jìn)程會在主進(jìn)程代碼執(zhí)行結(jié)束后就終止

二:守護(hù)進(jìn)程內(nèi)無法再開啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

下面看例子:

#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19from multiprocessing import Process import timedef work():print("Running...")time.sleep(3)print("Finish!")if __name__ == "__main__":p = Process(target=work,)#p.daemon = Truep.start()print("Main finish!")

運(yùn)行結(jié)果:

?

將#p.daemon=True注釋掉,再運(yùn)行:

可以看到,主進(jìn)程結(jié)束了,子進(jìn)程也結(jié)束了, 并不會等待它運(yùn)行完。

?

守護(hù)進(jìn)程為什么在主進(jìn)程結(jié)束后就結(jié)束了呢?

首先,我們要明白守護(hù)進(jìn)程的作用:守護(hù)主進(jìn)程的一些功能,當(dāng)主進(jìn)程執(zhí)行完了,

也就是說它的功能已經(jīng)全部執(zhí)行完了,那么,守護(hù)進(jìn)程也就沒有繼續(xù)守護(hù)下去的

必要了,所以一旦主進(jìn)程結(jié)束了,守護(hù)進(jìn)程也就結(jié)束了。

?

?5.Semaphore 信號量

Semaphore制對共享資源的訪問數(shù)量,比如可以同時(shí)運(yùn)行的子進(jìn)程數(shù)量:

#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import multiprocessing import timedef worker(s):s.acquire()print(multiprocessing.current_process().name + "acquire");time.sleep(2)print(multiprocessing.current_process().name + "release\n");s.release()if __name__ == "__main__":s = multiprocessing.Semaphore(2)for i in range(5):p = multiprocessing.Process(target = worker, args=(s,))p.start()

如上, 只有釋放一個(gè)進(jìn)程才有新的進(jìn)程進(jìn)來

?

將信號量改成大于等于進(jìn)程數(shù):

s = multiprocessing.Semaphore(5)

可以看到,所有進(jìn)程一下全部啟動(dòng)了。

?

進(jìn)程間通信有一個(gè)人種方式,一種是隊(duì)列,一種是管道

6.隊(duì)列

?下面演示在一個(gè)進(jìn)程中往隊(duì)列中傳入數(shù)據(jù),用另一個(gè)進(jìn)程取出來:

#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import random,os from multiprocessing import Queue,Processdef put_q(q):print("Put...")for i in range(5):n = random.randint(1,5)print(n)q.put(n)def get_q(q):print("\nGet...")while True:if not q.empty():print("%s" % os.getpid(),q.get())else:breakif __name__ == "__main__":q = Queue(8)p1 = Process(target=put_q,args=(q,))p2 = Process(target=get_q,args=(q,))p1.start()p1.join() # 防止進(jìn)程2先啟動(dòng),隊(duì)列為空p2.start()

?

這樣就實(shí)現(xiàn)了進(jìn)程間的通信

?

7.管道

Pipe方法返回(conn1, conn2)代表一個(gè)管道的兩個(gè)端。Pipe方法有duplex參數(shù), 如果duplex參數(shù)為True(默認(rèn)值),那么這個(gè)管道是全雙工模式, 也就是說conn1和conn2均可收發(fā)。duplex為False,conn1只負(fù)責(zé)接受消息,conn2只負(fù)責(zé)發(fā)送消息。 send和recv方法分別是發(fā)送和接受消息的方法。例如,在全雙工模式下, 可以調(diào)用conn1.send發(fā)送消息,conn1.recv接收消息。如果沒有消息可接收, recv方法會一直阻塞。如果管道已經(jīng)被關(guān)閉,那么recv方法會拋出EOFError。 事實(shí)上,管道的應(yīng)用與上面的隊(duì)列基本一致,對上面的代碼稍作修改: #!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import random,os from multiprocessing import Pipe,Processdef send_p(p):print("send...")for i in range(5):n = random.randint(1,5)print(n)p.send(n)def receive_p(p):print("\nReceive...")while True:print("%s" % os.getpid(),p.recv())if __name__ == "__main__":p = Pipe()p1 = Process(target=send_p,args=(p[0],))p2 = Process(target=receive_p,args=(p[1],))p1.start()p1.join() p2.start()

運(yùn)行:

?

8.Pool 進(jìn)程池

?Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請求提交到pool中時(shí),

如果池還沒有滿,那么就會創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請求;

但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請求就會等待,

直到池中有進(jìn)程結(jié)束,才會創(chuàng)建新的進(jìn)程來它。

看例子:

#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/20import time from multiprocessing import Pool, Processdef work(msg):print(msg, 'is working\n')time.sleep(2)print(msg,'finish!\n')if __name__ == "__main__":pro = Process()pool = Pool(processes=3)for i in range(1,6):msg = "process %s" % ipool.apply_async(work,(msg,))pool.close()pool.join()# 阻塞主進(jìn)程,等待子進(jìn)程執(zhí)行完

?運(yùn)行:

指定進(jìn)程池只有3個(gè)進(jìn)程,所以第四個(gè)進(jìn)程只有前面結(jié)束一個(gè)進(jìn)程時(shí)才能開始。

需要說明的是 pool.apply_async()是非阻塞的,pool.apply()則是阻塞的。看區(qū)別:

修改:

pool.apply(work,(msg,))

?再次運(yùn)行:

可以看到,子進(jìn)程只能結(jié)束一個(gè)后都會運(yùn)行下一個(gè)進(jìn)程

?回調(diào)函數(shù):

回調(diào)函數(shù)指:進(jìn)程池中任何一個(gè)任務(wù)一旦處理完了,就立即告知主進(jìn)程:

我好了額,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個(gè)函數(shù)去處理該結(jié)果。

?

對上面的例子進(jìn)行修改:

#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/20import time, os from multiprocessing import Pool, Processdef work(msg):print(msg, 'is working\n')time.sleep(2)print(msg,'finish!\n')return msgdef plus(msg):if msg:msg = msg + '*plus*'print(msg)if __name__ == "__main__":pro = Process()pool = Pool(processes=3)for i in range(1,6):msg = "process %s" % ipool.apply_async(work,(msg,), callback=plus)# 回調(diào)函數(shù)pool.close()pool.join()

?運(yùn)行:

可以看到一個(gè)進(jìn)程結(jié)果后,在開啟一個(gè)新的進(jìn)程到進(jìn)程池后,

主進(jìn)程又調(diào)用一個(gè)回調(diào)函數(shù)對該進(jìn)程的結(jié)果進(jìn)行了二次處理。

?

補(bǔ)充:

對于計(jì)算機(jī)來說,也并不能無限開啟進(jìn)程,通常比較好的情況是

進(jìn)程數(shù)等于計(jì)算機(jī)核數(shù)是比較好的,否則開多了可能會起到反作用

那么要怎么查看自己的計(jì)算機(jī)是幾核的呢?

?

posted on 2017-09-17 13:49 Andy_963 閱讀(...) 評論(...) 編輯 收藏

轉(zhuǎn)載于:https://www.cnblogs.com/Andy963/p/7535378.html

《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結(jié)

以上是生活随笔為你收集整理的python3 进程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。