當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
RabbitMQ(六)整合SpringBoot
生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ(六)整合SpringBoot
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1.1 導入依賴
<dependencies><!--RabbitMQ 依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 測試依賴--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>1.2 修改配置文件
spring.rabbitmq.host=xxx.xxx.xxx.xxx spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=1231.3 隊列ttl
1.3.1 隊列架構圖
創建兩個隊列 QA 和 QB,兩者隊列 TTL 分別設置為 10S 和 40S,然后在創建一個交換機 X 和死信交 換機 Y,它們的類型都是 direct,創建一個死信隊列 QD,它們的綁定關系如下:
1.3.2 隊列配置類
@Configuration public class TtlQueueConfig {private static final String X_EXCHANGE = "X";private static final String QUEUE_A = "QA";private static final String QUEUE_B = "QB";private static final String Y_DEAD_LETTER_EXCHANGE = "Y";private static final String DEAD_LETTER_QUEUE = "QD"; ?// 聲明 xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);} ?// 聲明 xExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);} ?//聲明隊列 A ttl 為 10s 并綁定到對應的死信交換機@Bean("queueA")public Queue queueA() {Map<String, Object> args = new HashMap<>(3);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//聲明當前隊列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//聲明隊列的 TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();} ?// 聲明隊列 A 綁定 X 交換機@Beanpublic Binding queueaBindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");} ?//聲明隊列 B ttl 為 40s 并綁定到對應的死信交換機@Bean("queueB")public Queue queueB() {Map<String, Object> args = new HashMap<>(3);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//聲明當前隊列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//聲明隊列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();} ?//聲明隊列 B 綁定 X 交換機@Beanpublic Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queue1B).to(xExchange).with("XB");} ?//聲明死信隊列 QD@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);} ?//聲明死信隊列 QD 綁定關系@Beanpublic Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");} }1.3.3 生產者
@Slf4j @RequestMapping("ttl") @RestController public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate; ?@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("當前時間:{},發送一條信息給兩個 TTL 隊列:{}", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息來自 ttl 為 10S 的隊列: " + message);rabbitTemplate.convertAndSend("X", "XB", "消息來自 ttl 為 40S 的隊列: " + message);} }1.3.4 消費者
@Slf4j @Component public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("當前時間:{},收到死信隊列信息{}", new Date().toString(), msg);} }1.3.5 消息消費
發起一個請求 http://localhost:8080/ttl/sendMsg/hello
第一條消息在 10S 后變成了死信消息,然后被消費者消費掉,第二條消息在 40S 之后變成了死信消息,然后被消費掉,這樣一個延時隊列就打造完成了。
1.3.6 消費者延遲時間設置問題
? ? /*** 可以發送指定過期時間的消息* 不同過期時間指定不同的隊列會造成隊列過多* 我們可以不指定隊列的過期時間,而是在生產者這邊指定消息的過期時間** @param message* @param ttlTime*/@GetMapping("sendExpirationMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info("當前時間:{},發送一條時長{}毫秒 TTL 信息給隊列 C:{}", new Date(), ttlTime, message);}RabbitMQ 只會檢查第一個消息是否過期,如果過期則丟到死信隊列, 如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優先得到執行。
1.3.7 延時隊列總結
延時隊列在需要延時處理的場景下非常有用,使用 RabbitMQ 來實現延時隊列可以很好的利用 RabbitMQ 的特性,如:消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過 RabbitMQ 集群的特性,可以很好的解決單點故障問題,不會因為單個節點掛掉導致延時隊列不可用或者消息丟失。
當然,延時隊列還有很多其它選擇,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的時間輪,這些方式各有特點,看需要適用的場景。
總結
以上是生活随笔為你收集整理的RabbitMQ(六)整合SpringBoot的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ(四)交换机exchan
- 下一篇: 图文结合分析Spring的面向切面编程-