日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java消息中间件

發布時間:2023/12/10 java 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java消息中间件 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.概述

中間件
非底層操作系統軟件,非業務應用軟件,不是直接給最終用戶使用的,不能直接給客戶帶來價值的軟件統稱為中間件。
消息中間件
管制關注于數據的發送和接收,利用高效可靠的異步消息傳遞機制集成分布式系統。
優點
① 解耦 ② 異步 ③ 橫向擴展 ④ 安全可靠 ⑤ 順序保證(比如kafka)
jms
java消息服務(Java Message Service)即JMS,是一個Java平臺中關于面向消息中間件的api,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信
什么是AMQP
AMQP(advanced message queuing protocol)是一個提供統一消息服務的應用層標準協議,基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產品,不同開發語言等條件的限制
常見消息中間件
activeMQ(支持多語言,實現jms1.1,j2ee1.4規范),RabbitMQ(支持更多語言,基于AMQP規范),kafka(高吞吐量,分布式,分區,O(1)磁盤順序提供消息持久化)

JMS消息模式
隊列模式
客戶端包括生產者和消費者
隊列中的消息只能被一個消費者消費
消費者可以隨時消費隊列中的消息
主題模式
客戶端包括發布者和訂閱者
主題中的消息被所有訂閱者消費
消費者不能消費訂閱之前就發送到主題中的消息
JMS編碼接口
ConnectionFactory 用于創建連接到消息中間件的連接工廠
Connection 代表了應用程序和消息服務器之間的通信鏈路
Destination 指消息發布和接收的地點,包括隊列或主題
Session 表示一個單線程的上下文,用于發送和接收消息
MessageConsumer 由會話創建,用于接收發送到目標的消息
MessageProducer 由會話創建,用于發送消息到目標
Message 是在消費者和生產者之間傳送的對象,消息頭,一組消息屬性,一個消息體
流程

2.安裝activeMQ

官網 : http://activemq.apache.org/
第一種啟動
進入bin ,activemq.bat 啟動
進入瀏覽器 http://127.0.0.1:8161
用戶名密碼默認為admin
第二種以服務啟動
InstallService.bat以管理員身份運行
服務中有ActiveMQ
linux
解壓壓縮包
進入bin 輸入 activemq start,啟動完成

3.jms 演示

創建maven項目
添加依賴

<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.9.0</version></dependency>

創建隊列模式的生產者AppProducer

package com.jms.queue;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class AppProducer {public static final String url = "tcp://localhost:61616";public static final String queueName = "queue-test ";public static void main(String[] args) throws JMSException {//1.創建連接工廠ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);//2.創建ConnectionConnection connection = connectionFactory.createConnection();//3.啟動連接connection.start();//4.創建會話 第一個參數:是否在事務中去處理, 第二個參數.應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.創建一個目標Destination destination = session.createQueue(queueName);//6創建一個生產者MessageProducer producer = session.createProducer(destination);for (int i = 0; i < 100; i++) {//7.創基建消息TextMessage textMessage = session.createTextMessage("test:"+i);producer.send(textMessage);System.out.println("發送消息:"+textMessage.getText());}//9關閉連接connection.close();} }

運行項目,打開activeMQ管理工具


100個消息,沒有消費者消費,連接已關閉
消費者 AppConsumer

package com.jms.queue;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class AppConsumer {public static final String url = "tcp://localhost:61616";public static final String queueName = "queue-test ";public static void main(String[] args) throws JMSException {//1.創建連接工廠ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);//2.創建ConnectionConnection connection = connectionFactory.createConnection();//3.啟動連接connection.start();//4.創建會話 第一個參數:是否在事務中去處理, 第二個參數.應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.創建一個目標Destination destination = session.createQueue(queueName);//6創建一個消費者MessageConsumer consumber = session.createConsumer(destination);//7創建一個監聽器consumber.setMessageListener(new MessageListener() {public void onMessage(Message message) {TextMessage textMessage = (TextMessage) message;try {System.out.println("接收消息:"+textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});//8關閉連接 消息是異步的 ,在程序退出是關閉,在這里不可以關閉//connection.close();} }

當啟動2個消費者,再啟動生產者,結果是2個消費者平均消費

創建主題模式發布者AppProducer

package com.jms.topic;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class AppProducer {public static final String url = "tcp://localhost:61616";public static final String topicName = "topic-test ";public static void main(String[] args) throws JMSException {//1.創建連接工廠ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);//2.創建ConnectionConnection connection = connectionFactory.createConnection();//3.啟動連接connection.start();//4.創建會話 第一個參數:是否在事務中去處理, 第二個參數.應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.創建一個目標Destination destination = session.createTopic(topicName);//6創建一個生產者MessageProducer producer = session.createProducer(destination);for (int i = 0; i < 100; i++) {//7.創基建消息TextMessage textMessage = session.createTextMessage("test:"+i);producer.send(textMessage);System.out.println("發送消息:"+textMessage.getText());}//9關閉連接connection.close();} }


這時直接運行訂閱者接收不到消息,因為發布者先運行了

訂閱者AppConsumer

package com.jms.topic;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class AppConsumer {public static final String url = "tcp://localhost:61616";public static final String topicName = "topic-test ";public static void main(String[] args) throws JMSException {//1.創建連接工廠ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);//2.創建ConnectionConnection connection = connectionFactory.createConnection();//3.啟動連接connection.start();//4.創建會話 第一個參數:是否在事務中去處理, 第二個參數.應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.創建一個目標Destination destination = session.createTopic(topicName);//6創建一個消費者MessageConsumer consumber = session.createConsumer(destination);//7創建一個監聽器consumber.setMessageListener(new MessageListener() {public void onMessage(Message message) {TextMessage textMessage = (TextMessage) message;try {System.out.println("接收消息:"+textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});//8關閉連接 消息是異步的 ,在程序退出是關閉//connection.close();} }

啟動兩個訂閱者,再啟動發布者,兩個訂閱者均可收到發布者的消息

4.使用spring集成jms鏈接activeMQ

ConnectionFactory 用于管理連接的工廠
JmsTemplate 用于發送和接收消息的模板類
MessageListerner 消息監聽器

ConnectionFactory 是spring 為我們提供的連接池
兩種連接池SingleConnectionFactory 和 CachingConnectionFactory
SingleConnectionFactory 是對于jms建立請求,只會返回一個連接
CachingConnectionFactory 實現了SingleConnectionFactory 的所有功能,還提供了緩存

JmsTemplate spring提供,線程安全,可以使用JmsTemplate 操作jms

MessageListerner 實現onMessage方法,接收Message參數

添加依賴

<dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-web</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-core</artifactId><version>5.7.0</version><exclusions><exclusion><artifactId>spring-context</artifactId><groupId>org.springframework</groupId></exclusion></exclusions></dependency>

生產者 創建接口ProducerService

package com.spring.producer;public interface ProducerService {void sendMessage(String message);}

實現類

package com.spring.producer.impl;import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service;import com.spring.producer.ProducerService; @Service public class ProducerServiceImpl implements ProducerService {@Autowiredprivate JmsTemplate jmsTemplate;@Resource(name= "queueDestination")//因為可能配置多個目的地,所以使用resource name進行區分Destination destination;public void sendMessage(final String message) {//使用JmsTemplate發送消息jmsTemplate.send(destination,new MessageCreator() {//創建消息public Message createMessage(Session session) throws JMSException {TextMessage textMessage = session.createTextMessage(message);return textMessage;}});System.out.println("發送消息:"+message);}}

配置

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsdhttp://www.springframework.org/schema/mvchttp://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd"><context:annotation-config></context:annotation-config><context:component-scan base-package="com.spring.*" /><!-- ActiveMQ提供 ConnectionFactory--><bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"></property></bean><!-- spring-jms提供的連接池 --><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactory"></property></bean> <!-- 一個隊列目的地,點對點 --><bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><!-- 指定隊列名字 --><constructor-arg value="springQueue"></constructor-arg></bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory"></property></bean><!-- <bean id="ProducerServiceImpl" class="com.spring.producer.impl.ProducerServiceImpl"></bean> --> </beans>

啟動類

package com.spring.producer;import org.springframework.context.ApplicationContext; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;public class AppProducer {public static void main(String[] args) {ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");ProducerService service = context.getBean(ProducerService.class);for(int i=0;i<100;i++){service.sendMessage("test"+i);}//會自動清理資源((AbstractApplicationContext) context).close();} }

可以把公共地方提取出來

<import resource="common.xml"/>

創建消費者
配置

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsdhttp://www.springframework.org/schema/mvchttp://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd"><import resource="common.xml"/><!-- 導入公共配置 --><!-- 配置消息監聽器 --><bean id="consumerMessageListener" class="com.spring.consumer.ConsumberMessageListener"></bean><!-- jms容器 管理容器指定消息目的地,和消息監聽者 --><bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"></property><!-- 目的地 --><property name="destination" ref="queueDestination"></property><!-- 監聽器 --><property name="messageListener" ref="consumerMessageListener"></property></bean> </beans>

監聽類

package com.spring.consumer;import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;import org.springframework.stereotype.Service; @Service public class ConsumberMessageListener implements MessageListener {public void onMessage(Message message) {TextMessage textMessage = (TextMessage)message;try {System.out.println("接收消息"+textMessage.getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}

啟動類

package com.spring.consumer;import org.springframework.context.ApplicationContext; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;public class AppConsumber {public static void main(String[] args) {ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");} }

主題模式
在common配置添加

<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg value="toptic"/></bean>

改ProducerServiceImpl Resource為

name= "topicDestination"

改consumer.xml jmsContainer

<property name="destination" ref="queueDestination"></property>

總結

以上是生活随笔為你收集整理的Java消息中间件的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。