日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ActiveMq使用笔记

發布時間:2023/12/19 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMq使用笔记 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

java JMS技術

.1.?? 什么是JMS

???????? JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。

???????? JMS是一種與廠商無關的 API,用來訪問消息收發系統消息。它類似于JDBC(Java Database Connectivity):這里,JDBC 是可以用來訪問許多不同關系數據庫的 API,而 JMS 則提供同樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。 JMS 使您能夠通過消息收發服務(有時稱為消息中介程序或路由器)從一個 JMS 客戶機向另一個 JMS客戶機發送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶著應用程序的數據或有效負載。根據有效負載的類型來劃分,可以將消息分為幾種類型,它們分別攜帶:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負載的消息 (Message)。

.2.?? JMS規范

.2.1.??? 專業技術規范

JMS(Java Messaging Service)是Java平臺上有關面向消息中間件(MOM)的技術規范,它便于消息系統中的Java應用程序進行消息交換,并且通過提供標準的產生、發送、接收消息的接口簡化企業應用的開發,翻譯為Java消息服務。

.2.2.??? 體系架構

JMS由以下元素組成。

JMS提供者provider:連接面向消息中間件的,JMS接口的一個實現。提供者可以是Java平臺的JMS實現,也可以是非Java平臺的面向消息中間件的適配器。

JMS客戶:生產或消費基于消息的Java的應用程序或對象。

JMS生產者:創建并發送消息的JMS客戶。

JMS消費者:接收消息的JMS客戶。

JMS消息:包括可以在JMS客戶之間傳遞的數據的對象

JMS隊列:一個容納那些被發送的等待閱讀的消息的區域。與隊列名字所暗示的意思不同,消息的接受順序并不一定要與消息的發送順序相同。一旦一個消息被閱讀,該消息將被從隊列中移走。

JMS主題:一種支持發送消息給多個訂閱者的機制。

.2.3.??? Java消息服務應用程序結構支持兩種模型

1、? 點對點或隊列模型

在點對點或隊列模型下,一個生產者向一個特定的隊列發布消息,一個消費者從該隊列中讀取消息。這里,生產者知道消費者的隊列,并直接將消息發送到消費者的隊列。

這種模式被概括為:

只有一個消費者將獲得消息

生產者不需要在接收者消費該消息期間處于運行狀態,接收者也同樣不需要在消息發送時處于運行狀態。

每一個成功處理的消息都由接收者簽收

2、發布者/訂閱者模型

發布者/訂閱者模型支持向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。

?

這種模式被概括為:

多個消費者可以獲得消息

在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。

?

1.下載ActiveMQ

去官方網站下載:http://activemq.apache.org/

2.運行ActiveMQ

解壓縮apache-activemq-5.5.1-bin.zip,

修改配置文件activeMQ.xml,將0.0.0.0修改為localhost

默認的activeMQ.xml文件如下:

修改后:

<transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616"/><transportConnector name="ssl" uri="ssl://localhost:61617"/><transportConnector name="stomp" uri="stomp://localhost:61613"/><transportConnector uri="http://localhost:8081"/><transportConnector uri="udp://localhost:61618"/> </transportConnectors>

然后雙擊apache-activemq-5.5.1\bin\activemq.bat運行ActiveMQ程序。

訪問的時候如果需要用戶名和密碼 都是admin admin...

啟動topic的相關的生產者和消費者:

生產者代碼:

ProducerTest.java

import java.util.Random;import javax.jms.JMSException; public class ProducerTest { /** * @param args */ public static void main(String[] args) throws JMSException, Exception { ProducerTool producer = new ProducerTool(); Random random = new Random();for(int i=0;i<20;i++){Thread.sleep(random.nextInt(10)*1000);producer.produceMessage("Hello, world!--"+i); producer.close();}} }

?ProducerTool.java

import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "mytopic"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null;// 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); }// 發送消息 public void produceMessage(String message) throws JMSException, Exception { initialize(); TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sending message: " + message); producer.send(msg); System.out.println("Producer:->Message sent complete!"); }// 關閉連接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null){producer.close(); } if (session != null){session.close(); } if (connection != null){connection.close(); } } }

消費者代碼:

ConsumerTest.java

import javax.jms.JMSException;public class ConsumerTest implements Runnable {static Thread t1 = null;/*** @param args* @throws InterruptedException* @throws InterruptedException* @throws JMSException* @throws InterruptedException*/public static void main(String[] args) throws InterruptedException {t1 = new Thread(new ConsumerTest());t1.setDaemon(false);t1.start();/*** 如果發生異常,則重啟consumer*//*while (true) {System.out.println(t1.isAlive());if (!t1.isAlive()) {t1 = new Thread(new ConsumerTest());t1.start();System.out.println("重新啟動");}Thread.sleep(5000);}*/// 延時500毫秒之后停止接受消息// Thread.sleep(500);// consumer.close(); }public void run() {try {ConsumerTool consumer = new ConsumerTool();consumer.consumeMessage();while (ConsumerTool.isconnection) { }} catch (Exception e) {}} }

ConsumerTool.java

import javax.jms.Connection; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /*** 消費者的模板 * @author ABC**/ public class ConsumerTool implements MessageListener,ExceptionListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url =ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "mytopic"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; public static Boolean isconnection=false;// 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic(subject); consumer = session.createConsumer(destination); } // 消費消息 public void consumeMessage() throws JMSException, Exception { initialize(); connection.start();consumer.setMessageListener(this); //注冊一個消息監聽器,有消息就執行onMessage()方法connection.setExceptionListener(this);//注冊一個異常監聽器,有異常就執行onException()方法isconnection=true;System.out.println("Consumer:->Begin listening..."); // 開始監聽 // Message message = consumer.receive(); }// 關閉連接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); }// 消息處理函數 public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Consumer:->Received: " + msg); } else { System.out.println("Consumer:->Received: " + message); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }public void onException(JMSException arg0) {isconnection=false;//出現異常把isconnection設置成false } }

只啟動ProducerTest.java

如果這個時候把ActiveMq 關閉再開啟....重新訪問

?

之前的主題 mytopic產生的數據就沒有了.....

ActiveMq默認是沒有做持久化的,如果是Kafka只要是發過去的消息,都會一直存在,也可以設置一個過期的時間.到了期限,那些消息也是可以清除掉.否則就會一直都在.

ActiveMq一般是用在JavaEE中的....Kafka是用在大數據領域的.

再運行生產者的模板代碼: ConsumerTest.java

生產者生產的數據:

再運行生產者的模板代碼: ConsumerTest.java

生產者生產的數據:

消費者消費到數據:

?

看WEBUI

?

其他常用的JMS實現

要使用Java消息服務,你必須要有一個JMS提供者,管理會話和隊列。既有開源的提供者也有專有的提供者。

開源的提供者包括:

Apache ActiveMQ

JBoss 社區所研發的 HornetQ

Joram

Coridan的MantaRay

The OpenJMS Group的OpenJMS

專有的提供者包括:

BEA的BEA WebLogic Server JMS

TIBCO Software的EMS

GigaSpaces Technologies的GigaSpaces

Softwired 2006的iBus

IONA Technologies的IONA JMS

SeeBeyond的IQManager(2005年8月被Sun Microsystems并購)

webMethods的JMS+ -

my-channels的Nirvana

Sonic Software的SonicMQ

SwiftMQ的SwiftMQ

IBM的WebSphere MQ

?========================================================

附關于ActiveMq處理queue的模板代碼:

ProducerTest.java

import java.util.Random;import javax.jms.JMSException; public class ProducerTest { /** * @param args * @throws Exception * @throws JMSException */ public static void main(String[] args) throws JMSException, Exception{ ProducerTool producer = new ProducerTool();Random random = new Random();for(int i=0;i<20;i++){Thread.sleep(random.nextInt(10)*1000);producer.produceMessage("Hello, world!--"+i); producer.close();}} }

ProducerTool.java

import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "myqueue"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null;// 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); }// 發送消息 public void produceMessage(String message) throws JMSException, Exception { initialize(); TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sending message: " + message); producer.send(msg); System.out.println("Producer:->Message sent complete!"); }// 關閉連接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null){producer.close(); } if (session != null){session.close(); } if (connection != null){connection.close(); } } }

CustomerTest.java

public class ConsumerTest implements Runnable {static Thread t1 = null;public static void main(String[] args) throws InterruptedException {t1 = new Thread(new ConsumerTest());t1.start(); // while (true) { // System.out.println(t1.isAlive()); // if (!t1.isAlive()) { // t1 = new Thread(new ConsumerTest()); // t1.start(); // System.out.println("重新啟動"); // } // Thread.sleep(5000); // }// 延時500毫秒之后停止接受消息// Thread.sleep(500);// consumer.close(); }public void run() {try {ConsumerTool consumer = new ConsumerTool();consumer.consumeMessage();while (ConsumerTool.isconnection) { //System.out.println(123); }} catch (Exception e) {}} }

CustomerTool.java

import javax.jms.Connection; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class ConsumerTool implements MessageListener,ExceptionListener {private String user = ActiveMQConnection.DEFAULT_USER;private String password = ActiveMQConnection.DEFAULT_PASSWORD;private String url = ActiveMQConnection.DEFAULT_BROKER_URL;private String subject = "myqueue";private Destination destination = null;private Connection connection = null;private Session session = null;private MessageConsumer consumer = null;private ActiveMQConnectionFactory connectionFactory=null;public static Boolean isconnection=false;// 初始化private void initialize() throws JMSException {connectionFactory= new ActiveMQConnectionFactory(user, password, url);connection = connectionFactory.createConnection();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);destination = session.createQueue(subject);consumer = session.createConsumer(destination);}// 消費消息public void consumeMessage() throws JMSException {initialize();connection.start();consumer.setMessageListener(this);connection.setExceptionListener(this);System.out.println("Consumer:->Begin listening...");isconnection=true;// 開始監聽Message message = consumer.receive();System.out.println(message.getJMSMessageID());}// 關閉連接public void close() throws JMSException {System.out.println("Consumer:->Closing connection");if (consumer != null){consumer.close();}if (session != null){session.close();}if (connection != null){connection.close();}}// 消息處理函數public void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage txtMsg = (TextMessage) message;String msg = txtMsg.getText();System.out.println("Consumer:->Received: " + msg);} else {System.out.println("Consumer:->Received: " + message);}} catch (JMSException e) {e.printStackTrace();}}public void onException(JMSException arg0){isconnection=false;} }

?

總結

以上是生活随笔為你收集整理的ActiveMq使用笔记的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。