activeMQ高并发发送消息异常解决方法
高并發發送消息異常解決方法:
現象:使用10個線程每100ms發送一條消息,大約3000多條后,出現異常,所有線程停
???????????? 止:?javax.jms.JMSException:Could not connect to broker
URL: tcp://localhost:61616.Reason:java.net.BindException: ????Addressalready in use: connect; nested exception is
java.net.BindException: Address already inuse: connect
原因:創建了太多jms連接沒有來得及回收
解決方法:使用jms連接池
原來的配置:
???????? <bean> ???????? <property name="environment"> ???????? ???????? <props> ?????????????????? ???????? <prop key="java.naming.factory.initial"> org.apache.activemq.jndi.ActiveMQInitialContextFactory </prop> ?????????????????? ???????? <prop key="java.naming.provider.url">tcp://huzq-linux:61616</prop> ???????? ???????? </props> ???????? </property> </bean> <bean> ???????? <property name="jndiName"> <value>ConnectionFactory</value> </property> <property name="jndiTemplate"> <ref local="jndiTemplate"></ref> </property> </bean> ? |
修改為:
<bean> ???????? <property name="connectionFactory"> ?????????????????? <bean> ??????????????????????????? <property name="brokerURL" value="tcp://huzq-linux:61616" /> ?????????????????? </bean> ???????? </property> </bean> ? |
?
解決activemq多消費者并發處理
遇到一個現象,如果activemq隊列積壓了數據的話,如果在spring中啟動listner,只有一個consumer執行,查閱了很多資料,無果,后來偶爾通過activemq的監控網頁看到消費者列表中,只有一個消費者有等待處理的數據,其他都沒有,如下圖:
由此得知,activemq有一定機制將隊列中的數據交給consumer處理,這個機制就是數據的數量分配,查資料得知,默認是1000,因此,把這個值調小就可以了。
在客戶端的連接url中,修改為tcp://ipaddr:61616?jms.prefetchPolicy.all=2
這樣基本消費者就分配公平了,不會出現一個消費者忙死,另外的消費者閑死了。
為高并發程序部署ActiveMQ
使用ActiveMQ來擴展你的應用程序需要一些時間并要花一些精力.本節中我們將介紹三種技術用于擴展應用程序.我們將從垂直擴展開始,這種擴展方式中,單個代理需要處理成千上萬的連接和消息隊列.
接下來我們將介紹水平擴展,這種擴展方式需要處理比前一種方式更多的網絡連接.最后,我們介紹的傳輸負載分流,可以在擴展和性能間得到平衡,但是會增加ActiveMQ程序的復雜性.
1.????????垂直擴展:
垂直擴展是一種用于增加單個ActiveMQ代理連接數(因而也增加了負載能力)的技術.默認情況下,ActiveMQ的被設計成盡可高效的傳輸消息以確保低延遲和良好的性能.但是,你也可以進行一些配置使的ActiveMQ代理可以同時處理大量并發的連接以及大量的消息隊列.
默認情況下,ActiveMQ使用阻塞IO來處理傳輸連接,這種方式為每一個連接分配一個線程.你可以為ActiveMQ代理使用非阻塞IO(同時客戶端可以使用默認的傳輸)以減少線程的使用.可以在ActiveMQ的配置文件中通過傳輸連接器配置非阻塞IO.下面的是配置非阻塞IO的示例
代碼:配置NIO傳輸連接器
<broker> <transportConnectors> <transportConnector name="nio" uri="nio://localhost:61616"/> </<transportConnectors> </broker> |
除了為每個連接使用一個線程的阻塞IO,ActiveMQ還可以為每一個客戶端連接使用一個消息分發線程.你可以通過將系統參數org.apache.activemq.UseDedicatedTaskRunner設置為false來設置ActiveMQ使用一個搞線程池.下面是一個示例:
ACTIVEMQ_OPTS="-Dorg.apache.activemq.UseDedicatedTaskRunner=false" |
確保ActiveMQ代理用于足夠的內存來處理大量的并發連接,需要分兩步進行:
首先,你需要確保運行ActiveMQ的JVM在啟動之前已經配置了足夠的內存.可以使用
JVM的-Xmx選項來配置,如下所示:
ACTIVEMQ_OPTS="-Xmx1024M -Dorg.apache.activemq.UseDedicatedTaskRunner=false" |
其次,需要確保JVM配置了適量的專門供ActiveMQ代理使用的內存.這個配置可用通過<system-Usage>?元素的limit屬性來配置.一個不錯的根據經驗得到的規則時,在連接數為幾百個時配置512MB為最小內存.
如果測試發現內存不夠用,可以增加內存配置.你可以按照下面代碼示例來配置ActiveMQ使用的內存限制:
代碼:為ActiveMQ代理設置內存使用限制
<systemUsage> <systemUsage> ? <memoryUsage> <memoryUsage limit="512 mb"/> </memoryUsage> ? <storeUsage> <storeUsage limit="10 gb" name="foo"/> </storeUsage> ? <tempUsage> <tempUsage limit="1 gb"/> </tempUsage> ? </systemUsage> </systemUsage> ? |
同樣,簡易減少每個連接的CPU負載.如果你正使用Open-Wire格式的消息,關閉tight encoding選項,開啟該選項會導致CPU占有過多.Tight encoding選項可以通過客戶端連接的URI中的參數設置以便關閉該選項.下面是示例代碼:
String uri = "failover://(tcp://localhost:61616?" + wireFormat.tightEncodingEnabled=false)"; ConnectionFactory cf = new ActiveMQConnectionFactory(uri); |
了解了一些擴展ActiveMQ代理處理大量連接的調優選項之后,我們在了解一些讓ActiveMQ處理大量消息隊列的調優選項.
默認的消息隊列配置中使用一個獨立的線程負責將消息存儲中的消息提取到消息隊列中而后再被分發到對其感興趣的消息消費者.如果有大量的消息隊列,建議通過啟用optimizeDispatch這個屬性
改善這個特性,示例代碼如下所示:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" optimizedDispatch="true"/> </policyEntries> </policyMap> </destinationPolicy> |
注意,代碼清單中使用通配符>表示該配置會遞歸的應用到所有的消息隊列中.
為確保擴展配置既可以處理大量連接也可以處理海量消息隊列,請使用JDBC或更新更快的KahaDB消息存儲.默認情況下ActiveMQ使用KahaDB消息存儲.
到目前位置,我們關注了連接數擴展,減少線程使用以及選擇正確的消息存儲.下面的示例配置代碼展示了ActiveMQ配置中為擴展進行了調優:
代碼:為擴展進行調優的配置示例代碼
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="amq-broker" dataDirectory="${activemq.base}/data"> ? ???????? <persistenceAdapter> ?????????????????? <kahaDB directory="${activemq.base}/data" journalMaxFileLength="32mb"/> ???????? </persistenceAdapter> ? ???????? <destinationPolicy> ?????????????????? <policyMap> ??????????????????????????? <policyEntries> ???????????????????????????????????? <policyEntry queue=">" optimizedDispatch="true"/> ??????????????????????????? </policyEntries> ?????????????????? </policyMap> ???????? </destinationPolicy> ? ???????? <systemUsage> ?????????????????? <systemUsage> ? ??????????????????????????? <memoryUsage> ???????????????????????????????????? <memoryUsage limit="512 mb"/> ??????????????????????????? </memoryUsage> ? ??????????????????????????? <storeUsage> ???????????????????????????????????? <storeUsage limit="10 gb" name="foo"/> ??????????????????????????? </storeUsage> ? ??????????????????????????? <tempUsage> ???????????????????????????????????? <tempUsage limit="1 gb"/> ??????????????????????????? </tempUsage> ? ?????????????????? </systemUsage> ???????? </systemUsage> ? ???????? <transportConnectors> ?????????????????? <transportConnector name="openwire" uri="nio://localhost:61616"/> ???????? </transportConnectors> </broker> |
注意示例代碼中所有為調優而建議的配置條目,這些調優條目在默認的配置文件中并沒有配置,所以請確保給予充分重視.
了解過如何擴展ActiveMQ后,現在是時候了解使用代理網絡來進行橫向擴展了.
2.????? 橫向擴展
除了擴展單獨的代理,你還可以使用代理網絡來增加應用程序可用的代理數量.因為網絡會自動傳遞消息給所有互聯的具有對消息感興趣的消息消費者的代理,所以你可以配置客戶端連接到一個代理集群,隨機的選擇集群中的一個代理來連接.可以通過URI中的參數來配置,如下所示:
failover://(tcp://broker1:61616,tcp://broker2:61616)?randomize=true? |
為了確保隊列或持久化主題中的消息不會卡在某個代理上而不能進行轉發,需要在配置網絡連接時,將dynamicOnly配置成true并使用小一點的prefetchSize.下面是一個示例:
<networkConnector uri="static://(tcp://remotehost:61617)" name="bridge" dynamicOnly="true" prefetchSize="1" </networkConnector>? |
示例代理網絡來橫向擴展并不會的代理更多的延遲,因為消息在傳送到消息消費者在之前會經過多個代理.另外一種可選的部署方案可以提供更多的擴展性和更好的性能,但是需要在應用程序中做更多的計劃.這種混合的解決方案被稱為傳輸負載分流(traffic partitioning),這種方案通過在應用程序中分割消息目的地到不同的代理上以完成垂直擴展.
3.????? 傳輸負載分流
客戶端的傳輸負載分流是一個垂直和水平混合的負載分流方案.通常不使用代理網絡,因為客戶端程序會決定將哪個負載發送到哪個(些)代理上.客戶端程序需要維護多個JMS連接并且決定哪個JMS連接應該用于那個消息目的地.
沒有直接使用網絡連接的好處是降低了代理見過量的消息轉發.你不需要像在傳統程序中那樣進行額外的負載的均衡處理.到這里,我們已經了解了垂直和水平擴展以及傳輸負載分流.你應該能夠深刻了解如何使用ActiveMQ來處理大量的并發連接和海量的消息目的地的連接了.
Activemq在大流量停出現內存耗盡的情況以及解決方案
在大量消息持續發送到broker的情況下,當broker到消費者之間的網絡滿了以后,broker的消息無法發送出去,導致在TransportConnection的dispatchQueue中堆積的消息越來越多。PendingMessageCursor中的消息不能被及時消費,導致broker判斷消費者為慢消費者。當broker的內存被耗盡后JVM會頻繁的進行full gc,由于消息不能被回收,所以消息對象會從年輕代轉移到老年代而不會釋放內存,導致broker幾乎停止對外服務。
?
這個問題的根本原因是ActiveMQ只對接收消息作了流量控制,但是沒有發送消息堵塞的情況。需要根據消息發送情況來控制消息的接收。
?
解決方案,在TransportConnection中的dispatchAsync對dispatchQueue中的消息數量做判斷,當超過閾值就暫停dispatch,當前thread sleep,這樣TopicSubscription就會暫停接收消息,避免內存耗盡
總結
以上是生活随笔為你收集整理的activeMQ高并发发送消息异常解决方法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: activeMQ的三种通讯模式
- 下一篇: activemq的使用经验