日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

使用Spring Integration重试RabbitMQ

發布時間:2023/12/3 javascript 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用Spring Integration重试RabbitMQ 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我最近閱讀了有關使用RabbitMQ重試的方法
在這里,并想嘗試類似的方法
Spring Integration ,提供了一組很棒的集成抽象。

TL; DR解決的問題是重試一次消息(在處理失敗的情況下),兩次重試之間有較大的延遲(例如10分鐘以上)。 該方法利用RabbitMQ支持
死信交換 ,看起來像這樣

流程的要點是:

1.工作調度員創建“工作單元”,并通過交換機將其發送到RabbitMQ隊列。

2.工作隊列設置為
死信交換 。 如果消息處理由于任何原因失敗,則“工作單元”將以“工作單元死信隊列”結束。

3.依次將工作單位死信隊列與工作單位交換設置為死信交換,以此方式創建一個循環。 此外,將死信隊列中的消息過期設置為10分鐘,這樣,一旦消息過期,它將再次返回到工作單元隊列中。

4.要打破周期,一旦超過某個計數閾值,處理代碼就必須停止處理。

使用Spring Integration實現

我已經使用Spring Integration和RabbitMQ講述了一條快樂的小路
在之前 ,這里我將主要基于此代碼構建。

設置的一個很好的部分是適當的死信交換/隊列的配置,當使用Spring的Java配置表示時,看起來像這樣:

@Configuration public class RabbitConfig {@Autowiredprivate ConnectionFactory rabbitConnectionFactory;@BeanExchange worksExchange() {return ExchangeBuilder.topicExchange("work.exchange").durable().build();}@Beanpublic Queue worksQueue() {return QueueBuilder.durable("work.queue").withArgument("x-dead-letter-exchange", worksDlExchange().getName()).build();}@BeanBinding worksBinding() {return BindingBuilder.bind(worksQueue()).to(worksExchange()).with("#").noargs();}// Dead letter exchange for holding rejected work units..@BeanExchange worksDlExchange() {return ExchangeBuilder.topicExchange("work.exchange.dl").durable().build();}//Queue to hold Deadletter messages from worksQueue@Beanpublic Queue worksDLQueue() {return QueueBuilder.durable("works.queue.dl").withArgument("x-message-ttl", 20000).withArgument("x-dead-letter-exchange", worksExchange().getName()).build();}@BeanBinding worksDlBinding() {return BindingBuilder.bind(worksDLQueue()).to(worksDlExchange()).with("#").noargs();}... }

請注意,這里我將“死信”隊列的TTL設置為20秒,這意味著20秒后,一條失敗的消息將返回到處理隊列中。 一旦完成此設置并在RabbitMQ中創建了適當的結構,代碼的消耗部分將如下所示,使用
Spring Integration Java DSL :

@Configuration public class WorkInbound {@Autowiredprivate RabbitConfig rabbitConfig;@Beanpublic IntegrationFlow inboundFlow() {return IntegrationFlows.from(Amqp.inboundAdapter(rabbitConfig.workListenerContainer())).transform(Transformers.fromJson(WorkUnit.class)).log().filter("(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true", f -> f.discardChannel("nullChannel")).handle("workHandler", "process").get();}}

這里的大多數重試邏輯是由RabbitMQ基礎結構處理的,這里唯一的變化是通過在特定的2次重試后顯式丟棄消息來打破周期。 此中斷表示為上面的過濾器,查看了RabbitMQ一旦發送到Dead Letter交換后將其添加到消息的稱為“ x-death”的標頭。 過濾器確實有些丑陋-可以用Java代碼更好地表達它。

還有一點要注意的是,重試邏輯可以使用Spring Integration在過程中表示,但是我想研究一個重試時間可能很長(例如15到20分鐘)的流程,該流程在過程中無法正常工作而且也不安全,因為我希望應用程序的任何實例都可以處理消息重試。

如果您想進一步探索,請嘗試在
我的github倉庫 – https://github.com/bijukunjummen/si-dsl-rabbit-sample

參考:

使用RabbitMQ重試:http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq

翻譯自: https://www.javacodegeeks.com/2016/09/rabbitmq-retries-using-spring-integration.html

總結

以上是生活随笔為你收集整理的使用Spring Integration重试RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。