掌握Rabbitmq几个重要概念,从一条消息说起
RabbitMQ 是功能強(qiáng)大的開源消息代理。根據(jù)官網(wǎng)稱:也是使用量最廣泛的消息隊(duì)列。就像他的口號(hào)“Messaging that just works”,開箱即用使用簡(jiǎn)單,支持多種消息傳輸協(xié)議(AMQP、STOMP、MQTT)。
一個(gè)應(yīng)用程序或者服務(wù)如何使用RabbitMq呢?
首先會(huì)有生產(chǎn)者和消費(fèi)者兩個(gè)角色;生產(chǎn)者連接到rabbit代理服務(wù),創(chuàng)建一條AMQP信道,然后把生成的消息,通過(guò)信道發(fā)布到交換器上,交換器根據(jù)路由規(guī)則(路由key)進(jìn)行綁定到或者路由到隊(duì)列上面。最后消息到達(dá)隊(duì)列上中。消費(fèi)者跟生產(chǎn)者一樣需要先和rabbit代理服務(wù)器創(chuàng)建連接,同時(shí)創(chuàng)建一個(gè)消息管道,并訂閱到隊(duì)列上,進(jìn)而從隊(duì)列中獲取消息,進(jìn)行處理。這里面涉及到消息、交換器、綁定、隊(duì)列幾個(gè)重要的概念,下面會(huì)一一講解。整個(gè)過(guò)程如圖所示
消息
生產(chǎn)者創(chuàng)建消息,這里的消息是指?消息包含兩個(gè)部分內(nèi)容:有效載荷(payload)、標(biāo)簽(label)。有效載荷就是你想要傳輸?shù)臄?shù)據(jù)。而標(biāo)簽是描述了有效載荷,并且RabbitMQ用它來(lái)決定誰(shuí)將獲得消息的拷貝。其實(shí)通過(guò)上圖你也會(huì)發(fā)現(xiàn),不同于tcp協(xié)議,因?yàn)锳MQP沒有明確的接收方,只會(huì)用標(biāo)簽表述這條消息,然后把消息交給Rabbit。rabbit會(huì)根據(jù)標(biāo)簽把消息發(fā)送給感興趣的接收方。
隊(duì)列
消息最終到達(dá)隊(duì)列中并等待消費(fèi)。消費(fèi)者通過(guò)AMQP的Basic.Consume命令訂閱。這樣做會(huì)將信道設(shè)置為接受模式,直到取消對(duì)隊(duì)列的訂閱為止。訂閱之后,消費(fèi)者在消費(fèi)(或者拒絕)最近的接收的那條消息之后,就能從隊(duì)列中自動(dòng)的接收下一條消息。
注意:什么時(shí)候消息才會(huì)從隊(duì)列中刪除呢?這里涉及到一個(gè)消息確認(rèn)的動(dòng)作。消費(fèi)者接收到的每一條消息都必須進(jìn)行確認(rèn)。才會(huì)從隊(duì)列中刪除。消費(fèi)者可以通過(guò)AMQP的Basic.Ack命令顯式地向rabbtmq發(fā)送一個(gè)確認(rèn),或者在訂閱到隊(duì)列的時(shí)候就將autoAck屬性設(shè)置為true;如:autoAck: true,一旦消費(fèi)者接收消息,rabbitmq會(huì)自動(dòng)視其確認(rèn)了消息。
如果消費(fèi)者接收到消費(fèi)1,然后在確認(rèn)之前從rabbit斷開連接,rabbitmq會(huì)認(rèn)為這條消息沒有分發(fā),然后重新分發(fā)下一個(gè)訂閱的消費(fèi)者。這樣做的好處,即使你的應(yīng)用程序奔潰了,也可以確保消息會(huì)被發(fā)送給另一個(gè)消費(fèi)者進(jìn)行處理,或者等待你的程序恢復(fù)正常連接,繼續(xù)消費(fèi)。假設(shè)消費(fèi)者A程序與rabbit斷開了連接,消息進(jìn)而會(huì)被消費(fèi)者B進(jìn)行消費(fèi)處理。如下圖
只要消費(fèi)者不進(jìn)行確認(rèn),rabbit將不會(huì)給該消費(fèi)者發(fā)送消息,因?yàn)樵谏弦粭l消息被確認(rèn)之前,rabbit會(huì)認(rèn)為這個(gè)消費(fèi)者并沒有準(zhǔn)備好接收下一條消息的能力。
在沒有辦法正常確認(rèn)消息,不能一直堵塞呀,比如消費(fèi)者有bug。那就使用AMPQ的Basic.Reject命令;明確的拒絕這條消息,其中一個(gè)參數(shù)requeue如果設(shè)置了ture的話,Rabbit會(huì)把消息重新發(fā)給下一個(gè)訂閱的消費(fèi)者。
如果你檢測(cè)到一條消息本身有錯(cuò)誤而任何一個(gè)消費(fèi)者都無(wú)法處理的時(shí)候,就可以把requeue設(shè)置為false,rabbitmq會(huì)把消息從隊(duì)里中移除,而不會(huì)把他發(fā)送給新的消費(fèi)者。
注意:這里你可以使用對(duì)拒絕的消息進(jìn)行特殊處理,比如發(fā)送到死信隊(duì)列或者專門收集的erro隊(duì)里中。
小結(jié):隊(duì)列是amqp消息通信的基礎(chǔ)模塊
1.為消息提供的處所,消息在此等待消費(fèi)
2.對(duì)負(fù)載均衡來(lái)說(shuō),隊(duì)列是絕佳方案。只需附加一堆消費(fèi)者,并讓rabbitmq以循環(huán)的方式均勻地分配發(fā)來(lái)的消息。
3.隊(duì)列是rabbit中消息的最后的終點(diǎn)。
交換器、綁定
我們知道消費(fèi)者如何獲取消息,那么現(xiàn)在的問(wèn)題是,消息是如何到達(dá)隊(duì)列的呢?消息發(fā)送到交換器,會(huì)根據(jù)確定的規(guī)則,RabbitMQ將會(huì)決定消息該投遞到哪個(gè)隊(duì)列。這些規(guī)則稱為路由鍵(routing key)。隊(duì)列通過(guò)路由鍵綁定到交換器。當(dāng)你發(fā)送消息到代理服務(wù)器時(shí),消息將擁有一個(gè)路由鍵。如:AMPQ的Basic.Publish方法,有個(gè)參數(shù)routingKey通過(guò)他指定。即便是空的,RabbitMQ也會(huì)將其和綁定使用的路由鍵進(jìn)行匹配。
交換器有四種類型:direct、fanout、topic和headers;每種類型實(shí)現(xiàn)了不同的路由算法,前三個(gè)比較常用。
1.direct
這種模式非常簡(jiǎn)單:路由鍵匹配的話,消息就被投遞到對(duì)應(yīng)的隊(duì)列。路由算法-使用路由鍵和隊(duì)列名稱同名進(jìn)行路由消息。使用場(chǎng)景-直接把消息發(fā)送到指定隊(duì)列時(shí)使用。
默認(rèn)的direct交換器,不需要進(jìn)行聲明, 隊(duì)列聲明會(huì)自動(dòng)綁定到默認(rèn)的交換器上,并以隊(duì)列名稱作為路右鍵。使用以下代碼發(fā)送消息申明的隊(duì)列中。
channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: null,body: body);2.fanout交換器
這種模式下,可以忽略routing key,唯一需要做的就是為新的消費(fèi)者寫一段代碼,然后聲明新的隊(duì)列并將其綁定到fanout交換器上。當(dāng)你發(fā)送一條消息到fanout交換器上,他會(huì)把消息投遞給所有附加在此交換器的隊(duì)里上。路由算法-消息會(huì)路由到綁定到交換器上的所有隊(duì)列。使用場(chǎng)景-發(fā)布訂閱的廣播功能
2.topic交換器
這類交換器允許不同源頭的消息到達(dá)同一個(gè)隊(duì)列。路由算法-根據(jù)全部或部分路由鍵匹配將消息路由綁定的隊(duì)列上。使用場(chǎng)景-根據(jù)某些條件廣播到特定的隊(duì)列上。
小結(jié):
本文主要總結(jié)了 AMQP幾個(gè)主要元素:交換器,綁定,隊(duì)列。以及一個(gè)消息創(chuàng)建到消費(fèi)者讀取消費(fèi)的過(guò)程。?
總結(jié)
以上是生活随笔為你收集整理的掌握Rabbitmq几个重要概念,从一条消息说起的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 大揭秘| 我司项目组Gitlab Flo
- 下一篇: 虚虚实实,亦假亦真的 ValueTupl