007_JMS中的持久订阅
1. 持久訂閱時, 客戶端需要首先向JMS提供者注冊一個表面自己身份的id(clientId)。這樣當咱們這個客戶端處于離線時, JMS提供者會為這個客戶端保存所有發送到主題的消息。當客戶端再次連接到JMS提供者時, JMS提供者根據這個客戶端id, 把消息發送給它。
2. 創建持久訂閱必須設置一個客戶端id, 不然會報如下錯誤
3. 設置客戶端id
3.1. 設置客戶端id要緊跟在創建連接之后
// 1. 創建一個連接工廠 TopicConnectionFactory cf = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); // 2. 創建連接 TopicConnection conn = cf.createTopicConnection(); // 3. 設置客戶端id conn.setClientID(clientId);3.2. 如果設置客戶端id沒有緊跟在創建連接之后回報如下錯誤
4. 多個客戶端設置clientID, clientID不能重復。如果已有一個活動的被clientID標識的客戶端, 再出現一個重復clientID標識的客戶端連接, 會報如下錯誤
5. 持久訂閱的實現機制
5.1. 生產者發送消息給提供者, 如果此時提供者發現沒有任何的消費者(包括在線/離線), 那么就會認為該消息無用, 不需要存儲, 會直接刪除。
5.2. 如果有在線的消費者, 那么提供者會將消息直接傳送給在線的消費者, 因為這個時候連接是通的, 消息有傳輸的通道。
5.3. 如果有離線的消費者, 那么提供者會把屬于該消費者的消息存儲下來, 等消費者在線的時候, 再將保存的離線消息推送給它。對于持久訂閱者, 提供者會在該消費者第一次登錄在線的時候, 將它的身份信息記錄下來。記錄身份的關鍵就是clientID和主題名稱。當持久訂閱者又重新在線的時候, 提供者會根據當前連接的clientID和主題名稱, 去查詢屬于它的離線消息, 并進行推送。
6. 例子
6.1. 新建一個名為JMSDurableSubscriber的Java項目, 同時拷入相關jar包
6.2. 編寫MyProducer.java
package com.jmsapp.persistent;import javax.jms.JMSException; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class MyProducer {// 默認連接用戶名private static final String dftUsr = ActiveMQConnection.DEFAULT_USER;// 默認用戶密碼private static final String dftPwd = ActiveMQConnection.DEFAULT_PASSWORD;// 默認連接地址private static final String dftUrl = ActiveMQConnection.DEFAULT_BROKER_URL;// 隊列名稱private static final String topicName = "persistentSubscriber";public static void main(String[] args) {// 1. 創建一個連接工廠TopicConnectionFactory cf = new ActiveMQConnectionFactory(dftUsr, dftPwd, dftUrl);// 連接對象TopicConnection conn = null;// 會話對象TopicSession session = null;try {// 2. 創建連接conn = cf.createTopicConnection();// 3. 啟動連接conn.start();// 4. 創建會話session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 創建消息目的地。如果是點對點, 那么它的實現是Queue; 如果是訂閱模式, 那它的實現是Topic。這里我們創建一個名為persistentSubscriber的主題。Topic topic = session.createTopic(topicName);// 6. 消息生產者TopicPublisher publisher = session.createPublisher(topic);// 7. 創建文本消息和發送消息StreamMessage message = session.createStreamMessage();message.writeString("JMS中的持久訂閱");publisher.publish(message);} catch (JMSException e) {e.printStackTrace();} finally {try {if (session != null) {session.close();}} catch (JMSException e1) {e1.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}} }6.3. 編寫MyConsumer.java
package com.jmsapp.persistent;import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class MyConsumer {// 默認連接用戶名private static final String dftUsr = ActiveMQConnection.DEFAULT_USER;// 默認用戶密碼private static final String dftPwd = ActiveMQConnection.DEFAULT_PASSWORD;// 默認連接地址private static final String dftUrl = ActiveMQConnection.DEFAULT_BROKER_URL;// 隊列名稱private static final String topicName = "persistentSubscriber";// 客戶端idprivate static final String clientId = "rjbd";public static void main(String[] args) {// 1. 創建一個連接工廠TopicConnectionFactory cf = new ActiveMQConnectionFactory(dftUsr, dftPwd, dftUrl);// 連接對象TopicConnection conn = null;// 會話對象TopicSession session = null;try {// 2. 創建連接conn = cf.createTopicConnection();// 3. 設置客戶端idconn.setClientID(clientId);// 4. 創建會話session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 創建消息目的地。如果是點對點, 那么它的實現是Queue; 如果是訂閱模式, 那它的實現是Topic。這里我們創建一個名為persistentSubscriber的主題。Topic topic = session.createTopic(topicName);// 6. 消息消費者TopicSubscriber subscriber = session.createDurableSubscriber(topic, clientId);// 7. 接收消息subscriber.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message msg) {try {StreamMessage message = (StreamMessage) msg;System.out.println("接收: " + message.readString());} catch (JMSException e) {e.printStackTrace();}}});// 8. 啟動連接, 準備開始接收消息conn.start();} catch (JMSException e) {e.printStackTrace();}} }6.4. 運行MyConsumer.java, 接收端是一直處于運行狀態的
6.5. 終止運行MyConsumer.java
6.6. 運行MyProducer.java
6.7. 再次運行MyConsumer.java, 接收到消息(非持久化訂閱, 這樣操作就接收不到消息)
總結
以上是生活随笔為你收集整理的007_JMS中的持久订阅的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 006_Topic消息模式发送对象消息
- 下一篇: 008_Queue消息模式发送映射消息