ActiveMQ消息传递的两种方式
1.什么是ActiveMQ?
ActiveMQ是apache提供的開(kāi)源的,實(shí)現(xiàn)消息傳遞的一個(gè)中間插件,可以和spring整合,是目前最流行的開(kāi)源消息總線,ActiveMQ是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。較相似的還有rabbitMQ和kafka等,都是最為消息傳遞的插件
2.ActiveMQ傳遞消息的兩種方式
前提:需要引入activemq的jar包
點(diǎn)對(duì)點(diǎn)方式(PTP):一個(gè)消費(fèi)者對(duì)應(yīng)一個(gè)生產(chǎn)者
發(fā)布/訂閱模式(Publish/Sub):一個(gè)生產(chǎn)者產(chǎn)生消息發(fā)送后,可以被多個(gè)消費(fèi)者進(jìn)行接收。
JMS定義了五種消息正文格式,以及消息的調(diào)用類(lèi)型,允許發(fā)送和接收一些不同類(lèi)型的數(shù)據(jù),提供現(xiàn)有消息格式的一些級(jí)別的兼容性。
StreamMessage:--JAVA原始的數(shù)據(jù)流
TextMessage:一個(gè)字符串對(duì)象
ObjectMessage:一個(gè)系列化的java對(duì)象
BytesMessage:一個(gè)字節(jié)對(duì)象
MapMessage:key/value方式的鍵值對(duì)
(1)點(diǎn)對(duì)點(diǎn)的方式(PTP)
即:一個(gè)消息的生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者
生產(chǎn)者(Producer)實(shí)現(xiàn)步驟:
第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象,將服務(wù)端activemq的 ip 和 port 作為構(gòu)造參數(shù)傳遞
第二步:通過(guò)第一步創(chuàng)建的工廠對(duì)象獲得連接對(duì)象Connection
第三步:開(kāi)啟連接,直接調(diào)用connection對(duì)象的start方法即可
第四步:創(chuàng)建一個(gè)Session對(duì)象,通過(guò)connection對(duì)象創(chuàng)建
第五步:通過(guò)Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(該對(duì)象有兩種方式:topic和quene),這里使用quene
第六步:通過(guò)Session對(duì)象創(chuàng)建一個(gè)生產(chǎn)者Producer對(duì)象
第七步:創(chuàng)建Message對(duì)象,這里使用TextMessage對(duì)象,設(shè)置消息內(nèi)容
第八步:使用創(chuàng)建的生產(chǎn)者對(duì)象Producer發(fā)送消息
第九步:關(guān)閉資源(Producer對(duì)象,Connection對(duì)象,Session對(duì)象)
@Testpublic void testQueueProducer() throws Exception {// 第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。//brokerURL服務(wù)器的ip及端口號(hào)ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://ip地址:61616");// 第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。Connection connection = connectionFactory.createConnection();// 第三步:開(kāi)啟連接,調(diào)用Connection對(duì)象的start方法。 connection.start();// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。//第一個(gè)參數(shù):是否開(kāi)啟事務(wù)。true:開(kāi)啟事務(wù),第二個(gè)參數(shù)忽略。//第二個(gè)參數(shù):當(dāng)?shù)谝粋€(gè)參數(shù)為false時(shí),才有意義。消息的應(yīng)答模式。1、自動(dòng)應(yīng)答2、手動(dòng)應(yīng)答。一般是自動(dòng)應(yīng)答。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)Queue對(duì)象。//參數(shù):隊(duì)列的名稱。Queue queue = session.createQueue("test-queue");// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。MessageProducer producer = session.createProducer(queue);// 第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。/*TextMessage message = new ActiveMQTextMessage();message.setText("hello activeMq,this is my first test.");*/TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");// 第八步:使用Producer對(duì)象發(fā)送消息。 producer.send(textMessage);// 第九步:關(guān)閉資源。 producer.close();session.close();connection.close();}消費(fèi)者實(shí)現(xiàn):
第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象,將服務(wù)端activemq的 ip 和 port 作為構(gòu)造參數(shù)傳遞
第二步:通過(guò)第一步創(chuàng)建的工廠對(duì)象獲得連接對(duì)象Connection
第三步:開(kāi)啟連接,直接調(diào)用connection對(duì)象的start方法即可
第四步:創(chuàng)建一個(gè)Session對(duì)象,通過(guò)connection對(duì)象創(chuàng)建
第五步:創(chuàng)建一個(gè)Destination對(duì)象,使用quene,需要和生產(chǎn)者的quene一致
第六步:創(chuàng)建一個(gè)消費(fèi)者對(duì)象
第七步:接收消息
第八步:打印接收的消息
第九步:關(guān)閉資源
消費(fèi)者的代碼:
@Testpublic void testQueueConsumer() throws Exception {// 第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");// 第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。Connection connection = connectionFactory.createConnection();// 第三步:開(kāi)啟連接。調(diào)用Connection對(duì)象的start方法。 connection.start();// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致queue,并且隊(duì)列的名稱一致。Queue queue = session.createQueue("test-queue");// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。MessageConsumer consumer = session.createConsumer(queue);// 第七步:接收消息。consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = null;//取消息的內(nèi)容text = textMessage.getText();// 第八步:打印消息。 System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});//等待鍵盤(pán)輸入 System.in.read();// 第九步:關(guān)閉資源 consumer.close();session.close();connection.close();} View Code(2)訂閱發(fā)布方式傳遞消息:Topic??
補(bǔ)充:由于topic傳遞消息的特點(diǎn)是,一個(gè)生產(chǎn)者可以有多個(gè)消費(fèi)者,生產(chǎn)者生產(chǎn)的消息在沒(méi)有被消費(fèi)者消費(fèi)之前,并不會(huì)將消息持久化到activemq的服務(wù)端,發(fā)送的消息會(huì)自動(dòng)消失。所以 測(cè)試的時(shí)候需要先創(chuàng)建消費(fèi)者對(duì)象,然后在發(fā)送消息,防止消息丟失。
生產(chǎn)者實(shí)現(xiàn)步驟:
步驟和PTP的方式完全一樣,不同的是在創(chuàng)建Destination對(duì)象的時(shí)候,需要?jiǎng)?chuàng)建topic對(duì)象
直接上代碼:
@Testpublic void testTopicProducer() throws Exception {// 第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。// brokerURL服務(wù)器的ip及端口號(hào)ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP地址:61616");// 第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。Connection connection = connectionFactory.createConnection();// 第三步:開(kāi)啟連接,調(diào)用Connection對(duì)象的start方法。 connection.start();// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。// 第一個(gè)參數(shù):是否開(kāi)啟事務(wù)。true:開(kāi)啟事務(wù),第二個(gè)參數(shù)忽略。// 第二個(gè)參數(shù):當(dāng)?shù)谝粋€(gè)參數(shù)為false時(shí),才有意義。消息的應(yīng)答模式。1、自動(dòng)應(yīng)答2、手動(dòng)應(yīng)答。一般是自動(dòng)應(yīng)答。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)topic對(duì)象。// 參數(shù):話題的名稱。Topic topic = session.createTopic("test-topic");// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。MessageProducer producer = session.createProducer(topic);// 第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。/** TextMessage message = new ActiveMQTextMessage(); message.setText(* "hello activeMq,this is my first test.");*/TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");// 第八步:使用Producer對(duì)象發(fā)送消息。 producer.send(textMessage);// 第九步:關(guān)閉資源。 producer.close();session.close();connection.close();} View Code消費(fèi)者實(shí)現(xiàn)的步驟:
步驟和PTP消費(fèi)者實(shí)現(xiàn)的步驟一樣,唯一不同的是在創(chuàng)建Destination對(duì)象的時(shí)候,創(chuàng)建topic對(duì)象,同時(shí)要和發(fā)布訂閱的生產(chǎn)者的topic一致
消費(fèi)者代碼:
@Testpublic void testTopicConsumer() throws Exception {// 第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP地址:61616");// 第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。Connection connection = connectionFactory.createConnection();// 第三步:開(kāi)啟連接。調(diào)用Connection對(duì)象的start方法。 connection.start();// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致topic,并且話題的名稱一致。Topic topic = session.createTopic("test-topic");// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。MessageConsumer consumer = session.createConsumer(topic);// 第七步:接收消息。consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = null;// 取消息的內(nèi)容text = textMessage.getText();// 第八步:打印消息。 System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});System.out.println("topic的消費(fèi)端03。。。。。");// 等待鍵盤(pán)輸入 System.in.read();// 第九步:關(guān)閉資源 consumer.close();session.close();connection.close();} View Code總結(jié):兩種傳遞消息的方式的異同
相同點(diǎn):實(shí)現(xiàn)步驟基本一樣,大同小異
不同點(diǎn):PTP傳遞消息的方法,消息的生產(chǎn)者發(fā)送以后,消息會(huì)持久化在activemq的服務(wù)端,如果該消息給消費(fèi)者消費(fèi),在服務(wù)端持久化的消息也就同時(shí)被刪除。
發(fā)布訂閱傳遞消息的方法:消息的生產(chǎn)者發(fā)送消息以后,如果沒(méi)有消費(fèi)者消費(fèi),消息不會(huì)持久化在activemq的客戶端,會(huì)立即消失。如果創(chuàng)建的消息被消費(fèi),會(huì)的activemq的服務(wù)端顯示消息相關(guān)內(nèi)容。這一點(diǎn)和PTP剛好相反。
注意:發(fā)布訂閱傳遞消息的方式:也是可以實(shí)現(xiàn)消息持久化在服務(wù)端的,需要消費(fèi)者首先在activemq的服務(wù)端訂閱消息(注冊(cè)),將消費(fèi)者客戶端的ID(作為唯一標(biāo)識(shí),因?yàn)榭梢杂卸鄠€(gè)消費(fèi)者)和消息的ID傳遞給服務(wù)端即可。
?
轉(zhuǎn)載于:https://www.cnblogs.com/shuai-server/p/8966299.html
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ消息传递的两种方式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: [转].Net实现本地化简易教程
- 下一篇: Gif(2)-加载视图-波纹