Java消息中间件(activeMQ)
文章目錄
- **第一章 消息中間件概述**
- 1. 消息中間件的好處
- 2. 什么是消息中間件
- 3. 什么是JMS(規(guī)范)
- 4. 什么是AMQP(協(xié)議)
- 5. 幾個(gè)常用消息中間對(duì)比
- **第二章 初始JMS**
- **2.1 JSM相關(guān)概念**
- **2.2 隊(duì)列模式**
- **2.3 主題模式**
- **2.4 JSM編碼接口**
- **第三章 ActiveMQ的使用**
- 3.1 activeMQ在Windows平臺(tái)上的安裝
- 3.2 ActiveMQ的隊(duì)列模式
- 3.3 ActiveMQ的主題模式(發(fā)布/訂閱)
- **3.4 spring集成JMS連接ActiveMQ**
- **3.4.1 幾個(gè)相關(guān)類**
- **3.4.2 消息隊(duì)列模式與spring集成**
- 3.4.3 主題模式與spring的集成
- **第四章 ActiveMQ集群**
- **4.1 集群方式**
- **4.2 客戶端配置**
- 4.2.1. ActiveMQ失效轉(zhuǎn)移(failover):
- **4.3 Broker Cluster集群配置**
- **4.4 Master/Slave集群配置**
- **4.5 Broker clusters和Master Slave對(duì)比**
- **4.6 高可用且負(fù)載均衡的集群方案**
- 第五章 消息中間件如何傳對(duì)象
第一章 消息中間件概述
1. 消息中間件的好處
解耦、異步、橫向擴(kuò)展、安全可靠、順序保證2. 什么是消息中間件
發(fā)送和接收數(shù)據(jù),利用高效可靠的異步消息傳遞機(jī)制集成分布式系統(tǒng)3. 什么是JMS(規(guī)范)
Java消息服務(wù)(Java Message Service),是一個(gè)Java平臺(tái)中面向消息中間件的API4. 什么是AMQP(協(xié)議)
AMQP(advanced message queuing protocol),是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)協(xié)議。 此協(xié)議不受客戶端和中間件的不同產(chǎn)品和不同開發(fā)語(yǔ)言的限制。5. 幾個(gè)常用消息中間對(duì)比
| 優(yōu)點(diǎn) | 遵循JMS規(guī)范,安裝方便 | 繼承Erlang天生的并發(fā)性,最初用于金融行業(yè),穩(wěn)定性和安全性有保障 | 依賴zk,可動(dòng)態(tài)擴(kuò)展節(jié)點(diǎn),高性能、高吞吐量、無(wú)限擴(kuò)容、消息可指定追溯 |
| 缺點(diǎn) | 有可能會(huì)丟失消息。現(xiàn)在的重心在下一代產(chǎn)品apolle上,所以5.x的產(chǎn)品不怎么維護(hù)了 | Erlang語(yǔ)言難度較大,不支持動(dòng)態(tài)擴(kuò)展 | 嚴(yán)格的順序機(jī)制,不支持消息優(yōu)先級(jí),不支持標(biāo)準(zhǔn)的消息協(xié)議,不利于平臺(tái)遷移 |
| 支持協(xié)議 | AMQP,OpenWire,Stomp,XMPP | AMQP | |
| 應(yīng)用 | 適合中小企業(yè),不適合好千個(gè)隊(duì)列的應(yīng)用 | 適合對(duì)穩(wěn)定性要求高的企業(yè)級(jí)應(yīng)用 | 應(yīng)用在大數(shù)據(jù)日志處理或?qū)?shí)時(shí)性、可靠性(少量數(shù)據(jù)丟失)要求較低的場(chǎng)景應(yīng)用 |
第二章 初始JMS
2.1 JSM相關(guān)概念
2.2 隊(duì)列模式
1. 特性:
客戶端包括生產(chǎn)者和消費(fèi)者
隊(duì)列中的消息只能被一個(gè)消息費(fèi)者消息
消費(fèi)者可以隨時(shí)消費(fèi)隊(duì)列中的消息
2. 隊(duì)列模型示意圖
2.3 主題模式
1. 特性:
客戶端包括發(fā)布者和訂閱者
主題中的消息被所有訂閱者消息
消費(fèi)者不能消費(fèi)訂閱之前就發(fā)送到主題中的消息
2. 主題模型示意圖
2.4 JSM編碼接口
ConnectionFactory 用于創(chuàng)建連接到消息中間件的連接工廠
Connection 代表了應(yīng)用程序和消息服務(wù)器之間的通信鏈路
Destination 指消息發(fā)布和接收的地點(diǎn),包括隊(duì)列或主題
Session 表示一個(gè)單線程的上下文,用于發(fā)送和接收消息
MessageProducer 由會(huì)話創(chuàng)建,用于發(fā)送消息到目標(biāo)
MessageConsumer 由會(huì)話創(chuàng)建,用于接收發(fā)送到目標(biāo)的消息
Message 是在消費(fèi)者和生產(chǎn)者之間傳送的對(duì)象, 消息頭,一組消息屬性,一個(gè)消息體
第三章 ActiveMQ的使用
3.1 activeMQ在Windows平臺(tái)上的安裝
1.下載ActiveMQ
去官方網(wǎng)站下載:http://activemq.apache.org/activemq-5152-release.html
2.運(yùn)行ActiveMQ
解壓縮apache-activemq-5.5.1-bin.zip到C盤,然后雙擊C:\apache-activemq-5.15.2\bin\win64\activemq.bat運(yùn)行ActiveMQ程序。
啟動(dòng)ActiveMQ以后,登陸:http://localhost:8161/admin/,進(jìn)入管理界面。
用戶名與密碼均為:admin
3.2 ActiveMQ的隊(duì)列模式
生產(chǎn)者代碼片:
package com.queue;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory; /*** 生產(chǎn)者* @author Peter**/ public class Proceducer {/*** */private final static String URL = "tcp://localhost:61616";/*** */private final static String QUEUE_NAME = "queue-name";public static void main(String[] args) throws JMSException {// 1. 創(chuàng)建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創(chuàng)建ConnectionConnection con = factory.createConnection();// 3. 啟動(dòng)連接con.start();// 4. 創(chuàng)建會(huì)話Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 創(chuàng)建一個(gè)目標(biāo)Destination dest = session.createQueue(QUEUE_NAME);// 6. 創(chuàng)建一個(gè)生產(chǎn)者M(jìn)essageProducer pro = (MessageProducer) session.createProducer(dest);for(int i = 0; i<10;i++) {// 7. 創(chuàng)建消息TextMessage msg = session.createTextMessage("消息"+i); // 8. 發(fā)布消息pro.send(msg);System.out.println(msg);}// 9. 關(guān)閉連接con.close();} }執(zhí)行上面代碼后,在管理界面看到的結(jié)果是:
消費(fèi)者代碼片:
/*** 消費(fèi)者* @author Peter*/ public class Consumer {/*** 中間件地址*/private final static String URL = "tcp://localhost:61616";/*** 中間件隊(duì)列名,與生產(chǎn)者的一致*/private final static String QUEUE_NAME = "queue-name";public static void main(String[] args) throws JMSException {// 1. 創(chuàng)建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創(chuàng)建ConnectionConnection con = factory.createConnection();// 3. 啟動(dòng)連接con.start();// 4. 創(chuàng)建會(huì)話Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 5. 創(chuàng)建一個(gè)目標(biāo)Destination dest = session.createQueue(QUEUE_NAME); // 6. 創(chuàng)建一個(gè)消費(fèi)者M(jìn)essageConsumer consumer = session.createConsumer(dest);// 7. 創(chuàng)建一個(gè)監(jiān)聽器consumer.setMessageListener(new MessageListener() { @Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("接收消息為:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}});// 先不關(guān)閉,不然還沒(méi)接收到消息就關(guān)閉了//con.close();} }執(zhí)行上面代碼后,在管理界面的結(jié)果如下:
如果我再新建一個(gè)消費(fèi)者,我們會(huì)發(fā)現(xiàn),兩個(gè)消費(fèi)者在搶收消息,即一個(gè)消費(fèi)者收到了消息,則另一個(gè)消費(fèi)者就收不到該消息了。
3.3 ActiveMQ的主題模式(發(fā)布/訂閱)
由于訂閱者是收不到還未訂閱主題之前的內(nèi)容的,所以必須要先啟動(dòng)訂閱者。
訂閱者代碼片:
/*** 訂閱者* @author Peter**/ public class Consumer {private final static String URL = "tcp://localhost:61616";private final static String TOPIC_NAME = "topic-name";public static void main(String[] args) throws JMSException {// 1. 創(chuàng)建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創(chuàng)建ConnectionConnection con = factory.createConnection();// 3. 啟動(dòng)連接con.start();// 4. 創(chuàng)建會(huì)話Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 5. 創(chuàng)建一個(gè)目標(biāo)【與隊(duì)列模式的區(qū)別就在這里,相當(dāng)于訂閱了該主題】Destination dest = session.createTopic(TOPIC_NAME); // 6. 創(chuàng)建一個(gè)消費(fèi)者M(jìn)essageConsumer consumer = session.createConsumer(dest);// 7. 創(chuàng)建一個(gè)監(jiān)聽器consumer.setMessageListener(new MessageListener() { @Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("接收消息為:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}});// 先不關(guān)閉,不然還沒(méi)接收到消息就關(guān)閉了//con.close();} }發(fā)布者代碼片:
/*** 發(fā)布者* @author Peter**/ public class Proceducer {private final static String URL = "tcp://localhost:61616";private final static String TOPIC_NAME = "topic-name";public static void main(String[] args) throws JMSException {// 1. 創(chuàng)建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創(chuàng)建ConnectionConnection con = factory.createConnection();// 3. 啟動(dòng)連接con.start();// 4. 創(chuàng)建會(huì)話Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 創(chuàng)建一個(gè)目標(biāo)【與隊(duì)列模式的區(qū)別就在這里,相當(dāng)于發(fā)布一個(gè)主題】Destination dest = session.createTopic(TOPIC_NAME);// 6. 創(chuàng)建一個(gè)生產(chǎn)者M(jìn)essageProducer pro = (MessageProducer) session.createProducer(dest);for(int i = 0; i<10;i++) {// 7. 創(chuàng)建消息TextMessage msg = session.createTextMessage("消息"+i); // 8. 發(fā)布消息pro.send(msg);System.out.println(msg);}// 9. 關(guān)閉連接con.close();} }如果我們?cè)傩陆ㄒ粋€(gè)訂閱者,我們會(huì)發(fā)現(xiàn)兩個(gè)訂閱者收到的消息完全一樣。
3.4 spring集成JMS連接ActiveMQ
我們下載的activeMQ壓縮文件里解壓后,能找到相關(guān)的jar包,但spring-jms這個(gè)可去maven倉(cāng)庫(kù)下載
3.4.1 幾個(gè)相關(guān)類
1. ConnectionFactory 用于管理連接的連接工廠【也是連接池:管理JmsTemplate每次發(fā)送消息都會(huì)重新創(chuàng)建的連接、會(huì)話、productor】
實(shí)現(xiàn)類:
SingleConnectionFactory:每次都返回同一個(gè)連接
CachingConnectionFactory:繼承了SingleConnectionFactory,并實(shí)現(xiàn)了緩存
2.JmsTemplate 用于發(fā)送和接收消息的模板類
由spring提供,它是線程安全類,可以在整個(gè)應(yīng)用范圍內(nèi)應(yīng)用
3.MessageListener 消息監(jiān)聽器
只需實(shí)現(xiàn)一個(gè)只接收Message參數(shù)的onMesssage方法
3.4.2 消息隊(duì)列模式與spring集成
1. 發(fā)送消息的接口
public interface ProducerInter {public void sendMessage(String message); }2. 發(fā)送消息實(shí)現(xiàn)類
import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator;public class ProducerImpl implements ProducerInter {@AutowiredJmsTemplate jms;// 由于可能會(huì)有多個(gè)目標(biāo),所以一定要以注入bean的id區(qū)分@Resource(name="destination")Destination destination;@Overridepublic void sendMessage(String message) {jms.send(destination, new MessageCreator() { @Overridepublic Message createMessage(Session sessioin) throws JMSException {TextMessage msg = sessioin.createTextMessage(message);System.out.println("發(fā)送消息:"+msg.getText());return msg;}});}}3. 配置文件(producer.xml)
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsdhttp://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"default-autowire="byName" default-lazy-init="false"><!-- 開啟注解 --><context:component-scan base-package="com.jms.spring"></context:component-scan><!-- ActiveMQ提供的 --><bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/></bean><!-- spring提供的連接池 --><bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactoryId"/></bean><!-- 創(chuàng)建一個(gè)點(diǎn)對(duì)點(diǎn)的隊(duì)列目標(biāo)對(duì)象 --><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="queuename"/></bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactoryId"/></bean><!-- --><bean id="producerImpl" class="com.jms.spring.ProducerImpl"></bean> </beans>4. 測(cè)試發(fā)送
執(zhí)行之后,進(jìn)入管理界面可查看結(jié)果
5. 監(jiān)聽消息類
public class ConsumerMessageListener implements MessageListener{// 監(jiān)聽消息@Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("收到消息:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}}6. 接收消息的配置
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsdhttp://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"default-autowire="byName" default-lazy-init="false"><!-- 開啟注解 --><context:component-scan base-package="com.jms.spring"></context:component-scan><!-- ActiveMQ提供的 --><bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/></bean><!-- spring提供的連接池 --><bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactoryId"/></bean><!-- 創(chuàng)建一個(gè)點(diǎn)對(duì)點(diǎn)的隊(duì)列目標(biāo)對(duì)象 --><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="queuename"/></bean><!-- 上面的配置與producer.xml里是一樣的 --><!-- 注入消息監(jiān)聽器 --><bean id="consumerMessageListener" class="com.jms.spring.ConsumerMessageListener"></bean><!-- 配置消息監(jiān)聽容器 --><bean id="jmsContainerListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactoryId"/><property name="destination" ref="destination"/><property name="messageListener" ref="consumerMessageListener"/></bean></beans>7. 測(cè)試消費(fèi)者
public class TestConsumer {public static void main(String[] args) {new ClassPathXmlApplicationContext("consumer.xml"); } }3.4.3 主題模式與spring的集成
只需要將配置文件中的目標(biāo)對(duì)象org.apache.activemq.command.ActiveMQQueue改成org.apache.activemq.command.ActiveMQTopic即可。需要注意的是,在主題模式下,一定要先啟動(dòng)消費(fèi)者。
第四章 ActiveMQ集群
4.1 集群方式
客戶端集群:讓多個(gè)消費(fèi)者消費(fèi)同一個(gè)隊(duì)列
Broker clusters:多個(gè)Broker之間同步消息
Master Slave(主從):實(shí)現(xiàn)高可用
4.2 客戶端配置
4.2.1. ActiveMQ失效轉(zhuǎn)移(failover):
定義:允許當(dāng)其中一臺(tái)消息服務(wù)器宕機(jī)時(shí),客戶端在傳輸層上重新連接到其它消息服務(wù)器
語(yǔ)法:failover:(uri1,uri2,…,uriN)?transportOptions
transportOptions參數(shù)說(shuō)明
randomize 默認(rèn)為true,表示在uri列表中選擇uri連接時(shí),是否采用隨機(jī)策略
initialReconnectDelay 默認(rèn)為10,單位毫秒,表示第一嘗試重連之間等待的時(shí)間
maxReconnectionDelay 默認(rèn)30000,單位毫秒,最長(zhǎng)重連的時(shí)間間隔
4.3 Broker Cluster集群配置
1. 原理:
2. NetworkConnector(網(wǎng)絡(luò)連接器)
網(wǎng)絡(luò)連接器主要用于配置ActiveMQ服務(wù)器與服務(wù)器之間的網(wǎng)絡(luò)通訊方式,用于服務(wù)器透?jìng)飨?br /> 分為靜態(tài)連接器和動(dòng)態(tài)連接器
3. 靜態(tài)連接器:適用連接地址不多的情況
<networkConnectors><networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"> </networkConnectors>4. 動(dòng)態(tài)連接器
<networkConnectors><networkConnector uri="multicast://default"> </networkConnectors> <transportConnectors><transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"> </transportConnectors>4.4 Master/Slave集群配置
1. Master/Slave集群方案
Share nothing storage master/slave (5.8 以后的版本刪除了)
Share storage master/slave 共享存儲(chǔ)
Replicated LevelDB Store 基于可復(fù)制的LevelDB Store
2. 共享存儲(chǔ)集群的原理
先啟動(dòng)A,A就因?yàn)榕潘i獨(dú)占資源成為Master,此時(shí)A有外部服務(wù)能力,而B沒(méi)有
如果A掛了,則B獲取資源成為Master,這時(shí)所有請(qǐng)求都會(huì)交給B
3. 基于復(fù)制的LevelDB Store的原理
因?yàn)槭腔赯ooKeeper的,所以至少需要3勸服務(wù)器。zk選舉A作為Master后,A就具有了外部服務(wù)能力,而B、C沒(méi)有。當(dāng)A獲取到外部資源存儲(chǔ)后,會(huì)通過(guò)zk將資源同步到B和C。
如果A故障,則zk會(huì)重新選舉一個(gè)節(jié)點(diǎn)作為Master
4.5 Broker clusters和Master Slave對(duì)比
| Master/Slave | 是 | 否 |
| Broker Cluster | 否 | 是 |
4.6 高可用且負(fù)載均衡的集群方案
第五章 消息中間件如何傳對(duì)象
利用Json
總結(jié)
以上是生活随笔為你收集整理的Java消息中间件(activeMQ)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: solr管理界面详解
- 下一篇: Java 多态的简单介绍.