日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

springcloud 相同服务名_SpringCloud系列之SpringCloud Stream

發布時間:2023/12/10 javascript 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 springcloud 相同服务名_SpringCloud系列之SpringCloud Stream 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

SpringCloud Stream

技術興起的原因:為了解決系統中不同中間件的適配問題,出現了cloud stream,采用適配綁定的方式,自動給不同的MQ之間進行切換。

屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型。

官方定義Spring Cloud Stream是一個構建消息驅動微服務的框架。

應用程序通過inputs(消費者)或者outputs(生產者)來與Spring Cloud Stream中binder對象交互。通過我們配置來綁定,而Spring Cloud Stream的binder對象負責與消息中間件交互。

Spring Cloud Stream為一些供應商的消息中間件產品提供了個性化的自動配置,引用了發布、訂閱、消費、分區的三個核心概念。

官方版本目前僅僅支持RabbitMQ和Kafka。

MQ相關術語

Message:生產者/消費者之間靠消息媒介傳遞信息內容

MessageChannel:消息必須走特定的通道

消息通道的子接口SubscribableChannel,由MessageHandle消息處理器所訂閱。

相關注解

Middleware:中間件,目前只支持RabbitMQ和Kafka

Binder:應用層和消息中間件之間的封裝,實現了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型,這些可以通過配置文件修改。

Input:表示輸入通道,消息進入該通道傳到應用程序。

Output:注解標識輸出通道,發布的消息將通過該通道離開應用程序。

StreamListener:監聽隊列,用于消費者的隊列的消息接收。

EnableBinding:將信道channel和exchange綁定在一起。

首先創建一個provider,服務提供者rabbitmq-provider8801

導入依賴

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-actuator

org.springframework.cloud

spring-cloud-starter-netflix-eureka-client

org.springframework.cloud

spring-cloud-starter-stream-rabbit

org.springframework.boot

spring-boot-devtools

runtime

true

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

編寫配置文件application.yml

server:

port: 8801

spring:

application:

name: cloud-stream-provider

cloud:

stream:

binders: # 在此處配置要綁定的rabbitmq的服務信息;

defaultRabbit: # 表示定義的名稱,用于于binding整合

type: rabbit # 消息組件類型

environment: # 設置rabbitmq的相關的環境配置

spring:

rabbitmq:

host: 192.168.31.52 #rabbitmq服務啟動所在機器的IP地址

port: 5672

username: guest

password: guest

bindings: # 服務的整合處理

output: # 這個名字是一個通道的名稱

destination: studyExchange # 表示要使用的Exchange名稱定義

content-type: application/json # 設置消息類型,本次為json,文本則設置“text/plain”

binder: defaultRabbit # 設置要綁定的消息服務的具體設置

eureka:

client: # 客戶端進行Eureka注冊的配置

service-url:

defaultZone: http://localhost:7001/eureka

instance:

lease-renewal-interval-in-seconds: 2# 設置心跳的時間間隔(默認是30秒)

lease-expiration-duration-in-seconds: 5# 如果現在超過了5秒的間隔(默認是90秒)

instance-id: send-8801.com # 在信息列表時顯示主機名稱

prefer-ip-address: true# 訪問的路徑變為IP地址

編寫一個發送數據的接口IMessageProvider

public interface IMessageProvider {

String sendMessage();

}

接口的實現類IMessageProviderImpl

@EnableBinding(Source.class) //定義消息的推送管道

public class IMessageProviderImpl implements IMessageProvider

{

@Resource

private MessageChannel output; // 消息發送管道

@Override

public String sendMessage()

{

String serial = UUID.randomUUID().toString();

output.send(MessageBuilder.withPayload(serial).build());

System.out.println("*****serial: "+serial);

return null;

}

}

controller層下的SendMessageController

@RestController

public class SendMessageController {

@Autowired

private IMessageProvider iMessageProvider;

@GetMapping(value = "/sendMessage")

public String send(){

return iMessageProvider.sendMessage();

}

}

啟動Eureka7001,啟動服務提供者8801.啟動虛擬機上的RabbitMQ

記得把虛擬機防火墻關了。

[hadoop@centos7 bin]$ systemctl stop firewalld

[hadoop@centos7 bin]$ systemctl status firewalld

然后測試一下服務提供者是否正常運行。

控制臺輸出UUID。

然后再創建一個服務消費者,在MQ的另一端進行消費消息。

創建另一個模塊,cloud-stream-rabbitmq-consumer8802

導入依賴

org.springframework.boot

spring-boot-starter-web

org.springframework.cloud

spring-cloud-starter-netflix-eureka-client

org.springframework.cloud

spring-cloud-starter-stream-rabbit

org.springframework.boot

spring-boot-starter-actuator

org.springframework.boot

spring-boot-devtools

runtime

true

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

和上一個服務提供者的依賴一樣。

寫配置文件application.yml

server:

port: 8802

spring:

application:

name: cloud-stream-consumer

cloud:

stream:

binders: # 在此處配置要綁定的rabbitmq的服務信息;

defaultRabbit: # 表示定義的名稱,用于于binding整合

type: rabbit # 消息組件類型

environment: # 設置rabbitmq的相關的環境配置

spring:

rabbitmq:

host: 192.168.31.52

port: 5672

username: guest

password: guest

bindings: # 服務的整合處理

input: # 這個名字是一個通道的名稱

destination: studyExchange # 表示要使用的Exchange名稱定義

content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”

binder: defaultRabbit # 設置要綁定的消息服務的具體設置

eureka:

client: # 客戶端進行Eureka注冊的配置

service-url:

defaultZone: http://localhost:7001/eureka

instance:

lease-renewal-interval-in-seconds: 2# 設置心跳的時間間隔(默認是30秒)

lease-expiration-duration-in-seconds: 5# 如果現在超過了5秒的間隔(默認是90秒)

instance-id: receive-8802.com # 在信息列表時顯示主機名稱

prefer-ip-address: true# 訪問的路徑變為IP地址

創建一個消費者的ReceiveMessageController

@Component

@EnableBinding(Sink.class)

public class ReceiveMessageController {

@Value("${server.port}")

private String serverPort;

@StreamListener(Sink.INPUT)

public void input(Message message){

System.out.println("message = "+message.getPayload()+"\t"+"serverPort= "+serverPort);

}

}

如果消費者成功接收消息,則在控制臺輸出產生的UUID和端口號。

啟動Eureka7001,啟動服務提供者8801,啟動服務消費者8802,還有MQ。

在Eureka中可以看到兩個服務已經啟動。

每次請求http://localhost:8801/sendMessage;消費者都能輸出結果,輸出的UUID與提供者的一致。

登錄RabbitMQ的web管理,可以看到我們新建的exchange,并且可以查看消息隊列中的請求次數的情況。

發送的消息除了可以是字符串類型還可以發送對象,在消費者接受數據的時候,會將實體轉換成JSON字符串。

配置文件中,如果你使用的消息中間件是kafka,type: kafka;environment是設置消息中間件的配置信息,端口,主機地址,用戶名,密碼等,可以設置多個binder,適應不同的場景。

重復消費問題

默認情況下,每個消費者的分組名都是隨機的,不同的,對于不同的組會引起重復消費的問題,例如:消息提供者只向消息隊列中發送了一個消息,正常情況下,消費者A從隊列中拿走之后,消費者B不能再獲得相同的消息,但是由于AB是不同的組,所以A和B都會獲取相同的消息,這就導致了資源被重復消費。

微服務應用放置到同一個group中,就能夠保證消息只會被其中應用消費一次,不同的組是可以消費的,同一個組內會發生競爭關系,只有其中一個可以消費。

同一個應用的不同微服務,只用在配置文件中指定相同的group。

再次發送消息時,只有消費者其中一個能消費。避免了重復消費。

消息持久化

當兩個消費者A和B,A設置了group屬性值,B沒有設置,這時,消費者全部宕機,但是消息生產者一直響MQ中生產消息,這時候重啟A和B兩者有什么區別呢?

正因為B沒有這時分組,B再次啟動后不會再向MQ中取數據,而A啟動成功后可以正常消費消息隊列中的消息。

因此設置了group的消費者,可以保證消息隊列中的消息持久化,group對于消費者來講很重要,既能避免重復消費,又能在消費者重啟后依然可以消費消息隊列中未消費的消息。

總結

以上是生活随笔為你收集整理的springcloud 相同服务名_SpringCloud系列之SpringCloud Stream的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。