javascript
SpringBoot 整合ActiveMQ
?消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息,流量削鋒等問題。實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。是大型分布式系統(tǒng)不可缺少的中間件。消息形式支持點(diǎn)對點(diǎn)和訂閱-發(fā)布。
ActiveMQ是什么
ActiveMQ是消息隊(duì)列技術(shù),為解決高并發(fā)問題而生
ActiveMQ生產(chǎn)者消費(fèi)者模型(生產(chǎn)者和消費(fèi)者可以跨平臺、跨系統(tǒng))
ActiveMQ支持如下兩種消息傳輸方式
點(diǎn)對點(diǎn)模式,生產(chǎn)者生產(chǎn)了一個(gè)消息,只能由一個(gè)消費(fèi)者進(jìn)行消費(fèi)
發(fā)布/訂閱模式,生產(chǎn)者生產(chǎn)了一個(gè)消息,可以由多個(gè)消費(fèi)者進(jìn)行消費(fèi)
SpringBoot整合ActiveMQ
? ? ? 1. ActiveMQ下載啟動
? ? ? ? ? http://activemq.apache.org/download-archives.html ,本文用的是windows版的5.15.3版本,下載下來是壓縮包,自行解壓一個(gè)到目錄下,CMD進(jìn)入到解壓目錄下的bin目錄下,執(zhí)行 activemq.bat start 啟動。 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 如果能成功訪問http://localhost:8161/admin(用戶名和密碼默認(rèn)為admin),則啟動成功。
? ? ? 2. ?創(chuàng)建兩個(gè)springboot項(xiàng)目,分別作為消息提供者(provider)和消費(fèi)者(consumer),添加依賴
<dependency>
? ? <groupId>org.springframework.boot</groupId>
? ? <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--消息隊(duì)列連接池-->
<dependency>
? ? <groupId>org.apache.activemq</groupId>
? ? <artifactId>activemq-pool</artifactId>
? ? <version>5.15.0</version>
</dependency>
? ? ? 3. ?在兩個(gè)項(xiàng)目中的application.properires配置消息隊(duì)列,并在啟動類添加@EnableJms開啟消息隊(duì)列。
?application.properties
# failover:(tcp://localhost:61616,tcp://localhost:61617)
# tcp://localhost:61616
spring.activemq.broker-url=tcp://localhost:61616
#true 表示使用內(nèi)置的MQ,false則連接服務(wù)器
spring.activemq.in-memory=false
#true表示使用連接池;false時(shí),每發(fā)送一條數(shù)據(jù)創(chuàng)建一個(gè)連接
spring.activemq.pool.enabled=true
#連接池最大連接數(shù)
spring.activemq.pool.max-connections=10
#空閑的連接過期時(shí)間,默認(rèn)為30秒
spring.activemq.pool.idle-timeout=30000
#強(qiáng)制的連接過期時(shí)間,與idleTimeout的區(qū)別在于:idleTimeout是在連接空閑一段時(shí)間失效,而expiryTimeout不管當(dāng)前連接的情況,只要達(dá)到指定時(shí)間就失效。默認(rèn)為0,never
spring.activemq.pool.expiry-timeout=0
? 啟動類(provider),consumer同樣
@SpringBootApplication
@EnableJms //啟動消息隊(duì)列
public class ProviderApplication {
?? ?public static void main(String[] args) {
?? ??? ?SpringApplication.run(ProviderApplication.class, args);
?? ?}
}
? ? ? 4. ?prover項(xiàng)目結(jié)構(gòu)圖
?
? ? BeanConfig定義消息隊(duì)列
import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
?
/*
?* @author uv
?* @date 2018/9/15 14:21
?*/
@Configuration
public class BeanConfig {
?
? ? //定義存放消息的隊(duì)列
? ? @Bean
? ? public Queue queue() {
? ? ? ? return new ActiveMQQueue("ActiveMQQueue");
? ? }
}
? ? ?ProviderController
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
?
/*
?* @author uv
?* @date 2018/9/15 14:54
?*
?*/
@RestController
public class ProviderController {
?
? ? //注入存放消息的隊(duì)列,用于下列方法一
? ? @Autowired
? ? private Queue queue;
?
? ? //注入springboot封裝的工具類
? ? @Autowired
? ? private JmsMessagingTemplate jmsMessagingTemplate;
?
? ? @RequestMapping("send")
? ? public void send(String name) {
? ? ? ? //方法一:添加消息到消息隊(duì)列
? ? ? ? jmsMessagingTemplate.convertAndSend(queue, name);
? ? ? ? //方法二:這種方式不需要手動創(chuàng)建queue,系統(tǒng)會自行創(chuàng)建名為test的隊(duì)列
? ? ? ? //jmsMessagingTemplate.convertAndSend("test", name);
? ? }
}
? ? ? 5. ?啟動provider,向消息隊(duì)列添加數(shù)據(jù),本次添加5條數(shù)據(jù)
? ? ? 查看 http://localhost:8161/admin/queues.jsp 如下
Number Of Pending Messages:消息隊(duì)列中待處理的消息
Number Of Consumers:消費(fèi)者的數(shù)量
Messages Enqueued:累計(jì)進(jìn)入過消息隊(duì)列的總量
Messages Dequeued:累計(jì)消費(fèi)過的消息總量
? ? ? 6. consumer項(xiàng)目結(jié)構(gòu)圖
?
application.properties 和 ConsumerApplication 同 provider類似,如下為不同的ConsumerService:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
?
/*
?* @author uv
?* @date 2018/9/15 18:36
?*
?*/
@Component
public class ConsumerService {
?
? ? @Autowired
? ? private JmsMessagingTemplate jmsMessagingTemplate;
?
? ? // 使用JmsListener配置消費(fèi)者監(jiān)聽的隊(duì)列,其中name是接收到的消息
? ? @JmsListener(destination = "ActiveMQQueue")
? ? // SendTo 會將此方法返回的數(shù)據(jù), 寫入到 OutQueue 中去.
? ? @SendTo("SQueue")
? ? public String handleMessage(String name) {
? ? ? ? System.out.println("成功接受Name" + name);
? ? ? ? return "成功接受Name" + name;
? ? }
}
? ? ? 7. 啟動consumer,控制臺輸出如下
前三條重啟不顯示了。
?
? ? ? 消息接收成功,查看 http://localhost:8161/admin/queues.jsp ,如下圖所示,消息隊(duì)列中不再有未處理的消息,由于consumer的啟動,消費(fèi)者的數(shù)量為1,Messages Dequeued(累計(jì)消費(fèi)過的消息總量)的數(shù)值也變成了5;另外消費(fèi)者接收到5條消息處理后,返回到OutQueue 5條消息,下圖可以看出來。
?
? ? ? 8. ActiveMQ的持久化
? ? ?持久化可以參考這篇文章 https://blog.csdn.net/qq_22200097/article/details/82716859
?
總結(jié)
以上是生活随笔為你收集整理的SpringBoot 整合ActiveMQ的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: gin-binding参数效验
- 下一篇: gradle idea java ssm