activeMQ高并发发送消息异常解决方法
高并發(fā)發(fā)送消息異常解決方法:
現(xiàn)象:使用10個(gè)線程每100ms發(fā)送一條消息,大約3000多條后,出現(xiàn)異常,所有線程停
???????????? 止:?javax.jms.JMSException:Could not connect to broker
URL: tcp://localhost:61616.Reason:java.net.BindException: ????Addressalready in use: connect; nested exception is
java.net.BindException: Address already inuse: connect
原因:創(chuàng)建了太多jms連接沒(méi)有來(lái)得及回收
解決方法:使用jms連接池
原來(lái)的配置:
???????? <bean> ???????? <property name="environment"> ???????? ???????? <props> ?????????????????? ???????? <prop key="java.naming.factory.initial"> org.apache.activemq.jndi.ActiveMQInitialContextFactory </prop> ?????????????????? ???????? <prop key="java.naming.provider.url">tcp://huzq-linux:61616</prop> ???????? ???????? </props> ???????? </property> </bean> <bean> ???????? <property name="jndiName"> <value>ConnectionFactory</value> </property> <property name="jndiTemplate"> <ref local="jndiTemplate"></ref> </property> </bean> ? |
修改為:
<bean> ???????? <property name="connectionFactory"> ?????????????????? <bean> ??????????????????????????? <property name="brokerURL" value="tcp://huzq-linux:61616" /> ?????????????????? </bean> ???????? </property> </bean> ? |
?
解決activemq多消費(fèi)者并發(fā)處理
遇到一個(gè)現(xiàn)象,如果activemq隊(duì)列積壓了數(shù)據(jù)的話,如果在spring中啟動(dòng)listner,只有一個(gè)consumer執(zhí)行,查閱了很多資料,無(wú)果,后來(lái)偶爾通過(guò)activemq的監(jiān)控網(wǎng)頁(yè)看到消費(fèi)者列表中,只有一個(gè)消費(fèi)者有等待處理的數(shù)據(jù),其他都沒(méi)有,如下圖:
由此得知,activemq有一定機(jī)制將隊(duì)列中的數(shù)據(jù)交給consumer處理,這個(gè)機(jī)制就是數(shù)據(jù)的數(shù)量分配,查資料得知,默認(rèn)是1000,因此,把這個(gè)值調(diào)小就可以了。
在客戶端的連接url中,修改為tcp://ipaddr:61616?jms.prefetchPolicy.all=2
這樣基本消費(fèi)者就分配公平了,不會(huì)出現(xiàn)一個(gè)消費(fèi)者忙死,另外的消費(fèi)者閑死了。
為高并發(fā)程序部署ActiveMQ
使用ActiveMQ來(lái)擴(kuò)展你的應(yīng)用程序需要一些時(shí)間并要花一些精力.本節(jié)中我們將介紹三種技術(shù)用于擴(kuò)展應(yīng)用程序.我們將從垂直擴(kuò)展開(kāi)始,這種擴(kuò)展方式中,單個(gè)代理需要處理成千上萬(wàn)的連接和消息隊(duì)列.
接下來(lái)我們將介紹水平擴(kuò)展,這種擴(kuò)展方式需要處理比前一種方式更多的網(wǎng)絡(luò)連接.最后,我們介紹的傳輸負(fù)載分流,可以在擴(kuò)展和性能間得到平衡,但是會(huì)增加ActiveMQ程序的復(fù)雜性.
1.????????垂直擴(kuò)展:
垂直擴(kuò)展是一種用于增加單個(gè)ActiveMQ代理連接數(shù)(因而也增加了負(fù)載能力)的技術(shù).默認(rèn)情況下,ActiveMQ的被設(shè)計(jì)成盡可高效的傳輸消息以確保低延遲和良好的性能.但是,你也可以進(jìn)行一些配置使的ActiveMQ代理可以同時(shí)處理大量并發(fā)的連接以及大量的消息隊(duì)列.
默認(rèn)情況下,ActiveMQ使用阻塞IO來(lái)處理傳輸連接,這種方式為每一個(gè)連接分配一個(gè)線程.你可以為ActiveMQ代理使用非阻塞IO(同時(shí)客戶端可以使用默認(rèn)的傳輸)以減少線程的使用.可以在ActiveMQ的配置文件中通過(guò)傳輸連接器配置非阻塞IO.下面的是配置非阻塞IO的示例
代碼:配置NIO傳輸連接器
<broker> <transportConnectors> <transportConnector name="nio" uri="nio://localhost:61616"/> </<transportConnectors> </broker> |
除了為每個(gè)連接使用一個(gè)線程的阻塞IO,ActiveMQ還可以為每一個(gè)客戶端連接使用一個(gè)消息分發(fā)線程.你可以通過(guò)將系統(tǒng)參數(shù)org.apache.activemq.UseDedicatedTaskRunner設(shè)置為false來(lái)設(shè)置ActiveMQ使用一個(gè)搞線程池.下面是一個(gè)示例:
ACTIVEMQ_OPTS="-Dorg.apache.activemq.UseDedicatedTaskRunner=false" |
確保ActiveMQ代理用于足夠的內(nèi)存來(lái)處理大量的并發(fā)連接,需要分兩步進(jìn)行:
首先,你需要確保運(yùn)行ActiveMQ的JVM在啟動(dòng)之前已經(jīng)配置了足夠的內(nèi)存.可以使用
JVM的-Xmx選項(xiàng)來(lái)配置,如下所示:
ACTIVEMQ_OPTS="-Xmx1024M -Dorg.apache.activemq.UseDedicatedTaskRunner=false" |
其次,需要確保JVM配置了適量的專門(mén)供ActiveMQ代理使用的內(nèi)存.這個(gè)配置可用通過(guò)<system-Usage>?元素的limit屬性來(lái)配置.一個(gè)不錯(cuò)的根據(jù)經(jīng)驗(yàn)得到的規(guī)則時(shí),在連接數(shù)為幾百個(gè)時(shí)配置512MB為最小內(nèi)存.
如果測(cè)試發(fā)現(xiàn)內(nèi)存不夠用,可以增加內(nèi)存配置.你可以按照下面代碼示例來(lái)配置ActiveMQ使用的內(nèi)存限制:
代碼:為ActiveMQ代理設(shè)置內(nèi)存使用限制
<systemUsage> <systemUsage> ? <memoryUsage> <memoryUsage limit="512 mb"/> </memoryUsage> ? <storeUsage> <storeUsage limit="10 gb" name="foo"/> </storeUsage> ? <tempUsage> <tempUsage limit="1 gb"/> </tempUsage> ? </systemUsage> </systemUsage> ? |
同樣,簡(jiǎn)易減少每個(gè)連接的CPU負(fù)載.如果你正使用Open-Wire格式的消息,關(guān)閉tight encoding選項(xiàng),開(kāi)啟該選項(xiàng)會(huì)導(dǎo)致CPU占有過(guò)多.Tight encoding選項(xiàng)可以通過(guò)客戶端連接的URI中的參數(shù)設(shè)置以便關(guān)閉該選項(xiàng).下面是示例代碼:
String uri = "failover://(tcp://localhost:61616?" + wireFormat.tightEncodingEnabled=false)"; ConnectionFactory cf = new ActiveMQConnectionFactory(uri); |
了解了一些擴(kuò)展ActiveMQ代理處理大量連接的調(diào)優(yōu)選項(xiàng)之后,我們?cè)诹私庖恍┳孉ctiveMQ處理大量消息隊(duì)列的調(diào)優(yōu)選項(xiàng).
默認(rèn)的消息隊(duì)列配置中使用一個(gè)獨(dú)立的線程負(fù)責(zé)將消息存儲(chǔ)中的消息提取到消息隊(duì)列中而后再被分發(fā)到對(duì)其感興趣的消息消費(fèi)者.如果有大量的消息隊(duì)列,建議通過(guò)啟用optimizeDispatch這個(gè)屬性
改善這個(gè)特性,示例代碼如下所示:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" optimizedDispatch="true"/> </policyEntries> </policyMap> </destinationPolicy> |
注意,代碼清單中使用通配符>表示該配置會(huì)遞歸的應(yīng)用到所有的消息隊(duì)列中.
為確保擴(kuò)展配置既可以處理大量連接也可以處理海量消息隊(duì)列,請(qǐng)使用JDBC或更新更快的KahaDB消息存儲(chǔ).默認(rèn)情況下ActiveMQ使用KahaDB消息存儲(chǔ).
到目前位置,我們關(guān)注了連接數(shù)擴(kuò)展,減少線程使用以及選擇正確的消息存儲(chǔ).下面的示例配置代碼展示了ActiveMQ配置中為擴(kuò)展進(jìn)行了調(diào)優(yōu):
代碼:為擴(kuò)展進(jìn)行調(diào)優(yōu)的配置示例代碼
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="amq-broker" dataDirectory="${activemq.base}/data"> ? ???????? <persistenceAdapter> ?????????????????? <kahaDB directory="${activemq.base}/data" journalMaxFileLength="32mb"/> ???????? </persistenceAdapter> ? ???????? <destinationPolicy> ?????????????????? <policyMap> ??????????????????????????? <policyEntries> ???????????????????????????????????? <policyEntry queue=">" optimizedDispatch="true"/> ??????????????????????????? </policyEntries> ?????????????????? </policyMap> ???????? </destinationPolicy> ? ???????? <systemUsage> ?????????????????? <systemUsage> ? ??????????????????????????? <memoryUsage> ???????????????????????????????????? <memoryUsage limit="512 mb"/> ??????????????????????????? </memoryUsage> ? ??????????????????????????? <storeUsage> ???????????????????????????????????? <storeUsage limit="10 gb" name="foo"/> ??????????????????????????? </storeUsage> ? ??????????????????????????? <tempUsage> ???????????????????????????????????? <tempUsage limit="1 gb"/> ??????????????????????????? </tempUsage> ? ?????????????????? </systemUsage> ???????? </systemUsage> ? ???????? <transportConnectors> ?????????????????? <transportConnector name="openwire" uri="nio://localhost:61616"/> ???????? </transportConnectors> </broker> |
注意示例代碼中所有為調(diào)優(yōu)而建議的配置條目,這些調(diào)優(yōu)條目在默認(rèn)的配置文件中并沒(méi)有配置,所以請(qǐng)確保給予充分重視.
了解過(guò)如何擴(kuò)展ActiveMQ后,現(xiàn)在是時(shí)候了解使用代理網(wǎng)絡(luò)來(lái)進(jìn)行橫向擴(kuò)展了.
2.????? 橫向擴(kuò)展
除了擴(kuò)展單獨(dú)的代理,你還可以使用代理網(wǎng)絡(luò)來(lái)增加應(yīng)用程序可用的代理數(shù)量.因?yàn)榫W(wǎng)絡(luò)會(huì)自動(dòng)傳遞消息給所有互聯(lián)的具有對(duì)消息感興趣的消息消費(fèi)者的代理,所以你可以配置客戶端連接到一個(gè)代理集群,隨機(jī)的選擇集群中的一個(gè)代理來(lái)連接.可以通過(guò)URI中的參數(shù)來(lái)配置,如下所示:
failover://(tcp://broker1:61616,tcp://broker2:61616)?randomize=true? |
為了確保隊(duì)列或持久化主題中的消息不會(huì)卡在某個(gè)代理上而不能進(jìn)行轉(zhuǎn)發(fā),需要在配置網(wǎng)絡(luò)連接時(shí),將dynamicOnly配置成true并使用小一點(diǎn)的prefetchSize.下面是一個(gè)示例:
<networkConnector uri="static://(tcp://remotehost:61617)" name="bridge" dynamicOnly="true" prefetchSize="1" </networkConnector>? |
示例代理網(wǎng)絡(luò)來(lái)橫向擴(kuò)展并不會(huì)的代理更多的延遲,因?yàn)橄⒃趥魉偷较⑾M(fèi)者在之前會(huì)經(jīng)過(guò)多個(gè)代理.另外一種可選的部署方案可以提供更多的擴(kuò)展性和更好的性能,但是需要在應(yīng)用程序中做更多的計(jì)劃.這種混合的解決方案被稱為傳輸負(fù)載分流(traffic partitioning),這種方案通過(guò)在應(yīng)用程序中分割消息目的地到不同的代理上以完成垂直擴(kuò)展.
3.????? 傳輸負(fù)載分流
客戶端的傳輸負(fù)載分流是一個(gè)垂直和水平混合的負(fù)載分流方案.通常不使用代理網(wǎng)絡(luò),因?yàn)榭蛻舳顺绦驎?huì)決定將哪個(gè)負(fù)載發(fā)送到哪個(gè)(些)代理上.客戶端程序需要維護(hù)多個(gè)JMS連接并且決定哪個(gè)JMS連接應(yīng)該用于那個(gè)消息目的地.
沒(méi)有直接使用網(wǎng)絡(luò)連接的好處是降低了代理見(jiàn)過(guò)量的消息轉(zhuǎn)發(fā).你不需要像在傳統(tǒng)程序中那樣進(jìn)行額外的負(fù)載的均衡處理.到這里,我們已經(jīng)了解了垂直和水平擴(kuò)展以及傳輸負(fù)載分流.你應(yīng)該能夠深刻了解如何使用ActiveMQ來(lái)處理大量的并發(fā)連接和海量的消息目的地的連接了.
Activemq在大流量停出現(xiàn)內(nèi)存耗盡的情況以及解決方案
在大量消息持續(xù)發(fā)送到broker的情況下,當(dāng)broker到消費(fèi)者之間的網(wǎng)絡(luò)滿了以后,broker的消息無(wú)法發(fā)送出去,導(dǎo)致在TransportConnection的dispatchQueue中堆積的消息越來(lái)越多。PendingMessageCursor中的消息不能被及時(shí)消費(fèi),導(dǎo)致broker判斷消費(fèi)者為慢消費(fèi)者。當(dāng)broker的內(nèi)存被耗盡后JVM會(huì)頻繁的進(jìn)行full gc,由于消息不能被回收,所以消息對(duì)象會(huì)從年輕代轉(zhuǎn)移到老年代而不會(huì)釋放內(nèi)存,導(dǎo)致broker幾乎停止對(duì)外服務(wù)。
?
這個(gè)問(wèn)題的根本原因是ActiveMQ只對(duì)接收消息作了流量控制,但是沒(méi)有發(fā)送消息堵塞的情況。需要根據(jù)消息發(fā)送情況來(lái)控制消息的接收。
?
解決方案,在TransportConnection中的dispatchAsync對(duì)dispatchQueue中的消息數(shù)量做判斷,當(dāng)超過(guò)閾值就暫停dispatch,當(dāng)前thread sleep,這樣TopicSubscription就會(huì)暫停接收消息,避免內(nèi)存耗盡
總結(jié)
以上是生活随笔為你收集整理的activeMQ高并发发送消息异常解决方法的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: activeMQ的三种通讯模式
- 下一篇: activemq的使用经验