日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python通过amqp消息队列协议中的Qpid实现数据通信

發布時間:2023/12/4 python 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python通过amqp消息队列协议中的Qpid实现数据通信 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

簡介:

? ? 這兩天看了消息隊列通信,打算在配置平臺上應用起來。以前用過zeromq但是這東西太快了,還有就是rabbitmq有點大,新浪的朋友推薦了qpid,簡單輕便。自己總結了下文檔,大家可以瞅瞅。


AMQP(消息隊列協議Advanced Message Queuing Protocol)是一種消息協議 ,等同于JMS,但是JMS只是java平臺的方案,AMQP是一個跨語言的協議。


AMQP 不分語言平臺,主流的語言都支持,運維這邊的perl,python,ruby更是支持,所以大家就放心用吧。


主流的消息隊列通信類型:

點對點:A?發消息給?B。 廣播:A?發給所有其他人的消息 組播:A?發給多個但不是所有其他人的消息。 Requester/response:類似訪問網頁的通信方式,客戶端發請求并等待,服務端回復該請求 Pub-sub:類似雜志發行,出版雜志的人并不知道誰在看這本雜志,訂閱的人并不關心誰在發表這本雜志。出版的人只管將信息發布出去,訂閱的人也只在需要的時候收到該信息。 Store-and-forward:存儲轉發模型類似信件投遞,寫信的人將消息寫給某人,但在將信件發出的時候,收信的人并不一定在家等待,也并不知道有消息給他。但這個消息不會丟失,會放在收信者的信箱中。這種模型允許信息的異步交換。 其他通信模型。。。


Publisher --->Exchange ---> MessageQueue --->Consumer


整個過程是異步的.Publisher,Consumer相互不知道對方的存在,Exchange負責交換/路由,依靠Routing Key,每個消息者有一個Routing Key,每個Binding將自已感興趣的RoutingKey告訴Exchange,以便Exchange將相關的消息轉發給相應的Queue !


幾個概念

幾個概念 Producer,Routing?Key,Exchange,Binding,Queue,Consumer. Producer:?消息的創建者,消息的發送者 Routing?Key:唯一用來映射消息該進入哪個隊列的標識 Exchange:負責消息的路由,交換 Binding:定義Queue和Exchange的映射關系 Queue:消息隊列 Consumer:消息的使用者 Exchange類型 Fan-Out:類似于廣播方式,不管RoutingKey Direct:根據RoutingKey,進行關聯投寄 Topic:類似于Direct,但是支持多個Key關聯,以組的方式投寄。key以.來定義界限。類似于usea.news,usea.weather.這兩個消息是一組的。



QPID


Qpid 是 Apache 開發的一款面向對象的消息中間件,它是一個 AMQP 的實現,可以和其他符合 AMQP 協議的系統進行通信。Qpid 提供了 C++/Python/Java/C# 等主流編程語言的客戶端庫,安裝使用非常方便。相對于其他的 AMQP 實現,Qpid 社區十分活躍,有望成為標準 AMQP 中間件產品。除了符合 AMQP 基本要求之外,Qpid 提供了很多額外的 HA 特性,非常適于集群環境下的消息通信!


基本功能外提供以下特性:


采用 Corosync(?)來保證集群環境下的Fault-tolerant(?) 特性

支持XML的Exchange,消息為XML時,彩用Xquery過濾

支持plugin

提供安全認證,可對producer/consumer提供身份認證

qpidd --port --no-data-dir --auth

port:端口

--no-data-dir:不指定數據目錄

--auth:不啟用安全身份認證



啟動后自動創建一些Exchange,amp.topic,amp.direct,amp.fanout


tools:


Qpid-config:維護Queue,Exchange,內部配置

Qpid-route:配置broker Federation(聯盟?集群?)

Qpid-tool:監控


咱們說完介紹了,這里就趕緊測試下。


服務器端的安裝:


yum?install?qpid-cpp-server yum?install?qpid-tools /etc/init.d/qpidd?start


發布端的測試代碼


一些測試代碼轉自: ibm 的開發社區?


#!/usr/bin/env?python #xiaorui.cc #轉自ibm開發社區 import?optparse,?time from?qpid.messaging?import?* from?qpid.util?import?URL from?qpid.log?import?enable,?DEBUG,?WARN def?nameval(st):idx?=?st.find("=")if?idx?>=?0:name?=?st[0:idx]value?=?st[idx+1:]else:name?=?stvalue?=?Nonereturn?name,?value parser?=?optparse.OptionParser(usage="usage:?%prog?[options]?ADDRESS?[?CONTENT?...?]",description="Send?messages?to?the?supplied?address.") parser.add_option("-b",?"--broker",?default="localhost",help="connect?to?specified?BROKER?(default?%default)") parser.add_option("-r",?"--reconnect",?action="store_true",help="enable?auto?reconnect") parser.add_option("-i",?"--reconnect-interval",?type="float",?default=3,help="interval?between?reconnect?attempts") parser.add_option("-l",?"--reconnect-limit",?type="int",help="maximum?number?of?reconnect?attempts") parser.add_option("-c",?"--count",?type="int",?default=1,help="stop?after?count?messages?have?been?sent,?zero?disables?(default?%default)") parser.add_option("-t",?"--timeout",?type="float",?default=None,help="exit?after?the?specified?time") parser.add_option("-I",?"--id",?help="use?the?supplied?id?instead?of?generating?one") parser.add_option("-S",?"--subject",?help="specify?a?subject") parser.add_option("-R",?"--reply-to",?help="specify?reply-to?address") parser.add_option("-P",?"--property",?dest="properties",?action="append",?default=[],metavar="NAME=VALUE",?help="specify?message?property") parser.add_option("-M",?"--map",?dest="entries",?action="append",?default=[],metavar="KEY=VALUE",help="specify?map?entry?for?message?body") parser.add_option("-v",?dest="verbose",?action="store_true",help="enable?logging") opts,?args?=?parser.parse_args() if?opts.verbose:enable("qpid",?DEBUG) else:enable("qpid",?WARN) if?opts.id?is?None:spout_id?=?str(uuid4()) else:spout_id?=?opts.id if?args:addr?=?args.pop(0) else:parser.error("address?is?required") content?=?None if?args:text?=?"?".join(args) else:text?=?None if?opts.entries:content?=?{}if?text:content["text"]?=?textfor?e?in?opts.entries:name,?val?=?nameval(e)content[name]?=?val else:content?=?text conn?=?Connection(opts.broker,reconnect=opts.reconnect,reconnect_interval=opts.reconnect_interval,reconnect_limit=opts.reconnect_limit) try:conn.open()ssn?=?conn.session()snd?=?ssn.sender(addr)count?=?0start?=?time.time()while?(opts.count?==?0?or?count?<?opts.count)?and?\(opts.timeout?is?None?or?time.time()?-?start?<?opts.timeout):msg?=?Message(subject=opts.subject,reply_to=opts.reply_to,content=content)msg.properties["spout-id"]?=?"%s:%s"?%?(spout_id,?count)for?p?in?opts.properties:name,?val?=?nameval(p)msg.properties[name]?=?valsnd.send(msg)count?+=?1print?msg except?SendError,?e:print?e except?KeyboardInterrupt:pass conn.close()


客戶端的測試代碼:


#!/usr/bin/env?python #xiaorui.cc ##轉自ibm開發社區 import?optparse from?qpid.messaging?import?* from?qpid.util?import?URL from?qpid.log?import?enable,?DEBUG,?WARN parser?=?optparse.OptionParser(usage="usage:?%prog?[options]?ADDRESS?...",description="Drain?messages?from?the?supplied?address.") parser.add_option("-b",?"--broker",?default="localhost",help="connect?to?specified?BROKER?(default?%default)") parser.add_option("-c",?"--count",?type="int",help="number?of?messages?to?drain") parser.add_option("-f",?"--forever",?action="store_true",help="ignore?timeout?and?wait?forever") parser.add_option("-r",?"--reconnect",?action="store_true",help="enable?auto?reconnect") parser.add_option("-i",?"--reconnect-interval",?type="float",?default=3,help="interval?between?reconnect?attempts") parser.add_option("-l",?"--reconnect-limit",?type="int",help="maximum?number?of?reconnect?attempts") parser.add_option("-t",?"--timeout",?type="float",?default=0,help="timeout?in?seconds?to?wait?before?exiting?(default?%default)") parser.add_option("-p",?"--print",?dest="format",?default="%(M)s",help="format?string?for?printing?messages?(default?%default)") parser.add_option("-v",?dest="verbose",?action="store_true",help="enable?logging") opts,?args?=?parser.parse_args() if?opts.verbose:enable("qpid",?DEBUG) else:enable("qpid",?WARN) if?args:addr?=?args.pop(0) else:parser.error("address?is?required") if?opts.forever:timeout?=?None else:timeout?=?opts.timeout class?Formatter:def?__init__(self,?message):self.message?=?messageself.environ?=?{"M":?self.message,"P":?self.message.properties,"C":?self.message.content}def?__getitem__(self,?st):return?eval(st,?self.environ) conn?=?Connection(opts.broker,reconnect=opts.reconnect,reconnect_interval=opts.reconnect_interval,reconnect_limit=opts.reconnect_limit) try:conn.open()ssn?=?conn.session()rcv?=?ssn.receiver(addr)count?=?0while?not?opts.count?or?count?<?opts.count:try:msg?=?rcv.fetch(timeout=timeout)print?opts.format?%?Formatter(msg)count?+=?1ssn.acknowledge()except?Empty:break except?ReceiverError,?e:print?e except?KeyboardInterrupt:pass conn.close()


Browse 模式的意思是,瀏覽的意思,一個特殊的需求,我訪問了一次,別人也能訪問。

Consume 模式的意思是,我瀏覽了一次后,刪除這一條。別人就訪問不到啦。

這個是瀏覽模式:


sub-pub 通信的例子


Pub-sub 是另一種很有用的通信模型。恐怕它的名字就源于出版發行這種現實中的信息傳遞方式吧,publisher 就是出版商,subscriber 就是訂閱者。


服務端 qpid-config?add?exchange?topic?news-service ./spout?news-service/news?xiaorui.cc 客戶端: ./drain?-t?120?news-service/#.news


PUB端,創建TOPIC點 !


SUB端,也就是接收端!


總結:

?qpid挺好用的,比rabbitmq要輕型,比zeromq保險點! 各方面的文檔也都很健全,值得一用。 ? ?話說,這三個消息隊列我也都用過,最一開始用的是redis的pubsub做日志收集和信息通知,后來在做集群相關的項目的時候,我自己搞了一套zeromq的分布式任務分發,和saltstack很像,當然了遠沒有萬人用的salt強大。 ?rabbitmq的用法,更是看中他的安全和持久化,當然性能真的不咋地。

關于qpid的性能我沒有親自做大量的測試,但是聽朋友說,加持久化可以到7k,不加持久化可以到1500左右。


總結

以上是生活随笔為你收集整理的Python通过amqp消息队列协议中的Qpid实现数据通信的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。