日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务

發(fā)布時(shí)間:2025/3/21 javascript 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 概述
  • 添加依賴
  • 配置文件配置RabbitMQ的地址信息
  • 接口定義
  • 接收方 @EnableBinding @StreamListener
  • 測試
  • 消費(fèi)組
  • 發(fā)送復(fù)雜對象
  • 消息回執(zhí)
  • 代碼

概述

官網(wǎng) : https://spring.io/projects/spring-cloud-stream

概括來說,Spring Cloud Stream 進(jìn)一步封裝了消息隊(duì)列,可以做到代碼層面對消息隊(duì)列無感知。

這里我們僅僅是做個(gè)入門級別的介紹,更多用法還是參考官網(wǎng)上的指導(dǎo)說明,畢竟最權(quán)威了。


添加依賴

無需多說,要想使用Spring Cloud Stream ,第一步肯定是添加依賴了 ,如下

這里使用的消息隊(duì)列是 RabbitMQ ,如果你是用的是kafka,換成對應(yīng)的spring-cloud-starter-stream-kafka依賴即可

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>


配置文件配置RabbitMQ的地址信息

spring-cloud-starter-stream-rabbit是Spring Cloud Stream對RabbitMQ的封裝,包含了對RabbitMQ的自動化配置,比如連接的RabbitMQ的默認(rèn)地址localhost,默認(rèn)端口5672,默認(rèn)用戶guest,默認(rèn)密碼guest,如果采用的是如上默認(rèn)配置,可以不用修改配置。

這里我把配置文件放到了遠(yuǎn)端的Git,通過config server 拉取配置。

RabbitMQ的安裝 ,這里我選擇了使用Docker鏡像,安裝如下

在Docker CE中安裝RabbitMQ


接口定義

可知: Sink和Source兩個(gè)接口分別定義了輸入通道和輸出通道,Processor通過繼承Source和Sink,同時(shí)具有輸入通道和輸出通道。這里我們就模仿Sink和Source,自定義一個(gè)消息通道。

package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel;public interface ArtisanSink {// 同一個(gè)服務(wù)里面的通道名字不能一樣,在不同的服務(wù)里可以相同名字的通道// 否則啟動拋出如下異常 bean definition with this name already existsString INPUT = "MyMsgInput";@Input(ArtisanSink.INPUT)SubscribableChannel input();}

如上定義了一個(gè)名為MyMsgInput的消息輸入通道,@Input注解的參數(shù)則表示了消息通道的名稱


接收方 @EnableBinding @StreamListener

StreamReceive 用來接收RabbitMQ發(fā)送來的消息

package com.artisan.order.message;import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component;/*** 接收方*/@Component // Step1 注解 綁定剛才的接口 @EnableBinding(ArtisanSink.class) @Slf4j public class StreamReceive {// Step2 @StreamListener 綁定對象的名稱@StreamListener(ArtisanSink.INPUT)public void processStreamMsg(Object msg){log.info("StreamReceive: {}",msg);}}
  • 第一步: 使用了@EnableBinding注解實(shí)現(xiàn)對消息通道的綁定,我們在該注解中還傳入了一個(gè)參數(shù)ArtisanSink.class,ArtisanSink是一個(gè)自定義接口,主要功能是實(shí)現(xiàn)對輸入消息通道綁定的定義。
  • 第二步:在StreamReceive 類中定義了processStreamMsg方法,重點(diǎn)是在該方法上添加了@StreamListener注解,該注解表示該方法為消息中間件上數(shù)據(jù)流的事件監(jiān)聽器,ArtisanSink.INPUT參數(shù)表示這是input消息通道上的監(jiān)聽處理器。

測試

模擬發(fā)送發(fā)發(fā)送消息,方便起見,我們直接在Controller層寫個(gè)方法吧

package com.artisan.order.controller;import com.artisan.order.message.Sink; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;@RestController @Slf4j public class MsgStreamController {@Autowiredprivate ArtisanSink sink;@GetMapping("/sendMsgByStream")public void sendMsgByStream(){String message = "I am one msg sent by Spring Cloud Stream";sink.input().send(MessageBuilder.withPayload(message).build());} }

通過 @Autowired自動注入剛才的Sink接口,然后調(diào)用 sink.input().send方法發(fā)送消息即可。

啟動服務(wù),觀察RabbitMQ上的隊(duì)列 ,自動創(chuàng)建了一個(gè)

點(diǎn)進(jìn)去看下

MyMsgInput和 在接口中的定義一致 。

訪問: http://localhost:8081/sendMsgByStream

觀察日志:

2019-04-13 10:56:32.749 INFO 820 --- [nio-8081-exec-4] com.artisan.order.message.StreamReceive : StreamReceive: I am one msg sent by Spring Cloud Stream

接收方收到了一條消息如上,OK。


消費(fèi)組

需求: 由于服務(wù)可能會有多個(gè)實(shí)例同時(shí)在運(yùn)行,我們只希望消息被一個(gè)實(shí)例所接收

先來改造下項(xiàng)目,啟動多個(gè)服務(wù)實(shí)例

為了多啟動幾個(gè)節(jié)點(diǎn),我們需要把定義在遠(yuǎn)端Git上的要加載到bootstrap.yml中的端口信息給注釋掉,否則第二個(gè)端口因端口沖突起不來。

然后通過如下方式在JVM參數(shù)中指定啟動端口
第一個(gè)app 啟動端口 -Dserver.port=8082
第一個(gè)app 啟動端口 -Dserver.port=5656

啟動后查看在Eureka Server上的注冊情況


再看看RabbitMQ的消息隊(duì)列情況,兩個(gè) OK


舊版本中 ,如果不做任何設(shè)置,此時(shí)發(fā)送一條消息將會被所有的實(shí)例接收到 ,但是可以通過消息分組來解決 。
具體可參考: https://segmentfault.com/a/1190000011796459

主要是配置分組

spring:cloud:stream:bindings:# MyMsgInput 自定義 order消費(fèi)組MyMsgInput:# 消息組的名稱group: order#輸入通道的主題名destination: MyMsgInput#存在消息隊(duì)列中的消息,如果是復(fù)雜對象,則以JSON的形式展示content-type: application/json

新版本:
Spring Boot : 2.0.3.RELEASE
Spring Cloud : Finchley.RELEASE

經(jīng)過測試 不存在這個(gè)問題

把這倆節(jié)點(diǎn)的日志信息都清空掉,重新發(fā)送個(gè)消息

我們就用5656這個(gè)節(jié)點(diǎn)好了 ,http://localhost:5656/sendMsgByStream
經(jīng)過驗(yàn)證只有5656這一個(gè)節(jié)點(diǎn)收到了消息。無需設(shè)置分組。


發(fā)送復(fù)雜對象

上面的例子中我們發(fā)送的是一個(gè)字符串,

如果是復(fù)雜對象呢? 來測試下

@GetMapping("/sendMsgByStream2")public void sendMsgByStream2(){OrderDTO orderDTO = new OrderDTO();orderDTO.setOrderId("11111");orderDTO.setOrderAmount(new BigDecimal(9999));sink.input().send(MessageBuilder.withPayload(orderDTO).build());}

啟動5656端口的服務(wù),訪問 http://localhost:5656/sendMsgByStream2

觀察日志:

2019-04-13 17:06:47.438 INFO 13764 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)

OK。

這是我們?nèi)绻严⑾M(fèi)方注釋掉,讓消息累計(jì)在消息隊(duì)列中,我們?nèi)タ聪孪㈥?duì)列中存儲的復(fù)雜對象的格式

啟動5656端口的服務(wù),訪問 http://localhost:5656/sendMsgByStream2

org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers

消息回執(zhí)

消費(fèi)者收到消息后給發(fā)送方一個(gè)ACK確認(rèn),該如何做呢?

比如接收到消息后,返回給ArtisanSource.OUTPUT一個(gè)消息,直接使用@SendTo直接即可,就會將返回的字符串發(fā)送給ArtisanSource.OUTPUT通道

定義一個(gè)

package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel;public interface ArtisanSource {String OUTPUT = "MyMsgOutput";@Output(ArtisanSource.OUTPUT)MessageChannel output(); }

寫一個(gè)該消息的接收方

package com.artisan.order.message;import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component;/*** 接收方*/@Component // Step1 注解 綁定剛才的接口 @EnableBinding(ArtisanSource.class) @Slf4j public class StreamReceive2 {// Step2 @StreamListener 綁定對象的名稱@StreamListener(ArtisanSource.OUTPUT)public void processStreamMsg2(String msg){log.info("OUTPUT StreamReceive: {}",msg);}}

啟動微服務(wù),訪問 http://localhost:5656/sendMsgByStream2

2019-04-13 18:06:51.817 INFO 972 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : INPUT StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null) 2019-04-13 18:06:51.823 INFO 972 --- [nio-5656-exec-1] c.artisan.order.message.StreamReceive2 : OUTPUT StreamReceive: received OK

代碼

https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan_order

總結(jié)

以上是生活随笔為你收集整理的Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。