spring整合rabbitMQ最新版
生活随笔
收集整理的這篇文章主要介紹了
spring整合rabbitMQ最新版
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 一、簡單對象
- 1. 依賴
- 2. 生產者
- 3. 消費者
- 4. 配置文件
- 5. spring版本
- 二、復雜對象
- 2.1. 生產者
- 2.2. 消費者
一、簡單對象
1. 依賴
<!--spring整合rabbitmq-->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.0.1.RELEASE</version>
</dependency>
注:maven方式,這一個依賴即可,如果是非maven項目,需要引入5個jar如下:
推薦使用maven方式,簡單;非Maven項目,先用maven把依賴下載到本地倉庫,再復制到非maven的項目中即可。
2. 生產者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--生產(chǎn)者者配置如下:--><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.vhost}"connection-timeout="${rabbitmq.conTimeout}"publisher-confirms="${rabbitmq.publisher-confirms}"publisher-returns="${rabbitmq.publisher-returns}"/><!-- 管理消息隊(duì)列 --><rabbit:admin connection-factory="connectionFactory"/><!--此處為配置文件方式 管控臺(tái)配置模式需要注釋 默認(rèn)模式管控臺(tái) Start--><!-- 定義一個(gè)隊(duì)列或者多個(gè)隊(duì)列 自動(dòng)聲明--><rabbit:queue name="Queue-1" auto-declare="true" durable="true"/><rabbit:topic-exchange name="exchange-1"><rabbit:bindings><!-- 可綁定多個(gè)隊(duì)列,發(fā)送的時(shí)候指定key進(jìn)行發(fā)送 --><rabbit:binding queue="Queue-1" pattern="ws.tjqb"/></rabbit:bindings></rabbit:topic-exchange><!--此處為配置文件方式 管控臺(tái)配置模式需要注釋 默認(rèn)模式管控臺(tái) End--><!-- 定義交換機(jī) 自動(dòng)聲明--><rabbit:topic-exchange name="exchange-1"auto-declare="true" durable="true"/><!-- 5. 配置消息對(duì)象json轉(zhuǎn)換類(lèi) --><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!-- 定義MQ消息模板1. 
id : 定義消息模板ID2.connection-factory : 把定義的連接工廠放到消息模板中3.confirm-callback : confirm確認(rèn)機(jī)制4.return-callback : return確認(rèn)機(jī)制5.mandatory :#有2種狀態(tài)設(shè)置為 true 后 消費(fèi)者在消息沒(méi)有被路由到合適隊(duì)列情況下會(huì)被return監(jiān)聽(tīng),而不會(huì)自動(dòng)刪除;設(shè)置為 false 后 消費(fèi)者在消息沒(méi)有被路由到合適隊(duì)列情況下會(huì)自動(dòng)刪除--><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory"exchange="exchange-1"confirm-callback="confirmCallBackListener"return-callback="returnCallBackListener"mandatory="true"message-converter="jsonMessageConverter"/> </beans> package com.gblfy.order.controller;import com.gblfy.order.pojo.FisCallingTrace; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID;@RestController public class Send {public static final String EXCHANGE = "exchange-1";@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/test")public String test() {String uuidStr = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(uuidStr);// 發(fā)送消息Map<String, String> map = new HashMap<>();map.put("email", "550731230@qq.com");rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);return "success";}3. 消費(fèi)者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--消費(fèi)者配置如下:--><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"connection-timeout="${rabbitmq.conTimeout}"publisher-confirms="${rabbitmq.publisher-confirms}"publisher-returns="${rabbitmq.publisher-returns}"/><!-- 管理消息隊(duì)列 --><rabbit:admin connection-factory="connectionFactory"/><!-- 聲明多個(gè)消費(fèi)者對(duì)象 --><bean id="emailMessageListener" class="com.gblfy.order.mqhandler.EmailMessageListener"/><!-- 監(jiān)聽(tīng)隊(duì)列1. connectionFactory 連接工廠2. manual 手動(dòng)簽收3. 
ref="" 消費(fèi)者監(jiān)聽(tīng)--><rabbit:listener-container connection-factory="connectionFactory"acknowledge="manual"concurrency="${rabbitmq.concurrency}"max-concurrency="${rabbitmq.max-concurrency}"><rabbit:listener ref="emailMessageListener" method="onMessage" queue-names="Queue-1"/></rabbit:listener-container> </beans> package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener;import java.io.IOException; @Component public class EmailMessageListener implements MessageListener {private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void onMessage(Message message) {try {JsonNode jsonNode = MAPPER.readTree(message.getBody());String email = jsonNode.get("email").asText();System.out.println("獲取隊(duì)列中消息:" + email);} catch (IOException e) {e.printStackTrace();}} }4. 配置文件
#RabbitMQ 連接信息
#IP地址
rabbitmq.host=192.168.0.114
#端口
rabbitmq.port=5672
#用戶名
rabbitmq.username=fis
#密碼
rabbitmq.password=ncl@1234
#虛擬主機
rabbitmq.vhost=/app/fisMQ
#連接超時時間
rabbitmq.conTimeout=15000
#發送確認 對應RabbitTemplate.ConfirmCallback接口
#消息發送成功 有2個重要參數
# ack 狀態為true correlationId 全局唯一ID用于標識每一支隊列
rabbitmq.publisher-confirms=true
#發送失敗回退,對應RabbitTemplate.ReturnCallback接口
rabbitmq.publisher-returns=true
#默認消費者數量
rabbitmq.concurrency=10
#最大消費者數量
rabbitmq.max-concurrency=20
5. spring版本
目前適配的spring版本4.2.3.RELEASE
二、復雜對象
聲明:配置文件不變
2.1. 生產者
package com.gblfy.order.controller;

import com.gblfy.order.pojo.FisCallingTrace;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Demo producer for the "complex object" case: publishes a map that carries plain strings
 * plus a nested {@link FisCallingTrace} object to {@link #EXCHANGE}.
 */
@RestController
public class Send {

    /** Name of the topic exchange the message is published to. */
    public static final String EXCHANGE = "exchange-1";

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Builds the demo payload and publishes it with routing key {@code ws.tjqb}.
     * The same random UUID is used as correlation data and as the {@code mUUID} entry.
     *
     * @return the literal string {@code "success"} once the send call has returned
     */
    @RequestMapping("/test")
    public String test() {
        FisCallingTrace f = getFisCallingTrace();
        String uuidStr = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuidStr);

        // Send the message
        Map<String, Object> map = new HashMap<>();
        map.put("mReqXml", "請(qǐng)求報(bào)文");
        map.put("mResXml", "響應(yīng)報(bào)文");
        map.put("mUUID", uuidStr);
        map.put("serviceName", "NYHC");
        map.put("fisCallingTrace", f);
        rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);
        return "success";
    }

    // Simple-map variant of the endpoint, kept for reference. It stays commented out
    // because it has the same method name and request mapping as test() above.
    //
    // @RequestMapping("/test")
    // public String test() {
    //     String uuidStr = UUID.randomUUID().toString();
    //     CorrelationData correlationId = new CorrelationData(uuidStr);
    //     // Send the message
    //     Map<String, String> map = new HashMap<>();
    //     map.put("email", "550731230@qq.com");
    //     rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);
    //     return "success";
    // }

    /**
     * Creates a {@link FisCallingTrace} filled with fixed sample values; the request and
     * response dates are set to the current time.
     *
     * @return the populated sample object
     */
    private FisCallingTrace getFisCallingTrace() {
        FisCallingTrace f = new FisCallingTrace();
        f.setServicename("tjqb");
        f.setServicetype("1");
        f.setInterfacetype("2");
        f.setResstatus("1");
        f.setResremark("紐約數(shù)據(jù)回傳接口");
        f.setReqdate(new Date());
        f.setReqtime("10:00:00");
        f.setResdate(new Date());
        f.setRestime("10:00:00");
        f.setReqxml("請(qǐng)求報(bào)文");
        f.setResxml("響應(yīng)報(bào)文");
        return f;
    }
}
2.2. 消費者
package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.gblfy.order.pojo.FisCallingTrace; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component;import java.io.IOException;@Slf4j @Component public class EmailMessageListener implements ChannelAwareMessageListener {private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try {JsonNode jsonNode = MAPPER.readTree(message.getBody());String mReqXml = jsonNode.get("mReqXml").asText();String mResXml = jsonNode.get("mResXml").asText();String mUUID = jsonNode.get("mUUID").asText();String serviceName = jsonNode.get("serviceName").asText();System.out.println("獲取隊(duì)列中消息:" + mReqXml);System.out.println("獲取隊(duì)列中消息:" + mResXml);System.out.println("獲取隊(duì)列中消息:" + mUUID);System.out.println("獲取隊(duì)列中消息:" + serviceName);JsonNode jsonNode1 = jsonNode.get("fisCallingTrace");String jsonStr = MAPPER.writeValueAsString(jsonNode1);FisCallingTrace f= MAPPER.readValue(jsonStr , FisCallingTrace.class);System.out.println("獲取隊(duì)列中消息:" + f.getReqxml());System.out.println("獲取隊(duì)列中消息:" + f.getResxml());// 消息的標(biāo)識(shí),false只確認(rèn)當(dāng)前一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {e.printStackTrace();}log.info("解析操作");log.info("落庫(kù)操作");} }總結(jié)
以上是生活随笔為你收集整理的spring整合rabbitMQ最新版的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: PLSQL 设置日期格式为年月日不显示时
- 下一篇: Stream filter过滤案例