jms规范以及activeMq相关介绍
jms
提出的指在統一各種MOM(Message-Oriented Middleware )系統接口的規范,只是接口,不包含實現,實現JMS 接口的消息中間件稱為JMS Provider。activemq是Apache出品的開源項目,它是JMS規范的一個實現
接口構成:
ConnectionFactory:連接工廠接口用于建立連接
Connection:封裝了JMS 客戶端到JMS Provider 的連接
Destination:用來指定生產的消息的目標和它消費的消息的來源的對象
Session:單線程上下文創建生產者、消費者。消息等
MessageProducer:由Session 對象創建的用來發送消息的對象
MessageConsumer:由Session 對象創建的用來接收消息的對象
消息結構:
消息頭:1、自動分配的消息頭:JMSDeliveryMode、JMSMessageID、JMSTimestamp、JMSExpiration、JMSRedelivered、JMSPriority2、開發者分配的消息頭:JMSReplyTo、JMSCorrelationID、JMSType
消息屬性:1. 應用需要用到的屬性;2. 消息頭中原有的一些可選屬性;3. JMS Provider 需要用到的屬性
消息體:1、TextMessage2、MapMessage3、BytesMessage4、StreamMessage5、ObjectMessage6、Message
開發步驟:
1.創建ConnectionFactory2.創建Connection3.創建一個或多個JMS Session4.創建Queue或TopicSession5.創建MessageProducer或MessageConsumer6.發送或者接受消息:(1、異步接收MessageConsumer:setMessageListener(MessageListener m)2、同步接收MessageConsumer:receive() 以上兩種接收方式返回Message 對象)
activeMq
模型分析:
生產者通過TransportConnection對象發送信息,TransportConnection對象指向一個regionBroker,這個regionBroker(區域中間件)中包含了四種區域(隊列域(queueRegion)、主題域(topicRegion)、臨時隊列域(tempQueueRegion)、臨時主題域(tempTopicRegion)),而每個Region(域)中包含了N個destination(目的地對象),生產者的信息通過regionBroker傳給Region后又傳給destination,在destination中進行持久化操作,并且"待發送消息指針"對象指向待發送消息的位置,然后當有消息訂閱時destination將信息發送給訂閱者,(這里注意,訂閱信息也在region中),然后訂閱機制通過transportConnection發送消息給消費者進程
消息指針:通常消息在未發生或者發送后未收到消費者的確認信息時都會持久保存消息到存儲中。當有消費者來可以消費消息時,broker會批量從存儲中取出消息,發送給消費者。游標就是指向下次批量獲取消息時的存儲位置,有如下幾種:
- Store-based cursors ?基于存儲的,慢速消費者需要使用pending cursor來指定下一次批量讀取消息的位置
- VM cursors?內存保存一些游標
- File-based cursors? 內存超過閾值,存儲到臨時文件。
通信機制:
消息終結條件:?Consume ack?消息過期 ?存儲設備溢出
消息發送過程如圖:
這里主要涉及到異步發送和同步發送,異步發送不會返回發送結果,但是效率較高,適用于大量并且允許丟失的數據,同步發送會阻塞并返回發送結果,適用于不可丟失的情況
判定異步發送還是同步發送是在mq代碼中判斷的,非持久化消息是異步發送的,持久化消息并且是在非事務模式下是同步發送的。但是在開啟事務的情況下,消息都是異步發送。至于持久化消息非持久化消息,以及事務模式在后邊"可靠機制"章節中講解
消息接收流程:
1,服務器發送消息有一個隊列,如果隊列滿了需要等待消息的回執(ack)來清理消息,后繼續發送,消費者接收信息后需要返回回執信息ack
2.消費者客戶端用receive方法可以定義欲獲取消息個數,如果大于0則通過拉取的方式獲取相應數量的信息,如果小于0則通過推送的方式獲取消息,獲取隊列中沒有消息則阻塞
3.消費者客戶端用messagelistener方法異步獲取消息時調用onmessage方法處理消息
可靠機制:
指定消息傳送模式:
1.NON_PERSISTENT(非持久性消息)
??? 保證這些消息最多被傳送一次。對于這些消息,可靠性并非主要的考慮因素。此模式并不要求持久性的數據存儲,也不保證消息服務由于某種原因導致失敗后消息不會丟失。
2.PERSISTENT(持久性消息)
??????? 這是ActiveMQ的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對于這些消息,可靠性是優先考慮的因素。可靠性的另一個重要方面是確保持久性消息傳送至目標后,消息服務在向消費者傳送它們之前不會丟失這些消息。
有兩種方法指定傳送模式:
1.使用setDeliveryMode方法,這樣所有的消息都采用此傳送模式;
2.使用send方法為每一條消息設置傳送模式;
默認存儲方式:kahadb,文件消息存儲器
事務和消息的簽收ack:
簽收可以由ActiveMQ發起,也可以由客戶端發起,取決于Session簽收模式的設置。
?在帶事務的Session中,簽收自動發生在事務提交時。如果事務回滾,所有已經接收的消息將會被再次傳送。Session配置為SESSION_TRANSACTED。
?在不帶事務的Session中,一條消息何時和如何被簽收取決于Session的設置控制消息的簽收
- 1.Session.AUTO_ACKNOWLEDGE(自動簽收)
- 2.Session.CLIENT_ACKNOWLEDGE(客戶端簽收)
- 3.Session.DUPS_OK_ACKNOWLEDGE(不必確保對傳送消息的簽收)
???????消息的重傳:
?觸發條件 CLIENT_ACKNOWLEDGE+recover(),事務rollback(),AUTO_ACKNOWLEDGE異常
?RedeliveryPolicy 配置對應的重發機制
?重傳次數配置
?重發最終失敗會丟棄到死信隊列
處理失敗的過期的、處理失敗的消息,將會被ActiveMQ置入“ActiveMQ.DLQ”這個死信隊列中 死信隊列可以拿到,通過監聽死信隊列獲取這些信息? 死信隊列支持配置 比如消息過期的不放進來
設置消息優先級:
普通情況下可以確保將單個會話向目標發送的所有消息按其發送順序傳送至消費者。然而,如果為這些消息分配了不同的優先級,消息傳送系統將首先嘗試傳送優先級較高的消息。
有兩種方法設置消息的優先級:
1.使用setDeliveryMode方法,這樣所有的消息都采用此傳送模式;
2.使用send方法為每一條消息設置傳送模式;
注意:消息級別分為0至9,其中0至4是普通級別,5至9為加急消息
允許消息過期:
默認情況下,消息永不會過期。如果消息在特定周期內失去意義,那么可以設置過期時間。
有兩種方法設置消息的過期時間,時間單位為毫秒:
1.使用setTimeToLive方法為所有的消息設置過期時間;
2.使用send方法為每一條消息設置過期時間;
注意:是在消息發送端調用setTimeToLive方法。
創建臨時目標
ActiveMQ通過createTemporaryQueue和createTemporaryTopic創建臨時目標,這些目標持續到創建它的Connection關閉。只有創建臨時目標的Connection所創建的客戶端才可以從臨時目標中接收消息,但是任何的生產者都可以向臨時目標中發送消息。如果關閉了創建此目標的Connection,那么臨時目標被關閉,內容也將消失。
Topic持久訂閱:
消息訂閱分為非持久訂閱(non-durable subscription)和持久訂閱(durable subscription),非持久訂閱只有當客戶端處于激活狀態,也就是和ActiveMQ保持連接狀態才能收到發送到某個主題的消息,而當客戶端處于離線狀態,這個時間段發到主題的消息將會丟失,永遠不會收到。
建立持久訂閱的步驟:
1 .? 生產者設置為持久化模式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
1.?消費者 為連接設置一個客戶ID;
topicConnection.setClientID("clientc");
2.? 消費者為訂閱的主題指定一個訂閱名稱;topicSession.createDurableSubscriber(mytopic,"clientc")
注意:上述組合必須唯一。
持久訂閱例子方法:TopicSubscriber createDurableSubscriber(Topic topic, String name)
非持久化訂閱例子:session.createSubscriber(topic);//無需向connection指定clientId
lMessage Groups 消息分組
?針對queue 一個隊列 對應多個消費者,默認消費者會去競爭獲取消息。
?利用消息分組,具有相同groupId的消息會被投送到同一個消費者(只要這個consumer保持active)
?負載均衡? broken收到消息,檢查消息JMSXGroupID屬性。如果存在,那么broker會檢查是否有某個consumer擁有這個message group。如果沒有,那么broker會選擇一個consumer,并將它關聯到這個message group。此后,這個consumer會接收這個message group的所有消息
- message.setStringProperty("JMSXGroupID","GroupA");
???????spring集成jms:
Spring框架提供一個模板機制JMSTemplate來隱藏Java API細節。參考源碼類JMSTemplate.java
Spring提供了JMSTemplate類,所以開發者不必為JMS實現編寫樣本代碼。何為模板模式,請參考https://blog.csdn.net/l450741881/article/details/88837064
總結
以上是生活随笔為你收集整理的jms规范以及activeMq相关介绍的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SOLO在windows 10环境下安装
- 下一篇: java泰坦宙斯之战程序_详解Hadoo