日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ消息

發布時間:2023/12/3 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ消息 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

如何確保RabbitMQ消息的可靠性?

  • 開啟生產者確認機制,確保生產者的消息能到達隊列
  • 開啟持久化功能,確保消息未消費前在隊列中不會丟失
  • 開啟消費者確認機制為auto,由spring確認消息處理成功后完成ack
  • 開啟消費者失敗重試機制,并設置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機,交由人工處理

1.生產者確認機制

  • 對應配置:
logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debug # Debug Info Warn Error Fatal spring:rabbitmq:host: 192.168.23.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /publisher-confirm-type: correlated #ConfirmCallback 生產者消費確認到交換機publisher-returns: true #ConfirmCallback ReturnCallback 到隊列template:mandatory: true

  • 啟動配置類
    每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項目啟動過程中配置

ApplicationContextAware ->bean工廠通知->拿到rabbitTemplate

@Slf4j @Configuration //生產者消息確認,確認信心到達隊列 public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//獲取RabbitTemplate對象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {//判斷是否是延遲消息if(message.getMessageProperties().getReceivedDelay()>0){return;}//失敗時才會回調//處理:記錄日志log.error("消息發送到隊列失敗,響應碼:{},失敗原因:{},交換機:{},路由key:{},消息:{}",replyCode,replyText,exchange,routingKey,message);//可以得到所有的錯誤信息,有需要的話,可以選擇重發信息});} }
  • 消息發送
@Test//生產者消息確認,確認信息到達交換機public void testSendMessage2SimpleQueue1() throws InterruptedException {String routingKey = "red";// 1.消息體String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封裝到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(confirm -> {if (confirm.isAck()){//ASClog.debug("消息發送到交換機成功:ID:{}",correlationData.getId());}else {//nASClog.debug("消息發送到交換機失敗:ID:{},原因:{}",correlationData.getId(),confirm.getReason());}}, throwable -> {log.error("消息發送異常, ID:{}, 原因{}",correlationData.getId(),throwable.getMessage());});// 4.發送消息rabbitTemplate.convertAndSend("exchange.direct", routingKey, message,correlationData);// 休眠一會兒,等待ack回執Thread.sleep(2000);}

2.消息持久化

  • 交換機持久化
    RabbitMQ中交換機默認是非持久化的,mq重啟后就丟失
    默認情況下,由SpringAMQP聲明的交換機都是持久化的
@Bean public DirectExchange simpleExchange(){// 三個參數:交換機名稱、是否持久化、當沒有queue與其綁定時是否自動刪除return new DirectExchange("simple.direct", true, false); }@RabbitListener value = @Queue(name = "dl.ttl.queue", durable = "true"), 持久化exchange = @Exchange(name = "dl.ttl.direct",durable = "true"), //死信交換機
  • 隊列持久化
    RabbitMQ中隊列默認是非持久化的,mq重啟后就丟失
    默認情況下,由SpringAMQP聲明的隊列都是持久化的
@Bean public Queue simpleQueue(){// 使用QueueBuilder構建隊列,durable就是持久化的return QueueBuilder.durable("simple.queue").build(); }
  • 消息持久化
    利用SpringAMQP發送消息時,可以設置消息的屬性(MessageProperties),指定delivery-mode
    默認情況下,SpringAMQP發出的任何消息都是持久化的,不用特意指定

3.1消費者確認機制

RabbitMQ是閱后即焚機制,RabbitMQ確認消息被消費者消費后會立刻刪除。

而RabbitMQ是通過消費者回執來確認消費者是否成功處理消息的:消費者獲取消息后,應該向RabbitMQ發送ACK回執,表明自己已經處理消息。

設想這樣的場景:

  • RabbitMQ投遞消息給消費者
  • 消費者獲取消息后,返回ACK給RabbitMQ
  • RabbitMQ刪除消息
  • 消費者宕機,消息尚未處理
  • 這樣,消息就丟失了。因此消費者返回ACK的時機非常重要。

    而SpringAMQP則允許配置三種確認模式:

    • manual:手動ack,需要在業務代碼結束后,調用api發送ack。
    • auto:自動ack,由spring監測listener代碼是否出現異常,沒有異常則返回ack;拋出異常則返回nack
    • none:關閉ack,MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除

    由此可知:

    • none模式下,消息投遞是不可靠的,可能丟失
    • auto模式類似事務機制,出現異常時返回nack,消息回滾到mq;沒有異常,返回ack
    • manual:自己根據業務情況,判斷什么時候該ack

    一般,我們都是使用默認的auto即可

    3.2消費失敗重試機制

    • 重試接收的交換機及隊列配置類
    @Configuration public class ExchangeErrorQueueConfig {private final String ExchangeName ="error.direct";private final String QueueName ="error.queue";private final String RoutingKey ="error";@Bean//定義錯誤交換機public DirectExchange errorMessageExchange(){return new DirectExchange(ExchangeName);}//定義錯誤處理隊列@Beanpublic Queue errorQueue(){return new Queue(QueueName);}//將交換機和隊列綁定@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(RoutingKey);}//定義一個RepublishMessageRecoverer,關聯隊列和交換機@Beanpublic RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,ExchangeName,RoutingKey);} }

    消費者兩種模式配置

    logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debug spring:rabbitmq:host: 192.168.23.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1#acknowledge-mode: none # 關閉ack 消息處理拋異常時,消息依然被RabbitMQ刪除acknowledge-mode: auto # ack 自動返回結果retry:enabled: true # 開啟消費者失敗重試 在消費者本地重試,不會返回隊列initial-interval: 1000 # 初識的失敗等待時長為1秒multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-intervalmax-attempts: 3 # 最大重試次數stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false

    總結

    以上是生活随笔為你收集整理的RabbitMQ消息的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。