RabbitMQ –使用Spring集成Java DSL串行处理消息
如果您曾經需要使用RabbitMQ來串行處理消息,并且有一群監聽器來處理消息,那么我所看到的最好方法是在監聽器上使用“獨占消費者”標志,每個監聽器上有1個線程來處理消息。
專用使用者標志可確保只有1個使用者可以從特定隊列中讀取消息,并且該使用者上的1個線程可確保按順序處理消息。 但是有一個問題,我待會兒再講。
讓我用基于Spring Boot和Spring Integration的RabbitMQ消息使用者來演示這種行為。
首先,這是用于使用Spring java配置設置隊列的配置,請注意,由于這是Spring Boot應用程序,因此在將Spring-amqp庫添加到依賴項列表時,它將自動創建RabbitMQ連接工廠:
@Configuration @Configuration public class RabbitConfig {@Autowiredprivate ConnectionFactory rabbitConnectionFactory;@Beanpublic Queue sampleQueue() {return new Queue("sample.queue", true, false, false);}}給定這個示例隊列,一個從該隊列獲取消息并對其進行處理的偵聽器如下所示,該流程是使用出色的Spring集成Java DSL庫編寫的:
@Configuration public class RabbitInboundFlow {private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class);@Autowiredprivate RabbitConfig rabbitConfig;@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();listenerContainer.setConnectionFactory(this.connectionFactory);listenerContainer.setQueues(this.rabbitConfig.sampleQueue());listenerContainer.setConcurrentConsumers(1);listenerContainer.setExclusive(true);return listenerContainer;}@Beanpublic IntegrationFlow inboundFlow() {return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer())).transform(Transformers.objectToString()).handle((m) -> {logger.info("Processed {}", m.getPayload());}).get();}}該流程非常簡潔地用inboundFlow方法表示,RabbitMQ的消息有效負載從字節數組轉換為String,最后只需將消息記錄到日志中即可進行處理。
該流程的重要部分是偵聽器配置,請注意將使用者設置為獨占使用者的標志,并且在該使用者中將線程處理數設置為1。即使僅啟動了應用程序的多個實例,該處理數也被設置為1。其中一個監聽器將能夠連接和處理消息。
現在來看問題,考慮一種情況,消息處理需要一段時間才能完成,并且在消息處理期間會回滾。 如果處理消息的應用程序實例在處理此類消息的過程中被停止,則行為是另一個實例將開始處理隊列中的消息,當停止的實例回滾消息時,該回滾然后將郵件傳遞給新的排他消費者,從而使郵件混亂。
- 如果您有興趣進一步探索它,可以使用以下github項目來使用此功能:https://github.com/bijukunjummen/test-rabbit-exclusive。
翻譯自: https://www.javacodegeeks.com/2014/12/rabbitmq-processing-messages-serially-using-spring-integration-java-dsl.html
總結
以上是生活随笔為你收集整理的RabbitMQ –使用Spring集成Java DSL串行处理消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: foxy中文(foxy中文什么意思)
- 下一篇: 企业Java中事务隔离级别的初学者指南