Java消息队列--ActiveMq 初体验
生活随笔
收集整理的這篇文章主要介紹了
Java消息队列--ActiveMq 初体验
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
1、下載安裝ActiveMQActiveMQ官網(wǎng)下載地址:http://activemq.apache.org/download.htmlActiveMQ 提供了Windows 和Linux、Unix 等幾個版本,樓主這里選擇了Linux 版本下進行開發(fā)。從它的目錄來說,還是很簡單的:
1.bin存放的是腳本文件
2.conf存放的是基本配置文件
3.data存放的是日志文件
4.docs存放的是說明文檔
5.examples存放的是簡單的實例
6.lib存放的是activemq所需jar包
7.webapps用于存放項目的目錄2、啟動ActiveMQ
進入到ActiveMQ 安裝目錄的Bin 目錄,linux 下輸入 ./activemq start 啟動activeMQ 服務。
輸入命令之后,會提示我們創(chuàng)建了一個進程IP 號,這時候說明服務已經(jīng)成功啟動了。
ActiveMQ默認啟動時,啟動了內(nèi)置的jetty服務器,提供一個用于監(jiān)控ActiveMQ的admin應用。 admin:http://127.0.0.1:8161/admin/
我們在瀏覽器打開鏈接之后輸入賬號密碼(這里和tomcat 服務器類似)
默認賬號:admin
密碼:admin到這里為止,ActiveMQ 服務端就啟動完畢了。
ActiveMQ 在linux 下的終止命令是 ./activemq stop3、創(chuàng)建一個ActiveMQ工程
上述在官網(wǎng)下載ActiveMq 的時候,我們可以在目錄下看到一個jar包:
這個jar 包就是我們需要在項目中進行開發(fā)中使用到的相關依賴。3.1 創(chuàng)建生產(chǎn)者
public class Producter {//ActiveMq 的默認用戶名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//ActiveMq 的默認登錄密碼private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//ActiveMQ 的鏈接地址private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;AtomicInteger count = new AtomicInteger(0);//鏈接工廠ConnectionFactory connectionFactory;//鏈接對象Connection connection;//事務管理Session session;ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();public void init(){try {//創(chuàng)建一個鏈接工廠connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);//從工廠中創(chuàng)建一個鏈接connection = connectionFactory.createConnection();//開啟鏈接connection.start();//創(chuàng)建一個事務(這里通過參數(shù)可以設置事務的級別)session = connection.createSession(true,Session.SESSION_TRANSACTED);} catch (JMSException e) {e.printStackTrace();}}public void sendMessage(String disname){try {//創(chuàng)建一個消息隊列Queue queue = session.createQueue(disname);//消息生產(chǎn)者MessageProducer messageProducer = null;if(threadLocal.get()!=null){messageProducer = threadLocal.get();}else{messageProducer = session.createProducer(queue);threadLocal.set(messageProducer);}while(true){Thread.sleep(1000);int num = count.getAndIncrement();//創(chuàng)建一條消息TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:我是大帥哥,我現(xiàn)在正在生產(chǎn)東西!,count:"+num);System.out.println(Thread.currentThread().getName()+"productor:我是大帥哥,我現(xiàn)在正在生產(chǎn)東西!,count:"+num);//發(fā)送消息messageProducer.send(msg);//提交事務session.commit();}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}
}public class Comsumer {private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;ConnectionFactory connectionFactory;Connection connection;Session session;ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();AtomicInteger count = new AtomicInteger();public void init(){try {connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);connection = connectionFactory.createConnection();connection.start();session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);} catch (JMSException e) {e.printStackTrace();}}public void getMessage(String disname){try {Queue queue = session.createQueue(disname);MessageConsumer consumer = null;if(threadLocal.get()!=null){consumer = threadLocal.get();}else{consumer = session.createConsumer(queue);threadLocal.set(consumer);}while(true){Thread.sleep(1000);TextMessage msg = (TextMessage) consumer.receive();if(msg!=null) {msg.acknowledge();System.out.println(Thread.currentThread().getName()+
": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement());}else {break;}}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}
}4、運行ActiveMQ項目
4.1 生產(chǎn)者開始生產(chǎn)消息
public class TestMq {public static void main(String[] args){Producter producter = new Producter();producter.init();TestMq testMq = new TestMq();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}//Thread 1new Thread(testMq.new ProductorMq(producter)).start();//Thread 2new Thread(testMq.new ProductorMq(producter)).start();//Thread 3new Thread(testMq.new ProductorMq(producter)).start();//Thread 4new Thread(testMq.new ProductorMq(producter)).start();//Thread 5new Thread(testMq.new ProductorMq(producter)).start();}private class ProductorMq implements Runnable{Producter producter;public ProductorMq(Producter producter){this.producter = producter;}@Overridepublic void run() {while(true){try {producter.sendMessage("Jaycekon-MQ");Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}}}}
}
ActiveMQ詳細入門使用教程
ActiveMQ介紹MQ是消息中間件,是一種在分布式系統(tǒng)中應用程序借以傳遞消息的媒介,常用的有ActiveMQ, RabbitMQ,kafka。ActiveMQ是Apache下的開源項目,完全支持JMS1.1和J2EE1.4規(guī)范的JMS Provider實現(xiàn)。 特點: 1、支持多種語言編寫客戶端 2、對spring的支持,很容易和spring整合 3、支持多種傳輸協(xié)議:TCP,SSL,NIO,UDP等 4、支持AJAX 消息形式: 1、點對點(queue) 2、一對多(topic) ActiveMQ測試編寫一個測試類對ActiveMQ進行測試,首先得向pom文件中添加ActiveMQ相關的jar包:<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> </dependency> queue的發(fā)送代碼如下:public void testMQProducerQueue() throws Exception{//1、創(chuàng)建工廠連接對象,需要制定ip和端口號ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");//2、使用連接工廠創(chuàng)建一個連接對象Connection connection = connectionFactory.createConnection();//3、開啟連接connection.start();//4、使用連接對象創(chuàng)建會話(session)對象Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5、使用會話對象創(chuàng)建目標對象,包含queue和topic(一對一和一對多)Queue queue = session.createQueue("test-queue");//6、使用會話對象創(chuàng)建生產(chǎn)者對象MessageProducer producer = session.createProducer(queue);//7、使用會話對象創(chuàng)建一個消息對象TextMessage textMessage = session.createTextMessage("hello!test-queue");//8、發(fā)送消息producer.send(textMessage);//9、關閉資源producer.close();session.close();connection.close();}接收代碼:public void TestMQConsumerQueue() throws Exception{//1、創(chuàng)建工廠連接對象,需要制定ip和端口號ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");//2、使用連接工廠創(chuàng)建一個連接對象Connection connection = connectionFactory.createConnection();//3、開啟連接connection.start();//4、使用連接對象創(chuàng)建會話(session)對象Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5、使用會話對象創(chuàng)建目標對象,包含queue和topic(一對一和一對多)Queue queue = session.createQueue("test-queue");//6、使用會話對象創(chuàng)建生產(chǎn)者對象MessageConsumer consumer = session.createConsumer(queue);//7、向consumer對象中設置一個messageListener對象,用來接收消息consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {// TODO Auto-generated method stubif(message instanceof TextMessage){TextMessage textMessage = (TextMessage)message;try {System.out.println(textMessage.getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}});//8、程序等待接收用戶消息System.in.read();//9、關閉資源consumer.close();session.close();connection.close();}接著對topic進行測試,發(fā)送代碼如下:public void TestTopicProducer() throws Exception{//1、創(chuàng)建工廠連接對象,需要制定ip和端口號ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");//2、使用連接工廠創(chuàng)建一個連接對象Connection connection = connectionFactory.createConnection();//3、開啟連接connection.start();//4、使用連接對象創(chuàng)建會話(session)對象Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5、使用會話對象創(chuàng)建目標對象,包含queue和topic(一對一和一對多)Topic topic = session.createTopic("test-topic");//6、使用會話對象創(chuàng)建生產(chǎn)者對象MessageProducer producer = session.createProducer(topic);//7、使用會話對象創(chuàng)建一個消息對象TextMessage textMessage = session.createTextMessage("hello!test-topic");//8、發(fā)送消息producer.send(textMessage);//9、關閉資源producer.close();session.close();connection.close();}接收代碼: 然后運行topic發(fā)送: 可以看到消息已經(jīng)發(fā)送出去。再運行topic接收:ActiveMQ整合spring及項目中運用 activeMQ與spring看一整合到一起使用,除了添加ActiveMQ相關的jar包外,還需要添加spring的jar包:<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </dependency> <!-- 配置能夠產(chǎn)生connection的connectionfactory,由JMS對應的服務廠商提供 --><bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/></bean><!-- 配置spring管理真正connectionfactory的connectionfactory,相當于spring 對connectionfactory的一層封裝 --><bean id="connectionFactory" lass="org.springframework.jms.connection. SingleConnectionFactory"><property name="targetConnectionFactory" ref="tagertConnectionFactory"/></bean><!-- 配置生產(chǎn)者 --><!-- Spring使用JMS工具類,可以用來發(fā)送和接收消息 --><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><!-- 這里是配置的spring用來管理connectionfactory的connectionfactory --><property name="connectionFactory" ref="connectionFactory"/></bean><!-- 配置destination --><!-- 隊列目的地 --><bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="spring-queue"/></bean><!-- 話題目的地 --><bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg value="item-add-topic"/></bean>當然,在xml文件中配置好的jmstemplate和destination也要注入進來:@Autowiredprivate JmsTemplate jmsTemplate;@Resource(name="itemAddTopic")private Destination destination;然后消費者應該寫在我們的搜索工程中,首先添加spring和activeMQ的jar包,然后配置xml文件, 再編寫一個監(jiān)聽器,當接收到消息時,就講數(shù)據(jù)存入索引庫,xml文件代碼如下:<!-- 配置監(jiān)聽器 --><bean id="myListener" class="com.taotao.search.listener.MyListener"/><bean id="itemAddListener" class="com.taotao.search.listener.ItemAddListener"/><!-- 系統(tǒng)監(jiān)聽器 --> <!-- <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="queueDestination"/><property name="messageListener" ref="myListener"/></bean> --><bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="itemAddTopic"/><property name="messageListener" ref="itemAddListener"/>?
總結(jié)
以上是生活随笔為你收集整理的Java消息队列--ActiveMq 初体验的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Rocketmq原理最佳实践
- 下一篇: Java 内存溢出(java.lang.