一个winform带你玩转rabbitMQ
源碼已放出?https://github.com/dubing/MaoyaRabbit
本章分3部分
一、安裝部署初探
二、進(jìn)階
三、api相關(guān)
安裝 部署 初探
先上圖
一. 安裝部署
下載 rabbitMQ :http://www.rabbitmq.com/download.html
安裝rabbitmq需要erlang,下載erlang:http://www.erlang.org/download.html
按照官網(wǎng)按照步驟,例如windows?http://www.rabbitmq.com/install-windows.html?
安裝完rabbitMQ可以再啟動(dòng)插件擴(kuò)展,其中包含了一個(gè)管理后臺(tái)
最新版本的后臺(tái)地址為?http://localhost:15672/?
用戶名和密碼都為guest,輸入完成進(jìn)入主菜單
功能很豐富,可以查看當(dāng)前服務(wù)器的交換機(jī),隊(duì)列,消息,連接,會(huì)話等得使用情況。
基本上到這里服務(wù)器的安裝部署環(huán)節(jié)算是ok,很簡(jiǎn)單。
?
二. ?簡(jiǎn)介
要了解rabbitMQ 首先要了解AMQP協(xié)議 百科上給的很詳細(xì)?http://baike.baidu.com/view/4023136.htm?fr=aladdin
AMQP 有四個(gè)非常重要的概念:虛擬機(jī)(virtual host),通道(exchange),隊(duì)列(queue)和綁定(binding)。
虛擬機(jī): 通常是應(yīng)用的外在邊界,我們可以為不同的虛擬機(jī)分配訪問(wèn)權(quán)限。虛擬機(jī)可持有多個(gè)交換機(jī)、隊(duì)列和綁定。
交換機(jī): 從連接通道(Channel)接收消息,并按照特定的路由規(guī)則發(fā)送給隊(duì)列。
隊(duì)列: 消息最終的存儲(chǔ)容器,直到消費(fèi)客戶端(Consumer)將其取走。
綁定: 也就是所謂的路由規(guī)則,告訴交換機(jī)將何種類型的消息發(fā)送到某個(gè)隊(duì)列中。
這個(gè)概念很重要 不然在學(xué)習(xí)rabbitmq的地方會(huì)碰到很多困難。想要進(jìn)階學(xué)習(xí)的可以參考?https://www.rabbitmq.com/tutorials/amqp-concepts.html
借用官方一個(gè)圖來(lái)闡述AMQP
RabbitMQ是一個(gè)消息代理。它的核心原理非常簡(jiǎn)單:接收和發(fā)送消息。
你可以把它想像成一個(gè)郵局:你把信件放入郵箱,郵遞員就會(huì)把信件投遞到你的收件人處。在這個(gè)比喻中,RabbitMQ是一個(gè)郵箱、郵局、郵遞員。RabbitMQ和郵局的主要區(qū)別是,它處理的不是紙,而是接收、存儲(chǔ)和發(fā)送二進(jìn)制的數(shù)據(jù)——消息。
對(duì)于rabbitMQ本身的特點(diǎn) 參考官網(wǎng)?http://www.rabbitmq.com/features.html
1、可靠性(Reliability)
RabbitMQ提供很多特性供我們可以在性能和可靠性作出折中的選擇,包括持久化、發(fā)送確認(rèn)、發(fā)布者確認(rèn)和高可用性等。
2、彈性選路(Flexible Routing)
消息在到達(dá)隊(duì)列前通過(guò)交換(exchanges)來(lái)被選路。RabbitMQ為典型的選路邏輯設(shè)計(jì)了幾個(gè)內(nèi)置的交換類型。對(duì)于更加復(fù)雜的選路,我們可以將exchanges綁定在一起或者寫屬于自己的exchange類型插件。
3、集群化(Clustering)
在一個(gè)局域網(wǎng)內(nèi)的幾個(gè)RabbitMQ服務(wù)器可以集群起來(lái),組成一個(gè)邏輯的代理人。
4、聯(lián)盟(Federation)
對(duì)于那些需要比集群更加松散和非可靠連接的服務(wù)器來(lái)說(shuō),RabbitMQ提供一個(gè)聯(lián)盟模型(Federation Model)
5、高可用隊(duì)列(High Available Queue)
可以在一個(gè)集群里的幾個(gè)機(jī)器里對(duì)隊(duì)列做鏡像,確保即時(shí)發(fā)生了硬件失效,你的消息也是安全的。
6、多客戶端(Many Clients)
有各種語(yǔ)言的RabbitMQ客戶端
7、管理UI(Management UI)
RabbitMQ提供一個(gè)易用的管理UI來(lái)監(jiān)控和控制消息代理人的各個(gè)方面。
8、跟蹤(Tracing)
如果你的消息系統(tǒng)行為異常,RabbitMQ提供跟蹤支持來(lái)找出錯(cuò)誤的根源。
9、插件系統(tǒng)(Plugin System)
RabbitMQ提供各種方式的插件擴(kuò)展,我們可以實(shí)現(xiàn)自己的插件。
使用任務(wù)隊(duì)列一個(gè)優(yōu)點(diǎn)是能夠輕易地并行處理任務(wù)。當(dāng)處理大量積壓的任務(wù),只要增加工作隊(duì)列,通過(guò)這個(gè)方式,能夠?qū)崿F(xiàn)輕易的縮放。
?
三. 初探
文中的winform所采取的client為官方的.net版本?https://github.com/rabbitmq/rabbitmq-dotnet-client
首先是Connection和Channel的概念
Connection 建立與rabbitmq server的一個(gè)連接,由ConnectionFactory創(chuàng)建,Channel建立在connection基礎(chǔ)上的一個(gè)頻道,相對(duì)于connection來(lái)說(shuō),它是輕量級(jí)的??梢岳斫獬梢淮螘?huì)話。
代碼示例 本機(jī)環(huán)境
| 1 2 3 4 5 6 7 | ????using?(IConnection connection = factory.CreateConnection()) ????{ ????????using?(IModel channel = connection.CreateModel()) ????????{ //do something ????????} ????} |
exchange常用有三種類型:
Direct :處理路由鍵。需要將一個(gè)隊(duì)列綁定到交換機(jī)上,要求該消息與一個(gè)特定的路由鍵完全匹配。這是一個(gè)完整的匹配。
Fanout :不處理路由鍵。你只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。
Topic : 將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上。符號(hào)“#”匹配一個(gè)或多個(gè)詞,符號(hào)“*”匹配不多不少一個(gè)詞。
還有一種多重屬性的類型headers,我們?cè)谙乱徽鹿?jié)討論。
我們用winform分別造成三種類型的exchange來(lái)實(shí)際體驗(yàn)一下
這里所謂的限定exchange是在我們安裝rabbitmq server的時(shí)候自動(dòng)生成的一些 我們的測(cè)試不使用這些exchange。
然后我們新建3個(gè)Queue,這里我們會(huì)發(fā)現(xiàn)一個(gè)有趣的現(xiàn)象,rabbitmq server對(duì)于新生成的隊(duì)列都會(huì)默認(rèn)綁定在一個(gè)名稱為“”的默認(rèn)exchange上。
先試試direct類型,下面我們分別把Q1,Q2,Q3根據(jù)路由key為空,k1,k.#綁定在dEx上(direct exchange)。
然后我們根據(jù)路由key為空,k,k1,k2,k3來(lái)發(fā)送消息m1,m2,m3,m4,m5
再用3個(gè)隊(duì)列接收消息試一下結(jié)果
因?yàn)榘l(fā)送確認(rèn)標(biāo)記ack,所以隊(duì)列上讀取過(guò)的消息會(huì)被刪除,為了進(jìn)一步認(rèn)證,我在結(jié)尾又添加了一個(gè)routingkey為k.#的消息(對(duì)應(yīng)綁定Q3),由圖可見(jiàn)direct 模式下隊(duì)列之收取他們完全對(duì)應(yīng)的routingkey消息。
下面我們?cè)僭囈幌?strong>fanout類型,把Q1,Q2,Q3根據(jù)路由key為空,k1,k.#綁定在fEx上(fanout exchange)。
同上步驟建立綁定關(guān)系
生產(chǎn)消息,然后看下隊(duì)列接受消息的情況
效果很明顯,fanout為廣播模式。
再試試topic類型 把Q1,Q2,Q3根據(jù)路由key為空,k1,k.#綁定在tEx上(topic exchange)。
推送消息
接收消息
通過(guò)3種模式 3個(gè)隊(duì)列的消息讀取 大家應(yīng)該了解了這3中模式的區(qū)別。
進(jìn)階
一. ?exchange屬性
Type
前一章我們說(shuō)了exchange的類型分為fanout,direct,topic.還有一種不常用的headers。
headers這種類型的exchange綁定的時(shí)候會(huì)忽略掉routingkey,Headers是一個(gè)鍵值對(duì),可以定義成成字典等。發(fā)送者在發(fā)送的時(shí)候定義一些鍵值對(duì),接收者也可以再綁定時(shí)候傳入一些鍵值對(duì),兩者匹配的話,則對(duì)應(yīng)的隊(duì)列就可以收到消息。匹配有兩種方式all和any。這兩種方式是在接收端必須要用鍵值"x-mactch"來(lái)定義。all代表定義的多個(gè)鍵值對(duì)都要滿足,而any則代碼只要滿足一個(gè)就可以了。之前的幾種exchange的routingKey都需要要字符串形式的,而headers exchange則沒(méi)有這個(gè)要求,因?yàn)殒I值對(duì)的值可以是任何類型
舉個(gè)例子,發(fā)送端定義2個(gè)鍵值{k1,1},{k2,2},接收端綁定隊(duì)列的時(shí)候定義{"x-match", "any"},那么接收端的鍵值屬性里只要存在{k1,1}或{k2,2}都可以獲取到消息。
這樣的類型擴(kuò)展的程度很大,適合非常復(fù)雜的業(yè)務(wù)場(chǎng)景。
Durability
持久性,這是exchange的可選屬性,如果你Durability設(shè)置為false,那些當(dāng)前會(huì)話結(jié)束的時(shí)候,該exchange也會(huì)被銷毀?!?br /> 新建一個(gè)transient exchange
關(guān)閉當(dāng)前連接再查看一下
剛才我們新建的transient已經(jīng)銷毀了。
Auto delete
當(dāng)沒(méi)有隊(duì)列或者其他exchange綁定到此exchange的時(shí)候,該exchange被銷毀。這個(gè)很簡(jiǎn)單就不示例了。
Internal?(比較簡(jiǎn)單 也不展示了)
表示這個(gè)exchange不可以被client用來(lái)推送消息,僅用來(lái)進(jìn)行exchange和exchange之間的綁定。
PS: 無(wú)法聲明2個(gè)名稱相同 但是類型卻不同的exchange
二. ?Queue屬性
Durability?和exchange相同,未持久化的隊(duì)列,服務(wù)重啟后銷毀。
Auto delete?當(dāng)沒(méi)有消費(fèi)者連接到該隊(duì)列的時(shí)候,隊(duì)列自動(dòng)銷毀。
Exclusive?使隊(duì)列成為私有隊(duì)列,只有當(dāng)前應(yīng)用程序可用,當(dāng)你需要限制隊(duì)列只有一個(gè)消費(fèi)者,這是很有用的。
擴(kuò)展屬性如下對(duì)應(yīng)源程序?RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary<string,object>)最后的參數(shù)
Message TTL?當(dāng)一個(gè)消息被推送在該隊(duì)列的時(shí)候 可以存在的時(shí)間 單位為ms,(對(duì)應(yīng)擴(kuò)展參數(shù)argument?"x-message-ttl" )
Auto expire?在隊(duì)列自動(dòng)刪除之前可以保留多長(zhǎng)時(shí)間(對(duì)應(yīng)擴(kuò)展參數(shù)argument?"x-expires")
Max length?一個(gè)隊(duì)列可以容納的已準(zhǔn)備消息的數(shù)量(對(duì)應(yīng)擴(kuò)展參數(shù)argument?"x-max-length")
... 更多參考?http://www.rabbitmq.com/extensions.html
ps:一旦創(chuàng)建了隊(duì)列和交換機(jī),就不能修改其標(biāo)志了。例如,如果創(chuàng)建了一個(gè)non-durable的隊(duì)列,然后想把它改變成durable的,唯一的辦法就是刪除這個(gè)隊(duì)列然后重現(xiàn)創(chuàng)建。
三. ?Message屬性
Durability?
消息的持久在代碼中設(shè)置的方法與exchange和queue不同,有2種方法
1.
| 1 2 3 4 | IBasicProperties properties = channel.CreateBasicProperties(); properties.SetPersistent(true); byte[] payload = Encoding.ASCII.GetBytes(message); channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload); |
2.
| 1 2 3 4 | IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; byte[] payload = Encoding.ASCII.GetBytes(message); channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload); |
contentType: 標(biāo)識(shí)消息內(nèi)容的MIME,例如JSON用application/json
replayTo: 標(biāo)識(shí)回調(diào)的queue的地址
correlationId:用于request和response的關(guān)聯(lián),確保消息的請(qǐng)求和響應(yīng)的同一性
Message的2種狀態(tài):
Ready
此狀態(tài)的消息存在于隊(duì)列中待處理。
Unacknowledged
此狀態(tài)的消息表示已經(jīng)在處理未確認(rèn)。
說(shuō)到Unacknowledged,這里需要了解一個(gè)ack的概念。當(dāng)Consumer接收到消息、處理任務(wù)完成之后,會(huì)發(fā)送帶有這個(gè)消息標(biāo)示符的ack,來(lái)告訴server這個(gè)消息接收到并處理完成。RabbitMQ會(huì)一直等到處理某個(gè)消息的Consumer的鏈接失去之后,才確定這個(gè)消息沒(méi)有正確處理,從而RabbitMQ重發(fā)這個(gè)消息。
Message acknowledgment是默認(rèn)關(guān)閉的。初始化Consumer時(shí)有個(gè)noAck參數(shù),如果設(shè)置為true,這個(gè)Consumer在收到消息之后會(huì)馬上返回ack。
string BasicConsume(string queue, bool noAck, RabbitMQ.Client.IBasicConsumer consumer)
一般來(lái)說(shuō),常用的場(chǎng)景noack一般就是設(shè)置成true,但是對(duì)于風(fēng)險(xiǎn)要求比較高的項(xiàng)目,例如支付。對(duì)于每一條消息我們都需要保證他的完整性和正確性。就需要獲取消息后確認(rèn)執(zhí)行完正確的業(yè)務(wù)邏輯后再主動(dòng)返回一個(gè)ack給server??梢酝ㄟ^(guò)rabbitmqctl list_queues name message_rady message_unacknowleded 命令來(lái)查看隊(duì)列中的消息情況,也可以通過(guò)后臺(tái)管理界面。
我們先hold住一條消息
然后我們?cè)訇P(guān)閉鏈接或者重啟服務(wù)
數(shù)據(jù)還是完整的?!?/p>
ps:message的消費(fèi)還分為consume和baseget 下面講到集群的時(shí)候再介紹。
四. ?binding相關(guān)
如果你綁定了一個(gè)durable的隊(duì)列和一個(gè)durable的交換機(jī),RabbitMQ會(huì)自動(dòng)保留這個(gè)綁定。類似的,如果刪除了某個(gè)隊(duì)列或交換機(jī)(無(wú)論是不是 durable),依賴它的綁定都會(huì)自動(dòng)刪除。
在聲明一個(gè)隊(duì)列的同時(shí),server會(huì)默認(rèn)讓此隊(duì)列綁定在默認(rèn)的exchange上,這個(gè)exchange的名稱為空。
?
?五. ?發(fā)布訂閱
我們上一章的demo中實(shí)際上已經(jīng)使用了發(fā)布訂閱模式。
RabbitMQ消息模型的核心理念是:發(fā)布者(producer)不會(huì)直接發(fā)送任何消息給隊(duì)列。事實(shí)上,發(fā)布者(producer)甚至不知道消息是否已經(jīng)被投遞到隊(duì)列。發(fā)布者(producer)只需要把消息發(fā)送給一個(gè)exchange。exchange非常簡(jiǎn)單,它一邊從發(fā)布者方接收消息,一邊把消息推入隊(duì)列。exchange必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊(duì)列還是是多個(gè)隊(duì)列,或者是直接忽略消息。這些規(guī)則是通過(guò)exchange type來(lái)定義的。
發(fā)布訂閱其實(shí)很簡(jiǎn)單,例如上章我所示例,假設(shè)我們一開(kāi)始沒(méi)有任何消息,現(xiàn)在有一個(gè)生產(chǎn)者P1,他是一個(gè)天氣預(yù)報(bào)播放者。然后我們有2個(gè)消費(fèi)者來(lái)訂閱他的消息。
P1通過(guò)廣播類型的交換機(jī)fEx來(lái)發(fā)布他的天氣消息,c1,c2分別建立一個(gè)隊(duì)列為Q1,Q2. 并且訂閱P1的fEx.
基本可以如圖所示
我們P1利用fEx生成一條消息的時(shí)候,c1,c2通過(guò)Q1,Q2都可以獲取到p1所發(fā)布的消息
我們發(fā)布3條消息
查看隊(duì)列情況
Q1:
Q2:
Q1,Q2都拿到了廣播的消息,至于C1,C2如何消費(fèi)這些消息,互相之間完全沒(méi)有干擾。
ps:簡(jiǎn)單一句話 發(fā)布訂閱中發(fā)布者所產(chǎn)生的消息通過(guò)exchange對(duì)所有綁定他的隊(duì)列隊(duì)形消息推送,每個(gè)隊(duì)列獲取綁定所對(duì)應(yīng)的消息
六. ?WorkQueue (可用于消費(fèi)者集群)
區(qū)分于發(fā)布訂閱,消費(fèi)者集群主要解決橫向服務(wù)器擴(kuò)展問(wèn)題,如果一個(gè)隊(duì)列積壓太多,如何均與的讓不同的消費(fèi)者來(lái)承擔(dān)。
默認(rèn)來(lái)說(shuō),RabbitMQ會(huì)按順序得把消息發(fā)送給每個(gè)消費(fèi)者(consumer)。平均每個(gè)消費(fèi)者都會(huì)收到同等數(shù)量得消息。這種發(fā)送消息得方式叫做——輪詢(round-robin)。
我們開(kāi)3個(gè)程序,1個(gè)生產(chǎn) 2個(gè)消費(fèi)。
如圖所示綁定關(guān)系如下
2個(gè)消費(fèi)者用同樣的程序,這里記錄進(jìn)程pid以區(qū)分,實(shí)際項(xiàng)目中可以用不同服務(wù)器來(lái)區(qū)分
啟動(dòng)消息消費(fèi),使消費(fèi)者處理work狀態(tài)
然后我們不停的通過(guò)生產(chǎn)者這發(fā)布消息
然后我們看下2個(gè)消費(fèi)者的消費(fèi)情況
1.
2.
3.
4.
5.
默認(rèn)地,RabbitMQ會(huì)逐一地向下一個(gè)Consumer發(fā)放消息,每一個(gè)Consumer會(huì)得到數(shù)目相同的消息。文中所示之所以是按照1條一條的輪詢,是因?yàn)槌绦蛑锌刂屏艘粋€(gè)隊(duì)列單次消費(fèi)的數(shù)量。
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
API CommandLine 以及其他功能
RabbitMQ API
RabbitMQ Server提供了豐富的http api。
舉個(gè)列子
需要HTTP基本身份驗(yàn)證。默認(rèn)的用戶名/密碼為guest/guest。
這些返回值得意義我從官網(wǎng)搬來(lái)解釋,為了避免翻譯的問(wèn)題導(dǎo)致大家理解的誤差這里直接給出原文
cluster_name | The name of the entire cluster, as set with?rabbitmqctl set_cluster_name. |
erlang_full_version | A string with extended detail about the Erlang VM and how it was compiled, for the node connected to. |
erlang_version | A string with the Erlang version of the node connected to. As clusters should all run the same version this can be taken as representing the cluster. |
exchange_types | A list of all exchange types available. |
listeners | All (non-HTTP) network listeners for all nodes in the cluster. (See?contexts?in?/api/nodes?for HTTP). |
management_version | Version of the management plugin in use. |
message_stats | A message_stats object for everything the user can see - for all vhosts regardless of permissions in the case of?monitoring?and?administrator?users, and for all vhosts the user has access to for other users. |
node | The name of the cluster node this management plugin instance is running on. |
object_totals | An object containing global counts of all connections, channels, exchanges, queues and consumers, subject to the same visibility rules as for?message_stats. |
queue_totals | An object containing sums of the?messages,?messages_ready?and?messages_unacknowledged?fields for all queues, again subject to the same visibility rules as for?message_stats. |
rabbitmq_version | Version of RabbitMQ on the node which processed this request. |
statistics_db_node | Name of the cluster node hosting the management statistics database. |
statistics_level | Whether the node is running fine or coarse statistics. |
又或者通過(guò)api查詢虛擬主機(jī)
許多api的URI需要一個(gè)虛擬主機(jī)路徑的一部分的名字,因?yàn)槊种挥形ㄒ辉谝粋€(gè)虛擬主機(jī)識(shí)別物體。作為默認(rèn)的虛擬主機(jī)稱為“/”,這??將需要被編碼為“%2F”。
在我的demo程序中對(duì)應(yīng)的api功能可以通過(guò)這里的功能來(lái)實(shí)現(xiàn)
其更豐富的功能可以參考官網(wǎng)說(shuō)明文檔?http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html
以及?http://hg.rabbitmq.com/rabbitmq-management/raw-file/rabbitmq_v3_3_5/priv/www/api/index.html
一般來(lái)說(shuō)我們常用的我在應(yīng)用程序中已經(jīng)給出?例如查看所有隊(duì)列等
?RabbitMQ CommandLine
除了豐富的http api,rabbitmq server自然也有其很全面命令行。
例如查詢所有exchange。
查詢所有隊(duì)列以及他們包含的消息數(shù)目
rabbitmqctl更多的命令說(shuō)明參考?http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Message的BasicGet于consume的區(qū)別
? consume的功能上一張介紹過(guò),basicget更偏向于我們平時(shí)用過(guò)的其他類型的MessageQueue,它就是最基本的接受消息,consume的消費(fèi)針對(duì)basicget來(lái)說(shuō)屬于一個(gè)長(zhǎng)連接于短連接的區(qū)別。
消費(fèi)者關(guān)系一旦確定,基本上默認(rèn)它就是在偵聽(tīng)通道的消息是否在生產(chǎn)。而basicget則是由客戶端手動(dòng)來(lái)控制。
在demo中在下圖所示處區(qū)分
如果你選擇了消費(fèi)消息,那么基本上代碼層面是這樣來(lái)完成的
| 1 2 3 4 5 6 7 8 9 | var?consumer =?new?QueueingBasicConsumer(channel); channel.BasicQos(0, 1,?false); channel.BasicConsume(queue.name, rbAckTrue.Checked, consumer); while?(true) { ????var?e = consumer.Queue.Dequeue(); ????MessageBox.Show(string.Format("隊(duì)列{0}獲取消息{1},線程id為{2}", queue.name, Encoding.ASCII.GetString(e.Body), Process.GetCurrentProcess().Id)); ????Thread.Sleep(1000); } |
本篇先到此,希望對(duì)大家有幫助
轉(zhuǎn)載于:https://www.cnblogs.com/zwb7926/p/6029196.html
總結(jié)
以上是生活随笔為你收集整理的一个winform带你玩转rabbitMQ的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 女鞋多少钱啊?
- 下一篇: Java缓存学习之五:spring 对缓