日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ActiveMQ入门系列二:入门代码实例(点对点模式)

發(fā)布時間:2025/4/16 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMQ入门系列二:入门代码实例(点对点模式) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

在上一篇《ActiveMQ入門系列一:認(rèn)識并安裝ActiveMQ(Windows下)》中,大致介紹了ActiveMQ和一些概念,并下載、安裝、啟動他,還訪問了他的控制臺頁面。

這篇,就用代碼實(shí)例說下如何實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi)。

一、理論基礎(chǔ)

同RabbitMQ一樣,ActiveMQ中也是有兩種模式:

  • 點(diǎn)對點(diǎn)模式(Point to Point,簡寫為PTP)
  • 發(fā)布/訂閱模式(Publish & Subscribe,簡寫為Pub & Sub)

通過上一篇我們知道了制造消息的應(yīng)用叫生產(chǎn)者(Producer),生產(chǎn)者在生產(chǎn)了消息后會發(fā)送消息到目的地(Destination),到達(dá)消費(fèi)和處理消息的應(yīng)用(也就是消費(fèi)者Consumer)。這里的兩種模式就通過對應(yīng)不同的消息目的地(Destination)來實(shí)現(xiàn),PTP對應(yīng)Queue(隊(duì)列)、Pub&Sub對應(yīng)Topic(主題)。

今天就詳細(xì)介紹下PTP和Queue,下一篇介紹Pub & Sub和Topic。

在PTP模式的示意圖:

?

  • 消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費(fèi)者從queue中取出并且消費(fèi)消息。
  • 消息被消費(fèi)以后,queue中不再有存儲,所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。
  • Queue支持存在多個消費(fèi)者,但是對一個消息而言,只會有一個消費(fèi)者可以消費(fèi)、其它的則不能消費(fèi)此消息了。
  • 當(dāng)消費(fèi)者不存在時,消息會一直保存,直到有消費(fèi)消費(fèi)。

在PTP中,代碼實(shí)現(xiàn)有兩種方式:消費(fèi)者主動消費(fèi)和消費(fèi)者監(jiān)聽消費(fèi),下面就分別說下。

二、消費(fèi)者主動消費(fèi)

主動消費(fèi)是最基本也是最簡單的消費(fèi)方式,先上代碼:

  • 創(chuàng)建maven工程并引入依賴 <dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-core</artifactId><version>5.7.0</version></dependency>
  • 實(shí)現(xiàn)生產(chǎn)者 package com.sam.ptp;
    import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /*** @author JAVA開發(fā)老菜鳥**/ public class Producer {public static final String QUEUE_NAME = "ptp-demo";//隊(duì)列名public void producer(String message) throws JMSException {ConnectionFactory factory = null;Connection connection = null;Session session = null;MessageProducer producer = null;try {/*** 1.創(chuàng)建連接工廠* 創(chuàng)建工廠,構(gòu)造方法有三個參數(shù):分別是用戶名、密碼、連接地址* 無參構(gòu)造:有默認(rèn)的連接地址,localhost* 一個參數(shù):無驗(yàn)證模式,無用戶的認(rèn)證* 三個參數(shù):有認(rèn)證和連接地址,我這里使用三個參數(shù)的構(gòu)造方法*/factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");/*** 2.創(chuàng)建連接,有兩個方法(我這里使用無參數(shù)的)* 無參數(shù)* 有參數(shù):用戶名、密碼;*/connection = factory.createConnection();/*** 3.啟動連接* 生產(chǎn)者可以不用調(diào)用start()方法啟動,因?yàn)樵诎l(fā)送消息的時候回進(jìn)行檢查* 如果未啟動連接,會自動啟動。* 如果有特殊配置,需要配置完成后再啟動連接*/connection.start();/*** 4.用連接創(chuàng)建會話* 有兩個參數(shù):是否需要事務(wù)、消息確認(rèn)機(jī)制* 如果支持事務(wù),對于生產(chǎn)者來說第二個參數(shù)就無效了,這個時候第二個參數(shù)建議傳入Session.SESSION_TRANSACTED* 如果不支持事務(wù),第二個參數(shù)有效且必須傳遞** AUTO_ACKNOWLEDGE:自動確認(rèn),消息處理后自動確認(rèn)(商業(yè)開發(fā)不推薦)* CLIENT_ACKNOWLEDGE:客戶端手動確認(rèn),消費(fèi)者處理后必須手動確認(rèn)* DUPS_OK_ACKNOWLEDGE:有副本的客戶端手動確認(rèn),消息可以多次處理(不建議)*/session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);/*** 5.用會話創(chuàng)建目的地(隊(duì)列)、生產(chǎn)者、消息* 隊(duì)列名是隊(duì)列的唯一標(biāo)記* 創(chuàng)建生產(chǎn)者的時候可以指定目的地,也可以在發(fā)送消息的時候再指定*/Destination destination = session.createQueue(QUEUE_NAME);producer = session.createProducer(destination);TextMessage textMessage = session.createTextMessage(message);/*** 6.生產(chǎn)者發(fā)送消息到目的地*/producer.send(textMessage);System.out.println("消息發(fā)送成功");} catch(Exception ex){throw ex;} finally {/*** 7.釋放資源*/if(producer != null){producer.close();}if(session != null){session.close();}if(connection != null){connection.close();}}}public static void main(String[] args){Producer producer = new Producer();try{producer.producer("hello, activemq");} catch (Exception ex){ex.printStackTrace();}} }

    ?

  • 實(shí)現(xiàn)消費(fèi)者 package com.sam.ptp;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** @author JAVA開發(fā)老菜鳥** 主動消費(fèi)*/ public class Consumer {public String consumer() throws JMSException {ConnectionFactory factory = null;Connection connection = null;Session session = null;MessageConsumer consumer = null;try {factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");connection = factory.createConnection();/*** 消費(fèi)者必須啟動連接,否則無法消費(fèi)*/connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue(Producer.QUEUE_NAME);consumer = session.createConsumer(destination);/*** 獲取隊(duì)列消息*/Message message = consumer.receive();String text = ((TextMessage) message).getText();return text;} catch(Exception ex){throw ex;} finally {/*** 7.釋放資源*/if(consumer != null){consumer.close();}if(session != null){session.close();}if(connection != null){connection.close();}}}public static void main(String[] args){Consumer consumer = new Consumer();try{String message = consumer.consumer();System.out.println("消息消費(fèi)成功:" + message);} catch (Exception ex){ex.printStackTrace();}} }

    ?

  • 好,這樣代碼就寫好了,我們來測試下。

    1.先運(yùn)行生產(chǎn)者,我發(fā)現(xiàn)報(bào)錯了。。。

    好吧,原來是我這次沒有啟動ActiveMQ,被自己蠢哭了。。。

    啟動ActiveMQ之后,再運(yùn)行生產(chǎn)者,成功了。

    去看下控制臺頁面的變化,隊(duì)列里面多了個“ptp-demo”隊(duì)列,這個就是我們生產(chǎn)者代碼里面的隊(duì)列名,并且能看到該隊(duì)列的基本情況:

    從左到右依次為,有待消費(fèi)的消息1條、消費(fèi)者0個、已經(jīng)發(fā)送的消息1條、已經(jīng)消費(fèi)的消息0條

    2.接下來運(yùn)行消費(fèi)者,成功

    再去看下控制臺頁面,發(fā)現(xiàn)隊(duì)列信息變了,從左到右依次為:有待消費(fèi)的消息0條、消費(fèi)者0個、已經(jīng)發(fā)送的消息1條、已經(jīng)消費(fèi)的消息1條

    也就是說,消息真的被消費(fèi)了!

    ?代碼寫完了,也按照預(yù)期執(zhí)行完了,我們現(xiàn)在再回過頭來分析下消費(fèi)者的代碼,會發(fā)現(xiàn)他在consumer.receive()之后不會再消費(fèi)其他消息了,即便后面再有消息被生產(chǎn)出來也不會再消費(fèi)。也就是說只能在運(yùn)行后消費(fèi)一次消息,這個就是主動消費(fèi)。

    如果想要循環(huán)消費(fèi)多次產(chǎn)生的消息的話,怎么辦呢?請用下面的監(jiān)聽消費(fèi)

    ?

    三、消費(fèi)者監(jiān)聽消費(fèi)

    還是先上代碼,代碼結(jié)構(gòu)同主動消費(fèi)類似,有細(xì)微差別,具體代碼不貼了,可以到我的GitHub或碼云上獲取源碼

  • 首先為了區(qū)分,我把隊(duì)列名改了 public static final String QUEUE_NAME = "ptp-listener-demo";//隊(duì)列名

    ?

  • 生產(chǎn)者和消費(fèi)者的消息確認(rèn)方式都改成了客戶端手動確認(rèn),不再自動確認(rèn),手動確認(rèn)有個好處就是可以防止消息沒有被正常消費(fèi)而丟失,這個同RabbitMQ機(jī)制一樣 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    ?

  • 生產(chǎn)者生產(chǎn)消息的時候,為了方便我改成了一次性發(fā)送10條 /*** 6.創(chuàng)建消息并且生產(chǎn)者發(fā)送消息到目的地*/for(int num = 0; num < 10; num++){TextMessage textMessage = session.createTextMessage(message + num);producer.send(textMessage);System.out.println("消息發(fā)送成功"+textMessage.getText());}

    ?

  • 關(guān)鍵點(diǎn)來了,在消費(fèi)者上加了一個監(jiān)聽器 /*** 注冊監(jiān)聽器,隊(duì)列中的消息變化會自動觸發(fā)監(jiān)聽器,接收并自動處理消息** 監(jiān)聽器一旦注冊,永久有效,一直到程序關(guān)閉* 監(jiān)聽器可以注冊多個,相當(dāng)于集群* activemq自動輪詢多個監(jiān)聽器,實(shí)現(xiàn)并行處理*/consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {//需要手動確認(rèn)消息 message.acknowledge();TextMessage om = (TextMessage) message;String data = om.getText();System.out.println(data);} catch (JMSException e) {e.printStackTrace();}}});

    ?

  • 執(zhí)行生產(chǎn)者:

    ?

    執(zhí)行消費(fèi)者,消息全部被消費(fèi)了:

    ?

    再執(zhí)行2遍生產(chǎn)者,消息同樣都被消費(fèi)了。?

    控制臺頁面多了個隊(duì)列,由于監(jiān)聽中的消費(fèi)者沒有關(guān)閉,因此這里能看到消費(fèi)者數(shù)量為1,我執(zhí)行了三遍生產(chǎn)者,因此消息有30條。

    還沒完,繼續(xù)...

    我們這次先啟動2個消費(fèi)者,然后啟動生產(chǎn)者

    兩個生產(chǎn)者分別消費(fèi)了消息0,2,4,6,8和1,3,5,7,9

    也就是說兩個消費(fèi)者都監(jiān)聽到了消息,并且activemq自動輪詢兩個監(jiān)聽器發(fā)送消息。

    ?

    好,到這里,ActiveMQ的點(diǎn)對點(diǎn)模式就介紹完了。下一篇介紹發(fā)布訂閱模式,敬請期待

    轉(zhuǎn)載于:https://www.cnblogs.com/sam-uncle/p/10988930.html

    總結(jié)

    以上是生活随笔為你收集整理的ActiveMQ入门系列二:入门代码实例(点对点模式)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 一本之道高清无码视频 | 黄色aaaa| 色哟哟一区二区 | 少妇与公做了夜伦理69 | 天天色影综合网 | 国产精品专区在线观看 | 欧美精品中文 | 日本少妇做爰全过程毛片 | www.九色| 久久精品国产熟女亚洲AV麻豆 | 黄网站在线观 | 免费又黄又爽又色的视频 | 欧美成年人在线视频 | 欧美日韩在线成人 | 欧美浓毛大泬视频 | 一久久| 午夜秋霞 | 欧美色乱 | 亚洲精品乱码久久久久久久久久久久 | 国产精品永久免费观看 | 涩涩视屏 | 欧美国产成人在线 | 日韩在线免费看 | a毛片在线观看 | 正在播放老肥熟妇露脸 | 性生生活性生交a级 | 国精品无码一区二区三区 | 中文字幕人成人乱码亚洲电影 | 久久综合久 | 国产精品亚洲αv天堂无码 伊人性视频 | 国产高清一级 | 国模精品一区二区三区 | 91视频在线观看免费 | 黑人性xxx | 亚洲男人天堂2018 | 青青草免费公开视频 | 美国一级片网站 | 强行挺进白丝老师里呻吟 | 青青草视频在线观看 | 男人的天堂av网 | 亚洲一区二区三区婷婷 | 最新av | 午夜影院0606 | 神马老子午夜 | 爱情岛论坛亚洲自拍 | 成人激情视频网站 | 日韩色| 有机z中国电影免费观看 | 欧美三级自拍 | 麻豆毛片| 老女人一区 | 欧美色吊丝| 国产日韩一区二区三区在线观看 | 在线观看高清视频 | 亚洲free性xxxx护士白浆 | 午夜激情视频在线播放 | 日韩欧美亚洲一区二区三区 | 激情小说视频在线 | 农民工hdxxxx性中国 | 人妻丝袜一区 | 九九在线 | 13日本xxxxxⅹxxx20 | 91社区视频 | 奇米777色 | 欧美成人tv | 在线观看的av | 久久精品成人一区二区三区蜜臀 | 日韩欧美中文字幕在线视频 | 天堂视频在线免费观看 | 中文字幕日韩精品一区 | 国产偷人妻精品一区二区在线 | 在线免费看黄视频 | 国产精品国产三级国产专播精品人 | 成人免费看片'在线观看 | 熟女自拍一区 | 亚洲色图在线观看 | 日韩成人av影院 | 国产成人精品一区二区三区免费 | 黄色片网站国产 | 久久国产热视频 | 欧美日韩激情在线一区二区三区 | 成年人av网站 | 国产美女在线免费 | 久久免费视频观看 | av体验区| 国产91精品ai换脸 | 黄色字幕网| 无码一区二区三区免费视频 | 丁香花电影免费播放电影 | av.www| 久久99热人妻偷产国产 | 欧美多人猛交狂配 | 日韩一区二区a片免费观看 伊人网综合在线 | 国产午夜福利100集发布 | 美女xx00 | 欧美a级黄色 | 一区二区在线播放视频 | 青青青手机在线视频 | 国产精品亚洲AV色欲三区不卡 |