JMS简明学习教程
基礎(chǔ)篇
JMS是應(yīng)用系統(tǒng)或組件之間相互通信的應(yīng)用程序接口,利用它,我們可以輕易實(shí)現(xiàn)在不同JVM之間相互的遠(yuǎn)程通信。要實(shí)現(xiàn)遠(yuǎn)程通信,RPC同樣也能做到,但RPC卻不可避免地增加了不同系統(tǒng)之間的耦合度,JMS能極大地降低不同的應(yīng)用系統(tǒng)之間的耦合。
?
要學(xué)習(xí)JMS,有幾個(gè)概念必須要搞清楚:
l????? Messaging (消息通知、消息通信)
一種應(yīng)用系統(tǒng)或組件之間相互通信的方式。
?
l????? Message (消息)
消息即為消息通信的載體,消息包括Message Headers, Message properties, Message bodies
?
l????? JMS有兩種方式進(jìn)行消息通信:Point-to-Point (P2P) 和 Publish/Subscriber (PUB/SUB)
?
P2P方式是一對(duì)一的,一條消息只有一個(gè)接收者,默認(rèn)情況下是P2P消息是持久的,也就是說(shuō)發(fā)送者(sender)產(chǎn)生的一條消息(message)發(fā)送到消息隊(duì)列(queue)之上后,只有等到消息接收者(receiver)接收到它,才會(huì)從消息隊(duì)列中刪除,沒(méi)有被接收的消息會(huì)一直存在JMS容器里。這種方式有點(diǎn)像郵政通信,信件只有一個(gè)接收者,信件在接收之前,會(huì)一直存放在信箱里。
?
PUB/SUB方式的工作流程,首先subscriber(訂閱者)向JMS容器訂閱(Listen to)自己感興趣的topic(主題),多個(gè)訂閱者可以同時(shí)對(duì)一個(gè)主題進(jìn)行訂閱,消息發(fā)布者發(fā)布一條消息,所有訂閱了該主題的訂閱者都能收到這個(gè)消息。默認(rèn)情況下,pub/sub方式下的消息不是持久的,這意味著,消息一經(jīng)發(fā)出,不管有沒(méi)有人接收,都不會(huì)保存下來(lái),而且訂閱者只能接收到自已訂閱之后發(fā)布者發(fā)出的消息。這種方式有點(diǎn)像訂閱報(bào)刊雜志,一種報(bào)刊可以有多人同時(shí)訂閱,但訂閱者只能收到開(kāi)始訂閱之后的報(bào)社發(fā)行的期刊。
?
l????? JMS(Java Messaging Service)
是Java EE中的一種技術(shù),它定義一套完整的接口,來(lái)實(shí)現(xiàn)不同系統(tǒng)或應(yīng)用之間的消息通信。這意味著:我們針對(duì)JMS接口編寫的應(yīng)用程序(客戶程序),在任何一個(gè)實(shí)現(xiàn)了標(biāo)準(zhǔn)JMS接口的容器下都能運(yùn)行起來(lái),我們的應(yīng)用程序與容器實(shí)現(xiàn)了真正的解藕,這也就是面向接口編程的好處之一吧。這點(diǎn)類似JDBC編程。
?
l????? JMS提供者(JMS Provider)
JMS提供者,也叫JMS服務(wù)器或JMS容器,也就是JMS服務(wù)的提供者,主流的J2EE容器一般都提供JMS服務(wù)(比如JBoss,BEA WebLogic,IBM WebSphere,Oracle Application Server等都支持)
?
l????? 連接工廠(Connection Factories)
連接工廠是用來(lái)創(chuàng)建客戶端到JMS容器之間JMS連接的工廠,連接工廠有兩種: (QueueConnectionFactory和TopicConnectionFactory),分別用來(lái)創(chuàng)建QueueConnection 和 TopicConnection的。
?
Context ctx = new InitialContext();QueueConnectionFactory queueConnectionFactory =???????????????????? (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");TopicConnectionFactory topicConnectionFactory =???????????????????? (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");? l????? 目的地(Destinations)
目的地是消息生產(chǎn)者(producer)消息發(fā)住的目的地,也是消費(fèi)者(consumer)接收消息的來(lái)源地,它有點(diǎn)像信箱,郵遞員把信件投往信箱,收件人從信箱取信件。對(duì)P2P方式來(lái)說(shuō),目的地就是Queue,對(duì)pub/sub方式來(lái)說(shuō),目的地就是Topic。我們要得到這個(gè)目的地的引用,只能通過(guò)JNDI查找(lookup)的方式得到,因?yàn)槟康牡厥亲?cè)在JMS服務(wù)器的(后面的章節(jié)會(huì)講到如何注冊(cè)一個(gè)目的地)
?
Topic myTopic = (Topic) ctx.lookup("MyTopic");Queue myQueue = (Queue) ctx.lookup("MyQueue");
l????? 連接(Connection)
這里說(shuō)的連接是指客戶端與JMS提供者(容器)之間的連接。連接也分兩種:QueueConnection和TopicConnection,分別對(duì)應(yīng)于P2P連接和Pub/Sub連接。
?
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
連接用完之后必須記得關(guān)閉,否則連接資源不會(huì)被釋放掉。關(guān)閉連接的同時(shí)會(huì)自動(dòng)把會(huì)話、產(chǎn)生者、消費(fèi)者都關(guān)閉掉。
?
l????? 會(huì)話(Session)
會(huì)話是用來(lái)創(chuàng)建消息產(chǎn)生者和消息消費(fèi)者的單線程環(huán)境,你可以它來(lái)創(chuàng)建消息生產(chǎn)者、消費(fèi)者、消息,用它來(lái)維持消息監(jiān)聽(tīng)。
?
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);QueueSession queueSession = queueConnection.createQueueSession(true, 0);l????? 消息生產(chǎn)者(Message Producers)
消息生產(chǎn)者也就是消息的產(chǎn)生者或發(fā)送者,在P2P方式下它是QueueSender,在Pub/Sub方式下它是TopicPublisher。它是一個(gè)由session創(chuàng)建的,用來(lái)把把消息發(fā)送到目的地的對(duì)象。
QueueSender queueSender = queueSession.createSender(myQueue);TopicPublisher topicPublisher = topicSession.createPublisher(myTopic);
一旦你創(chuàng)建好生產(chǎn)者,你就可以用它來(lái)發(fā)送消息
queueSender.send(message);topicPublisher.publish(message);
?
l????? 消息消費(fèi)者(Message Consumer)
消息消費(fèi)者也就是消息的接收者或使用者,在P2P方式下這是QueueReceiver,在Pub/Sub方式下它是TopicSubscriber。這是一個(gè)由session來(lái)創(chuàng)建的,用來(lái)接收來(lái)自目的地消息的對(duì)象。JMS容器來(lái)負(fù)責(zé)把消息從目的地投遞到注冊(cè)了該目的地的消息消費(fèi)者。
?
QueueReceiver queueReceiver = queueSession.createReceiver(myQueue);TopicSubscriber topicSubscriber = topicSession.createSubscriber(myTopic);
一旦創(chuàng)建好消息消費(fèi)者,它就是活動(dòng)的,你可以用它來(lái)接收消息,你也可以用close()方法來(lái)使它失效(Inactive)。當(dāng)你調(diào)用Connection的start()方法之前,消費(fèi)者是不會(huì)接收到任何消息的。兩種接收者都有一個(gè)receive方法,這是一個(gè)同步的方法,也就是說(shuō)程序執(zhí)行到這個(gè)方法會(huì)被阻塞,直到收到消息為止。
queueConnection.start();Message m = queueReceiver.receive(); topicConnection.start();Message m = topicSubscriber.receive(1000); // time out after a second
如果我們不想它被阻塞,就需要異步的接收消息,這時(shí)我們得用消息臨聽(tīng)器(Message Listener)了。
?
?
l????? 消息監(jiān)聽(tīng)器(Message Listener)
消息監(jiān)聽(tīng)器是一個(gè)充當(dāng)消息的異步事件處理器的對(duì)象,它實(shí)現(xiàn)了MessageListener接口,這個(gè)接口只有一個(gè)方法onMessage,在這個(gè)方法里,你可以定義當(dāng)接收到消息之后的要做的操作。你可以用setMessageListener方法為消息消費(fèi)者注冊(cè)一個(gè)監(jiān)聽(tīng)器。
?
MessageListener listener = new MessageListener( {????? public void onMessage(Message msg) {????????? //????? }});topicSubscriber.setMessageListener(listener); //注冊(cè)監(jiān)聽(tīng)topicConnection.start();有一點(diǎn)要注意,如果你先調(diào)用Connection的start,然后再調(diào)用setMessageListener,消息很可能接收不到,正確的做法是先注冊(cè)監(jiān)聽(tīng),再啟動(dòng)Connection。
?
注冊(cè)監(jiān)聽(tīng)之后,一旦JMS容器有消費(fèi)投遞過(guò)來(lái),消息消費(fèi)(接收)者就會(huì)自動(dòng)調(diào)用監(jiān)聽(tīng)器的onMessage方法。這個(gè)方法的帶有一個(gè)參數(shù)Message,這就接收到的消息。
?
?
l????? 消息選擇器(Message Selectors)
假如你只需要一個(gè)對(duì)濾器來(lái)過(guò)濾收到的消息,那么你可以使用消息選擇器,它允許消費(fèi)者指定只對(duì)特定的消息感興趣。消息選擇器只能是工作在JMS容器的,而不是我們的應(yīng)用程序上。消息選擇器是一個(gè)包含一個(gè)表達(dá)式的字符串,這個(gè)表達(dá)式的語(yǔ)法類似SQL的條件表達(dá)式,在createReceiver, createSubscriber這些方法里有一個(gè)參數(shù)讓你指定一個(gè)消息選擇器,由這些方法創(chuàng)建的消費(fèi)者就只能收到與消息選擇器匹配的消息了。
?
?
l????? 消息(Messages)
JMS消息包括三個(gè)部分:消息頭(Header),屬性(Properties),消息體(Body)
其中消息頭是必須的,后兩個(gè)是可選的。
1)消息頭里你可以指定JMSMessageID, JMSCorrelationID, JMSReplyTo, JMSType等信息。
2)屬性指定一些消息頭沒(méi)有包括的附加信息,比如可以在屬性里指定消息選擇器。
3)消息體是消息的內(nèi)容,有5種消息類型:TextMessage,MapMessage,BytesMessage,StreamMessage,ObjectMessage=-
?
TextMessage message = queueSession.createTextMessage();message.setText(msg_text);???? // msg_text is a StringqueueSender.send(message); 在消費(fèi)者端,接收到的總是一個(gè)通用的Message對(duì)象,你需要把它轉(zhuǎn)型成特定的類型才能提取出里面的內(nèi)容。
Message m = queueReceiver.receive();if (m instanceof TextMessage) {??? TextMessage message = (TextMessage) m;??? System.out.println("Reading message: " + message.getText());} else {??? // Handle error}
?
?
?
?
?實(shí)戰(zhàn)篇
前面對(duì)JMS概念的作了一個(gè)基本介紹,下面我們看一個(gè)具體的例子程序
?
Pub/sub方式的消息傳遞的例子:
l???????? HelloPublisher.java
?
package com.jms.test;
?
import java.util.Hashtable;
?
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
?
/**
?* pub/sub方式的消息發(fā)送程序
?*/
public class HelloPublisher {
???
??? TopicConnection topicConnection;// JMS連接,屬于Pub/Sub方式的連接
??? TopicSession topicSession; //JMS會(huì)話,屬于Pub/Sub方式的會(huì)話
??? TopicPublisher topicPublisher; //消息發(fā)布者
??? Topic topic; //主題
???
??? public HelloPublisher(String factoryJNDI, String topicJNDI)
?????????? throws JMSException, NamingException {
?????? Hashtable<String, String> env = new Hashtable<String, String>();
?????? //設(shè)置好連接JMS容器的屬性,不同的容器需要的屬性可能不同,需要查閱相關(guān)文檔
?????? env.put(Context.INITIAL_CONTEXT_FACTORY,
????????????? "org.jnp.interfaces.NamingContextFactory");
?????? env.put(Context.PROVIDER_URL, "localhost:1099");
?????? env.put("java.naming.rmi.security.manager", "yes");
?????? env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
?
?????? //創(chuàng)建連接JMS容器的上下文(context)
?????? Context context = new InitialContext(env);
?
?????? //通過(guò)連接工廠的JNDI名查找ConnectionFactory
?????? TopicConnectionFactory topicFactory =
?????????? (TopicConnectionFactory) context.lookup(factoryJNDI);
?
?????? //用連接工廠創(chuàng)建一個(gè)JMS連接
?????? topicConnection = topicFactory.createTopicConnection();
?
?????? //通過(guò)JMS連接創(chuàng)建一個(gè)Session
?????? topicSession = topicConnection.createTopicSession(false,
????????????? Session.AUTO_ACKNOWLEDGE);
?
?????? //通過(guò)上下文查找到一個(gè)主題(topic)
?????? topic = (Topic) context.lookup(topicJNDI);
?
?????? //用session來(lái)創(chuàng)建一個(gè)特定主題的消息發(fā)送者
?????? topicPublisher = topicSession.createPublisher(topic);
??? }
???
?
??? /**
???? * 發(fā)布一條文本消息
???? * @param msg 待發(fā)布的消息
???? * @throws JMSException
???? */
??? public void publish(String msg) throws JMSException {
?????? //用session來(lái)創(chuàng)建一個(gè)文本類型的消息
?????? TextMessage message = topicSession.createTextMessage();
?????? message.setText(msg);//設(shè)置消息內(nèi)容
?????? topicPublisher.publish(topic, message);//消息發(fā)送,發(fā)送到特定主題
??? }
?
??? public void close() throws JMSException {
?????? topicSession.close();//關(guān)閉session
?????? topicConnection.close();//關(guān)閉連接
??? }
?
??? public static void main(String[] args)
?????? throws JMSException, NamingException {
?????? HelloPublisher publisher =
?????????? new HelloPublisher("ConnectionFactory", "topic/testTopic");
?????? try {
?????????? for (int i = 1; i < 11; i++) {
????????????? String msg = "Hello World no. " + i;
????????????? System.out.println("Publishing message: " + msg);
????????????? publisher.publish(msg);
?????????? }
?????????? publisher.close();//session和connection用完之后一定記得關(guān)閉
?????? } catch (Exception ex) {
?????????? ex.printStackTrace();
?????? }
??? }
}
程序在控制臺(tái)輸出:
Publishing message: Hello World no. 1
Publishing message: Hello World no. 2
Publishing message: Hello World no. 3
Publishing message: Hello World no. 4
Publishing message: Hello World no. 5
Publishing message: Hello World no. 6
Publishing message: Hello World no. 7
Publishing message: Hello World no. 8
Publishing message: Hello World no. 9
Publishing message: Hello World no. 10
?
?
l??????? HelloSubscriber.java
?
package com.jms.test;
?
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
?
/**
?* pub/sub方式下的消息接收器。注意,這個(gè)消息接收器可以與上面的消息發(fā)送器可以工作
* 在不同的JVM中,只要保證它們各自能夠連通JMS容器(JMS Provider)
?*
?*/
public class HelloSubscriber implements MessageListener {
??? TopicConnection topicConnection;
??? TopicSession topicSession;
??? TopicSubscriber topicSubscriber;
??? Topic topic;
?
??? public HelloSubscriber(String factoryJNDI, String topicJNDI)
?????????? throws JMSException, NamingException {
Hashtable<String, String> env = new Hashtable<String, String>();
?????? //設(shè)置好連接JMS容器的屬性,不同的容器需要的屬性可能不同,需要查閱相關(guān)文檔
?????? env.put(Context.INITIAL_CONTEXT_FACTORY,
????????????? "org.jnp.interfaces.NamingContextFactory");
?????? env.put(Context.PROVIDER_URL, "localhost:1099");
?????? env.put("java.naming.rmi.security.manager", "yes");
?????? env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
?????? Context context = new InitialContext();
?
?????? TopicConnectionFactory topicFactory =
?????????? (TopicConnectionFactory) context.lookup(factoryJNDI);
?????? //創(chuàng)建連接
?????? topicConnection = topicFactory.createTopicConnection();
?????? topicSession = topicConnection.createTopicSession(false,
????????????? Session.AUTO_ACKNOWLEDGE);//創(chuàng)建session
?????? topic = (Topic) context.lookup(topicJNDI);//查找到主題
?????? //用session創(chuàng)建一個(gè)特定queue的消息接收者
?????? topicSubscriber = topicSession.createSubscriber(topic);
?????? //注冊(cè)監(jiān)聽(tīng),這里設(shè)置的監(jiān)聽(tīng)是自己,因?yàn)楸绢愐呀?jīng)實(shí)現(xiàn)了MessageListener接口,
?????? //一旦queueReceiver接收到了消息,就會(huì)調(diào)用本類的onMessage方法
?????? topicSubscriber.setMessageListener(this);
?????? System.out.println("HelloSubscriber subscribed to topic: "
????????????? + topicJNDI);
?????? topicConnection.start();//啟動(dòng)連接,這時(shí)監(jiān)聽(tīng)器才真正生效
??? }
?
??? public void onMessage(Message msg) {
?????? try {
?????????? if (msg instanceof TextMessage) {
????????????? //把Message 轉(zhuǎn)型成 TextMessage 并提取消息內(nèi)容
????????????? String msgTxt = ((TextMessage) msg).getText();
????????????? System.out.println("HelloSubscriber got message: " +
????????????????? msgTxt);
?????????? }
?????? } catch (JMSException ex) {
?????????? System.err.println("Could not get text message: " + ex);
?????????? ex.printStackTrace();
?????? }
??? }
?
??? public void close() throws JMSException {
?????? topicSession.close();
?????? topicConnection.close();
??? }
?
??? public static void main(String[] args) {
?????? try {
?????????? new HelloSubscriber("TopicConnectionFactory",
????????????? "topic/testTopic");
?????? } catch (Exception ex) {
?????????? ex.printStackTrace();
?????? }
??? }
}
程序在控制臺(tái)輸出:
HelloSubscriber subscribed to topic: topic/testTopic
HelloSubscriber got message: Hello World no. 1
HelloSubscriber got message: Hello World no. 2
HelloSubscriber got message: Hello World no. 3
HelloSubscriber got message: Hello World no. 4
HelloSubscriber got message: Hello World no. 5
HelloSubscriber got message: Hello World no. 6
HelloSubscriber got message: Hello World no. 7
HelloSubscriber got message: Hello World no. 8
HelloSubscriber got message: Hello World no. 9
HelloSubscriber got message: Hello World no. 10
?
?
?
P2P方式下的消息傳遞
l???????? HelloQueue.java
?
package com.jms.test;
?
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.QueueSender;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.jms.Session;
import javax.jms.JMSException;
?
import java.util.Hashtable;
?
public class HelloQueue {
??? QueueConnection queueConnection; //queue方式的JMS連接
??? QueueSession queueSession; //queue會(huì)話
??? QueueSender queueSender; //queue消息發(fā)送者
??? Queue queue; //消息隊(duì)列
?
??? public HelloQueue(String factoryJNDI, String topicJNDI)
??????????? throws JMSException, NamingException {
??????? //連接JMS Provider的環(huán)境參數(shù)
??????? Hashtable<String, String> props = new Hashtable<String, String>();
??????? props.put(Context.INITIAL_CONTEXT_FACTORY,
??????????????? "org.jnp.interfaces.NamingContextFactory");
??????? //JMS provider的主機(jī)和端口
??????? props.put(Context.PROVIDER_URL, "localhost:1099");
??????? props.put("java.naming.rmi.security.manager", "yes");
??????? props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
??????? Context context = new InitialContext(props);
???????
??????? //lookup到連接工廠
??????? QueueConnectionFactory queueFactory =
??????????? (QueueConnectionFactory) context.lookup(factoryJNDI);
??????? queueConnection = queueFactory.createQueueConnection();//創(chuàng)建連接
??????? queueSession = queueConnection.createQueueSession(false,
??????????????? Session.AUTO_ACKNOWLEDGE);//創(chuàng)建會(huì)話
?
??????? queue = (Queue) context.lookup(topicJNDI);//lookup到特定的消息隊(duì)列
?
??????? queueSender = queueSession.createSender(queue);//創(chuàng)建隊(duì)列消息的發(fā)送者
?
??? }
?
??? public void send(String msg) throws JMSException {
??????? TextMessage message = queueSession.createTextMessage();
??????? message.setText(msg);
??????? queueSender.send(queue, message);
??? }
?
??? public void close() throws JMSException {
??????? queueSession.close();
??????? queueConnection.close();
??? }
?
??? public static void main(String[] args) {
??????? try {
??????????? HelloQueue queue = new HelloQueue("ConnectionFactory",
??????????????????? "queue/testQueue");
??????????? for (int i = 11; i < 21; i++) {
??????????????? String msg = "Hello World no. " + i;
??????????????? System.out.println("Hello Queue Publishing message: " + msg);
??????????????? queue.send(msg);
??????????? }
??????????? queue.close();
??????? } catch (Exception ex) {
??????????? System.err.println("An exception occurred " +
"while testing HelloPublisher25: " + ex);
??????????? ex.printStackTrace();
??????? }
??? }
}
?
程序在控制臺(tái)輸出:
?
Hello Queue Publishing message: " Hello World no. 11
Hello Queue Publishing message: " Hello World no. 12
Hello Queue Publishing message: " Hello World no. 13
Hello Queue Publishing message: " Hello World no. 14
Hello Queue Publishing message: " Hello World no. 15
Hello Queue Publishing message: " Hello World no. 16
Hello Queue Publishing message: " Hello World no. 17
Hello Queue Publishing message: " Hello World no. 18
Hello Queue Publishing message: " Hello World no. 19
Hello Queue Publishing message: " Hello World no. 20
?
?
?
l??????? HelloRecvQueue.java
?
package com.jms.test;
?
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
?
public class HelloRecvQueue implements MessageListener {
??? QueueConnection queueConnection;
??? QueueSession queueSession;
??? QueueReceiver queueReceiver;
??? Queue queue;
?
??? public HelloRecvQueue(String factoryJNDI, String topicJNDI)
??????????? throws JMSException, NamingException {
??????? Context context = new InitialContext();
??????? QueueConnectionFactory queueFactory =
??????????? (QueueConnectionFactory) context.lookup(factoryJNDI);
??????? queueConnection = queueFactory.createQueueConnection();
??????? queueSession = queueConnection.createQueueSession(false,
??????????????? Session.AUTO_ACKNOWLEDGE);
??????? queue = (Queue) context.lookup(topicJNDI);
?
??????? queueReceiver = queueSession.createReceiver(queue);
??????? queueReceiver.setMessageListener(this);
??????? System.out.println("HelloReceQueue receiver to queue: " + topicJNDI);
??????? queueConnection.start();
??? }
?
??? public void onMessage(Message m) {
??????? try {
??????????? String msg = ((TextMessage) m).getText();
??????????? System.out.println("HelloReceQueue got message: " + msg);
??????? } catch (JMSException ex) {
??????????? System.err.println("Could not get text message: " + ex);
??????????? ex.printStackTrace();
??????? }
??? }
?
??? public void close() throws JMSException {
??????? queueSession.close();
??????? queueConnection.close();
??? }
?
??? Public ovid main(String[] args) {
??? new HelloRecvQueue();
}
}
?
?
?
程序在控制臺(tái)輸出:
?
HelloReceQueue got message: Hello World no. 11
HelloReceQueue got message: Hello World no. 12
HelloReceQueue got message: Hello World no. 13
HelloReceQueue got message: Hello World no. 14
HelloReceQueue got message: Hello World no. 15
HelloReceQueue got message: Hello World no. 16
HelloReceQueue got message: Hello World no. 17
HelloReceQueue got message: Hello World no. 18
HelloReceQueue got message: Hello World no. 19
HelloReceQueue got message: Hello World no. 20
?
配置篇
下面我們來(lái)看看是JMS是在JBoss下如何配置的,首先JMS需要一個(gè)數(shù)據(jù)庫(kù)來(lái)保存其持久化的消息,幸運(yùn)的是JBoss自帶有一個(gè)開(kāi)源的JAVA數(shù)據(jù)庫(kù)HSQL(www.hsqldb.org)
?
在這里簡(jiǎn)單地介紹一下這個(gè)數(shù)據(jù)庫(kù),它支持標(biāo)準(zhǔn)的SQL語(yǔ)法和JDBC接口,是一個(gè)用純JAVA編寫的數(shù)據(jù)庫(kù),其實(shí)它只有一個(gè)jar文件而已:hsqldb.jar,在%JBOSS_HOME%/server/default/lib目錄下你能找到它。
啟動(dòng)這個(gè)數(shù)據(jù)庫(kù)有三種模式:Server模式、進(jìn)程模式和內(nèi)存模式,在Server模式下,你可以用下面的命令讓它啟動(dòng)起來(lái):
$cd %JBOSS_HOME%/server/default/lib
$ java -cp hsqldb.jar org.hsqldb.Server -database.0 mydb -dbname.0 demoDB
其中mydb是數(shù)據(jù)庫(kù)名,demoDB是數(shù)據(jù)庫(kù)別名,我們用JDBC連它是就用這個(gè)別名,用戶名是sa,密碼默認(rèn)是空,我們下列語(yǔ)句就能創(chuàng)建表、插入數(shù)據(jù)了
create table employee (
? employee_id int,
? employee_name varchar(50),
? age int,
? hiredate date
)
insert into employee values(1, 'linyufa', 33, '2007-12-17')
insert into employee values(2, 'scott', 25, '2008-11-23')
insert into employee values(3, 'larry', 35, '2004-11-23')
?
想進(jìn)一步了解HSQL的知識(shí),網(wǎng)上有很多學(xué)習(xí)資料,好了,回到我們討論的JMS話題,有了這個(gè)數(shù)據(jù)庫(kù),那我們就不必去找其他數(shù)據(jù)庫(kù)了,JMS默認(rèn)是用內(nèi)存模式來(lái)啟動(dòng)它的,所以我們基本上不用去關(guān)心它是如何工作的。
1)?? 在 %JBOSS_HOME%/server/default/deploy/jms目錄下,
打開(kāi)hsqldb-jdbc-state-service.xml文件,
?
<depends optional-attribute-name="ConnectionManager">
??????????? jboss.jca:service= DataSourceBinding, name=DefaultDS
</depends>
DefaultDS這個(gè)名字就是JMS連接數(shù)據(jù)庫(kù)的數(shù)據(jù)源,可以讓其保持默認(rèn)值。
?
2)?? 再在同一目錄打開(kāi)hsqldb-jdbc2-service.xml 文件,
<depends optional-attribute-name="ConnectionManager">
jboss.jca:service=DataSourceBinding,name=DefaultDS
????? </depends>
DefaultDS這個(gè)名字保持和前面一致即可,也可以讓其保持默認(rèn)值。
?
3)?? 在同一目錄打開(kāi)jbossmq-destinations-service.xml文件,找到下面的代碼段:
<mbean code="org.jboss.mq.server.jmx.Topic"
??? name="jboss.mq.destination:service=Topic,name=testTopic">
??? <depends optional-attribute-name="DestinationManager">
?????? jboss.mq:service=DestinationManager
??? </depends>
??? <depends optional-attribute-name="SecurityManager">
?????? jboss.mq:service=SecurityManager
??? </depends>
??? <attribute name="SecurityConf">
?????? <security>
?????? <role name="guest" read="true" write="true"/>
?????? <role name="publisher" read="true" write="true" create="false"/>
?????? <role name="durpublisher" read="true" write="true" create="true"/>
?????? </security>
??? </attribute>
</mbean>
這是定義一個(gè)名叫testTopic的示例,如果你要定義一個(gè)新的topic,只需要復(fù)制這段代碼,改一下name屬性即可。
?
同樣找到下面這段的代碼,這是定義一個(gè)名叫testQueue的示例,如果要定義一個(gè)新的queue,復(fù)制這段代碼,改一下名字即可。
<mbean code="org.jboss.mq.server.jmx.Queue"
??? name="jboss.mq.destination:service=Queue,name=testQueue">
??? <depends optional-attribute-name="DestinationManager">
?????? jboss.mq:service=DestinationManager
??? </depends>
??? <depends optional-attribute-name="SecurityManager">
??????? jboss.mq:service=SecurityManager
??? </depends>
??? <attribute name="MessageCounterHistoryDayLimit">-1</attribute>
??? <attribute name="SecurityConf">
????? <security>
?????? <role name="guest" read="true" write="true"/>
?????? <role name="publisher" read="true" write="true" create="false"/>
?????? <role name="noacc" read="false" write="false" create="false"/>
????? </security>
??? </attribute>
</mbean>
?
?
?
4)?? 啟動(dòng)Jboss后在控制臺(tái)看到如下輸出,即說(shuō)明JMS正常啟動(dòng)了
09:50:28,390 INFO? [A] Bound to JNDI name: queue/A
09:50:28,406 INFO? [B] Bound to JNDI name: queue/B
09:50:28,406 INFO? [C] Bound to JNDI name: queue/C
09:50:28,406 INFO? [D] Bound to JNDI name: queue/D
09:50:28,421 INFO? [ex] Bound to JNDI name: queue/ex
09:50:28,437 INFO? [testTopic] Bound to JNDI name: topic/testTopic
09:50:28,484 INFO? [securedTopic] Bound to JNDI name: topic/securedTopic
09:50:28,484 INFO? [testDurableTopic] Bound to JNDI name: topic/testDurableTopic
09:50:28,500 INFO? [testQueue] Bound to JNDI name: queue/testQueue
?
5)?? 如果是Jboss4.2或以上的版本,在啟動(dòng)Jboss時(shí)必須指定 –b 0.0.0.0參數(shù),否則本機(jī)之外的任何主機(jī)都無(wú)法連接JMS。可以修改run.bat或run.sh文件,也可以在運(yùn)行命令時(shí)附帶上這個(gè)參數(shù),如下 sh run.sh –b 0.0.0.0
?
從上面介紹可以看出,在Jboss下配置JMS是非常簡(jiǎn)單的,僅需要copy一段代碼,改個(gè)名字即可。如果在WebLogic下,你就要依次配置JMS Module, ConnectionFactory, Topic, Queue, Template,不過(guò)好在console都有向?qū)?#xff0c;非常直觀,所以配置起來(lái)也不是什么難事。
?
JMS編程其他注意事項(xiàng)
創(chuàng)建一個(gè)JMS Connection、查找ConnectionFactory和Destination都是需要很大的系統(tǒng)開(kāi)銷的操作,所以我們的應(yīng)用程序應(yīng)避免頻繁地去做這些操作。一般情況下,我們可以把ConnectionFactory,Connection, Topic, Queue定義成類的成員變量,并在類的構(gòu)造函數(shù)里初始化他們,避免在每次接收和發(fā)送JMS消息時(shí)去做這些工作。但是因此也帶了一個(gè)問(wèn)題,就是說(shuō)當(dāng)Connection不可用了(比如JMS Server重啟了),我們的應(yīng)用程序就會(huì)開(kāi)始不工作了,所以我們要有一種機(jī)制去檢測(cè)我們的Connection是否有效,如果已經(jīng)斷掉,應(yīng)該試圖去重新連接,并通知系統(tǒng)管理員。
?
JMS的Connection和JDBC的Connection類似,不再使用后應(yīng)該關(guān)閉,不管是正常退出,還是異常退出,否則別的客戶程序可能就再也取不到連接了。Session也是如此。
?
因?yàn)镴MS工作模式是異步的,我們要意識(shí)到調(diào)用Connection.start()這個(gè)方法,系統(tǒng)已經(jīng)啟動(dòng)了一個(gè)新的線程在工作,也就是說(shuō)退出了這行語(yǔ)句所在的方法之后,這個(gè)線程還在工作,它會(huì)不斷地去偵聽(tīng)有沒(méi)有新的JMS消息,直到這個(gè)Connection被關(guān)閉或不可用。
?
本文來(lái)自CSDN博客,轉(zhuǎn)載請(qǐng)標(biāo)明出處:http://blog.csdn.net/Linyufa/archive/2009/07/24/4375670.aspx
總結(jié)
- 上一篇: 架构师之道:面向组件的Web架构设计
- 下一篇: 牛B人才简历中的一段