Python通过amqp消息队列协议中的Qpid实现数据通信
簡介:
? ? 這兩天看了消息隊列通信,打算在配置平臺上應用起來。以前用過zeromq但是這東西太快了,還有就是rabbitmq有點大,新浪的朋友推薦了qpid,簡單輕便。自己總結了下文檔,大家可以瞅瞅。
AMQP(消息隊列協議Advanced Message Queuing Protocol)是一種消息協議 ,等同于JMS,但是JMS只是java平臺的方案,AMQP是一個跨語言的協議。
AMQP 不分語言平臺,主流的語言都支持,運維這邊的perl,python,ruby更是支持,所以大家就放心用吧。
主流的消息隊列通信類型:
點對點:A?發消息給?B。 廣播:A?發給所有其他人的消息 組播:A?發給多個但不是所有其他人的消息。 Requester/response:類似訪問網頁的通信方式,客戶端發請求并等待,服務端回復該請求 Pub-sub:類似雜志發行,出版雜志的人并不知道誰在看這本雜志,訂閱的人并不關心誰在發表這本雜志。出版的人只管將信息發布出去,訂閱的人也只在需要的時候收到該信息。 Store-and-forward:存儲轉發模型類似信件投遞,寫信的人將消息寫給某人,但在將信件發出的時候,收信的人并不一定在家等待,也并不知道有消息給他。但這個消息不會丟失,會放在收信者的信箱中。這種模型允許信息的異步交換。 其他通信模型。。。
Publisher --->Exchange ---> MessageQueue --->Consumer
整個過程是異步的.Publisher,Consumer相互不知道對方的存在,Exchange負責交換/路由,依靠Routing Key,每個消息者有一個Routing Key,每個Binding將自已感興趣的RoutingKey告訴Exchange,以便Exchange將相關的消息轉發給相應的Queue !
幾個概念
幾個概念 Producer,Routing?Key,Exchange,Binding,Queue,Consumer. Producer:?消息的創建者,消息的發送者 Routing?Key:唯一用來映射消息該進入哪個隊列的標識 Exchange:負責消息的路由,交換 Binding:定義Queue和Exchange的映射關系 Queue:消息隊列 Consumer:消息的使用者 Exchange類型 Fan-Out:類似于廣播方式,不管RoutingKey Direct:根據RoutingKey,進行關聯投寄 Topic:類似于Direct,但是支持多個Key關聯,以組的方式投寄。key以.來定義界限。類似于usea.news,usea.weather.這兩個消息是一組的。QPID
Qpid 是 Apache 開發的一款面向對象的消息中間件,它是一個 AMQP 的實現,可以和其他符合 AMQP 協議的系統進行通信。Qpid 提供了 C++/Python/Java/C# 等主流編程語言的客戶端庫,安裝使用非常方便。相對于其他的 AMQP 實現,Qpid 社區十分活躍,有望成為標準 AMQP 中間件產品。除了符合 AMQP 基本要求之外,Qpid 提供了很多額外的 HA 特性,非常適于集群環境下的消息通信!
基本功能外提供以下特性:
采用 Corosync(?)來保證集群環境下的Fault-tolerant(?) 特性
支持XML的Exchange,消息為XML時,彩用Xquery過濾
支持plugin
提供安全認證,可對producer/consumer提供身份認證
qpidd --port --no-data-dir --auth
port:端口
--no-data-dir:不指定數據目錄
--auth:不啟用安全身份認證
啟動后自動創建一些Exchange,amp.topic,amp.direct,amp.fanout
tools:
Qpid-config:維護Queue,Exchange,內部配置
Qpid-route:配置broker Federation(聯盟?集群?)
Qpid-tool:監控
咱們說完介紹了,這里就趕緊測試下。
服務器端的安裝:
發布端的測試代碼:
一些測試代碼轉自: ibm 的開發社區?
客戶端的測試代碼:
#!/usr/bin/env?python #xiaorui.cc ##轉自ibm開發社區 import?optparse from?qpid.messaging?import?* from?qpid.util?import?URL from?qpid.log?import?enable,?DEBUG,?WARN parser?=?optparse.OptionParser(usage="usage:?%prog?[options]?ADDRESS?...",description="Drain?messages?from?the?supplied?address.") parser.add_option("-b",?"--broker",?default="localhost",help="connect?to?specified?BROKER?(default?%default)") parser.add_option("-c",?"--count",?type="int",help="number?of?messages?to?drain") parser.add_option("-f",?"--forever",?action="store_true",help="ignore?timeout?and?wait?forever") parser.add_option("-r",?"--reconnect",?action="store_true",help="enable?auto?reconnect") parser.add_option("-i",?"--reconnect-interval",?type="float",?default=3,help="interval?between?reconnect?attempts") parser.add_option("-l",?"--reconnect-limit",?type="int",help="maximum?number?of?reconnect?attempts") parser.add_option("-t",?"--timeout",?type="float",?default=0,help="timeout?in?seconds?to?wait?before?exiting?(default?%default)") parser.add_option("-p",?"--print",?dest="format",?default="%(M)s",help="format?string?for?printing?messages?(default?%default)") parser.add_option("-v",?dest="verbose",?action="store_true",help="enable?logging") opts,?args?=?parser.parse_args() if?opts.verbose:enable("qpid",?DEBUG) else:enable("qpid",?WARN) if?args:addr?=?args.pop(0) else:parser.error("address?is?required") if?opts.forever:timeout?=?None else:timeout?=?opts.timeout class?Formatter:def?__init__(self,?message):self.message?=?messageself.environ?=?{"M":?self.message,"P":?self.message.properties,"C":?self.message.content}def?__getitem__(self,?st):return?eval(st,?self.environ) conn?=?Connection(opts.broker,reconnect=opts.reconnect,reconnect_interval=opts.reconnect_interval,reconnect_limit=opts.reconnect_limit) try:conn.open()ssn?=?conn.session()rcv?=?ssn.receiver(addr)count?=?0while?not?opts.count?or?count?<?opts.count:try:msg?=?rcv.fetch(timeout=timeout)print?opts.format?%?Formatter(msg)count?+=?1ssn.acknowledge()except?Empty:break except?ReceiverError,?e:print?e except?KeyboardInterrupt:pass conn.close()
Browse 模式的意思是,瀏覽的意思,一個特殊的需求,我訪問了一次,別人也能訪問。
Consume 模式的意思是,我瀏覽了一次后,刪除這一條。別人就訪問不到啦。
這個是瀏覽模式:
sub-pub 通信的例子
Pub-sub 是另一種很有用的通信模型。恐怕它的名字就源于出版發行這種現實中的信息傳遞方式吧,publisher 就是出版商,subscriber 就是訂閱者。
PUB端,創建TOPIC點 !
SUB端,也就是接收端!
總結:
?qpid挺好用的,比rabbitmq要輕型,比zeromq保險點! 各方面的文檔也都很健全,值得一用。 ? ?話說,這三個消息隊列我也都用過,最一開始用的是redis的pubsub做日志收集和信息通知,后來在做集群相關的項目的時候,我自己搞了一套zeromq的分布式任務分發,和saltstack很像,當然了遠沒有萬人用的salt強大。 ?rabbitmq的用法,更是看中他的安全和持久化,當然性能真的不咋地。
關于qpid的性能我沒有親自做大量的測試,但是聽朋友說,加持久化可以到7k,不加持久化可以到1500左右。
總結
以上是生活随笔為你收集整理的Python通过amqp消息队列协议中的Qpid实现数据通信的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Object-C中的字符串对象1-不可变
- 下一篇: Python中文编码判别及转换