生活随笔
收集整理的這篇文章主要介紹了
详解RocketMQ中的Producer
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
上一篇博客講解了如何安裝RocketMQ,并且也簡單的介紹了一下相關RocketMq的概念,那么這篇博客,來剖析一下MQ中的producer的角色,看看它是來干什么的?
?
?上圖就是MQ中Producer的有關結構圖,下面來著重分析一下每個類的用途
?1.MQAdmin:作為MQ應用層最底層的類,為我們提供了所有公共的方法,常用的有如下
?根據key、主題名和隊列來創建Topic
?void createTopic(final String key, final String newTopic, final int queueNum)?throws MQClientException;
?查詢消息隊列中的偏移量
?long maxOffset(final MessageQueue mq) throws MQClientException;
?根據各種條件來查詢Message信息
?QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?final long end) throws MQClientException, InterruptedException;
?2.MQProducer:用來發送生產者中的消息,包含了start和shutdown以及各種send方法,其中send方法返回值為sendResult,里面包含著SendStatus也就是發送的狀態。send 消息方法,只要不拋異常,就代表發送成功。但是發送成功會有多個狀態,在 sendResult 里定義。?
? ?SEND_OK?
消息發送成功?
? ?FLUSH_DISK_TIMEOUT?
消息發送成功,但是服務器刷盤超時,消息已經進入服務器隊列,只有此時服務器宕機,消息才會丟失?
? ?FLUSH_SLAVE_TIMEOUT?
消息發送成功,但是服務器同步到 Slave 時超時,消息已經進入服務器隊列,只有此時服務器宕機,消
息才會丟失?
? ?SLAVE_NOT_AVAILABLE?
消息發送成功,但是此時 slave 不可用,消息已經進入服務器隊列,只有此時服務器宕機,消息才會丟
?3.ClientConfig:Client端公共的配置信息,例如心跳數、持久化的時間間隔等
?4.DefaultMQProducer:基礎的MQProducer,有一些基本的默認設置,供我們使用。例如默認的隊列數目、默認的超時時間等
?
下面通過一個實例來了解一下Producer中常用的操作
?
[java]?view plaincopyprint?
<span?style="font-family:Comic?Sans?MS;font-size:18px;">?????????package?com.test;????import?java.util.List;????import?com.alibaba.rocketmq.client.producer.DefaultMQProducer;??import?com.alibaba.rocketmq.client.producer.SendCallback;??import?com.alibaba.rocketmq.client.producer.SendResult;??import?com.alibaba.rocketmq.common.message.Message;??import?com.alibaba.rocketmq.common.message.MessageQueue;???????????public?class?ProducerTest?{??????public?static?void?main(String[]?args)?throws?Exception?{????????????DefaultMQProducer?producer?=?new?DefaultMQProducer("Producer");????????????????????producer.setNamesrvAddr("100.66.154.81:9876");??????????try?{??????????????????????????????producer.setInstanceName("dd");????????????????????????????producer.setRetryTimesWhenSendFailed(3);????????????????????????????producer.start();????????????????????????????Message?msg?=?new?Message("PushTopic",?"push",?"1",??????????????????????"內容一".getBytes());????????????????????????????SendResult?result?=?producer.send(msg);????????????????????????????producer.send(msg,?new?SendCallback()?{????????????????????????????????????@Override????????????????????????????????????public?void?onSuccess(SendResult?sendResult)?{??????????????????????System.out.println(sendResult.getSendStatus());??????????????????????System.out.println("成功了");??????????????????}????????????????????????????????????@Override????????????????????????????????????public?void?onException(Throwable?e)?{??????????????????System.out.println("失敗了"+e.getMessage());????????????????????????????????????????}??????????????});????????????????????????????????????????????????????List<MessageQueue>?messageQueues?=?producer??????????????????????.fetchPublishMessageQueues("PushTopic");??????????????System.out.println(messageQueues.size());????????????????????}?catch?(Exception?e)?{??????????????e.printStackTrace();??????????}?finally?{?????????????????????????producer.shutdown();??????????}??????}????}??</span> ?
總結
以上是生活随笔為你收集整理的详解RocketMQ中的Producer的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。