日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMq--1

發布時間:2024/9/15 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMq--1 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RabbitMQ是什么

定義

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

AMPQ

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

它可以使對應的客戶端(client)與對應的消息中間件(broker)進行交互。消息中間件從發布者(publisher)那里收到消息(發布消息的應用,也稱為producer),然后將他們轉發給消費者(consumers,處理消息的應用)。由于AMQP是一個網絡協議,所以發布者、消費者以及消息中間件可以部署到不同的物理機器上面。

?

RabbitMQ為何會出現

或者說AMPQ為何會出現,它的應用場景又是什么?

解決什么問題

你是否遇到過兩個(多個)系統間需要通過定時任務來同步某些數據?你是否在為異構系統的不同進程間相互調用、通訊的問題而苦惱、掙扎?

在Web應用高并發環境下,由于來不及同步處理,請求往往會發生堵塞。比如說,大量的insert、update請求同時到達mysql,會帶來無數的行鎖表鎖,最后導致請求數過多,觸發too many connections錯誤。

消息服務擅長于解決多系統、異構系統間的數據交換(消息通知/通訊)問題,你也可以把它用于系統間服務的相互調用(RPC)通過使用消息隊列,我們可以異步處理請求,從而緩解系統的壓力。

應用場景

對于一個大型的軟件系統來說,它會有很多的組件或者說模塊或者說子系統或者(Subsystem or Component or Submodule)。那么這些模塊的如何通信?這和傳統的IPC有很大的區別。傳統的IPC很多都是在單一系統上的,模塊耦合性很大,不適合擴展(Scalability);如果使用socket那么不同的模塊的確可以部署到不同的機器上,但是還是有很多問題需要解決。比如:

1.信息的發送者和接收者如何維持這個連接,如果一方的連接中斷,這期間的數據如何方式丟失?

2.如何降低發送者和接收者的耦合度?

3.如何讓Priority高的接收者先接到數據?

4.如何做到Load balance?有效均衡接收者的負載?

5.如何有效的將數據發送到相關的接收者?也就是說將接收者subscribe 不同的數據,如何做有效的filter。

6.如何做到可擴展,甚至將這個通信模塊發到cluster上?

7.如何保證接收者接收到了完整,正確的數據?

AMDQ協議解決了以上的問題,而RabbitMQ實現了AMQP。

RabbitMQ基礎概念

應用場景架構

RabbitMQ Server:也叫broker server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQ isn’t a food truck, it’s a delivery service. 他的角色就是維護一條從Producer到Consumer的路線,保證數據能夠按照指定的方式進行傳輸。但是這個保證也不是100%的保證,但是對于普通的應用來說這已經足夠了。當然對于商業系統來說,可以再做一層數據一致性的guard,就可以徹底保證系統的一致性了。

Client A & B:也叫Producer,數據的發送方。Create messages and Publish (Send) them to a broker server (RabbitMQ)。一個Message有兩個部分:Payload(有效載荷)和Label(標簽)。Payload顧名思義就是傳輸的數據,Label是Exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。

Client 1,2,3:也叫Consumer,數據的接收方。Consumers attach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱后,RabbitMQ把它發送給它的某個訂閱者即Consumer。當然可能會把同一個Message發送給很多的Consumer。在這個Message中,只有payload,label已經被刪掉了。對于Consumer來說,它是不知道誰發送的這個信息的。就是協議本身不支持。但是當然了如果Producer發送的payload包含了Producer的信息就另當別論了。

對于一個數據從Producer到Consumer的正確傳遞,還有三個概念需要明確:exchanges, queues and bindings。

Exchanges are where producers publish their messages. 消息交換機,它指定消息按什么規則,路由到哪個隊列

Queues are where the messages end up and are received by consumers. 消息隊列載體,每個消息都會被投入到一個或多個隊列

Bindings are how the messages get routed from the exchange to particular queues. 綁定,它的作用就是把exchange和queue按照路由規則綁定起來

Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞

還有幾個概念是上述圖中沒有標明的,那就是Connection(連接),Channel(通道,頻道),Vhost(虛擬主機)。

Connection:就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個TCP連接。

Channel:虛擬連接。它建立在上述的TCP連接中。數據流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。

Vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。每個virtual host本質上都是一個RabbitMQ Server,擁有它自己的queue,exchagne,和bings rule等等。這保證了你可以在多個不同的application中使用RabbitMQ。

Channel的選擇

那么,為什么使用Channel,而不是直接使用TCP連接?

對于OS來說,建立和關閉TCP連接是有代價的,頻繁的建立關閉TCP連接對于系統的性能有很大的影響,而且TCP的連接數也有限制,這也限制了系統處理高并發的能力。但是,在TCP連接中建立Channel是沒有上述代價的。對于Producer或者Consumer來說,可以并發的使用多個Channel進行Publish或者Receive。

有實驗表明,1s的數據可以Publish10K的數據包。當然對于不同的硬件環境,不同的數據包大小這個數據肯定不一樣,但是我只想說明,對于普通的Consumer或者Producer來說,這已經足夠了。如果不夠用,你考慮的應該是如何細化split你的設計。

消息隊列執行過程

1.客戶端連接到消息隊列服務器,打開一個Channel。

2.客戶端聲明一個Exchange,并設置相關屬性。

3.客戶端聲明一個Queue,并設置相關屬性。

4.客戶端使用Routing key,在Exchange和Queue之間建立好綁定關系。

5.客戶端投遞消息到Exchange。

Exchange接收到消息后,就根據消息的key和已經設置的Binding,進行消息路由,將消息投遞到一個或多個隊列里。有三種常用類型的Exchanges:direct,fanout,topic,每個實現了不同的路由算法(routing algorithm):

Direct exchange:完全根據key進行投遞的叫做Direct交換機。如果Routing key匹配, 那么Message就會被傳遞到相應的queue中。其實在queue創建時,它會自動的以queue的名字作為routing key來綁定那個exchange。例如,綁定時設置了Routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。

Fanout exchange:不需要key的叫做Fanout交換機。它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。

Topic exchange:對key進行模式匹配后進行投遞的叫做Topic交換機。比如符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”,不能匹配“abc.def.ina”。

更多消息隊列相關設計介紹請參考:

RabbitMQ系列二(構建消息隊列)

RabbitMQ系列三 (深入消息隊列)

消息隊列的創建

Consumer和Procuder都可以通過 queue.declare 創建queue。對于某個Channel來說,Consumer不能declare一個queue,卻訂閱其他的queue。當然也可以創建私有的queue。這樣只有app本身才可以使用這個queue。queue也可以自動刪除,被標為auto-delete的queue在最后一個Consumer unsubscribe后就會被自動刪除。那么如果是創建一個已經存在的queue呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創建如果參數和第一次不一樣,那么該操作雖然成功,但是queue的屬性并不會被修改。

那么誰應該負責創建這個queue呢?是Consumer,還是Producer?

如果queue不存在,當然Consumer不會得到任何的Message。但是如果queue不存在,那么Producer Publish的Message會被丟棄。所以,還是為了數據不丟失,Consumer和Producer都try to create the queue!反正不管怎么樣,這個接口都不會出問題。

Queue對load balance的處理是完美的。對于多個Consumer來說,RabbitMQ 使用循環的方式(round-robin)的方式均衡的發送給不同的Consumer。

消息的ack機制

默認情況下,如果Message 已經被某個Consumer正確的接收到了,那么該Message就會被從queue中移除。當然也可以讓同一個Message發送到很多的Consumer。

如果一個queue沒被任何的Consumer Subscribe(訂閱),那么,如果這個queue有數據到達,那么這個數據會被cache,不會被丟棄。當有Consumer時,這個數據會被立即發送到這個Consumer,這個數據被Consumer正確收到時,這個數據就被從queue中刪除。

那么什么是正確收到呢?通過ack。

每個Message都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack(Consumer的basic.ack),也可以自動的ack(訂閱Queue時指定auto_ack為true)。

如果有數據沒有被ack,那么RabbitMQ Server會把這個信息發送到下一個Consumer。

如果這個app有bug,忘記了ack,那么RabbitMQ Server不會再發送數據給它,因為Server認為這個Consumer處理能力有限。

而且ack的機制可以起到限流的作用(Benefit to throttling):在Consumer處理完成數據后發送ack,甚至在額外的延時后發送ack,將有效的balance Consumer的load。

當然對于實際的例子,比如我們可能會對某些數據進行merge,比如merge 4s內的數據,然后sleep 4s后再獲取數據。特別是在監聽系統的state,我們不希望所有的state實時的傳遞上去,而是希望有一定的延時。這樣可以減少某些IO,而且終端用戶也不會感覺到。

沒有正確響應呢?

如果Consumer接收了一個消息就還沒有發送ack就與RabbitMQ斷開了,RabbitMQ會認為這條消息沒有投遞成功會重新投遞到別的Consumer。

如果Consumer本身邏輯有問題沒有發送ack的處理,RabbitMQ不會再向該Consumer發送消息。RabbitMQ會認為這個Consumer還沒有處理完上一條消息,沒有能力繼續接收新消息。

我們可以善加利用這一機制,如果需要處理過程是相當復雜的,應用程序可以延遲發送ack直到處理完成為止。這可以有效控制應用程序這邊的負載,不致于被大量消息沖擊。

消息拒絕

由于要拒絕消息,所以ack響應消息還沒有發出,這里拒絕消息可以有兩種選擇:

Consumer直接斷開RabbitMQ這樣RabbitMQ將把這條消息重新排隊,交由其它Consumer處理。這個方法在RabbitMQ各版本都支持,這樣做的壞處就是連接斷開增加了RabbitMQ的額外負擔,特別是consumer出現異常每條消息都無法正常處理的時候。

RabbitMQ 2.0.0可以使用 basic.reject 命令,收到該命令RabbitMQ會重新投遞到其它的Consumer。如果設置requeue為false,RabbitMQ會直接將消息從queue中移除。

其實還有一種選擇就是直接忽略這條消息并發送ACK,當你明確知道這條消息是異常的不會有Consumer能處理,可以這樣做拋棄異常數據。

為什么要發送basic.reject消息而不是ACK?RabbitMQ后面的版本可能會引入”dead letter”隊列,如果想利用dead letter做點文章就使用basic.reject并設置requeue為false。

消息持久化

RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:

1.Exchange持久化,在聲明時指定durable => 1

2.Queue持久化,在聲明時指定durable => 1

3.消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

若Exchange和Queue都是持久化的,那么它們之間的Binding也是持久化的;而Exchange和Queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

Consumer從durable queue中取回一條消息之后并發回了ack消息,RabbitMQ就會將其標記,方便后續垃圾回收。如果一條持久化的消息沒有被consumer取走,RabbitMQ重啟之后會自動重建exchange和queue(以及bingding關系),消息通過持久化日志重建再次進入對應的queues,exchanges。

RabbitMQ集群

由于RabbitMQ是用erlang開發的,RabbitMQ完全依賴erlang的Cluster,因為erlang天生就是一門分布式語言,集群非常方便,但其本身并不支持負載均衡。Erlang的集群中各節點是經由過程一個magic cookie來實現的,這個cookie存放在 $home/.erlang.cookie中(像我的root用戶安裝的就是放在我的root/.erlang.cookie中),文件是400的權限。所以必須保證各節點cookie內容一致,不然節點之間就無法通信。

集群方式

Rabbitmq集群大概分為二種方式:

1.普通模式:默認的集群模式。

對于Queue來說,消息實體只存在于其中一個節點,A、B兩個節點僅有相同的元數據,即隊列結構,但隊列的元數據僅保存有一份,即創建該隊列的rabbitmq節點(A節點),當A節點宕機,你可以去其B節點查看,./rabbitmqctl list_queues 發現該隊列已經丟失,但聲明的exchange還存在。

當消息進入A節點的Queue中后,consumer從B節點拉取時,RabbitMQ會臨時在A、B間進行消息傳輸,把A中的消息實體取出并經過B發送給consumer,所以consumer應平均連接每一個節點,從中取消息。

該模式存在一個問題就是當A節點故障后,B節點無法取到A節點中還未消費的消息實體。如果做了隊列持久化或消息持久化,那么得等A節點恢復,然后才可被消費,并且在A節點恢復之前其它節點不能再創建A節點已經創建過的持久隊列;如果沒有持久化的話,消息就會失丟。

這種模式更適合非持久化隊列,只有該隊列是非持久的,客戶端才能重新連接到集群里的其他節點,并重新創建隊列。假如該隊列是持久化的,那么唯一辦法是將故障節點恢復起來。

2.鏡像模式:把需要的隊列做成鏡像隊列,存在于多個節點。

該模式解決了普通模式的問題,其實質不同之處在于,消息實體會主動在鏡像節點間同步,而不是在consumer取數據時臨時拉取。

該模式帶來的副作用也很明顯,除了降低系統性能外,如果鏡像隊列數量過多,加之大量的消息進入,集群內部的網絡帶寬將會被這種同步通訊大大消耗掉。

所以在對可靠性要求較高的場合中適用,一個隊列想做成鏡像隊列,需要先設置policy,然后客戶端創建隊列的時候,rabbitmq集群根據“隊列名稱”自動設置是普通集群模式或鏡像隊列。具體如下:

隊列通過策略來使能鏡像。策略能在任何時刻改變,rabbitmq隊列也近可能的將隊列隨著策略變化而變化;非鏡像隊列和鏡像隊列之間是有區別的,前者缺乏額外的鏡像基礎設施,沒有任何slave,因此會運行得更快。

為了使隊列稱為鏡像隊列,你將會創建一個策略來匹配隊列,設置策略有兩個鍵“ha-mode和 ha-params(可選)”。ha-params根據ha-mode設置不同的值,下面表格說明這些key的選項。

為什么RabbitMQ不將隊列復制到集群里每個節點呢?這與它的集群的設計本意相沖突,集群的設計目的就是增加更多節點時,能線性的增加性能(CPU、內存)和容量(內存、磁盤)。理由如下:

Storage Space: If every cluster node had a full copy of every queue, adding nodes wouldn’t give you more storage capacity. For example, if one node could store 1GB of messages, adding two more nodes would simply give you two more copies of the same 1GB of messages.(存儲空間:如果每個集群節點每個隊列的一個完整副本,增加節點需要更多的存儲容量。例如,如果一個節點可以存儲1 gb的消息,添加兩個節點需要兩份相同的1gb的消息)

Performance: Publishing messages would require replicating those messages to every cluster node. For durable messages that would require triggering disk activity on all nodes for every message. Your network and disk load would increase every time you added a node, keeping the performance of the cluster the same (or possibly worse).(性能:發布消息需要將這些信息復制到每個集群節點。對持久消息,要求為每條消息觸發磁盤活動在所有節點上。每次添加一個節點都會帶來 網絡和磁盤的負載。)

當然RabbitMQ新版本集群也支持隊列復制(有個選項可以配置)。比如在有五個節點的集群里,可以指定某個隊列的內容在2個節點上進行存儲,從而在性能與高可用性之間取得一個平衡(應該就是指鏡像模式)。

集群節點

RabbitMQ的集群節點包括內存節點、磁盤節點。顧名思義內存節點就是將所有數據放在內存,磁盤節點將數據放在磁盤。不過,如果在投遞消息時,打開了消息的持久化,那么即使是內存節點,數據還是安全的放在磁盤。

一個rabbitmq集 群中可以共享 user,vhost,queue,exchange等,所有的數據和狀態都是必須在所有節點上復制的,一個例外是,那些當前只屬于創建它的節點的消息隊列,盡管它們可見且可被所有節點讀取。rabbitmq節點可以動態的加入到集群中,一個節點它可以加入到集群中,也可以從集群環集群會進行一個基本的負載均衡。

集群中有兩種節點:

1.內存節點:只保存狀態到內存(一個例外的情況是:持久的queue的持久內容將被保存到disk)

2.磁盤節點:保存狀態到內存和磁盤。

內存節點雖然不寫入磁盤,但是它執行比磁盤節點要好。集群中,只需要一個磁盤節點來保存狀態 就足夠了如果集群中只有內存節點,那么不能停止它們,否則所有的狀態,消息等都會丟失。

總結

以上是生活随笔為你收集整理的RabbitMq--1的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。