當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
SpringCloud Stream消息驱动
生活随笔
收集整理的這篇文章主要介紹了
SpringCloud Stream消息驱动
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
為啥有這個技術???
1. 這個stream是操作消息隊列的,簡化,學習消息隊列的成本降低。 2. 可操作rabbitMQ兔子message queue,kafaka,可理解為jdbc可操作oracle, mysql.. 3. spring家的技術學就完了。。stream
- 對消息驅動需要了解的概念
- 消息驅動生產者,消費者
- 再建一個服務,消息消費者,問題
對消息驅動需要了解的概念
(1) 網站 文檔
https://spring.io/projects/spring-cloud-stream#overview
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/
(2) 介紹
什么是SpringCloudStream 官方定義Spring Cloud Stream是一個構建消息驅動微服務的框架。 應用程序通過inputs或者outputs與Spring Cloud Stream中binder對象交互。 通過我們配置來binding(綁定), 而Spring Cloud Stream的binder對象負責與消息中間件交互。 所以,我們只需要搞清楚如何與Spring Cloud Stream 交互就可以訪便使用消息驅動的方式。 通過使用Spring Integration來連接消息代理中間件以實現消息事件驅動。 Spring Cloud Stream為一些供應商的消息中間件產品提供了個性化的自動化配置實現, 引用了發布-訂閱、消費組、分區的三個核心概念。 目前僅支持RabbitMQ、Kafka.(3) 標準mq
-
Message
- 生產者/消費者之間靠消息媒介傳遞信息內容
-
消息通道MessageChannel
- 消息必須走特定的通道
-
消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器訂閱
- 消息通道里的消息如何被消費呢,誰負責收發處理
(4) 為什么用Cloud Stream
1 綁定stream憑什么可以統一底層差異
- 在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行信息交互的時候,
于各消息中間件構建的初衷不同,它們的實現細節上會有較大的差異性
通過定義綁定器作為中間層,完美地實現了應用程序與消息中間件細節之間的隔離。
通過向應用程序暴露統- -的Channel通道, 使得應用程序不需要再考慮各種不同的消息中間件實現。
2 binder架構
INPUT對應于消費者 OUTPUT對應于生產者(5) Stream中的消息通信方式遵循了發布-訂閱模式
Topic主題進行廣播 在RabbitMQ就是Exchange 交換機 在kafka中就是Topic(6) 常用api,注解
消息驅動生產者,消費者
pom
ymal server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此處配置要綁定的rabbitmq的服務信息;defaultRabbit: # 表示定義的名稱,用于于binding整合type: rabbit # 消息組件類型environment: # 設置rabbitmq的相關的環境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服務的整合處理input: # 這個名字是一個通道的名稱destination: studyExchange # 表示要使用的Exchange名稱定義content-type: application/json # 設置消息類型,本次為json,文本則設置“text/plain”binder: defaultRabbit # 設置要綁定的消息服務的具體設置eureka:client: # 客戶端進行Eureka注冊的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)instance-id: receive-8802.com # 在信息列表時顯示主機名稱prefer-ip-address: true # 訪問的路徑變為IP地址
發送消息接口,實現
public interface IMessageProvider {public String send(); }/// 實現 import com.atguigu.springcloud.service.IMessageProvider; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.messaging.MessageChannel; import org.springframework.integration.support.MessageBuilder; import javax.annotation.Resource; import org.springframework.cloud.stream.messaging.Source;import javax.annotation.Resource; import java.util.UUID;@EnableBinding(Source.class) //定義消息的推送管道 public class MessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output; // 消息發送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("*****serial: "+serial);return null;} }yaml, pom同時,yaml要改端口。
定義controller接收消息
再建一個服務,消息消費者,問題
1. 消息重復。發送者發送消息,兩個消費者都會接收到消息,如果是支付多個模塊, 收到一條消息,多個模塊會收到壞賬,需要分組,只對一個支付模塊發消息. 2. 消息持久化。當關掉消費者,消息丟失。a. 新增group配置,自定義group
group: damnb. 持久化,服務掛了,保證消息不丟失
頁數配置分組解決
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的SpringCloud Stream消息驱动的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 自学电脑编程_程序人生:盲人程序员蔡勇斌
- 下一篇: docker 打包_Springboot