Python zmq的三种简单模式
ZMQ (以下 ZeroMQ 簡(jiǎn)稱 ZMQ)是一個(gè)簡(jiǎn)單好用的傳輸層,像框架一樣的一個(gè) socket library,他使得 Socket 編程更加簡(jiǎn)單、簡(jiǎn)潔和性能更高。
是一個(gè)消息處理隊(duì)列庫(kù),可在多個(gè)線程、內(nèi)核和主機(jī)盒之間彈性伸縮。ZMQ 的明確目標(biāo)是“成為標(biāo)準(zhǔn)網(wǎng)絡(luò)協(xié)議棧的一部分,之后進(jìn)入 Linux 內(nèi)核”。
ZMQ 讓編寫(xiě)高性能網(wǎng)絡(luò)應(yīng)用程序極為簡(jiǎn)單和有趣。
ZeroMQ并不是一個(gè)對(duì)socket的封裝,不能用它去實(shí)現(xiàn)已有的網(wǎng)絡(luò)協(xié)議。
?
它有自己的模式,不同于更底層的點(diǎn)對(duì)點(diǎn)通訊模式。
?
它有比tcp協(xié)議更高一級(jí)的協(xié)議。(當(dāng)然ZeroMQ不一定基于TCP協(xié)議,它也可以用于進(jìn)程間和進(jìn)程內(nèi)通訊)
?
zeromq 并不是類似rabbitmq消息列隊(duì),它實(shí)際上只一個(gè)消息列隊(duì)組件,一個(gè)庫(kù)。
?
zeromq的幾種模式
Request-Reply模式(請(qǐng)求響應(yīng)模型):
客戶端在請(qǐng)求后,服務(wù)端必須回響應(yīng)
由客戶端發(fā)起請(qǐng)求,并等待服務(wù)端響應(yīng)請(qǐng)求。從客戶端端來(lái)看,一定是一對(duì)對(duì)發(fā)收配對(duì)的;
反之,在服務(wù)端一定是收發(fā)對(duì)。服務(wù)端和客戶端都可以是1:N的模型。通常把1認(rèn)為是server,N認(rèn)為是Client。
?
ZMQ可以很好的支持路由功能(實(shí)現(xiàn)路由功能的組件叫做Device),把1:N擴(kuò)展為N:M(只需要加入若干路由節(jié)點(diǎn))。
從這個(gè)模型看,更底層的端點(diǎn)地址是對(duì)上層隱藏的。每個(gè)請(qǐng)求都隱含回應(yīng)地址,而應(yīng)用則不關(guān)心它
?
?
?
服務(wù)端:
# sever.py
import zmq import sys context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True:try:print("wait for client ...")message = socket.recv()print("message from client:", message.decode('utf-8'))socket.send(message)except Exception as e:print('異常:',e)sys.exit()?
?
客戶端:
#client.py
import zmq import sys context = zmq.Context() print("Connecting to server...") socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") while True:input1 = input("請(qǐng)輸入內(nèi)容:").strip()if input1 == 'b':sys.exit()socket.send(input1.encode('utf-8'))message = socket.recv()print("Received reply: ", message.decode('utf-8'))?
?
?
Publish-Subscribe模式(發(fā)布訂閱模型):
廣播所有client,沒(méi)有隊(duì)列緩存,斷開(kāi)連接數(shù)據(jù)將永遠(yuǎn)丟失。client可以進(jìn)行數(shù)據(jù)過(guò)濾。
?
服務(wù)端
server.py
?
import zmqimport time
import sys
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
while True:
msg = input("請(qǐng)輸入要發(fā)布的信息:").strip()
if msg == 'b':
sys.exit()
socket.send(msg.encode('utf-8'))
time.sleep(1)
?
?
?
客戶端1
client1.py
import zmqcontext = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,''.encode('utf-8')) # 接收所有消息
while True:
response = socket.recv().decode('utf-8');
print("response: %s" % response) ?
?
?
?
?客戶端2
client2.py
import zmqcontext = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,'123'.encode('utf-8')) # 消息過(guò)濾 只接受123開(kāi)頭的信息
while True:
response = socket.recv().decode('utf-8');
print("response: %s" % response)
?
?
?運(yùn)行結(jié)果:
發(fā)布端發(fā)布以下信息(注意:b是關(guān)閉發(fā)布端的指令):
請(qǐng)輸入要發(fā)布的信息:hello python 請(qǐng)輸入要發(fā)布的信息:大唐不夜城 請(qǐng)輸入要發(fā)布的信息:123435678 請(qǐng)輸入要發(fā)布的信息:123我愛(ài)你 請(qǐng)輸入要發(fā)布的信息:廣播模式,發(fā)布端只關(guān)心發(fā)布信息,不關(guān)心訂閱端是否接收 請(qǐng)輸入要發(fā)布的信息:b?
?
?客戶端1接收的信息:
response: hello python response: 大唐不夜城 response: 123435678 response: 123我愛(ài)你 response: 廣播模式,發(fā)布端只關(guān)心發(fā)布信息,不關(guān)心訂閱端是否接收?
?
?客戶端2接收的信息:
response: 123435678 response: 123我愛(ài)你?
?
Parallel Pipeline模式(管道模型):?
? 由三部分組成,push進(jìn)行數(shù)據(jù)推送,work進(jìn)行數(shù)據(jù)緩存,pull進(jìn)行數(shù)據(jù)競(jìng)爭(zhēng)獲取處理。區(qū)別于Publish-Subscribe存在一個(gè)數(shù)據(jù)緩存和處理負(fù)載。
當(dāng)連接被斷開(kāi),數(shù)據(jù)不會(huì)丟失,重連后數(shù)據(jù)繼續(xù)發(fā)送到對(duì)端。
server.py
import zmq import timecontext = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:5557")while True:msg = input("請(qǐng)輸入要發(fā)布的信息:").strip()socket.send(msg.encode('utf-8'))print("已發(fā)送")time.sleep(1)?
?
worker.py
import zmq context = zmq.Context() receive = context.socket(zmq.PULL) receive.connect('tcp://127.0.0.1:5557') sender = context.socket(zmq.PUSH) sender.connect('tcp://127.0.0.1:5558')while True:data = receive.recv()print("正在轉(zhuǎn)發(fā)...")sender.send(data)?
?
client.py
import zmq context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind("tcp://*:5558")while True:response = socket.recv().decode('utf-8')print("response: %s" % response)?
結(jié)果:
server端:
請(qǐng)輸入要發(fā)布的信息:hello python 已發(fā)送 請(qǐng)輸入要發(fā)布的信息:王者不可阻擋 已發(fā)送 請(qǐng)輸入要發(fā)布的信息:123abc 已發(fā)送 請(qǐng)輸入要發(fā)布的信息:?
?
?work端
正在轉(zhuǎn)發(fā)... 正在轉(zhuǎn)發(fā)... 正在轉(zhuǎn)發(fā)...?
?
?client端:(接收第二條信息后斷開(kāi),斷開(kāi)后重新收到的信息)
response: 123abc?
轉(zhuǎn)載于:https://www.cnblogs.com/yunwangjun-python-520/p/10777375.html
總結(jié)
以上是生活随笔為你收集整理的Python zmq的三种简单模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: oracle查看被锁的表和解锁
- 下一篇: MSD是国家发行的吗