给女朋友讲ActiveMQ是啥?
1 ActiveMQ是啥
ActiveMQ 就是一個消息中間件,市面上現在有很多的消息中間件開源產品,比如,RocketMQ、RabbitMQ、Kafka等。
拿一個簡單的比喻來說,消息中間件就是一個中轉站,在程序中加的一個中轉站,有了這樣一個類似快遞的存儲站點,可以大大的減輕物流的壓力,而對應到程序中,也就是減輕了程序的壓力。
另外不得不說的是,ActiveMQ是遵從 JMS 規范的消息中間件,那么什么是 JMS 規范呢?
JMS 規范
JMS是java的消息服務,JMS的客戶端之間可以通過JMS服務進行異步的消息傳輸。
消息模型
- Point-to-Point(P2P),點對點
- P2P模式圖
如上圖,有幾個需要了解的概念,發送者、接收者、消息隊列。
在點對點模型中,一般消息由發送者將消息發送到消息隊列中,然后,接收者從消息隊列中消費消息,消息被消費者消費之后,消息就不存在了。
- Publish/Subscribe(Pub/Sub),發布訂閱模型
- Pub/Sub模式圖
如上圖,有下面幾個概念,主題、發布者、訂閱者。
在發布訂閱模型中,發布者通常將消息發布到主題(topic)中,然后,訂閱者通過訂閱主題來消費消息,與 P2P 模型不同的是,發布訂閱模型的消息是可以被多次消費的!
兩種模式的區別
1、P2P在發送者和接收者之間沒有時間上的依賴性,也就是說發送者發送了消息之后,不管接收者有沒有運行,不會影響消息發送到隊列,而Pub/Sub模式有時間上的依賴性,消費者必須先訂閱主題,才能夠消費消息。2、P2P模式的每個消息只能有一個消費者,消費完了消息就不存在了,Pub/Sub模式可以有多個消費者。
2 為什么需要使用消息中間件
到這里我就不得不講一個小故事了!
小明、小李和小白都是在一個項目組的 Java 開發人員,但是呢,他們的團隊比較小,只有幾個開發人員,而他們正在開發一個項目,這個項目比較龐大,所以,項目負責人就考慮到項目進度,給他們每個人都分一個模塊單獨開發,這樣就能夠加快項目的進度了。
然而,萬萬沒有想到的是,當項目開發到一定階段的時候,小明、小李和小白各自負責的模塊都需要項目調用數據了,但是呢,現在問題來了,每次小白向小明需要數據的時候,小明總是要改接口來滿足小白的需求,而且還會擔心小明的系統會不會出問題,如果出了問題就調用不了怎么辦?這樣就總是耽誤項目的進度,小李那邊也是出現了這種問題!
于是,小明就想了個辦法,如果在各個模塊之間再加一個模塊,用來處理數據,比如一個隊列來存數據,每次就把數據丟到那個模塊中去,這樣就不用擔心那個問題啦。小明是不是很聰明!
其實,小明沒有做足夠的調查,他說的這個模塊,就是 ActiveMQ 的作用所在啦。
也就是降低模塊與模塊之間的耦合度,達到解耦的目的!
然后,他們又遇到了一個問題,他們在開發一個用戶注冊模塊的時候,是先注冊,然后寫入數據庫,然后再發送郵件或者短信通知用戶,但是,他們發現這樣的系統速度很慢!
后來,他們發現了消息中間件后,改造了一下,變成了下面的模式。
他們也發現了,這就是消息中間件帶來的異步執行的優勢!系統速度杠杠的!
后來,小明、小李和小白開發的系統呢上線了,但是,公司業快速發展,當流量大的時候,系統的數據調用總是負荷不了,出現宕機的問題,沒辦法,只能再改代碼了!
他們靈機一動,前面都用了消息中間件了,但是沒有發現另外一個功能,我們可以加入消息中間件,控制每次消費消息的數量,保證系統不會宕機,剩下的消息在系統流量小的時候再定時執行不就可以了。簡直不要太好!
小明、小李和小白經過這個系統的開發,終于明白了消息中間件的優勢了!
3 安裝使用
3.1 下載
到下面的官網地址下載,包括linux和Windows的不同版本。
- https://activemq.apache.org/components/classic/download/
3.2 解壓使用
windows使用方法
首先,解壓到一個自己的目錄,ActiveMQ目錄如下;
進入到對應的 bin 目錄;
里面有一個 activemq 的可執行文件,打開 cmd,執行:activemq start
成功啟動了!
關閉;
activemq stoplinux 使用方法
解壓到指定目錄;
sudo tar zxvf activemq-x.x.x-bin.tar.gz進入到 bin 目錄,執行下面命令;
./activemq start關閉;
./activemq stop后臺管理界面
啟動成功之后,可以輸出http://localhost:8161/admin/查看 ActiveMQ 的后臺管理界面,用戶名和密碼都為 admin。
ok,到這里,ActiveMQ的安裝和基本使用應該沒有問題了,接下來,我們使用 ActiveMQ 的 Java API 從一個入門實例開始講起!
4 ActiveMQ入門程序
4.1 前提條件
在開始之前,先申明一下需要的 Java 環境的配置,相關配置自行解決哦!
- Java JDK1.7 以上
- Maven 3.0 以上
- 開發工具 IDEA
4.2 帶你入門
step1:導入 Maven 相關依賴;
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><spring.version>4.3.10.RELEASE</spring.version></properties><dependencies><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.0</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.6.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>4.2.5.RELEASE</version></dependency></dependencies>step2:創建發送端類;
/*** @ClassName JmsSender* @Description* @Author 歐陽思海* @Date 2019/8/13 16:39* @Version 1.0**/ public class JmsSender {public static void main(String[] args) {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = null;try {connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);Destination destination = session.createQueue("queue");MessageProducer producer = session.createProducer(destination);TextMessage textMessage = session.createTextMessage("hello activemq");producer.send(textMessage);//session.commit();session.close();} catch (JMSException e) {e.printStackTrace();} finally {if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}} }上面的代碼創建了一個消息發送者,步驟如下:1、創建ActiveMQ實現的JMS規范的實現類ActiveMQConnectionFactory的對象connectionFactory ,并且給定參數ActiveMQ的服務地址;2、由connectionFactory調用方法createConnection創建連接connection對象;3、由connection對象調用createSession方法創建session會話對象;4、有了session對象之后,就可以發送者、隊列或者主題了,這里創建隊列,session.createQueue("queue"),并給定了隊列名稱為queue。5、session對象通過方法createProducer創建生產者,并且創建消息session.createTextMessage("hello activemq");6、生產者調用send的方法發送消息,producer.send(textMessage);
通過上面的步驟就可以將消息發送到隊列中了,接著只要等待消費者消費消息即可,消息消費后,消息就消失了。
通過上面的講解,也將JMS的主要的接口都概括了,包括:ConnectionFactory(連接工廠)、Session(會話)、Connection(連接);
step3:創建消費端類;
/*** @ClassName JmsReceiver* @Description* @Author 歐陽思海* @Date 2019/8/13 16:47* @Version 1.0**/ public class JmsReceiver {public static void main(String[] args) {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = null;try {//創建連接connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//創建隊列(如果隊列已經存在則不會創建,queue是隊列名稱)//destination表示目的地Destination destination = session.createQueue("queue");//創建消息接收者MessageConsumer consumer = session.createConsumer(destination);TextMessage textMessage = (TextMessage) consumer.receive();System.out.println(textMessage.getText());session.commit();session.close();} catch (JMSException e) {e.printStackTrace();} finally {if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}} }消費者和生產者的差別不大,前面的創建工廠、創建連接、創建會話對象和生產者一樣,區別在于,session.createConsumer(destination)通過session創建消費者,然后,調用receive方法接受消息。
運行發送端,查看后臺管理界面,點擊 Queues 選項,發現有一個入隊的消息,并且沒有出隊列;
運行接收端;
再查看后臺管理界面,消息被消費了;
5 ActiveMQ整合Spring
這一部分花了挺多時間琢磨的,首先是應為在實際的開發中,我們整合Spring來開發項目是最多的一種方式,這一塊如果可以學透的話,對于項目開發是非常有好處的,出于這個出發點,盡可能的把相關的知識講解的全面一些。
首先,這一部分分為以下三個部分來講解。
- 不使用 Spring 配置文件方式
- 使用 Spring 配置文件方式
- 注解方式(0配置)
5.1 前提條件
項目結構這次搭建的項目是一個子模塊聚合的項目,結構如下;
這個聚合的項目分為生產者(Producer) 和消費者(Consumer)兩個子模塊。
導入 Maven 依賴
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><spring.version>4.3.10.RELEASE</spring.version></properties><dependencyManagement><dependencies><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.6.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>4.2.5.RELEASE</version></dependency></dependencies></dependencyManagement>溫馨提示
由于我這里使用的是子模塊聚合的方式,所以,如果你不是這種方式的項目,直接給出各個依賴的版本在你的項目中即可!
5.2 不使用 Spring 配置文件方式
這一節的講解中,我們將采用不使用 Spring 的配置文件的方式,Maven 的相關依賴在上面已經給出,請參考上一節的內容。
生產者(Producer)
首先,我們來看一下生產者端,生產者端主要負責發送消息到 Broker 中,發送的目的地(Destination)可以分為隊列(Queue)和主題(Topic),下面,我們就看看如何不采用 Spring 配置文件的方式發送消息。
public static void main(String[] args) {ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = null;try {connection = cf.createConnection();connection.start();Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Queue destination = session.createQueue("queue2");JmsQueueSenderWithNotXml jmsQueueSender = new JmsQueueSenderWithNotXml();jmsQueueSender.setConnectionFactory(cf);jmsQueueSender.setQueue(destination);jmsQueueSender.simpleSend();jmsQueueSender.sendWithConversion();} catch (JMSException e) {e.printStackTrace();}}private JmsTemplate jmsTemplate;private Queue queue;public void setConnectionFactory(ConnectionFactory cf) {this.jmsTemplate = new JmsTemplate(cf);}public void setQueue(Queue queue) {this.queue = queue;}/** @Author 歐陽思海* @Description 發送簡單消息* @Date 15:45 2019/8/16* @Param []* @return void**/public void simpleSend() {this.jmsTemplate.send(this.queue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage("hello queue world");}});System.out.println("發送成功!");}/** @Author 歐陽思海* @Description 發送map類型的消息* @Date 15:46 2019/8/16* @Param []* @return void**/public void sendWithConversion() {Map map = new HashMap();map.put("Name", "sihai");map.put("Age", new Integer(18));jmsTemplate.convertAndSend("Queue3", map, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws JMSException {message.setIntProperty("num", 189);message.setJMSCorrelationID("00001");return message;}});System.out.println("發送成功!");}step1:上面是生產者端的所有代碼示例,在這個示例中,我們首先通過下面的代碼設置好ConnectionFactory 和Queue,并且調用JmsTemplateSpring提供的工具類提供兩個發送消息的方法 。
private JmsTemplate jmsTemplate;private Queue queue;public void setConnectionFactory(ConnectionFactory cf) {this.jmsTemplate = new JmsTemplate(cf);}public void setQueue(Queue queue) {this.queue = queue;}/** @Author 歐陽思海* @Description 發送簡單消息* @Date 15:45 2019/8/16* @Param []* @return void**/public void simpleSend() {this.jmsTemplate.send(this.queue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage("hello queue world");}});System.out.println("發送成功!");}/** @Author 歐陽思海* @Description 發送map類型的消息* @Date 15:46 2019/8/16* @Param []* @return void**/public void sendWithConversion() {Map map = new HashMap();map.put("Name", "sihai");map.put("Age", new Integer(18));jmsTemplate.convertAndSend("Queue3", map, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws JMSException {message.setIntProperty("num", 189);message.setJMSCorrelationID("00001");return message;}});System.out.println("發送成功!");}step2:使用Main方法,設置ConnectionFactory和Queue對象,接著,調用發送方法發送消息。
public static void main(String[] args) {ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = null;try {connection = cf.createConnection();connection.start();Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Queue destination = session.createQueue("queue2");JmsQueueSenderWithNotXml jmsQueueSender = new JmsQueueSenderWithNotXml();jmsQueueSender.setConnectionFactory(cf);jmsQueueSender.setQueue(destination);jmsQueueSender.simpleSend();jmsQueueSender.sendWithConversion();} catch (JMSException e) {e.printStackTrace();}}step2:接著,我們運行上面的代碼,輸出下面結果,再看一下ActiveMQ的控制臺,看看有沒有消息發送成功。
發現有一條掛起的消息和入隊列的消息,說明發送成功!
消費者(Consumer)
對于消費者,在這一節先不展開講解,可以先參考上面的入門程序的消費端的代碼消費消息,接下來的方式再講解消費端的消費消息。
5.3 使用 Spring 配置文件方式
上面一節中,講解了不使用 Spring 配置的方式如何發送消息,主要是想讓大家了解一下其中的原理,這一節中,將使用 Spring 配置的方式講解,這種方式在實際的開發中還是用的比較多的。
生產者(Producer)
既然是配置文件的方式,那么,首先,不得不講如何進行xml配置了。
step1:xml配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:jms="http://www.springframework.org/schema/jms"xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/jms https://www.springframework.org/schema/jms/spring-jms.xsd"><bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"><property name="connectionFactory"><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL"><value>tcp://localhost:61616</value></property></bean></property><property name="maxConnections" value="50"/></bean><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg index="0" value="spring-queue"/></bean><!--<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg index="0" value="spring-topic"/></bean>--><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory"/><property name="defaultDestination" ref="destination"/><property name="messageConverter"><bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/></property></bean></beans>在上面的配置中,首先,需要配置connectionFactory(對應不使用配置的connectionFactory對象),然后,需要配置destination(對應不使用配置的destination),在這里使用的是向隊列發送消息,也可以使用主題(Topic),最后,配置 Spring 提供的jmsTemplate模板類。
step2:使用Main方法運行
public static void main(String[] args) {ApplicationContext application = new FileSystemXmlApplicationContext("G:\\ideaproject\\activemq\\Producer\\src\\main\\resources\\service-jms.xml");JmsTemplate jmsTemplate = (JmsTemplate) application.getBean("jmsTemplate");for (int i = 0; i < 10; i++) {int finalI = i;jmsTemplate.send((session) -> {TextMessage textMessage = session.createTextMessage();textMessage.setText("first message" + finalI);return textMessage;});}}在上面的代碼中,調用了JmsTemplate的send方法發送消息。運行之后,就成功發送消息了,這種方式還是簡潔不少的。
溫馨提示
上面我使用的是FileSystemXmlApplicationContext獲取xml配置文件,除此之外,你也可以使用ClassPathXmlApplicationContext來獲取。
消費者(Consumer)
在上一節中,沒有講解消費者,在這一節中,將重點講解。
step1:首先,我們還是需要配置xml文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:amq="http://activemq.apache.org/schema/core"xmlns:jms="http://www.springframework.org/schema/jms"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.1.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-4.1.xsdhttp://www.springframework.org/schema/jmshttp://www.springframework.org/schema/jms/spring-jms-4.1.xsdhttp://activemq.apache.org/schema/corehttp://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"><!--連接工廠--><bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"><property name="connectionFactory"><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL"><value>tcp://localhost:61616</value></property></bean></property><property name="maxConnections" value="50"/></bean><!--配置隊列--><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg index="0" value="queue2"/></bean><!-- 配置主題(topic)--><!-- <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg index="0" value="spring-topic"/></bean>--><!--配置spring的jms模板--><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory"/><property name="defaultDestination" ref="destination"/><property name="messageConverter"><bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/></property></bean><!-- 消息監聽器 --><!--<bean id="messageListener" class="com.sihai.activemq.listener.MyMessageListener"/>--><bean id="messageListener" class="com.sihai.activemq.listener.MySessionAwareMessageListener"></bean><!--jta事務--><!--<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>--><!-- 消息監聽器容器 --><bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="destination"/><property name="messageListener" ref="messageListener"/><!--配置本地資源事務--><!--<property name="sessionTransacted" value="true"/>--><!--配置jta事務--><!--<property name="transactionManager" ref="transactionManager"/>--></bean><!--<!– 監聽注解支持 –><jms:annotation-driven />--></beans>最前面的配置和生產者是一樣的,需要配置connectionFactory(對應不使用配置的connectionFactory對象),然后,需要配置destination(對應不使用配置的destination)。
區別在于,消費者端需要配置一個消息監聽器容器,如下。
<!-- 消息監聽器 --><!--<bean id="messageListener" class="com.sihai.activemq.listener.MyMessageListener"/>--><bean id="messageListener" class="com.sihai.activemq.listener.MySessionAwareMessageListener"></bean><!--jta事務--><!--<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>--><!-- 消息監聽器容器 --><bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="destination"/><property name="messageListener" ref="messageListener"/><!--配置本地資源事務--><!--<property name="sessionTransacted" value="true"/>--><!--配置jta事務--><!--<property name="transactionManager" ref="transactionManager"/>--></bean>那么這個怎么配置呢?請接著看。
step2:消息監聽器容器配置首先,我們需要寫一個類,實現MessageListener接口,然后實現一個名為onMessage的方法,通過這個方法就可以監聽是否有消息,有消息就消費。
/*** @ClassName MyMessageListener* @Description 消息消費監聽器實現* @Author 歐陽思海* @Date 2019/8/13 20:39* @Version 1.0**/ @Component public class MyMessageListener implements MessageListener {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {System.out.println(((TextMessage) message).getText());}catch (JMSException ex) {throw new RuntimeException(ex);}}else {throw new IllegalArgumentException("Message must be of type TextMessage");}} }如此,配置就完成了。
step3:啟動spring容器,運行。
/** @Author 歐陽思海* @Description xml配置方式獲取消息* @Date 18:09 2019/8/16* @Param []* @return void**/@Testpublic void test_01() throws IOException {ClassPathXmlApplicationContext application = new ClassPathXmlApplicationContext("G:\\ideaproject\\activemq\\Consumer\\src\\main\\resources\\service-jms.xml");/*JmsTemplate jmsTemplate = (JmsTemplate) application.getBean("jmsTemplate");String msg = (String) jmsTemplate.receiveAndConvert();System.out.println(msg);*/System.in.read();}在上面的代碼中,System.in.read(),這個作用就是一直等待,有消息就消費。
step4:開啟消息監聽器事務在消息處理的過程中是可以開啟事務的,如果出現處理失敗的情況,就會回滾。在消息監聽容器當中可以配置一個屬性是sessionTransacted的本地事務,如果value為true,就代表開啟本地事務。具體配置如下:
<!-- 消息監聽器容器 --><bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="destination"/><property name="messageListener" ref="messageListener"/><!--配置本地資源事務--><property name="sessionTransacted" value="true"/> </bean>消息監聽器容器
上面的消費者的講解中,其實,最重要的就是消息監聽器容器配置了,這一部分,我們就詳細的講解一下消息監聽器容器的配置方法。
1 實現MessageListener接口這種方式就是上面的實例使用的方式,先看看這個接口。
public interface MessageListener {void onMessage(Message var1); }這個接口很簡單,只有一個方法onMessage,通過拿到Message參數讀取消息,這里就不再多說了。
2 實現SessionAwareMessageListener接口這個接口平時很少用到,但是,其實是有這個接口可以實現的,這個接口和上面的MessageListener接口有點不一樣,這個接口是Spring提供的。
public interface SessionAwareMessageListener<M extends Message> {void onMessage(M var1, Session var2) throws JMSException; }另外,你可以看到,這個接口提供的是一個泛型接口,可以是M extends Message這個類型,同時,實現的方式onMessage,還多了一個Session參數,可以在獲取消息的同時處理Session。
使用實例
/*** @ClassName MySessionAwareMessageListener* @Description 實現SessionAwareMessageListener的消息監聽器* @Author 歐陽思海* @Date 2019/8/16 16:02* @Version 1.0**/ public class MySessionAwareMessageListener implements SessionAwareMessageListener {@Overridepublic void onMessage(Message message, Session session) throws JMSException {if (message instanceof TextMessage) {try {System.out.println(((TextMessage) message).getText());}catch (JMSException ex) {throw new RuntimeException(ex);}}else {throw new IllegalArgumentException("Message must be of type TextMessage");}} }5.4 注解方式(0配置)
前面已經介紹了兩種方式,分別是不使用xml配置方式和使用xml配置的方式,但是,由于現在微服務的興起,約定優于配置是現在的一種趨勢,所以,在這一節中,我們使用注解的方式來處理。
生產者(Producer)
由于使用注解的方式,所以,我們不再需要xml配置文件了,但是,我們可以參照上面的xml的配置方式來配置注解的方式。
step1:首先,我們需要一個 Java 配置類,如下;
/*** @ClassName ProducerConfig* @Description 不用xml的配置類* @Author 歐陽思海* @Date 2019/8/16 17:41* @Version 1.0**/ @Configuration public class ProducerConfig {@Bean//配置ConnectionFactory用于生成connectionpublic ActiveMQConnectionFactory connectionFactory() {ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory("tcp://localhost:61616");return activeMQConnectionFactory;}@Bean//注冊SingleConnectionFactory,這個spring的一個包裝工廠 用于管理真正的ConnectionFactorypublic SingleConnectionFactory singleConnectionFactory(ActiveMQConnectionFactory activeMQconnectionFactory) {SingleConnectionFactory connectionFactory = new SingleConnectionFactory();//設置目標工廠connectionFactory.setTargetConnectionFactory(activeMQconnectionFactory);return connectionFactory;}@Bean//配置生產者,jmsTemplatepublic JmsTemplate jmsTemplate(SingleConnectionFactory connectionFactory) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setConnectionFactory(connectionFactory);return jmsTemplate;}/*** 配置隊列目的的: 根據測試需要配置其中一個* 1.隊列 點對點 queue* 2.主題 一對多 topic*/@Bean //public ActiveMQQueue queueDestination() {ActiveMQQueue activeMQQueue = new ActiveMQQueue("queue-anno");return activeMQQueue;}@Beanpublic ActiveMQTopic topicDestination() {ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic-anno");return activeMQTopic;} }上面的配置的每一個方法就對應xml配置的每一個節點,對應起來配置會比較簡單,每一個方法都使用了@Bean這個注解,類上使用Configuration,將這些配置加入到 spring 容器中。
step2:啟動 spring 容器,發送消息;
/*** @ClassName JmsSenderWithAnnotation* @Description 注解發送方式* @Author 歐陽思海* @Date 2019/8/16 18:04* @Version 1.0**/ public class JmsSenderWithAnnotation {/** @Author 歐陽思海* @Description 測試點對點* @Date 18:05 2019/8/16* @Param []* @return void**/@Testpublic void testActiveMqAnnotation() {AnnotationConfigApplicationContext aContext =new AnnotationConfigApplicationContext(ProducerConfig.class);//獲得發送者的模板對象JmsTemplate jmsTemplate = aContext.getBean(JmsTemplate.class);Destination bean = (Destination) aContext.getBean("queueDestination");//發送消息jmsTemplate.send(bean, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {TextMessage message = session.createTextMessage();message.setText("activemq message for queue");return message;}});}/** @Author 歐陽思海* @Description 測試topic發送* @Date 18:06 2019/8/16* @Param []* @return void**/@Testpublic void testActiveMqAnnotation2() {AnnotationConfigApplicationContext aContext =new AnnotationConfigApplicationContext(ProducerConfig.class);//獲得發送者的模板對象JmsTemplate jmsTemplate = aContext.getBean(JmsTemplate.class);Destination bean = (Destination) aContext.getBean("topicDestination");//發送消息jmsTemplate.send(bean, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {TextMessage message = session.createTextMessage();message.setText("activemq message for topic");return message;}});} }分別運行這兩個測試,查看ActiveMQ控制臺,發現Queue和Topic都有一條消息發送成功;
消費者(Consumer)
消費者的大概也差不多,跟xml的配置一樣,多的也是消息監聽容器的配置,來看看;
step1:首先,Java 配置類
*** @ClassName ConsumerConfig* @Description 不用xml的配置類* @Author 歐陽思海* @Date 2019/8/16 17:44* @Version 1.0**/ @ComponentScan(basePackages = {"com.sihai"}) @EnableJms @Configuration public class ConsumerConfig {@Bean//配置ConnectionFactory用于生成connectionpublic ActiveMQConnectionFactory connectionFactory() {ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory("tcp://localhost:61616");return activeMQConnectionFactory;}@Bean//注冊SingleConnectionFactory,這個spring的一個包裝工廠 用于管理真正的ConnectionFactorypublic SingleConnectionFactory singleConnectionFactory(ActiveMQConnectionFactory activeMQconnectionFactory) {SingleConnectionFactory connectionFactory = new SingleConnectionFactory();//設置目標工廠connectionFactory.setTargetConnectionFactory(activeMQconnectionFactory);return connectionFactory;}/*在xml當中的如下配置 效果相同* <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">* <property name="connectionFactory" ref="connectionFactory" />* <property name="destination" ref="topicDestination" />* <property name="messageListener" ref="itemListenerMessage" />* </bean>**/@Beanpublic DefaultMessageListenerContainer jmsListenerContainerFactory(SingleConnectionFactory singleConnectionFactory, MyMessageListener myMessageListener, Destination destination) {//創建容器DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();//設置監聽器jmsContainer.setMessageListener(myMessageListener);//設置連接工廠jmsContainer.setConnectionFactory(singleConnectionFactory);//設置監聽目的地的名字/也可以直接設置對象目的地jmsContainer.setDestination(destination);return jmsContainer;}/*** 1.隊列 點對點 queue* 2.主題 一對多 topic*/@Beanpublic ActiveMQQueue queueDestination() {ActiveMQQueue activeMQQueue = new ActiveMQQueue("queue-anno");return activeMQQueue;}/*@Beanpublic ActiveMQTopic topicDestination() {ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic-anno");return activeMQTopic;}*/ }其中只有一個消息監聽容器的配置是和生產者的配置不同的,消息監聽容器的配置需要配置消息監聽器、連接工廠和目的地(Destination)。
/*在xml當中的如下配置 效果相同* <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">* <property name="connectionFactory" ref="connectionFactory" />* <property name="destination" ref="topicDestination" />* <property name="messageListener" ref="itemListenerMessage" />* </bean>**/@Beanpublic DefaultMessageListenerContainer jmsListenerContainerFactory(SingleConnectionFactory singleConnectionFactory, MyMessageListener myMessageListener, Destination destination) {//創建容器DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();//設置監聽器jmsContainer.setMessageListener(myMessageListener);//設置連接工廠jmsContainer.setConnectionFactory(singleConnectionFactory);//設置監聽目的地的名字/也可以直接設置對象目的地jmsContainer.setDestination(destination);return jmsContainer;}step2:消息監聽器
/*** @ClassName MyMessageListener* @Description 消息消費監聽器實現* @Author 歐陽思海* @Date 2019/8/13 20:39* @Version 1.0**/ @Component public class MyMessageListener implements MessageListener {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {System.out.println(((TextMessage) message).getText());}catch (JMSException ex) {throw new RuntimeException(ex);}}else {throw new IllegalArgumentException("Message must be of type TextMessage");}} }這個前面已經講過了,這里就不再累贅了,但是,這里我需要講的是消息監聽器注解方式的配置,如下。
step3:消息監聽器注解方式的配置方法
/*** @ClassName JmsAnnotation* @Description 注解方式監聽* @Author 歐陽思海* @Date 2019/8/16 17:01* @Version 1.0**/ @Component @EnableJms public class JmsAnnotation {@JmsListener(destination = "queue-anno")public void onMessage(Message message) {if (message instanceof TextMessage) {try {System.out.println(((TextMessage) message).getText());}catch (JMSException ex) {throw new RuntimeException(ex);}}else {throw new IllegalArgumentException("Message must be of type TextMessage");}} }你會發現,在消息監聽器的類上面需要兩個配置@Component和@EnableJms,用于標記這是一個消息監聽器,另外,在onMessage方法上,需要一個@JmsListener(destination = "queue-anno")注解,可以標記需要哪個destination 。
注意:如果采用注解的消息監聽,那么需要修改Java類的消息監聽的容器的配置,否則會出現問題
step4:消息監聽容器配置更改將
/*在xml當中的如下配置 效果相同* <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">* <property name="connectionFactory" ref="connectionFactory" />* <property name="destination" ref="topicDestination" />* <property name="messageListener" ref="itemListenerMessage" />* </bean>**/@Beanpublic DefaultMessageListenerContainer jmsListenerContainerFactory(SingleConnectionFactory singleConnectionFactory, MyMessageListener myMessageListener, Destination destination) {//創建容器DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();//設置監聽器jmsContainer.setMessageListener(myMessageListener);//設置連接工廠jmsContainer.setConnectionFactory(singleConnectionFactory);//設置監聽目的地的名字/也可以直接設置對象目的地jmsContainer.setDestination(destination);return jmsContainer;}改為
@Beanpublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory()); // factory.setDestinationResolver(destinationResolver());factory.setSessionTransacted(true);factory.setConcurrency("3-10");return factory;}上面的修改會發現,實現接口的監聽器使用的是DefaultMessageListenerContainer,而注解的方式使用的是DefaultJmsListenerContainerFactory,所以,這里需要特別注意。
此時,消息監聽器是注解的方式的Java配置類就是下面這樣的。
/*** @ClassName ConsumerConfig* @Description 不用xml的配置類* @Author 歐陽思海* @Date 2019/8/16 17:44* @Version 1.0**/ @ComponentScan(basePackages = {"com.sihai"}) @EnableJms @Configuration public class ConsumerConfig {@Bean//配置ConnectionFactory用于生成connectionpublic ActiveMQConnectionFactory connectionFactory() {ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory("tcp://localhost:61616");return activeMQConnectionFactory;}@Bean//注冊SingleConnectionFactory,這個spring的一個包裝工廠 用于管理真正的ConnectionFactorypublic SingleConnectionFactory singleConnectionFactory(ActiveMQConnectionFactory activeMQconnectionFactory) {SingleConnectionFactory connectionFactory = new SingleConnectionFactory();//設置目標工廠connectionFactory.setTargetConnectionFactory(activeMQconnectionFactory);return connectionFactory;}@Beanpublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory()); // factory.setDestinationResolver(destinationResolver());factory.setSessionTransacted(true);factory.setConcurrency("3-10");return factory;}/*** 1.隊列 點對點 queue* 2.主題 一對多 topic*/@Beanpublic ActiveMQQueue queueDestination() {ActiveMQQueue activeMQQueue = new ActiveMQQueue("queue-anno");return activeMQQueue;}/*@Beanpublic ActiveMQTopic topicDestination() {ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic-anno");return activeMQTopic;}*/ }step5:啟動容器,消費消息
/*** @ClassName SpringSender* @Description* @Author 歐陽思海* @Date 2019/8/13 17:22* @Version 1.0**/ public class SpringReceiver {/** @Author 歐陽思海* @Description xml配置方式獲取消息* @Date 18:09 2019/8/16* @Param []* @return void**/@Testpublic void test_01() throws IOException {ApplicationContext application = new FileSystemXmlApplicationContext("G:\\ideaproject\\activemq\\Consumer\\src\\main\\resources\\service-jms.xml");/*JmsTemplate jmsTemplate = (JmsTemplate) application.getBean("jmsTemplate");String msg = (String) jmsTemplate.receiveAndConvert();System.out.println(msg);*/System.in.read();}/** @Author 歐陽思海* @Description 注解方式獲取消息* @Date 18:10 2019/8/16* @Param []* @return void**/@Testpublic void test_02() throws IOException {AnnotationConfigApplicationContext aContext =new AnnotationConfigApplicationContext(ConsumerConfig.class);/*JmsTemplate jmsTemplate = (JmsTemplate) application.getBean("jmsTemplate");String msg = (String) jmsTemplate.receiveAndConvert();System.out.println(msg);*/System.in.read();} }終于,到這里把ActiveMQ整合Spring的全部內容就講述完結了,這一部分講了三個部分,分別是:
- 不使用 Spring 配置文件方式
- 使用 Spring 配置文件方式
- 注解方式(0配置)
6 ActiveMQ支持的傳輸協議
6.1 默認協議介紹
在ActiveMQ中支持的協議還是挺多的,這也是ActiveMQ的一個特點之一,例如,默認支持AMQP、MQTT、OpenWire、STOMP、WebSocket,這些默認的協議的配置都是在activemq.xml配置文件中的。
<transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/></transportConnectors>注意:上面的每種協議的端口都必須是不一樣的。
6.2 其他協議
除了上面的協議外,還支持這些協議:TCP、UDP 、NIO、SSL、Http(s)、vm
那么如何使用這些協議呢?
只需要在上面的activemq.xml配置文件中的transportConnectors節點添加就可以,例如,添加 nio協議。
<transportConnectors><!-- 新增協議 --><transportConnector name="nio" uri="nio://0.0.0.0:61619"/><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/></transportConnectors>其他協議的添加方法也是相似的!
6.3 簡化配置
在ActiveMQ中還有一種更加簡單的配置方法,在uri中可以使用 auto 來簡化配置,ActiveMQ將監聽器端口的消息自動適配相應的協議。
<transportConnector name="auto" uri="auto://0.0.0.0:61619"/>如果需要更加安全,還可以在此基礎上添加ssl協議。
<transportConnector name="auto+ssl" uri="auto+ssl://0.0.0.0:61619"/>如果還想要提高傳輸的性能,可以配合上面的nio協議,提高網絡性能。
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61619"/>7 ActiveMQ的持久化存儲機制
持久化的作用是什么呢?
作用主要是為避免系統以外宕機而導致消息丟失,在ActiveMQ中支持多種持久化機制,比如,JDBC、AMQ、KahaDB、LevelDB,下面簡單介紹一下這幾種機制。
- JDBC:基于數據庫存儲的方式,可以存儲在Mysql等數據庫中,這種機制的性能瓶頸在Mysql等數據庫,所以其性能是不太好的。
配置方法在activemq.xml配置文件中配置,這里我們使用Mysql進行配置。
step1:修改persistenceAdapter節點
<persistenceAdapter><jdbcPersistenceAdapter dataSource="#mysqlDataSource" createTablesOnStartup="true"/><!--<kahaDB directory="${activemq.data}/kahadb"/>--></persistenceAdapter>其中,dataSource="#mysqlDataSource"是數據源引用。
step2:配置Mysql數據源
<bean id="mysqlDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://localhost:3306/test"/><property name="username" value="root"/><property name="password" value="123456"/></bean>這就是spring的配置方式。
step3:導入數據庫連接池、驅動等Jar包在ActiveMQ的目錄中有一個lib目錄,是存放jar包的目錄。
將下面幾個Jar放入。
step4:啟動ActiveMQ,查看結果啟動之后,打開mysql數據庫,發現生成了三張數據表。這樣就成功了,每次生成消息之后,就會將消息的信息存儲到這三張表中,消費之后,再刪除信息。
- AMQ:基于文件存儲,這種方式會把消息寫入日志文件,并且是順序存儲方式,這種方式比JDBC方式要好,缺點是:會為每個Destination創建索引,占用大量磁盤空間。
配置方法在activemq.xml配置文件中配置,更加詳細參數請參考:https://activemq.apache.org/amq-message-store。
<broker brokerName="broker" persistent="true" useShutdownHook="false"><persistenceAdapter><amqPersistenceAdapter directory="數據存儲目錄" maxFileLength="32mb"/></persistenceAdapter></broker>- KahaDB:這個5.4版本之后出現的默認的持久化方式,與AMQ很相似,不同的是只為Destination創建一個索引。
配置方法在activemq.xml配置文件中配置,更加詳細參數請參考:https://activemq.apache.org/kahadb。
<broker brokerName="broker"><persistenceAdapter><kahaDB directory="數據存儲目錄" journalMaxFileLength="32mb"/></persistenceAdapter></broker>- LevelDB:5.6版本后推出的新的持久化方式。這種比KahaDB更快,跟KahaDB類似,但是不是用自定義B數實現。但是需要注意的是,目前官網已經不推薦使用這種方式,而是推薦使用KahaDB。
配置方法在activemq.xml配置文件中配置,更加詳細的參數請參考:https://activemq.apache.org/leveldb-store。
<broker brokerName="broker" ... >...<persistenceAdapter><levelDB directory="數據存儲目錄"/></persistenceAdapter>...</broker>8 ActiveMQ網絡連接支持
Broker的網絡配置主要有三種配置方法,分別是靜態配置、動態配置和主從配置。
8.1 靜態配置
靜態傳輸提供了一種硬編碼機制,可以使用URI列表發現其他連接。使用此發現機制的連接將嘗試連接到列表中的所有URI,直到成功為止。
在activemq.xml配置文件中配置。
<networkConnectors><networkConnector uri="static:(tcp://localhoat:61616)"/></networkConnectors>配置語法
static:(uri1,uri2,uri3,…)?options舉例
static:(tcp://localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100uri的屬性說明
8.2 動態配置
在activemq.xml配置文件中配置。
<networkConnectors><networkConnector uri="multicast://default"/></networkConnectors>8.3 主從配置
Master-Slave模型是非常常見的,主從模型主要是為了防止一個網絡節點出現問題而提出的,提高了穩定性。
在ActiveMQ中也是可配置的,我們可以在activemq.xml配置文件中進行相關配置。
<networkConnectors><networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> </networkConnectors>注意:Master-Slave方式的第一個url需要是master,其他是slave。
另外,NetworkConnector 節點還有其他屬性可以配置,具體詳情可以查看官網:https://activemq.apache.org/networks-of-brokers。
8.4 容錯的客戶端連接方法
在前面的客戶端連接ActiveMQ的時候只是使用一個簡單的url進行連接。
ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory("tcp://localhost:61616");但是,這種方式會出現一個問題,一旦這臺ActiveMQ宕機了,就連接不上了,所以,有另外一種容錯的方式,當一臺出現宕機,可以連接上其他的機器,這樣就不會出現問題了。
ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://remotehost:61616)");其他屬性參數請參考:https://activemq.apache.org/failover-transport-reference。
文章有不當之處,歡迎指正,如果喜歡微信閱讀,你也可以關注我的微信公眾號:好好學java,獲取優質學習資源。
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的给女朋友讲ActiveMQ是啥?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FileZilla 连接不上虚拟机 ub
- 下一篇: 推荐几个华为、字节跳动、蚂蚁金服的大佬公