日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

ActiveMQ消息传递的两种方式

發(fā)布時(shí)間:2025/3/19 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMQ消息传递的两种方式 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1.什么是ActiveMQ?

  ActiveMQ是apache提供的開(kāi)源的,實(shí)現(xiàn)消息傳遞的一個(gè)中間插件,可以和spring整合,是目前最流行的開(kāi)源消息總線,ActiveMQ是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。較相似的還有rabbitMQ和kafka等,都是最為消息傳遞的插件

2.ActiveMQ傳遞消息的兩種方式

前提:需要引入activemq的jar包

點(diǎn)對(duì)點(diǎn)方式(PTP):一個(gè)消費(fèi)者對(duì)應(yīng)一個(gè)生產(chǎn)者

發(fā)布/訂閱模式(Publish/Sub):一個(gè)生產(chǎn)者產(chǎn)生消息發(fā)送后,可以被多個(gè)消費(fèi)者進(jìn)行接收。

JMS定義了五種消息正文格式,以及消息的調(diào)用類(lèi)型,允許發(fā)送和接收一些不同類(lèi)型的數(shù)據(jù),提供現(xiàn)有消息格式的一些級(jí)別的兼容性。

StreamMessage:--JAVA原始的數(shù)據(jù)流

TextMessage:一個(gè)字符串對(duì)象

ObjectMessage:一個(gè)系列化的java對(duì)象

BytesMessage:一個(gè)字節(jié)對(duì)象

MapMessage:key/value方式的鍵值對(duì)

(1)點(diǎn)對(duì)點(diǎn)的方式(PTP)

  即:一個(gè)消息的生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者

生產(chǎn)者(Producer)實(shí)現(xiàn)步驟:

第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象,將服務(wù)端activemq的 ip 和 port 作為構(gòu)造參數(shù)傳遞

第二步:通過(guò)第一步創(chuàng)建的工廠對(duì)象獲得連接對(duì)象Connection

第三步:開(kāi)啟連接,直接調(diào)用connection對(duì)象的start方法即可

第四步:創(chuàng)建一個(gè)Session對(duì)象,通過(guò)connection對(duì)象創(chuàng)建

第五步:通過(guò)Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(該對(duì)象有兩種方式:topic和quene),這里使用quene

第六步:通過(guò)Session對(duì)象創(chuàng)建一個(gè)生產(chǎn)者Producer對(duì)象

第七步:創(chuàng)建Message對(duì)象,這里使用TextMessage對(duì)象,設(shè)置消息內(nèi)容

第八步:使用創(chuàng)建的生產(chǎn)者對(duì)象Producer發(fā)送消息

第九步:關(guān)閉資源(Producer對(duì)象,Connection對(duì)象,Session對(duì)象)

@Testpublic void testQueueProducer() throws Exception {// 第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。//brokerURL服務(wù)器的ip及端口號(hào)ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://ip地址:61616");// 第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。Connection connection = connectionFactory.createConnection();// 第三步:開(kāi)啟連接,調(diào)用Connection對(duì)象的start方法。 connection.start();// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。//第一個(gè)參數(shù):是否開(kāi)啟事務(wù)。true:開(kāi)啟事務(wù),第二個(gè)參數(shù)忽略。//第二個(gè)參數(shù):當(dāng)?shù)谝粋€(gè)參數(shù)為false時(shí),才有意義。消息的應(yīng)答模式。1、自動(dòng)應(yīng)答2、手動(dòng)應(yīng)答。一般是自動(dòng)應(yīng)答。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)Queue對(duì)象。//參數(shù):隊(duì)列的名稱。Queue queue = session.createQueue("test-queue");// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。MessageProducer producer = session.createProducer(queue);// 第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。/*TextMessage message = new ActiveMQTextMessage();message.setText("hello activeMq,this is my first test.");*/TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");// 第八步:使用Producer對(duì)象發(fā)送消息。 producer.send(textMessage);// 第九步:關(guān)閉資源。 producer.close();session.close();connection.close();}

消費(fèi)者實(shí)現(xiàn):

第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象,將服務(wù)端activemq的 ip 和 port 作為構(gòu)造參數(shù)傳遞

第二步:通過(guò)第一步創(chuàng)建的工廠對(duì)象獲得連接對(duì)象Connection

第三步:開(kāi)啟連接,直接調(diào)用connection對(duì)象的start方法即可

第四步:創(chuàng)建一個(gè)Session對(duì)象,通過(guò)connection對(duì)象創(chuàng)建

第五步:創(chuàng)建一個(gè)Destination對(duì)象,使用quene,需要和生產(chǎn)者的quene一致

第六步:創(chuàng)建一個(gè)消費(fèi)者對(duì)象

第七步:接收消息

第八步:打印接收的消息

第九步:關(guān)閉資源

消費(fèi)者的代碼:

@Testpublic void testQueueConsumer() throws Exception {// 第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");// 第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。Connection connection = connectionFactory.createConnection();// 第三步:開(kāi)啟連接。調(diào)用Connection對(duì)象的start方法。 connection.start();// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致queue,并且隊(duì)列的名稱一致。Queue queue = session.createQueue("test-queue");// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。MessageConsumer consumer = session.createConsumer(queue);// 第七步:接收消息。consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = null;//取消息的內(nèi)容text = textMessage.getText();// 第八步:打印消息。 System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});//等待鍵盤(pán)輸入 System.in.read();// 第九步:關(guān)閉資源 consumer.close();session.close();connection.close();} View Code

(2)訂閱發(fā)布方式傳遞消息:Topic??

補(bǔ)充:由于topic傳遞消息的特點(diǎn)是,一個(gè)生產(chǎn)者可以有多個(gè)消費(fèi)者,生產(chǎn)者生產(chǎn)的消息在沒(méi)有被消費(fèi)者消費(fèi)之前,并不會(huì)將消息持久化到activemq的服務(wù)端,發(fā)送的消息會(huì)自動(dòng)消失。所以 測(cè)試的時(shí)候需要先創(chuàng)建消費(fèi)者對(duì)象,然后在發(fā)送消息,防止消息丟失。

生產(chǎn)者實(shí)現(xiàn)步驟:

步驟和PTP的方式完全一樣,不同的是在創(chuàng)建Destination對(duì)象的時(shí)候,需要?jiǎng)?chuàng)建topic對(duì)象

直接上代碼:

@Testpublic void testTopicProducer() throws Exception {// 第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。// brokerURL服務(wù)器的ip及端口號(hào)ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP地址:61616");// 第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。Connection connection = connectionFactory.createConnection();// 第三步:開(kāi)啟連接,調(diào)用Connection對(duì)象的start方法。 connection.start();// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。// 第一個(gè)參數(shù):是否開(kāi)啟事務(wù)。true:開(kāi)啟事務(wù),第二個(gè)參數(shù)忽略。// 第二個(gè)參數(shù):當(dāng)?shù)谝粋€(gè)參數(shù)為false時(shí),才有意義。消息的應(yīng)答模式。1、自動(dòng)應(yīng)答2、手動(dòng)應(yīng)答。一般是自動(dòng)應(yīng)答。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)topic對(duì)象。// 參數(shù):話題的名稱。Topic topic = session.createTopic("test-topic");// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。MessageProducer producer = session.createProducer(topic);// 第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。/** TextMessage message = new ActiveMQTextMessage(); message.setText(* "hello activeMq,this is my first test.");*/TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");// 第八步:使用Producer對(duì)象發(fā)送消息。 producer.send(textMessage);// 第九步:關(guān)閉資源。 producer.close();session.close();connection.close();} View Code

消費(fèi)者實(shí)現(xiàn)的步驟:

步驟和PTP消費(fèi)者實(shí)現(xiàn)的步驟一樣,唯一不同的是在創(chuàng)建Destination對(duì)象的時(shí)候,創(chuàng)建topic對(duì)象,同時(shí)要和發(fā)布訂閱的生產(chǎn)者的topic一致

消費(fèi)者代碼:

@Testpublic void testTopicConsumer() throws Exception {// 第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP地址:61616");// 第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。Connection connection = connectionFactory.createConnection();// 第三步:開(kāi)啟連接。調(diào)用Connection對(duì)象的start方法。 connection.start();// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致topic,并且話題的名稱一致。Topic topic = session.createTopic("test-topic");// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。MessageConsumer consumer = session.createConsumer(topic);// 第七步:接收消息。consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = null;// 取消息的內(nèi)容text = textMessage.getText();// 第八步:打印消息。 System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});System.out.println("topic的消費(fèi)端03。。。。。");// 等待鍵盤(pán)輸入 System.in.read();// 第九步:關(guān)閉資源 consumer.close();session.close();connection.close();} View Code

總結(jié):兩種傳遞消息的方式的異同

相同點(diǎn):實(shí)現(xiàn)步驟基本一樣,大同小異

不同點(diǎn):PTP傳遞消息的方法,消息的生產(chǎn)者發(fā)送以后,消息會(huì)持久化在activemq的服務(wù)端,如果該消息給消費(fèi)者消費(fèi),在服務(wù)端持久化的消息也就同時(shí)被刪除。

發(fā)布訂閱傳遞消息的方法:消息的生產(chǎn)者發(fā)送消息以后,如果沒(méi)有消費(fèi)者消費(fèi),消息不會(huì)持久化在activemq的客戶端,會(huì)立即消失。如果創(chuàng)建的消息被消費(fèi),會(huì)的activemq的服務(wù)端顯示消息相關(guān)內(nèi)容。這一點(diǎn)和PTP剛好相反。

注意:發(fā)布訂閱傳遞消息的方式:也是可以實(shí)現(xiàn)消息持久化在服務(wù)端的,需要消費(fèi)者首先在activemq的服務(wù)端訂閱消息(注冊(cè)),將消費(fèi)者客戶端的ID(作為唯一標(biāo)識(shí),因?yàn)榭梢杂卸鄠€(gè)消費(fèi)者)和消息的ID傳遞給服務(wù)端即可。

?

轉(zhuǎn)載于:https://www.cnblogs.com/shuai-server/p/8966299.html

總結(jié)

以上是生活随笔為你收集整理的ActiveMQ消息传递的两种方式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。