日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

深入掌握JMS JMSCorrelationID与Selector

發(fā)布時(shí)間:2024/4/17 编程问答 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 深入掌握JMS JMSCorrelationID与Selector 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

http://wangh8080.blog.163.com/blog/static/197848297201241843917244/

深入掌握J(rèn)MS(一):JSM基礎(chǔ)??

2012-05-18 16:39:17|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)?訂閱

1.JMS基本概念

JMS(Java Message Service) 即Java消息服務(wù)。它提供標(biāo)準(zhǔn)的產(chǎn)生、發(fā)送、接收消息的接口簡化企業(yè)應(yīng)用的開發(fā)。它支持兩種消息通信模型:點(diǎn)到點(diǎn)(point-to-point)(P2P)模型和發(fā)布/訂閱(Pub/Sub)模型。

P2P模型規(guī)定了一個(gè)消息只能有一個(gè)接收者;Pub/Sub模型允許一個(gè)消息可以有多個(gè)接收者。
對于點(diǎn)到點(diǎn)模型,消息生產(chǎn)者產(chǎn)生一個(gè)消息后,把這個(gè)消息發(fā)送到一個(gè)Queue(隊(duì)列)中,然后消息接收者再從這個(gè)Queue中讀取,一旦這個(gè)消息被一個(gè)接收者讀取之后,它就在這個(gè)Queue中消失了,所以一個(gè)消息只能被一個(gè)接收者消費(fèi)。
與點(diǎn)到點(diǎn)模型不同,發(fā)布/訂閱模型中,消息生產(chǎn)者產(chǎn)生一個(gè)消息后,把這個(gè)消息發(fā)送到一個(gè)Topic中,這個(gè)Topic可以同時(shí)有多個(gè)接收者在監(jiān)聽,當(dāng)一個(gè)消息到達(dá)這個(gè)Topic之后,所有消息接收者都會(huì)收到這個(gè)消息。
簡單的講,點(diǎn)到點(diǎn)模型和發(fā)布/訂閱模型的區(qū)別就是前者是一對一,后者是一對多。

2.幾個(gè)重要概念

Destination:消息發(fā)送的目的地,也就是前面說的Queue和Topic。創(chuàng)建好一個(gè)消息之后,只需要把這個(gè)消息發(fā)送到目的地,消息的發(fā)送者就可以繼續(xù)做自己的事情,而不用等待消息被處理完成。至于這個(gè)消息什么時(shí)候,會(huì)被哪個(gè)消費(fèi)者消費(fèi),完全取決于消息的接受者。
Message:從字面上就可以看出是被發(fā)送的消息。它有下面幾種類型:
StreamMessage:Java數(shù)據(jù)流消息,用標(biāo)準(zhǔn)流操作來順序的填充和讀取。
MapMessage:一個(gè)Map類型的消息;名稱為string類型,而值為Java的基本類型。
TextMessage:普通字符串消息,包含一個(gè)String。
ObjectMessage:對象消息,包含一個(gè)可序列化的Java對象
BytesMessage:二進(jìn)制數(shù)組消息,包含一個(gè)byte[]。
XMLMessage:一個(gè)XML類型的消息。

?

最常用的是TextMessage和ObjectMessage。

Session:與JMS提供者所建立的會(huì)話,通過Session我們才可以創(chuàng)建一個(gè)Message。
Connection:與JMS提供者建立的一個(gè)連接。可以從這個(gè)連接創(chuàng)建一個(gè)會(huì)話,即Session。
ConnectionFactory:那如何創(chuàng)建一個(gè)Connection呢?這就需要下面講到的ConnectionFactory了。通過這個(gè)工廠類就可以得到一個(gè)與JMS提供者的連接,即Conection。
Producer:消息的生產(chǎn)者,要發(fā)送一個(gè)消息,必須通過這個(gè)生產(chǎn)者來發(fā)送。
MessageConsumer:與生產(chǎn)者相對應(yīng),這是消息的消費(fèi)者或接收者,通過它來接收一個(gè)消息。

前面多次提到JMS提供者,因?yàn)镴MS給我們提供的只是一系列接口,當(dāng)我們使用一個(gè)JMS的時(shí)候,還是需要一個(gè)第三方的提供者,它的作用就是真正管理這些Connection,Session,Topic和Queue等。

?

通過下面這個(gè)簡圖可以看出上面這些概念的關(guān)系。

ConnectionFactory---->Connection--->Session--->Message
Destination+Session------------------------------------>Producer
Destination+Session------------------------------------>MessageConsumer

那么可能有人會(huì)問:ConnectionFactory和Destination從哪兒得到?
這就和JMS提供者有關(guān)了.如果在一個(gè)JavaEE環(huán)境中,可以通過JNDI查找得到,如果在一個(gè)非JavaEE環(huán)境中,那只能通過JMS提供者提供給我們的接口得到了.


?

http://wangh8080.blog.163.com/blog/static/197848297201241844727827/

深入掌握J(rèn)MS(二):一個(gè)JMS例子??

2012-05-18 16:50:37|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)?訂閱

前一講簡單的介紹了一下JMS的基本概念,這一講結(jié)合一個(gè)例子讓大家深入理解前一講的基本概念.

?

首先需要做的是選擇一個(gè)JMS提供者,如果在JavaEE環(huán)境中可以不用考慮這些.
我們選擇ActiveMQ,官方地址: http://activemq.apache.org/.網(wǎng)上有很多介紹ActiveMQ的文檔,所以在這里就不介紹了.

?

按照上一講的這個(gè)簡圖,
ConnectionFactory---->Connection--->Session--->Message
Destination+Session------------------------------------>Producer
Destination+Session------------------------------------>MessageConsumer

?

首先需要得到ConnectionFactoy和Destination,這里創(chuàng)建一個(gè)一對一的Queue作為Destination。

?

前一講簡單的介紹了一下JMS的基本概念,這一講結(jié)合一個(gè)例子讓大家深入理解前一講的基本概念.

?

首先需要做的是選擇一個(gè)JMS提供者,如果在JavaEE環(huán)境中可以不用考慮這些.
我們選擇ActiveMQ,官方地址: http://activemq.apache.org/.網(wǎng)上有很多介紹ActiveMQ的文檔,所以在這里就不介紹了.

?

按照上一講的這個(gè)簡圖,
ConnectionFactory---->Connection--->Session--->Message
Destination+Session------------------------------------>Producer
Destination+Session------------------------------------>MessageConsumer

?

首先需要得到ConnectionFactoy和Destination,這里創(chuàng)建一個(gè)一對一的Queue作為Destination。

?

ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Queue queue = new ActiveMQQueue("testQueue");

?

然后又ConnectionFactory創(chuàng)建一個(gè)Connection,再啟動(dòng)這個(gè)Connection:

Connection?connection?=?factory.createConnection();??
connection.start();??

?

接下來需要由Connection創(chuàng)建一個(gè)Session:

Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE)

?

現(xiàn)在暫且不用管參數(shù)的含義,以后會(huì)詳細(xì)講到.

下面就可以創(chuàng)建Message了,這里創(chuàng)建一個(gè)TextMessage。

Message?message?=?session.createTextMessage("HelloJMS!");?

?

要想把剛才創(chuàng)建的消息發(fā)送出去,需要由Session和Destination創(chuàng)建一個(gè)消息生產(chǎn)者
MessageProducer?producer?=?session.createProducer(queue);?

?

下面就可以發(fā)送剛才創(chuàng)建的消息了:

producer.send(message);

?

?

?

消息發(fā)送完成之后,我們需要?jiǎng)?chuàng)建一個(gè)消息消費(fèi)者來接收這個(gè)消息:

MessageConsumer?comsumer?=?session.createConsumer(queue);??
Message?recvMessage?=?comsumer.receive();??

?

消息消費(fèi)者接收到這個(gè)消息之后,就可以得到它的內(nèi)容:

System.out.println(((TextMessage)recvMessage).getText());??

?

?

至此,一個(gè)簡單的JMS例子就完成了。

下面是全部源碼:

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class MessageSendAndReceive {

??? public static void main(String[] args) throws Exception {
??????? ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??????? Connection connection = factory.createConnection();
??????? connection.start();
??????? Queue queue = new ActiveMQQueue("testQueue");
??????? final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??????? Message message = session.createTextMessage("Hello JMS!");
??????? MessageProducer producer = session.createProducer(queue);
??????? producer.send(message);
??????? System.out.println("Send Message Completed!");
??????? MessageConsumer comsumer = session.createConsumer(queue);
??????? Message recvMessage = comsumer.receive();
??????? System.out.println(((TextMessage)recvMessage).getText());
??? }

}

?

?

?

http://wangh8080.blog.163.com/blog/static/19784829720124184540477/

深入掌握J(rèn)MS(三):MessageListener??

2012-05-18 16:54:00|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)?訂閱

消息的消費(fèi)者接收消息可以采用兩種方式:


1、consumer.receive() 或 consumer.receive(int timeout);
2、注冊一個(gè)MessageListener。

采用第一種方式,消息的接收者會(huì)一直等待下去,直到有消息到達(dá),或者超時(shí)。
后一種方式會(huì)注冊一個(gè)監(jiān)聽器,當(dāng)有消息到達(dá)的時(shí)候,會(huì)回調(diào)它的onMessage()方法。

?

下面舉例說明:

MessageConsumer comsumer = session.createConsumer(queue);
comsumer.setMessageListener(new MessageListener(){
?@Override
?public void onMessage(Message m) {
??TextMessage textMsg = (TextMessage) m;
??try{
???System.out.println(textMsg.getText());
??} catch (JMSException e) {
???e.printStackTrace();
??}
?}
});

?

?

http://wangh8080.blog.163.com/blog/static/197848297201241845658879/

深入掌握J(rèn)MS(四):實(shí)戰(zhàn)Queue??

2012-05-18 16:56:58|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)?訂閱

Queue實(shí)現(xiàn)的是點(diǎn)到點(diǎn)模型,在下面的例子中,啟動(dòng)2個(gè)消費(fèi)者共同監(jiān)聽一個(gè)Queue,然后循環(huán)給這個(gè)Queue中發(fā)送多個(gè)消息,我們依然采用ActiveMQ。

?

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class QueueTest {

??? public static void main(String[] args) throws Exception {
??????? ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??????? Connection connection = factory.createConnection();
??????? connection.start();

??????? //創(chuàng)建一個(gè)Queue
??????? Queue queue = new ActiveMQQueue("testQueue");
??????? //創(chuàng)建一個(gè)Session
??????? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

??????? //注冊消費(fèi)者1
??????? MessageConsumer comsumer1 = session.createConsumer(queue);
??????? comsumer1.setMessageListener(new MessageListener(){
??????????? public void onMessage(Message m) {
??????????????? try {
??????????????????? System.out.println("Consumer1 get " + ((TextMessage)m).getText());
??????????????? } catch (JMSException e) {
??????????????????? e.printStackTrace();
??????????????? }
??????????? }
??????? });

??????? //注冊消費(fèi)者2
??????? MessageConsumer comsumer2 = session.createConsumer(queue);
??????? comsumer2.setMessageListener(new MessageListener(){
??????????? public void onMessage(Message m) {
??????????????? try {
??????????????????? System.out.println("Consumer2 get " + ((TextMessage)m).getText());
??????????????? } catch (JMSException e) {
??????????????????? e.printStackTrace();
??????????????? }
??????????? }
??????? });

??????? //創(chuàng)建一個(gè)生產(chǎn)者,然后發(fā)送多個(gè)消息。
??????? MessageProducer producer = session.createProducer(queue);
??????? for(int i=0; i<10; i++){
??????????? producer.send(session.createTextMessage("Message:" + i));
??????? }
??? }

}

?

?

運(yùn)行這個(gè)例子會(huì)得到下面的輸出結(jié)果:

?

Consumer1 get Message:0
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:9

?

可以看出每個(gè)消息直被消費(fèi)了一次,但是如果有多個(gè)消費(fèi)者同時(shí)監(jiān)聽一個(gè)Queue的話,無法確定一個(gè)消息最終會(huì)被哪一個(gè)消費(fèi)者消費(fèi)。

?

?

?

http://wangh8080.blog.163.com/blog/static/197848297201241854953613/

深入掌握J(rèn)MS(五):實(shí)戰(zhàn)Topic??

2012-05-18 17:59:32|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)?訂閱

與Queue不同的是,Topic實(shí)現(xiàn)的是發(fā)布/訂閱模型,在下面的例子中,啟動(dòng)2個(gè)消費(fèi)者共同監(jiān)聽一個(gè)Topic,然后循環(huán)給這個(gè)Topic中發(fā)送多個(gè)消息。

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

public class TopicTest {

??? public static void main(String[] args) throws Exception {
??????? ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??????? Connection connection = factory.createConnection();
??????? connection.start();
??????? //創(chuàng)建一個(gè)Topic
??????? Topic topic= new ActiveMQTopic("testTopic");
??????? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??????? //注冊消費(fèi)者1
??????? MessageConsumer comsumer1 = session.createConsumer(topic);
??????? comsumer1.setMessageListener(new MessageListener(){
??????????? public void onMessage(Message m) {
??????????????? try {
??????????????????? System.out.println("Consumer1 get " + ((TextMessage)m).getText());
??????????????? } catch (JMSException e) {
??????????????????? e.printStackTrace();
??????????????? }
??????????? }
??????? });
??????? //注冊消費(fèi)者2
??????? MessageConsumer comsumer2 = session.createConsumer(topic);
??????? comsumer2.setMessageListener(new MessageListener(){
??????????? public void onMessage(Message m) {
??????????????? try {
??????????????????? System.out.println("Consumer2 get " + ((TextMessage)m).getText());
??????????????? } catch (JMSException e) {
??????????????????? e.printStackTrace();
??????????????? }
??????????? }
??????? });
??????? //創(chuàng)建一個(gè)生產(chǎn)者,然后發(fā)送多個(gè)消息。
??????? MessageProducer producer = session.createProducer(topic);
??????? for(int i=0; i<10; i++){
??????????? producer.send(session.createTextMessage("Message:" + i));
??????? }
??? }

}

?

?

運(yùn)行后得到下面的輸出結(jié)果:

?

Consumer1 get Message:0
Consumer2 get Message:0
Consumer1 get Message:1
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:2
Consumer1 get Message:3
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:4
Consumer1 get Message:5
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:6
Consumer1 get Message:7
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:8
Consumer1 get Message:9
Consumer2 get Message:9

?

?

?

http://wangh8080.blog.163.com/blog/static/19784829720124211123297/

深入掌握J(rèn)MS(六):消息頭??

2012-05-21 11:02:03|??分類: 默認(rèn)分類 |??標(biāo)簽: |字號(hào)?訂閱

一個(gè)消息對象分為三部分:消息頭(Headers),屬性(Properties)和消息體(Payload)。

?

對于StreamMessage和MapMessage,消息本身就有特定的結(jié)構(gòu),而對于TextMessage,ObjectMessage和BytesMessage是無結(jié)構(gòu)的。

一個(gè)消息可以包含一些重要的數(shù)據(jù)或者僅僅是一個(gè)事件的通知。

消息的Headers部分通常包含一些消息的描述信息,它們都是標(biāo)準(zhǔn)的描述信息。

包含下面一些值:

JMSDestination 消息的目的地,Topic或者是Queue。


JMSDeliveryMode 消息的發(fā)送模式:persistent或nonpersistent。前者表示消息在被消費(fèi)之前,如果JMS提供者DOWN了,重新啟動(dòng)后消息仍然存在。后者在這種情況下表示消息會(huì)被丟失。

可以通過下面的方式設(shè)置:
Producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);


JMSTimestamp 當(dāng)調(diào)用send()方法的時(shí)候,JMSTimestamp會(huì)被自動(dòng)設(shè)置為當(dāng)前事件。

可以通過下面方式得到這個(gè)值:
long timestamp = message.getJMSTimestamp();

JMSExpiration 表示一個(gè)消息的有效期。只有在這個(gè)有效期內(nèi),消息消費(fèi)者才可以消費(fèi)這個(gè)消息。默認(rèn)值為0,表示消息永不過期。

可以通過下面的方式設(shè)置:
producer.setTimeToLive(3600000); //有效期1小時(shí) (1000毫秒 * 60秒 * 60分)

?

JMSPriority 消息的優(yōu)先級(jí)。0-4為正常的優(yōu)先級(jí),5-9為高優(yōu)先級(jí)。

可以通過下面方式設(shè)置:producer.setPriority(9);

?

JMSMessageID 一個(gè)字符串用來唯一標(biāo)示一個(gè)消息。

?

JMSReplyTo 有時(shí)消息生產(chǎn)者希望消費(fèi)者回復(fù)一個(gè)消息,JMSReplyTo為一個(gè)Destination,表示需要回復(fù)的目的地。當(dāng)然消費(fèi)者可以不理會(huì)它。

?

JMSCorrelationID 通常用來關(guān)聯(lián)多個(gè)Message。例如需要回復(fù)一個(gè)消息,可以把JMSCorrelationID設(shè)置為所收到的消息的JMSMessageID。

?

JMSType 表示消息體的結(jié)構(gòu),和JMS提供者有關(guān)。

?

JMSRedelivered 如果這個(gè)值為true,表示消息是被重新發(fā)送了。因?yàn)橛袝r(shí)消費(fèi)者沒有確認(rèn)他已經(jīng)收到消息或者JMS提供者不確定消費(fèi)者是否已經(jīng)收到。

?

除了Header,消息發(fā)送者可以添加一些屬性(Properties)。這些屬性可以是應(yīng)用自定義的屬性,JMS定義的屬性和JMS提供者定義的屬性。我們通常只適用自定義的屬性。

?

后面會(huì)講到這些Header和屬性的用法。


?

?

?

http://wangh8080.blog.163.com/blog/static/197848297201242111416389/

深入掌握J(rèn)MS(七):DeliveryMode例子??

2012-05-21 11:04:16|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)?訂閱

在下面的例子中,分別發(fā)送一個(gè)Persistent和nonpersistent的消息,然后關(guān)閉退出JMS.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class DeliveryModeSendTest {

?public static void main(String[] args) throws Exception {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??Queue queue = new ActiveMQQueue("testQueue");
??Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??MessageProducer producer = session.createProducer(queue);
??producer.setDeliveryMode(DeliveryMode.PERSISTENT);
??producer.send(session.createTextMessage("A persistent Message"));
??producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
??producer.send(session.createTextMessage("A non persistent Message"));
??System.out.println("Send messages sucessfully!");
?}
}

運(yùn)行上面的程序,當(dāng)輸出“Send messages sucessfully!”時(shí),說明兩個(gè)消息都已經(jīng)發(fā)送成功,然后我們結(jié)束它,來停止JMS Provider。

?

接下來我們重新啟動(dòng)JMS Provicer,然后添加一個(gè)消費(fèi)者:

?

?

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class DeliveryModeReceiveTest {

?public static void main(String[] args) throws Exception {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??Queue queue = new ActiveMQQueue("testQueue");
??Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
??MessageConsumer comsumer = session.createConsumer(queue);
??comsumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println("Consumer get " + ((TextMessage) m).getText());
????} catch (JMSException e) {
?????e.printStackTrace();
????}
???}
??});
?}
}

?

?

?

http://wangh8080.blog.163.com/blog/static/197848297201242111652711/

深入掌握J(rèn)MS(八):JMSReplyTo??

2012-05-21 11:06:52|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)?訂閱

在下面的例子中,首先創(chuàng)建兩個(gè)Queue,發(fā)送者給一個(gè)Queue發(fā)送,接收者接收到消息之后給另一個(gè)Queue回復(fù)一個(gè)Message,然后再創(chuàng)建一個(gè)消費(fèi)者來接受所回復(fù)的消息。

?

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class MessageSendReceiveAndReply {

?public static void main(String[] args) throws Exception {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??// 消息發(fā)送到這個(gè)Queue
??Queue queue = new ActiveMQQueue("testQueue");

??// 消息回復(fù)到這個(gè)Queue
??Queue replyQueue = new ActiveMQQueue("replyQueue");
??final Session session = connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);

??// 創(chuàng)建一個(gè)消息,并設(shè)置它的JMSReplyTo為replyQueue。
??Message message = session.createTextMessage("Andy");
??message.setJMSReplyTo(replyQueue);
??MessageProducer producer = session.createProducer(queue);
??producer.send(message);
??// 消息的接收者
??MessageConsumer comsumer = session.createConsumer(queue);
??comsumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????// 創(chuàng)建一個(gè)新的MessageProducer來發(fā)送一個(gè)回復(fù)消息。
?????MessageProducer producer = session.createProducer(m.getJMSReplyTo());
?????producer.send(session.createTextMessage("Hello " + ((TextMessage) m).getText()));
????} catch (JMSException e1) {
?????e1.printStackTrace();
????}
???}
??});
??// 這個(gè)接收者用來接收回復(fù)的消息
??MessageConsumer comsumer2 = session.createConsumer(replyQueue);
??comsumer2.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println(((TextMessage) m).getText());
????} catch (JMSException e) {
?????e.printStackTrace();
????}
???}
??});
?}
}

?

?

首先消息生產(chǎn)者發(fā)送一個(gè)消息,內(nèi)容為“Andy”,
然后消費(fèi)者收到這個(gè)消息之后根據(jù)消息的JMSReplyTo,回復(fù)一個(gè)消息,內(nèi)容為“Hello Andy'。
最后在回復(fù)的Queue上創(chuàng)建一個(gè)接收回復(fù)消息的消費(fèi)者,它輸出所回復(fù)的內(nèi)容。


運(yùn)行上面的程序,可以得到下面的輸出結(jié)果:
Hello Andy

?

?

?

http://wangh8080.blog.163.com/blog/static/197848297201242111921713/

深入掌握J(rèn)MS(九):Selector??

2012-05-21 11:09:21|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)?訂閱

前面的例子中創(chuàng)建一個(gè)消息消費(fèi)者使用的是:
sesssion.createConsumer(destination)


另外,還提供了另一種方式:
sesssion.createConsumer(destination, selector)


這里selector是一個(gè)字符串,用來過濾消息。也就是說,這種方式可以創(chuàng)建一個(gè)可以只接收特定消息的一個(gè)消費(fèi)者。Selector的格式是類似于SQL-92的一種語法。可以用來比較消息頭信息和屬性。

?

下面的例子中,創(chuàng)建兩個(gè)消費(fèi)者,共同監(jiān)聽同一個(gè)Queue,但是它們的Selector不同,然后創(chuàng)建一個(gè)消息生產(chǎn)者,來發(fā)送多個(gè)消息。

?

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class JMSSelectorTest {

?public static void main(String[] args) throws Exception {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??Queue queue = new ActiveMQQueue("testQueue");
??Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??MessageConsumer comsumerA = session.createConsumer(queue, "receiver = 'A'");
??comsumerA.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println("ConsumerA get " + ((TextMessage) m).getText());
????} catch (JMSException e1) {
????}
???}
??});
??MessageConsumer comsumerB = session.createConsumer(queue, "receiver = 'B'");
??comsumerB.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println("ConsumerB get " + ((TextMessage) m).getText());
????} catch (JMSException e) {
????}
???}
??});
??MessageProducer producer = session.createProducer(queue);
??for (int i = 0; i < 10; i++) {
???String receiver = (i % 3 == 0 ? "A" : "B");
???TextMessage message = session.createTextMessage("Message" + i + ", receiver:" + receiver);
???message.setStringProperty("receiver", receiver);
???producer.send(message);
??}
?}
}

?

?

?

結(jié)果如下:


ConsumerA get Message0, receiver:A
ConsumerB get Message1, receiver:B
ConsumerB get Message2, receiver:B
ConsumerA get Message3, receiver:A
ConsumerB get Message4, receiver:B
ConsumerB get Message5, receiver:B
ConsumerA get Message6, receiver:A
ConsumerB get Message7, receiver:B
ConsumerB get Message8, receiver:B
ConsumerA get Message9, receiver:A

可以看出,消息消費(fèi)者只會(huì)取走它自己感興趣的消息。

?

?

?

?

?

?

http://wangh8080.blog.163.com/blog/static/1978482972012421111143248/

深入掌握J(rèn)MS(十):JMSCorrelationID與Selector??

2012-05-21 11:11:43|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)?訂閱

前面講過JMSCorrelationID主要是用來關(guān)聯(lián)多個(gè)Message,例如需要回復(fù)一個(gè)消息的時(shí)候,通常把回復(fù)的消息的JMSCorrelationID設(shè)置為原來消息的ID。

?

在下面這個(gè)例子中,創(chuàng)建了三個(gè)消息生產(chǎn)者A,B,C和三個(gè)消息消費(fèi)者A,B,C。生產(chǎn)者A給消費(fèi)者A發(fā)送一個(gè)消息,同時(shí)需要消費(fèi)者A給它回復(fù)一個(gè)消息。B、C與A類似。


簡圖如下:

生產(chǎn)者A-----發(fā)送----〉消費(fèi)者A-----回復(fù)------〉生產(chǎn)者A

生產(chǎn)者B-----發(fā)送----〉消費(fèi)者B-----回復(fù)------〉生產(chǎn)者B

生產(chǎn)者C-----發(fā)送----〉消費(fèi)者C-----回復(fù)------〉生產(chǎn)者C


需要注意的是,所有的發(fā)送和回復(fù)都使用同一個(gè)Queue,通過Selector區(qū)分。

?

?

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class JMSCorrelationIDTest {
?private Queue queue;
?private Session session;

?public JMSCorrelationIDTest() throws JMSException {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??queue = new ActiveMQQueue("testQueue");
??session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??setupConsumer("ConsumerA");
??setupConsumer("ConsumerB");
??setupConsumer("ConsumerC");
??setupProducer("ProducerA", "ConsumerA");
??setupProducer("ProducerB", "ConsumerB");
??setupProducer("ProducerC", "ConsumerC");
?}

?private void setupConsumer(final String name) throws JMSException {
??// 創(chuàng)建一個(gè)消費(fèi)者,它只接受屬于它自己的消息
??MessageConsumer consumer = session.createConsumer(queue, "receiver='" + name + "'");
??consumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????MessageProducer producer = session.createProducer(queue);
?????System.out.println(name + " get:" + ((TextMessage) m).getText());
?????// 回復(fù)一個(gè)消息
?????Message replyMessage = session.createTextMessage("Reply from " + name);
?????// 設(shè)置JMSCorrelationID為剛才收到的消息的ID
?????replyMessage.setJMSCorrelationID(m.getJMSMessageID());
?????producer.send(replyMessage);
????} catch (JMSException e) {
????}
???}
??});
?}

?private void setupProducer(final String name, String consumerName)
???throws JMSException {
??MessageProducer producer = session.createProducer(queue);
??producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
??// 創(chuàng)建一個(gè)消息,并設(shè)置一個(gè)屬性receiver,為消費(fèi)者的名字。
??Message message = session.createTextMessage("Message from " + name);
??message.setStringProperty("receiver", consumerName);
??producer.send(message);
??// 等待回復(fù)的消息
??MessageConsumer replyConsumer = session.createConsumer(queue,"JMSCorrelationID='" + message.getJMSMessageID() + "'");
??replyConsumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println(name + " get reply:" + ((TextMessage) m).getText());
????} catch (JMSException e) {
????}
???}
??});
?}

?public static void main(String[] args) throws Exception {
??new JMSCorrelationIDTest();
?}
}

?

?

運(yùn)行結(jié)果為:
ConsumerA get:Message from ProducerA
ProducerA get reply:Reply from ConsumerA
ConsumerB get:Message from ProducerB
ProducerB get reply:Reply from ConsumerB
ConsumerC get:Message from ProducerC
ProducerC get reply:Reply from ConsumerC

?

?

http://wangh8080.blog.163.com/blog/static/197848297201242111134751/

?

深入掌握J(rèn)MS(十一):TemporaryQueue和TemporaryTopic .??

2012-05-21 11:13:04|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)?訂閱

TemporaryQueue和TemporaryTopic,從字面上就可以看出它們是“臨時(shí)”的目的地。可以通過Session來創(chuàng)建,例如:


TemporaryQueue replyQueue = session.createTemporaryQueue();


雖然它們是由Session來創(chuàng)建的,但是它們的生命周期確實(shí)整個(gè)Connection。如果在一個(gè)Connection上創(chuàng)建了兩個(gè)Session,則一個(gè)Session創(chuàng)建的TemporaryQueue或TemporaryTopic也可以被另一個(gè)Session訪問。那如果這兩個(gè)Session是由不同的Connection創(chuàng)建,則一個(gè)Session創(chuàng)建的TemporaryQueue不可以被另一個(gè)Session訪問。


另外,它們的主要作用就是用來指定回復(fù)目的地, 即作為JMSReplyTo。


在下面的例子中,先創(chuàng)建一個(gè)Connection,然后創(chuàng)建兩個(gè)Session,其中一個(gè)Session創(chuàng)建了一個(gè)TemporaryQueue,另一個(gè)Session在這個(gè)TemporaryQueue上讀取消息。

?

?

?

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class TemporaryQueueTest {
?public static void main(String[] args) throws Exception {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??Queue queue = new ActiveMQQueue("testQueue2");
??final Session session = connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
??// 使用session創(chuàng)建一個(gè)TemporaryQueue。
??TemporaryQueue replyQueue = session.createTemporaryQueue();
??// 接收消息,并回復(fù)到指定的Queue中(即replyQueue)
??MessageConsumer comsumer = session.createConsumer(queue);
??comsumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println("Get Message: " + ((TextMessage) m).getText());
?????MessageProducer producer = session.createProducer(m.getJMSReplyTo());
?????producer.send(session.createTextMessage("ReplyMessage"));
????} catch (JMSException e) {
????}
???}
??});
??// 使用同一個(gè)Connection創(chuàng)建另一個(gè)Session,來讀取replyQueue上的消息。
??Session session2 = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
??MessageConsumer replyComsumer = session2.createConsumer(replyQueue);
??replyComsumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println("Get reply: " + ((TextMessage) m).getText());
????} catch (JMSException e) {
????}
???}
??});
??MessageProducer producer = session.createProducer(queue);
??TextMessage message = session.createTextMessage("SimpleMessage");
??message.setJMSReplyTo(replyQueue);
??producer.send(message);
?}
}


運(yùn)行結(jié)果為:
Get Message: SimpleMessage
Get reply: ReplyMessage


如果將:
Session session2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
更改為:
Connection connection2 = factory.createConnection();
Session session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);


就會(huì)得到類似于下面的異常:
Exception in thread "main" javax.jms.InvalidDestinationException: Cannot use a Temporary destination from another Connection

?

總結(jié)

以上是生活随笔為你收集整理的深入掌握JMS JMSCorrelationID与Selector的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。