ActiveMQ菜鸟入门教程
本篇文章介紹一個簡單的ActiveMQdemo搭建,本人試過了代碼可以運行成功。
原文鏈接:https://www.songliguo.com/activemq-getting-started.html
ActiveMQ官網下載地址:?http://activemq.apache.org/download.html??
ActiveMQ安裝可參考:?https://blog.csdn.net/mr_haixin/article/details/80418204
ActiveMQ是Apache的一個開源項目,它是一個能力強勁的開源消息總線,也是一個中間件產品。它是JMS的一個實現。
在介紹ActiveMQ之前,先來復習一下J2EE中的JMS規范。
JMS是Java Message Service的簡稱,用來發送異步消息,在不同系統和不同的模塊之間我們可以利用它實現集成。
JMS有兩個好處,第一個就是讓模塊之間或者系統之間的耦合度降低,第二個是異步通信。
JMS的消息機制有2種模型,一種是Point to Point,表現為隊列的形式。發送的消息,只能被一個接收者取走;另一種是Topic,可以被多個訂閱者訂閱,類似于群發。
在JMS中有這樣幾個重要的核心接口和類:
- ConnectionFactory,用于jms client獲取與jms provider的連接。不同的jms產品,對這個接口有不同的實現,比如說ActiveMQ,這個接口的實現類是ActiveMQConnectionFactory
- Connection,是由ConnectionFactory產生的,表示jms client與jms provider的連接
- ?Session,是由Connection產生的,表示一個會話。Session是關鍵組件,Message、Producer/Consumer、Destination都是在Session上創建的
- ?Message,這個組件很好理解,就是傳輸的消息,里面包括head、properties、body,其中head是必選的
- ?Destination,是消息源,對發送者來說,就是消息發到哪里;對接收者來說,就是從哪里取消息。Destination有2個子接口,Queue和Topic,分別對應上面提到的2種模型
- MessageProducer,是消息發送者,創建這個組件的代碼類似:
?
?
| 1 2 3 4 5 6 | ? //創建一個Queue,名稱為SongLiGuo_FirstQueue destination = session.createQueue("SongLiGuo_FirstQueue"); //得到消息生產者【發送者】 messageProducer = session.createProducer(destination); ? |
可以注意到,這里需要把Destination作為參數,傳入createProducer()方法,這說明消息發送者是綁定到Destination上的,這個發送者發送的消息,會發送到這個綁定的Destination上
- ?MessageConsumer,是消息接收者,和Message Producer是相反的一種組件
?
對JMS有所了解之后,我們來看ActiveMQ。
1.下載ActiveMQ
去官方網站下載:http://activemq.apache.org/
2.運行ActiveMQ
解壓縮apache-activemq-5.5.1-bin.zip,然后雙擊apache-activemq-5.5.1binactivemq.bat運行ActiveMQ程序。
啟動ActiveMQ以后,登陸:http://localhost:8161/admin/,創建一個Queue,命名為SongLiGuo_FirstQueue(如果這里創建了程序就不用創建)
3.創建Eclipse項目并運行
創建project:ActiveMQ-5.5,并導入apache-activemq-5.5.1lib目錄下需要用到的jar文件,項目結構如下圖所示:
3.1.Sender.java
?
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | ? package com.songliguo.activemq; ? import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; ? import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; ? /** * *???? * 項目名稱:ActiveMQ-5.5?? * 類名稱:Sender?? * 類描述:?? ActiveMQ發送者 * 創建人:Songliguo?? * 創建時間:2017年3月14日 上午10:01:02?? * 修改人:?? * 修改時間: * 修改備注:?? * @version???? * */ public class Sender { ? private static final int SEND_NUMBER = 10; public static void main(String[] args) { //ConnectionFactory是連接工廠,JMS用它創建連接 ConnectionFactory connectionFactory; //Connection JMS客戶端到JMS provider的連接 Connection connection = null; //Session 一個發送或者接收消息的線程 Session session; //Destination 消息發送目的地,消息發送給誰接收 Destination destination; //MessageProducer 消息發送者 MessageProducer messageProducer; //構造ConnectionFactory 實例對象,此處采用ActiveMQ的實現jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { //構造工廠得到連接對象 connection = connectionFactory.createConnection(); //啟動 connection.start(); //獲取操作連接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //創建一個Queue,名稱為SongLiGuo_FirstQueue ????????????????????????destination = session.createQueue("SongLiGuo_FirstQueue"); //得到消息生產者【發送者】 messageProducer = session.createProducer(destination); //設置不持久化,根據實際情況而定 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //構造消息,此處寫死,項目就是參數或者方法獲取 sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); }finally { try { if(null != connection){ connection.close(); } } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer)throws Exception { ????????for (int i = 1; i <= SEND_NUMBER; i++) { ????????????TextMessage message = session.createTextMessage("ActiveMq 發送的消息" + i); ????????????// 發送消息到目的地方 ????????????System.out.println("發送消息:" + "ActiveMq 發送的消息" + i); ????????????producer.send(message); ????????} ????} } ? ? |
?
3.2.Receiver.java
?
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | ? package com.songliguo.activemq; ? import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; ? import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; ? /** * *???? * 項目名稱:ActiveMQ-5.5?? * 類名稱:Receiver?? * 類描述:??activeMQ接收類 * 創建人:Songliguo?? * 創建時間:2017年3月14日 上午10:31:35?? * 修改人:?? * 修改時間: * 修改備注:?? * @version???? * */ public class Receiver { ? public static void main(String[] args) { //connectionFactory 連接工廠,JMS用它創建連接 ConnectionFactory connectionFactory; //connection JMS客戶端到JMS provider 的連接 Connection connection = null; //session一個發送或者接收的線程 Session session; //destination 消息目的地,發送給誰接收 Destination destination; //消費者消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { //構造工廠得到連接對象 connection = connectionFactory.createConnection(); //啟動 connection.start(); //獲取操作連接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("SongLiGuo_FirstQueue"); ????????????consumer = session.createConsumer(destination); ????????????while(true){ ???????????? //設置接收者收消息的時間,為了方便測試,這里暫定設置為100s ???????????? TextMessage message = (TextMessage)consumer.receive(100); ???????????? if(null != message){ ???????????? System.out.println("收到消息==="+message.getText()); ???????????? }else{ ???????????? break; ???????????? } ????????????} } catch (Exception e) { e.printStackTrace(); }finally{ try { if(null != connection){ connection.close(); } } catch (Throwable ignore) { } } } } ? ? |
?
4.注意事項
項目所引用的jar最后在ActiveMQ下的lib中找
5.測試結果
運行sender,在運行完sender以后,我們可以看到如下console
我們再切換到receiver運行后的console,如下圖所示:
?
6.我們可以在http://localhost:8161/admin/queues.jsp看到消息發送和接收情況
轉載請注明原文鏈接:首頁?->?技術交流?->?JAVA開發?->?ActiveMQ菜鳥入門教程
總結
以上是生活随笔為你收集整理的ActiveMQ菜鸟入门教程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 恩施机器人编程_恩施武汉机器人激光切割机
- 下一篇: limbo模拟器镜像Android,li