普通消息 普通消息也叫做無(wú)序消息,簡(jiǎn)單來(lái)說就是沒有順序的消息,producer 只管發(fā)送消息,consumer 只管接收消息,至于消息和消息之間的順序并沒有保證,可能先發(fā)送的消息先消費(fèi),也可能先發(fā)送的消息后消費(fèi)。
舉個(gè)簡(jiǎn)單例子,producer 依次發(fā)送 order id 為 1、2、3 的消息到 broker,consumer 接到的消息順序有可能是 1、2、3,也有可能是 2、1、3 等情況,這就是普通消息。
因?yàn)椴恍枰WC消息的順序,所以消息可以大規(guī)模并發(fā)地發(fā)送和消費(fèi),吞吐量很高,適合大部分場(chǎng)景。
代碼示例 :
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//聲明并初始化一個(gè)producer//需要一個(gè)producer group名字作為構(gòu)造方法的參數(shù),這里為concurrent_producerDefaultMQProducer producer = new DefaultMQProducer("concurrent_producer");//設(shè)置NameServer地址,此處應(yīng)改為實(shí)際NameServer地址,多個(gè)地址之間用;分隔//NameServer的地址必須有,但是也可以通過環(huán)境變量的方式設(shè)置,不一定非得寫死在代碼里producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");//調(diào)用start()方法啟動(dòng)一個(gè)producer實(shí)例producer.start();//發(fā)送10條消息到Topic為TopicTest,tag為TagA,消息內(nèi)容為“Hello RocketMQ”拼接上i的值for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTestConcurrent",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body);//調(diào)用producer的send()方法發(fā)送消息//這里調(diào)用的是同步的方式,所以會(huì)有返回結(jié)果,同時(shí)默認(rèn)發(fā)送的也是普通消息SendResult sendResult = producer.send(msg);//打印返回結(jié)果,可以看到消息發(fā)送的狀態(tài)以及一些相關(guān)信息System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//發(fā)送完消息之后,調(diào)用shutdown()方法關(guān)閉producerproducer.shutdown();}
}
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//聲明并初始化一個(gè)consumer//需要一個(gè)consumer group名字作為構(gòu)造方法的參數(shù),這里為concurrent_consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("concurrent_consumer");//同樣也要設(shè)置NameServer地址consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");//這里設(shè)置的是一個(gè)consumer的消費(fèi)策略//CONSUME_FROM_LAST_OFFSET 默認(rèn)策略,從該隊(duì)列最尾開始消費(fèi),即跳過歷史消息//CONSUME_FROM_FIRST_OFFSET 從隊(duì)列最開始開始消費(fèi),即歷史消息(還儲(chǔ)存在broker的)全部消費(fèi)一遍//CONSUME_FROM_TIMESTAMP 從某個(gè)時(shí)間點(diǎn)開始消費(fèi),和setConsumeTimestamp()配合使用,默認(rèn)是半個(gè)小時(shí)以前consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//設(shè)置consumer所訂閱的Topic和Tag,*代表全部的Tagconsumer.subscribe("TopicTestConcurrent", "*");//設(shè)置一個(gè)Listener,主要進(jìn)行消息的邏輯處理//注意這里使用的是MessageListenerConcurrently這個(gè)接口consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);//返回消費(fèi)狀態(tài)//CONSUME_SUCCESS 消費(fèi)成功//RECONSUME_LATER 消費(fèi)失敗,需要稍后重新消費(fèi)return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//調(diào)用start()方法啟動(dòng)consumerconsumer.start();System.out.println("Consumer Started.");}
}
有序消息 有序消息就是按照一定的先后順序的消息類型。
舉個(gè)例子來(lái)說,producer 依次發(fā)送 order id 為 1、2、3 的消息到 broker,consumer 接到的消息順序也就是 1、2、3 ,而不會(huì)出現(xiàn)普通消息那樣的 2、1、3 等情況。
那么有序消息是如何保證的呢?我們都知道消息首先由 producer 到 broker,再?gòu)?broker 到 consumer,分這兩步走。那么要保證消息的有序,勢(shì)必這兩步都是要保證有序的,即要保證消息是按有序發(fā)送到 broker,broker 也是有序?qū)⑾⑼哆f給 consumer,兩個(gè)條件必須同時(shí)滿足,缺一不可。 進(jìn)一步還可以將有序消息分成
之前我們講過,topic 只是消息的邏輯分類,內(nèi)部實(shí)現(xiàn)其實(shí)是由 queue 組成。當(dāng) producer 把消息發(fā)送到某個(gè) topic 時(shí),默認(rèn)是會(huì)消息發(fā)送到具體的 queue 上。
全局有序
舉個(gè)例子,producer 發(fā)送 order id 為 1、2、3、4 的四條消息到 topicA 上,假設(shè) topicA 的 queue 數(shù)為 3 個(gè)(queue0、queue1、queue2),那么消息的分布可能就是這種情況,id 為 1 的在 queue0,id 為 2 的在 queue1,id 為 3 的在 queue2,id 為 4 的在 queue0。同樣的,consumer 消費(fèi)時(shí)也是按 queue 去消費(fèi),這時(shí)候就可能出現(xiàn)先消費(fèi) 1、4,再消費(fèi) 2、3,和我們的預(yù)期不符。那么我們?nèi)绾螌?shí)現(xiàn) 1、2、3、4 的消費(fèi)順序呢?道理其實(shí)很簡(jiǎn)單,只需要把訂單 topic 的 queue 數(shù)改為 1,如此一來(lái),只要 producer 按照 1、2、3、4 的順序去發(fā)送消息,那么 consumer 自然也就按照 1、2、3、4 的順序去消費(fèi),這就是全局有序消息。
由于一個(gè) topic 只有一個(gè) queue ,即使我們有多個(gè) producer 實(shí)例和 consumer 實(shí)例也很難提高消息吞吐量。就好比過獨(dú)木橋,大家只能一個(gè)挨著一個(gè)過去,效率低下。
那么有沒有吞吐量和有序之間折中的方案呢?其實(shí)是有的,就是局部有序消息。
局部有序
我們知道訂單消息可以再細(xì)分為訂單創(chuàng)建、訂單付款、訂單完成等消息,這些消息都有相同的 order id。同時(shí),也只有按照訂單創(chuàng)建、訂單付款、訂單完成的順序去消費(fèi)才符合業(yè)務(wù)邏輯。但是不同 order id 的消息是可以并行的,不會(huì)影響到業(yè)務(wù)。這時(shí)候就常見做法就是將 order id 進(jìn)行處理,將 order id 相同的消息發(fā)送到 topicB 的同一個(gè) queue,假設(shè)我們 topicB 有 2 個(gè) queue,那么我們可以簡(jiǎn)單的對(duì) id 取余,奇數(shù)的發(fā)往 queue0,偶數(shù)的發(fā)往 queue1,消費(fèi)者按照 queue 去消費(fèi)時(shí),就能保證 queue0 里面的消息有序消費(fèi),queue1 里面的消息有序消費(fèi)。
由于一個(gè) topic 可以有多個(gè) queue,所以在性能比全局有序高得多。假設(shè) queue 數(shù)是 n,理論上性能就是全局有序的 n 倍,當(dāng)然 consumer 也要跟著增加才行。在實(shí)際情況中,這種局部有序消息是會(huì)比全局有序消息用的更多。
示例代碼 :
public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {// 聲明并初始化一個(gè)producer// 需要一個(gè)producer group名字作為構(gòu)造方法的參數(shù),這里為ordered_producerDefaultMQProducer orderedProducer = new DefaultMQProducer("ordered_producer");// 設(shè)置NameServer地址,此處應(yīng)改為實(shí)際NameServer地址,多個(gè)地址之間用;分隔//NameServer的地址必須有,但是也可以通過環(huán)境變量的方式設(shè)置,不一定非得寫死在代碼里orderedProducer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");// 調(diào)用start()方法啟動(dòng)一個(gè)producer實(shí)例orderedProducer.start();// 自定義一個(gè)tag數(shù)組String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};// 發(fā)送10條消息到Topic為TopicTestOrdered,tag為tags數(shù)組按順序取值,// key值為“KEY”拼接上i的值,消息內(nèi)容為“Hello RocketMQ”拼接上i的值for (int i = 0; i < 10; i++) {int orderId = i % 10;Message msg =new Message("TopicTestOrdered", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = orderedProducer.send(msg, new MessageQueueSelector() {// 選擇發(fā)送消息的隊(duì)列@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// arg的值其實(shí)就是orderIdInteger id = (Integer) arg;// mqs是隊(duì)列集合,也就是topic所對(duì)應(yīng)的所有隊(duì)列int index = id % mqs.size();// 這里根據(jù)前面的id對(duì)隊(duì)列集合大小求余來(lái)返回所對(duì)應(yīng)的隊(duì)列return mqs.get(index);}}, orderId);System.out.println(sendResult);}orderedProducer.shutdown();} catch (MQClientException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}
}
至于是要實(shí)現(xiàn)全局有序,還是局部有序,在此示例代碼中,就取決于 TopicTestOrdered 這個(gè) Topic 的隊(duì)列數(shù)了。
public class Consumer {public static void main(String[] args) throws MQClientException {//聲明并初始化一個(gè)consumer//需要一個(gè)consumer group名字作為構(gòu)造方法的參數(shù),這里為concurrent_consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer");//同樣也要設(shè)置NameServer地址consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");//這里設(shè)置的是一個(gè)consumer的消費(fèi)策略//CONSUME_FROM_LAST_OFFSET 默認(rèn)策略,從該隊(duì)列最尾開始消費(fèi),即跳過歷史消息//CONSUME_FROM_FIRST_OFFSET 從隊(duì)列最開始開始消費(fèi),即歷史消息(還儲(chǔ)存在broker的)全部消費(fèi)一遍//CONSUME_FROM_TIMESTAMP 從某個(gè)時(shí)間點(diǎn)開始消費(fèi),和setConsumeTimestamp()配合使用,默認(rèn)是半個(gè)小時(shí)以前consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//設(shè)置consumer所訂閱的Topic和Tagconsumer.subscribe("TopicTestOrdered", "TagA || TagC || TagD");//設(shè)置一個(gè)Listener,主要進(jìn)行消息的邏輯處理//注意這里使用的是MessageListenerOrderly這個(gè)接口consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);//返回消費(fèi)狀態(tài)//SUCCESS 消費(fèi)成功//SUSPEND_CURRENT_QUEUE_A_MOMENT 消費(fèi)失敗,暫停當(dāng)前隊(duì)列的消費(fèi)return ConsumeOrderlyStatus.SUCCESS;}});//調(diào)用start()方法啟動(dòng)consumerconsumer.start();System.out.println("Consumer Started.");}
}
延時(shí)消息 延時(shí)消息,簡(jiǎn)單來(lái)說就是當(dāng) producer 將消息發(fā)送到 broker 后,會(huì)延時(shí)一定時(shí)間后才投遞給 consumer 進(jìn)行消費(fèi)。
RcoketMQ的延時(shí)等級(jí)為:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延時(shí)。level=1,表示 1 級(jí)延時(shí),對(duì)應(yīng)延時(shí) 1s。level=2 表示 2 級(jí)延時(shí),對(duì)應(yīng)5s,以此類推。
這種消息一般適用于消息生產(chǎn)和消費(fèi)之間有時(shí)間窗口要求的場(chǎng)景。比如說我們網(wǎng)購(gòu)時(shí),下單之后是有一個(gè)支付時(shí)間,超過這個(gè)時(shí)間未支付,系統(tǒng)就應(yīng)該自動(dòng)關(guān)閉該筆訂單。那么在訂單創(chuàng)建的時(shí)候就會(huì)就需要發(fā)送一條延時(shí)消息(延時(shí)15分鐘)后投遞給 consumer,consumer 接收消息后再對(duì)訂單的支付狀態(tài)進(jìn)行判斷是否關(guān)閉訂單。
設(shè)置延時(shí)非常簡(jiǎn)單,只需要在Message設(shè)置對(duì)應(yīng)的延時(shí)級(jí)別即可:
Message msg = new Message("TopicTest",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body);// 這里設(shè)置需要延時(shí)的等級(jí)即可msg.setDelayTimeLevel(3);SendResult sendResult = producer.send(msg);
更多技術(shù)干貨,可以掃描下面的二維碼,關(guān)注微信公眾號(hào):馮先生的筆記
作者:馮先生的筆記
鏈接:http://www.jianshu.com/p/11e875074a8f
來(lái)源:簡(jiǎn)書
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
總結(jié)
以上是生活随笔 為你收集整理的必知必会的RocketMQ消息类型 的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔 推薦給好友。