javascript
使用Spring Cloud Stream与RabbitMQ集成
在我以前的文章中,我寫了兩個系統之間非常簡單的集成場景-一個生成一個工作單元,另一個處理該工作單元,以及Spring Integration如何使這種集成非常容易。
在這里,我將演示如何使用Spring Cloud Stream進一步簡化此集成方案
我在這里有示例代碼– pom.xml中提供了適用于Spring Cloud Stream的正確maven依賴關系。
制片人
因此,再次從負責生成工作單元的生產者開始。 將消息發送到RabbitMQ所需的代碼明智的全部工作就是按照以下方式進行Java配置:
@Configuration @EnableBinding(WorkUnitsSource.class) @IntegrationComponentScan public class IntegrationConfiguration {}從表面上看,這看似簡單,但在幕后做了很多事情,據我了解并從文檔中了解到,這些是該配置觸發的:
1.創建基于綁定到@EnableBinding批注的類的Spring Integration消息通道。 上面的WorkUnitsSource類是一個稱為“ worksChannel”的自定義通道的定義,如下所示:
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel;public interface WorkUnitsSource {String CHANNEL_NAME = "worksChannel";@OutputMessageChannel worksChannel();}2.根據運行時可用的“綁定程序”實現(例如RabbitMQ,Kaffka,Redis,Gemfire),上一步中的通道將連接到系統中的適當結構–因此,例如,我希望我的“ worksChannel”依次發送消息到RabbitMQ,Spring Cloud Stream將負責在RabbitMQ中自動創建主題交換
我希望就數據如何發送到RabbitMQ進行一些進一步的自定義-特別是我希望域對象在發送之前先序列化為json,并且我想指定將有效負載發送到的RabbitMQ交換的名稱。由某些配置控制,這些配置可以使用yaml文件以以下方式附加到通道:
spring:cloud:stream:bindings:worksChannel:destination: work.exchangecontentType: application/jsongroup: testgroup最后一個細節是應用程序其余部分與Spring Cloud Stream交互的方式,可以通過定義消息網關直接在Spring Integration中完成:
import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import works.service.domain.WorkUnit;@MessagingGateway public interface WorkUnitGateway {@Gateway(requestChannel = WorkUnitsSource.CHANNEL_NAME)void generate(WorkUnit workUnit);}基本上就是這樣,Spring Cloud Stream現在將連接整個Spring集成流程,并在RabbitMQ中創建適當的結構。
消費者
與生產者類似,首先我想定義一個名為“ worksChannel”的通道,該通道將處理來自RabbitMQ的傳入消息:
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel;public interface WorkUnitsSink {String CHANNEL_NAME = "worksChannel";@InputSubscribableChannel worksChannel(); }然后讓Spring Cloud Stream根據此定義創建通道和RabbitMQ綁定:
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.context.annotation.Configuration;@Configuration @EnableBinding(WorkUnitsSink.class) public class IntegrationConfiguration {}為了處理消息,Spring Cloud Stream提供了一個偵聽器,可以通過以下方式創建它:
@Service public class WorkHandler {private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);@StreamListener(WorkUnitsSink.CHANNEL_NAME)public void process(WorkUnit workUnit) {LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());} }最后是將這個通道連接到yaml文件中表示的RabbitMQ基礎結構的配置:
spring:cloud:stream:bindings:worksChannel:destination: work.exchangegroup: testgroup現在,如果啟動了生產者和任何數量的使用者,則通過生產者發送的消息將作為json發送到Rabbit MQ主題交換,由使用者檢索,反序列化為對象并傳遞給工作處理器。
現在,純粹由Spring Cloud Stream庫按照慣例處理創建RabbitMQ基礎結構所涉及的大量樣板。 盡管Spring Cloud Stream嘗試提供原始Spring Integration的基礎,但是掌握Spring Integration的基本知識以有效使用Spring Cloud Stream還是很有用的。
此處描述的示例可在我的github存儲庫中找到
翻譯自: https://www.javacodegeeks.com/2016/08/integrating-rabbitmq-using-spring-cloud-stream.html
總結
以上是生活随笔為你收集整理的使用Spring Cloud Stream与RabbitMQ集成的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓设置按钮背景颜色(安卓设置按钮)
- 下一篇: Spring Batch:多种格式输出编