日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java activemq 分布式_分布式--ActiveMQ 消息中间件(一)

發(fā)布時(shí)間:2023/12/31 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java activemq 分布式_分布式--ActiveMQ 消息中间件(一) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1). ActiveMQ

ActiveMQ是Apache所提供的一個(gè)開源的消息系統(tǒng),完全采用Java來實(shí)現(xiàn),因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務(wù))規(guī)范。JMS是一組Java應(yīng)用程序接口,它提供消息的創(chuàng)建、發(fā)送、讀取等一系列服務(wù)。JMS提供了一組公共應(yīng)用程序接口和響應(yīng)的語法,類似于Java數(shù)據(jù)庫的統(tǒng)一訪問接口JDBC,它是一種與廠商無關(guān)的API,使得Java程序能夠與不同廠商的消息組件很好地進(jìn)行通信。

2). Java Message Service(JMS)

JMS支持兩種消息發(fā)送和接收模型。

一種稱為P2P(Ponit to Point)模型,即采用點(diǎn)對(duì)點(diǎn)的方式發(fā)送消息。P2P模型是基于隊(duì)列的,消息生產(chǎn)者發(fā)送消息到隊(duì)列,消息消費(fèi)者從隊(duì)列中接收消息,隊(duì)列的存在使得消息的異步傳輸稱為可能,P2P模型在點(diǎn)對(duì)點(diǎn)的情況下進(jìn)行消息傳遞時(shí)采用。

圖1.png

另一種稱為Pub/Sub(Publish/Subscribe,即發(fā)布-訂閱)模型,發(fā)布-訂閱模型定義了如何向一個(gè)內(nèi)容節(jié)點(diǎn)發(fā)布和訂閱消息,這個(gè)內(nèi)容節(jié)點(diǎn)稱為topic(主題)。主題可以認(rèn)為是消息傳遞的中介,消息發(fā)布這將消息發(fā)布到某個(gè)主題,而消息訂閱者則從主題訂閱消息。主題使得消息的訂閱者與消息的發(fā)布者互相保持獨(dú)立,不需要進(jìn)行接觸即可保證消息的傳遞,發(fā)布-訂閱模型在消息的一對(duì)多廣播時(shí)采用。

圖2.png

3). JMS術(shù)語

Provider/MessageProvider:生產(chǎn)者

Consumer/MessageConsumer:消費(fèi)者

PTP:Point To Point,點(diǎn)對(duì)點(diǎn)通信消息模型

Pub/Sub:Publish/Subscribe,發(fā)布訂閱消息模型

Queue:隊(duì)列,目標(biāo)類型之一,和PTP結(jié)合

Topic:主題,目標(biāo)類型之一,和Pub/Sub結(jié)合

ConnectionFactory:連接工廠,JMS用它創(chuàng)建連接

Connnection:JMS Client到JMS Provider的連接

Destination:消息目的地,由Session創(chuàng)建

Session:會(huì)話,由Connection創(chuàng)建,實(shí)質(zhì)上就是發(fā)送、接受消息的一個(gè)線程,因此生產(chǎn)者、消費(fèi)者都是Session創(chuàng)建的

圖3.png

bin (windows下面的bat(分32、64位)和unix/linux下面的sh)

conf (activeMQ配置目錄,包含最基本的activeMQ配置文件)

data (默認(rèn)是空的)

docs (index,replease版本里面沒有文檔,-.-b不知道為啥不帶)

example (幾個(gè)例子)

lib (activemMQ使用到的lib)

webapps 注意ActiveMQ自帶Jetty提供Web管控臺(tái)

webapps-demo 示例

activemq-all-5.15.3.jar

LICENSE.txt

README.txt

5). 配置

Web控制臺(tái)賬號(hào)和密碼(apache-activemq-5.15.3\conf)

圖4.png

網(wǎng)絡(luò)端口(apache-activemq-5.15.3\conf)--默認(rèn)為8161

圖5.png

6). 啟動(dòng)

\apache-activemq-5.15.3\bin\win64\目錄下雙擊activemq.bat文件,在瀏覽器中輸入http://localhost:8161/admin/, 用戶名和密碼輸入admin即可

圖6.png

7). 消息中間件(MOM:Message Orient middleware)

消息中間件有很多的用途和優(yōu)點(diǎn):

1 將數(shù)據(jù)從一個(gè)應(yīng)用程序傳送到另一個(gè)應(yīng)用程序,或者從軟件的一個(gè)模塊傳送到另外一個(gè)模塊;

負(fù)責(zé)建立網(wǎng)絡(luò)通信的通道,進(jìn)行數(shù)據(jù)的可靠傳送。

保證數(shù)據(jù)不重發(fā),不丟失

能夠?qū)崿F(xiàn)跨平臺(tái)操作,能夠?yàn)椴煌僮飨到y(tǒng)上的軟件集成技工數(shù)據(jù)傳送服務(wù)

8).什么情況下使用ActiveMQ?

多個(gè)項(xiàng)目之間集成

(1) 跨平臺(tái)

(2) 多語言

(3) 多項(xiàng)目

降低系統(tǒng)間模塊的耦合度,解耦

(1) 軟件擴(kuò)展性

系統(tǒng)前后端隔離

(1) 前后端隔離,屏蔽高安全區(qū)

2. ActiveMQ 示例

1). P2P 示例

I. 導(dǎo)包--activemq-all-5.15.3.jar

II. Producer

/**

* 定義消息的生產(chǎn)者

* @author mazaiting

*/

public class Producer {

// 用戶名

private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

// 密碼

private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

// 鏈接

private static final String BROKENURL = ActiveMQConnection.DEFAULT_BROKER_URL;

/**

* 定義消息并發(fā)送,等待消息的接收者(消費(fèi)者)消費(fèi)此消息

* @param args

* @throws JMSException

*/

public static void main(String[] args) throws JMSException {

// 消息中間件的鏈接工廠

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(

USERNAME, PASSWORD, BROKENURL);

// 連接

Connection connection = null;

// 會(huì)話

Session session = null;

// 消息的目的地

Destination destination = null;

// 消息生產(chǎn)者

MessageProducer messageProducer = null;

try {

// 通過連接工廠獲取鏈接

connection = connectionFactory.createConnection();

// 創(chuàng)建會(huì)話,進(jìn)行消息的發(fā)送

// 參數(shù)一:是否啟用事務(wù)

// 參數(shù)二:設(shè)置自動(dòng)簽收

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

// 創(chuàng)建消息隊(duì)列

destination = session.createQueue("talkWithMo");

// 創(chuàng)建一個(gè)消息生產(chǎn)者

messageProducer = session.createProducer(destination);

// 設(shè)置持久化/非持久化, 如果非持久化,MQ重啟后可能后導(dǎo)致消息丟失

messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// 模擬發(fā)送消息

for (int i = 0; i < 5; i++) {

TextMessage textMessage = session.createTextMessage("給媽媽發(fā)送的消息:"+i);

System.out.println("textMessage: " + textMessage);

messageProducer.send(textMessage);

}

// 如果設(shè)置了事務(wù),會(huì)話就必須提交

session.commit();

} catch (JMSException e) {

e.printStackTrace();

} finally {

if (null != connection) {

connection.close();

}

}

}

}

III. Consumer

/**

* 定義消息的消費(fèi)者

* @author mazaiting

*/

public class Consumer {

// 用戶名

private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

// 密碼

private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

// 鏈接

private static final String BROKENURL = ActiveMQConnection.DEFAULT_BROKER_URL;

/**

* 接收消息

* @param args

* @throws JMSException

*/

public static void main(String[] args) throws JMSException {

// 消息中間件的鏈接工廠

ConnectionFactory connectionFactory = null;

// 鏈接

Connection connection = null;

// 會(huì)話

Session session = null;

// 消息的目的地

Destination destination = null;

// 消息的消費(fèi)者

MessageConsumer messageConsumer = null;

// 實(shí)例化鏈接工廠,創(chuàng)建一個(gè)鏈接

connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKENURL);

try {

// 通過工廠獲取鏈接

connection = connectionFactory.createConnection();

// 啟動(dòng)鏈接

connection.start();

// 創(chuàng)建會(huì)話,進(jìn)行消息的接收

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

// 創(chuàng)建消息隊(duì)列

destination = session.createQueue("talkWithMo");

// 創(chuàng)建一個(gè)消息的消費(fèi)者

messageConsumer = session.createConsumer(destination);

// 模擬接收消息

while (true) {

TextMessage textMessage = (TextMessage) messageConsumer.receive(10000);

if (null != textMessage) {

System.out.println("收到消息: " + textMessage);

} else {

break;

}

}

// 提交

session.commit();

} catch (JMSException e) {

e.printStackTrace();

} finally {

if (null != connection) {

connection.close();

}

}

}

}

IV. 測(cè)試

先運(yùn)行生產(chǎn)者Producer

圖7.png

ActiveMQ控制臺(tái)

圖8.png

再運(yùn)行消費(fèi)者Consumer

圖9.png

ActiveMQ控制臺(tái)

圖10.png

V. 消息類型

StreamMessage Java原始值的數(shù)據(jù)流

MapMessage 一套名稱-鍵值對(duì)

TextMessage 一個(gè)字符串對(duì)象

ObjectMessage 一個(gè)序列號(hào)的Java對(duì)象

BytesMessage 一個(gè)未解釋字節(jié)的數(shù)據(jù)流

VI. 控制臺(tái) Queue

Messages Enqueued:表示生產(chǎn)了多少條消息,記做P

Messages Dequeued:表示消費(fèi)了多少條消息,記做C

Number Of Consumers:表示在該隊(duì)列上還有多少消費(fèi)者在等待接受消息

Number Of Pending Messages:表示還有多少條消息沒有被消費(fèi),實(shí)際上是表示消息的積壓程度,就是P-C

VII. 簽收

簽收就是消費(fèi)者接受到消息后,需要告訴消息服務(wù)器,我收到消息了。當(dāng)消息服務(wù)器收到回執(zhí)后,本條消息將失效。因此簽收將對(duì)PTP模式產(chǎn)生很大影響。如果消費(fèi)者收到消息后,并不簽收,那么本條消息繼續(xù)有效,很可能會(huì)被其他消費(fèi)者消費(fèi)掉!

AUTO_ACKNOWLEDGE:表示在消費(fèi)者receive消息的時(shí)候自動(dòng)的簽收

CLIENT_ACKNOWLEDGE:表示消費(fèi)者receive消息后必須手動(dòng)的調(diào)用acknowledge()方法進(jìn)行簽收

DUPS_OK_ACKNOWLEDGE:簽不簽收無所謂了,只要消費(fèi)者能夠容忍重復(fù)的消息接受,當(dāng)然這樣會(huì)降低Session的開銷

2). request/reply模型

I. 實(shí)現(xiàn)思路

圖11.png

Client的Producer發(fā)出一個(gè)JMS message形式的request,request上附加了一些額外的屬性:

correlation ID(用來和返回的correlation ID對(duì)比進(jìn)行驗(yàn)證),

JMSReplyTo屬性(放置jms message的destination,這樣worker的Consumer獲得jms message就能得到destination)

Worker的consumer收到requset,處理request并用producer發(fā)出reply,destination就從requset的JMSReplyTo屬性中得到。

II. Server代碼

public class Server implements MessageListener {

// 經(jīng)紀(jì)人鏈接

private static final String BROKER_URL = "tcp://localhost:61616";

// 請(qǐng)求隊(duì)列

private static final String REQUEST_QUEUE = "requestQueue";

// 經(jīng)紀(jì)人服務(wù)

private BrokerService brokerService;

// 會(huì)話

private Session session;

// 生產(chǎn)者

private MessageProducer producer;

// 消費(fèi)者

private MessageConsumer consumer;

private void start() throws Exception {

createBroker();

setUpConsumer();

}

/**

* 創(chuàng)建經(jīng)紀(jì)人

* @throws Exception

*/

private void createBroker() throws Exception {

// 創(chuàng)建經(jīng)紀(jì)人服務(wù)

brokerService = new BrokerService();

// 設(shè)置是否持久化

brokerService.setPersistent(false);

// 設(shè)置是否使用JMX

brokerService.setUseJmx(false);

// 添加鏈接

brokerService.addConnector(BROKER_URL);

// 啟動(dòng)

brokerService.start();

}

/**

* 設(shè)置消費(fèi)者

* @throws JMSException

*/

private void setUpConsumer() throws JMSException {

// 創(chuàng)建連接工廠

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

// 創(chuàng)建連接

Connection connection = connectionFactory.createConnection();

// 啟動(dòng)連接

connection.start();

// 創(chuàng)建Session

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 創(chuàng)建隊(duì)列

Destination adminQueue = session.createQueue(REQUEST_QUEUE);

// 創(chuàng)建生產(chǎn)者

producer = session.createProducer(null);

// 設(shè)置持久化模式

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// 創(chuàng)建消費(fèi)者

consumer = session.createConsumer(adminQueue);

// 消費(fèi)者設(shè)置消息監(jiān)聽

consumer.setMessageListener(this);

}

public void stop() throws Exception {

producer.close();

consumer.close();

session.close();

brokerService.stop();

}

@Override

public void onMessage(Message message) {

try {

// 創(chuàng)建新消息

TextMessage response = this.session.createTextMessage();

// 判斷消息是否是文本消息

if (message instanceof TextMessage) {

// 強(qiáng)轉(zhuǎn)為文本消息

TextMessage textMessage = (TextMessage) message;

// 獲取消息內(nèi)容

String text = textMessage.getText();

// 設(shè)置消息

response.setText(handleRequest(text));

}

response.setJMSCorrelationID(message.getJMSCorrelationID());

producer.send(message.getJMSReplyTo(), response);

} catch (JMSException e) {

e.printStackTrace();

}

}

/**

* 構(gòu)建消息內(nèi)容

* @param text 文本

* @return

*/

private String handleRequest(String text) {

return "Response to '" + text + "'";

}

public static void main(String[] args) throws Exception {

Server server = new Server();

// 啟動(dòng)

server.start();

System.out.println();

System.out.println("Press any key to stop the server");

System.out.println();

System.in.read();

server.stop();

}

}

III. Client代碼

public class Client implements MessageListener {

// 經(jīng)紀(jì)人鏈接

private static final String BROKER_URL = "tcp://localhost:61616";

// 請(qǐng)求隊(duì)列

private static final String REQUEST_QUEUE = "requestQueue";

// 連接

private Connection connection;

// 會(huì)話

private Session session;

// 生產(chǎn)者

private MessageProducer producer;

// 消費(fèi)者

private MessageConsumer consumer;

// 請(qǐng)求隊(duì)列

private Queue tempDest;

public void start() throws JMSException {

// 連接工廠

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

// 創(chuàng)建連接

connection = activeMQConnectionFactory.createConnection();

// 開啟連接

connection.start();

// 創(chuàng)建會(huì)話

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 創(chuàng)建隊(duì)列

Destination adminQueue = session.createQueue(REQUEST_QUEUE);

// 創(chuàng)建生產(chǎn)者

producer = session.createProducer(adminQueue);

// 設(shè)置持久化模式

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// 創(chuàng)建模板隊(duì)列

tempDest = session.createTemporaryQueue();

// 創(chuàng)建消費(fèi)者

consumer = session.createConsumer(tempDest);

// 設(shè)置消息監(jiān)聽

consumer.setMessageListener(this);

}

/**

* 停止

* @throws JMSException

*/

public void stop() throws JMSException {

producer.close();

consumer.close();

session.close();

}

/**

* 請(qǐng)求

* @param request

* @throws JMSException

*/

public void request(String request) throws JMSException {

System.out.println("Request: " + request);

// 創(chuàng)建文本消息

TextMessage textMessage = session.createTextMessage();

// 設(shè)置文本內(nèi)容

textMessage.setText(request);

// 設(shè)置回復(fù)

textMessage.setJMSReplyTo(tempDest);

// 獲取UUID

String correlationId = UUID.randomUUID().toString();

// 設(shè)置JMS id

textMessage.setJMSCorrelationID(correlationId);

// 發(fā)送消息

this.producer.send(textMessage);

}

@Override

public void onMessage(Message message) {

try {

System.out.println("Received response for: " + ((TextMessage)message).getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

public static void main(String[] args) throws JMSException, InterruptedException {

Client client = new Client();

// 啟動(dòng)

client.start();

int i = 0;

while(i++ < 10) {

client.request("REQUEST- " + i);

}

Thread.sleep(3000);

client.stop();

}

}

IV. 測(cè)試

啟動(dòng)Server

圖12.png

啟動(dòng)Client

圖13.png

總結(jié)

以上是生活随笔為你收集整理的java activemq 分布式_分布式--ActiveMQ 消息中间件(一)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。