日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

详解RocketMQ中的Producer

發布時間:2024/1/1 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 详解RocketMQ中的Producer 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

上一篇博客講解了如何安裝RocketMQ,并且也簡單的介紹了一下相關RocketMq的概念,那么這篇博客,來剖析一下MQ中的producer的角色,看看它是來干什么的?


?


?上圖就是MQ中Producer的有關結構圖,下面來著重分析一下每個類的用途


?1.MQAdmin:作為MQ應用層最底層的類,為我們提供了所有公共的方法,常用的有如下

?根據key、主題名和隊列來創建Topic


?void createTopic(final String key, final String newTopic, final int queueNum)?throws MQClientException;

?查詢消息隊列中的偏移量

?long maxOffset(final MessageQueue mq) throws MQClientException;

?根據各種條件來查詢Message信息

?QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?final long end) throws MQClientException, InterruptedException;



?2.MQProducer:用來發送生產者中的消息,包含了start和shutdown以及各種send方法,其中send方法返回值為sendResult,里面包含著SendStatus也就是發送的狀態。send 消息方法,只要不拋異常,就代表發送成功。但是發送成功會有多個狀態,在 sendResult 里定義。?


? ?SEND_OK?
消息發送成功?
? ?FLUSH_DISK_TIMEOUT?
消息發送成功,但是服務器刷盤超時,消息已經進入服務器隊列,只有此時服務器宕機,消息才會丟失?
? ?FLUSH_SLAVE_TIMEOUT?
消息發送成功,但是服務器同步到 Slave 時超時,消息已經進入服務器隊列,只有此時服務器宕機,消
息才會丟失?
? ?SLAVE_NOT_AVAILABLE?
消息發送成功,但是此時 slave 不可用,消息已經進入服務器隊列,只有此時服務器宕機,消息才會丟


?3.ClientConfig:Client端公共的配置信息,例如心跳數、持久化的時間間隔等

?4.DefaultMQProducer:基礎的MQProducer,有一些基本的默認設置,供我們使用。例如默認的隊列數目、默認的超時時間等


?

下面通過一個實例來了解一下Producer中常用的操作

?

[java]?view plaincopyprint?
  • <span?style="font-family:Comic?Sans?MS;font-size:18px;">/**??????
  • ?*?@FileName:?Producer.java????
  • ?*?@Package:com.test????
  • ?*?@Description:?TODO???
  • ?*?@author:?LUCKY?????
  • ?*?@date:2015年12月28日?下午2:32:22????
  • ?*?@version?V1.0??????
  • ?*/??
  • package?com.test;??
  • ??
  • import?java.util.List;??
  • ??
  • import?com.alibaba.rocketmq.client.producer.DefaultMQProducer;??
  • import?com.alibaba.rocketmq.client.producer.SendCallback;??
  • import?com.alibaba.rocketmq.client.producer.SendResult;??
  • import?com.alibaba.rocketmq.common.message.Message;??
  • import?com.alibaba.rocketmq.common.message.MessageQueue;??
  • ??
  • /**?
  • ?*?@ClassName:?Producer?
  • ?*?@Description:?模擬生產者?
  • ?*?@author:?LUCKY?
  • ?*?@date:2015年12月28日?下午2:32:22?
  • ?*/??
  • public?class?ProducerTest?{??
  • ????public?static?void?main(String[]?args)?throws?Exception?{??
  • ??
  • ????????DefaultMQProducer?producer?=?new?DefaultMQProducer("Producer");??
  • ????????//?必須要設置nameserver地址??
  • ????????producer.setNamesrvAddr("100.66.154.81:9876");??
  • ????????try?{??
  • //??????????producer.setClientIP("**");??
  • ????????????//設置實例名稱??
  • ????????????producer.setInstanceName("dd");??
  • ????????????//設置重試的次數??
  • ????????????producer.setRetryTimesWhenSendFailed(3);??
  • ????????????//開啟生產者??
  • ????????????producer.start();??
  • ????????????//創建一條消息??
  • ????????????Message?msg?=?new?Message("PushTopic",?"push",?"1",??
  • ????????????????????"內容一".getBytes());??
  • ????????????//發送消息??
  • ????????????SendResult?result?=?producer.send(msg);??
  • ????????????//發送,并觸發回調函數??
  • ????????????producer.send(msg,?new?SendCallback()?{??
  • ??????????????????
  • ????????????????@Override??
  • ????????????????//成功的回調函數??
  • ????????????????public?void?onSuccess(SendResult?sendResult)?{??
  • ????????????????????System.out.println(sendResult.getSendStatus());??
  • ????????????????????System.out.println("成功了");??
  • ????????????????}??
  • ??????????????????
  • ????????????????@Override??
  • ????????????????//出現異常的回調函數??
  • ????????????????public?void?onException(Throwable?e)?{??
  • ????????????????System.out.println("失敗了"+e.getMessage());??
  • ??????????????????????
  • ????????????????}??
  • ????????????});??
  • ??????????
  • ??????????????
  • ????????????//獲取某個主題的消息隊列??
  • ????????????List<MessageQueue>?messageQueues?=?producer??
  • ????????????????????.fetchPublishMessageQueues("PushTopic");??
  • ????????????System.out.println(messageQueues.size());??
  • ??????????
  • ????????}?catch?(Exception?e)?{??
  • ????????????e.printStackTrace();??
  • ????????}?finally?{??
  • ??????????????
  • ?????????producer.shutdown();??
  • ????????}??
  • ????}??
  • ??
  • }??
  • </span> ?
  • 總結

    以上是生活随笔為你收集整理的详解RocketMQ中的Producer的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。