javascript
Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务
文章目錄
- 概述
- 添加依賴
- 配置文件配置RabbitMQ的地址信息
- 接口定義
- 接收方 @EnableBinding @StreamListener
- 測試
- 消費(fèi)組
- 發(fā)送復(fù)雜對象
- 消息回執(zhí)
- 代碼
概述
官網(wǎng) : https://spring.io/projects/spring-cloud-stream
概括來說,Spring Cloud Stream 進(jìn)一步封裝了消息隊(duì)列,可以做到代碼層面對消息隊(duì)列無感知。
這里我們僅僅是做個(gè)入門級別的介紹,更多用法還是參考官網(wǎng)上的指導(dǎo)說明,畢竟最權(quán)威了。
添加依賴
無需多說,要想使用Spring Cloud Stream ,第一步肯定是添加依賴了 ,如下
這里使用的消息隊(duì)列是 RabbitMQ ,如果你是用的是kafka,換成對應(yīng)的spring-cloud-starter-stream-kafka依賴即可
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>配置文件配置RabbitMQ的地址信息
spring-cloud-starter-stream-rabbit是Spring Cloud Stream對RabbitMQ的封裝,包含了對RabbitMQ的自動化配置,比如連接的RabbitMQ的默認(rèn)地址localhost,默認(rèn)端口5672,默認(rèn)用戶guest,默認(rèn)密碼guest,如果采用的是如上默認(rèn)配置,可以不用修改配置。
這里我把配置文件放到了遠(yuǎn)端的Git,通過config server 拉取配置。
RabbitMQ的安裝 ,這里我選擇了使用Docker鏡像,安裝如下
在Docker CE中安裝RabbitMQ
接口定義
可知: Sink和Source兩個(gè)接口分別定義了輸入通道和輸出通道,Processor通過繼承Source和Sink,同時(shí)具有輸入通道和輸出通道。這里我們就模仿Sink和Source,自定義一個(gè)消息通道。
package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel;public interface ArtisanSink {// 同一個(gè)服務(wù)里面的通道名字不能一樣,在不同的服務(wù)里可以相同名字的通道// 否則啟動拋出如下異常 bean definition with this name already existsString INPUT = "MyMsgInput";@Input(ArtisanSink.INPUT)SubscribableChannel input();}如上定義了一個(gè)名為MyMsgInput的消息輸入通道,@Input注解的參數(shù)則表示了消息通道的名稱
接收方 @EnableBinding @StreamListener
StreamReceive 用來接收RabbitMQ發(fā)送來的消息
package com.artisan.order.message;import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component;/*** 接收方*/@Component // Step1 注解 綁定剛才的接口 @EnableBinding(ArtisanSink.class) @Slf4j public class StreamReceive {// Step2 @StreamListener 綁定對象的名稱@StreamListener(ArtisanSink.INPUT)public void processStreamMsg(Object msg){log.info("StreamReceive: {}",msg);}}- 第一步: 使用了@EnableBinding注解實(shí)現(xiàn)對消息通道的綁定,我們在該注解中還傳入了一個(gè)參數(shù)ArtisanSink.class,ArtisanSink是一個(gè)自定義接口,主要功能是實(shí)現(xiàn)對輸入消息通道綁定的定義。
- 第二步:在StreamReceive 類中定義了processStreamMsg方法,重點(diǎn)是在該方法上添加了@StreamListener注解,該注解表示該方法為消息中間件上數(shù)據(jù)流的事件監(jiān)聽器,ArtisanSink.INPUT參數(shù)表示這是input消息通道上的監(jiān)聽處理器。
測試
模擬發(fā)送發(fā)發(fā)送消息,方便起見,我們直接在Controller層寫個(gè)方法吧
package com.artisan.order.controller;import com.artisan.order.message.Sink; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;@RestController @Slf4j public class MsgStreamController {@Autowiredprivate ArtisanSink sink;@GetMapping("/sendMsgByStream")public void sendMsgByStream(){String message = "I am one msg sent by Spring Cloud Stream";sink.input().send(MessageBuilder.withPayload(message).build());} }通過 @Autowired自動注入剛才的Sink接口,然后調(diào)用 sink.input().send方法發(fā)送消息即可。
啟動服務(wù),觀察RabbitMQ上的隊(duì)列 ,自動創(chuàng)建了一個(gè)
點(diǎn)進(jìn)去看下
MyMsgInput和 在接口中的定義一致 。
訪問: http://localhost:8081/sendMsgByStream
觀察日志:
2019-04-13 10:56:32.749 INFO 820 --- [nio-8081-exec-4] com.artisan.order.message.StreamReceive : StreamReceive: I am one msg sent by Spring Cloud Stream接收方收到了一條消息如上,OK。
消費(fèi)組
需求: 由于服務(wù)可能會有多個(gè)實(shí)例同時(shí)在運(yùn)行,我們只希望消息被一個(gè)實(shí)例所接收
先來改造下項(xiàng)目,啟動多個(gè)服務(wù)實(shí)例
為了多啟動幾個(gè)節(jié)點(diǎn),我們需要把定義在遠(yuǎn)端Git上的要加載到bootstrap.yml中的端口信息給注釋掉,否則第二個(gè)端口因端口沖突起不來。
然后通過如下方式在JVM參數(shù)中指定啟動端口
第一個(gè)app 啟動端口 -Dserver.port=8082
第一個(gè)app 啟動端口 -Dserver.port=5656
啟動后查看在Eureka Server上的注冊情況
再看看RabbitMQ的消息隊(duì)列情況,兩個(gè) OK
舊版本中 ,如果不做任何設(shè)置,此時(shí)發(fā)送一條消息將會被所有的實(shí)例接收到 ,但是可以通過消息分組來解決 。
具體可參考: https://segmentfault.com/a/1190000011796459
主要是配置分組
spring:cloud:stream:bindings:# MyMsgInput 自定義 order消費(fèi)組MyMsgInput:# 消息組的名稱group: order#輸入通道的主題名destination: MyMsgInput#存在消息隊(duì)列中的消息,如果是復(fù)雜對象,則以JSON的形式展示content-type: application/json新版本:
Spring Boot : 2.0.3.RELEASE
Spring Cloud : Finchley.RELEASE
經(jīng)過測試 不存在這個(gè)問題
把這倆節(jié)點(diǎn)的日志信息都清空掉,重新發(fā)送個(gè)消息
我們就用5656這個(gè)節(jié)點(diǎn)好了 ,http://localhost:5656/sendMsgByStream
經(jīng)過驗(yàn)證只有5656這一個(gè)節(jié)點(diǎn)收到了消息。無需設(shè)置分組。
發(fā)送復(fù)雜對象
上面的例子中我們發(fā)送的是一個(gè)字符串,
如果是復(fù)雜對象呢? 來測試下
@GetMapping("/sendMsgByStream2")public void sendMsgByStream2(){OrderDTO orderDTO = new OrderDTO();orderDTO.setOrderId("11111");orderDTO.setOrderAmount(new BigDecimal(9999));sink.input().send(MessageBuilder.withPayload(orderDTO).build());}啟動5656端口的服務(wù),訪問 http://localhost:5656/sendMsgByStream2
觀察日志:
2019-04-13 17:06:47.438 INFO 13764 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)OK。
這是我們?nèi)绻严⑾M(fèi)方注釋掉,讓消息累計(jì)在消息隊(duì)列中,我們?nèi)タ聪孪㈥?duì)列中存儲的復(fù)雜對象的格式
啟動5656端口的服務(wù),訪問 http://localhost:5656/sendMsgByStream2
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers消息回執(zhí)
消費(fèi)者收到消息后給發(fā)送方一個(gè)ACK確認(rèn),該如何做呢?
比如接收到消息后,返回給ArtisanSource.OUTPUT一個(gè)消息,直接使用@SendTo直接即可,就會將返回的字符串發(fā)送給ArtisanSource.OUTPUT通道
定義一個(gè)
package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel;public interface ArtisanSource {String OUTPUT = "MyMsgOutput";@Output(ArtisanSource.OUTPUT)MessageChannel output(); }寫一個(gè)該消息的接收方
package com.artisan.order.message;import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component;/*** 接收方*/@Component // Step1 注解 綁定剛才的接口 @EnableBinding(ArtisanSource.class) @Slf4j public class StreamReceive2 {// Step2 @StreamListener 綁定對象的名稱@StreamListener(ArtisanSource.OUTPUT)public void processStreamMsg2(String msg){log.info("OUTPUT StreamReceive: {}",msg);}}啟動微服務(wù),訪問 http://localhost:5656/sendMsgByStream2
2019-04-13 18:06:51.817 INFO 972 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : INPUT StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null) 2019-04-13 18:06:51.823 INFO 972 --- [nio-5656-exec-1] c.artisan.order.message.StreamReceive2 : OUTPUT StreamReceive: received OK代碼
https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan_order
總結(jié)
以上是生活随笔為你收集整理的Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Boot2.x-15 整合
- 下一篇: Spring Cloud【Finchle