消息中间件和JMS介绍
在一個(gè)公司創(chuàng)立初期,他可能只有幾個(gè)應(yīng)用,系統(tǒng)之間的關(guān)聯(lián)也不是那么大,A系統(tǒng)調(diào)用B系統(tǒng)就直接調(diào)用B提供的API接口;后來這個(gè)公司做大了,他一步步發(fā)展有了幾十個(gè)系統(tǒng),這時(shí)候A系統(tǒng)要調(diào)用B系統(tǒng)的接口,但是B系統(tǒng)前幾天剛改了一下接口A并不知情。所以A發(fā)現(xiàn)調(diào)不通于是給B系統(tǒng)管理員打電話,小王啊,改了接口咋不告訴我呢。我還以為我們系統(tǒng)出錯(cuò)了呢。弄得小王一頓尷尬,我這自己改個(gè)東西還的通知這個(gè)通知那個(gè)的。
1 中間件介紹
我們看到上面的故事中的小王他真的是很累啊。自己修改一個(gè)接口還的給所有調(diào)用接口的系統(tǒng)管理員打電話告知API發(fā)生變化。說到這個(gè)問題啊,還是的說我們系統(tǒng)之間的耦合。對于一個(gè)小公司來說是無所謂,但是對于一個(gè)大公司這種情況簡直是致命的。于是最近幾年這些越來越大的互聯(lián)網(wǎng)公司在這種挑戰(zhàn)下提出了中間件這個(gè)概念:中間件在操作系統(tǒng)軟件,網(wǎng)絡(luò)和數(shù)據(jù)庫之上,應(yīng)用軟件之下,總的作用是為處于自己上層的軟件提供靈活的開發(fā)環(huán)境。因而中間件是指一類軟件,是基于分布式處理的軟件,最突出的特點(diǎn)是其網(wǎng)絡(luò)通信功能。也可認(rèn)為中間件是位于平臺和應(yīng)用之間的通用服務(wù),這些服務(wù)具有標(biāo)準(zhǔn)的程序接口和協(xié)議。針對不同的操作系統(tǒng)和硬件平臺,可以有符合接口和協(xié)議的多種實(shí)現(xiàn)。
1.1 中間件分類
中間件可以分為六類:
1) 終端仿真/屏幕轉(zhuǎn)換
2) 數(shù)據(jù)訪問中間件(UDA)
3) 遠(yuǎn)程過程調(diào)用中間件(RPC)
4) 消息中間件(MOM)
5) 交易中間件(TPM)
6) 對象中間件
然而在實(shí)際應(yīng)用中,一般將中間件分為兩大類:
一類是底層中間件,用于支撐單個(gè)應(yīng)用系統(tǒng)或解決一類問題,包括交易中間件、應(yīng)用服務(wù)器、消息中間件、數(shù)據(jù)訪問中間件等;
另一類是高層中間件,更多的用于系統(tǒng)整合,包括企業(yè)應(yīng)用集成中間件、工作流中間件、門戶中間件等,他們通常會與多個(gè)應(yīng)用系統(tǒng)打交道,在系統(tǒng)中層次較高,并大多基于前一類的底層中間件運(yùn)行。
終端仿真/屏幕轉(zhuǎn)換
此類中間件用于實(shí)現(xiàn)客戶機(jī)圖形用戶接口與已有的字符接口方式的服務(wù)器應(yīng)用程序之間的互操作,應(yīng)用與早期的大型機(jī)系統(tǒng),現(xiàn)在已很少使用。
數(shù)據(jù)訪問中間件
此類中間件是為了建立數(shù)據(jù)應(yīng)用資源互操作的模式,對異構(gòu)環(huán)境下的數(shù)據(jù)庫或文件系統(tǒng)實(shí)現(xiàn)聯(lián)接。
遠(yuǎn)程過程調(diào)用中間件
此類中間件可以使開發(fā)人員在需要時(shí)調(diào)用位于遠(yuǎn)端服務(wù)器上的過程,屏蔽了在調(diào)用過程中的通信細(xì)節(jié)。一個(gè)應(yīng)用程序使用RPC來遠(yuǎn)程執(zhí)行一個(gè)位于不同地址空間里的過程,在效果上看和執(zhí)行本地調(diào)用相同。
交易中間件
此類中間件是專門針對聯(lián)機(jī)交易系統(tǒng)而設(shè)計(jì)的。聯(lián)機(jī)交易系統(tǒng)需要處理大量并發(fā)進(jìn)程,處理并發(fā)涉及到操作系統(tǒng),文件系統(tǒng),編程語言,數(shù)據(jù)通信,數(shù)據(jù)庫系統(tǒng),系統(tǒng)管理,應(yīng)用軟件等。而交易中間件根據(jù)分布式交易處理的標(biāo)準(zhǔn)及參考模型,對資源管理,交易管理和應(yīng)用進(jìn)行了實(shí)現(xiàn),從而使得基于交易中間件開發(fā)應(yīng)用程序更為簡單。交易中間件基本上只適用于聯(lián)機(jī)交易系統(tǒng),是一種較為專用的中間件。
消息中間件
此類中間件是指利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成。通過提供消息傳遞和消息排隊(duì)模型,它可以在分布式環(huán)境下擴(kuò)展進(jìn)程間的通信。
消息中間件可以即支持同步方式,又支持異步方式。異步中間件比同步中間件具有更強(qiáng)的容錯(cuò)性,在系統(tǒng)故障時(shí)可以保證消息的正常傳輸。異步中間件技術(shù)又分為兩類:廣播方式和發(fā)布/訂閱方式。由于發(fā)布/訂閱方式可以指定哪種類型的用戶可以接受哪種類型的消息,更加有針對性,事實(shí)上已成為異步中間件的非正式標(biāo)準(zhǔn)。目前主流的消息中間件產(chǎn)品有IBM的MQSeries,BEA的MessageQ和Sun的JMS等[1]。
對象中間件
傳統(tǒng)的對象技術(shù)通過封裝、繼承及多態(tài)提供了良好的代碼重用功能。但這些對象只存在與一個(gè)程序中,外界并不知道它們的存在,也無法訪問它們。對象中間件提供了一個(gè)標(biāo)準(zhǔn)的構(gòu)建框架,能使不同廠家的軟件通過不同的地址空間,網(wǎng)絡(luò)和操作系統(tǒng)實(shí)現(xiàn)交互訪問。對象中間件的目標(biāo)是為軟件用戶及開發(fā)者提供一種應(yīng)用級的即插即用的互操作性。目前主流的對象中間件有OMG的CORBA,Microsoft 的COM以及IBM的SOM,Sun的RMI等。
中間件的特點(diǎn)
一般來講,中間件具有以下一些特點(diǎn):滿足大量應(yīng)用的需求,運(yùn)行于多種硬件和操作系統(tǒng)平臺,支持分布式計(jì)算,支持標(biāo)準(zhǔn)接口和協(xié)議。開發(fā)人員通過調(diào)用中間件提供的大量API,實(shí)現(xiàn)異構(gòu)環(huán)境的通信,從而屏蔽異構(gòu)系統(tǒng)中復(fù)雜的操作系統(tǒng)和網(wǎng)絡(luò)協(xié)議。
由于標(biāo)準(zhǔn)接口對于可移植性和標(biāo)準(zhǔn)協(xié)議對于互操作性的重要性,中間件已成為許多標(biāo)準(zhǔn)化工作的主要部分。分布式應(yīng)用軟件借助中間件可以在不同的技術(shù)之間共享資源。
總的來說,中間件屏蔽了底層操作系統(tǒng)的復(fù)雜性,使程序開發(fā)人員面對一個(gè)簡單而統(tǒng)一的開發(fā)環(huán)境,減少了程序設(shè)計(jì)的復(fù)雜性,將注意力集中與自己的業(yè)務(wù)上,不必再為程序在不同軟件系統(tǒng)上的移植而重復(fù)工作,從而大大減少了技術(shù)上的負(fù)擔(dān)。
2 消息中間件
面向消息的中間件(MOM),提供了以松散耦合的靈活方式集成應(yīng)用程序的一種機(jī)制。它們提供了基于存儲和轉(zhuǎn)發(fā)的應(yīng)用程序之間的異步數(shù)據(jù)發(fā)送,即應(yīng)用程序彼此不直接通信,而是與作為中介的MOM通信。MOM提供了有保證的消息發(fā)送(至少是在盡可能地做到這一點(diǎn)),應(yīng)用程序開發(fā)人員無需了解遠(yuǎn)程過程調(diào)用(RPC)和網(wǎng)絡(luò)/通信協(xié)議的細(xì)節(jié)。
消息隊(duì)列技術(shù)是分布式應(yīng)用間交換信息的一種技術(shù)。消息隊(duì)列可駐留在內(nèi)存或磁盤上,隊(duì)列存儲消息直到它們被用程序讀走。通過消息隊(duì)列,應(yīng)用程序可獨(dú)立地執(zhí)行–它們不需要知道彼此的位置、或在繼續(xù)執(zhí)行前不需要等待接收程序接收此消息。在分布式計(jì)算環(huán)境中,為了集成分布式應(yīng)用,開發(fā)者需要對異構(gòu)網(wǎng)絡(luò)環(huán)境下的分布式應(yīng)用提供有效的通信手段。為了管理需要共享的信息,對應(yīng)用提供公共的信息交換機(jī)制是重要的。設(shè)計(jì)分布式應(yīng)用的方法主要有:遠(yuǎn)程過程調(diào)用(RPC)–分布式計(jì)算環(huán)境(DCE)的基礎(chǔ)標(biāo)準(zhǔn)成分之一;對象事務(wù)監(jiān)控(OTM)–基于CORBA的面向?qū)ο蠊I(yè)標(biāo)準(zhǔn)與事務(wù)處理(TP)監(jiān)控技術(shù)的組合;消息隊(duì)列(MessageQueue)–構(gòu)造分布式應(yīng)用的松耦合方法。
MOM將消息路由給應(yīng)用程B,這樣消息就可以存在于完全不同的計(jì)算機(jī)上,MOM負(fù)責(zé)處理網(wǎng)絡(luò)通信。如果網(wǎng)絡(luò)連接不可用,MOM會存儲消息,直到連接變得可用時(shí),再將消息轉(zhuǎn)發(fā)給應(yīng)用程序B。
靈活性的另一方面體現(xiàn)在,當(dāng)應(yīng)用程序A發(fā)送其消息時(shí),應(yīng)用程序B甚至可以不處于執(zhí)行狀態(tài)。MOM將保留這個(gè)消息,直到應(yīng)用程序B開始執(zhí)行并試著檢索消息為止。這還防止了應(yīng)用程序A因?yàn)榈却龖?yīng)用程序B檢索消息而出現(xiàn)阻塞。這種異步通信要求應(yīng)用程序的設(shè)計(jì)與現(xiàn)在大多數(shù)應(yīng)用程序不同,不過,對于時(shí)間無關(guān)或并行處理,它可能是一個(gè)極其有用的方法。
2.1 消息中間件的傳遞模式
消息中間件一般有兩種傳遞模式:點(diǎn)對點(diǎn)模式(P2P)和發(fā)布-訂閱模式(Pub/Sub)。
點(diǎn)對點(diǎn)模式
Point-to-Point(P2P)我們很容易理解,即生產(chǎn)者和消費(fèi)者之間的消息往來。?
每個(gè)消息都被發(fā)送到特定的消息隊(duì)列,接收者從隊(duì)列中獲取消息。隊(duì)列保留著消息,直到他們被消費(fèi)或超時(shí)。
P2P的特點(diǎn):
每個(gè)消息只有一個(gè)消費(fèi)者(Consumer)(即一旦被消費(fèi),消息就不再在消息隊(duì)列中);
發(fā)送者和接收者之間在時(shí)間上沒有依賴性,也就是說當(dāng)發(fā)送者發(fā)送了消息之后,不管接收者有沒有正在運(yùn)行,它不會影響到消息被發(fā)送到隊(duì)列;
接收者在成功接收消息之后需向隊(duì)列應(yīng)答成功。
發(fā)布-訂閱模式(Pub/Sub)
我們可以聯(lián)想到賣報(bào)紙的過程:印刷廠把當(dāng)天的報(bào)紙印好然后送到郵遞員手里,郵遞員風(fēng)雨兼程的把報(bào)紙送到每一位訂閱者手里。由此我們可以看到發(fā)布-訂閱模式的一些特點(diǎn):
每個(gè)消息可以有多個(gè)消費(fèi)者;
發(fā)布者和訂閱者之間有時(shí)間上的依賴性。針對某個(gè)主題(Topic)的訂閱者,它必須創(chuàng)建一個(gè)訂閱者之后,才能消費(fèi)發(fā)布者的消息,而且為了消費(fèi)消息,訂閱者必須保持運(yùn)行的狀態(tài);
由上介紹我們可以看出這兩種模式各有千秋,如果你需要點(diǎn)對點(diǎn)的發(fā)送消息那么使用P2P更專注,如果你是群發(fā)消息,顯然pub/sub模式更適合。
3 基于多種協(xié)議的消息傳遞機(jī)制
目前市場上對于網(wǎng)絡(luò)消息傳遞的協(xié)議版本很多,不同的協(xié)議有不同的規(guī)范,我們在使用時(shí)要比對實(shí)現(xiàn)不同協(xié)議的產(chǎn)品。下面我們看一下目前主流的消息傳遞協(xié)議:
3.1 AMQP協(xié)議
AMQP,即Advanced Message Queuing Protocol,高級消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。AMQP協(xié)議是一種二進(jìn)制協(xié)議,提供客戶端應(yīng)用與消息中間件之間異步、安全、高效地交互。
AMQP是一個(gè)應(yīng)用層的異步消息傳遞協(xié)議,為面向消息的中間件而設(shè)計(jì)。其目的是通過協(xié)議使應(yīng)用模塊之間或應(yīng)用程序與中間件等進(jìn)行充分解耦。而在設(shè)計(jì)初期,AMQP的原始用途只是為金融界提供一個(gè)可以彼此協(xié)作的消息協(xié)議。現(xiàn)在已經(jīng)有相當(dāng)一部分遵循AMQP的服務(wù)器和客戶端供使用。其中RabbitMQ是AMQP的一款開源標(biāo)準(zhǔn)實(shí)現(xiàn)。
支持所有消息中間件的功能:消息交換、文件傳輸、流傳輸、遠(yuǎn)程進(jìn)程調(diào)用等。
AMQP的服務(wù)器(Broker)主要由交換器、消息、隊(duì)列組成。Broker的主要功能是消息的路由和緩存。對于需要保障可靠性的消息,RabbitMQ可以將消息、隊(duì)列和交換器的數(shù)據(jù)寫入本地硬盤。而對于響應(yīng)時(shí)間敏感的消息,RabbitMQ可以不配置持久化機(jī)制。
解決的問題:
1)信息的發(fā)送者和接收者如何維持這個(gè)連接,如果一方的連接中斷,這期間的數(shù)據(jù)如何防止丟失?
2)如何降低發(fā)送者和接收者的耦合度?
3)如何讓Priority高的接收者先接到數(shù)據(jù)?
4)如何做到load balance?有效均衡接收者的負(fù)載?
5)如何有效的將數(shù)據(jù)發(fā)送到相關(guān)的接收者?也就是說將接收者subscribe 不同的數(shù)據(jù),如何做有效的filter。
6)如何做到可擴(kuò)展,甚至將這個(gè)通信模塊發(fā)到cluster上?
7)如何保證接收者接收到了完整,正確的數(shù)據(jù)?
AMQP協(xié)議解決了以上的問題,而RabbitMQ實(shí)現(xiàn)了AMQP。
3.2 STOMP協(xié)議
STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,簡單(流)文本定向消息協(xié)議。
它提供了一個(gè)可互操作的連接格式,允許STOMP客戶端與任意STOMP消息代理(Broker)進(jìn)行交互。STOMP協(xié)議由于設(shè)計(jì)簡單,易于開發(fā)客戶端,因此在多種語言和多種平臺上得到廣泛地應(yīng)用。
STOMP協(xié)議的前身是TTMP協(xié)議(一個(gè)簡單的基于文本的協(xié)議),專為消息中間件設(shè)計(jì)。
STOMP是一個(gè)非常簡單和容易實(shí)現(xiàn)的協(xié)議,其設(shè)計(jì)靈感源自于HTTP的簡單性。盡管STOMP協(xié)議在服務(wù)器端的實(shí)現(xiàn)可能有一定的難度,但客戶端的實(shí)現(xiàn)卻很容易。例如,可以使用Telnet登錄到任何的STOMP代理,并與STOMP代理進(jìn)行交互。
STOMP是除AMQP開放消息協(xié)議之外地另外一個(gè)選擇, 實(shí)現(xiàn)了被用在JMS brokers中特定的有線協(xié)議,比如OpenWire。它僅僅是實(shí)現(xiàn)通用消息操作中的一部分,并非想要覆蓋全面的消息API。
STOMP server就好像是一系列的目的地, 消息會被發(fā)送到這里。STOMP協(xié)議把目的地當(dāng)作不透明的字符串,其語法是服務(wù)端具體的實(shí)現(xiàn)。 此外STOMP沒有定義目的地的交付語義是什么,語義的目的地可以從服務(wù)器到服務(wù)器,甚至從目的地到目的地。這使得服務(wù)器有可創(chuàng)造性的語義,去支持STOMP。
STOMP client的用戶代理可以充當(dāng)兩個(gè)角色(可能同時(shí)):
作為生產(chǎn)者,通過SENDframe發(fā)送消息到server
作為消費(fèi)者,發(fā)送SUBSCRIBEframe到目的地并且通過MESSAGEframe從server獲取消息。
STOMP協(xié)議工作于TCP協(xié)議之上,使用了下列命令:
SEND 發(fā)送
SUBSCRIBE 訂閱
UNSUBSCRIBE 退訂
BEGIN 開始
COMMIT 提交
ABORT 取消
ACK 確認(rèn)
DISCONNECT 斷開
目前最流行的STOMP消息代理是Apache ActiveMQ。
3.3 JMS協(xié)議
JMS是Java Message Service的縮寫,即Java消息服務(wù)。
在大型互聯(lián)網(wǎng)中,我們采用消息中間件可以進(jìn)行應(yīng)用之間的解耦以及操作的異步,這是消息中間件兩個(gè)最基礎(chǔ)的特點(diǎn),也正是我們所需要的。在此基礎(chǔ)上,我們著重思考的是消息的順序保證、擴(kuò)展性、可靠性、業(yè)務(wù)操作與消息發(fā)送一致性,以及多集群訂閱者等方面的問題。當(dāng)然,這些我們要思考的東西,JMS都已經(jīng)想到了,先看下JMS能幫開發(fā)者做什么:
1、定義一組消息公用概念和實(shí)用工具
所有Java應(yīng)用程序都可以使用JMS中定義的API去完成消息的創(chuàng)建、接收與發(fā)送,任何實(shí)現(xiàn)了JMS標(biāo)準(zhǔn)的MOM都可以作為消息的中介,完成消息的存儲轉(zhuǎn)發(fā)
2、最大化消息應(yīng)用程序的可移植性
MOM提供了有保證的消息發(fā)送,應(yīng)用程序開發(fā)人員無需了解遠(yuǎn)程過程調(diào)用(RPC)和網(wǎng)絡(luò)/通信協(xié)議的細(xì)節(jié),提供了程序的可移植性
3、最大化降低應(yīng)用程序與應(yīng)用程序之間的耦合度
由于MOM的存在,各個(gè)應(yīng)用程序只關(guān)心和MOM之間如何進(jìn)行消息的接收與發(fā)送,而無須關(guān)注MOM的另一邊,其他程序是如何接收和發(fā)送的
JMS定義了一套通用的接口和相關(guān)語義,提供了諸如持久、驗(yàn)證和事務(wù)的消息服務(wù),它最主要的目的是允許Java應(yīng)用程序訪問現(xiàn)有的消息中間件。JMS規(guī)范沒有指定在消息節(jié)點(diǎn)間所使用的通訊底層協(xié)議,來保證應(yīng)用開發(fā)人員不用與其細(xì)節(jié)打交道,一個(gè)特定的JMS實(shí)現(xiàn)可能提供基于TCP/IP、HTTP、UDP或者其它的協(xié)議。
由于沒有統(tǒng)一的規(guī)范和標(biāo)準(zhǔn),基于消息中間件的應(yīng)用不可移植,不同的消息中間件也不能互操作,這大大阻礙了消息中間件的發(fā)展。 Java Message Service(JMS, Java消息服務(wù))是SUN及其伙伴公司提出的旨在統(tǒng)一各種消息中間件系統(tǒng)接口的規(guī)范。
目前許多廠商采用并實(shí)現(xiàn)了JMS API,現(xiàn)在,JMS產(chǎn)品能夠?yàn)槠髽I(yè)提供一套完整的消息傳遞功能,目前我們看到的比較流行的JMS商業(yè)軟件和開源產(chǎn)品:WebLogic、SonicMQ、ActiveMQ、OpenJMS都是基于JMS規(guī)范的實(shí)現(xiàn)。
4 JMS介紹
在 JMS 之前,每一家 MOM 廠商都用專有 API 為應(yīng)用程序提供對其產(chǎn)品的訪問,通常可用于許多種語言,其中包括 Java 語言。JMS 通過 MOM 產(chǎn)品為 Java 程序提供了一個(gè)發(fā)送和接收消息的標(biāo)準(zhǔn)的、便利的方法。用 JMS 編寫的程序可以在任何實(shí)現(xiàn) JMS 標(biāo)準(zhǔn)的 MOM 上運(yùn)行。
JMS 可移植性的關(guān)鍵在于:JMS API 是由 Sun 作為一組接口而提供的。提供了 JMS 功能的產(chǎn)品是通過提供一個(gè)實(shí)現(xiàn)這些接口的提供者來做到這一點(diǎn)的。開發(fā)人員可以通過定義一組消息和一組交換這些消息的客戶機(jī)應(yīng)用程序建立 JMS 應(yīng)用程序。
JMS 支持兩種消息類型P2P 和Pub/Sub,在JMS消息模型中,根據(jù)點(diǎn)對點(diǎn)模式和發(fā)布/訂閱模式,這些要素由擴(kuò)展出了各自的內(nèi)容:
JMS標(biāo)準(zhǔn)?? ?點(diǎn)對點(diǎn)模式?? ?發(fā)布/訂閱模式
ConnectionFactory?? ?QueueConnectionFactory?? ?TopicConnectionFactory
Connection?? ?QueueConnection?? ?TopicConnection
Destination?? ?Queue?? ?Topic
Session?? ?QueueSession?? ?TopicSession
MessageProducer?? ?QueueSender?? ?TopicPublisher
MessageConsumer?? ?QueueReceiver?? ?TopicSubscriber
JMS為發(fā)開者提供了很多的要素,看一下比較重要的幾個(gè):
要 素?? ?作 用
Destination?? ?表示消息所走通道的目標(biāo)定義,用來定義消息從發(fā)送端發(fā)出后要走的通道,而不是接收方。Destination屬于管理類對象
ConnectionFactory?? ?顧名思義,用于創(chuàng)建連接對象,ConnectionFactory屬于管理類的對象
Connection?? ?連接接口,所負(fù)責(zé)的重要工作時(shí)創(chuàng)建Session
Session?? ?會話接口,這是一個(gè)非常重要的對象,消息發(fā)送者、消息接收者以及消息對象本身,都是通過這個(gè)會話對象創(chuàng)建的
MessageConsumer?? ?消息的消費(fèi)者,也就是訂閱消息并處理消息的對象
MessageProducer?? ?消息的生產(chǎn)者,也就是用來發(fā)送消息的對象
XXXMessage?? ?指各種類型的消息對象,包括ByteMesage、ObjectMessage、StreamMessage和TextMessage這5種
JMS消息模型
JMS 消息由以下幾部分組成:消息頭,屬性,消息體。
消息頭(header):JMS消息頭包含了許多字段,它們是消息發(fā)送后由JMS提供者或消息發(fā)送者產(chǎn)生,用來表示消息、設(shè)置優(yōu)先權(quán)和失效時(shí)間等等,并且為消息確定路由。
屬性(property):由消息發(fā)送者產(chǎn)生,用來添加刪除消息頭以外的附加信息。
消息體(body):由消息發(fā)送者產(chǎn)生,JMS中定義了5種消息體:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。
JMS編程模型
一般來說我們在開發(fā)基于JMS協(xié)議的客戶端由一下幾部構(gòu)成:
1) 用JNDI 得到ConnectionFactory對象;
2) 用JNDI 得到目標(biāo)隊(duì)列或主題對象,即Destination對象;
3) 用ConnectionFactory創(chuàng)建Connection 對象;
4) 用Connection對象創(chuàng)建一個(gè)或多個(gè)JMS Session;
5) 用Session 和Destination 創(chuàng)建MessageProducer和MessageConsumer;
6) 通知Connection 開始傳遞消息。
因?yàn)閖ms需要使用到J2EE服務(wù)器,我們平常用的tomcat屬于J2SE類型的服務(wù)器,常見的J2EE服務(wù)器包括:Geronimo,JBoss 4, GlassFish,WebLogic 。我們在這里使用glassfish 容器。安裝和使用有很多教程,在此就不貼了。首先我們進(jìn)去glassfish的控制臺,設(shè)置一下我們的發(fā)送者和接受者對象:
下面我們用oracle提供的jms接口來寫一個(gè)服務(wù)端,我們先來寫一個(gè)P2P模式的例子:
MySender.java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.naming.*;
import javax.jms.*;
public class MySender {
? ? public static void main(String[] args) {
? ? ? ? try
? ? ? ? { ? //1)創(chuàng)建一個(gè)connection
? ? ? ? ? ? InitialContext ctx=new InitialContext();
? ? ? ? ? ? QueueConnectionFactory f=(QueueConnectionFactory)ctx.lookup("myQueueConnectionFactory");
? ? ? ? ? ? QueueConnection con=f.createQueueConnection();
? ? ? ? ? ? con.start();
? ? ? ? ? ? //2) 創(chuàng)建一個(gè)會話接口
? ? ? ? ? ? QueueSession ses=con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
? ? ? ? ? ? //3) 獲取會話接口對象
? ? ? ? ? ? Queue t=(Queue)ctx.lookup("myQueue");
? ? ? ? ? ? //4)創(chuàng)建一個(gè)發(fā)送者對象
? ? ? ? ? ? QueueSender sender=ses.createSender(t);
? ? ? ? ? ? //5) 創(chuàng)建一個(gè)消息對象
? ? ? ? ? ? TextMessage msg=ses.createTextMessage();
? ? ? ? ? ? //6) 把我們的消息寫入msg對象中
? ? ? ? ? ? BufferedReader b=new BufferedReader(new InputStreamReader(System.in));
? ? ? ? ? ? while(true)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? System.out.println("Enter Msg, end to terminate:");
? ? ? ? ? ? ? ? String s=b.readLine();
? ? ? ? ? ? ? ? if (s.equals("end"))
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? msg.setText(s);
? ? ? ? ? ? ? ? //7) 發(fā)送消息
? ? ? ? ? ? ? ? sender.send(msg);
? ? ? ? ? ? ? ? System.out.println("Message successfully sent.");
? ? ? ? ? ? }
? ? ? ? ? ? //8) 關(guān)閉連接
? ? ? ? ? ? con.close();
? ? ? ? }catch(Exception e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
MyReceiver.java
import javax.jms.*;
import javax.naming.InitialContext;
public class MyReceiver {
? ? public static void main(String[] args) {
? ? ? ? try{
? ? ? ? ? ? //1) 創(chuàng)建一個(gè)connection
? ? ? ? ? ? InitialContext ctx=new InitialContext();
? ? ? ? ? ? QueueConnectionFactory f=(QueueConnectionFactory)ctx.lookup("myQueueConnectionFactory");
? ? ? ? ? ? QueueConnection con=f.createQueueConnection();
? ? ? ? ? ? con.start();
? ? ? ? ? ? //2) 創(chuàng)建一個(gè)會話接口
? ? ? ? ? ? QueueSession ses=con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
? ? ? ? ? ? //3) 獲取會話接口對象
? ? ? ? ? ? Queue t=(Queue)ctx.lookup("myQueue");
? ? ? ? ? ? //4)創(chuàng)建一個(gè)發(fā)送者對象
? ? ? ? ? ? QueueReceiver receiver=ses.createReceiver(t);
? ? ? ? ? ? //5) 創(chuàng)建一個(gè)消監(jiān)聽對象
? ? ? ? ? ? MyListener listener=new MyListener();
? ? ? ? ? ? //6) 將監(jiān)聽器注冊到receiver,用來監(jiān)聽receiver
? ? ? ? ? ? receiver.setMessageListener(listener);
? ? ? ? ? ? System.out.println("Receiver1 is ready, waiting for messages...");
? ? ? ? ? ? System.out.println("press Ctrl+c to shutdown...");
? ? ? ? ? ? while(true){
? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? }
? ? ? ? }catch(Exception e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
MyListener.java
import javax.jms.*;
public class MyListener implements MessageListener {
? ? public void onMessage(Message m) {
? ? ? ? try{
? ? ? ? ? ? TextMessage msg=(TextMessage)m;
? ? ? ? ? ? System.out.println("following message is received:"+msg.getText());
? ? ? ? }catch(JMSException e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
Pub/Sub模式:
MySender.java
import javax.jms.*;
import javax.naming.InitialContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class MySender {
? ? public static void main(String[] args) {
? ? ? ? try
? ? ? ? { ? //1)創(chuàng)建一個(gè)connection
? ? ? ? ? ? InitialContext ctx=new InitialContext();
? ? ? ? ? ? TopicConnectionFactory f=(TopicConnectionFactory)ctx.lookup("myTopicConnectionFactory");
? ? ? ? ? ? TopicConnection con=f.createTopicConnection();
? ? ? ? ? ? con.start();
? ? ? ? ? ? //2) 創(chuàng)建一個(gè)會話接口
? ? ? ? ? ? TopicSession ses=con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
? ? ? ? ? ? //3) 獲取會話接口對象
? ? ? ? ? ? Topic t=(Topic)ctx.lookup("myTopic");
? ? ? ? ? ? //4)創(chuàng)建一個(gè)發(fā)送者對象
? ? ? ? ? ? TopicPublisher publisher=ses.createPublisher(t);
? ? ? ? ? ? //5) 創(chuàng)建一個(gè)消息對象
? ? ? ? ? ? TextMessage msg=ses.createTextMessage();
? ? ? ? ? ? //6) 把我們的消息寫入msg對象中
? ? ? ? ? ? BufferedReader b=new BufferedReader(new InputStreamReader(System.in));
? ? ? ? ? ? while(true)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? System.out.println("Enter Msg, end to terminate:");
? ? ? ? ? ? ? ? String s=b.readLine();
? ? ? ? ? ? ? ? if (s.equals("end"))
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? msg.setText(s);
? ? ? ? ? ? ? ? //7) 發(fā)送消息
? ? ? ? ? ? ? ? publisher.publish(msg);
? ? ? ? ? ? ? ? System.out.println("Message successfully sent.");
? ? ? ? ? ? }
? ? ? ? ? ? //8) 關(guān)閉連接
? ? ? ? ? ? con.close();
? ? ? ? }catch(Exception e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
MyReceiver.java
import javax.jms.*;
import javax.naming.InitialContext;
public class MyReceiver {
? ? public static void main(String[] args) {
? ? ? ? try{
? ? ? ? ? ? //1) 創(chuàng)建一個(gè)connection
? ? ? ? ? ? InitialContext ctx=new InitialContext();
? ? ? ? ? ? TopicConnectionFactory f=(TopicConnectionFactory)ctx.lookup("myTopicConnectionFactory");
? ? ? ? ? ? TopicConnection con=f.createTopicConnection();
? ? ? ? ? ? //2) 創(chuàng)建一個(gè)會話接口
? ? ? ? ? ? TopicSession ses=con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
? ? ? ? ? ? //3) 獲取會話接口對象
? ? ? ? ? ? Topic t=(Topic)ctx.lookup("myTopic");
? ? ? ? ? ? //4)創(chuàng)建一個(gè)發(fā)送者對象
? ? ? ? ? ? TopicSubscriber receiver=ses.createSubscriber(t);
? ? ? ? ? ? //5) 創(chuàng)建一個(gè)消監(jiān)聽對象
? ? ? ? ? ? MyListener listener=new MyListener();
? ? ? ? ? ? //6) 將監(jiān)聽器注冊到receiver,用來監(jiān)聽receiver
? ? ? ? ? ? receiver.setMessageListener(listener);
? ? ? ? ? ? System.out.println("Receiver1 is ready, waiting for messages...");
? ? ? ? ? ? System.out.println("press Ctrl+c to shutdown...");
? ? ? ? ? ? while(true){
? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? }
? ? ? ? }catch(Exception e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
MyListener.java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyListener implements MessageListener {
? ? public void onMessage(Message m) {
? ? ? ? try{
? ? ? ? ? ? TextMessage msg=(TextMessage)m;
? ? ? ? ? ? System.out.println("following message is received:"+msg.getText());
? ? ? ? }catch(JMSException e){System.out.println(e);}
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
上面兩個(gè)案例我們運(yùn)行可以看到消息成功的發(fā)送出去了。熟悉了JMS的語法,使用起來還是很簡單。
上面我們介紹到了JMS,JMS是一個(gè)用于提供消息服務(wù)的技術(shù)規(guī)范,它制定了在整個(gè)消息服務(wù)提供過程中的所有數(shù)據(jù)結(jié)構(gòu)和交互流程。JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺中關(guān)于面向消息中間件(MOM)的API。 Java消息服務(wù)是一個(gè)與具體平臺無關(guān)的API,絕大多數(shù)MOM提供商都對JMS提供支持。
下面我們引入另一個(gè)概念:MQ(Message Queue)。
應(yīng)用程序通過寫和檢索出入列隊(duì)的針對應(yīng)用程序的數(shù)據(jù)(消息)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進(jìn)行通信,而不是通過直接調(diào)用彼此來通信,直接調(diào)用通常是用于諸如遠(yuǎn)程過程調(diào)用的技術(shù)。排隊(duì)指的是應(yīng)用程序通過隊(duì)列來通信。隊(duì)列的使用除去了接收和發(fā)送應(yīng)用程序同時(shí)執(zhí)行的要求。
MQ和JMS類似,但不同的是JMS是SUN Java消息中間件服務(wù)的一個(gè)標(biāo)準(zhǔn)和API定義,而MQ則是遵循了AMQP協(xié)議的具體實(shí)現(xiàn)和產(chǎn)品。JMS是一個(gè)用于提供消息服務(wù)的技術(shù)規(guī)范,它制定了在整個(gè)消息服務(wù)提供過程中的所有數(shù)據(jù)結(jié)構(gòu)和交互流程。而MQ則是消息隊(duì)列服務(wù),是面向消息中間件(MOM)的最終實(shí)現(xiàn),是真正的服務(wù)提供者;MQ的實(shí)現(xiàn)可以基于JMS,也可以基于其他規(guī)范或標(biāo)準(zhǔn)。MQ 有很多產(chǎn)品:IBM的,rabbitmq, activemq 等,rabbitmq 只支持點(diǎn)對點(diǎn)的方式。所以沒有完全實(shí)現(xiàn)JMS的標(biāo)準(zhǔn),所以說它不是一個(gè)JMS產(chǎn)品,而rabitmq 和Jobss JMS 它們實(shí)現(xiàn)了JMS的各項(xiàng)標(biāo)準(zhǔn),是開源的JMS產(chǎn)品。目前完全實(shí)現(xiàn)JMS協(xié)議的mq是activemq,所以接下來我們先重點(diǎn)看一下activemq。從activemq入手去探索javaEE的世界。
---------------------?
作者:rickiyang?
來源:CSDN?
原文:https://blog.csdn.net/a953713428/article/details/70770087?
版權(quán)聲明:本文為博主原創(chuàng)文章,轉(zhuǎn)載請附上博文鏈接!
總結(jié)
以上是生活随笔為你收集整理的消息中间件和JMS介绍的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JMS(Java消息服务)入门教程
- 下一篇: 从零开始玩转JMX(一)——简介和Sta