ActiveMQ简述
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/activemq-quick-start/
##概述
ActiveMQ是Apache所提供的一個(gè)開(kāi)源的消息系統(tǒng),完全采用Java來(lái)實(shí)現(xiàn),因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務(wù))規(guī)范。JMS是一組Java應(yīng)用程序接口,它提供消息的創(chuàng)建、發(fā)送、讀取等一系列服務(wù)。JMS提供了一組公共應(yīng)用程序接口和響應(yīng)的語(yǔ)法,類似于Java數(shù)據(jù)庫(kù)的統(tǒng)一訪問(wèn)接口JDBC,它是一種與廠商無(wú)關(guān)的API,使得Java程序能夠與不同廠商的消息組件很好地進(jìn)行通信。
JMS支持兩種消息發(fā)送和接收模型。一種稱為P2P(Ponit to Point)模型,即采用點(diǎn)對(duì)點(diǎn)的方式發(fā)送消息。P2P模型是基于隊(duì)列的,消息生產(chǎn)者發(fā)送消息到隊(duì)列,消息消費(fèi)者從隊(duì)列中接收消息,隊(duì)列的存在使得消息的異步傳輸稱為可能,P2P模型在點(diǎn)對(duì)點(diǎn)的情況下進(jìn)行消息傳遞時(shí)采用。
另一種稱為Pub/Sub(Publish/Subscribe,即發(fā)布-訂閱)模型,發(fā)布-訂閱模型定義了如何向一個(gè)內(nèi)容節(jié)點(diǎn)發(fā)布和訂閱消息,這個(gè)內(nèi)容節(jié)點(diǎn)稱為topic(主題)。主題可以認(rèn)為是消息傳遞的中介,消息發(fā)布這將消息發(fā)布到某個(gè)主題,而消息訂閱者則從主題訂閱消息。主題使得消息的訂閱者與消息的發(fā)布者互相保持獨(dú)立,不需要進(jìn)行接觸即可保證消息的傳遞,發(fā)布-訂閱模型在消息的一對(duì)多廣播時(shí)采用。
##ActiveMQ的安裝
下載最新的安裝包apache-activemq-5.13.2-bin.tar.gz(此包linux下的,案例也是針對(duì)linux系統(tǒng)進(jìn)行闡述,當(dāng)然ActiveMQ也有win版的,這里就不贅述了),可以去官網(wǎng)下載,也可以在下方留言區(qū)留下你的郵箱,博主會(huì)發(fā)給你的~
下載之后解壓: tar -zvxf apache-activemq-5.13.2-bin.tar.gz
ActiveMQ目錄內(nèi)容有:
- bin目錄包含ActiveMQ的啟動(dòng)腳本
- conf目錄包含ActiveMQ的所有配置文件
- data目錄包含日志文件和持久性消息數(shù)據(jù)
- example: ActiveMQ的示例
- lib: ActiveMQ運(yùn)行所需要的lib
- webapps: ActiveMQ的web控制臺(tái)和一些相關(guān)的demo
運(yùn)行命令:activemq start(在activemq/bin下運(yùn)行)
INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env' INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/users/shr/apache-activemq-5.13.2//data/activemq.pid' (pid '986')查看activemq是否運(yùn)行命令:ps -aux | grep activemq
shr 986 1.2 9.7 1281720 201936 pts/5 Sl 19:43 0:17 /users/shr/util/JavaDir/jdk/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/users/shr/apache-activemq-5.13.2//tmp -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data -jar /users/shr/apache-activemq-5.13.2//bin/activemq.jar start shr 1501 0.0 0.0 5176 724 pts/5 S+ 20:06 0:00 grep activemq關(guān)閉命令: activemq stop
INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env' INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java' INFO: Waiting at least 30 seconds for regular process termination of pid '986' : Java Runtime: Oracle Corporation 1.7.0_79 /users/shr/util/JavaDir/jdk1.7.0_79/jreHeap sizes: current=63232k free=62218k max=932096kJVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data Extensions classpath:[/users/shr/apache-activemq-5.13.2/lib,/users/shr/apache-activemq-5.13.2/lib/camel,/users/shr/apache-activemq-5.13.2/lib/optional,/users/shr/apache-activemq-5.13.2/lib/web,/users/shr/apache-activemq-5.13.2/lib/extra] ACTIVEMQ_HOME: /users/shr/apache-activemq-5.13.2 ACTIVEMQ_BASE: /users/shr/apache-activemq-5.13.2 ACTIVEMQ_CONF: /users/shr/apache-activemq-5.13.2/conf ACTIVEMQ_DATA: /users/shr/apache-activemq-5.13.2/data Connecting to pid: 986 ..Stopping broker: localhost .. TERMINATEDActiveMQ的默認(rèn)服務(wù)端口為61616,這個(gè)可以在conf/activemq.xml配置文件中修改:
<transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>##案例
在下載的apache-activemq-5.13.2-bin.tar.gz包中解壓有一個(gè)jar包:activemq-all-5.13.2.jar,引入這個(gè)jar到你的項(xiàng)目中即可開(kāi)始編寫案例代碼。
博主的activemq服務(wù)器地址為10.10.195.187,這個(gè)在下面代碼中會(huì)有體現(xiàn)。
按照J(rèn)MS的規(guī)范,我們首先需要獲得一個(gè)JMS connection factory.,通過(guò)這個(gè)connection factory來(lái)創(chuàng)建connection.在這個(gè)基礎(chǔ)之上我們?cè)賱?chuàng)建session, destination, producer和consumer。因此主要的幾個(gè)步驟如下:
下面來(lái)看代碼舉例(P2P式)。
通過(guò)Java實(shí)現(xiàn)的基于ActiveMQ的請(qǐng)求提交:
創(chuàng)建Session時(shí)有兩個(gè)非常重要的參數(shù),第一個(gè)boolean類型的參數(shù)用來(lái)表示是否采用事務(wù)消息。如果是事務(wù)消息,對(duì)于的參數(shù)設(shè)置為true,此時(shí)消息的提交自動(dòng)有comit處理,消息的回滾則自動(dòng)由rollback處理。加入消息不是事務(wù)的,則對(duì)應(yīng)的該參數(shù)設(shè)置為false,此時(shí)分為三種情況:
- Session.AUTO_ACKNOWLEDGE表示Session會(huì)自動(dòng)確認(rèn)所接收到的消息。
- Session.CLIENT_ACKNOWLEDGE表示由客戶端程序通過(guò)調(diào)用消息的確認(rèn)方法來(lái)確認(rèn)所接收到的消息。
- Session.DUPS_OK_ACKNOWLEDGE使得Session將“懶惰”地確認(rèn)消息,即不會(huì)立即確認(rèn)消息,這樣有可能導(dǎo)致消息重復(fù)投遞。
提供Java實(shí)現(xiàn)的基于ActiveMQ的請(qǐng)求處理:
package com.zzh.activemq;import java.io.Serializable; import java.util.HashMap; import java.util.Map;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class RequestProcessor {public void requestHandler(HashMap<Serializable,Serializable> requestParam) throws Exception{System.out.println("requestHandler....."+requestParam.toString());for(Map.Entry<Serializable, Serializable> entry : requestParam.entrySet()){System.out.println(entry.getKey()+":"+entry.getValue());}}public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.10.195.187:61616");Connection connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue("RequestQueue");//消息消費(fèi)(接收)者M(jìn)essageConsumer consumer = session.createConsumer(destination);RequestProcessor processor = new RequestProcessor();while(true){ObjectMessage message = (ObjectMessage) consumer.receive(1000);if(null != message){System.out.println(message);HashMap<Serializable,Serializable> requestParam = (HashMap<Serializable,Serializable>) message.getObject();processor.requestHandler(requestParam);}else{break;}}} }輸出結(jié)果:
ActiveMQObjectMessage {commandId = 6, responseRequired = false, messageId = ID:hidden-PC-58748-1460550507055-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-58748-1460550507055-1:1:1:1, destination = queue://RequestQueue, transactionId = TX:ID:hidden-PC-58748-1460550507055-1:1:1, expiration = 0, timestamp = 1460550507333, arrival = 0, brokerInTime = 1460550505969, brokerOutTime = 1460550509143, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@74a456bb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} requestHandler.....{朱小廝=zzh} 朱小廝:zzh可以通過(guò)頁(yè)面查看隊(duì)列的使用情況,在瀏覽器中輸入http://10.10.195.187:8161/admin/queues.jsp,用戶名和密碼都是:admin,看到以下頁(yè)面:
這個(gè)是在jetty服務(wù)器下跑的,可以修改conf/jetty.xml來(lái)修改相關(guān)jetty配置。
上面的例子是關(guān)于P2P模式的,不過(guò)有個(gè)不妥之處,就是沒(méi)有資源的釋放。下面舉一個(gè)Pub/Sub模式的。
通過(guò)JMS創(chuàng)建ActiveMQ的topic,并給topic發(fā)送消息:
消息發(fā)送到對(duì)應(yīng)的topic后,需要將listener注冊(cè)到需要訂閱的topic上,以便能夠接收該topic的消息:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class TopicReceive {private MessageConsumer consumer;private Session session;public void init() throws Exception{ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.10.195.187:61616");Connection connection = connectionFactory.createConnection();connection.start();session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("MessageTopic");consumer = session.createConsumer(topic);consumer.setMessageListener(new MessageListener(){@Overridepublic void onMessage(Message message){TextMessage tm = (TextMessage) message;System.out.println(tm);try{System.out.println(tm.getText());}catch (JMSException e){e.printStackTrace();}}});}public static void main(String[] args) throws Exception{TopicReceive receive = new TopicReceive();receive.init();} }輸出結(jié)果:
ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:hidden-PC-50073-1460597487065-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-50073-1460597487065-1:1:1:1, destination = topic://MessageTopic, transactionId = null, expiration = 0, timestamp = 1460597487308, arrival = 0, brokerInTime = 1460597487297, brokerOutTime = 1460597487298, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2e4d3abf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = I'm first} I'm first參考文獻(xiàn)
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/activemq-quick-start/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
超強(qiáng)干貨來(lái)襲 云風(fēng)專訪:近40年碼齡,通宵達(dá)旦的技術(shù)人生
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ简述的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Redis内存使用优化与存储
- 下一篇: Sping+ActiveMQ整合