ActiveMQ消息传送机制以及ACK机制详解
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
?AcitveMQ是作為一種消息存儲(chǔ)和分發(fā)組件,涉及到client與broker端數(shù)據(jù)交互的方方面面,它不僅要擔(dān)保消息的存儲(chǔ)安全性,還要提供額外的手段來(lái)確保消息的分發(fā)是可靠的。
?
一. ActiveMQ消息傳送機(jī)制
? ? Producer客戶(hù)端使用來(lái)發(fā)送消息的, Consumer客戶(hù)端用來(lái)消費(fèi)消息;它們的協(xié)同中心就是ActiveMQ broker,broker也是讓producer和consumer調(diào)用過(guò)程解耦的工具,最終實(shí)現(xiàn)了異步RPC/數(shù)據(jù)交換的功能。隨著ActiveMQ的不斷發(fā)展,支持了越來(lái)越多的特性,也解決開(kāi)發(fā)者在各種場(chǎng)景下使用ActiveMQ的需求。比如producer支持異步調(diào)用;使用flow control機(jī)制讓broker協(xié)同consumer的消費(fèi)速率;consumer端可以使用prefetchACK來(lái)最大化消息消費(fèi)的速率;提供"重發(fā)策略"等來(lái)提高消息的安全性等。在此我們不詳細(xì)介紹。
一條消息的生命周期如下:
? ? ? ? ? ? ? ? ? ? ? ?
??圖片中簡(jiǎn)單的描述了一條消息的生命周期,不過(guò)在不同的架構(gòu)環(huán)境中,message的流動(dòng)行可能更加復(fù)雜.將在稍后有關(guān)broker的架構(gòu)中詳解..一條消息從producer端發(fā)出之后,一旦被broker正確保存,那么它將會(huì)被consumer消費(fèi),然后ACK,broker端才會(huì)刪除;不過(guò)當(dāng)消息過(guò)期或者存儲(chǔ)設(shè)備溢出時(shí),也會(huì)終結(jié)它。
? ? ? ? ? ?
?
??這是一張很復(fù)雜,而且有些凌亂的圖片;這張圖片中簡(jiǎn)單的描述了:1)producer端如何發(fā)送消息 2) consumer端如何消費(fèi)消息 3) broker端如何調(diào)度。如果用文字來(lái)描述圖示中的概念,恐怕一言難盡。圖示中,提及到prefetchAck,以及消息同步、異步發(fā)送的基本邏輯;這對(duì)你了解下文中的ACK機(jī)制將有很大的幫助。
?
二. optimizeACK
? ? "可優(yōu)化的ACK",這是ActiveMQ對(duì)于consumer在消息消費(fèi)時(shí),對(duì)消息ACK的優(yōu)化選項(xiàng),也是consumer端最重要的優(yōu)化參數(shù)之一,你可以通過(guò)如下方式開(kāi)啟:
? ? 1) 在brokerUrl中增加如下查詢(xún)字符串:?
String brokerUrl = "tcp://localhost:61616?" + "jms.optimizeAcknowledge=true" + "&jms.optimizeAcknowledgeTimeOut=30000" + "&jms.redeliveryPolicy.maximumRedeliveries=6"; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);2) 在destinationUri中,增加如下查詢(xún)字符串:
String queueName = "test-queue?customer.prefetchSize=100"; Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue(queueName);? 我們需要在brokerUrl指定optimizeACK選項(xiàng),在destinationUri中指定prefetchSize(預(yù)獲取)選項(xiàng),其中brokerUrl參數(shù)選項(xiàng)是全局的,即當(dāng)前factory下所有的connection/session/consumer都會(huì)默認(rèn)使用這些值;而destinationUri中的選項(xiàng),只會(huì)在使用此destination的consumer實(shí)例中有效;如果同時(shí)指定,brokerUrl中的參數(shù)選項(xiàng)值將會(huì)被覆蓋。optimizeAck表示是否開(kāi)啟“優(yōu)化ACK”,只有在為true的情況下,prefetchSize(下文中將會(huì)簡(jiǎn)寫(xiě)成prefetch)以及optimizeAcknowledgeTimeout參數(shù)才會(huì)有意義。此處需要注意"optimizeAcknowledgeTimeout"選項(xiàng)只能在brokerUrl中配置。
? ? prefetch值建議在destinationUri中指定,因?yàn)樵赽rokerUrl中指定比較繁瑣;在brokerUrl中,queuePrefetchSize和topicPrefetchSize都需要單獨(dú)設(shè)定:"&jms.prefetchPolicy.queuePrefetch=12&jms.prefetchPolicy.topicPrefetch=12"等來(lái)逐個(gè)指定。
?
? ? 如果prefetchACK為true,那么prefetch必須大于0;當(dāng)prefetchACK為false時(shí),你可以指定prefetch為0以及任意大小的正數(shù)。不過(guò),當(dāng)prefetch=0是,表示consumer將使用PULL(拉取)的方式從broker端獲取消息,broker端將不會(huì)主動(dòng)push消息給client端,直到client端發(fā)送PullCommand時(shí);當(dāng)prefetch>0時(shí),就開(kāi)啟了broker push模式,此后只要當(dāng)client端消費(fèi)且ACK了一定的消息之后,會(huì)立即push給client端多條消息。
?
? ? 當(dāng)consumer端使用receive()方法同步獲取消息時(shí),prefetch可以為0和任意正值;當(dāng)prefetch=0時(shí),那么receive()方法將會(huì)首先發(fā)送一個(gè)PULL指令并阻塞,直到broker端返回消息為止,這也意味著消息只能逐個(gè)獲取(類(lèi)似于Request<->Response),這也是Activemq中PULL消息模式;當(dāng)prefetch > 0時(shí),broker端將會(huì)批量push給client 一定數(shù)量的消息(<= prefetch),client端會(huì)把這些消息(unconsumedMessage)放入到本地的隊(duì)列中,只要此隊(duì)列有消息,那么receive方法將會(huì)立即返回,當(dāng)一定量的消息ACK之后,broker端會(huì)繼續(xù)批量push消息給client端。
?
? ? 當(dāng)consumer端使用MessageListener異步獲取消息時(shí),這就需要開(kāi)發(fā)設(shè)定的prefetch值必須 >=1,即至少為1;在異步消費(fèi)消息模式中,設(shè)定prefetch=0,是相悖的,也將獲得一個(gè)Exception。
?
? ? 此外,我們還可以brokerUrl中配置“redelivery”策略,比如當(dāng)一條消息處理異常時(shí),broker端可以重發(fā)的最大次數(shù);和下文中提到REDELIVERED_ACK_TYPE互相協(xié)同。當(dāng)消息需要broker端重發(fā)時(shí),consumer會(huì)首先在本地的“deliveredMessage隊(duì)列”(Consumer已經(jīng)接收但還未確認(rèn)的消息隊(duì)列)刪除它,然后向broker發(fā)送“REDELIVERED_ACK_TYPE”類(lèi)型的確認(rèn)指令,broker將會(huì)把指令中指定的消息重新添加到pendingQueue(亟待發(fā)送給consumer的消息隊(duì)列)中,直到合適的時(shí)機(jī),再次push給client。
?
? ? 到目前為止,或許你知道了optimizeACK和prefeth的大概意義,不過(guò)我們可能還會(huì)有些疑惑!!optimizeACK和prefetch配合,將會(huì)達(dá)成一個(gè)高效的消息消費(fèi)模型:批量獲取消息,并“延遲”確認(rèn)(ACK)。prefetch表達(dá)了“批量獲取”消息的語(yǔ)義,broker端主動(dòng)的批量push多條消息給client端,總比client多次發(fā)送PULL指令然后broker返回一條消息的方式要優(yōu)秀很多,它不僅減少了client端在獲取消息時(shí)阻塞的次數(shù)和阻塞的時(shí)間,還能夠大大的減少網(wǎng)絡(luò)開(kāi)支。optimizeACK表達(dá)了“延遲確認(rèn)”的語(yǔ)義(ACK時(shí)機(jī)),client端在消費(fèi)消息后暫且不發(fā)送ACK,而是把它緩存下來(lái)(pendingACK),等到這些消息的條數(shù)達(dá)到一定閥值時(shí),只需要通過(guò)一個(gè)ACK指令把它們?nèi)看_認(rèn);這比對(duì)每條消息都逐個(gè)確認(rèn),在性能上要提高很多。由此可見(jiàn),prefetch優(yōu)化了消息傳送的性能,optimizeACK優(yōu)化了消息確認(rèn)的性能。
?
? ? 當(dāng)consumer端消息消費(fèi)的速率很高(相對(duì)于producer生產(chǎn)消息),而且消息的數(shù)量也很大時(shí)(比如消息源源不斷的生產(chǎn)),我們使用optimizeACK + prefetch將會(huì)極大的提升consumer的性能。不過(guò)反過(guò)來(lái):
? ? 1) 如果consumer端消費(fèi)速度很慢(對(duì)消息的處理是耗時(shí)的),過(guò)大的prefetchSize,并不能有效的提升性能,反而不利于consumer端的負(fù)載均衡(只針對(duì)queue);按照良好的設(shè)計(jì)準(zhǔn)則,當(dāng)consumer消費(fèi)速度很慢時(shí),我們通常會(huì)部署多個(gè)consumer客戶(hù)端,并使用較小的prefetch,同時(shí)關(guān)閉optimizeACK,可以讓消息在多個(gè)consumer間“負(fù)載均衡”(即均勻的發(fā)送給每個(gè)consumer);如果較大的prefetchSize,將會(huì)導(dǎo)致broker一次性push給client大量的消息,但是這些消息需要很久才能ACK(消息積壓),而且在client故障時(shí),還會(huì)導(dǎo)致這些消息的重發(fā)。
?
? ? 2) 如果consumer端消費(fèi)速度很快,但是producer端生成消息的速率較慢,比如生產(chǎn)者10秒鐘生成10條消息,但是consumer一秒就能消費(fèi)完畢,而且我們還部署了多個(gè)consumer!!這種場(chǎng)景下,建議開(kāi)啟optimizeACK,但是需要設(shè)置的prefetchSize不能過(guò)大;這樣可以保證每個(gè)consumer都能有"活干",否則將會(huì)出現(xiàn)一個(gè)consumer非常忙碌,但是其他consumer幾乎收不到消息。
?
? ? 3) 如果消息很重要,特別是不愿意接收到”redelivery“的消息,那么我們需要將optimizeACK=false,prefetchSize=1
?
? ? 既然optimizeACK是”延遲“確認(rèn),那么就引入一種潛在的風(fēng)險(xiǎn):在消息被消費(fèi)之后還沒(méi)有來(lái)得及確認(rèn)時(shí),client端發(fā)生故障,那么這些消息就有可能會(huì)被重新發(fā)送給其他consumer,那么這種風(fēng)險(xiǎn)就需要client端能夠容忍“重復(fù)”消息。
?
? ? prefetch值默認(rèn)為1000,當(dāng)然這個(gè)值可能在很多場(chǎng)景下是偏大的;我們暫且不考慮ACK模式(參見(jiàn)下文),通常情況下,我們只需要簡(jiǎn)單的統(tǒng)計(jì)出單個(gè)consumer每秒的最大消費(fèi)消息數(shù)即可,比如一個(gè)consumer每秒可以處理100個(gè)消息,我們期望consumer端每2秒確認(rèn)一次,那么我們的prefetchSize可以設(shè)置為100 * 2 /0.65大概為300。無(wú)論如何設(shè)定此值,client持有的消息條數(shù)最大為:prefetch + “DELIVERED_ACK_TYPE消息條數(shù)”(DELIVERED_ACK_TYPE參見(jiàn)下文)
?
? ? ?即使當(dāng)optimizeACK為true,也只會(huì)當(dāng)session的ACK模式為AUTO_ACKNOWLEDGE時(shí)才會(huì)生效,即在其他類(lèi)型的ACK模式時(shí)consumer端仍然不會(huì)“延遲確認(rèn)”,即:
consumer.optimizeAck = connection.optimizeACK && session.isAutoAcknowledge()當(dāng)consumer.optimizeACK有效時(shí),如果客戶(hù)端已經(jīng)消費(fèi)但尚未確認(rèn)的消息(deliveredMessage)達(dá)到prefetch * 0.65,consumer端將會(huì)自動(dòng)進(jìn)行ACK;同時(shí)如果離上一次ACK的時(shí)間間隔,已經(jīng)超過(guò)"optimizeAcknowledgeTimout"毫秒,也會(huì)導(dǎo)致自動(dòng)進(jìn)行ACK。
? ? 此外簡(jiǎn)單的補(bǔ)充一下,批量確認(rèn)消息時(shí),只需要在ACK指令中指明“firstMessageId”和“l(fā)astMessageId”即可,即消息區(qū)間,那么broker端就知道此consumer(根據(jù)consumerId識(shí)別)需要確認(rèn)哪些消息。
?
三. ACK模式與類(lèi)型介紹
? ? JMS API中約定了Client端可以使用四種ACK模式,在javax.jms.Session接口中:
- AUTO_ACKNOWLEDGE?= 1 ? ?自動(dòng)確認(rèn)
- CLIENT_ACKNOWLEDGE?= 2 ? ?客戶(hù)端手動(dòng)確認(rèn) ??
- DUPS_OK_ACKNOWLEDGE?= 3 ? ?自動(dòng)批量確認(rèn)
- SESSION_TRANSACTED?= 0 ? ?事務(wù)提交并確認(rèn)
? ? 此外AcitveMQ補(bǔ)充了一個(gè)自定義的ACK模式:
- INDIVIDUAL_ACKNOWLEDGE?= 4 ? ?單條消息確認(rèn)
? ? 我們?cè)陂_(kāi)發(fā)JMS應(yīng)用程序的時(shí)候,會(huì)經(jīng)常使用到上述ACK模式,其中"INDIVIDUAL_ACKNOWLEDGE?"只有ActiveMQ支持,當(dāng)然開(kāi)發(fā)者也可以使用它. ACK模式描述了Consumer與broker確認(rèn)消息的方式(時(shí)機(jī)),比如當(dāng)消息被Consumer接收之后,Consumer將在何時(shí)確認(rèn)消息。對(duì)于broker而言,只有接收到ACK指令,才會(huì)認(rèn)為消息被正確的接收或者處理成功了,通過(guò)ACK,可以在consumer(/producer)與Broker之間建立一種簡(jiǎn)單的“擔(dān)保”機(jī)制.?
? ?
? ? Client端指定了ACK模式,但是在Client與broker在交換ACK指令的時(shí)候,還需要告知ACK_TYPE,ACK_TYPE表示此確認(rèn)指令的類(lèi)型,不同的ACK_TYPE將傳遞著消息的狀態(tài),broker可以根據(jù)不同的ACK_TYPE對(duì)消息進(jìn)行不同的操作。
?
? ? 比如Consumer消費(fèi)消息時(shí)出現(xiàn)異常,就需要向broker發(fā)送ACK指令,ACK_TYPE為"REDELIVERED_ACK_TYPE",那么broker就會(huì)重新發(fā)送此消息。在JMS API中并沒(méi)有定義ACT_TYPE,因?yàn)樗ǔJ且环N內(nèi)部機(jī)制,并不會(huì)面向開(kāi)發(fā)者。ActiveMQ中定義了如下幾種ACK_TYPE(參看MessageAck類(lèi)):
?
- DELIVERED_ACK_TYPE?= 0 ? ?消息"已接收",但尚未處理結(jié)束
- STANDARD_ACK_TYPE?= 2 ? ?"標(biāo)準(zhǔn)"類(lèi)型,通常表示為消息"處理成功",broker端可以刪除消息了
- POSION_ACK_TYPE?= 1 ? ?消息"錯(cuò)誤",通常表示"拋棄"此消息,比如消息重發(fā)多次后,都無(wú)法正確處理時(shí),消息將會(huì)被刪除或者DLQ(死信隊(duì)列)
- REDELIVERED_ACK_TYPE?= 3 ? ?消息需"重發(fā)",比如consumer處理消息時(shí)拋出了異常,broker稍后會(huì)重新發(fā)送此消息
- INDIVIDUAL_ACK_TYPE?= 4 ? ?表示只確認(rèn)"單條消息",無(wú)論在任何ACK_MODE下 ? ?
- UNMATCHED_ACK_TYPE?= 5 ? ?在Topic中,如果一條消息在轉(zhuǎn)發(fā)給“訂閱者”時(shí),發(fā)現(xiàn)此消息不符合Selector過(guò)濾條件,那么此消息將 不會(huì)轉(zhuǎn)發(fā)給訂閱者,消息將會(huì)被存儲(chǔ)引擎刪除(相當(dāng)于在Broker上確認(rèn)了消息)。
? ? 到目前為止,我們已經(jīng)清楚了大概的原理: Client端在不同的ACK模式時(shí),將意味著在不同的時(shí)機(jī)發(fā)送ACK指令,每個(gè)ACK Command中會(huì)包含ACK_TYPE,那么broker端就可以根據(jù)ACK_TYPE來(lái)決定此消息的后續(xù)操作. 接下來(lái),我們?cè)敿?xì)的分析ACK模式與ACK_TYPE.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);?我們需要在創(chuàng)建Session時(shí)指定ACK模式,由此可見(jiàn),ACK模式將是session共享的,意味著一個(gè)session下所有的 consumer都使用同一種ACK模式。在創(chuàng)建Session時(shí),開(kāi)發(fā)者不能指定除ACK模式列表之外的其他值.如果此session為事務(wù)類(lèi)型,用戶(hù)指定的ACK模式將被忽略,而強(qiáng)制使用"SESSION_TRANSACTED"類(lèi)型;如果session非事務(wù)類(lèi)型時(shí),也將不能將 ACK模式設(shè)定為"SESSION_TRANSACTED",畢竟這是相悖的.? ?
? ? ? ? ? ? ? ??
? Consumer消費(fèi)消息的風(fēng)格有2種: 同步/異步..使用consumer.receive()就是同步,使用messageListener就是異步;在同一個(gè)consumer中,我們不能同時(shí)使用這2種風(fēng)格,比如在使用listener的情況下,當(dāng)調(diào)用receive()方法將會(huì)獲得一個(gè)Exception。兩種風(fēng)格下,消息確認(rèn)時(shí)機(jī)有所不同。
?? 同步調(diào)用時(shí),在消息從receive方法返回之前,就已經(jīng)調(diào)用了ACK;因此如果Client端沒(méi)有處理成功,此消息將丟失(可能重發(fā),與ACK模式有關(guān))。
? ? 基于異步調(diào)用時(shí),消息的確認(rèn)是在onMessage方法返回之后,如果onMessage方法異常,會(huì)導(dǎo)致消息不能被ACK,會(huì)觸發(fā)重發(fā)。
?
四. ACK模式詳解
? ??AUTO_ACKNOWLEDGE :?自動(dòng)確認(rèn),這就意味著消息的確認(rèn)時(shí)機(jī)將有consumer擇機(jī)確認(rèn)."擇機(jī)確認(rèn)"似乎充滿(mǎn)了不確定性,這也意味著,開(kāi)發(fā)者必須明確知道"擇機(jī)確認(rèn)"的具體時(shí)機(jī),否則將有可能導(dǎo)致消息的丟失,或者消息的重復(fù)接收.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何運(yùn)作的呢?
??? 1) 對(duì)于consumer而言,optimizeAcknowledge屬性只會(huì)在AUTO_ACK模式下有效。
??? 2) 其中DUPS_ACKNOWLEGE也是一種潛在的AUTO_ACK,只是確認(rèn)消息的條數(shù)和時(shí)間上有所不同。
? ? 3) 在“同步”(receive)方法返回message之前,會(huì)檢測(cè)optimizeACK選項(xiàng)是否開(kāi)啟,如果沒(méi)有開(kāi)啟,此單條消息將立即確認(rèn),所以在這種情況下,message返回之后,如果開(kāi)發(fā)者在處理message過(guò)程中出現(xiàn)異常,會(huì)導(dǎo)致此消息也不會(huì)redelivery,即"潛在的消息丟失";如果開(kāi)啟了optimizeACK,則會(huì)在unAck數(shù)量達(dá)到prefetch * 0.65時(shí)確認(rèn),當(dāng)然我們可以指定prefetchSize = 1來(lái)實(shí)現(xiàn)逐條消息確認(rèn)。
? ? 4) 在"異步"(messageListener)方式中,將會(huì)首先調(diào)用listener.onMessage(message),此后再ACK,如果onMessage方法異常,將導(dǎo)致client端補(bǔ)充發(fā)送一個(gè)ACK_TYPE為REDELIVERED_ACK_TYPE確認(rèn)指令;如果onMessage方法正常,消息將會(huì)正常確認(rèn)(STANDARD_ACK_TYPE)。此外需要注意,消息的重發(fā)次數(shù)是有限制的,每條消息中都會(huì)包含“redeliveryCounter”計(jì)數(shù)器,用來(lái)表示此消息已經(jīng)被重發(fā)的次數(shù),如果重發(fā)次數(shù)達(dá)到閥值,將會(huì)導(dǎo)致發(fā)送一個(gè)ACK_TYPE為POSION_ACK_TYPE確認(rèn)指令,這就導(dǎo)致broker端認(rèn)為此消息無(wú)法消費(fèi),此消息將會(huì)被刪除或者遷移到"dead letter"通道中。
? ? 因此當(dāng)我們使用messageListener方式消費(fèi)消息時(shí),通常建議在onMessage方法中使用try-catch,這樣可以在處理消息出錯(cuò)時(shí)記錄一些信息,而不是讓consumer不斷去重發(fā)消息;如果你沒(méi)有使用try-catch,就有可能會(huì)因?yàn)楫惓6鴮?dǎo)致消息重復(fù)接收的問(wèn)題,需要注意你的onMessage方法中邏輯是否能夠兼容對(duì)重復(fù)消息的判斷。
? ? ? ? ? ? ?
??CLIENT_ACKNOWLEDGE :?客戶(hù)端手動(dòng)確認(rèn),這就意味著AcitveMQ將不會(huì)“自作主張”的為你ACK任何消息,開(kāi)發(fā)者需要自己擇機(jī)確認(rèn)。在此模式下,開(kāi)發(fā)者需要需要關(guān)注幾個(gè)方法:1) message.acknowledge(),2) ActiveMQMessageConsumer.acknowledege(),3) ActiveMQSession.acknowledge();其1)和3)是等效的,將當(dāng)前session中所有consumer中尚未ACK的消息都一起確認(rèn),2)只會(huì)對(duì)當(dāng)前consumer中那些尚未確認(rèn)的消息進(jìn)行確認(rèn)。開(kāi)發(fā)者可以在合適的時(shí)機(jī)必須調(diào)用一次上述方法。為了避免混亂,對(duì)于這種ACK模式下,建議一個(gè)session下只有一個(gè)consumer。
?
? ? 我們通常會(huì)在基于Group(消息分組)情況下會(huì)使用CLIENT_ACKNOWLEDGE,我們將在一個(gè)group的消息序列接受完畢之后確認(rèn)消息(組);不過(guò)當(dāng)你認(rèn)為消息很重要,只有當(dāng)消息被正確處理之后才能確認(rèn)時(shí),也可以使用此模式 ?。
?
??? 如果開(kāi)發(fā)者忘記調(diào)用acknowledge方法,將會(huì)導(dǎo)致當(dāng)consumer重啟后,會(huì)接受到重復(fù)消息,因?yàn)閷?duì)于broker而言,那些尚未真正ACK的消息被視為“未消費(fèi)”。
??? 開(kāi)發(fā)者可以在當(dāng)前消息處理成功之后,立即調(diào)用message.acknowledge()方法來(lái)"逐個(gè)"確認(rèn)消息,這樣可以盡可能的減少因網(wǎng)絡(luò)故障而導(dǎo)致消息重發(fā)的個(gè)數(shù);當(dāng)然也可以處理多條消息之后,間歇性的調(diào)用acknowledge方法來(lái)一次確認(rèn)多條消息,減少ack的次數(shù)來(lái)提升consumer的效率,不過(guò)這仍然是一個(gè)利弊權(quán)衡的問(wèn)題。
?
??? 除了message.acknowledge()方法之外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也可以確認(rèn)消息,只不過(guò)前者只會(huì)確認(rèn)當(dāng)前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。
?
??? 無(wú)論是“同步”/“異步”,ActiveMQ都不會(huì)發(fā)送STANDARD_ACK_TYPE,直到message.acknowledge()調(diào)用。如果在client端未確認(rèn)的消息個(gè)數(shù)達(dá)到prefetchSize * 0.5時(shí),會(huì)補(bǔ)充發(fā)送一個(gè)ACK_TYPE為DELIVERED_ACK_TYPE的確認(rèn)指令,這會(huì)觸發(fā)broker端可以繼續(xù)push消息到client端。(參看PrefetchSubscription.acknwoledge方法)
?
? ? 在broker端,針對(duì)每個(gè)Consumer,都會(huì)保存一個(gè)因?yàn)?#34;DELIVERED_ACK_TYPE"而“拖延”的消息個(gè)數(shù),這個(gè)參數(shù)為prefetchExtension,事實(shí)上這個(gè)值不會(huì)大于prefetchSize * 0.5,因?yàn)镃onsumer端會(huì)嚴(yán)格控制DELIVERED_ACK_TYPE指令發(fā)送的時(shí)機(jī)(參見(jiàn)ActiveMQMessageConsumer.ackLater方法),broker端通過(guò)“prefetchExtension”與prefetchSize互相配合,來(lái)決定即將push給client端的消息個(gè)數(shù),count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已經(jīng)發(fā)送給client端但是還沒(méi)有“STANDARD_ACK_TYPE”的消息總量;由此可見(jiàn),在CLIENT_ACK模式下,足夠快速的調(diào)用acknowledge()方法是決定consumer端消費(fèi)消息的速率;如果client端因?yàn)槟撤N原因?qū)е耡cknowledge方法未被執(zhí)行,將導(dǎo)致大量消息不能被確認(rèn),broker端將不會(huì)push消息,事實(shí)上client端將處于“假死”狀態(tài),而無(wú)法繼續(xù)消費(fèi)消息。我們要求client端在消費(fèi)1.5*prefetchSize個(gè)消息之前,必須acknowledge()一次;通常我們總是每消費(fèi)一個(gè)消息調(diào)用一次,這是一種良好的設(shè)計(jì)。
?
? ? 此外需要額外的補(bǔ)充一下:所有ACK指令都是依次發(fā)送給broker端,在CLIET_ACK模式下,消息在交付給listener之前,都會(huì)首先創(chuàng)建一個(gè)DELIVERED_ACK_TYPE的ACK指令,直到client端未確認(rèn)的消息達(dá)到"prefetchSize * 0.5"時(shí)才會(huì)發(fā)送此ACK指令,如果在此之前,開(kāi)發(fā)者調(diào)用了acknowledge()方法,會(huì)導(dǎo)致消息直接被確認(rèn)(STANDARD_ACK_TYPE)。broker端通常會(huì)認(rèn)為“DELIVERED_ACK_TYPE”確認(rèn)指令是一種“slow consumer”信號(hào),如果consumer不能及時(shí)的對(duì)消息進(jìn)行acknowledge而導(dǎo)致broker端阻塞,那么此consumer將會(huì)被標(biāo)記為“slow”,此后queue中的消息將會(huì)轉(zhuǎn)發(fā)給其他Consumer。
?
? ??DUPS_OK_ACKNOWLEDGE :?"消息可重復(fù)"確認(rèn),意思是此模式下,可能會(huì)出現(xiàn)重復(fù)消息,并不是一條消息需要發(fā)送多次ACK才行。它是一種潛在的"AUTO_ACK"確認(rèn)機(jī)制,為批量確認(rèn)而生,而且具有“延遲”確認(rèn)的特點(diǎn)。對(duì)于開(kāi)發(fā)者而言,這種模式下的代碼結(jié)構(gòu)和AUTO_ACKNOWLEDGE一樣,不需要像CLIENT_ACKNOWLEDGE那樣調(diào)用acknowledge()方法來(lái)確認(rèn)消息。
?
? ? 1) 在ActiveMQ中,如果在Destination是Queue通道,我們真的可以認(rèn)為DUPS_OK_ACK就是“AUTO_ACK + optimizeACK + (prefetch > 0)”這種情況,在確認(rèn)時(shí)機(jī)上幾乎完全一致;此外在此模式下,如果prefetchSize =1 或者沒(méi)有開(kāi)啟optimizeACK,也會(huì)導(dǎo)致消息逐條確認(rèn),從而失去批量確認(rèn)的特性。
?
? ? 2) 如果Destination為T(mén)opic,DUPS_OK_ACKNOWLEDGE才會(huì)產(chǎn)生JMS規(guī)范中詮釋的意義,即無(wú)論optimizeACK是否開(kāi)啟,都會(huì)在消費(fèi)的消息個(gè)數(shù)>=prefetch * 0.5時(shí),批量確認(rèn)(STANDARD_ACK_TYPE),在此過(guò)程中,不會(huì)發(fā)送DELIVERED_ACK_TYPE的確認(rèn)指令,這是1)和AUTO_ACK的最大的區(qū)別。
?
? ? 這也意味著,當(dāng)consumer故障重啟后,那些尚未ACK的消息會(huì)重新發(fā)送過(guò)來(lái)。
?
? ??SESSION_TRANSACTED :?當(dāng)session使用事務(wù)時(shí),就是使用此模式。在事務(wù)開(kāi)啟之后,和session.commit()之前,所有消費(fèi)的消息,要么全部正常確認(rèn),要么全部redelivery。這種嚴(yán)謹(jǐn)性,通常在基于GROUP(消息分組)或者其他場(chǎng)景下特別適合。在SESSION_TRANSACTED模式下,optimizeACK并不能發(fā)揮任何效果,因?yàn)樵诖四J较?#xff0c;optimizeACK會(huì)被強(qiáng)制設(shè)定為false,不過(guò)prefetch仍然可以決定DELIVERED_ACK_TYPE的發(fā)送時(shí)機(jī)。
?
? ? 因?yàn)镾ession非線程安全,那么當(dāng)前session下所有的consumer都會(huì)共享同一個(gè)transactionContext;同時(shí)建議,一個(gè)事務(wù)類(lèi)型的Session中只有一個(gè)Consumer,以避免rollback()或者commit()方法被多個(gè)consumer調(diào)用而造成的消息混亂。
? ??
? ? 當(dāng)consumer接受到消息之后,首先檢測(cè)TransactionContext是否已經(jīng)開(kāi)啟,如果沒(méi)有,就會(huì)開(kāi)啟并生成新的transactionId,并把信息發(fā)送給broker;此后將檢測(cè)事務(wù)中已經(jīng)消費(fèi)的消息個(gè)數(shù)是否 >= prefetch * 0.5,如果大于則補(bǔ)充發(fā)送一個(gè)“DELIVERED_ACK_TYPE”的確認(rèn)指令;這時(shí)就開(kāi)始調(diào)用onMessage()方法,如果是同步(receive),那么即返回message。上述過(guò)程,和其他確認(rèn)模式?jīng)]有任何特殊的地方。
? ?
? ? 當(dāng)開(kāi)發(fā)者決定事務(wù)可以提交時(shí),必須調(diào)用session.commit()方法,commit方法將會(huì)導(dǎo)致當(dāng)前session的事務(wù)中所有消息立即被確認(rèn);事務(wù)的確認(rèn)過(guò)程中,首先把本地的deliveredMessage隊(duì)列中尚未確認(rèn)的消息全部確認(rèn)(STANDARD_ACK_TYPE);此后向broker發(fā)送transaction提交指令并等待broker反饋,如果broker端事務(wù)操作成功,那么將會(huì)把本地deliveredMessage隊(duì)列清空,新的事務(wù)開(kāi)始;如果broker端事務(wù)操作失敗(此時(shí)broker已經(jīng)rollback),那么對(duì)于session而言,將執(zhí)行inner-rollback,這個(gè)rollback所做的事情,就是將當(dāng)前事務(wù)中的消息清空并要求broker重發(fā)(REDELIVERED_ACK_TYPE),同時(shí)commit方法將拋出異常。
?
? ? 當(dāng)session.commit方法異常時(shí),對(duì)于開(kāi)發(fā)者而言通常是調(diào)用session.rollback()回滾事務(wù)(事實(shí)上開(kāi)發(fā)者不調(diào)用也沒(méi)有問(wèn)題),當(dāng)然你可以在事務(wù)開(kāi)始之后的任何時(shí)機(jī)調(diào)用rollback(),rollback意味著當(dāng)前事務(wù)的結(jié)束,事務(wù)中所有的消息都將被重發(fā)。需要注意,無(wú)論是inner-rollback還是調(diào)用session.rollback()而導(dǎo)致消息重發(fā),都會(huì)導(dǎo)致message.redeliveryCounter計(jì)數(shù)器增加,最終都會(huì)受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",如果rollback的次數(shù)過(guò)多,而達(dá)到重發(fā)次數(shù)的上限時(shí),消息將會(huì)被DLQ(dead letter)。
?
? ??INDIVIDUAL_ACKNOWLEDGE :?單條消息確認(rèn),這種確認(rèn)模式,我們很少使用,它的確認(rèn)時(shí)機(jī)和CLIENT_ACKNOWLEDGE幾乎一樣,當(dāng)消息消費(fèi)成功之后,需要調(diào)用message.acknowledege來(lái)確認(rèn)此消息(單條),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法將導(dǎo)致整個(gè)session中所有消息被確認(rèn)(批量確認(rèn))。
?
? ??結(jié)語(yǔ):到目前為止,我們已經(jīng)已經(jīng)簡(jiǎn)單的了解了ActiveMQ中消息傳送機(jī)制,還有JMS中ACK策略,重點(diǎn)分析了optimizeACK的策略,希望開(kāi)發(fā)者能夠在使用activeMQ中避免一些不必要的錯(cuò)誤。本文如有疏漏和錯(cuò)誤之處,請(qǐng)各位不吝賜教,特此感謝。
?
?
轉(zhuǎn)載于:https://my.oschina.net/thinwonton/blog/997715
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ消息传送机制以及ACK机制详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Objective-C基础笔记(3)OC
- 下一篇: oracle 10g/11g 命令对照