javascript
Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务
文章目錄
- 概述
- 添加依賴
- 配置文件配置RabbitMQ的地址信息
- 接口定義
- 接收方 @EnableBinding @StreamListener
- 測(cè)試
- 消費(fèi)組
- 發(fā)送復(fù)雜對(duì)象
- 消息回執(zhí)
- 代碼
概述
官網(wǎng) : https://spring.io/projects/spring-cloud-stream
概括來(lái)說(shuō),Spring Cloud Stream 進(jìn)一步封裝了消息隊(duì)列,可以做到代碼層面對(duì)消息隊(duì)列無(wú)感知。
這里我們僅僅是做個(gè)入門級(jí)別的介紹,更多用法還是參考官網(wǎng)上的指導(dǎo)說(shuō)明,畢竟最權(quán)威了。
添加依賴
無(wú)需多說(shuō),要想使用Spring Cloud Stream ,第一步肯定是添加依賴了 ,如下
這里使用的消息隊(duì)列是 RabbitMQ ,如果你是用的是kafka,換成對(duì)應(yīng)的spring-cloud-starter-stream-kafka依賴即可
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>配置文件配置RabbitMQ的地址信息
spring-cloud-starter-stream-rabbit是Spring Cloud Stream對(duì)RabbitMQ的封裝,包含了對(duì)RabbitMQ的自動(dòng)化配置,比如連接的RabbitMQ的默認(rèn)地址localhost,默認(rèn)端口5672,默認(rèn)用戶guest,默認(rèn)密碼guest,如果采用的是如上默認(rèn)配置,可以不用修改配置。
這里我把配置文件放到了遠(yuǎn)端的Git,通過(guò)config server 拉取配置。
RabbitMQ的安裝 ,這里我選擇了使用Docker鏡像,安裝如下
在Docker CE中安裝RabbitMQ
接口定義
可知: Sink和Source兩個(gè)接口分別定義了輸入通道和輸出通道,Processor通過(guò)繼承Source和Sink,同時(shí)具有輸入通道和輸出通道。這里我們就模仿Sink和Source,自定義一個(gè)消息通道。
package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel;public interface ArtisanSink {// 同一個(gè)服務(wù)里面的通道名字不能一樣,在不同的服務(wù)里可以相同名字的通道// 否則啟動(dòng)拋出如下異常 bean definition with this name already existsString INPUT = "MyMsgInput";@Input(ArtisanSink.INPUT)SubscribableChannel input();}如上定義了一個(gè)名為MyMsgInput的消息輸入通道,@Input注解的參數(shù)則表示了消息通道的名稱
接收方 @EnableBinding @StreamListener
StreamReceive 用來(lái)接收RabbitMQ發(fā)送來(lái)的消息
package com.artisan.order.message;import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component;/*** 接收方*/@Component // Step1 注解 綁定剛才的接口 @EnableBinding(ArtisanSink.class) @Slf4j public class StreamReceive {// Step2 @StreamListener 綁定對(duì)象的名稱@StreamListener(ArtisanSink.INPUT)public void processStreamMsg(Object msg){log.info("StreamReceive: {}",msg);}}- 第一步: 使用了@EnableBinding注解實(shí)現(xiàn)對(duì)消息通道的綁定,我們?cè)谠撟⒔庵羞€傳入了一個(gè)參數(shù)ArtisanSink.class,ArtisanSink是一個(gè)自定義接口,主要功能是實(shí)現(xiàn)對(duì)輸入消息通道綁定的定義。
- 第二步:在StreamReceive 類中定義了processStreamMsg方法,重點(diǎn)是在該方法上添加了@StreamListener注解,該注解表示該方法為消息中間件上數(shù)據(jù)流的事件監(jiān)聽(tīng)器,ArtisanSink.INPUT參數(shù)表示這是input消息通道上的監(jiān)聽(tīng)處理器。
測(cè)試
模擬發(fā)送發(fā)發(fā)送消息,方便起見(jiàn),我們直接在Controller層寫(xiě)個(gè)方法吧
package com.artisan.order.controller;import com.artisan.order.message.Sink; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;@RestController @Slf4j public class MsgStreamController {@Autowiredprivate ArtisanSink sink;@GetMapping("/sendMsgByStream")public void sendMsgByStream(){String message = "I am one msg sent by Spring Cloud Stream";sink.input().send(MessageBuilder.withPayload(message).build());} }通過(guò) @Autowired自動(dòng)注入剛才的Sink接口,然后調(diào)用 sink.input().send方法發(fā)送消息即可。
啟動(dòng)服務(wù),觀察RabbitMQ上的隊(duì)列 ,自動(dòng)創(chuàng)建了一個(gè)
點(diǎn)進(jìn)去看下
MyMsgInput和 在接口中的定義一致 。
訪問(wèn): http://localhost:8081/sendMsgByStream
觀察日志:
2019-04-13 10:56:32.749 INFO 820 --- [nio-8081-exec-4] com.artisan.order.message.StreamReceive : StreamReceive: I am one msg sent by Spring Cloud Stream接收方收到了一條消息如上,OK。
消費(fèi)組
需求: 由于服務(wù)可能會(huì)有多個(gè)實(shí)例同時(shí)在運(yùn)行,我們只希望消息被一個(gè)實(shí)例所接收
先來(lái)改造下項(xiàng)目,啟動(dòng)多個(gè)服務(wù)實(shí)例
為了多啟動(dòng)幾個(gè)節(jié)點(diǎn),我們需要把定義在遠(yuǎn)端Git上的要加載到bootstrap.yml中的端口信息給注釋掉,否則第二個(gè)端口因端口沖突起不來(lái)。
然后通過(guò)如下方式在JVM參數(shù)中指定啟動(dòng)端口
第一個(gè)app 啟動(dòng)端口 -Dserver.port=8082
第一個(gè)app 啟動(dòng)端口 -Dserver.port=5656
啟動(dòng)后查看在Eureka Server上的注冊(cè)情況
再看看RabbitMQ的消息隊(duì)列情況,兩個(gè) OK
舊版本中 ,如果不做任何設(shè)置,此時(shí)發(fā)送一條消息將會(huì)被所有的實(shí)例接收到 ,但是可以通過(guò)消息分組來(lái)解決 。
具體可參考: https://segmentfault.com/a/1190000011796459
主要是配置分組
spring:cloud:stream:bindings:# MyMsgInput 自定義 order消費(fèi)組MyMsgInput:# 消息組的名稱group: order#輸入通道的主題名destination: MyMsgInput#存在消息隊(duì)列中的消息,如果是復(fù)雜對(duì)象,則以JSON的形式展示content-type: application/json新版本:
Spring Boot : 2.0.3.RELEASE
Spring Cloud : Finchley.RELEASE
經(jīng)過(guò)測(cè)試 不存在這個(gè)問(wèn)題
把這倆節(jié)點(diǎn)的日志信息都清空掉,重新發(fā)送個(gè)消息
我們就用5656這個(gè)節(jié)點(diǎn)好了 ,http://localhost:5656/sendMsgByStream
經(jīng)過(guò)驗(yàn)證只有5656這一個(gè)節(jié)點(diǎn)收到了消息。無(wú)需設(shè)置分組。
發(fā)送復(fù)雜對(duì)象
上面的例子中我們發(fā)送的是一個(gè)字符串,
如果是復(fù)雜對(duì)象呢? 來(lái)測(cè)試下
@GetMapping("/sendMsgByStream2")public void sendMsgByStream2(){OrderDTO orderDTO = new OrderDTO();orderDTO.setOrderId("11111");orderDTO.setOrderAmount(new BigDecimal(9999));sink.input().send(MessageBuilder.withPayload(orderDTO).build());}啟動(dòng)5656端口的服務(wù),訪問(wèn) http://localhost:5656/sendMsgByStream2
觀察日志:
2019-04-13 17:06:47.438 INFO 13764 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)OK。
這是我們?nèi)绻严⑾M(fèi)方注釋掉,讓消息累計(jì)在消息隊(duì)列中,我們?nèi)タ聪孪㈥?duì)列中存儲(chǔ)的復(fù)雜對(duì)象的格式
啟動(dòng)5656端口的服務(wù),訪問(wèn) http://localhost:5656/sendMsgByStream2
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers消息回執(zhí)
消費(fèi)者收到消息后給發(fā)送方一個(gè)ACK確認(rèn),該如何做呢?
比如接收到消息后,返回給ArtisanSource.OUTPUT一個(gè)消息,直接使用@SendTo直接即可,就會(huì)將返回的字符串發(fā)送給ArtisanSource.OUTPUT通道
定義一個(gè)
package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel;public interface ArtisanSource {String OUTPUT = "MyMsgOutput";@Output(ArtisanSource.OUTPUT)MessageChannel output(); }寫(xiě)一個(gè)該消息的接收方
package com.artisan.order.message;import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component;/*** 接收方*/@Component // Step1 注解 綁定剛才的接口 @EnableBinding(ArtisanSource.class) @Slf4j public class StreamReceive2 {// Step2 @StreamListener 綁定對(duì)象的名稱@StreamListener(ArtisanSource.OUTPUT)public void processStreamMsg2(String msg){log.info("OUTPUT StreamReceive: {}",msg);}}啟動(dòng)微服務(wù),訪問(wèn) http://localhost:5656/sendMsgByStream2
2019-04-13 18:06:51.817 INFO 972 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : INPUT StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null) 2019-04-13 18:06:51.823 INFO 972 --- [nio-5656-exec-1] c.artisan.order.message.StreamReceive2 : OUTPUT StreamReceive: received OK代碼
https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan_order
總結(jié)
以上是生活随笔為你收集整理的Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Spring Boot2.x-15 整合
- 下一篇: Spring Cloud【Finchle