使用JMS实现请求/应答程序
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
JMS是用來為發(fā)送者和接收者解耦的;
消息通過一個(gè)進(jìn)程發(fā)送給代理,然后代理在另外一個(gè)進(jìn)程異步的接收消息,一種可以利用JMS來實(shí)現(xiàn)的系統(tǒng)架構(gòu)被稱為請(qǐng)求/應(yīng)答。
概括的說:一個(gè)請(qǐng)求/應(yīng)答場(chǎng)景包括一個(gè)發(fā)送消息(請(qǐng)求)并期望接收消息返回值(應(yīng)答)的應(yīng)用程序。通常,這樣的系統(tǒng)被設(shè)計(jì)成CS架構(gòu),服務(wù)端和客戶端通過網(wǎng)絡(luò)傳輸協(xié)議(TCP,UDP等等)同步的進(jìn)行通信。這種架構(gòu)方式在可擴(kuò)展方面具有明顯的限制,很難獲得長(zhǎng)遠(yuǎn)發(fā)展。消息系統(tǒng)正是為此而生,通過基于消息的請(qǐng)求/應(yīng)答設(shè)計(jì)模式能夠設(shè)計(jì)出易于擴(kuò)展的系統(tǒng)主要以異步處理方式實(shí)現(xiàn)。
請(qǐng)求/應(yīng)答系統(tǒng):注意,客戶端包含消息生產(chǎn)者(producer)和消息消費(fèi)者(consumer),并且工作者(worker)也包含消息生產(chǎn)者(producer)和消息消費(fèi)者(consumer)。后面將解釋客戶端和工作者(worker)。
首先,消息生產(chǎn)者創(chuàng)建一個(gè)以JMS消息格式封裝的請(qǐng)求并在消息中設(shè)置一些重要的屬性,包括correlation ID(通過消息的JMSCorrelationID屬性設(shè)置)和reply destination(響應(yīng)發(fā)送目的地,通過JMSReplyTo屬性設(shè)置)。correlation ID屬性非常重要,因?yàn)樵谡?qǐng)求數(shù)量非常多時(shí)需要使用這個(gè)屬性來關(guān)聯(lián)請(qǐng)求和應(yīng)答。屬性指定應(yīng)答發(fā)往的目的地(通常是一個(gè)臨時(shí)的JMS目的地,因?yàn)閞eply destination比較消耗資源)。接下來,客戶端配置,一個(gè)消息消費(fèi)者監(jiān)聽響應(yīng)消息目的地(reply destination)。
其次,一個(gè)工作者(woker)接收到請(qǐng)求,并處理請(qǐng)求,然后發(fā)送一個(gè)響應(yīng)消息到請(qǐng)求消息的JMSReplyTo屬性指定的目的中。響應(yīng)消息必須用原始請(qǐng)求消息correlation ID的屬性值來設(shè)置JMSCorrelationID屬性,當(dāng)客戶端收到響應(yīng)消息后,可以通過correlation ID關(guān)聯(lián)到初始的請(qǐng)求。
這種結(jié)構(gòu)如何實(shí)現(xiàn)高可擴(kuò)展性,想象一個(gè)場(chǎng)景:單一的工作者無法處理大量并發(fā)的請(qǐng)求負(fù)載時(shí)怎么辦?當(dāng)然沒問題:可以添加工作者來平衡負(fù)載。這些工作者甚至分布到自不同的主機(jī),這也是這種可擴(kuò)展性設(shè)計(jì)中最重要的部分。因?yàn)楣ぷ髡卟⒉皇窃跔?zhēng)奪相同主機(jī)上的資源,所以唯一的限制是代理中消息的最大吞吐量,它比使用普通的客戶端服務(wù)器架構(gòu)能達(dá)到的最大吞吐量要大得多。并且,ActiveMQ可以進(jìn)行水平和垂直擴(kuò)展。
下面讓我們看看請(qǐng)求/應(yīng)答程序的基本實(shí)現(xiàn).
實(shí)現(xiàn)服務(wù)和工作者(worker)
????首先,需要關(guān)注的是系統(tǒng)中使用的消息代理。先要啟動(dòng)代理,以便兩邊程序都啟動(dòng)時(shí)可以連接到代理。為方便說明本例中使用一個(gè)嵌入式代理。其次,需要啟動(dòng)系統(tǒng)中的工作者(worker)。工作者由消息監(jiān)聽器組成,用來接收處理消息和發(fā)送消息響應(yīng)。
在請(qǐng)求/響應(yīng)實(shí)例創(chuàng)建中一個(gè)代理,消費(fèi)者以及生產(chǎn)者
public void start() throws Exception {createBroker();setupConsumer(); } private void createBroker() throws Exception {broker = new BrokerService();broker.setPersistent(false);broker.setUseJmx(false);broker.addConnector(brokerUrl);broker.start(); } private void setupConsumer() throws JMSException {ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);Connection connection;connection = connectionFactory.createConnection();connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination adminQueue = session.createQueue(requestQueue);producer = session.createProducer(null);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);consumer = session.createConsumer(adminQueue);consumer.setMessageListener(this); } public void stop() throws Exception {producer.close();consumer.close();session.close();broker.stop(); }從代碼中可以看到,start()方法調(diào)用一個(gè)方法創(chuàng)建并啟動(dòng)一個(gè)嵌入式代理,另外一個(gè)方法用于啟動(dòng)工作者. createBroker()方法使用BrokerService類來創(chuàng)建一個(gè)嵌入式代理.setupConsumer()方法通過創(chuàng)建 JMS所需的所有對(duì)象來發(fā)送和接收消息,這些JMS對(duì)象包括:一個(gè)連接,一個(gè)session,一個(gè)消息目的地,一個(gè)消息消費(fèi)者和一個(gè)生產(chǎn)者。 創(chuàng)建消息生產(chǎn)者的時(shí)候沒有設(shè)置默認(rèn)的消息目的地,因?yàn)樵撋a(chǎn)者會(huì)將消息發(fā)送到每個(gè)消息的 JMSReplyTo屬性所指定的目的地中。下面再詳細(xì)看下請(qǐng)求/響應(yīng)中的監(jiān)聽者,看看它是如何處理每個(gè)請(qǐng)求的:
public void onMessage(Message message) {try{TextMessage response = this.session.createTextMessage();if (message instanceof TextMessage) {TextMessage txtMsg = (TextMessage) message;String messageText = txtMsg.getText();response.setText(handleRequest(messageText));}response.setJMSCorrelationID(message.getJMSCorrelationID());producer.send(message.getJMSReplyTo(), response);} catch (JMSException e) {e.printStackTrace();} } public String handleRequest(String messageText) {return "Response to '" + messageText + "'"; }消息監(jiān)聽器創(chuàng)建一個(gè)新消息,并設(shè)置合適的correlation ID,然后將消息發(fā)送到響應(yīng)消息隊(duì)列。很簡(jiǎn)單但是很重要,盡管在這個(gè)消息監(jiān)聽器的實(shí)現(xiàn)中沒做什么驚天動(dòng)地的事情,但是它展示了工作者完成器任務(wù)的必要的基本步驟。根據(jù)需求,可以在監(jiān)聽器中添加其他任意額外的操作或者數(shù)據(jù)庫訪問操作。
啟動(dòng)服務(wù)很簡(jiǎn)單:創(chuàng)建一個(gè)server實(shí)例并調(diào)用start()方法。main方法容納了server的的所有功能,如下面的代碼清單所示:
public static void main(String[] args) throws Exception {Server server = new Server();server.start();System.out.println();System.out.println("Press any key to stop the server");System.out.println();System.in.read();server.stop(); }一旦server啟動(dòng)完成,worker就正常運(yùn)行了,這樣所有準(zhǔn)備接收客戶端請(qǐng)求的工作已經(jīng)就緒。
實(shí)現(xiàn)客戶端:客戶端要做到工作是初始化發(fā)送到代理的請(qǐng)求。這是整個(gè)請(qǐng)求/應(yīng)答過程的起點(diǎn),并且通常在一個(gè)業(yè)務(wù)邏輯處理過程中觸發(fā)。這個(gè)過程可能是接受訂單,履行訂單,整合各類業(yè)務(wù)系統(tǒng),財(cái)務(wù)狀況中的買入賣出等,不管是什么情況,請(qǐng)求/響應(yīng)過程從發(fā)送一個(gè)消息開始。發(fā)送一個(gè)消息到代理需要標(biāo)準(zhǔn)的連接(connection),session,消息目的地(destination)以及消息生產(chǎn)者(producer),它們都是在client的start()方法中創(chuàng)建的。下面的的代碼清單中提供了完整的
示例:
啟動(dòng)和停止響應(yīng)/應(yīng)答系統(tǒng)客戶端的方法
public void start() throws JMSException {ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);connection = connectionFactory.createConnection();connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination adminQueue = session.createQueue(requestQueue);producer = session.createProducer(adminQueue);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);tempDest = session.createTemporaryQueue();consumer = session.createConsumer(tempDest);consumer.setMessageListener(this); }public void stop() throws JMSException {producer.close();consumer.close();session.close();connection.close(); }消息生產(chǎn)者發(fā)送消息到請(qǐng)求隊(duì)列中,然后消息消費(fèi)者監(jiān)聽新創(chuàng)建的臨時(shí)隊(duì)列。下面的代碼中展示了實(shí)現(xiàn)客戶端的真正邏輯:
public void request(String request) throws JMSException {System.out.println("Requesting: " + request);TextMessage txtMessage = session.createTextMessage();txtMessage.setText(request);txtMessage.setJMSReplyTo(tempDest);String correlationId = UUID.randomUUID().toString();txtMessage.setJMSCorrelationID(correlationId);this.producer.send(txtMessage); } public void onMessage(Message message) {try{System.out.println("Received response for: " + ((TextMessage) message).getText());} catch(JMSException e) {e.printStackTrace();} }所示的request()方法使用請(qǐng)求內(nèi)容創(chuàng)建一個(gè)消息并設(shè)置JMSReplyTo屬性值,接著發(fā)送這個(gè)消息到臨時(shí)隊(duì)列,最后設(shè)置correlation ID 屬性值。上述3個(gè)步驟很重要.在這個(gè)例子中,是使用一個(gè)隨機(jī)的UUID值來設(shè)置correlation ID的,也還可以使用其他任何ID生成器來生成這個(gè)ID。
接下就可以發(fā)送一個(gè)請(qǐng)求了,啟動(dòng)客戶端也可以像啟動(dòng)sever一樣,簡(jiǎn)單的使用一個(gè)main方法即可,下面是代碼清單:
啟動(dòng)請(qǐng)求/應(yīng)答系統(tǒng)客戶端
public static void main(String[] args) throws Exception {Client client = new Client();client.start();int i = 0;while (i++ < 10) {client.request("REQUEST-" + i);}Thread.sleep(3000); //wait for repliesclient.stop(); }?
????如前文所述,這個(gè)是一個(gè)簡(jiǎn)單的請(qǐng)求/應(yīng)答系統(tǒng)的實(shí)現(xiàn)。因此,啟動(dòng)客戶端以后,會(huì)發(fā)送10個(gè)請(qǐng)求到代理。運(yùn)行這個(gè)實(shí)例程序需要兩個(gè)終端:一個(gè)用于運(yùn)行server,另一個(gè)用于client,必須先運(yùn)行server。sever通過Server類來實(shí)現(xiàn),client通過Client類實(shí)現(xiàn)。因?yàn)檫@兩個(gè)類都是通過main方法初始化的,所以運(yùn)行它們很容易。注意:到當(dāng)client啟動(dòng)后,送了10個(gè)請(qǐng)求用于激活請(qǐng)求/響應(yīng)進(jìn)程,然后收到了來自worker的響應(yīng)。盡管這個(gè)例子很簡(jiǎn)單,但是日后必將是你在其他業(yè)務(wù)中實(shí)現(xiàn)請(qǐng)求/響應(yīng)系統(tǒng)的參考。
????使用請(qǐng)求/應(yīng)答模式,代理將每秒鐘收到的來自無數(shù)的客戶端的成千上萬個(gè)請(qǐng)求全部分發(fā)到不同的主機(jī)中處理。?在生產(chǎn)系統(tǒng)中,會(huì)使用更多的代理實(shí)例用于備份,失效轉(zhuǎn)移以及負(fù)載均衡。這些代理也會(huì)被分布于很多的主機(jī)上。處理如此多請(qǐng)求的唯一方法是使用多工作者(worker)。因?yàn)橄l(fā)送者發(fā)送消息的速度可能比消息消費(fèi)者接收并處理消息的速度快的多。所以就需要大量的工作者(worker),這些工作者同樣也分布于大量的主機(jī)上。
????使用多工作者的好處是任何的工作者都可以根據(jù)需要進(jìn)行啟用或者停用,而整個(gè)系統(tǒng)不會(huì)收到影響。消息生產(chǎn)者和工作者會(huì)正常處理消息,即使她們當(dāng)中的一些已經(jīng)崩潰了,也不會(huì)影響系統(tǒng)運(yùn)行。這正是那些大型系統(tǒng)可以處理海量負(fù)載的原因--使用前文介紹過的基于請(qǐng)求/應(yīng)答模式的異步消息系統(tǒng).
????JMS的API可以說是繁瑣的,因?yàn)樗箝_發(fā)者書寫大量的初始化代碼用于初始化必要的JMS對(duì)象,包括connection, session, producer, consumer等等。使用Spring框架通過提供可靠的API來幫助開發(fā)者移除(類似于JMS對(duì)象初始化)的那些固定的代碼,以便簡(jiǎn)化整個(gè)配置過程。這正式使用Spring框架帶來的好處。
轉(zhuǎn)載于:https://my.oschina.net/mclimber/blog/1510875
總結(jié)
以上是生活随笔為你收集整理的使用JMS实现请求/应答程序的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么是OOA/OOD
- 下一篇: 求一颗二叉树中两个节点的最低公共父节点