日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

详解RocketMQ中的Producer

發(fā)布時(shí)間:2024/1/1 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 详解RocketMQ中的Producer 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

上一篇博客講解了如何安裝RocketMQ,并且也簡單的介紹了一下相關(guān)RocketMq的概念,那么這篇博客,來剖析一下MQ中的producer的角色,看看它是來干什么的?


?


?上圖就是MQ中Producer的有關(guān)結(jié)構(gòu)圖,下面來著重分析一下每個(gè)類的用途


?1.MQAdmin:作為MQ應(yīng)用層最底層的類,為我們提供了所有公共的方法,常用的有如下

?根據(jù)key、主題名和隊(duì)列來創(chuàng)建Topic


?void createTopic(final String key, final String newTopic, final int queueNum)?throws MQClientException;

?查詢消息隊(duì)列中的偏移量

?long maxOffset(final MessageQueue mq) throws MQClientException;

?根據(jù)各種條件來查詢Message信息

?QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?final long end) throws MQClientException, InterruptedException;



?2.MQProducer:用來發(fā)送生產(chǎn)者中的消息,包含了start和shutdown以及各種send方法,其中send方法返回值為sendResult,里面包含著SendStatus也就是發(fā)送的狀態(tài)。send 消息方法,只要不拋異常,就代表發(fā)送成功。但是發(fā)送成功會(huì)有多個(gè)狀態(tài),在 sendResult 里定義。?


? ?SEND_OK?
消息發(fā)送成功?
? ?FLUSH_DISK_TIMEOUT?
消息發(fā)送成功,但是服務(wù)器刷盤超時(shí),消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有此時(shí)服務(wù)器宕機(jī),消息才會(huì)丟失?
? ?FLUSH_SLAVE_TIMEOUT?
消息發(fā)送成功,但是服務(wù)器同步到 Slave 時(shí)超時(shí),消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有此時(shí)服務(wù)器宕機(jī),消
息才會(huì)丟失?
? ?SLAVE_NOT_AVAILABLE?
消息發(fā)送成功,但是此時(shí) slave 不可用,消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有此時(shí)服務(wù)器宕機(jī),消息才會(huì)丟


?3.ClientConfig:Client端公共的配置信息,例如心跳數(shù)、持久化的時(shí)間間隔等

?4.DefaultMQProducer:基礎(chǔ)的MQProducer,有一些基本的默認(rèn)設(shè)置,供我們使用。例如默認(rèn)的隊(duì)列數(shù)目、默認(rèn)的超時(shí)時(shí)間等


?

下面通過一個(gè)實(shí)例來了解一下Producer中常用的操作

?

[java]?view plaincopyprint?
  • <span?style="font-family:Comic?Sans?MS;font-size:18px;">/**??????
  • ?*?@FileName:?Producer.java????
  • ?*?@Package:com.test????
  • ?*?@Description:?TODO???
  • ?*?@author:?LUCKY?????
  • ?*?@date:2015年12月28日?下午2:32:22????
  • ?*?@version?V1.0??????
  • ?*/??
  • package?com.test;??
  • ??
  • import?java.util.List;??
  • ??
  • import?com.alibaba.rocketmq.client.producer.DefaultMQProducer;??
  • import?com.alibaba.rocketmq.client.producer.SendCallback;??
  • import?com.alibaba.rocketmq.client.producer.SendResult;??
  • import?com.alibaba.rocketmq.common.message.Message;??
  • import?com.alibaba.rocketmq.common.message.MessageQueue;??
  • ??
  • /**?
  • ?*?@ClassName:?Producer?
  • ?*?@Description:?模擬生產(chǎn)者?
  • ?*?@author:?LUCKY?
  • ?*?@date:2015年12月28日?下午2:32:22?
  • ?*/??
  • public?class?ProducerTest?{??
  • ????public?static?void?main(String[]?args)?throws?Exception?{??
  • ??
  • ????????DefaultMQProducer?producer?=?new?DefaultMQProducer("Producer");??
  • ????????//?必須要設(shè)置nameserver地址??
  • ????????producer.setNamesrvAddr("100.66.154.81:9876");??
  • ????????try?{??
  • //??????????producer.setClientIP("**");??
  • ????????????//設(shè)置實(shí)例名稱??
  • ????????????producer.setInstanceName("dd");??
  • ????????????//設(shè)置重試的次數(shù)??
  • ????????????producer.setRetryTimesWhenSendFailed(3);??
  • ????????????//開啟生產(chǎn)者??
  • ????????????producer.start();??
  • ????????????//創(chuàng)建一條消息??
  • ????????????Message?msg?=?new?Message("PushTopic",?"push",?"1",??
  • ????????????????????"內(nèi)容一".getBytes());??
  • ????????????//發(fā)送消息??
  • ????????????SendResult?result?=?producer.send(msg);??
  • ????????????//發(fā)送,并觸發(fā)回調(diào)函數(shù)??
  • ????????????producer.send(msg,?new?SendCallback()?{??
  • ??????????????????
  • ????????????????@Override??
  • ????????????????//成功的回調(diào)函數(shù)??
  • ????????????????public?void?onSuccess(SendResult?sendResult)?{??
  • ????????????????????System.out.println(sendResult.getSendStatus());??
  • ????????????????????System.out.println("成功了");??
  • ????????????????}??
  • ??????????????????
  • ????????????????@Override??
  • ????????????????//出現(xiàn)異常的回調(diào)函數(shù)??
  • ????????????????public?void?onException(Throwable?e)?{??
  • ????????????????System.out.println("失敗了"+e.getMessage());??
  • ??????????????????????
  • ????????????????}??
  • ????????????});??
  • ??????????
  • ??????????????
  • ????????????//獲取某個(gè)主題的消息隊(duì)列??
  • ????????????List<MessageQueue>?messageQueues?=?producer??
  • ????????????????????.fetchPublishMessageQueues("PushTopic");??
  • ????????????System.out.println(messageQueues.size());??
  • ??????????
  • ????????}?catch?(Exception?e)?{??
  • ????????????e.printStackTrace();??
  • ????????}?finally?{??
  • ??????????????
  • ?????????producer.shutdown();??
  • ????????}??
  • ????}??
  • ??
  • }??
  • </span> ?
  • 總結(jié)

    以上是生活随笔為你收集整理的详解RocketMQ中的Producer的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。