日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python zmq 讲解

發布時間:2025/3/15 python 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python zmq 讲解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

安裝 pip install pyzmq-18.0.1

?

1.?請求應答模式(Request-Reply)(rep 和 req)

??消息雙向的,有來有往,req端請求的消息,rep端必須答復給req端

2.?訂閱發布模式?(pub 和?sub)

??消息單向的,有去無回的。可按照發布端可發布制定主題的消息,訂閱端可訂閱喜歡的主題,訂閱端只會收到自己已經訂閱的主題。發布端發布一條消息,可被多個訂閱端同事收到。

3.?push?pull模式

消息單向的,也是有去無回的。push的任何一個消息,始終只會有一個pull端收到消息.

后續的代理模式和路由模式等都是在三種基本模式上面的擴展或變異。

?

1.請求回應模型。由請求端發起請求,并等待回應端回應請求。從請求端來看,一定是一對對收發配對的;

反之,在回應端一定是發收對。請求端和回應端都可以是1:N的模型。通常把1認為是server,N認為是Client。

0MQ可以很好的支持路由功能(實現路由功能的組件叫做Device),把1:N擴展為N:M(只需要加入若干路由節點)。

從這個模型看,更底層的端點地址是對上層隱藏的。每個請求都隱含回應地址,而應用則不關心它。

2.發布訂閱模型。這個模型里,發布端是單向只發送數據的,且不關心是否把全部的信息都發送給訂閱者。

如果發布端開始發布信息的時候,訂閱端尚未連接上,這些信息直接丟棄。

不過一旦訂閱端連接上來,中間會保證沒有信息丟失。

同樣,訂閱端則只負責接收,而不能反饋。

如果發布端和訂閱端需要交互(比如要確認訂閱者是否已經連接上),則使用額外的socket采用請求回應模型滿足這個需求。

3.管道模型。這個模型里,管道是單向的,從PUSH端單向的向PULL端單向的推送數據流。

?

server為REP模式,等待消息,client為REQ模式,向server請求消息。

一個最簡單的例子:

server.py

import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") message = socket.recv() print "message from client:", message # Send reply back to client socket.send("World")

client.py

import zmq context = zmq.Context() print "Connecting to server..." socket = context.socket(zmq.REQ) socket.connect ("tcp://localhost:5555") socket.send ("Hello") message = socket.recv() print "Received reply: ", message

一個線程中如果有多個sokect,同時需要收發數據時,zmq提供polling sockets實現,不用在send()或者recv()時阻塞socket。

下面是一個在recv()端接受信息的poller()輪詢接受代碼。其中socks = dict(poller.poll(1000))中的1000位延遲時間。
?

#!/usr/bin/python # -*- coding: utf-8 import zmq import random import time from multiprocessing import Processdef server_push(port="5556"):context = zmq.Context()socket = context.socket(zmq.PUSH)socket.bind("tcp://*:%s" % port)print "Running server on port: ", port# serves only 5 request and diesfor reqnum in range(10):if reqnum < 6:socket.send("Continue")else:socket.send("Exit")breaktime.sleep (1)def server_pub(port="5558"):context = zmq.Context()socket = context.socket(zmq.PUB)socket.bind("tcp://*:%s" % port)publisher_id = random.randrange(0,9999)print "Running server on port: ", port# serves only 5 request and diesfor reqnum in range(10):# Wait for next request from clienttopic = random.randrange(8,10)messagedata = "server#%s" % publisher_idprint "%s %s" % (topic, messagedata)socket.send("%d %s" % (topic, messagedata))time.sleep(1) def client(port_push, port_sub):context = zmq.Context()socket_pull = context.socket(zmq.PULL)socket_pull.connect ("tcp://localhost:%s" % port_push)print "Connected to server with port %s" % port_pushsocket_sub = context.socket(zmq.SUB)socket_sub.connect ("tcp://localhost:%s" % port_sub)socket_sub.setsockopt(zmq.SUBSCRIBE, "9")#zmq.SUBSCRIBE創建一個消息過濾標志,訂閱以9為前綴的消息print "Connected to publisher with port %s" % port_sub# 初始化Pollerpoller = zmq.Poller()poller.register(socket_pull, zmq.POLLIN)poller.register(socket_sub, zmq.POLLIN)# Work on requests from both server and publisher#如果設置為POLLIN則刷新recv,與之對應的是POLLOUT刷新send發送事件,也可以同時設置兩個標志should_continue = Truewhile should_continue:socks = dict(poller.poll())if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:message = socket_pull.recv()print "Recieved control command: %s" % messageif message == "Exit":print "Recieved exit command, client will stop recieving messages"should_continue = Falseif socket_sub in socks and socks[socket_sub] == zmq.POLLIN:string = socket_sub.recv()topic, messagedata = string.split()#Python split()通過指定分隔符對字符串進行切片,如果參數num 有指定值,則僅分隔 num 個子字符串#str.split(str="", num=string.count(str)).# str -- 分隔符,默認為所有的空字符,包括空格、換行(\n)、制表符(\t)等,num -- 分割次數print "Processing ... ", topic, messagedata if __name__ == "__main__":# Now we can run a few serversserver_push_port = "5556"server_pub_port = "5558"Process(target=server_push, args=(server_push_port,)).start()Process(target=server_pub, args=(server_pub_port,)).start()Process(target=client, args=(server_push_port,server_pub_port,)).start()

結果:

Running server on port: 5556 Running server on port: 5558 8 server#5230 Connected to server with port 5556 Connected to publisher with port 5558 Recieved control command: Continue 8 server#5230 Recieved control command: Continue 9 server#5230 Processing ... 9 server#5230 Recieved control command: Continue 9 server#5230 Processing ... 9 server#5230 Recieved control command: Continue 9 server#5230 Processing ... 9 server#5230 Recieved control command: Continue 9 server#5230 Processing ... 9 server#5230 Recieved control command: Continue 9 server#5230 Processing ... 9 server#5230 Recieved control command: Exit Recieved exit command, client will stop recieving messages 8 server#5230 9 server#5230 9 server#5230Process finished with exit code 0

from zmq.auth.certs import create_certificates?

創建證書

public_key, private_key = create_certificates(tmp_key_dir, id)


?

from zmq.auth.ioloop import IOLoopAuthenticator

IO循環驗證程序

?

from zmq.auth.certs import load_certificate

讀取證書

server_secret_file = os.path.join(secret_keys_dir, 'beeswarm_server.pri') server_public, server_secret = load_certificate(server_secret_file) drone_data_inbound = beeswarm.shared.zmq_context.socket(zmq.PULL) drone_data_inbound.curve_secretkey = server_secret drone_data_inbound.curve_publickey = server_public drone_data_inbound.curve_server = True drone_data_inbound.bind('tcp://*:{0}'.format(self.config['network']['zmq_port']))

?

ZMQ開源源碼實例

from gevent import Greenlet import zmq.green as zmq from zmq.auth.certs import create_certificatesdef initcontext = beeswarm.shared.zmq_contextself.config_commands = context.socket(zmq.REP)self.enabled = Truedef runself.config_commands.bind(SocketNames.CONFIG_COMMANDS.value)poller = zmq.Poller()poller.register(self.config_commands, zmq.POLLIN)while self.enabled:socks = dict(poller.poll(500))if self.config_commands in socks and socks[self.config_commands] == zmq.POLLIN:self._handle_commands()def _handle_commands(self):msg = self.config_commands.recv()if ' ' in msg:cmd, data = msg.split(' ', 1)else:cmd = msglogger.debug('Received command: {0}'.format(cmd))if cmd == Messages.SET_CONFIG_ITEM.value: #SET 重新設置配置文件self._handle_command_set(data)self.config_commands.send('{0} {1}'.format(Messages.OK.value, '{}'))elif cmd == Messages.GET_CONFIG_ITEM.value: #GET 獲取配置文件某KEY值value = self._handle_command_get(data)self.config_commands.send('{0} {1}'.format(Messages.OK.value, value))elif cmd == Messages.GET_ZMQ_KEYS.value: #返回客戶端證書self._handle_command_getkeys(data)elif cmd == Messages.DELETE_ZMQ_KEYS.value: #刪除客戶端證書self._remove_zmq_keys(data)self.config_commands.send('{0} {1}'.format(Messages.OK.value, '{}'))else:logger.error('Unknown command received: {0}'.format(cmd))self.config_commands.send(Messages.FAIL.value)def _handle_command_set(self, data):new_config = json.loads(data)self.config.update(new_config)self._save_config_file()def _handle_command_get(self, data):# example: 'network,host' will lookup self.config['network']['host']#示例:“network,host”將查找self.config[“network”][“host”]keys = data.split(',')value = self._retrieve_nested_config(keys, self.config)return valuedef _retrieve_nested_config(self, keys, dict):if keys[0] in dict:if len(keys) == 1:return dict[keys[0]]else:return self._retrieve_nested_config(keys[1:], dict[keys[0]])def _handle_command_getkeys(self, name):private_key, publickey = self._get_zmq_keys(name)self.config_commands.send(Messages.OK.value + ' ' + json.dumps({'public_key': publickey,'private_key': private_key}))def _save_config_file(self):with open(self.config_file, 'w+') as config_file:config_file.write(json.dumps(self.config, indent=4))def _get_zmq_keys(self, id):cert_path = os.path.join(self.work_dir, 'certificates')public_keys = os.path.join(cert_path, 'public_keys')private_keys = os.path.join(cert_path, 'private_keys')public_key_path = os.path.join(public_keys, '{0}.pub'.format(id))private_key_path = os.path.join(private_keys, '{0}.pri'.format(id))if not os.path.isfile(public_key_path) or not os.path.isfile(private_key_path):logging.debug('Generating ZMQ keys for: {0}.'.format(id))for _path in [cert_path, public_keys, private_keys]:if not os.path.isdir(_path):os.mkdir(_path)tmp_key_dir = tempfile.mkdtemp()try:public_key, private_key = create_certificates(tmp_key_dir, id)# the final location for keys#鑰匙的最終位置shutil.move(public_key, public_key_path)shutil.move(private_key, private_key_path)finally:shutil.rmtree(tmp_key_dir)# return copy of keys#返回密鑰副本return open(private_key_path, "r").readlines(), open(public_key_path, "r").readlines()def _remove_zmq_keys(self, id):cert_path = os.path.join(self.work_dir, 'certificates')public_keys = os.path.join(cert_path, 'public_keys')private_keys = os.path.join(cert_path, 'private_keys')public_key_path = os.path.join(public_keys, '{0}.pub'.format(id))private_key_path = os.path.join(private_keys, '{0}.pri'.format(id))for _file in [public_key_path, private_key_path]:if os.path.isfile(_file):os.remove(_file)def message_proxy(self, work_dir):"""drone_data_inboud is for data comming from dronesdrone_data_outbound is for commands to the drones, topic must either be a drone ID or all for sending a broadcast message to all drones無人機內部數據用于無人機的數據混合。無人機數據輸出用于向無人機發送命令,主題必須是無人機ID或全部用于向所有無人機發送廣播消息"""public_keys_dir = os.path.join(work_dir, 'certificates', 'public_keys')secret_keys_dir = os.path.join(work_dir, 'certificates', 'private_keys')# start and configure auth worker#啟動并配置身份驗證工作程序auth = IOLoopAuthenticator()auth.start()auth.allow('127.0.0.1')auth.configure_curve(domain='*', location=public_keys_dir)# external interfaces for communicating with drones#與無人機通信的外部接口#服務器證書server_secret_file = os.path.join(secret_keys_dir, 'beeswarm_server.pri')#獲取公鑰和密鑰server_public, server_secret = load_certificate(server_secret_file)drone_data_inbound = beeswarm.shared.zmq_context.socket(zmq.PULL)drone_data_inbound.curve_secretkey = server_secretdrone_data_inbound.curve_publickey = server_publicdrone_data_inbound.curve_server = Truedrone_data_inbound.bind('tcp://*:{0}'.format(self.config['network']['zmq_port']))drone_data_outbound = beeswarm.shared.zmq_context.socket(zmq.PUB)drone_data_outbound.curve_secretkey = server_secretdrone_data_outbound.curve_publickey = server_publicdrone_data_outbound.curve_server = Truedrone_data_outbound.bind('tcp://*:{0}'.format(self.config['network']['zmq_command_port']))# internal interfaces# all inbound session data from drones will be replayed on this socket#內部接口,來自無人機的所有入站會話數據將在此套接字上重播drone_data_socket = beeswarm.shared.zmq_context.socket(zmq.PUB)drone_data_socket.bind(SocketNames.DRONE_DATA.value)# all commands received on this will be published on the external interface#在此上接收到的所有命令都將在外部接口上發布drone_command_socket = beeswarm.shared.zmq_context.socket(zmq.PULL)drone_command_socket.bind(SocketNames.DRONE_COMMANDS.value)poller = zmq.Poller()poller.register(drone_data_inbound, zmq.POLLIN)poller.register(drone_command_socket, zmq.POLLIN)while True:# .recv() gives no context switch - why not? using poller with timeout instead#recv()不提供上下文切換-為什么不?將輪詢器與超時一起使用socks = dict(poller.poll(100))gevent.sleep()if drone_command_socket in socks and socks[drone_command_socket] == zmq.POLLIN:data = drone_command_socket.recv()drone_id, _ = data.split(' ', 1)logger.debug("Sending drone command to: {0}".format(drone_id))# pub socket takes care of filtering#pub socket負責過濾drone_data_outbound.send(data)elif drone_data_inbound in socks and socks[drone_data_inbound] == zmq.POLLIN:raw_msg = drone_data_inbound.recv()split_data = raw_msg.split(' ', 2)if len(split_data) == 3:topic, drone_id, data = split_dataelse:data = Nonetopic, drone_id, = split_datalogger.debug("Received {0} message from {1}.".format(topic, drone_id))# relay message on internal socket#內部插座上的中繼消息drone_data_socket.send(raw_msg)

?

總結

以上是生活随笔為你收集整理的Python zmq 讲解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。