生活随笔
收集整理的這篇文章主要介紹了
详解RocketMQ中的Producer
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
上一篇博客講解了如何安裝RocketMQ,并且也簡單的介紹了一下相關(guān)RocketMq的概念,那么這篇博客,來剖析一下MQ中的producer的角色,看看它是來干什么的?
?
?上圖就是MQ中Producer的有關(guān)結(jié)構(gòu)圖,下面來著重分析一下每個(gè)類的用途
?1.MQAdmin:作為MQ應(yīng)用層最底層的類,為我們提供了所有公共的方法,常用的有如下
?根據(jù)key、主題名和隊(duì)列來創(chuàng)建Topic
?void createTopic(final String key, final String newTopic, final int queueNum)?throws MQClientException;
?查詢消息隊(duì)列中的偏移量
?long maxOffset(final MessageQueue mq) throws MQClientException;
?根據(jù)各種條件來查詢Message信息
?QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?final long end) throws MQClientException, InterruptedException;
?2.MQProducer:用來發(fā)送生產(chǎn)者中的消息,包含了start和shutdown以及各種send方法,其中send方法返回值為sendResult,里面包含著SendStatus也就是發(fā)送的狀態(tài)。send 消息方法,只要不拋異常,就代表發(fā)送成功。但是發(fā)送成功會(huì)有多個(gè)狀態(tài),在 sendResult 里定義。?
? ?SEND_OK?
消息發(fā)送成功?
? ?FLUSH_DISK_TIMEOUT?
消息發(fā)送成功,但是服務(wù)器刷盤超時(shí),消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有此時(shí)服務(wù)器宕機(jī),消息才會(huì)丟失?
? ?FLUSH_SLAVE_TIMEOUT?
消息發(fā)送成功,但是服務(wù)器同步到 Slave 時(shí)超時(shí),消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有此時(shí)服務(wù)器宕機(jī),消
息才會(huì)丟失?
? ?SLAVE_NOT_AVAILABLE?
消息發(fā)送成功,但是此時(shí) slave 不可用,消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有此時(shí)服務(wù)器宕機(jī),消息才會(huì)丟
?3.ClientConfig:Client端公共的配置信息,例如心跳數(shù)、持久化的時(shí)間間隔等
?4.DefaultMQProducer:基礎(chǔ)的MQProducer,有一些基本的默認(rèn)設(shè)置,供我們使用。例如默認(rèn)的隊(duì)列數(shù)目、默認(rèn)的超時(shí)時(shí)間等
?
下面通過一個(gè)實(shí)例來了解一下Producer中常用的操作
?
[java]?view plaincopyprint?
<span?style="font-family:Comic?Sans?MS;font-size:18px;">?????????package?com.test;????import?java.util.List;????import?com.alibaba.rocketmq.client.producer.DefaultMQProducer;??import?com.alibaba.rocketmq.client.producer.SendCallback;??import?com.alibaba.rocketmq.client.producer.SendResult;??import?com.alibaba.rocketmq.common.message.Message;??import?com.alibaba.rocketmq.common.message.MessageQueue;???????????public?class?ProducerTest?{??????public?static?void?main(String[]?args)?throws?Exception?{????????????DefaultMQProducer?producer?=?new?DefaultMQProducer("Producer");????????????????????producer.setNamesrvAddr("100.66.154.81:9876");??????????try?{??????????????????????????????producer.setInstanceName("dd");????????????????????????????producer.setRetryTimesWhenSendFailed(3);????????????????????????????producer.start();????????????????????????????Message?msg?=?new?Message("PushTopic",?"push",?"1",??????????????????????"內(nèi)容一".getBytes());????????????????????????????SendResult?result?=?producer.send(msg);????????????????????????????producer.send(msg,?new?SendCallback()?{????????????????????????????????????@Override????????????????????????????????????public?void?onSuccess(SendResult?sendResult)?{??????????????????????System.out.println(sendResult.getSendStatus());??????????????????????System.out.println("成功了");??????????????????}????????????????????????????????????@Override????????????????????????????????????public?void?onException(Throwable?e)?{??????????????????System.out.println("失敗了"+e.getMessage());????????????????????????????????????????}??????????????});????????????????????????????????????????????????????List<MessageQueue>?messageQueues?=?producer??????????????????????.fetchPublishMessageQueues("PushTopic");??????????????System.out.println(messageQueues.size());????????????????????}?catch?(Exception?e)?{??????????????e.printStackTrace();??????????}?finally?{?????????????????????????producer.shutdown();??????????}??????}????}??</span> ?
總結(jié)
以上是生活随笔為你收集整理的详解RocketMQ中的Producer的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。