日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

给女朋友讲ActiveMQ是啥?

發(fā)布時(shí)間:2025/3/20 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 给女朋友讲ActiveMQ是啥? 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1 ActiveMQ是啥

ActiveMQ 就是一個(gè)消息中間件,市面上現(xiàn)在有很多的消息中間件開(kāi)源產(chǎn)品,比如,RocketMQ、RabbitMQ、Kafka等。

拿一個(gè)簡(jiǎn)單的比喻來(lái)說(shuō),消息中間件就是一個(gè)中轉(zhuǎn)站,在程序中加的一個(gè)中轉(zhuǎn)站,有了這樣一個(gè)類似快遞的存儲(chǔ)站點(diǎn),可以大大的減輕物流的壓力,而對(duì)應(yīng)到程序中,也就是減輕了程序的壓力。

另外不得不說(shuō)的是,ActiveMQ是遵從 JMS 規(guī)范的消息中間件,那么什么是 JMS 規(guī)范呢?

JMS 規(guī)范

JMS是java的消息服務(wù),JMS的客戶端之間可以通過(guò)JMS服務(wù)進(jìn)行異步的消息傳輸。

消息模型

  • Point-to-Point(P2P),點(diǎn)對(duì)點(diǎn)
  • P2P模式圖

如上圖,有幾個(gè)需要了解的概念,發(fā)送者、接收者、消息隊(duì)列

在點(diǎn)對(duì)點(diǎn)模型中,一般消息由發(fā)送者將消息發(fā)送到消息隊(duì)列中,然后,接收者從消息隊(duì)列中消費(fèi)消息,消息被消費(fèi)者消費(fèi)之后,消息就不存在了。

  • Publish/Subscribe(Pub/Sub),發(fā)布訂閱模型
  • Pub/Sub模式圖

如上圖,有下面幾個(gè)概念,主題、發(fā)布者、訂閱者

發(fā)布訂閱模型中,發(fā)布者通常將消息發(fā)布到主題(topic)中,然后,訂閱者通過(guò)訂閱主題來(lái)消費(fèi)消息,與 P2P 模型不同的是,發(fā)布訂閱模型的消息是可以被多次消費(fèi)的!

兩種模式的區(qū)別

1、P2P在發(fā)送者和接收者之間沒(méi)有時(shí)間上的依賴性,也就是說(shuō)發(fā)送者發(fā)送了消息之后,不管接收者有沒(méi)有運(yùn)行,不會(huì)影響消息發(fā)送到隊(duì)列,而Pub/Sub模式有時(shí)間上的依賴性,消費(fèi)者必須先訂閱主題,才能夠消費(fèi)消息。2、P2P模式的每個(gè)消息只能有一個(gè)消費(fèi)者,消費(fèi)完了消息就不存在了,Pub/Sub模式可以有多個(gè)消費(fèi)者。

2 為什么需要使用消息中間件

到這里我就不得不講一個(gè)小故事了!

小明、小李和小白都是在一個(gè)項(xiàng)目組的 Java 開(kāi)發(fā)人員,但是呢,他們的團(tuán)隊(duì)比較小,只有幾個(gè)開(kāi)發(fā)人員,而他們正在開(kāi)發(fā)一個(gè)項(xiàng)目,這個(gè)項(xiàng)目比較龐大,所以,項(xiàng)目負(fù)責(zé)人就考慮到項(xiàng)目進(jìn)度,給他們每個(gè)人都分一個(gè)模塊單獨(dú)開(kāi)發(fā),這樣就能夠加快項(xiàng)目的進(jìn)度了。

然而,萬(wàn)萬(wàn)沒(méi)有想到的是,當(dāng)項(xiàng)目開(kāi)發(fā)到一定階段的時(shí)候,小明、小李和小白各自負(fù)責(zé)的模塊都需要項(xiàng)目調(diào)用數(shù)據(jù)了,但是呢,現(xiàn)在問(wèn)題來(lái)了,每次小白小明需要數(shù)據(jù)的時(shí)候,小明總是要改接口來(lái)滿足小白的需求,而且還會(huì)擔(dān)心小明的系統(tǒng)會(huì)不會(huì)出問(wèn)題,如果出了問(wèn)題就調(diào)用不了怎么辦?這樣就總是耽誤項(xiàng)目的進(jìn)度,小李那邊也是出現(xiàn)了這種問(wèn)題!

于是,小明就想了個(gè)辦法,如果在各個(gè)模塊之間再加一個(gè)模塊,用來(lái)處理數(shù)據(jù),比如一個(gè)隊(duì)列來(lái)存數(shù)據(jù),每次就把數(shù)據(jù)丟到那個(gè)模塊中去,這樣就不用擔(dān)心那個(gè)問(wèn)題啦。小明是不是很聰明!

其實(shí),小明沒(méi)有做足夠的調(diào)查,他說(shuō)的這個(gè)模塊,就是 ActiveMQ 的作用所在啦。

也就是降低模塊與模塊之間的耦合度,達(dá)到解耦的目的!

然后,他們又遇到了一個(gè)問(wèn)題,他們?cè)陂_(kāi)發(fā)一個(gè)用戶注冊(cè)模塊的時(shí)候,是先注冊(cè),然后寫(xiě)入數(shù)據(jù)庫(kù),然后再發(fā)送郵件或者短信通知用戶,但是,他們發(fā)現(xiàn)這樣的系統(tǒng)速度很慢!

后來(lái),他們發(fā)現(xiàn)了消息中間件后,改造了一下,變成了下面的模式。

他們也發(fā)現(xiàn)了,這就是消息中間件帶來(lái)的異步執(zhí)行的優(yōu)勢(shì)!系統(tǒng)速度杠杠的!

后來(lái),小明、小李和小白開(kāi)發(fā)的系統(tǒng)呢上線了,但是,公司業(yè)快速發(fā)展,當(dāng)流量大的時(shí)候,系統(tǒng)的數(shù)據(jù)調(diào)用總是負(fù)荷不了,出現(xiàn)宕機(jī)的問(wèn)題,沒(méi)辦法,只能再改代碼了!

他們靈機(jī)一動(dòng),前面都用了消息中間件了,但是沒(méi)有發(fā)現(xiàn)另外一個(gè)功能,我們可以加入消息中間件,控制每次消費(fèi)消息的數(shù)量,保證系統(tǒng)不會(huì)宕機(jī),剩下的消息在系統(tǒng)流量小的時(shí)候再定時(shí)執(zhí)行不就可以了。簡(jiǎn)直不要太好!

小明、小李和小白經(jīng)過(guò)這個(gè)系統(tǒng)的開(kāi)發(fā),終于明白了消息中間件的優(yōu)勢(shì)了!

3 安裝使用

3.1 下載

到下面的官網(wǎng)地址下載,包括linux和Windows的不同版本。

  • https://activemq.apache.org/components/classic/download/

3.2 解壓使用

windows使用方法

首先,解壓到一個(gè)自己的目錄,ActiveMQ目錄如下;

進(jìn)入到對(duì)應(yīng)的 bin 目錄;

里面有一個(gè) activemq 的可執(zhí)行文件,打開(kāi) cmd,執(zhí)行:activemq start

成功啟動(dòng)了!

關(guān)閉;

activemq stop

linux 使用方法

解壓到指定目錄;

sudo tar zxvf activemq-x.x.x-bin.tar.gz

進(jìn)入到 bin 目錄,執(zhí)行下面命令;

./activemq start

關(guān)閉;

./activemq stop

后臺(tái)管理界面

啟動(dòng)成功之后,可以輸出http://localhost:8161/admin/查看 ActiveMQ 的后臺(tái)管理界面,用戶名和密碼都為 admin。

ok,到這里,ActiveMQ的安裝和基本使用應(yīng)該沒(méi)有問(wèn)題了,接下來(lái),我們使用 ActiveMQ 的 Java API 從一個(gè)入門實(shí)例開(kāi)始講起!

4 ActiveMQ入門程序

4.1 前提條件

在開(kāi)始之前,先申明一下需要的 Java 環(huán)境的配置,相關(guān)配置自行解決哦!

  • Java JDK1.7 以上
  • Maven 3.0 以上
  • 開(kāi)發(fā)工具 IDEA

4.2 帶你入門

step1:導(dǎo)入 Maven 相關(guān)依賴;

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><spring.version>4.3.10.RELEASE</spring.version></properties><dependencies><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.0</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.6.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>4.2.5.RELEASE</version></dependency></dependencies>

step2:創(chuàng)建發(fā)送端類;

/*** @ClassName JmsSender* @Description* @Author 歐陽(yáng)思海* @Date 2019/8/13 16:39* @Version 1.0**/ public class JmsSender {public static void main(String[] args) {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = null;try {connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);Destination destination = session.createQueue("queue");MessageProducer producer = session.createProducer(destination);TextMessage textMessage = session.createTextMessage("hello activemq");producer.send(textMessage);//session.commit();session.close();} catch (JMSException e) {e.printStackTrace();} finally {if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}} }

上面的代碼創(chuàng)建了一個(gè)消息發(fā)送者,步驟如下:1、創(chuàng)建ActiveMQ實(shí)現(xiàn)的JMS規(guī)范的實(shí)現(xiàn)類ActiveMQConnectionFactory的對(duì)象connectionFactory ,并且給定參數(shù)ActiveMQ的服務(wù)地址;2、由connectionFactory調(diào)用方法createConnection創(chuàng)建連接connection對(duì)象;3、由connection對(duì)象調(diào)用createSession方法創(chuàng)建session會(huì)話對(duì)象;4、有了session對(duì)象之后,就可以發(fā)送者、隊(duì)列或者主題了,這里創(chuàng)建隊(duì)列,session.createQueue("queue"),并給定了隊(duì)列名稱為queue。5、session對(duì)象通過(guò)方法createProducer創(chuàng)建生產(chǎn)者,并且創(chuàng)建消息session.createTextMessage("hello activemq");6、生產(chǎn)者調(diào)用send的方法發(fā)送消息,producer.send(textMessage);

通過(guò)上面的步驟就可以將消息發(fā)送到隊(duì)列中了,接著只要等待消費(fèi)者消費(fèi)消息即可,消息消費(fèi)后,消息就消失了。

通過(guò)上面的講解,也將JMS的主要的接口都概括了,包括:ConnectionFactory(連接工廠)、Session(會(huì)話)、Connection(連接)

step3:創(chuàng)建消費(fèi)端類;

/*** @ClassName JmsReceiver* @Description* @Author 歐陽(yáng)思海* @Date 2019/8/13 16:47* @Version 1.0**/ public class JmsReceiver {public static void main(String[] args) {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = null;try {//創(chuàng)建連接connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//創(chuàng)建隊(duì)列(如果隊(duì)列已經(jīng)存在則不會(huì)創(chuàng)建,queue是隊(duì)列名稱)//destination表示目的地Destination destination = session.createQueue("queue");//創(chuàng)建消息接收者M(jìn)essageConsumer consumer = session.createConsumer(destination);TextMessage textMessage = (TextMessage) consumer.receive();System.out.println(textMessage.getText());session.commit();session.close();} catch (JMSException e) {e.printStackTrace();} finally {if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}} }

消費(fèi)者和生產(chǎn)者的差別不大,前面的創(chuàng)建工廠、創(chuàng)建連接、創(chuàng)建會(huì)話對(duì)象和生產(chǎn)者一樣,區(qū)別在于,session.createConsumer(destination)通過(guò)session創(chuàng)建消費(fèi)者,然后,調(diào)用receive方法接受消息。

運(yùn)行發(fā)送端,查看后臺(tái)管理界面,點(diǎn)擊 Queues 選項(xiàng),發(fā)現(xiàn)有一個(gè)入隊(duì)的消息,并且沒(méi)有出隊(duì)列

運(yùn)行接收端

再查看后臺(tái)管理界面,消息被消費(fèi)了;

5 ActiveMQ整合Spring

這一部分花了挺多時(shí)間琢磨的,首先是應(yīng)為在實(shí)際的開(kāi)發(fā)中,我們整合Spring來(lái)開(kāi)發(fā)項(xiàng)目是最多的一種方式,這一塊如果可以學(xué)透的話,對(duì)于項(xiàng)目開(kāi)發(fā)是非常有好處的,出于這個(gè)出發(fā)點(diǎn),盡可能的把相關(guān)的知識(shí)講解的全面一些。

首先,這一部分分為以下三個(gè)部分來(lái)講解。

  • 不使用 Spring 配置文件方式
  • 使用 Spring 配置文件方式
  • 注解方式(0配置)

5.1 前提條件

  • JDK 1.7 以上
  • Maven 3.0 以上
  • Spring 4.3.1 ,或者以上版本
  • ActiveMQ 5.15.9 目前最新穩(wěn)定版本
  • 項(xiàng)目結(jié)構(gòu)這次搭建的項(xiàng)目是一個(gè)子模塊聚合的項(xiàng)目,結(jié)構(gòu)如下;

    這個(gè)聚合的項(xiàng)目分為生產(chǎn)者(Producer)消費(fèi)者(Consumer)兩個(gè)子模塊。

    導(dǎo)入 Maven 依賴

    <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><spring.version>4.3.10.RELEASE</spring.version></properties><dependencyManagement><dependencies><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.6.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>4.2.5.RELEASE</version></dependency></dependencies></dependencyManagement>

    溫馨提示
    由于我這里使用的是子模塊聚合的方式,所以,如果你不是這種方式的項(xiàng)目,直接給出各個(gè)依賴的版本在你的項(xiàng)目中即可!

    5.2 不使用 Spring 配置文件方式

    這一節(jié)的講解中,我們將采用不使用 Spring 的配置文件的方式,Maven 的相關(guān)依賴在上面已經(jīng)給出,請(qǐng)參考上一節(jié)的內(nèi)容。

    生產(chǎn)者(Producer)

    首先,我們來(lái)看一下生產(chǎn)者端,生產(chǎn)者端主要負(fù)責(zé)發(fā)送消息到 Broker 中,發(fā)送的目的地(Destination)可以分為隊(duì)列(Queue)和主題(Topic),下面,我們就看看如何不采用 Spring 配置文件的方式發(fā)送消息

    public static void main(String[] args) {ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = null;try {connection = cf.createConnection();connection.start();Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Queue destination = session.createQueue("queue2");JmsQueueSenderWithNotXml jmsQueueSender = new JmsQueueSenderWithNotXml();jmsQueueSender.setConnectionFactory(cf);jmsQueueSender.setQueue(destination);jmsQueueSender.simpleSend();jmsQueueSender.sendWithConversion();} catch (JMSException e) {e.printStackTrace();}}private JmsTemplate jmsTemplate;private Queue queue;public void setConnectionFactory(ConnectionFactory cf) {this.jmsTemplate = new JmsTemplate(cf);}public void setQueue(Queue queue) {this.queue = queue;}/** @Author 歐陽(yáng)思海* @Description 發(fā)送簡(jiǎn)單消息* @Date 15:45 2019/8/16* @Param []* @return void**/public void simpleSend() {this.jmsTemplate.send(this.queue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage("hello queue world");}});System.out.println("發(fā)送成功!");}/** @Author 歐陽(yáng)思海* @Description 發(fā)送map類型的消息* @Date 15:46 2019/8/16* @Param []* @return void**/public void sendWithConversion() {Map map = new HashMap();map.put("Name", "sihai");map.put("Age", new Integer(18));jmsTemplate.convertAndSend("Queue3", map, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws JMSException {message.setIntProperty("num", 189);message.setJMSCorrelationID("00001");return message;}});System.out.println("發(fā)送成功!");}

    step1:上面是生產(chǎn)者端的所有代碼示例,在這個(gè)示例中,我們首先通過(guò)下面的代碼設(shè)置好ConnectionFactory 和Queue,并且調(diào)用JmsTemplateSpring提供的工具類提供兩個(gè)發(fā)送消息的方法 。

    private JmsTemplate jmsTemplate;private Queue queue;public void setConnectionFactory(ConnectionFactory cf) {this.jmsTemplate = new JmsTemplate(cf);}public void setQueue(Queue queue) {this.queue = queue;}/** @Author 歐陽(yáng)思海* @Description 發(fā)送簡(jiǎn)單消息* @Date 15:45 2019/8/16* @Param []* @return void**/public void simpleSend() {this.jmsTemplate.send(this.queue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage("hello queue world");}});System.out.println("發(fā)送成功!");}/** @Author 歐陽(yáng)思海* @Description 發(fā)送map類型的消息* @Date 15:46 2019/8/16* @Param []* @return void**/public void sendWithConversion() {Map map = new HashMap();map.put("Name", "sihai");map.put("Age", new Integer(18));jmsTemplate.convertAndSend("Queue3", map, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws JMSException {message.setIntProperty("num", 189);message.setJMSCorrelationID("00001");return message;}});System.out.println("發(fā)送成功!");}

    step2:使用Main方法,設(shè)置ConnectionFactory和Queue對(duì)象,接著,調(diào)用發(fā)送方法發(fā)送消息。

    public static void main(String[] args) {ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = null;try {connection = cf.createConnection();connection.start();Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Queue destination = session.createQueue("queue2");JmsQueueSenderWithNotXml jmsQueueSender = new JmsQueueSenderWithNotXml();jmsQueueSender.setConnectionFactory(cf);jmsQueueSender.setQueue(destination);jmsQueueSender.simpleSend();jmsQueueSender.sendWithConversion();} catch (JMSException e) {e.printStackTrace();}}

    step2:接著,我們運(yùn)行上面的代碼,輸出下面結(jié)果,再看一下ActiveMQ的控制臺(tái),看看有沒(méi)有消息發(fā)送成功。

    發(fā)現(xiàn)有一條掛起的消息和入隊(duì)列的消息,說(shuō)明發(fā)送成功!

    消費(fèi)者(Consumer)

    對(duì)于消費(fèi)者,在這一節(jié)先不展開(kāi)講解,可以先參考上面的入門程序的消費(fèi)端的代碼消費(fèi)消息,接下來(lái)的方式再講解消費(fèi)端的消費(fèi)消息。

    5.3 使用 Spring 配置文件方式

    上面一節(jié)中,講解了不使用 Spring 配置的方式如何發(fā)送消息,主要是想讓大家了解一下其中的原理,這一節(jié)中,將使用 Spring 配置的方式講解,這種方式在實(shí)際的開(kāi)發(fā)中還是用的比較多的。

    生產(chǎn)者(Producer)

    既然是配置文件的方式,那么,首先,不得不講如何進(jìn)行xml配置了。

    step1:xml配置文件

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:jms="http://www.springframework.org/schema/jms"xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/jms https://www.springframework.org/schema/jms/spring-jms.xsd"><bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"><property name="connectionFactory"><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL"><value>tcp://localhost:61616</value></property></bean></property><property name="maxConnections" value="50"/></bean><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg index="0" value="spring-queue"/></bean><!--<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg index="0" value="spring-topic"/></bean>--><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory"/><property name="defaultDestination" ref="destination"/><property name="messageConverter"><bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/></property></bean></beans>

    在上面的配置中,首先,需要配置connectionFactory(對(duì)應(yīng)不使用配置的connectionFactory對(duì)象),然后,需要配置destination(對(duì)應(yīng)不使用配置的destination),在這里使用的是向隊(duì)列發(fā)送消息,也可以使用主題(Topic),最后,配置 Spring 提供的jmsTemplate模板類。

    step2:使用Main方法運(yùn)行

    public static void main(String[] args) {ApplicationContext application = new FileSystemXmlApplicationContext("G:\\ideaproject\\activemq\\Producer\\src\\main\\resources\\service-jms.xml");JmsTemplate jmsTemplate = (JmsTemplate) application.getBean("jmsTemplate");for (int i = 0; i < 10; i++) {int finalI = i;jmsTemplate.send((session) -> {TextMessage textMessage = session.createTextMessage();textMessage.setText("first message" + finalI);return textMessage;});}}

    在上面的代碼中,調(diào)用了JmsTemplate的send方法發(fā)送消息。運(yùn)行之后,就成功發(fā)送消息了,這種方式還是簡(jiǎn)潔不少的。

    溫馨提示
    上面我使用的是FileSystemXmlApplicationContext獲取xml配置文件,除此之外,你也可以使用ClassPathXmlApplicationContext來(lái)獲取。

    消費(fèi)者(Consumer)

    在上一節(jié)中,沒(méi)有講解消費(fèi)者,在這一節(jié)中,將重點(diǎn)講解。

    step1:首先,我們還是需要配置xml文件

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:amq="http://activemq.apache.org/schema/core"xmlns:jms="http://www.springframework.org/schema/jms"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.1.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-4.1.xsdhttp://www.springframework.org/schema/jmshttp://www.springframework.org/schema/jms/spring-jms-4.1.xsdhttp://activemq.apache.org/schema/corehttp://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"><!--連接工廠--><bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"><property name="connectionFactory"><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL"><value>tcp://localhost:61616</value></property></bean></property><property name="maxConnections" value="50"/></bean><!--配置隊(duì)列--><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg index="0" value="queue2"/></bean><!-- 配置主題(topic)--><!-- <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg index="0" value="spring-topic"/></bean>--><!--配置spring的jms模板--><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory"/><property name="defaultDestination" ref="destination"/><property name="messageConverter"><bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/></property></bean><!-- 消息監(jiān)聽(tīng)器 --><!--<bean id="messageListener" class="com.sihai.activemq.listener.MyMessageListener"/>--><bean id="messageListener" class="com.sihai.activemq.listener.MySessionAwareMessageListener"></bean><!--jta事務(wù)--><!--<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>--><!-- 消息監(jiān)聽(tīng)器容器 --><bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="destination"/><property name="messageListener" ref="messageListener"/><!--配置本地資源事務(wù)--><!--<property name="sessionTransacted" value="true"/>--><!--配置jta事務(wù)--><!--<property name="transactionManager" ref="transactionManager"/>--></bean><!--<!– 監(jiān)聽(tīng)注解支持 –><jms:annotation-driven />--></beans>

    最前面的配置和生產(chǎn)者是一樣的,需要配置connectionFactory(對(duì)應(yīng)不使用配置的connectionFactory對(duì)象),然后,需要配置destination(對(duì)應(yīng)不使用配置的destination)。

    區(qū)別在于,消費(fèi)者端需要配置一個(gè)消息監(jiān)聽(tīng)器容器,如下。

    <!-- 消息監(jiān)聽(tīng)器 --><!--<bean id="messageListener" class="com.sihai.activemq.listener.MyMessageListener"/>--><bean id="messageListener" class="com.sihai.activemq.listener.MySessionAwareMessageListener"></bean><!--jta事務(wù)--><!--<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>--><!-- 消息監(jiān)聽(tīng)器容器 --><bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="destination"/><property name="messageListener" ref="messageListener"/><!--配置本地資源事務(wù)--><!--<property name="sessionTransacted" value="true"/>--><!--配置jta事務(wù)--><!--<property name="transactionManager" ref="transactionManager"/>--></bean>

    那么這個(gè)怎么配置呢?請(qǐng)接著看。

    step2:消息監(jiān)聽(tīng)器容器配置首先,我們需要寫(xiě)一個(gè)類,實(shí)現(xiàn)MessageListener接口,然后實(shí)現(xiàn)一個(gè)名為onMessage的方法,通過(guò)這個(gè)方法就可以監(jiān)聽(tīng)是否有消息,有消息就消費(fèi)

    /*** @ClassName MyMessageListener* @Description 消息消費(fèi)監(jiān)聽(tīng)器實(shí)現(xiàn)* @Author 歐陽(yáng)思海* @Date 2019/8/13 20:39* @Version 1.0**/ @Component public class MyMessageListener implements MessageListener {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {System.out.println(((TextMessage) message).getText());}catch (JMSException ex) {throw new RuntimeException(ex);}}else {throw new IllegalArgumentException("Message must be of type TextMessage");}} }

    如此,配置就完成了。

    step3:啟動(dòng)spring容器,運(yùn)行。

    /** @Author 歐陽(yáng)思海* @Description xml配置方式獲取消息* @Date 18:09 2019/8/16* @Param []* @return void**/@Testpublic void test_01() throws IOException {ClassPathXmlApplicationContext application = new ClassPathXmlApplicationContext("G:\\ideaproject\\activemq\\Consumer\\src\\main\\resources\\service-jms.xml");/*JmsTemplate jmsTemplate = (JmsTemplate) application.getBean("jmsTemplate");String msg = (String) jmsTemplate.receiveAndConvert();System.out.println(msg);*/System.in.read();}

    在上面的代碼中,System.in.read(),這個(gè)作用就是一直等待,有消息就消費(fèi)。

    step4:開(kāi)啟消息監(jiān)聽(tīng)器事務(wù)在消息處理的過(guò)程中是可以開(kāi)啟事務(wù)的,如果出現(xiàn)處理失敗的情況,就會(huì)回滾。在消息監(jiān)聽(tīng)容器當(dāng)中可以配置一個(gè)屬性是sessionTransacted的本地事務(wù),如果value為true,就代表開(kāi)啟本地事務(wù)。具體配置如下:

    <!-- 消息監(jiān)聽(tīng)器容器 --><bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="destination"/><property name="messageListener" ref="messageListener"/><!--配置本地資源事務(wù)--><property name="sessionTransacted" value="true"/> </bean>

    消息監(jiān)聽(tīng)器容器

    上面的消費(fèi)者的講解中,其實(shí),最重要的就是消息監(jiān)聽(tīng)器容器配置了,這一部分,我們就詳細(xì)的講解一下消息監(jiān)聽(tīng)器容器的配置方法。

    1 實(shí)現(xiàn)MessageListener接口這種方式就是上面的實(shí)例使用的方式,先看看這個(gè)接口。

    public interface MessageListener {void onMessage(Message var1); }

    這個(gè)接口很簡(jiǎn)單,只有一個(gè)方法onMessage,通過(guò)拿到Message參數(shù)讀取消息,這里就不再多說(shuō)了。

    2 實(shí)現(xiàn)SessionAwareMessageListener接口這個(gè)接口平時(shí)很少用到,但是,其實(shí)是有這個(gè)接口可以實(shí)現(xiàn)的,這個(gè)接口和上面的MessageListener接口有點(diǎn)不一樣,這個(gè)接口是Spring提供的。

    public interface SessionAwareMessageListener<M extends Message> {void onMessage(M var1, Session var2) throws JMSException; }

    另外,你可以看到,這個(gè)接口提供的是一個(gè)泛型接口,可以是M extends Message這個(gè)類型,同時(shí),實(shí)現(xiàn)的方式onMessage,還多了一個(gè)Session參數(shù),可以在獲取消息的同時(shí)處理Session。

    使用實(shí)例

    /*** @ClassName MySessionAwareMessageListener* @Description 實(shí)現(xiàn)SessionAwareMessageListener的消息監(jiān)聽(tīng)器* @Author 歐陽(yáng)思海* @Date 2019/8/16 16:02* @Version 1.0**/ public class MySessionAwareMessageListener implements SessionAwareMessageListener {@Overridepublic void onMessage(Message message, Session session) throws JMSException {if (message instanceof TextMessage) {try {System.out.println(((TextMessage) message).getText());}catch (JMSException ex) {throw new RuntimeException(ex);}}else {throw new IllegalArgumentException("Message must be of type TextMessage");}} }

    5.4 注解方式(0配置)

    前面已經(jīng)介紹了兩種方式,分別是不使用xml配置方式使用xml配置的方式,但是,由于現(xiàn)在微服務(wù)的興起,約定優(yōu)于配置是現(xiàn)在的一種趨勢(shì),所以,在這一節(jié)中,我們使用注解的方式來(lái)處理。

    生產(chǎn)者(Producer)

    由于使用注解的方式,所以,我們不再需要xml配置文件了,但是,我們可以參照上面的xml的配置方式來(lái)配置注解的方式。

    step1:首先,我們需要一個(gè) Java 配置類,如下;

    /*** @ClassName ProducerConfig* @Description 不用xml的配置類* @Author 歐陽(yáng)思海* @Date 2019/8/16 17:41* @Version 1.0**/ @Configuration public class ProducerConfig {@Bean//配置ConnectionFactory用于生成connectionpublic ActiveMQConnectionFactory connectionFactory() {ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory("tcp://localhost:61616");return activeMQConnectionFactory;}@Bean//注冊(cè)SingleConnectionFactory,這個(gè)spring的一個(gè)包裝工廠 用于管理真正的ConnectionFactorypublic SingleConnectionFactory singleConnectionFactory(ActiveMQConnectionFactory activeMQconnectionFactory) {SingleConnectionFactory connectionFactory = new SingleConnectionFactory();//設(shè)置目標(biāo)工廠connectionFactory.setTargetConnectionFactory(activeMQconnectionFactory);return connectionFactory;}@Bean//配置生產(chǎn)者,jmsTemplatepublic JmsTemplate jmsTemplate(SingleConnectionFactory connectionFactory) {JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setConnectionFactory(connectionFactory);return jmsTemplate;}/*** 配置隊(duì)列目的的: 根據(jù)測(cè)試需要配置其中一個(gè)* 1.隊(duì)列 點(diǎn)對(duì)點(diǎn) queue* 2.主題 一對(duì)多 topic*/@Bean //public ActiveMQQueue queueDestination() {ActiveMQQueue activeMQQueue = new ActiveMQQueue("queue-anno");return activeMQQueue;}@Beanpublic ActiveMQTopic topicDestination() {ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic-anno");return activeMQTopic;} }

    上面的配置的每一個(gè)方法就對(duì)應(yīng)xml配置的每一個(gè)節(jié)點(diǎn),對(duì)應(yīng)起來(lái)配置會(huì)比較簡(jiǎn)單,每一個(gè)方法都使用了@Bean這個(gè)注解,類上使用Configuration,將這些配置加入到 spring 容器中。

    step2:啟動(dòng) spring 容器,發(fā)送消息;

    /*** @ClassName JmsSenderWithAnnotation* @Description 注解發(fā)送方式* @Author 歐陽(yáng)思海* @Date 2019/8/16 18:04* @Version 1.0**/ public class JmsSenderWithAnnotation {/** @Author 歐陽(yáng)思海* @Description 測(cè)試點(diǎn)對(duì)點(diǎn)* @Date 18:05 2019/8/16* @Param []* @return void**/@Testpublic void testActiveMqAnnotation() {AnnotationConfigApplicationContext aContext =new AnnotationConfigApplicationContext(ProducerConfig.class);//獲得發(fā)送者的模板對(duì)象JmsTemplate jmsTemplate = aContext.getBean(JmsTemplate.class);Destination bean = (Destination) aContext.getBean("queueDestination");//發(fā)送消息jmsTemplate.send(bean, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {TextMessage message = session.createTextMessage();message.setText("activemq message for queue");return message;}});}/** @Author 歐陽(yáng)思海* @Description 測(cè)試topic發(fā)送* @Date 18:06 2019/8/16* @Param []* @return void**/@Testpublic void testActiveMqAnnotation2() {AnnotationConfigApplicationContext aContext =new AnnotationConfigApplicationContext(ProducerConfig.class);//獲得發(fā)送者的模板對(duì)象JmsTemplate jmsTemplate = aContext.getBean(JmsTemplate.class);Destination bean = (Destination) aContext.getBean("topicDestination");//發(fā)送消息jmsTemplate.send(bean, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {TextMessage message = session.createTextMessage();message.setText("activemq message for topic");return message;}});} }

    分別運(yùn)行這兩個(gè)測(cè)試,查看ActiveMQ控制臺(tái),發(fā)現(xiàn)Queue和Topic都有一條消息發(fā)送成功;

    消費(fèi)者(Consumer)

    消費(fèi)者的大概也差不多,跟xml的配置一樣,多的也是消息監(jiān)聽(tīng)容器的配置,來(lái)看看;

    step1:首先,Java 配置類

    *** @ClassName ConsumerConfig* @Description 不用xml的配置類* @Author 歐陽(yáng)思海* @Date 2019/8/16 17:44* @Version 1.0**/ @ComponentScan(basePackages = {"com.sihai"}) @EnableJms @Configuration public class ConsumerConfig {@Bean//配置ConnectionFactory用于生成connectionpublic ActiveMQConnectionFactory connectionFactory() {ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory("tcp://localhost:61616");return activeMQConnectionFactory;}@Bean//注冊(cè)SingleConnectionFactory,這個(gè)spring的一個(gè)包裝工廠 用于管理真正的ConnectionFactorypublic SingleConnectionFactory singleConnectionFactory(ActiveMQConnectionFactory activeMQconnectionFactory) {SingleConnectionFactory connectionFactory = new SingleConnectionFactory();//設(shè)置目標(biāo)工廠connectionFactory.setTargetConnectionFactory(activeMQconnectionFactory);return connectionFactory;}/*在xml當(dāng)中的如下配置 效果相同* <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">* <property name="connectionFactory" ref="connectionFactory" />* <property name="destination" ref="topicDestination" />* <property name="messageListener" ref="itemListenerMessage" />* </bean>**/@Beanpublic DefaultMessageListenerContainer jmsListenerContainerFactory(SingleConnectionFactory singleConnectionFactory, MyMessageListener myMessageListener, Destination destination) {//創(chuàng)建容器DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();//設(shè)置監(jiān)聽(tīng)器jmsContainer.setMessageListener(myMessageListener);//設(shè)置連接工廠jmsContainer.setConnectionFactory(singleConnectionFactory);//設(shè)置監(jiān)聽(tīng)目的地的名字/也可以直接設(shè)置對(duì)象目的地jmsContainer.setDestination(destination);return jmsContainer;}/*** 1.隊(duì)列 點(diǎn)對(duì)點(diǎn) queue* 2.主題 一對(duì)多 topic*/@Beanpublic ActiveMQQueue queueDestination() {ActiveMQQueue activeMQQueue = new ActiveMQQueue("queue-anno");return activeMQQueue;}/*@Beanpublic ActiveMQTopic topicDestination() {ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic-anno");return activeMQTopic;}*/ }

    其中只有一個(gè)消息監(jiān)聽(tīng)容器的配置是和生產(chǎn)者的配置不同的消息監(jiān)聽(tīng)容器的配置需要配置消息監(jiān)聽(tīng)器、連接工廠和目的地(Destination)

    /*在xml當(dāng)中的如下配置 效果相同* <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">* <property name="connectionFactory" ref="connectionFactory" />* <property name="destination" ref="topicDestination" />* <property name="messageListener" ref="itemListenerMessage" />* </bean>**/@Beanpublic DefaultMessageListenerContainer jmsListenerContainerFactory(SingleConnectionFactory singleConnectionFactory, MyMessageListener myMessageListener, Destination destination) {//創(chuàng)建容器DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();//設(shè)置監(jiān)聽(tīng)器jmsContainer.setMessageListener(myMessageListener);//設(shè)置連接工廠jmsContainer.setConnectionFactory(singleConnectionFactory);//設(shè)置監(jiān)聽(tīng)目的地的名字/也可以直接設(shè)置對(duì)象目的地jmsContainer.setDestination(destination);return jmsContainer;}

    step2:消息監(jiān)聽(tīng)器

    /*** @ClassName MyMessageListener* @Description 消息消費(fèi)監(jiān)聽(tīng)器實(shí)現(xiàn)* @Author 歐陽(yáng)思海* @Date 2019/8/13 20:39* @Version 1.0**/ @Component public class MyMessageListener implements MessageListener {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {System.out.println(((TextMessage) message).getText());}catch (JMSException ex) {throw new RuntimeException(ex);}}else {throw new IllegalArgumentException("Message must be of type TextMessage");}} }

    這個(gè)前面已經(jīng)講過(guò)了,這里就不再累贅了,但是,這里我需要講的是消息監(jiān)聽(tīng)器注解方式的配置,如下。

    step3:消息監(jiān)聽(tīng)器注解方式的配置方法

    /*** @ClassName JmsAnnotation* @Description 注解方式監(jiān)聽(tīng)* @Author 歐陽(yáng)思海* @Date 2019/8/16 17:01* @Version 1.0**/ @Component @EnableJms public class JmsAnnotation {@JmsListener(destination = "queue-anno")public void onMessage(Message message) {if (message instanceof TextMessage) {try {System.out.println(((TextMessage) message).getText());}catch (JMSException ex) {throw new RuntimeException(ex);}}else {throw new IllegalArgumentException("Message must be of type TextMessage");}} }

    你會(huì)發(fā)現(xiàn),在消息監(jiān)聽(tīng)器的類上面需要兩個(gè)配置@Component和@EnableJms,用于標(biāo)記這是一個(gè)消息監(jiān)聽(tīng)器,另外,在onMessage方法上,需要一個(gè)@JmsListener(destination = "queue-anno")注解,可以標(biāo)記需要哪個(gè)destination 。

    注意:如果采用注解的消息監(jiān)聽(tīng),那么需要修改Java類的消息監(jiān)聽(tīng)的容器的配置,否則會(huì)出現(xiàn)問(wèn)題

    step4:消息監(jiān)聽(tīng)容器配置更改

    /*在xml當(dāng)中的如下配置 效果相同* <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">* <property name="connectionFactory" ref="connectionFactory" />* <property name="destination" ref="topicDestination" />* <property name="messageListener" ref="itemListenerMessage" />* </bean>**/@Beanpublic DefaultMessageListenerContainer jmsListenerContainerFactory(SingleConnectionFactory singleConnectionFactory, MyMessageListener myMessageListener, Destination destination) {//創(chuàng)建容器DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();//設(shè)置監(jiān)聽(tīng)器jmsContainer.setMessageListener(myMessageListener);//設(shè)置連接工廠jmsContainer.setConnectionFactory(singleConnectionFactory);//設(shè)置監(jiān)聽(tīng)目的地的名字/也可以直接設(shè)置對(duì)象目的地jmsContainer.setDestination(destination);return jmsContainer;}

    改為

    @Beanpublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory()); // factory.setDestinationResolver(destinationResolver());factory.setSessionTransacted(true);factory.setConcurrency("3-10");return factory;}

    上面的修改會(huì)發(fā)現(xiàn),實(shí)現(xiàn)接口的監(jiān)聽(tīng)器使用的是DefaultMessageListenerContainer,而注解的方式使用的是DefaultJmsListenerContainerFactory,所以,這里需要特別注意。

    此時(shí),消息監(jiān)聽(tīng)器是注解的方式的Java配置類就是下面這樣的。

    /*** @ClassName ConsumerConfig* @Description 不用xml的配置類* @Author 歐陽(yáng)思海* @Date 2019/8/16 17:44* @Version 1.0**/ @ComponentScan(basePackages = {"com.sihai"}) @EnableJms @Configuration public class ConsumerConfig {@Bean//配置ConnectionFactory用于生成connectionpublic ActiveMQConnectionFactory connectionFactory() {ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory("tcp://localhost:61616");return activeMQConnectionFactory;}@Bean//注冊(cè)SingleConnectionFactory,這個(gè)spring的一個(gè)包裝工廠 用于管理真正的ConnectionFactorypublic SingleConnectionFactory singleConnectionFactory(ActiveMQConnectionFactory activeMQconnectionFactory) {SingleConnectionFactory connectionFactory = new SingleConnectionFactory();//設(shè)置目標(biāo)工廠connectionFactory.setTargetConnectionFactory(activeMQconnectionFactory);return connectionFactory;}@Beanpublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory()); // factory.setDestinationResolver(destinationResolver());factory.setSessionTransacted(true);factory.setConcurrency("3-10");return factory;}/*** 1.隊(duì)列 點(diǎn)對(duì)點(diǎn) queue* 2.主題 一對(duì)多 topic*/@Beanpublic ActiveMQQueue queueDestination() {ActiveMQQueue activeMQQueue = new ActiveMQQueue("queue-anno");return activeMQQueue;}/*@Beanpublic ActiveMQTopic topicDestination() {ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic-anno");return activeMQTopic;}*/ }

    step5:啟動(dòng)容器,消費(fèi)消息

    /*** @ClassName SpringSender* @Description* @Author 歐陽(yáng)思海* @Date 2019/8/13 17:22* @Version 1.0**/ public class SpringReceiver {/** @Author 歐陽(yáng)思海* @Description xml配置方式獲取消息* @Date 18:09 2019/8/16* @Param []* @return void**/@Testpublic void test_01() throws IOException {ApplicationContext application = new FileSystemXmlApplicationContext("G:\\ideaproject\\activemq\\Consumer\\src\\main\\resources\\service-jms.xml");/*JmsTemplate jmsTemplate = (JmsTemplate) application.getBean("jmsTemplate");String msg = (String) jmsTemplate.receiveAndConvert();System.out.println(msg);*/System.in.read();}/** @Author 歐陽(yáng)思海* @Description 注解方式獲取消息* @Date 18:10 2019/8/16* @Param []* @return void**/@Testpublic void test_02() throws IOException {AnnotationConfigApplicationContext aContext =new AnnotationConfigApplicationContext(ConsumerConfig.class);/*JmsTemplate jmsTemplate = (JmsTemplate) application.getBean("jmsTemplate");String msg = (String) jmsTemplate.receiveAndConvert();System.out.println(msg);*/System.in.read();} }

    終于,到這里把ActiveMQ整合Spring的全部?jī)?nèi)容就講述完結(jié)了,這一部分講了三個(gè)部分,分別是:

    • 不使用 Spring 配置文件方式
    • 使用 Spring 配置文件方式
    • 注解方式(0配置)

    6 ActiveMQ支持的傳輸協(xié)議

    6.1 默認(rèn)協(xié)議介紹

    在ActiveMQ中支持的協(xié)議還是挺多的,這也是ActiveMQ的一個(gè)特點(diǎn)之一,例如,默認(rèn)支持AMQP、MQTT、OpenWire、STOMP、WebSocket,這些默認(rèn)的協(xié)議的配置都是在activemq.xml配置文件中的。

    <transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/></transportConnectors>

    注意:上面的每種協(xié)議的端口都必須是不一樣的。

    6.2 其他協(xié)議

    除了上面的協(xié)議外,還支持這些協(xié)議:TCP、UDP 、NIO、SSL、Http(s)、vm

    那么如何使用這些協(xié)議呢?

    只需要在上面的activemq.xml配置文件中的transportConnectors節(jié)點(diǎn)添加就可以,例如,添加 nio協(xié)議

    <transportConnectors><!-- 新增協(xié)議 --><transportConnector name="nio" uri="nio://0.0.0.0:61619"/><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/><transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/></transportConnectors>

    其他協(xié)議的添加方法也是相似的!

    6.3 簡(jiǎn)化配置

    在ActiveMQ中還有一種更加簡(jiǎn)單的配置方法,在uri中可以使用 auto 來(lái)簡(jiǎn)化配置,ActiveMQ將監(jiān)聽(tīng)器端口的消息自動(dòng)適配相應(yīng)的協(xié)議。

    <transportConnector name="auto" uri="auto://0.0.0.0:61619"/>

    如果需要更加安全,還可以在此基礎(chǔ)上添加ssl協(xié)議。

    <transportConnector name="auto+ssl" uri="auto+ssl://0.0.0.0:61619"/>

    如果還想要提高傳輸?shù)男阅?#xff0c;可以配合上面的nio協(xié)議,提高網(wǎng)絡(luò)性能。

    <transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61619"/>

    7 ActiveMQ的持久化存儲(chǔ)機(jī)制

    持久化的作用是什么呢?

    作用主要是為避免系統(tǒng)以外宕機(jī)而導(dǎo)致消息丟失,在ActiveMQ中支持多種持久化機(jī)制,比如,JDBC、AMQ、KahaDB、LevelDB,下面簡(jiǎn)單介紹一下這幾種機(jī)制。

    • JDBC:基于數(shù)據(jù)庫(kù)存儲(chǔ)的方式,可以存儲(chǔ)在Mysql等數(shù)據(jù)庫(kù)中,這種機(jī)制的性能瓶頸在Mysql等數(shù)據(jù)庫(kù),所以其性能是不太好的。

    配置方法在activemq.xml配置文件中配置,這里我們使用Mysql進(jìn)行配置。

    step1:修改persistenceAdapter節(jié)點(diǎn)

    <persistenceAdapter><jdbcPersistenceAdapter dataSource="#mysqlDataSource" createTablesOnStartup="true"/><!--<kahaDB directory="${activemq.data}/kahadb"/>--></persistenceAdapter>

    其中,dataSource="#mysqlDataSource"是數(shù)據(jù)源引用。

    step2:配置Mysql數(shù)據(jù)源

    <bean id="mysqlDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://localhost:3306/test"/><property name="username" value="root"/><property name="password" value="123456"/></bean>

    這就是spring的配置方式。

    step3:導(dǎo)入數(shù)據(jù)庫(kù)連接池、驅(qū)動(dòng)等Jar包在ActiveMQ的目錄中有一個(gè)lib目錄,是存放jar包的目錄。

    將下面幾個(gè)Jar放入。

    step4:啟動(dòng)ActiveMQ,查看結(jié)果啟動(dòng)之后,打開(kāi)mysql數(shù)據(jù)庫(kù),發(fā)現(xiàn)生成了三張數(shù)據(jù)表。這樣就成功了,每次生成消息之后,就會(huì)將消息的信息存儲(chǔ)到這三張表中,消費(fèi)之后,再刪除信息。

    • AMQ:基于文件存儲(chǔ),這種方式會(huì)把消息寫(xiě)入日志文件,并且是順序存儲(chǔ)方式,這種方式比JDBC方式要好,缺點(diǎn)是:會(huì)為每個(gè)Destination創(chuàng)建索引,占用大量磁盤(pán)空間。

    配置方法在activemq.xml配置文件中配置,更加詳細(xì)參數(shù)請(qǐng)參考:https://activemq.apache.org/amq-message-store。

    <broker brokerName="broker" persistent="true" useShutdownHook="false"><persistenceAdapter><amqPersistenceAdapter directory="數(shù)據(jù)存儲(chǔ)目錄" maxFileLength="32mb"/></persistenceAdapter></broker>

    • KahaDB:這個(gè)5.4版本之后出現(xiàn)的默認(rèn)的持久化方式,與AMQ很相似,不同的是只為Destination創(chuàng)建一個(gè)索引。

    配置方法在activemq.xml配置文件中配置,更加詳細(xì)參數(shù)請(qǐng)參考:https://activemq.apache.org/kahadb。

    <broker brokerName="broker"><persistenceAdapter><kahaDB directory="數(shù)據(jù)存儲(chǔ)目錄" journalMaxFileLength="32mb"/></persistenceAdapter></broker>

    • LevelDB:5.6版本后推出的新的持久化方式。這種比KahaDB更快,跟KahaDB類似,但是不是用自定義B數(shù)實(shí)現(xiàn)。但是需要注意的是,目前官網(wǎng)已經(jīng)不推薦使用這種方式,而是推薦使用KahaDB。

    配置方法在activemq.xml配置文件中配置,更加詳細(xì)的參數(shù)請(qǐng)參考:https://activemq.apache.org/leveldb-store。

    <broker brokerName="broker" ... >...<persistenceAdapter><levelDB directory="數(shù)據(jù)存儲(chǔ)目錄"/></persistenceAdapter>...</broker>

    8 ActiveMQ網(wǎng)絡(luò)連接支持

    Broker的網(wǎng)絡(luò)配置主要有三種配置方法,分別是靜態(tài)配置、動(dòng)態(tài)配置和主從配置。

    8.1 靜態(tài)配置

    靜態(tài)傳輸提供了一種硬編碼機(jī)制,可以使用URI列表發(fā)現(xiàn)其他連接。使用此發(fā)現(xiàn)機(jī)制的連接將嘗試連接到列表中的所有URI,直到成功為止。

    在activemq.xml配置文件中配置。

    <networkConnectors><networkConnector uri="static:(tcp://localhoat:61616)"/></networkConnectors>

    配置語(yǔ)法

    static:(uri1,uri2,uri3,…)?options

    舉例

    static:(tcp://localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100

    uri的屬性說(shuō)明

    8.2 動(dòng)態(tài)配置

    在activemq.xml配置文件中配置。

    <networkConnectors><networkConnector uri="multicast://default"/></networkConnectors>

    8.3 主從配置

    Master-Slave模型是非常常見(jiàn)的,主從模型主要是為了防止一個(gè)網(wǎng)絡(luò)節(jié)點(diǎn)出現(xiàn)問(wèn)題而提出的,提高了穩(wěn)定性。

    在ActiveMQ中也是可配置的,我們可以在activemq.xml配置文件中進(jìn)行相關(guān)配置。

    <networkConnectors><networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> </networkConnectors>

    注意:Master-Slave方式的第一個(gè)url需要是master,其他是slave。

    另外,NetworkConnector 節(jié)點(diǎn)還有其他屬性可以配置,具體詳情可以查看官網(wǎng):https://activemq.apache.org/networks-of-brokers。

    8.4 容錯(cuò)的客戶端連接方法

    在前面的客戶端連接ActiveMQ的時(shí)候只是使用一個(gè)簡(jiǎn)單的url進(jìn)行連接。

    ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory("tcp://localhost:61616");

    但是,這種方式會(huì)出現(xiàn)一個(gè)問(wèn)題,一旦這臺(tái)ActiveMQ宕機(jī)了,就連接不上了,所以,有另外一種容錯(cuò)的方式,當(dāng)一臺(tái)出現(xiàn)宕機(jī),可以連接上其他的機(jī)器,這樣就不會(huì)出現(xiàn)問(wèn)題了。

    ActiveMQConnectionFactory activeMQConnectionFactory= new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://remotehost:61616)");

    其他屬性參數(shù)請(qǐng)參考:https://activemq.apache.org/failover-transport-reference。

    文章有不當(dāng)之處,歡迎指正,如果喜歡微信閱讀,你也可以關(guān)注我的微信公眾號(hào):好好學(xué)java,獲取優(yōu)質(zhì)學(xué)習(xí)資源。

    《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

    總結(jié)

    以上是生活随笔為你收集整理的给女朋友讲ActiveMQ是啥?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。