日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ入门到入土(一)新手也能看懂的原理和实战!

發布時間:2025/3/20 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ入门到入土(一)新手也能看懂的原理和实战! 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

點擊上方?好好學java?,選擇?星標?公眾號

重磅資訊、干貨,第一時間送達

今日推薦:硬剛一周,3W字總結,一年的經驗告訴你如何準備校招!

個人原創100W+訪問量博客:點擊前往,查看更多

學任何技術都是兩步驟:

  • 搭建環境

  • helloworld

  • 我也不例外,直接搞起來。

    一、RocketMQ的安裝

    1、文檔

    官方網站

    http://rocketmq.apache.org

    GitHub

    https://github.com/apache/rocketmq

    2、下載

    wget?https://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip

    我們是基于Centos8來的,面向官方文檔學習,所以下載地址自然也是官方的。

    去官方網站找合適的版本進行下載,目前我這里最新的是4.7.0版本。

    http://rocketmq.apache.org/dowloading/releases/

    https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip

    3、準備工作

    3.1、解壓

    unzip?rocketmq-all-4.7.0-bin-release.zip

    3.2、安裝jdk

    sudo?yum?install?java-1.8.0-openjdk-devel

    4、啟動

    4.1、啟動namesrv

    cd?rocketmq-all-4.7.0-bin-release/bin ./mqnamesrv

    4.2、啟動broker

    cd?rocketmq-all-4.7.0-bin-release/bin ./mqbroker?-n?localhost:9876

    常見錯誤以及解決方案:

    常見錯誤:啟動broker失敗?Cannot allocate memory

    [root@node-113b?bin]#?./mqbroker?-n?localhost:9876 Java?HotSpot(TM)?64-Bit?Server?VM?warning:?INFO:?os::commit_memory(0x00000005c0000000,?8589934592,?0)?failed ;?error='Cannot?allocate?memory'?(errno=12)# #?There?is?insufficient?memory?for?the?Java?Runtime?Environment?to?continue. #?Native?memory?allocation?(mmap)?failed?to?map?8589934592?bytes?for?committing?reserved?memory. #?An?error?report?file?with?more?information?is?saved?as: #?/usr/local/rocketmq/bin/hs_err_pid1997.log

    解決方案:

    是由于默認內存分配的太大了,超出了本機內存,直接OOM了。

    修改bin/目錄下的如下兩個腳本

    runbroker.sh runserver.sh

    在這兩個腳本里都搜索-server -Xms,將其內存分配小點,自己玩的話512MB就足夠了,夠夠的了!

    4.3、啟動成功標識

    namesrv啟動成功標識:

    broker啟動成功標識:

    二、RocketMQ控制臺的安裝

    控制臺目前獲取方式有如下兩種:

  • 第三方網站去下載現成的,比如csdn等。

  • 官方源碼包自己編譯而成,官方沒有現成的。

  • 我們這里當然采取官方方式。

    1、官方文檔

    github倉庫

    https://github.com/apache/rocketmq-externals

    中文指南

    https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md

    2、下載源碼

    https://codeload.github.com/apache/rocketmq-externals/zip/master

    3、修改配置(可選)

    我們下載完解壓后的文件目錄如下:

    修改rocketmq-console\src\main\resources\application.properties文件的server.port就歐了。默認8080。

    4、編譯打包

    進入rocketmq-console,然后用maven進行編譯打包

    mvn?clean?package?-DskipTests

    打包完會在target下生成我們spring boot的jar程序,直接java -jar啟動完事。

    5、啟動控制臺

    將編譯打包好的springboot程序扔到服務器上,執行如下命令進行啟動

    java?-jar?rocketmq-console-ng-1.0.1.jar?--rocketmq.config.namesrvAddr=127.0.0.1:9876

    如果想后臺啟動就nohup &

    訪問一下看看效果:

    三、測試

    rocketmq給我們提供了測試工具和測試類,可以在安裝完很方便的進行測試。

    0、準備工作

    rocketmq給我們提供的默認測試工具在bin目錄下,叫tools.sh。我們測試前需要配置這個腳本,為他指定namesrv地址才可以,否則測試發送/消費消息的時候會出現如下錯誤?connect to null failed

    22:49:02.470?[main]?DEBUG?i.n.u.i.l.InternalLoggerFactory?-?Using?SLF4J?as?the?default?logging?framework RocketMQLog:WARN?No?appenders?could?be?found?for?logger?(io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN?Please?initialize?the?logger?system?properly. java.lang.IllegalStateException:?org.apache.rocketmq.remoting.exception.RemotingConnectException:?connect?to?null?failed

    配置如下:

    vim?tools.sh #?在export?JAVA_HOME上面添加如下這段代碼 export?NAMESRV_ADDR=localhost:9876

    1、發送消息

    ./tools.sh?org.apache.rocketmq.example.quickstart.Producer

    成功的話會看到嘩嘩嘩的日志,因為這個類會發送1000條消息到TopicTest這個Topic下。

    2、消費消息

    ./tools.sh?org.apache.rocketmq.example.quickstart.Consumer

    成功的話會看到嘩嘩嘩的日志,因為這個類會消費TopicTest下的全部消息。剛發送的1000條都會被消費掉。

    3、控制臺

    發送成功后我們自然也能來到管控臺去看消息和消費情況等等等信息

    四、架構圖以及角色

    1、架構圖

    2、角色

    2.1、Broker

    • 理解成RocketMQ本身

    • broker主要用于producer和consumer接收和發送消息

    • broker會定時向nameserver提交自己的信息

    • 是消息中間件的消息存儲、轉發服務器

    • 每個Broker節點,在啟動時,都會遍歷NameServer列表,與每個NameServer建立長連接,注冊自己的信息,之后定時上報

    2.2、Nameserver

    • 理解成zookeeper的效果,只是他沒用zk,而是自己寫了個nameserver來替代zk

    • 底層由netty實現,提供了路由管理、服務注冊、服務發現的功能,是一個無狀態節點

    • nameserver是服務發現者,集群中各個角色(producer、broker、consumer等)都需要定時向nameserver上報自己的狀態,以便互相發現彼此,超時不上報的話,nameserver會把它從列表中剔除

    • nameserver可以部署多個,當多個nameserver存在的時候,其他角色同時向他們上報信息,以保證高可用,

    • NameServer集群間互不通信,沒有主備的概念

    • nameserver內存式存儲,nameserver中的broker、topic等信息默認不會持久化,所以他是無狀態節點

    2.3、Producer

    • 消息的生產者

    • 隨機選擇其中一個NameServer節點建立長連接,獲得Topic路由信息(包括topic下的queue,這些queue分布在哪些broker上等等)

    • 接下來向提供topic服務的master建立長連接(因為rocketmq只有master才能寫消息),且定時向master發送心跳

    2.4、Consumer

    • 消息的消費者

    • 通過NameServer集群獲得Topic的路由信息,連接到對應的Broker上消費消息

    • 由于Master和Slave都可以讀取消息,因此Consumer會與Master和Slave都建立連接進行消費消息

    3、核心流程

    • Broker都注冊到Nameserver上

    • Producer發消息的時候會從Nameserver上獲取發消息的topic信息

    • Producer向提供服務的所有master建立長連接,且定時向master發送心跳

    • Consumer通過NameServer集群獲得Topic的路由信息

    • Consumer會與所有的Master和所有的Slave都建立連接進行監聽新消息

    五、核心概念

    1、Message

    消息載體。Message發送或者消費的時候必須指定Topic。Message有一個可選的Tag項用于過濾消息,還可以添加額外的鍵值對。

    2、topic

    消息的邏輯分類,發消息之前必須要指定一個topic才能發,就是將這條消息發送到這個topic上。消費消息的時候指定這個topic進行消費。就是邏輯分類。

    3、queue

    1個Topic會被分為N個Queue,數量是可配置的。message本身其實是存儲到queue上的,消費者消費的也是queue上的消息。多說一嘴,比如1個topic4個queue,有5個Consumer都在消費這個topic,那么會有一個consumer浪費掉了,因為負載均衡策略,每個consumer消費1個queue,5>4,溢出1個,這個會不工作。

    4、Tag

    Tag 是 Topic 的進一步細分,顧名思義,標簽。每個發送的時候消息都能打tag,消費的時候可以根據tag進行過濾,選擇性消費。

    5、Message Model

    消息模型:集群(Clustering)和廣播(Broadcasting)

    6、Message Order

    消息順序:順序(Orderly)和并發(Concurrently)

    7、Producer Group

    消息生產者組

    8、Consumer Group

    消息消費者組

    六、ACK

    首先要明確一點:ACK機制是發生在Consumer端的,不是在Producer端的。也就是說Consumer消費完消息后要進行ACK確認,如果未確認則代表是消費失敗,這時候Broker會進行重試策略(僅集群模式會重試)。ACK的意思就是:Consumer說:ok,我消費成功了。這條消息給我標記成已消費吧。

    七、消費模式

    1、集群模式(Clustering)

    1.1、圖解

    1.2、特點

    • 每條消息只需要被處理一次,broker只會把消息發送給消費集群中的一個消費者

    • 在消息重投時,不能保證路由到同一臺機器上

    • 消費狀態由broker維護

    2、廣播模式(Broadcasting)

    2.1、圖解

    2.2、特點

    • 消費進度由consumer維護

    • 保證每個消費者都消費一次消息

    • 消費失敗的消息不會重投

    八、Java API

    說明:

    • RocketMQ服務端版本為目前最新版:4.7.0

    • Java客戶端版本采取的目前最新版:4.7.0

    pom如下

    <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version> </dependency>

    1、Producer

    發消息肯定要必備如下幾個條件:

    • 指定生產組名(不能用默認的,會報錯)

    • 配置namesrv地址(必須)

    • 指定topic name(必須)

    • 指定tag/key(可選)

    驗證消息是否發送成功:消息發送完后可以啟動消費者進行消費,也可以去管控臺上看消息是否存在。

    1.1、send(同步)

    public?class?Producer?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();//?創建消息對象,topic為:myTopic001,消息內容為:hello worldMessage?msg?=?new?Message("myTopic001",?"hello?world".getBytes());//?發送消息到mq,同步的SendResult?result?=?producer.send(msg);System.out.println("發送消息成功!result is :?"?+?result);//?關閉Producerproducer.shutdown();System.out.println("生產者 shutdown!");} }

    輸出結果:

    發送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854140F418B4AAC26F7973910000, offsetMsgId=7B39B49D00002A9F00000000000589BE,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=0],?queueOffset=7] 生產者 shutdown!

    1.2、send(批量)

    public?class?ProducerMultiMsg?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();String?topic?=?"myTopic001";//?創建消息對象,topic為:myTopic001,消息內容為:hello world1/2/3Message?msg1?=?new?Message(topic,?"hello?world1".getBytes());Message?msg2?=?new?Message(topic,?"hello?world2".getBytes());Message?msg3?=?new?Message(topic,?"hello?world3".getBytes());//?創建消息對象的集合,用于批量發送List<Message>?msgs?=?new?ArrayList<>();msgs.add(msg1);msgs.add(msg2);msgs.add(msg3);//?批量發送的api的也是send(),只是他的重載方法支持List<Message>,同樣是同步發送。SendResult?result?=?producer.send(msgs);System.out.println("發送消息成功!result is :?"?+?result);//?關閉Producerproducer.shutdown();System.out.println("生產者 shutdown!");} }

    輸出結果:

    發送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854139C418B4AAC26F7D13770000,A9FE854139C418B4AAC26F7D13770001,A9FE854139C418B4AAC26F7D13770002, offsetMsgId=7B39B49D00002A9F0000000000058A62,7B39B49D00002A9F0000000000058B07,7B39B49D00002A9F0000000000058BAC,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=0],?queueOffset=8] 生產者 shutdown!

    從結果中可以看到只有一個msgId,所以可以發現雖然是三條消息對象,但是卻只發送了一次,大大節省了client與server的開銷。

    錯誤情況:

    批量發送的topic必須是同一個,如果message對象指定不同的topic,那么批量發送的時候會報錯:

    Exception?in?thread?"main"?org.apache.rocketmq.client.exception.MQClientException:?Failed?to?initiate?the?MessageBatch For?more?information,?please?visit?the?url,?http://rocketmq.apache.org/docs/faq/at?org.apache.rocketmq.client.producer.DefaultMQProducer.batch(DefaultMQProducer.java:950)at?org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:898)at?com.chentongwei.mq.rocketmq.ProducerMultiMsg.main(ProducerMultiMsg.java:29) Caused?by:?java.lang.UnsupportedOperationException:?The?topic?of?the?messages?in?one?batch?should?be?the?sameat?org.apache.rocketmq.common.message.MessageBatch.generateFromList(MessageBatch.java:58)at?org.apache.rocketmq.client.producer.DefaultMQProducer.batch(DefaultMQProducer.java:942)...?2?more

    1.3、sendCallBack(異步)

    public?class?ProducerASync?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();//?創建消息對象,topic為:myTopic001,消息內容為:hello world asyncMessage?msg?=?new?Message("myTopic001",?"hello?world?async".getBytes());//?進行異步發送,通過SendCallback接口來得知發送的結果producer.send(msg,?new?SendCallback()?{//?發送成功的回調接口@Overridepublic?void?onSuccess(SendResult?sendResult)?{System.out.println("發送消息成功!result is :?"?+?sendResult);}//?發送失敗的回調接口@Overridepublic?void?onException(Throwable?throwable)?{throwable.printStackTrace();System.out.println("發送消息失敗!result is :?"?+?throwable.getMessage());}});producer.shutdown();System.out.println("生產者 shutdown!");} }

    輸出結果:

    生產者 shutdown! java.lang.IllegalStateException:?org.apache.rocketmq.remoting.exception.RemotingConnectException:?connect?to?[124.57.180.156:9876]?failedat?org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:681)at?org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:511)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:692)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:556)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.access$300(DefaultMQProducerImpl.java:97)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl$4.run(DefaultMQProducerImpl.java:510)at?java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at?java.util.concurrent.FutureTask.run(FutureTask.java:266)at?java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at?java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at?java.lang.Thread.run(Thread.java:745) Caused?by:?org.apache.rocketmq.remoting.exception.RemotingConnectException:?connect?to?[124.57.180.156:9876]?failedat?org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel(NettyRemotingClient.java:441)at?org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel(NettyRemotingClient.java:396)at?org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:365)at?org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1371)at?org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1361)at?org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:624)...?10?more 發送消息失敗!result is : org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [124.57.180.156:9876]?failed

    為啥報錯了?很簡單,他是異步的,從結果就能看出來,由于是異步的,我還沒發送到mq呢,你就先給我shutdown了。肯定不行,所以我們在shutdown前面sleep 1s在看效果

    public?class?ProducerASync?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();//?創建消息對象,topic為:myTopic001,消息內容為:hello world asyncMessage?msg?=?new?Message("myTopic001",?"hello?world?async".getBytes());//?進行異步發送,通過SendCallback接口來得知發送的結果producer.send(msg,?new?SendCallback()?{//?發送成功的回調接口@Overridepublic?void?onSuccess(SendResult?sendResult)?{System.out.println("發送消息成功!result is :?"?+?sendResult);}//?發送失敗的回調接口@Overridepublic?void?onException(Throwable?throwable)?{throwable.printStackTrace();System.out.println("發送消息失敗!result is :?"?+?throwable.getMessage());}});Thread.sleep(1000);producer.shutdown();System.out.println("生產者 shutdown!");} }

    輸出結果:

    發送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854106E418B4AAC26F8719B20000, offsetMsgId=7B39B49D00002A9F0000000000058CFC,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=1],?queueOffset=2] 生產者 shutdown!

    1.4、sendOneway

    public?class?ProducerOneWay?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();//?創建消息對象,topic為:myTopic001,消息內容為:hello world onewayMessage?msg?=?new?Message("myTopic001",?"hello?world?oneway".getBytes());//?效率最高,因為oneway不關心是否發送成功,我就投遞一下我就不管了。所以返回是voidproducer.sendOneway(msg);System.out.println("投遞消息成功!,注意這里是投遞成功,而不是發送消息成功哦!因為我sendOneway也不知道到底成沒成功,我沒返回值的。");producer.shutdown();System.out.println("生產者 shutdown!");} }

    輸出結果:

    投遞消息成功!,注意這里是投遞成功,而不是發送消息成功哦!因為我sendOneway也不知道到底成沒成功,我沒返回值的。 生產者 shutdown!

    1.5、效率對比

    sendOneway > sendCallBack > send批量 > send單條

    很容易理解,sendOneway不求結果,我就負責投遞,我不管你失敗還是成功,相當于中轉站,來了我就扔出去,我不進行任何其他處理。所以最快。

    而sendCallBack是異步發送肯定比同步的效率高。

    send批量和send單條的效率也是分情況的,如果只有1條msg要發,那還搞毛批量,直接send單條完事。

    2、Consumer

    每個consumer只能關注一個topic。

    發消息肯定要必備如下幾個條件:

    • 指定消費組名(不能用默認的,會報錯)

    • 配置namesrv地址(必須)

    • 指定topic name(必須)

    • 指定tag/key(可選)

    2.1、CLUSTERING

    集群模式,默認。

    比如啟動五個Consumer,Producer生產一條消息后,Broker會選擇五個Consumer中的其中一個進行消費這條消息,所以他屬于點對點消費模式。

    public?class?Consumer?{public?static?void?main(String[]?args)?throws?Exception?{//?指定消費組名為my-consumerDefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("my-consumer");//?配置namesrv地址consumer.setNamesrvAddr("124.57.180.156:9876");//?訂閱topic:myTopic001 下的全部消息(因為是*,*指定的是tag標簽,代表全部消息,不進行任何過濾)consumer.subscribe("myTopic001",?"*");//?注冊監聽器,進行消息消息。consumer.registerMessageListener(new?MessageListenerConcurrently()?{@Overridepublic?ConsumeConcurrentlyStatus?consumeMessage(List<MessageExt>?msgs,?ConsumeConcurrentlyContext?consumeConcurrentlyContext)?{for?(MessageExt?msg?:?msgs)?{String?str?=?new?String(msg.getBody());//?輸出消息內容System.out.println(str);}//?默認情況下,這條消息只會被一個consumer消費,這叫點對點消費模式。也就是集群模式。//?ack確認return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//?啟動消費者consumer.start();System.out.println("Consumer?start");} }

    2.2、BROADCASTING

    廣播模式。

    比如啟動五個Consumer,Producer生產一條消息后,Broker會把這條消息廣播到五個Consumer中,這五個Consumer分別消費一次,每個都消費一次。

    //?代碼里只需要添加如下這句話即可: consumer.setMessageModel(MessageModel.BROADCASTING);?

    2.3、兩種模式對比

    • 集群默認是默認的,廣播模式是需要手動配置。

    • 一條消息:集群模式下的多個Consumer只會有一個Consumer消費。廣播模式下的每一個Consumer都會消費這條消息。

    • 廣播模式下,發送一條消息后,會被當前被廣播的所有Consumer消費,但是后面新加入的Consumer不會消費這條消息,很好理解:村里面大喇叭喊了全村來領雞蛋,第二天你們村新來個人,那個人肯定聽不到昨天大喇叭喊的消息呀。

    3、TAG&&KEY

    發送/消費 消息的時候可以指定tag/key來進行過濾消息,支持通配符。*代表消費此topic下的全部消息,不進行過濾。

    看下org.apache.rocketmq.common.message.Message源碼可以發現發消息的時候可以指定tag和keys:

    public?Message(String?topic,?String?tags,?String?keys,?byte[]?body)?{this(topic,?tags,?keys,?0,?body,?true); }

    比如:

    public?class?ProducerTagsKeys?{public?static?void?main(String[]?args)?throws?Exception?{//?指定生產組名為my-producerDefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");//?配置namesrv地址producer.setNamesrvAddr("124.57.180.156:9876");//?啟動Producerproducer.start();//?創建消息對象,topic為:myTopic001,消息內容為:hello world,且tags為:test-tags,keys為test-keysMessage?msg?=?new?Message("myTopic001",?"test-tags",?"test-keys",?"hello?world".getBytes());//?發送消息到mq,同步的SendResult?result?=?producer.send(msg);System.out.println("發送消息成功!result is :?"?+?result);//?關閉Producerproducer.shutdown();System.out.println("生產者 shutdown!");} }

    輸出結果:

    發送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854149DC18B4AAC26FA4B7200000, offsetMsgId=7B39B49D00002A9F0000000000058DA6,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=3],?queueOffset=3] 生產者 shutdown!

    查看管控臺,可以發現tags和keys已經生效了:

    消費的時候如果指定*那就是此topic下的全部消息,我們可以指定前綴通配符,比如:

    //?這樣就只會消費myTopic001下的tag為test-*開頭的消息。 consumer.subscribe("myTopic001",?"test-*");//?代表訂閱Topic為myTopic001下的tag為TagA或TagB的所有消息 consumer.subscribe("myTopic001",?"TagA||TagB");

    還支持SQL表達式過濾,不是很常用。不BB了。

    4、常見錯誤

    4.1、sendDefaultImpl call timeout

    4.1.1、異常

    Exception?in?thread?"main"?org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:?sendDefaultImpl?call?timeoutat?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:666)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1288)at?org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:324)at?com.chentongwei.mq.rocketmq.Producer.main(Producer.java:18)

    4.1.2、解決

    1.如果你是云服務器,首先檢查安全組是否允許9876這個端口訪問,是否開啟了防火墻,如果開啟了的話是否將9876映射了出去。

    2.修改配置文件broker.conf,加上:

    brokerIP1=我用的是阿里云服務器,這里是我的公網IP

    啟動namesrv和broker的時候加上本機IP(我用的是阿里云服務器,這里是我的公網IP):

    ./bin/mqnamesrv?-n?IP:9876 ./bin/mqbroker?-n?IP:9876?-c?conf/broker.conf

    4.2、No route info of this topic

    4.2.1、異常

    Exception?in?thread?"main"?org.apache.rocketmq.client.exception.MQClientException:?No?route?info?of?this?topic:?myTopic001 See?http://rocketmq.apache.org/docs/faq/?for?further?details.at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:684)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1288)at?org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:324)at?com.chentongwei.mq.rocketmq.Producer.main(Producer.java:18)

    4.2.2、解決

    很明顯發送成功了,不再是剛才的超時了,但是告訴我們沒有這個topic。那不能每次都手動創建呀,所以啟動broker的時候可以指定參數讓broker為我們自動創建。如下

    ./bin/mqbroker?-n?IP:9876?-c?conf/broker.conf?autoCreateTopicEnable=true 推薦文章
    • 硬剛一周,3W字總結,一年的經驗告訴你如何準備校招!

    • 今年的校招,Java 好拿 offer 嗎?

    • 10月了,該聊聊今年秋招了!

    • 聊聊在騰訊實習快一個月的感受

    原創電子書歷時整整一年總結的?Java 面試 + Java 后端技術學習指南,這是本人這幾年及校招的總結,各種高頻面試題已經全部進行總結,按照章節復習即可,已經拿到了大廠offer。 原創思維導圖掃碼或者微信搜?程序員的技術圈子?回復?面試?領取原創電子書和思維導圖。

    總結

    以上是生活随笔為你收集整理的RocketMQ入门到入土(一)新手也能看懂的原理和实战!的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 中文字幕一区视频 | 免费三级黄色 | 韩国av三级 | 欧美精品一二三区 | av色图在线| 在线观看的av网址 | 一区二区三区四区在线观看视频 | 亚洲AV无码一区二区三区性 | 黄色成人在线视频 | 欧美888 | 在线免费观看av片 | 国产同性人妖ts口直男 | 日韩五码电影 | 国产精品无码一区二区三区免费 | 亚洲精品乱码久久久久久自慰 | 亚洲一区二区色图 | 快射视频网站 | 国产精品99久久久久久久 | 美女黄色大片 | 国产秋霞| 久草视频在线看 | 国产精品久久久久一区二区三区 | 久久久久久艹 | 亚洲一区亚洲二区 | 日韩激情综合 | 亚洲三级在线 | 日韩精品一区二区三区无码专区 | 色在线视频 | 国产一区二区三区四区hd | 七七久久 | avtt香蕉久久 | 五月天国产在线 | 懂色av蜜臀av粉嫩av分享吧最新章节 | 黄色在线观看国产 | v天堂中文在线 | 波多野结衣一区二 | 国产无码精品在线观看 | 尹人久久 | 情侣作爱视频网站 | 看全黄大色黄大片 | av网址免费 | 国产又粗又猛又爽又黄又 | 蜜臀av免费在线观看 | 久久精品女人毛片国产 | 亚洲视频在线观看一区二区 | 国产在线观看免费播放 | 国产成人一区二区三区别 | 欧美a√在线 | www.av成人 | 国产性猛交╳xxx乱大交一区 | www.男人天堂| 秘密基地电影免费版观看国语 | 五月天国产视频 | 91热热 | 小h片在线观看 | 捆绑少妇玩各种sm调教 | 耳光调教vk | 成人激情四射 | www.天天色 | 高潮毛片无遮挡高清免费 | 色偷偷av男人的天堂 | 精品一级少妇久久久久久久 | 天天干天天插天天射 | 伊人色网 | 亚洲国产精品免费视频 | 一区二区三区资源 | 九九人人| 国产精品日本 | av片免费在线播放 | 免费aa视频 | 亚洲v日韩v综合v精品v | 一区二区三区网 | 好吊操免费视频 | 调教一区 | 少妇搡bbbb搡bbb搡澳门 | 少妇高潮a一级 | www网站在线免费观看 | 亚洲在线视频 | 91精品国产综合久久精品图片 | 欧美xxxx黑人又粗又长密月 | 免费看黄色的网址 | 先锋影音男人 | 殴美黄色大片 | 亚洲精品国产精品国自产观看浪潮 | 影音先锋成人资源 | 免费av网站在线播放 | 白浆四溢 | 少妇又色又紧又大爽又刺激 | 日韩一级片在线播放 | 99riav国产精品 | 国产原创在线观看 | 亚洲欧洲综合网 | 亚洲av人人澡人人爽人人夜夜 | 91av在线免费视频 | 国产精品久久91 | 夜夜夜久久久 | 岛国毛片在线观看 | 51精产品一区一区三区 | 日本色悠悠 |