The Road to Python (Part 8)
http://www.cnblogs.com/alex3714/articles/5227251.html
Socket Syntax and Related Topics
Socket Concepts
A network socket is an endpoint of a connection across a computer network. Today, most communication between computers is based on the Internet Protocol; therefore most network sockets are Internet sockets. More precisely, a socket is a handle (abstract reference) that a local program can pass to the networking application programming interface (API) to use the connection, for example "send this data on this socket". Sockets are internally often simply integers, which identify which connection to use.
For example, to send "Hello, world!" via TCP to port 80 of the host with address 1.2.3.4, one might get a socket, connect it to the remote host, send the string, then close the socket.
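The four steps just described map directly onto Python's socket module. A minimal sketch, assuming a helper named send_hello (a hypothetical name, not from the original) that takes the host and port as parameters so it can be pointed at any reachable TCP service:

```python
import socket

def send_hello(host, port, message=b"Hello, world!"):
    """Get a TCP socket, connect it, send the message, then close the socket."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # get a TCP socket
    try:
        sock.connect((host, port))  # connect it to the remote host
        sock.sendall(message)       # send the string (as bytes)
    finally:
        sock.close()                # close the socket
    return len(message)
```

Calling send_hello("1.2.3.4", 80) would correspond to the example in the text, provided that host is reachable and listening.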
A socket API is an application programming interface (API), usually provided by the operating system, that allows application programs to control and use network sockets. Internet socket APIs are usually based on the Berkeley sockets standard. In the Berkeley sockets standard, sockets are a form of file descriptor (a file handle), due to the Unix philosophy that "everything is a file", and the analogies between sockets and files: you can read, write, open, and close both. In practice the differences mean the analogy is strained, and one instead uses different interfaces (send and receive) on a socket. In inter-process communication, each end will generally have its own socket, but these may use different APIs: they are abstracted by the network protocol.
A socket address is the combination of an IP address and a port number, much like one end of a telephone connection is the combination of a phone number and a particular extension. Sockets need not have an address (for example for only sending data), but if a program binds a socket to an address, the socket can be used to receive data sent to that address. Based on this address, internet sockets deliver incoming data packets to the appropriate application process or thread.
Socket Families (Address Families)
socket.AF_UNIX  # inter-process communication on the local Unix host
socket.AF_INET  # IPv4
socket.AF_INET6  # IPv6
These constants represent the address (and protocol) families, used for the first argument to socket(). If the AF_UNIX constant is not defined then this protocol is unsupported. More constants may be available depending on the system.
Socket Types
socket.SOCK_STREAM  # for TCP
socket.SOCK_DGRAM  # for UDP
socket.SOCK_RAW  # Raw sockets. Ordinary sockets cannot handle network messages such as ICMP and IGMP, but SOCK_RAW can. SOCK_RAW can also handle special IPv4 packets, and with the IP_HDRINCL socket option the user can construct the IP header. SOCK_RAW provides low-level access to the underlying protocols for special operations such as sending ICMP messages, and is usually restricted to programs run by privileged users or administrators.
socket.SOCK_RDM  # A reliable form of UDP: delivery of datagrams is guaranteed, but their order is not.
socket.SOCK_SEQPACKET  # reliable, sequenced, connection-oriented datagrams; rarely used
These constants represent the socket types, used for the second argument to socket(). More constants may be available depending on the system. (Only SOCK_STREAM and SOCK_DGRAM appear to be generally useful.)
Socket Methods
socket.socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)
Create a new socket using the given address family, socket type and protocol number. The address family should be AF_INET (the default), AF_INET6, AF_UNIX, AF_CAN or AF_RDS. The socket type should be SOCK_STREAM (the default), SOCK_DGRAM, SOCK_RAW or perhaps one of the other SOCK_ constants. The protocol number is usually zero and may be omitted or in the case where the address family is AF_CAN the protocol should be one of CAN_RAW or CAN_BCM. If fileno is specified, the other arguments are ignored, causing the socket with the specified file descriptor to return. Unlike socket.fromfd(), fileno will return the same socket and not a duplicate. This may help close a detached socket using socket.close().
socket.socketpair([family[, type[, proto]]])
Build a pair of connected socket objects using the given address family, socket type, and protocol number. Address family, socket type, and protocol number are as for the socket() function above. The default family is AF_UNIX if defined on the platform; otherwise, the default is AF_INET.
socket.create_connection(address[, timeout[, source_address]])
Connect to a TCP service listening on the Internet address (a 2-tuple (host, port)), and return the socket object. This is a higher-level function than socket.connect(): if host is a non-numeric hostname, it will try to resolve it for both AF_INET and AF_INET6, and then try to connect to all possible addresses in turn until a connection succeeds. This makes it easy to write clients that are compatible to both IPv4 and IPv6.
Passing the optional timeout parameter will set the timeout on the socket instance before attempting to connect. If no timeout is supplied, the global default timeout setting returned by getdefaulttimeout() is used.
If supplied, source_address must be a 2-tuple (host, port) for the socket to bind to as its source address before connecting. If host or port are '' or 0 respectively the OS default behavior will be used.
socket.getaddrinfo(host, port, family=0, type=0, proto=0, flags=0)  # look up the address information of the remote host to connect to
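sk.bind(address)
Binds the socket to an address. The format of address depends on the address family; for AF_INET it is a tuple (host, port).
sk.listen(backlog)
Starts listening for incoming connections. backlog specifies the maximum number of connections that can be pending before new ones are refused.
      A backlog of 5 means that at most 5 connection requests can sit in the kernel queue, received but not yet handled by the server calling accept().
      The value cannot be arbitrarily large, because the kernel has to maintain the connection queue.
sk.setblocking(bool)
Sets whether the socket blocks (default True). If set to False, accept() and recv() raise an exception as soon as there is nothing to accept or receive.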
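To illustrate the non-blocking behaviour described for setblocking(False), here is a small sketch using socket.socketpair() so that it runs locally without a server. The helper name recv_nonblocking is hypothetical, and select.select() is used only to wait until the data has arrived.

```python
import select
import socket

def recv_nonblocking(sock, bufsize=1024):
    """Return received bytes, or None if nothing is ready on a non-blocking socket."""
    sock.setblocking(False)
    try:
        return sock.recv(bufsize)
    except BlockingIOError:
        # Raised instead of waiting when no data has arrived yet.
        return None

# socketpair() returns two already-connected sockets, handy for a local demo.
a, b = socket.socketpair()
first = recv_nonblocking(a)      # nothing has been sent yet
b.sendall(b"ping")
select.select([a], [], [], 1.0)  # wait up to 1 second until a is readable
second = recv_nonblocking(a)     # the data is now waiting in the buffer
a.close()
b.close()
```

In real programs a non-blocking socket is normally paired with select or a similar mechanism, as above, rather than polled in a tight loop.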
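sk.accept()
Accepts a connection and returns (conn, address), where conn is a new socket object that can be used to send and receive data, and address is the address of the connecting client.
Accepts a TCP client connection, blocking while it waits for one to arrive.
sk.connect(address)
Connects to the socket at address. For AF_INET, address is generally a tuple (hostname, port). If the connection fails, socket.error (an alias of OSError in Python 3) is raised.
sk.connect_ex(address)
Same as above, but it returns a status code instead of raising: 0 when the connection succeeds, an error code when it fails, for example 10061.
sk.close()
Closes the socket.
sk.recv(bufsize[,flag])
Receives data from the socket. The data is returned as a bytes object (a str in Python 2); bufsize specifies the maximum amount to receive. flag provides additional information about the message and can usually be omitted.
sk.recvfrom(bufsize[,flag])
Similar to recv(), but the return value is (data, address), where data is the received data and address is the address of the socket that sent it.
sk.send(string[,flag])
Sends the data in string (a bytes object in Python 3) to the connected socket. The return value is the number of bytes actually sent, which may be less than the size of string; in other words, the content may not all be sent.
sk.sendall(string[,flag])
Sends the data in string to the connected socket, and tries to send all of it before returning. Returns None on success and raises an exception on failure.
      Internally it calls send repeatedly until everything has been sent.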
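The difference between send and sendall can be sketched as a loop. This is an illustration of the behaviour described above, not CPython's actual implementation, and send_all is a hypothetical name.

```python
import socket

def send_all(sock, data):
    """Keep calling send() until every byte of data has been handed to the OS."""
    total_sent = 0
    while total_sent < len(data):
        sent = sock.send(data[total_sent:])  # may accept only part of the data
        if sent == 0:
            raise RuntimeError("socket connection broken")
        total_sent += sent
    return total_sent
```

Code that calls send() directly has to check the return value in this way; sendall() exists so that callers do not have to.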
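sk.sendto(string[,flag],address)
Sends data to the socket. address is a tuple of the form (ipaddr, port) that specifies the remote address. The return value is the number of bytes sent. This function is mainly used with UDP.
sk.settimeout(timeout)
Sets the timeout for socket operations. timeout is a float in seconds; None means no timeout. In general the timeout should be set right after the socket is created, because it may apply to the connect operation (for example, a client waiting at most 5 seconds to connect).
sk.getpeername()
Returns the remote address the socket is connected to, usually a tuple (ipaddr, port).
sk.getsockname()
Returns the socket's own address, usually a tuple (ipaddr, port).
sk.fileno()
Returns the socket's file descriptor.
socket.sendfile(file, offset=0, count=None)
      Sends a file, although in most cases it is currently of little practical use.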
SocketServer
The socketserver module simplifies the task of writing network servers.
There are four basic concrete server classes:
class socketserver.TCPServer(server_address, RequestHandlerClass, bind_and_activate=True)
This uses the Internet TCP protocol, which provides for continuous streams of data between the client and server. If bind_and_activate is true, the constructor automatically attempts to invoke server_bind() and server_activate(). The other parameters are passed to the BaseServer base class.
class socketserver.UDPServer(server_address, RequestHandlerClass, bind_and_activate=True)
This uses datagrams, which are discrete packets of information that may arrive out of order or be lost while in transit. The parameters are the same as for TCPServer.
class socketserver.UnixStreamServer(server_address, RequestHandlerClass, bind_and_activate=True)
class socketserver.UnixDatagramServer(server_address, RequestHandlerClass, bind_and_activate=True)
These more infrequently used classes are similar to the TCP and UDP classes, but use Unix domain sockets; they're not available on non-Unix platforms. The parameters are the same as for TCPServer.
These four classes process requests synchronously; each request must be completed before the next request can be started. This isn't suitable if each request takes a long time to complete, because it requires a lot of computation, or because it returns a lot of data which the client is slow to process. The solution is to create a separate process or thread to handle each request; the ForkingMixIn and ThreadingMixIn mix-in classes can be used to support asynchronous behaviour.
There are five classes in an inheritance diagram, four of which represent synchronous servers of four types:
+------------+
| BaseServer |
+------------+
      |
      v
+-----------+        +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+        +------------------+
      |
      v
+-----------+        +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+        +--------------------+
Note that UnixDatagramServer derives from UDPServer, not from UnixStreamServer. The only difference between an IP and a Unix stream server is the address family, which is simply repeated in both Unix server classes.
class socketserver.ForkingMixIn
class socketserver.ThreadingMixIn
Forking and threading versions of each type of server can be created using these mix-in classes. For instance, ThreadingUDPServer is created as follows:
class ThreadingUDPServer(ThreadingMixIn, UDPServer):
    pass
The mix-in class comes first, since it overrides a method defined in UDPServer. Setting the various attributes also changes the behavior of the underlying server mechanism.
class socketserver.ForkingTCPServer
class socketserver.ForkingUDPServer
class socketserver.ThreadingTCPServer
class socketserver.ThreadingUDPServer
These classes are pre-defined using the mix-in classes.
Request Handler Objects
class socketserver.BaseRequestHandler
This is the superclass of all request handler objects. It defines the interface, given below. A concrete request handler subclass must define a new handle() method, and can override any of the other methods. A new instance of the subclass is created for each request.
setup()
Called before the handle() method to perform any initialization actions required. The default implementation does nothing.
handle()
This function must do all the work required to service a request. The default implementation does nothing. Several instance attributes are available to it; the request is available as self.request; the client address as self.client_address; and the server instance as self.server, in case it needs access to per-server information.
The type of self.request is different for datagram or stream services. For stream services, self.request is a socket object; for datagram services, self.request is a pair of string and socket.
finish()
Called after the handle() method to perform any clean-up actions required. The default implementation does nothing. If setup() raises an exception, this function will not be called.
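The order in which setup(), handle() and finish() run can be observed with a small sketch. The names LifecycleHandler, calls and echo_once are hypothetical; the server binds to port 0 so the OS picks a free local port, and handle_request() serves exactly one request.

```python
import socket
import socketserver
import threading

calls = []  # records the order in which the handler methods run

class LifecycleHandler(socketserver.BaseRequestHandler):
    def setup(self):
        calls.append("setup")

    def handle(self):
        calls.append("handle")
        data = self.request.recv(1024)  # self.request is the connected socket
        self.request.sendall(data)      # echo the data back

    def finish(self):
        calls.append("finish")

def echo_once(message):
    """Start a TCPServer on a free local port, send one message, return the reply."""
    with socketserver.TCPServer(("127.0.0.1", 0), LifecycleHandler) as server:
        port = server.server_address[1]
        worker = threading.Thread(target=server.handle_request)  # serve one request
        worker.start()
        with socket.create_connection(("127.0.0.1", port), timeout=5) as client:
            client.sendall(message)
            reply = client.recv(1024)
        worker.join()
    return reply
```

After one call to echo_once(), the calls list shows that a fresh handler instance ran the three methods in sequence for that request.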
socketserver.TCPServer Example
server side
import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):
    """
    The request handler class for our server.
    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """
    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        # just send back the same data, but upper-cased
        self.request.sendall(self.data.upper())

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999
    # Create the server, binding to localhost on port 9999
    server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)
    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()
client side
import socket
import sys

HOST, PORT = "localhost", 9999
data = " ".join(sys.argv[1:])
# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    # Connect to server and send data
    sock.connect((HOST, PORT))
    sock.sendall(bytes(data + "\n", "utf-8"))
    # Receive data from the server and shut down
    received = str(sock.recv(1024), "utf-8")
finally:
    sock.close()
print("Sent: {}".format(data))
print("Received: {}".format(received))
As you will notice, the example above still cannot serve several clients concurrently. A small change on the server side fixes that.
Change
server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)
to
server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)
####################################################################################################################
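Exception Handling
try:
    ...  # code that may raise an exception
except Exception as e:
    ...  # handle the error
    print(e)
else:
    ...  # runs only if no exception was raised
finally:
    ...  # always runs
Exception is the base class of most errors, so this catches most of them, but not all. An IndentationError or SyntaxError in the same file is raised when the file is compiled, before the try block ever runs, and KeyboardInterrupt derives from BaseException rather than Exception.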
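The point about what "except Exception" does and does not catch can be checked with a short sketch. The function name classify is hypothetical.

```python
def classify(exc):
    """Raise exc inside a try block and report which except clause caught it."""
    try:
        raise exc
    except Exception:
        return "caught by 'except Exception'"
    except BaseException:
        return "only caught by 'except BaseException'"

results = {
    "ValueError": classify(ValueError("bad value")),
    "KeyboardInterrupt": classify(KeyboardInterrupt()),
    "SystemExit": classify(SystemExit()),
}
```

ValueError ends up in the first clause, while KeyboardInterrupt and SystemExit fall through to the second because they inherit from BaseException directly.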
Custom exceptions
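The original gives no example for this heading. One possible sketch, assuming a hypothetical ConnectionLimitError class and check_connections function that are not part of the original notes:

```python
class ConnectionLimitError(Exception):
    """Hypothetical custom exception that carries extra context."""
    def __init__(self, limit, message="too many connections"):
        super().__init__("%s (limit=%d)" % (message, limit))
        self.limit = limit

def check_connections(current, limit=5):
    """Raise the custom exception when current exceeds limit."""
    if current > limit:
        raise ConnectionLimitError(limit)
    return current

try:
    check_connections(8)
except ConnectionLimitError as e:
    caught = str(e)  # the message built in __init__
```

Deriving from Exception means the class is also caught by a generic "except Exception" handler.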
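Assertions: assert checks a condition; if the condition does not hold, an AssertionError is raised and execution cannot continue past that point. In the example above the code is inside a try block, so the finally clause runs no matter what.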
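A small sketch of an assert inside try/finally, assuming hypothetical names log and checked_divide. Note that assert statements are removed when Python runs with the -O option, so this assumes a normal run.

```python
log = []

def checked_divide(a, b):
    try:
        assert b != 0, "b must not be zero"  # raises AssertionError when false
        log.append("dividing")                # skipped if the assert fails
        return a / b
    finally:
        log.append("finally")                 # runs either way

try:
    checked_divide(1, 0)
except AssertionError as e:
    log.append("AssertionError: %s" % e)
```

The failed assertion skips the rest of the try body, the finally clause still runs, and only then does the AssertionError reach the caller.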
##########################################################################################
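Processes and Threads
What is a thread?
A thread is the smallest unit of execution that the operating system can schedule. It is contained within a process and is the unit that actually does the work in the process. A thread is a single sequential flow of control within a process; a process can run multiple threads concurrently, each performing a different task.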
A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.
Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.
If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.
Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.
On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.
Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.
Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).
What is a process?
An executing instance of a program is called a process.
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
Differences between processes and threads
1. Threads share the address space of the process that created them; processes have their own address space.
2. Threads have direct access to the data segment of their process; processes have their own copy of the data segment of the parent process.
3. Threads can directly communicate with other threads of their process; processes must use interprocess communication to communicate with sibling processes.
4. New threads are easily created; new processes require duplication of the parent process.
5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process do not affect child processes.
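Python GIL (Global Interpreter Lock)
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython's memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
The gist of the above: no matter how many threads you start or how many CPUs you have, Python calmly allows only one thread to run at any given moment. So how can that be called multithreading? Don't jump to conclusions just yet; this is covered in the live lecture.
First, it needs to be made clear that the GIL is not a feature of the Python language; it is a concept introduced by one implementation of the Python interpreter, CPython. By analogy, C++ is a language (syntax) standard that can be compiled to executable code by different compilers, well-known ones being GCC, Intel C++ and Visual C++. Python is the same: the same code can be run by different Python runtimes such as CPython, PyPy and Jython, and Jython, for example, has no GIL. However, because CPython is the default Python runtime in most environments, many people equate CPython with Python and assume that the GIL is a defect of the Python language. So to be clear from the start: the GIL is not a feature of Python, and Python does not have to depend on a GIL.
This article thoroughly dissects the impact of the GIL on Python multithreading and is highly recommended: http://www.dabeaz.com/python/UnderstandingGIL.pdf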
###############################################################################
Calling a thread directly
#!/usr/bin/env python
import threading
import time

def hi(num):
    print("numbers: %s" % num)
    time.sleep(3)

ali = []
for i in range(10):
    t = threading.Thread(target=hi, args=[i,])  # create the thread
    t.start()  # start the thread
    print(t.name)
    ali.append(t)
for x in ali:
    x.join()  # wait for this thread to finish
print('the end.....')
Threads can also be used by subclassing
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  # the function each thread will run
        print("running on number:%s" % self.num)
        time.sleep(3)

if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
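join and Daemon
Setting m.daemon = True (m.setDaemon(True) in older code) marks thread m as a daemon thread. The threads that m starts inherit the daemon flag, so when the program's main thread exits they are all terminated along with it, whether or not they have finished their work.
import time
import threading

def run(n):
    print('[%s]------running----\n' % n)
    time.sleep(2)
    print('--done--')

def main():
    for i in range(5):
        t = threading.Thread(target=run, args=[i,])
        # time.sleep(1)
        t.start()
        # t.join(1)
        print('starting thread', t.name)

m = threading.Thread(target=main, args=[])
m.daemon = True  # mark m as a daemon thread; it and the threads it starts are killed when the main thread exits
m.start()
# m.join(timeout=3)
print("---main thread done----")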
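The interaction between the daemon flag and join() can be checked with a short sketch. The names background, parent and inherited are hypothetical. It shows that a thread created inside a daemon thread inherits the daemon flag, and that an explicit join() is what makes the main thread wait.

```python
import threading
import time

inherited = []  # records the daemon flag seen by the child thread

def background(delay):
    time.sleep(delay)

def parent():
    # A new thread inherits the daemon flag of the thread that creates it.
    child = threading.Thread(target=background, args=(0.1,))
    inherited.append(child.daemon)
    child.start()
    child.join()

m = threading.Thread(target=parent)
m.daemon = True    # must be set before start()
m.start()
m.join(timeout=3)  # without this join, the main thread would not wait for m
```

If the join() call is removed and the main thread finishes first, the daemon threads are simply abandoned when the interpreter exits.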
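Thread Locks (Mutex)
A process can start multiple threads, and those threads share the memory space of the parent process, which means every thread can access the same data. So what happens if two threads try to modify the same piece of data at the same time?
import time
import threading

def addNum():
    global num  # every thread uses this global variable
    print('--get num:', num)
    time.sleep(1)
    num -= 1  # decrement the shared variable

num = 100  # the shared variable
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list:  # wait for all threads to finish
    t.join()
print('final num:', num)
Normally the final value of num should be 0, but if you run this several times on Python 2.7 you will find that the printed result is not always 0. Why does it differ from run to run? Suppose you have two threads, A and B, both about to decrement num. Because the two threads run concurrently, both may read the initial value num=100 and hand it to the CPU. Thread A computes 99, but thread B also computes 99, and after both results are assigned back to num the value is 99 rather than 98. What can be done? Before modifying shared data, each thread can put a lock on it so that nobody else modifies it while the change is still in progress; other threads that want to modify the data must wait until you have finished and released the lock before they can access it.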
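Version with a lock:
import time
import threading

def addNum():
    global num  # every thread uses this global variable
    print('--get num:', num)
    time.sleep(1)
    lock.acquire()  # take the lock before modifying the data
    num -= 1  # decrement the shared variable
    lock.release()  # release the lock after the change

num = 100  # the shared variable
thread_list = []
lock = threading.Lock()  # create a global lock
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list:  # wait for all threads to finish
    t.join()
print('final num:', num)
Thread locks versus the GIL: the GIL guarantees that only one thread executes at any given moment, which is a guarantee at the level of the interpreter's underlying C code. A thread lock guarantees the consistency of the data in the Python program's own memory space: once the lock is taken, only the thread holding it can modify the data, so that part of the code runs serially rather than in parallel.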
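The same locking pattern is often written with a with statement, which releases the lock even if the protected code raises. A minimal sketch, using the hypothetical names counter and decrement:

```python
import threading

counter = 100
lock = threading.Lock()

def decrement():
    global counter
    with lock:  # acquire(); released automatically, even on error
        current = counter
        counter = current - 1

threads = [threading.Thread(target=decrement) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because the read and the write both happen while the lock is held, no two threads can work from the same stale value.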
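RLock (Reentrant Lock)
Put simply, an RLock lets a thread that already holds a big lock acquire it again for nested "sub-locks". When code that holds the lock calls other functions that also acquire and release it, in other words when there are several levels of locking, use RLock. It is less error-prone; with a plain Lock the nested acquire would block forever, because the thread would be waiting for a lock it already holds.
In fact, RLock can be used in place of Lock everywhere.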
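The difference between Lock and RLock can be seen without risking a deadlock by using a non-blocking acquire. A small sketch, with the hypothetical helper name can_reacquire:

```python
import threading

def can_reacquire(lock):
    """Acquire lock, then try to acquire it again from the same thread."""
    lock.acquire()
    try:
        # blocking=False returns immediately instead of deadlocking.
        second = lock.acquire(blocking=False)
        if second:
            lock.release()  # undo the nested acquire
        return second
    finally:
        lock.release()

plain_result = can_reacquire(threading.Lock())   # a Lock cannot be re-acquired
rlock_result = can_reacquire(threading.RLock())  # an RLock can, by its owner
```

An RLock counts nested acquires, so it has to be released as many times as it was acquired.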
import threading, time

def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num

def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2

def run3():
    lock.acquire()  # outer acquire; run1 and run2 acquire the same RLock again
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)

if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)
Semaphore
A mutex allows only one thread at a time to change the data, whereas a Semaphore allows a fixed number of threads to do so at once. For example, if a restroom has 3 stalls, at most 3 people can use it at the same time; everyone else has to wait until someone comes out before going in.
import threading, time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()

if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)  # allow at most 5 threads to run at once
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

while threading.active_count() != 1:
    pass  # print(threading.active_count())
else:
    print('----all threads done---')
    print(num)
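To check that a semaphore really caps concurrency, one can record how many threads are inside the guarded section at once. A sketch, with the hypothetical names LIMIT, active, peak and worker, and a second lock used only to protect the counters:

```python
import threading
import time

LIMIT = 3
semaphore = threading.BoundedSemaphore(LIMIT)
state_lock = threading.Lock()  # protects the two counters below
active = 0
peak = 0

def worker():
    global active, peak
    with semaphore:  # at most LIMIT threads get past this line
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate some work
        with state_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

However the ten threads are scheduled, peak never exceeds LIMIT.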
Events
An event is a simple synchronization object;
the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.
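The flag behaviour described above can be tried out in a few lines. In this sketch the names waiter, before and after are hypothetical, and wait() is given a timeout so that nothing can block indefinitely.

```python
import threading

event = threading.Event()
results = []

def waiter():
    # wait() returns True once the flag is set, False if the timeout expires.
    results.append(event.wait(timeout=5))

before = event.wait(timeout=0.05)  # the flag is clear, so this times out
t = threading.Thread(target=waiter)
t.start()
event.set()                        # wake up every waiting thread
t.join()
after = event.is_set()
```

Calling event.clear() afterwards would reset the flag, and later wait() calls would block again.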
An Event can be used to coordinate two or more threads. Below is a traffic light example: one thread acts as the traffic light and several threads act as cars, which follow the rule of stopping on red and going on green.
import threading, time
import random

def light():
    if not event.is_set():
        event.set()  # wait() no longer blocks: green light
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count < 13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count < 20:
            if event.is_set():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set()  # switch back to green
        time.sleep(1)
        count += 1

def car(n):
    while 1:
        time.sleep(1)
        if event.is_set():  # green light
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." % n)
            event.wait()

if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car, args=(i,))
        t.start()
#############################################################################
Multiple Processes with multiprocessing
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
from multiprocessing import Process
import time
import os

def f(name):
    time.sleep(2)
    print('hello', name)

def info(title):
    print(title)
    print('parent process,', os.getppid())
    print('current process id,', os.getpid())

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
    info('\033[32;1mmain process line\033[0m')
    p1 = Process(target=info, args=('bob',))
    p1.start()
    p1.join()
Inter-process Communication
Memory is not shared between processes. To exchange data between two processes, use one of the following:
Queues
Usage is much the same as the queue used with threading.
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
Pipes
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
Managers
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(1)
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)
Process Synchronization
Without using the lock, output from the different processes is liable to get all mixed up.
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
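Process Pools
A process pool maintains a set of worker processes internally. When work is submitted, a process is taken from the pool; if no process in the pool is available, the program waits until one becomes free.
Two methods of the pool are used here:
apply
apply_async
from multiprocessing import Process, Pool
import time

def Foo(i):
    time.sleep(2)
    return i + 100

def Bar(arg):
    print('-->exec done:', arg)

if __name__ == '__main__':  # guard needed on platforms that spawn the worker processes
    pool = Pool(5)
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)
        # pool.apply(func=Foo, args=(i,))
    print('end')
    pool.close()
    pool.join()  # wait for the pool's processes to finish before exiting; if this is commented out, the program exits right away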
######################################################################################
Reposted from: https://www.cnblogs.com/joey251744647/p/5295355.html