Python通过amqp消息队列协议中的Qpid实现数据通信
簡介:
? ? 這兩天看了消息隊列通信,打算在配置平臺上應用起來。以前用過zeromq但是這東西太快了,還有就是rabbitmq有點大,新浪的朋友推薦了qpid,簡單輕便。自己總結(jié)了下文檔,大家可以瞅瞅。
AMQP(消息隊列協(xié)議Advanced Message Queuing Protocol)是一種消息協(xié)議 ,等同于JMS,但是JMS只是java平臺的方案,AMQP是一個跨語言的協(xié)議。
AMQP 不分語言平臺,主流的語言都支持,運維這邊的perl,python,ruby更是支持,所以大家就放心用吧。
主流的消息隊列通信類型:
點對點:A?發(fā)消息給?B。 廣播:A?發(fā)給所有其他人的消息 組播:A?發(fā)給多個但不是所有其他人的消息。 Requester/response:類似訪問網(wǎng)頁的通信方式,客戶端發(fā)請求并等待,服務端回復該請求 Pub-sub:類似雜志發(fā)行,出版雜志的人并不知道誰在看這本雜志,訂閱的人并不關(guān)心誰在發(fā)表這本雜志。出版的人只管將信息發(fā)布出去,訂閱的人也只在需要的時候收到該信息。 Store-and-forward:存儲轉(zhuǎn)發(fā)模型類似信件投遞,寫信的人將消息寫給某人,但在將信件發(fā)出的時候,收信的人并不一定在家等待,也并不知道有消息給他。但這個消息不會丟失,會放在收信者的信箱中。這種模型允許信息的異步交換。 其他通信模型。。。
Publisher --->Exchange ---> MessageQueue --->Consumer
整個過程是異步的.Publisher,Consumer相互不知道對方的存在,Exchange負責交換/路由,依靠Routing Key,每個消息者有一個Routing Key,每個Binding將自已感興趣的RoutingKey告訴Exchange,以便Exchange將相關(guān)的消息轉(zhuǎn)發(fā)給相應的Queue !
幾個概念
幾個概念 Producer,Routing?Key,Exchange,Binding,Queue,Consumer. Producer:?消息的創(chuàng)建者,消息的發(fā)送者 Routing?Key:唯一用來映射消息該進入哪個隊列的標識 Exchange:負責消息的路由,交換 Binding:定義Queue和Exchange的映射關(guān)系 Queue:消息隊列 Consumer:消息的使用者 Exchange類型 Fan-Out:類似于廣播方式,不管RoutingKey Direct:根據(jù)RoutingKey,進行關(guān)聯(lián)投寄 Topic:類似于Direct,但是支持多個Key關(guān)聯(lián),以組的方式投寄。key以.來定義界限。類似于usea.news,usea.weather.這兩個消息是一組的。QPID
Qpid 是 Apache 開發(fā)的一款面向?qū)ο蟮南⒅虚g件,它是一個 AMQP 的實現(xiàn),可以和其他符合 AMQP 協(xié)議的系統(tǒng)進行通信。Qpid 提供了 C++/Python/Java/C# 等主流編程語言的客戶端庫,安裝使用非常方便。相對于其他的 AMQP 實現(xiàn),Qpid 社區(qū)十分活躍,有望成為標準 AMQP 中間件產(chǎn)品。除了符合 AMQP 基本要求之外,Qpid 提供了很多額外的 HA 特性,非常適于集群環(huán)境下的消息通信!
基本功能外提供以下特性:
采用 Corosync(?)來保證集群環(huán)境下的Fault-tolerant(?) 特性
支持XML的Exchange,消息為XML時,彩用Xquery過濾
支持plugin
提供安全認證,可對producer/consumer提供身份認證
qpidd --port --no-data-dir --auth
port:端口
--no-data-dir:不指定數(shù)據(jù)目錄
--auth:不啟用安全身份認證
啟動后自動創(chuàng)建一些Exchange,amp.topic,amp.direct,amp.fanout
tools:
Qpid-config:維護Queue,Exchange,內(nèi)部配置
Qpid-route:配置broker Federation(聯(lián)盟?集群?)
Qpid-tool:監(jiān)控
咱們說完介紹了,這里就趕緊測試下。
服務器端的安裝:
發(fā)布端的測試代碼:
一些測試代碼轉(zhuǎn)自: ibm 的開發(fā)社區(qū)?
客戶端的測試代碼:
#!/usr/bin/env?python #xiaorui.cc ##轉(zhuǎn)自ibm開發(fā)社區(qū) import?optparse from?qpid.messaging?import?* from?qpid.util?import?URL from?qpid.log?import?enable,?DEBUG,?WARN parser?=?optparse.OptionParser(usage="usage:?%prog?[options]?ADDRESS?...",description="Drain?messages?from?the?supplied?address.") parser.add_option("-b",?"--broker",?default="localhost",help="connect?to?specified?BROKER?(default?%default)") parser.add_option("-c",?"--count",?type="int",help="number?of?messages?to?drain") parser.add_option("-f",?"--forever",?action="store_true",help="ignore?timeout?and?wait?forever") parser.add_option("-r",?"--reconnect",?action="store_true",help="enable?auto?reconnect") parser.add_option("-i",?"--reconnect-interval",?type="float",?default=3,help="interval?between?reconnect?attempts") parser.add_option("-l",?"--reconnect-limit",?type="int",help="maximum?number?of?reconnect?attempts") parser.add_option("-t",?"--timeout",?type="float",?default=0,help="timeout?in?seconds?to?wait?before?exiting?(default?%default)") parser.add_option("-p",?"--print",?dest="format",?default="%(M)s",help="format?string?for?printing?messages?(default?%default)") parser.add_option("-v",?dest="verbose",?action="store_true",help="enable?logging") opts,?args?=?parser.parse_args() if?opts.verbose:enable("qpid",?DEBUG) else:enable("qpid",?WARN) if?args:addr?=?args.pop(0) else:parser.error("address?is?required") if?opts.forever:timeout?=?None else:timeout?=?opts.timeout class?Formatter:def?__init__(self,?message):self.message?=?messageself.environ?=?{"M":?self.message,"P":?self.message.properties,"C":?self.message.content}def?__getitem__(self,?st):return?eval(st,?self.environ) conn?=?Connection(opts.broker,reconnect=opts.reconnect,reconnect_interval=opts.reconnect_interval,reconnect_limit=opts.reconnect_limit) try:conn.open()ssn?=?conn.session()rcv?=?ssn.receiver(addr)count?=?0while?not?opts.count?or?count?<?opts.count:try:msg?=?rcv.fetch(timeout=timeout)print?opts.format?%?Formatter(msg)count?+=?1ssn.acknowledge()except?Empty:break except?ReceiverError,?e:print?e except?KeyboardInterrupt:pass conn.close()
Browse 模式的意思是,瀏覽的意思,一個特殊的需求,我訪問了一次,別人也能訪問。
Consume 模式的意思是,我瀏覽了一次后,刪除這一條。別人就訪問不到啦。
這個是瀏覽模式:
sub-pub 通信的例子
Pub-sub 是另一種很有用的通信模型??峙滤拿志驮从诔霭姘l(fā)行這種現(xiàn)實中的信息傳遞方式吧,publisher 就是出版商,subscriber 就是訂閱者。
PUB端,創(chuàng)建TOPIC點 !
SUB端,也就是接收端!
總結(jié):
?qpid挺好用的,比rabbitmq要輕型,比zeromq保險點! 各方面的文檔也都很健全,值得一用。 ? ?話說,這三個消息隊列我也都用過,最一開始用的是redis的pubsub做日志收集和信息通知,后來在做集群相關(guān)的項目的時候,我自己搞了一套zeromq的分布式任務分發(fā),和saltstack很像,當然了遠沒有萬人用的salt強大。 ?rabbitmq的用法,更是看中他的安全和持久化,當然性能真的不咋地。
關(guān)于qpid的性能我沒有親自做大量的測試,但是聽朋友說,加持久化可以到7k,不加持久化可以到1500左右。
總結(jié)
以上是生活随笔為你收集整理的Python通过amqp消息队列协议中的Qpid实现数据通信的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Object-C中的字符串对象1-不可变
- 下一篇: Python中文编码判别及转换