日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rocketmq 消息指定_详解RocketMQ不同类型的消费者

發布時間:2025/4/16 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rocketmq 消息指定_详解RocketMQ不同类型的消费者 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原標題:詳解RocketMQ不同類型的消費者

云棲君導讀:本文節選自云棲社區系列叢書《RocketMQ原理與實戰解析》,作者:阿里巴巴數據專家楊開元。本節將重點講解RocketMQ不同類型的消費者。

根據使用者對讀取操作的控制情況,分為兩種類型。一個是DefaultMQPushConsumer,由系統控制讀取操作,收到消息后自動調用傳入的處理方法來處理;另一個是DefaultMQPullConsumer,讀取操作中的大部分功能由使用者自主控制。

1.1.1 DefaultMQPushConsumer的使用

使用DefaultMQPushConsumer主要是設置好各種參數和傳入處理消息的函數。系統收到消息后自動調用處理函數來處理消息,自動保存Offset,而且加入新的DefaultMQPushConsumer后會自動做負載均衡。下面結合org.apache.rocketmq.example.quickstart包中的源碼來介紹。

代碼清單1-1 DefaultMQPushConsumer示例

DefaultMQPushConsumer需要設置三個參數:一是這個Consumer的GroupName,二是NameServer的地址和端口號,三是Topic的名稱,下面詳細介紹。

Consumer的GroupName用于把多個Consumer組織到一起,提高并發處理能力,GroupName需要和消息模式(MessageModel)配合使用。

RocketMQ支持兩種消息模式:Clustering 和 Broadcasting。

在Clustering 模式下,同一個ConsumerGroup(GroupName相同)里的每個Consumer只消費所訂閱消息的一部分內容,同一個ConsumerGroup里所有的Consumer消費的內容合起來才是所訂閱Topic內容的整體,從而達到負載均衡的目的。

在Broadcasting模式下,同一個ConsumerGroup里的每個Consumer都能消費到所訂閱Topic的全部消息,也就是一個消息會被多次分發,被多個Consumer消費。

NameServer的地址和端口號,可以填寫多個,用分號隔開,達到消除單點故障的目的,比如 “ip1:port;ip2:port;ip3:port”。

Topic名稱用來標識消息類型,需要提前創建。如果不需要消費某個Topic下的所有消息,可以通過指定消息的Tag進行消息過濾,比如:Consumer.subscribe("TopicTest", "tag1 || tag2 || tag3"),表示這個Consumer要消費“TopicTest”下帶有tag1或tag2或tag3的消息(Tag是在發送消息時設置的標簽)。在填寫Tag參數的位置,用null或者“*”表示要消費這個Topic的所有消息。

1.1.2 DefaultMQPushConsumer的處理流程

本節通過分析源碼來說明DefaultMQPushConsumer的處理流程。

DefaultMQPushConsumer主要功能實現在DefaultMQPushConsumerImpl類中,消息的處理邏輯是在pullMessage這個函數里的PullCallBack中。在PullCallBack函數里有個switch語句,根據從Broker返回的消息類型做相應的處理,具體處理邏輯可以查看源碼。

代碼清單1-2 DefaultMQPushConsuer的處理邏輯

DefaultMQPushConsuer的源碼中有很多PullRequest語句,比如DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest),為什么“PushConsumer”中使用“PullRequest”呢?這是通過“長輪詢”方式達到Push效果的方法,長輪詢方式既有Pull的優點,又兼具Push方式的實時性。

Push方式是Server端接收到消息后,主動把消息推送給Client端,實時性高。對于一個提供隊列服務的Server來說,用Push方式主動推送有很多弊端;首先是加大Server端的工作量,進而影響Server的性能,其次Client的處理能力各不相同,Client的狀態不受Server控制,如果Client不能及時處理Server推送過來的消息,會造成各種潛在問題。

Pull方式是Client端循環地從Server端拉取消息,主動權在Client手里,自己拉取到一定量消息后,處理妥當了再接著取。Pull方式的問題是循環拉取消息的間隔不好設定,間隔太短就處在一個“忙等”的狀態,浪費資源;每個Pull的時間間隔太長,Server端有消息到來有可能沒有被及時處理。

“長輪詢”方式是通過Client端和Server端的配合,既擁有Pull的優點,又能達到保證實時性的目的。我們結合源碼來分析:

代碼清單1- 3 發送Pull消息代碼片段

源碼中有這一行設置語句

requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis),設置Broker最長阻塞時間,默認設置是15秒,注意是Broker在沒有新消息的時候才阻塞,有消息會立刻返回。

從Broker的源碼中可以看出,服務端接到新消息請求后,如果隊列里沒有新消息,并不急于返回,通過一個循環不斷查看狀態,每次 waitForRunning一段時候(默認是5秒),然后后再Check。默認情況下當Broker一直沒有新消息,第三次Check的時候,等待時間超過Request里面的 BrokerSuspendMaxTimeMillis,就返回空結果。在等待的過程中,Broker收到了新的消息后會直接調用notifyMessageArriving函數返回請求結果。“長輪詢”的核心是,Broker端HOLD住客戶端過來的請求一小段時間,在這個時間內有新消息到達,就利用現有的連接立刻返回消息給Consumer。“長輪詢”的主動權還是掌握在Consumer手中,Broker即使有大量消息積壓,也不會主動推送給Consumer。

長輪詢方式的局限性,是在HOLD住Consumer請求的時候需要占用資源,它適合用在消息隊列這種客戶端連接數可控的場景中。

1.1.3 DefaultMQPullConsumer

使用DefaultMQPullConsumer像使用DefaultMQPushConsumer一樣需要設置各種參數,寫處理消息的函數,同時還需要做額外的事情。接下來結合org.apache.rocketmq.example.simple包中的例子源碼來介紹。

示例代碼的處理邏輯是逐個讀取某Topic下所有Message Queue的內容,讀完一遍后退出,主要處理額外的三件事情:

(1) 獲取Message Queue并遍歷

一個Topic包括多個Message Queue,如果這個Consumer需要獲取Topic下所有的消息,就要遍歷多有的Message Queue。如果有特殊情況,也可以選擇某些特定的Message Queue來讀取消息。

(2) 維護Offsetstore

從一個Message Queue里拉取消息的時候,要傳入Offset參數(long類型的值),隨著不斷讀取消息,Offset會不斷增長。這個時候由用戶負責把Offset存儲下來,根據具體情況可以存到內存里、寫到磁盤或者數據庫里等。

(3) 根據不同的消息狀態做不同的處理

拉取消息的請求發出后,會返回:FOUND,NO_MATCHED_MSG,NO_NEW_MSG,OFFSET_ILLEGAL四種狀態,要根據每個狀態做不同的處理。比較重要的兩個狀態是FOUNT和NO_NEW_MSG,分別表示獲取到消息和沒有新的消息

實際情況中可以把while(true)放到外層,達到無限循環的目的。因為PullConsumer需要用戶自己處理遍歷Message Queue、保存Offset,所以PullConsumer有更多的自主性和靈活性。

----------------

責任編輯:

總結

以上是生活随笔為你收集整理的rocketmq 消息指定_详解RocketMQ不同类型的消费者的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。