日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

activeMQ高并发发送消息异常解决方法

發(fā)布時(shí)間:2024/4/15 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 activeMQ高并发发送消息异常解决方法 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

高并發(fā)發(fā)送消息異常解決方法:

現(xiàn)象:使用10個(gè)線程每100ms發(fā)送一條消息,大約3000多條后,出現(xiàn)異常,所有線程停

???????????? 止:?javax.jms.JMSException:Could not connect to broker

URL: tcp://localhost:61616.Reason:java.net.BindException: ????Addressalready in use: connect; nested exception is

java.net.BindException: Address already inuse: connect

原因:創(chuàng)建了太多jms連接沒(méi)有來(lái)得及回收

解決方法:使用jms連接池

原來(lái)的配置:

???????? <bean>

???????? <property name="environment">

???????? ???????? <props>

?????????????????? ???????? <prop key="java.naming.factory.initial">

org.apache.activemq.jndi.ActiveMQInitialContextFactory

</prop>

?????????????????? ???????? <prop key="java.naming.provider.url">tcp://huzq-linux:61616</prop>

???????? ???????? </props>

???????? </property>

</bean>

<bean>

???????? <property name="jndiName">

<value>ConnectionFactory</value>

</property>

<property name="jndiTemplate">

<ref local="jndiTemplate"></ref>

</property>

</bean>

?

修改為:

<bean>

???????? <property name="connectionFactory">

?????????????????? <bean>

??????????????????????????? <property name="brokerURL" value="tcp://huzq-linux:61616" />

?????????????????? </bean>

???????? </property>

</bean>

?

?

解決activemq多消費(fèi)者并發(fā)處理

遇到一個(gè)現(xiàn)象,如果activemq隊(duì)列積壓了數(shù)據(jù)的話,如果在spring中啟動(dòng)listner,只有一個(gè)consumer執(zhí)行,查閱了很多資料,無(wú)果,后來(lái)偶爾通過(guò)activemq的監(jiān)控網(wǎng)頁(yè)看到消費(fèi)者列表中,只有一個(gè)消費(fèi)者有等待處理的數(shù)據(jù),其他都沒(méi)有,如下圖:


由此得知,activemq有一定機(jī)制將隊(duì)列中的數(shù)據(jù)交給consumer處理,這個(gè)機(jī)制就是數(shù)據(jù)的數(shù)量分配,查資料得知,默認(rèn)是1000,因此,把這個(gè)值調(diào)小就可以了。

在客戶端的連接url中,修改為tcp://ipaddr:61616?jms.prefetchPolicy.all=2

這樣基本消費(fèi)者就分配公平了,不會(huì)出現(xiàn)一個(gè)消費(fèi)者忙死,另外的消費(fèi)者閑死了。

為高并發(fā)程序部署ActiveMQ

使用ActiveMQ來(lái)擴(kuò)展你的應(yīng)用程序需要一些時(shí)間并要花一些精力.本節(jié)中我們將介紹三種技術(shù)用于擴(kuò)展應(yīng)用程序.我們將從垂直擴(kuò)展開(kāi)始,這種擴(kuò)展方式中,單個(gè)代理需要處理成千上萬(wàn)的連接和消息隊(duì)列.

接下來(lái)我們將介紹水平擴(kuò)展,這種擴(kuò)展方式需要處理比前一種方式更多的網(wǎng)絡(luò)連接.最后,我們介紹的傳輸負(fù)載分流,可以在擴(kuò)展和性能間得到平衡,但是會(huì)增加ActiveMQ程序的復(fù)雜性.

1.????????垂直擴(kuò)展:

垂直擴(kuò)展是一種用于增加單個(gè)ActiveMQ代理連接數(shù)(因而也增加了負(fù)載能力)的技術(shù).默認(rèn)情況下,ActiveMQ的被設(shè)計(jì)成盡可高效的傳輸消息以確保低延遲和良好的性能.但是,你也可以進(jìn)行一些配置使的ActiveMQ代理可以同時(shí)處理大量并發(fā)的連接以及大量的消息隊(duì)列.

默認(rèn)情況下,ActiveMQ使用阻塞IO來(lái)處理傳輸連接,這種方式為每一個(gè)連接分配一個(gè)線程.你可以為ActiveMQ代理使用非阻塞IO(同時(shí)客戶端可以使用默認(rèn)的傳輸)以減少線程的使用.可以在ActiveMQ的配置文件中通過(guò)傳輸連接器配置非阻塞IO.下面的是配置非阻塞IO的示例

代碼:配置NIO傳輸連接器

<broker>

<transportConnectors>

<transportConnector name="nio" uri="nio://localhost:61616"/>

</<transportConnectors>

</broker>

除了為每個(gè)連接使用一個(gè)線程的阻塞IO,ActiveMQ還可以為每一個(gè)客戶端連接使用一個(gè)消息分發(fā)線程.你可以通過(guò)將系統(tǒng)參數(shù)org.apache.activemq.UseDedicatedTaskRunner設(shè)置為false來(lái)設(shè)置ActiveMQ使用一個(gè)搞線程池.下面是一個(gè)示例:

ACTIVEMQ_OPTS="-Dorg.apache.activemq.UseDedicatedTaskRunner=false"

確保ActiveMQ代理用于足夠的內(nèi)存來(lái)處理大量的并發(fā)連接,需要分兩步進(jìn)行:

首先,你需要確保運(yùn)行ActiveMQ的JVM在啟動(dòng)之前已經(jīng)配置了足夠的內(nèi)存.可以使用

JVM的-Xmx選項(xiàng)來(lái)配置,如下所示:

ACTIVEMQ_OPTS="-Xmx1024M -Dorg.apache.activemq.UseDedicatedTaskRunner=false"

其次,需要確保JVM配置了適量的專門(mén)供ActiveMQ代理使用的內(nèi)存.這個(gè)配置可用通過(guò)<system-Usage>?元素的limit屬性來(lái)配置.一個(gè)不錯(cuò)的根據(jù)經(jīng)驗(yàn)得到的規(guī)則時(shí),在連接數(shù)為幾百個(gè)時(shí)配置512MB為最小內(nèi)存.

如果測(cè)試發(fā)現(xiàn)內(nèi)存不夠用,可以增加內(nèi)存配置.你可以按照下面代碼示例來(lái)配置ActiveMQ使用的內(nèi)存限制:

代碼:為ActiveMQ代理設(shè)置內(nèi)存使用限制

<systemUsage>

<systemUsage>

?

<memoryUsage>

<memoryUsage limit="512 mb"/>

</memoryUsage>

?

<storeUsage>

<storeUsage limit="10 gb" name="foo"/>

</storeUsage>

?

<tempUsage>

<tempUsage limit="1 gb"/>

</tempUsage>

?

</systemUsage>

</systemUsage>

?

同樣,簡(jiǎn)易減少每個(gè)連接的CPU負(fù)載.如果你正使用Open-Wire格式的消息,關(guān)閉tight encoding選項(xiàng),開(kāi)啟該選項(xiàng)會(huì)導(dǎo)致CPU占有過(guò)多.Tight encoding選項(xiàng)可以通過(guò)客戶端連接的URI中的參數(shù)設(shè)置以便關(guān)閉該選項(xiàng).下面是示例代碼:

String uri = "failover://(tcp://localhost:61616?" + wireFormat.tightEncodingEnabled=false)";

ConnectionFactory cf = new ActiveMQConnectionFactory(uri);

了解了一些擴(kuò)展ActiveMQ代理處理大量連接的調(diào)優(yōu)選項(xiàng)之后,我們?cè)诹私庖恍┳孉ctiveMQ處理大量消息隊(duì)列的調(diào)優(yōu)選項(xiàng).

默認(rèn)的消息隊(duì)列配置中使用一個(gè)獨(dú)立的線程負(fù)責(zé)將消息存儲(chǔ)中的消息提取到消息隊(duì)列中而后再被分發(fā)到對(duì)其感興趣的消息消費(fèi)者.如果有大量的消息隊(duì)列,建議通過(guò)啟用optimizeDispatch這個(gè)屬性

改善這個(gè)特性,示例代碼如下所示:

<destinationPolicy>

<policyMap>

<policyEntries>

<policyEntry queue=">" optimizedDispatch="true"/>

</policyEntries>

</policyMap>

</destinationPolicy>

注意,代碼清單中使用通配符>表示該配置會(huì)遞歸的應(yīng)用到所有的消息隊(duì)列中.

為確保擴(kuò)展配置既可以處理大量連接也可以處理海量消息隊(duì)列,請(qǐng)使用JDBC或更新更快的KahaDB消息存儲(chǔ).默認(rèn)情況下ActiveMQ使用KahaDB消息存儲(chǔ).

到目前位置,我們關(guān)注了連接數(shù)擴(kuò)展,減少線程使用以及選擇正確的消息存儲(chǔ).下面的示例配置代碼展示了ActiveMQ配置中為擴(kuò)展進(jìn)行了調(diào)優(yōu):

代碼:為擴(kuò)展進(jìn)行調(diào)優(yōu)的配置示例代碼

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="amq-broker" dataDirectory="${activemq.base}/data">

?

???????? <persistenceAdapter>

?????????????????? <kahaDB directory="${activemq.base}/data" journalMaxFileLength="32mb"/>

???????? </persistenceAdapter>

?

???????? <destinationPolicy>

?????????????????? <policyMap>

??????????????????????????? <policyEntries>

???????????????????????????????????? <policyEntry queue="&gt;" optimizedDispatch="true"/>

??????????????????????????? </policyEntries>

?????????????????? </policyMap>

???????? </destinationPolicy>

?

???????? <systemUsage>

?????????????????? <systemUsage>

?

??????????????????????????? <memoryUsage>

???????????????????????????????????? <memoryUsage limit="512 mb"/>

??????????????????????????? </memoryUsage>

?

??????????????????????????? <storeUsage>

???????????????????????????????????? <storeUsage limit="10 gb" name="foo"/>

??????????????????????????? </storeUsage>

?

??????????????????????????? <tempUsage>

???????????????????????????????????? <tempUsage limit="1 gb"/>

??????????????????????????? </tempUsage>

?

?????????????????? </systemUsage>

???????? </systemUsage>

?

???????? <transportConnectors>

?????????????????? <transportConnector name="openwire" uri="nio://localhost:61616"/>

???????? </transportConnectors>

</broker>

注意示例代碼中所有為調(diào)優(yōu)而建議的配置條目,這些調(diào)優(yōu)條目在默認(rèn)的配置文件中并沒(méi)有配置,所以請(qǐng)確保給予充分重視.

了解過(guò)如何擴(kuò)展ActiveMQ后,現(xiàn)在是時(shí)候了解使用代理網(wǎng)絡(luò)來(lái)進(jìn)行橫向擴(kuò)展了.

2.????? 橫向擴(kuò)展

除了擴(kuò)展單獨(dú)的代理,你還可以使用代理網(wǎng)絡(luò)來(lái)增加應(yīng)用程序可用的代理數(shù)量.因?yàn)榫W(wǎng)絡(luò)會(huì)自動(dòng)傳遞消息給所有互聯(lián)的具有對(duì)消息感興趣的消息消費(fèi)者的代理,所以你可以配置客戶端連接到一個(gè)代理集群,隨機(jī)的選擇集群中的一個(gè)代理來(lái)連接.可以通過(guò)URI中的參數(shù)來(lái)配置,如下所示:

failover://(tcp://broker1:61616,tcp://broker2:61616)?randomize=true?

為了確保隊(duì)列或持久化主題中的消息不會(huì)卡在某個(gè)代理上而不能進(jìn)行轉(zhuǎn)發(fā),需要在配置網(wǎng)絡(luò)連接時(shí),將dynamicOnly配置成true并使用小一點(diǎn)的prefetchSize.下面是一個(gè)示例:

<networkConnector uri="static://(tcp://remotehost:61617)"

name="bridge" dynamicOnly="true" prefetchSize="1"

</networkConnector>?

示例代理網(wǎng)絡(luò)來(lái)橫向擴(kuò)展并不會(huì)的代理更多的延遲,因?yàn)橄⒃趥魉偷较⑾M(fèi)者在之前會(huì)經(jīng)過(guò)多個(gè)代理.另外一種可選的部署方案可以提供更多的擴(kuò)展性和更好的性能,但是需要在應(yīng)用程序中做更多的計(jì)劃.這種混合的解決方案被稱為傳輸負(fù)載分流(traffic partitioning),這種方案通過(guò)在應(yīng)用程序中分割消息目的地到不同的代理上以完成垂直擴(kuò)展.

3.????? 傳輸負(fù)載分流

客戶端的傳輸負(fù)載分流是一個(gè)垂直和水平混合的負(fù)載分流方案.通常不使用代理網(wǎng)絡(luò),因?yàn)榭蛻舳顺绦驎?huì)決定將哪個(gè)負(fù)載發(fā)送到哪個(gè)(些)代理上.客戶端程序需要維護(hù)多個(gè)JMS連接并且決定哪個(gè)JMS連接應(yīng)該用于那個(gè)消息目的地.

沒(méi)有直接使用網(wǎng)絡(luò)連接的好處是降低了代理見(jiàn)過(guò)量的消息轉(zhuǎn)發(fā).你不需要像在傳統(tǒng)程序中那樣進(jìn)行額外的負(fù)載的均衡處理.到這里,我們已經(jīng)了解了垂直和水平擴(kuò)展以及傳輸負(fù)載分流.你應(yīng)該能夠深刻了解如何使用ActiveMQ來(lái)處理大量的并發(fā)連接和海量的消息目的地的連接了.

Activemq在大流量停出現(xiàn)內(nèi)存耗盡的情況以及解決方案

在大量消息持續(xù)發(fā)送到broker的情況下,當(dāng)broker到消費(fèi)者之間的網(wǎng)絡(luò)滿了以后,broker的消息無(wú)法發(fā)送出去,導(dǎo)致在TransportConnection的dispatchQueue中堆積的消息越來(lái)越多。PendingMessageCursor中的消息不能被及時(shí)消費(fèi),導(dǎo)致broker判斷消費(fèi)者為慢消費(fèi)者。當(dāng)broker的內(nèi)存被耗盡后JVM會(huì)頻繁的進(jìn)行full gc,由于消息不能被回收,所以消息對(duì)象會(huì)從年輕代轉(zhuǎn)移到老年代而不會(huì)釋放內(nèi)存,導(dǎo)致broker幾乎停止對(duì)外服務(wù)。

?

這個(gè)問(wèn)題的根本原因是ActiveMQ只對(duì)接收消息作了流量控制,但是沒(méi)有發(fā)送消息堵塞的情況。需要根據(jù)消息發(fā)送情況來(lái)控制消息的接收。

?

解決方案,在TransportConnection中的dispatchAsync對(duì)dispatchQueue中的消息數(shù)量做判斷,當(dāng)超過(guò)閾值就暫停dispatch,當(dāng)前thread sleep,這樣TopicSubscription就會(huì)暫停接收消息,避免內(nèi)存耗盡

總結(jié)

以上是生活随笔為你收集整理的activeMQ高并发发送消息异常解决方法的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。