ActiveMQ的queue以及topic两种消息处理机制分析
轉(zhuǎn)自: http://itindex.net/detail/50057-activemq-queue-topic
?
上一期介紹了我們項(xiàng)目要用到activeMQ來(lái)作為jms總線,并且給大家介紹了activeMQ的集群和高可用部署方案,本期給大家再介紹下,如何根據(jù)自己的項(xiàng)目需求,更好地使用activeMQ的兩種消息處理模式。
?
???????
?
1??? queue與topic的技術(shù)特點(diǎn)對(duì)比
?
???????????? topic??????????????????????????????????????????????????????????????????? queue
?
| ????? | ????? topic | |
| 概要 | Publish Subscribe messaging 發(fā)布訂閱消息 | Point-to-Point 點(diǎn)對(duì)點(diǎn) |
| 有無(wú)狀態(tài) | topic數(shù)據(jù)默認(rèn)不落地,是無(wú)狀態(tài)的。 | Queue數(shù)據(jù)默認(rèn)會(huì)在mq服務(wù)器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲(chǔ)。 |
| 完整性保障 | 并不保證publisher發(fā)布的每條數(shù)據(jù),Subscriber都能接受到。 | Queue保證每條數(shù)據(jù)都能被receiver接收。 |
| 消息是否會(huì)丟失 | 一般來(lái)說(shuō)publisher發(fā)布消息到某一個(gè)topic時(shí),只有正在監(jiān)聽(tīng)該topic地址的sub能夠接收到消息;如果沒(méi)有sub在監(jiān)聽(tīng),該topic就丟失了。 | Sender發(fā)送消息到目標(biāo)Queue,receiver可以異步接收這個(gè)Queue上的消息。Queue上的消息如果暫時(shí)沒(méi)有receiver來(lái)取,也不會(huì)丟失。 |
| 消息發(fā)布接收策略 | 一對(duì)多的消息發(fā)布接收策略,監(jiān)聽(tīng)同一個(gè)topic地址的多個(gè)sub都能收到publisher發(fā)送的消息。Sub接收完通知mq服務(wù)器 | 一對(duì)一的消息發(fā)布接收策略,一個(gè)sender發(fā)送的消息,只能有一個(gè)receiver接收。receiver接收完后,通知mq服務(wù)器已接收,mq服務(wù)器對(duì)queue里的消息采取刪除或其他操作。 |
?
????????? Topic和queue的最大區(qū)別在于topic是以廣播的形式,通知所有在線監(jiān)聽(tīng)的客戶端有新的消息,沒(méi)有監(jiān)聽(tīng)的客戶端將收不到消息;而queue則是以點(diǎn)對(duì)點(diǎn)的形式通知多個(gè)處于監(jiān)聽(tīng)狀態(tài)的客戶端中的一個(gè)。
?
?
?
2??? topic和queue方式的消息處理效率比較
?
??????? 通過(guò)增加監(jiān)聽(tīng)客戶端的并發(fā)數(shù)來(lái)驗(yàn)證,topic的消息推送,是否會(huì)因?yàn)楸O(jiān)聽(tīng)客戶端的并發(fā)上升而出現(xiàn)明顯的下降,測(cè)試環(huán)境的服務(wù)器為ci環(huán)境的ActiveMQ,客戶端為我的本機(jī)。
?
??????? 從實(shí)測(cè)的結(jié)果來(lái)看,topic方式發(fā)送的消息,發(fā)送和接收的效率,在一個(gè)訂閱者和100個(gè)訂閱者的前提下沒(méi)有明顯差異,但在500個(gè)訂閱者(線程)并發(fā)的 前提下,效率差異很明顯(由于500線程并發(fā)的情況下,我本機(jī)的cpu占用率已高達(dá)70-90%,所以無(wú)法確認(rèn)是我本機(jī)測(cè)試造成的性能瓶頸還是topic 消息發(fā)送方式存在性能瓶頸,造成效率下降如此明顯)。
?
??????? Topic方式發(fā)送的消息與queue方式發(fā)送的消息,發(fā)送和接收的效率,在一個(gè)訂閱者和100個(gè)訂閱者的前提下沒(méi)有明顯差異,但在500個(gè)訂閱者并發(fā)的前提下,topic方式的效率明顯低于queue。
?
??????? Queue方式發(fā)送的消息,在一個(gè)訂閱者、100個(gè)訂閱者和500個(gè)訂閱者的前提下,發(fā)送和接收的效率沒(méi)有明顯變化。
?
Topic實(shí)測(cè)數(shù)據(jù):
?
?
?
|
| 發(fā)送者發(fā)送的消息總數(shù) | 所有訂閱者接收到消息的總數(shù) | 消息發(fā)送和接收平均耗時(shí) |
| 單訂閱者 | 100 | 100 | 101ms |
| 100訂閱者 | 100 | 10000 | 103ms |
| 500訂閱者 | 100 | 50000 | 14162ms |
?
?
?
Queue實(shí)測(cè)數(shù)據(jù):
?
?
?
|
| 發(fā)送者發(fā)送的消息總數(shù) | 所有訂閱者接收到消息的總數(shù) | 消息發(fā)送和接收平均耗時(shí) |
| 單訂閱者 | 100 | 100 | 96ms |
| 100訂閱者 | 100 | 100 | 96ms |
| 500訂閱者 | 100 | 100 | 100ms |
?
?
?
3???? topic方式的消息處理示例
?
3.1???? 通過(guò)客戶端代碼調(diào)用來(lái)發(fā)送一個(gè)topic的消息:
?
import javax.jms.Connection;
?
import javax.jms.ConnectionFactory;
?
import javax.jms.DeliveryMode;
?
import javax.jms.Destination;
?
import javax.jms.MessageProducer;
?
import javax.jms.Session;
?
import javax.jms.TextMessage;
?
?
?
import org.apache.activemq.ActiveMQConnection;
?
import org.apache.activemq.ActiveMQConnectionFactory;
?
?
?
publicclass SendTopic {
?
??? privatestaticfinalint SEND_NUMBER = 5;
?
??? publicstaticvoid sendMessage(Session session, MessageProducer producer)
?
??????????? throws Exception {
?
?? ????? for ( int i = 1; i <= SEND_NUMBER; i++) {
?
??????????? TextMessage message = session
?
??????????????????? .createTextMessage("ActiveMq發(fā)送的消息" + i);
?
??????????? //發(fā)送消息到目的地方
?
??????????? System. out.println("發(fā)送消息:" + "ActiveMq 發(fā)送的消息" + i);
?
??????????? producer.send(message);
?
??????? }
?
??? }
?
???
?
??? publicstaticvoid main(String[] args) {
?
??????? // ConnectionFactory:連接工廠,JMS用它創(chuàng)建連接
?
??????? ConnectionFactory connectionFactory;
?
??????? // Connection:JMS客戶端到JMS Provider的連接
?
??????? Connection connection = null;
?
??????? // Session:一個(gè)發(fā)送或接收消息的線程
?
??????? Session session;
?
??????? // Destination:消息的目的地;消息發(fā)送給誰(shuí).
?
??????? Destination destination;
?
??????? // MessageProducer:消息發(fā)送者
?
??????? MessageProducer producer;
?
??????? // TextMessage message;
?
??????? //構(gòu)造ConnectionFactory實(shí)例對(duì)象,此處采用ActiveMq的實(shí)現(xiàn)jar
?
??????? connectionFactory = new ActiveMQConnectionFactory(
?
??????????????? ActiveMQConnection. DEFAULT_USER,
?
??????????????? ActiveMQConnection. DEFAULT_PASSWORD,
?
??????????????? "tcp://10.20.8.198:61616");
?
??????? try {
?
??????????? //構(gòu)造從工廠得到連接對(duì)象
?
??????????? connection = connectionFactory.createConnection();
?
??????????? //啟動(dòng)
?
??????????? connection.start();
?
??????????? //獲取操作連接
?
??????????? session = connection.createSession( true, Session. AUTO_ACKNOWLEDGE);
?
??????????? //獲取session注意參數(shù)值FirstTopic是一個(gè)服務(wù)器的topic(與queue消息的發(fā)送相比,這里是唯一的不同)
?
??????????? destination = session.createTopic("FirstTopic");
?
??????? ????//得到消息生成者【發(fā)送者】
?
??????????? producer = session.createProducer(destination);
?
??????????? //設(shè)置不持久化,此處學(xué)習(xí),實(shí)際根據(jù)項(xiàng)目決定
?
??????????? producer.setDeliveryMode(DeliveryMode. PERSISTENT);
?
??????????? //構(gòu)造消息,此處寫死,項(xiàng)目就是參數(shù),或者方法獲取
?
??????????? sendMessage(session, producer);
?
??????????? session.commit();
?
??????? } catch (Exception e) {
?
??????????? e.printStackTrace();
?
??????? } finally {
?
??????????? try {
?
??????????????? if ( null != connection)
?
??????????????????? connection.close();
?
??????????? } catch (Throwable ignore) {
?
??????????? }
?
??????? }
?
??? }
?
}
?
?
?
3.2???? 啟動(dòng)多個(gè)客戶端監(jiān)聽(tīng)來(lái)接收topic的消息:
?
publicclass ReceiveTopic implements Runnable {
?
????? private StringthreadName;
?
?
?
????? ReceiveTopic(String threadName) {
?
?????????? this.threadName = threadName;
?
????? }
?
?
?
????? publicvoid run() {
?
?????????? // ConnectionFactory:連接工廠,JMS用它創(chuàng)建連接
?
?????????? ConnectionFactory connectionFactory;
?
?????????? // Connection:JMS客戶端到JMS Provider的連接
?
?????????? Connection connection = null;
?
?????????? // Session:一個(gè)發(fā)送或接收消息的線程
?
?????????? Session session;
?
?????????? // Destination:消息的目的地;消息發(fā)送給誰(shuí).
?
?????????? Destination destination;
?
?????????? //消費(fèi)者,消息接收者
?
?????????? MessageConsumer consumer;
?
?????????? connectionFactory = new ActiveMQConnectionFactory(
?
????????????????????? ActiveMQConnection. DEFAULT_USER,
?
????????????????????? ActiveMQConnection. DEFAULT_PASSWORD,"tcp://10.20.8.198:61616");
?
?????????? try {
?
???????????????? //構(gòu)造從工廠得到連接對(duì)象
?
???????????????? connection = connectionFactory.createConnection();
?
???????????????? //啟動(dòng)
?
???????????????? connection.start();
?
???????????????? //獲取操作連接,默認(rèn)自動(dòng)向服務(wù)器發(fā)送接收成功的響應(yīng)
?
???????????????? session = connection.createSession( false, Session. AUTO_ACKNOWLEDGE);
?
???????????????? //獲取session注意參數(shù)值FirstTopic是一個(gè)服務(wù)器的topic
?
???????????????? destination = session.createTopic("FirstTopic");
?
???????????????? consumer = session.createConsumer(destination);
?
???????????????? while ( true) {
?
????????????????????? //設(shè)置接收者接收消息的時(shí)間,為了便于測(cè)試,這里設(shè)定為100s
?
????????????????????? TextMessage message = (TextMessage) consumer
?
????????????????????????????????? .receive(100 * 1000);
?
????????????????????? if ( null != message) {
?
??????????????????????????? System. out.println("線程"+threadName+"收到消息:" + message.getText());
?
????????????????????? } else {
?
??????????????????????????? continue;
?
????????????????????? }
?
???????????????? }
?
?????????? } catch (Exception e) {
?
???????????????? e.printStackTrace();
?
?????????? } finally {
?
???????????????? try {
?
????????????????????? if ( null != connection)
?
??????????????????????????? connection.close();
?
???????????????? } catch (Throwable ignore) {
?
???????????????? }
?
?????????? }
?
????? }
?
?
?
????? publicstaticvoid main(String[] args) {
?
????? ????? //這里啟動(dòng)3個(gè)線程來(lái)監(jiān)聽(tīng)FirstTopic的消息,與queue的方式不一樣三個(gè)線程都能收到同樣的消息
?
?????????? ReceiveTopic receive1= new ReceiveTopic("thread1");
?
?????????? ReceiveTopic receive2= new ReceiveTopic("thread2");
?
?????????? ReceiveTopic receive3= new ReceiveTopic("thread3");
?
?????????? Thread thread1= new Thread(receive1);
?
?????????? Thread thread2= new Thread(receive2);
?
?????????? Thread thread3= new Thread(receive3);
?
?????????? thread1.start();
?
?????????? thread2.start();
?
?????????? thread3.start();
?
????? }
?
}
?
?
?
4???? queue方式的消息處理示例
?
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ的queue以及topic两种消息处理机制分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 使用Solr 增加索引以及检索
- 下一篇: webshpere缓存--web.xml