日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 前端技术 > javascript >内容正文

javascript

Spring Cloud Stream如何消费自己生产的消息

發(fā)布時(shí)間:2025/1/21 javascript 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud Stream如何消费自己生产的消息 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在上一篇《Spring Cloud Stream如何處理消息重復(fù)消費(fèi)》中,我們通過(guò)消費(fèi)組的配置解決了多實(shí)例部署情況下消息重復(fù)消費(fèi)這一入門時(shí)的常見(jiàn)問(wèn)題。本文將繼續(xù)說(shuō)說(shuō)在另外一個(gè)被經(jīng)常問(wèn)到的問(wèn)題:如果微服務(wù)生產(chǎn)的消息自己也想要消費(fèi)一份,應(yīng)該如何實(shí)現(xiàn)呢?

  • 常見(jiàn)錯(cuò)誤

在放出標(biāo)準(zhǔn)答案前,先放出一個(gè)常見(jiàn)的錯(cuò)誤姿勢(shì)和告警信息(以便您可以通過(guò)搜索引擎找到這里_)。以下錯(cuò)誤基于Spring Boot 2.0.5、Spring Cloud Finchley SR1。

首先,根據(jù)入門示例,為了生產(chǎn)和消費(fèi)消息,需要定義兩個(gè)通道:一個(gè)輸入、一個(gè)輸出。比如下面這樣:

public interface TestTopic {String OUTPUT = "example-topic";String INPUT = "example-topic";@Output(OUTPUT)MessageChannel output();@Input(INPUT)SubscribableChannel input();}

通過(guò)INPUT和OUTPUT使用相同的名稱,讓生產(chǎn)消息和消費(fèi)消息指向相同的Topic,從而實(shí)現(xiàn)消費(fèi)自己發(fā)出的消息。

接下來(lái),創(chuàng)建一個(gè)HTTP接口,并通過(guò)上面定義的輸出通道觸來(lái)生產(chǎn)消息,比如:

@Slf4j @RestController public class TestController {@Autowiredprivate TestTopic testTopic;@GetMapping("/sendMessage")public String messageWithMQ(@RequestParam String message) {testTopic.output().send(MessageBuilder.withPayload(message).build());return "ok";}}

已經(jīng)有生產(chǎn)消息的實(shí)現(xiàn),下面來(lái)創(chuàng)建對(duì)輸入通道的監(jiān)聽(tīng),以實(shí)現(xiàn)消息的消費(fèi)邏輯。

@Slf4j @Component public class TestListener {@StreamListener(TestTopic.INPUT)public void receive(String payload) {log.info("Received: " + payload);throw new RuntimeException("BOOM!");}}

最后,在應(yīng)用主類中,使用@EnableBinding注解來(lái)開(kāi)啟它,比如:

@EnableBinding(TestTopic.class) @SpringBootApplication public class TestApplication {public static void main(String[] args) {SpringApplication.run(TestApplication.class, args);}}

看似天衣無(wú)縫的操作,然而在啟動(dòng)的瞬間,你可能收到了下面這樣的錯(cuò)誤:

org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.didispace.stream.TestTopic; factoryMethodName=input; initMethodName=null; destroyMethodName=nullat org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:64) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:54) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:86) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:562) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:541) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:76) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:45) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:358) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[na:1.8.0_151]at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:357) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:328) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:233) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:271) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:91) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:694) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:532) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:61) ~[spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:780) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at org.springframework.boot.SpringApplication.run(SpringApplication.java:1277) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at org.springframework.boot.SpringApplication.run(SpringApplication.java:1265) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]at com.didispace.stream.TestApplication.main(TestApplication.java:13) [classes/:na]
  • 正確姿勢(shì)

根據(jù)錯(cuò)誤提示:Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists,沒(méi)有啟動(dòng)成功的原因是已經(jīng)存在了一個(gè)名為example-topic的Bean,那么為什么會(huì)重復(fù)創(chuàng)建這個(gè)Bean呢?

實(shí)際上,在F版的Spring Cloud Stream中,當(dāng)我們使用@Output和@Input注解來(lái)定義消息通道時(shí),都會(huì)根據(jù)傳入的通道名稱來(lái)創(chuàng)建一個(gè)Bean。而在上面的例子中,我們定義的@Output和@Input名稱是相同的,因?yàn)槲覀兿到y(tǒng)輸入和輸出是同一個(gè)Topic,這樣才能實(shí)現(xiàn)對(duì)自己生產(chǎn)消息的消費(fèi)。

既然這樣,我們定義相同的通道名是行不通了,那么我們只能通過(guò)定義不同的通道名,并為這兩個(gè)通道配置相同的目標(biāo)Topic來(lái)將這一對(duì)輸入輸出指向同一個(gè)實(shí)際的Topic。對(duì)于上面的錯(cuò)誤程序,只需要做如下兩處改動(dòng):

第一步:修改通道名,使用不同的名字

public interface TestTopic {String OUTPUT = "example-topic-output";String INPUT = "example-topic-input";@Output(OUTPUT)MessageChannel output();@Input(INPUT)SubscribableChannel input();}

第二步:在配置文件中,為這兩個(gè)通道設(shè)置相同的Topic名稱,比如:

spring.cloud.stream.bindings.example-topic-input.destination=aaa-topic spring.cloud.stream.bindings.example-topic-output.destination=aaa-topic

這樣,這兩個(gè)輸入輸出通道就會(huì)都指向名為aaa-topic的Topic了。
最后,再啟動(dòng)該程序,沒(méi)有報(bào)錯(cuò)。然后訪問(wèn)接口:localhost:8080/sendMessage?message=hello-didi,可以在控制臺(tái)中看到如下信息:

2018-11-17 23:24:10.425 INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672] 2018-11-17 23:24:10.453 INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#266753da:0/SimpleConnection@627fba83 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60752] 2018-11-17 23:24:10.458 INFO 32039 --- [ctor-http-nio-2] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (aaa-topic.anonymous.fNUxZ8C0QIafxrhkFBFI1A) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost. 2018-11-17 23:24:10.483 INFO 32039 --- [IafxrhkFBFI1A-1] com.didispace.stream.TestListener

消費(fèi)自己生產(chǎn)的消息成功了!讀者也還可以訪問(wèn)一下應(yīng)用的/actuator/beans端點(diǎn),看看當(dāng)前Spring上下文中有哪些Bean,應(yīng)該可以看到有下面Bean,也就是上面分析的兩個(gè)通道的Bean對(duì)象

"example-topic-output": {"aliases": [],"scope": "singleton","type": "org.springframework.integration.channel.DirectChannel","resource": null,"dependencies": [] }, "example-topic-input": {"aliases": [],"scope": "singleton","type": "org.springframework.integration.channel.DirectChannel","resource": null,"dependencies": [] },
  • 后記

其實(shí)大部分開(kāi)發(fā)者在使用Spring Cloud Stream時(shí)候碰到的問(wèn)題都源于對(duì)Spring Cloud Stream的核心概念還是不夠理解。所以,還是推薦讀一下下面的文章和示例:

  • 入門示例

  • 核心概念

  • 消費(fèi)組

  • 消費(fèi)分區(qū)

  • 代碼示例

本文示例讀者可以通過(guò)查看下面?zhèn)}庫(kù)的中的stream-consumer-self項(xiàng)目:

  • Github
  • Gitee

本文由 程序猿DD-翟永超 創(chuàng)作,采用 CC BY 3.0 CN協(xié)議 進(jìn)行許可。 可自由轉(zhuǎn)載、引用,但需署名作者且注明文章出處。如轉(zhuǎn)載至微信公眾號(hào),請(qǐng)?jiān)谖哪┨砑幼髡吖娞?hào)二維碼。

與50位技術(shù)專家面對(duì)面20年技術(shù)見(jiàn)證,附贈(zèng)技術(shù)全景圖

總結(jié)

以上是生活随笔為你收集整理的Spring Cloud Stream如何消费自己生产的消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。