深入掌握JMS JMSCorrelationID与Selector
http://wangh8080.blog.163.com/blog/static/197848297201241843917244/
深入掌握J(rèn)MS(一):JSM基礎(chǔ)??
2012-05-18 16:39:17|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)大中小?訂閱
1.JMS基本概念
JMS(Java Message Service) 即Java消息服務(wù)。它提供標(biāo)準(zhǔn)的產(chǎn)生、發(fā)送、接收消息的接口簡化企業(yè)應(yīng)用的開發(fā)。它支持兩種消息通信模型:點(diǎn)到點(diǎn)(point-to-point)(P2P)模型和發(fā)布/訂閱(Pub/Sub)模型。
P2P模型規(guī)定了一個(gè)消息只能有一個(gè)接收者;Pub/Sub模型允許一個(gè)消息可以有多個(gè)接收者。
對于點(diǎn)到點(diǎn)模型,消息生產(chǎn)者產(chǎn)生一個(gè)消息后,把這個(gè)消息發(fā)送到一個(gè)Queue(隊(duì)列)中,然后消息接收者再從這個(gè)Queue中讀取,一旦這個(gè)消息被一個(gè)接收者讀取之后,它就在這個(gè)Queue中消失了,所以一個(gè)消息只能被一個(gè)接收者消費(fèi)。
與點(diǎn)到點(diǎn)模型不同,發(fā)布/訂閱模型中,消息生產(chǎn)者產(chǎn)生一個(gè)消息后,把這個(gè)消息發(fā)送到一個(gè)Topic中,這個(gè)Topic可以同時(shí)有多個(gè)接收者在監(jiān)聽,當(dāng)一個(gè)消息到達(dá)這個(gè)Topic之后,所有消息接收者都會(huì)收到這個(gè)消息。
簡單的講,點(diǎn)到點(diǎn)模型和發(fā)布/訂閱模型的區(qū)別就是前者是一對一,后者是一對多。
2.幾個(gè)重要概念
Destination:消息發(fā)送的目的地,也就是前面說的Queue和Topic。創(chuàng)建好一個(gè)消息之后,只需要把這個(gè)消息發(fā)送到目的地,消息的發(fā)送者就可以繼續(xù)做自己的事情,而不用等待消息被處理完成。至于這個(gè)消息什么時(shí)候,會(huì)被哪個(gè)消費(fèi)者消費(fèi),完全取決于消息的接受者。
Message:從字面上就可以看出是被發(fā)送的消息。它有下面幾種類型:
StreamMessage:Java數(shù)據(jù)流消息,用標(biāo)準(zhǔn)流操作來順序的填充和讀取。
MapMessage:一個(gè)Map類型的消息;名稱為string類型,而值為Java的基本類型。
TextMessage:普通字符串消息,包含一個(gè)String。
ObjectMessage:對象消息,包含一個(gè)可序列化的Java對象
BytesMessage:二進(jìn)制數(shù)組消息,包含一個(gè)byte[]。
XMLMessage:一個(gè)XML類型的消息。
?
最常用的是TextMessage和ObjectMessage。
Session:與JMS提供者所建立的會(huì)話,通過Session我們才可以創(chuàng)建一個(gè)Message。
Connection:與JMS提供者建立的一個(gè)連接。可以從這個(gè)連接創(chuàng)建一個(gè)會(huì)話,即Session。
ConnectionFactory:那如何創(chuàng)建一個(gè)Connection呢?這就需要下面講到的ConnectionFactory了。通過這個(gè)工廠類就可以得到一個(gè)與JMS提供者的連接,即Conection。
Producer:消息的生產(chǎn)者,要發(fā)送一個(gè)消息,必須通過這個(gè)生產(chǎn)者來發(fā)送。
MessageConsumer:與生產(chǎn)者相對應(yīng),這是消息的消費(fèi)者或接收者,通過它來接收一個(gè)消息。
前面多次提到JMS提供者,因?yàn)镴MS給我們提供的只是一系列接口,當(dāng)我們使用一個(gè)JMS的時(shí)候,還是需要一個(gè)第三方的提供者,它的作用就是真正管理這些Connection,Session,Topic和Queue等。
?
通過下面這個(gè)簡圖可以看出上面這些概念的關(guān)系。
ConnectionFactory---->Connection--->Session--->Message
Destination+Session------------------------------------>Producer
Destination+Session------------------------------------>MessageConsumer
那么可能有人會(huì)問:ConnectionFactory和Destination從哪兒得到?
這就和JMS提供者有關(guān)了.如果在一個(gè)JavaEE環(huán)境中,可以通過JNDI查找得到,如果在一個(gè)非JavaEE環(huán)境中,那只能通過JMS提供者提供給我們的接口得到了.
?
http://wangh8080.blog.163.com/blog/static/197848297201241844727827/
深入掌握J(rèn)MS(二):一個(gè)JMS例子??
2012-05-18 16:50:37|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)大中小?訂閱
前一講簡單的介紹了一下JMS的基本概念,這一講結(jié)合一個(gè)例子讓大家深入理解前一講的基本概念.
?
首先需要做的是選擇一個(gè)JMS提供者,如果在JavaEE環(huán)境中可以不用考慮這些.
我們選擇ActiveMQ,官方地址: http://activemq.apache.org/.網(wǎng)上有很多介紹ActiveMQ的文檔,所以在這里就不介紹了.
?
按照上一講的這個(gè)簡圖,
ConnectionFactory---->Connection--->Session--->Message
Destination+Session------------------------------------>Producer
Destination+Session------------------------------------>MessageConsumer
?
首先需要得到ConnectionFactoy和Destination,這里創(chuàng)建一個(gè)一對一的Queue作為Destination。
?
前一講簡單的介紹了一下JMS的基本概念,這一講結(jié)合一個(gè)例子讓大家深入理解前一講的基本概念.
?
首先需要做的是選擇一個(gè)JMS提供者,如果在JavaEE環(huán)境中可以不用考慮這些.
我們選擇ActiveMQ,官方地址: http://activemq.apache.org/.網(wǎng)上有很多介紹ActiveMQ的文檔,所以在這里就不介紹了.
?
按照上一講的這個(gè)簡圖,
ConnectionFactory---->Connection--->Session--->Message
Destination+Session------------------------------------>Producer
Destination+Session------------------------------------>MessageConsumer
?
首先需要得到ConnectionFactoy和Destination,這里創(chuàng)建一個(gè)一對一的Queue作為Destination。
?
ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Queue queue = new ActiveMQQueue("testQueue");
?
然后又ConnectionFactory創(chuàng)建一個(gè)Connection,再啟動(dòng)這個(gè)Connection:
Connection?connection?=?factory.createConnection();??
connection.start();??
?
接下來需要由Connection創(chuàng)建一個(gè)Session:
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE)
?
現(xiàn)在暫且不用管參數(shù)的含義,以后會(huì)詳細(xì)講到.
下面就可以創(chuàng)建Message了,這里創(chuàng)建一個(gè)TextMessage。
Message?message?=?session.createTextMessage("HelloJMS!");?
?
要想把剛才創(chuàng)建的消息發(fā)送出去,需要由Session和Destination創(chuàng)建一個(gè)消息生產(chǎn)者
MessageProducer?producer?=?session.createProducer(queue);?
?
下面就可以發(fā)送剛才創(chuàng)建的消息了:
producer.send(message);
?
?
?
消息發(fā)送完成之后,我們需要?jiǎng)?chuàng)建一個(gè)消息消費(fèi)者來接收這個(gè)消息:
MessageConsumer?comsumer?=?session.createConsumer(queue);??
Message?recvMessage?=?comsumer.receive();??
?
消息消費(fèi)者接收到這個(gè)消息之后,就可以得到它的內(nèi)容:
System.out.println(((TextMessage)recvMessage).getText());??
?
?
至此,一個(gè)簡單的JMS例子就完成了。
下面是全部源碼:
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class MessageSendAndReceive {
??? public static void main(String[] args) throws Exception {
??????? ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??????? Connection connection = factory.createConnection();
??????? connection.start();
??????? Queue queue = new ActiveMQQueue("testQueue");
??????? final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??????? Message message = session.createTextMessage("Hello JMS!");
??????? MessageProducer producer = session.createProducer(queue);
??????? producer.send(message);
??????? System.out.println("Send Message Completed!");
??????? MessageConsumer comsumer = session.createConsumer(queue);
??????? Message recvMessage = comsumer.receive();
??????? System.out.println(((TextMessage)recvMessage).getText());
??? }
}
?
?
?
http://wangh8080.blog.163.com/blog/static/19784829720124184540477/
深入掌握J(rèn)MS(三):MessageListener??
2012-05-18 16:54:00|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)大中小?訂閱
消息的消費(fèi)者接收消息可以采用兩種方式:
1、consumer.receive() 或 consumer.receive(int timeout);
2、注冊一個(gè)MessageListener。
采用第一種方式,消息的接收者會(huì)一直等待下去,直到有消息到達(dá),或者超時(shí)。
后一種方式會(huì)注冊一個(gè)監(jiān)聽器,當(dāng)有消息到達(dá)的時(shí)候,會(huì)回調(diào)它的onMessage()方法。
?
下面舉例說明:
MessageConsumer comsumer = session.createConsumer(queue);
comsumer.setMessageListener(new MessageListener(){
?@Override
?public void onMessage(Message m) {
??TextMessage textMsg = (TextMessage) m;
??try{
???System.out.println(textMsg.getText());
??} catch (JMSException e) {
???e.printStackTrace();
??}
?}
});
?
?
http://wangh8080.blog.163.com/blog/static/197848297201241845658879/
深入掌握J(rèn)MS(四):實(shí)戰(zhàn)Queue??
2012-05-18 16:56:58|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)大中小?訂閱
Queue實(shí)現(xiàn)的是點(diǎn)到點(diǎn)模型,在下面的例子中,啟動(dòng)2個(gè)消費(fèi)者共同監(jiān)聽一個(gè)Queue,然后循環(huán)給這個(gè)Queue中發(fā)送多個(gè)消息,我們依然采用ActiveMQ。
?
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class QueueTest {
??? public static void main(String[] args) throws Exception {
??????? ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??????? Connection connection = factory.createConnection();
??????? connection.start();
??????? //創(chuàng)建一個(gè)Queue
??????? Queue queue = new ActiveMQQueue("testQueue");
??????? //創(chuàng)建一個(gè)Session
??????? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??????? //注冊消費(fèi)者1
??????? MessageConsumer comsumer1 = session.createConsumer(queue);
??????? comsumer1.setMessageListener(new MessageListener(){
??????????? public void onMessage(Message m) {
??????????????? try {
??????????????????? System.out.println("Consumer1 get " + ((TextMessage)m).getText());
??????????????? } catch (JMSException e) {
??????????????????? e.printStackTrace();
??????????????? }
??????????? }
??????? });
??????? //注冊消費(fèi)者2
??????? MessageConsumer comsumer2 = session.createConsumer(queue);
??????? comsumer2.setMessageListener(new MessageListener(){
??????????? public void onMessage(Message m) {
??????????????? try {
??????????????????? System.out.println("Consumer2 get " + ((TextMessage)m).getText());
??????????????? } catch (JMSException e) {
??????????????????? e.printStackTrace();
??????????????? }
??????????? }
??????? });
??????? //創(chuàng)建一個(gè)生產(chǎn)者,然后發(fā)送多個(gè)消息。
??????? MessageProducer producer = session.createProducer(queue);
??????? for(int i=0; i<10; i++){
??????????? producer.send(session.createTextMessage("Message:" + i));
??????? }
??? }
}
?
?
運(yùn)行這個(gè)例子會(huì)得到下面的輸出結(jié)果:
?
Consumer1 get Message:0
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:9
?
可以看出每個(gè)消息直被消費(fèi)了一次,但是如果有多個(gè)消費(fèi)者同時(shí)監(jiān)聽一個(gè)Queue的話,無法確定一個(gè)消息最終會(huì)被哪一個(gè)消費(fèi)者消費(fèi)。
?
?
?
http://wangh8080.blog.163.com/blog/static/197848297201241854953613/
深入掌握J(rèn)MS(五):實(shí)戰(zhàn)Topic??
2012-05-18 17:59:32|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)大中小?訂閱
與Queue不同的是,Topic實(shí)現(xiàn)的是發(fā)布/訂閱模型,在下面的例子中,啟動(dòng)2個(gè)消費(fèi)者共同監(jiān)聽一個(gè)Topic,然后循環(huán)給這個(gè)Topic中發(fā)送多個(gè)消息。
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
public class TopicTest {
??? public static void main(String[] args) throws Exception {
??????? ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??????? Connection connection = factory.createConnection();
??????? connection.start();
??????? //創(chuàng)建一個(gè)Topic
??????? Topic topic= new ActiveMQTopic("testTopic");
??????? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??????? //注冊消費(fèi)者1
??????? MessageConsumer comsumer1 = session.createConsumer(topic);
??????? comsumer1.setMessageListener(new MessageListener(){
??????????? public void onMessage(Message m) {
??????????????? try {
??????????????????? System.out.println("Consumer1 get " + ((TextMessage)m).getText());
??????????????? } catch (JMSException e) {
??????????????????? e.printStackTrace();
??????????????? }
??????????? }
??????? });
??????? //注冊消費(fèi)者2
??????? MessageConsumer comsumer2 = session.createConsumer(topic);
??????? comsumer2.setMessageListener(new MessageListener(){
??????????? public void onMessage(Message m) {
??????????????? try {
??????????????????? System.out.println("Consumer2 get " + ((TextMessage)m).getText());
??????????????? } catch (JMSException e) {
??????????????????? e.printStackTrace();
??????????????? }
??????????? }
??????? });
??????? //創(chuàng)建一個(gè)生產(chǎn)者,然后發(fā)送多個(gè)消息。
??????? MessageProducer producer = session.createProducer(topic);
??????? for(int i=0; i<10; i++){
??????????? producer.send(session.createTextMessage("Message:" + i));
??????? }
??? }
}
?
?
運(yùn)行后得到下面的輸出結(jié)果:
?
Consumer1 get Message:0
Consumer2 get Message:0
Consumer1 get Message:1
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:2
Consumer1 get Message:3
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:4
Consumer1 get Message:5
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:6
Consumer1 get Message:7
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:8
Consumer1 get Message:9
Consumer2 get Message:9
?
?
?
http://wangh8080.blog.163.com/blog/static/19784829720124211123297/
深入掌握J(rèn)MS(六):消息頭??
2012-05-21 11:02:03|??分類: 默認(rèn)分類 |??標(biāo)簽: |字號(hào)大中小?訂閱
一個(gè)消息對象分為三部分:消息頭(Headers),屬性(Properties)和消息體(Payload)。
?
對于StreamMessage和MapMessage,消息本身就有特定的結(jié)構(gòu),而對于TextMessage,ObjectMessage和BytesMessage是無結(jié)構(gòu)的。
一個(gè)消息可以包含一些重要的數(shù)據(jù)或者僅僅是一個(gè)事件的通知。
消息的Headers部分通常包含一些消息的描述信息,它們都是標(biāo)準(zhǔn)的描述信息。
包含下面一些值:
JMSDestination 消息的目的地,Topic或者是Queue。
JMSDeliveryMode 消息的發(fā)送模式:persistent或nonpersistent。前者表示消息在被消費(fèi)之前,如果JMS提供者DOWN了,重新啟動(dòng)后消息仍然存在。后者在這種情況下表示消息會(huì)被丟失。
可以通過下面的方式設(shè)置:
Producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
JMSTimestamp 當(dāng)調(diào)用send()方法的時(shí)候,JMSTimestamp會(huì)被自動(dòng)設(shè)置為當(dāng)前事件。
可以通過下面方式得到這個(gè)值:
long timestamp = message.getJMSTimestamp();
JMSExpiration 表示一個(gè)消息的有效期。只有在這個(gè)有效期內(nèi),消息消費(fèi)者才可以消費(fèi)這個(gè)消息。默認(rèn)值為0,表示消息永不過期。
可以通過下面的方式設(shè)置:
producer.setTimeToLive(3600000); //有效期1小時(shí) (1000毫秒 * 60秒 * 60分)
?
JMSPriority 消息的優(yōu)先級(jí)。0-4為正常的優(yōu)先級(jí),5-9為高優(yōu)先級(jí)。
可以通過下面方式設(shè)置:producer.setPriority(9);
?
JMSMessageID 一個(gè)字符串用來唯一標(biāo)示一個(gè)消息。
?
JMSReplyTo 有時(shí)消息生產(chǎn)者希望消費(fèi)者回復(fù)一個(gè)消息,JMSReplyTo為一個(gè)Destination,表示需要回復(fù)的目的地。當(dāng)然消費(fèi)者可以不理會(huì)它。
?
JMSCorrelationID 通常用來關(guān)聯(lián)多個(gè)Message。例如需要回復(fù)一個(gè)消息,可以把JMSCorrelationID設(shè)置為所收到的消息的JMSMessageID。
?
JMSType 表示消息體的結(jié)構(gòu),和JMS提供者有關(guān)。
?
JMSRedelivered 如果這個(gè)值為true,表示消息是被重新發(fā)送了。因?yàn)橛袝r(shí)消費(fèi)者沒有確認(rèn)他已經(jīng)收到消息或者JMS提供者不確定消費(fèi)者是否已經(jīng)收到。
?
除了Header,消息發(fā)送者可以添加一些屬性(Properties)。這些屬性可以是應(yīng)用自定義的屬性,JMS定義的屬性和JMS提供者定義的屬性。我們通常只適用自定義的屬性。
?
后面會(huì)講到這些Header和屬性的用法。
?
?
?
http://wangh8080.blog.163.com/blog/static/197848297201242111416389/
深入掌握J(rèn)MS(七):DeliveryMode例子??
2012-05-21 11:04:16|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)大中小?訂閱
在下面的例子中,分別發(fā)送一個(gè)Persistent和nonpersistent的消息,然后關(guān)閉退出JMS.
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class DeliveryModeSendTest {
?public static void main(String[] args) throws Exception {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??Queue queue = new ActiveMQQueue("testQueue");
??Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??MessageProducer producer = session.createProducer(queue);
??producer.setDeliveryMode(DeliveryMode.PERSISTENT);
??producer.send(session.createTextMessage("A persistent Message"));
??producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
??producer.send(session.createTextMessage("A non persistent Message"));
??System.out.println("Send messages sucessfully!");
?}
}
運(yùn)行上面的程序,當(dāng)輸出“Send messages sucessfully!”時(shí),說明兩個(gè)消息都已經(jīng)發(fā)送成功,然后我們結(jié)束它,來停止JMS Provider。
?
接下來我們重新啟動(dòng)JMS Provicer,然后添加一個(gè)消費(fèi)者:
?
?
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class DeliveryModeReceiveTest {
?public static void main(String[] args) throws Exception {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??Queue queue = new ActiveMQQueue("testQueue");
??Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
??MessageConsumer comsumer = session.createConsumer(queue);
??comsumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println("Consumer get " + ((TextMessage) m).getText());
????} catch (JMSException e) {
?????e.printStackTrace();
????}
???}
??});
?}
}
?
?
?
http://wangh8080.blog.163.com/blog/static/197848297201242111652711/
深入掌握J(rèn)MS(八):JMSReplyTo??
2012-05-21 11:06:52|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)大中小?訂閱
在下面的例子中,首先創(chuàng)建兩個(gè)Queue,發(fā)送者給一個(gè)Queue發(fā)送,接收者接收到消息之后給另一個(gè)Queue回復(fù)一個(gè)Message,然后再創(chuàng)建一個(gè)消費(fèi)者來接受所回復(fù)的消息。
?
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class MessageSendReceiveAndReply {
?public static void main(String[] args) throws Exception {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??// 消息發(fā)送到這個(gè)Queue
??Queue queue = new ActiveMQQueue("testQueue");
??// 消息回復(fù)到這個(gè)Queue
??Queue replyQueue = new ActiveMQQueue("replyQueue");
??final Session session = connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
??// 創(chuàng)建一個(gè)消息,并設(shè)置它的JMSReplyTo為replyQueue。
??Message message = session.createTextMessage("Andy");
??message.setJMSReplyTo(replyQueue);
??MessageProducer producer = session.createProducer(queue);
??producer.send(message);
??// 消息的接收者
??MessageConsumer comsumer = session.createConsumer(queue);
??comsumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????// 創(chuàng)建一個(gè)新的MessageProducer來發(fā)送一個(gè)回復(fù)消息。
?????MessageProducer producer = session.createProducer(m.getJMSReplyTo());
?????producer.send(session.createTextMessage("Hello " + ((TextMessage) m).getText()));
????} catch (JMSException e1) {
?????e1.printStackTrace();
????}
???}
??});
??// 這個(gè)接收者用來接收回復(fù)的消息
??MessageConsumer comsumer2 = session.createConsumer(replyQueue);
??comsumer2.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println(((TextMessage) m).getText());
????} catch (JMSException e) {
?????e.printStackTrace();
????}
???}
??});
?}
}
?
?
首先消息生產(chǎn)者發(fā)送一個(gè)消息,內(nèi)容為“Andy”,
然后消費(fèi)者收到這個(gè)消息之后根據(jù)消息的JMSReplyTo,回復(fù)一個(gè)消息,內(nèi)容為“Hello Andy'。
最后在回復(fù)的Queue上創(chuàng)建一個(gè)接收回復(fù)消息的消費(fèi)者,它輸出所回復(fù)的內(nèi)容。
運(yùn)行上面的程序,可以得到下面的輸出結(jié)果:
Hello Andy
?
?
?
http://wangh8080.blog.163.com/blog/static/197848297201242111921713/
深入掌握J(rèn)MS(九):Selector??
2012-05-21 11:09:21|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)大中小?訂閱
前面的例子中創(chuàng)建一個(gè)消息消費(fèi)者使用的是:
sesssion.createConsumer(destination)
另外,還提供了另一種方式:
sesssion.createConsumer(destination, selector)
這里selector是一個(gè)字符串,用來過濾消息。也就是說,這種方式可以創(chuàng)建一個(gè)可以只接收特定消息的一個(gè)消費(fèi)者。Selector的格式是類似于SQL-92的一種語法。可以用來比較消息頭信息和屬性。
?
下面的例子中,創(chuàng)建兩個(gè)消費(fèi)者,共同監(jiān)聽同一個(gè)Queue,但是它們的Selector不同,然后創(chuàng)建一個(gè)消息生產(chǎn)者,來發(fā)送多個(gè)消息。
?
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class JMSSelectorTest {
?public static void main(String[] args) throws Exception {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??Queue queue = new ActiveMQQueue("testQueue");
??Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??MessageConsumer comsumerA = session.createConsumer(queue, "receiver = 'A'");
??comsumerA.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println("ConsumerA get " + ((TextMessage) m).getText());
????} catch (JMSException e1) {
????}
???}
??});
??MessageConsumer comsumerB = session.createConsumer(queue, "receiver = 'B'");
??comsumerB.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println("ConsumerB get " + ((TextMessage) m).getText());
????} catch (JMSException e) {
????}
???}
??});
??MessageProducer producer = session.createProducer(queue);
??for (int i = 0; i < 10; i++) {
???String receiver = (i % 3 == 0 ? "A" : "B");
???TextMessage message = session.createTextMessage("Message" + i + ", receiver:" + receiver);
???message.setStringProperty("receiver", receiver);
???producer.send(message);
??}
?}
}
?
?
?
結(jié)果如下:
ConsumerA get Message0, receiver:A
ConsumerB get Message1, receiver:B
ConsumerB get Message2, receiver:B
ConsumerA get Message3, receiver:A
ConsumerB get Message4, receiver:B
ConsumerB get Message5, receiver:B
ConsumerA get Message6, receiver:A
ConsumerB get Message7, receiver:B
ConsumerB get Message8, receiver:B
ConsumerA get Message9, receiver:A
可以看出,消息消費(fèi)者只會(huì)取走它自己感興趣的消息。
?
?
?
?
?
?
http://wangh8080.blog.163.com/blog/static/1978482972012421111143248/
深入掌握J(rèn)MS(十):JMSCorrelationID與Selector??
2012-05-21 11:11:43|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)大中小?訂閱
前面講過JMSCorrelationID主要是用來關(guān)聯(lián)多個(gè)Message,例如需要回復(fù)一個(gè)消息的時(shí)候,通常把回復(fù)的消息的JMSCorrelationID設(shè)置為原來消息的ID。
?
在下面這個(gè)例子中,創(chuàng)建了三個(gè)消息生產(chǎn)者A,B,C和三個(gè)消息消費(fèi)者A,B,C。生產(chǎn)者A給消費(fèi)者A發(fā)送一個(gè)消息,同時(shí)需要消費(fèi)者A給它回復(fù)一個(gè)消息。B、C與A類似。
簡圖如下:
生產(chǎn)者A-----發(fā)送----〉消費(fèi)者A-----回復(fù)------〉生產(chǎn)者A
生產(chǎn)者B-----發(fā)送----〉消費(fèi)者B-----回復(fù)------〉生產(chǎn)者B
生產(chǎn)者C-----發(fā)送----〉消費(fèi)者C-----回復(fù)------〉生產(chǎn)者C
需要注意的是,所有的發(fā)送和回復(fù)都使用同一個(gè)Queue,通過Selector區(qū)分。
?
?
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class JMSCorrelationIDTest {
?private Queue queue;
?private Session session;
?public JMSCorrelationIDTest() throws JMSException {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??queue = new ActiveMQQueue("testQueue");
??session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??setupConsumer("ConsumerA");
??setupConsumer("ConsumerB");
??setupConsumer("ConsumerC");
??setupProducer("ProducerA", "ConsumerA");
??setupProducer("ProducerB", "ConsumerB");
??setupProducer("ProducerC", "ConsumerC");
?}
?private void setupConsumer(final String name) throws JMSException {
??// 創(chuàng)建一個(gè)消費(fèi)者,它只接受屬于它自己的消息
??MessageConsumer consumer = session.createConsumer(queue, "receiver='" + name + "'");
??consumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????MessageProducer producer = session.createProducer(queue);
?????System.out.println(name + " get:" + ((TextMessage) m).getText());
?????// 回復(fù)一個(gè)消息
?????Message replyMessage = session.createTextMessage("Reply from " + name);
?????// 設(shè)置JMSCorrelationID為剛才收到的消息的ID
?????replyMessage.setJMSCorrelationID(m.getJMSMessageID());
?????producer.send(replyMessage);
????} catch (JMSException e) {
????}
???}
??});
?}
?private void setupProducer(final String name, String consumerName)
???throws JMSException {
??MessageProducer producer = session.createProducer(queue);
??producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
??// 創(chuàng)建一個(gè)消息,并設(shè)置一個(gè)屬性receiver,為消費(fèi)者的名字。
??Message message = session.createTextMessage("Message from " + name);
??message.setStringProperty("receiver", consumerName);
??producer.send(message);
??// 等待回復(fù)的消息
??MessageConsumer replyConsumer = session.createConsumer(queue,"JMSCorrelationID='" + message.getJMSMessageID() + "'");
??replyConsumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println(name + " get reply:" + ((TextMessage) m).getText());
????} catch (JMSException e) {
????}
???}
??});
?}
?public static void main(String[] args) throws Exception {
??new JMSCorrelationIDTest();
?}
}
?
?
運(yùn)行結(jié)果為:
ConsumerA get:Message from ProducerA
ProducerA get reply:Reply from ConsumerA
ConsumerB get:Message from ProducerB
ProducerB get reply:Reply from ConsumerB
ConsumerC get:Message from ProducerC
ProducerC get reply:Reply from ConsumerC
?
?
http://wangh8080.blog.163.com/blog/static/197848297201242111134751/
?
深入掌握J(rèn)MS(十一):TemporaryQueue和TemporaryTopic .??
2012-05-21 11:13:04|??分類: 學(xué)習(xí) |??標(biāo)簽: |字號(hào)大中小?訂閱
TemporaryQueue和TemporaryTopic,從字面上就可以看出它們是“臨時(shí)”的目的地。可以通過Session來創(chuàng)建,例如:
TemporaryQueue replyQueue = session.createTemporaryQueue();
雖然它們是由Session來創(chuàng)建的,但是它們的生命周期確實(shí)整個(gè)Connection。如果在一個(gè)Connection上創(chuàng)建了兩個(gè)Session,則一個(gè)Session創(chuàng)建的TemporaryQueue或TemporaryTopic也可以被另一個(gè)Session訪問。那如果這兩個(gè)Session是由不同的Connection創(chuàng)建,則一個(gè)Session創(chuàng)建的TemporaryQueue不可以被另一個(gè)Session訪問。
另外,它們的主要作用就是用來指定回復(fù)目的地, 即作為JMSReplyTo。
在下面的例子中,先創(chuàng)建一個(gè)Connection,然后創(chuàng)建兩個(gè)Session,其中一個(gè)Session創(chuàng)建了一個(gè)TemporaryQueue,另一個(gè)Session在這個(gè)TemporaryQueue上讀取消息。
?
?
?
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class TemporaryQueueTest {
?public static void main(String[] args) throws Exception {
??ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
??Connection connection = factory.createConnection();
??connection.start();
??Queue queue = new ActiveMQQueue("testQueue2");
??final Session session = connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
??// 使用session創(chuàng)建一個(gè)TemporaryQueue。
??TemporaryQueue replyQueue = session.createTemporaryQueue();
??// 接收消息,并回復(fù)到指定的Queue中(即replyQueue)
??MessageConsumer comsumer = session.createConsumer(queue);
??comsumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println("Get Message: " + ((TextMessage) m).getText());
?????MessageProducer producer = session.createProducer(m.getJMSReplyTo());
?????producer.send(session.createTextMessage("ReplyMessage"));
????} catch (JMSException e) {
????}
???}
??});
??// 使用同一個(gè)Connection創(chuàng)建另一個(gè)Session,來讀取replyQueue上的消息。
??Session session2 = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
??MessageConsumer replyComsumer = session2.createConsumer(replyQueue);
??replyComsumer.setMessageListener(new MessageListener() {
???public void onMessage(Message m) {
????try {
?????System.out.println("Get reply: " + ((TextMessage) m).getText());
????} catch (JMSException e) {
????}
???}
??});
??MessageProducer producer = session.createProducer(queue);
??TextMessage message = session.createTextMessage("SimpleMessage");
??message.setJMSReplyTo(replyQueue);
??producer.send(message);
?}
}
運(yùn)行結(jié)果為:
Get Message: SimpleMessage
Get reply: ReplyMessage
如果將:
Session session2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
更改為:
Connection connection2 = factory.createConnection();
Session session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
就會(huì)得到類似于下面的異常:
Exception in thread "main" javax.jms.InvalidDestinationException: Cannot use a Temporary destination from another Connection
?
總結(jié)
以上是生活随笔為你收集整理的深入掌握JMS JMSCorrelationID与Selector的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OpenEjb使用笔记--让Tomcat
- 下一篇: JMS中queue和topic区别