日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rabbitmq 启动异常_RabbitMQ:消息发送确认 与 消息接收确认(ACK)

發(fā)布時(shí)間:2024/10/8 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq 启动异常_RabbitMQ:消息发送确认 与 消息接收确认(ACK) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

默認(rèn)情況下如果一個(gè) Message 被消費(fèi)者所正確接收則會(huì)被從 Queue 中移除
如果一個(gè) Queue 沒被任何消費(fèi)者訂閱,那么這個(gè) Queue 中的消息會(huì)被 Cache(緩存),當(dāng)有消費(fèi)者訂閱時(shí)則會(huì)立即發(fā)送,當(dāng) Message 被消費(fèi)者正確接收時(shí),就會(huì)被從 Queue 中移除。

消息發(fā)送確認(rèn)
發(fā)送的消息怎么樣才算失敗或成功?如何確認(rèn)?

  • 當(dāng)消息無法路由到隊(duì)列時(shí),確認(rèn)消息路由失敗。消息成功路由時(shí),當(dāng)需要發(fā)送的隊(duì)列都發(fā)送成功后,進(jìn)行確認(rèn)消息,對(duì)于持久化隊(duì)列意味著寫入磁盤,對(duì)于鏡像隊(duì)列意味著所有鏡像接收成功

ConfirmCallback

  • 通過實(shí)現(xiàn) ConfirmCallback 接口,消息發(fā)送到 Broker 后觸發(fā)回調(diào),確認(rèn)消息是否到達(dá) Broker 服務(wù)器,也就是只確認(rèn)是否正確到達(dá) Exchange 中
@Component public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("消息唯一標(biāo)識(shí):"+correlationData);System.out.println("確認(rèn)結(jié)果:"+ack);System.out.println("失敗原因:"+cause);}

還需要在配置文件添加配置

spring:rabbitmq:publisher-confirms: true

ReturnCallback

  • 通過實(shí)現(xiàn) ReturnCallback 接口,啟動(dòng)消息失敗返回,比如路由不到隊(duì)列時(shí)觸發(fā)回調(diào)
@Component public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息主體 message : "+message);System.out.println("消息主體 message : "+replyCode);System.out.println("描述:"+replyText);System.out.println("消息使用的交換器 exchange : "+exchange);System.out.println("消息使用的路由鍵 routing : "+routingKey);} }

還需要在配置文件添加配置

spring:rabbitmq:publisher-returns: true

消息接收確認(rèn)
消息消費(fèi)者如何通知 Rabbit 消息消費(fèi)成功?

  • 消息通過 ACK 確認(rèn)是否被正確接收,每個(gè) Message 都要被確認(rèn)(acknowledged),可以手動(dòng)去 ACK 或自動(dòng) ACK
  • 自動(dòng)確認(rèn)會(huì)在消息發(fā)送給消費(fèi)者后立即確認(rèn),但存在丟失消息的可能,如果消費(fèi)端消費(fèi)邏輯拋出異常,也就是消費(fèi)端沒有處理成功這條消息,那么就相當(dāng)于丟失了消息
  • 如果消息已經(jīng)被處理,但后續(xù)代碼拋出異常,使用 Spring 進(jìn)行管理的話消費(fèi)端業(yè)務(wù)邏輯會(huì)進(jìn)行回滾,這也同樣造成了實(shí)際意義的消息丟失
  • 如果手動(dòng)確認(rèn)則當(dāng)消費(fèi)者調(diào)用 ack、nack、reject 幾種方法進(jìn)行確認(rèn),手動(dòng)確認(rèn)可以在業(yè)務(wù)失敗后進(jìn)行一些操作,如果消息未被 ACK 則會(huì)發(fā)送到下一個(gè)消費(fèi)者
  • 如果某個(gè)服務(wù)忘記 ACK 了,則 RabbitMQ 不會(huì)再發(fā)送數(shù)據(jù)給它,因?yàn)?RabbitMQ 認(rèn)為該服務(wù)的處理能力有限
  • ACK 機(jī)制還可以起到限流作用,比如在接收到某條消息時(shí)休眠幾秒鐘
  • 消息確認(rèn)模式有:
    • AcknowledgeMode.NONE:自動(dòng)確認(rèn)
    • AcknowledgeMode.AUTO:根據(jù)情況確認(rèn)
    • AcknowledgeMode.MANUAL:手動(dòng)確認(rèn)

確認(rèn)消息(局部方法處理消息)

  • 默認(rèn)情況下消息消費(fèi)者是自動(dòng) ack (確認(rèn))消息的,如果要手動(dòng) ack(確認(rèn))則需要修改確認(rèn)模式為 manual
spring:rabbitmq:listener:simple:acknowledge-mode: manual

或在 RabbitListenerContainerFactory 中進(jìn)行開啟手動(dòng) ack

@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //開啟手動(dòng) ackreturn factory; }

確認(rèn)消息

@RabbitHandler public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {System.out.println(message);try {channel.basicAck(tag,false); // 確認(rèn)消息} catch (IOException e) {e.printStackTrace();} }
  • 需要注意的 basicAck 方法需要傳遞兩個(gè)參數(shù)
    • deliveryTag(唯一標(biāo)識(shí) ID):當(dāng)一個(gè)消費(fèi)者向 RabbitMQ 注冊(cè)后,會(huì)建立起一個(gè) Channel ,RabbitMQ 會(huì)用 basic.deliver 方法向消費(fèi)者推送消息,這個(gè)方法攜帶了一個(gè) delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識(shí) ID,是一個(gè)單調(diào)遞增的正整數(shù),delivery tag 的范圍僅限于 Channel
    • multiple:為了減少網(wǎng)絡(luò)流量,手動(dòng)確認(rèn)可以被批處理,當(dāng)該參數(shù)為 true 時(shí),則可以一次性確認(rèn) delivery_tag 小于等于傳入值的所有消息

手動(dòng)否認(rèn)、拒絕消息

  • 發(fā)送一個(gè) header 中包含 error 屬性的消息

消費(fèi)者獲取消息時(shí)檢查到頭部包含 error 則 nack 消息

@RabbitHandler public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {System.out.println(message);if (map.get("error")!= null){System.out.println("錯(cuò)誤的消息");try {channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); //否認(rèn)消息return;} catch (IOException e) {e.printStackTrace();}}try {channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //確認(rèn)消息} catch (IOException e) {e.printStackTrace();} }
  • 此時(shí)控制臺(tái)重復(fù)打印,說明該消息被 nack 后一直重新入隊(duì)列然后一直重新消費(fèi)

hello 錯(cuò)誤的消息

hello 錯(cuò)誤的消息

hello 錯(cuò)誤的消息

hello 錯(cuò)誤的消息

也可以拒絕該消息,消息會(huì)被丟棄,不會(huì)重回隊(duì)列

channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //拒絕消息

確認(rèn)消息(全局處理消息)

  • 自動(dòng)確認(rèn)涉及到一個(gè)問題就是如果在處理消息的時(shí)候拋出異常,消息處理失敗,但是因?yàn)樽詣?dòng)確認(rèn)而導(dǎo)致 Rabbit 將該消息刪除了,造成消息丟失
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("consumer_queue"); // 監(jiān)聽的隊(duì)列container.setAcknowledgeMode(AcknowledgeMode.NONE); // NONE 代表自動(dòng)確認(rèn)container.setMessageListener((MessageListener) message -> { //消息監(jiān)聽處理System.out.println("====接收到消息=====");System.out.println(new String(message.getBody()));//相當(dāng)于自己的一些消費(fèi)邏輯拋錯(cuò)誤throw new NullPointerException("consumer fail");});return container; }

手動(dòng)確認(rèn)消息

@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("consumer_queue"); // 監(jiān)聽的隊(duì)列container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手動(dòng)確認(rèn)container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { //消息處理System.out.println("====接收到消息=====");System.out.println(new String(message.getBody()));if(message.getMessageProperties().getHeaders().get("error") == null){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println("消息已經(jīng)確認(rèn)");}else {//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);System.out.println("消息拒絕");}});return container; }

AcknowledgeMode 除了 NONE 和 MANUAL 之外還有 AUTO ,它會(huì)根據(jù)方法的執(zhí)行情況來決定是否確認(rèn)還是拒絕(是否重新入queue)

  • 如果消息成功被消費(fèi)(成功的意思是在消費(fèi)的過程中沒有拋出異常),則自動(dòng)確認(rèn)
  • 當(dāng)拋出 AmqpRejectAndDontRequeueException 異常的時(shí)候,則消息會(huì)被拒絕,且 requeue = false(不重新入隊(duì)列)
  • 當(dāng)拋出 ImmediateAcknowledgeAmqpException 異常,則消費(fèi)者會(huì)被確認(rèn)
  • 其他的異常,則消息會(huì)被拒絕,且 requeue = true(如果此時(shí)只有一個(gè)消費(fèi)者監(jiān)聽該隊(duì)列,則有發(fā)生死循環(huán)的風(fēng)險(xiǎn),多消費(fèi)端也會(huì)造成資源的極大浪費(fèi),這個(gè)在開發(fā)過程中一定要避免的)。可以通過 setDefaultRequeueRejected(默認(rèn)是true)去設(shè)置
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("consumer_queue"); // 監(jiān)聽的隊(duì)列container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 根據(jù)情況確認(rèn)消息container.setMessageListener((MessageListener) (message) -> {System.out.println("====接收到消息=====");System.out.println(new String(message.getBody()));//拋出NullPointerException異常則重新入隊(duì)列//throw new NullPointerException("消息消費(fèi)失敗");//當(dāng)拋出的異常是AmqpRejectAndDontRequeueException異常的時(shí)候,則消息會(huì)被拒絕,且requeue=false//throw new AmqpRejectAndDontRequeueException("消息消費(fèi)失敗");//當(dāng)拋出ImmediateAcknowledgeAmqpException異常,則消費(fèi)者會(huì)被確認(rèn)throw new ImmediateAcknowledgeAmqpException("消息消費(fèi)失敗");});return container; }

消息可靠總結(jié)

  • 持久化
    • exchange要持久化
    • queue要持久化
    • message要持久化
  • 消息確認(rèn)
    • 啟動(dòng)消費(fèi)返回(@ReturnList注解,生產(chǎn)者就可以知道哪些消息沒有發(fā)出去)
    • 生產(chǎn)者和Server(broker)之間的消息確認(rèn)
    • 消費(fèi)者和Server(broker)之間的消息確認(rèn)

總結(jié)

以上是生活随笔為你收集整理的rabbitmq 启动异常_RabbitMQ:消息发送确认 与 消息接收确认(ACK)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产日韩欧美在线观看视频 | 三级av在线免费观看 | 欧美成人黑人猛交 | 亚洲欧美日韩精品永久在线 | 欧美成人性生活片 | 日本视频www色 | 少妇逼逼| 日本三级吃奶头添泬无码苍井空 | 欧美一区二区三区免费视频 | 国产精品久久在线观看 | 免费黄色在线网址 | 精品电影一区二区 | 久久精品无码一区二区三区免费 | 中文字幕第一页av | 手机看片欧美日韩 | 国产av一区二区三区 | 国产91免费视频 | 奇米影视在线 | 日韩在线看片 | 日韩成人不卡 | 国产性―交一乱―色―情人 | 亚洲偷| 91久久极品少妇xxxxⅹ软件 | 国产特黄大片aaaa毛片 | 操极品| 不卡一区二区在线 | 嫩草在线播放 | 97精品人妻一区二区三区蜜桃 | 91av在线播放 | 少妇人妻一区 | 深夜福利视频网站 | 色亚洲欧美 | 四虎影库永久在线 | 国产精品igao视频 | 少妇人妻无码专区视频 | 亚洲一区二区三区综合 | 高h全肉污文play带道具 | 99久久久无码国产精品性青椒 | 日韩系列在线 | 久久久久爱 | 在线观看毛片网站 | 性少妇bbw张开 | 国产精品一卡二卡三卡 | 国产乱子伦精品 | 欧美美女性高潮 | 久久久久99精品国产片 | 国产黄在线观看 | 国产免费资源 | 天天操网| 精品国产免费人成在线观看 | 中文字幕2018| 国产精品成人久久久久久久 | 极品人妻一区二区 | 围产精品久久久久久久 | 91福利视频免费观看 | 欧美日本国产 | 免费视频一区二区 | 久久亚洲综合色 | 日韩日韩日韩日韩日韩 | 男人的天堂你懂的 | 中文文字幕文字幕高清 | 国产在线视频导航 | 天天做天天干 | 日韩一区二区中文字幕 | 亚洲一区二区精品视频 | 91久久国产综合久久91 | 污污网址在线观看 | 久久亚洲AV成人无码国产人妖 | 每日更新av | 武侠古典av | yy111122少妇光屁股影院 | 91porn九色| 欧美中文字幕在线 | 日韩毛片在线视频 | 成人精品一区二区三区电影黑人 | 日本午夜视频 | 一区二区三区毛片 | 日韩av在线免费看 | 久久久久国色av免费观看性色 | 色噜噜狠狠狠综合曰曰曰 | 久久国产精品免费观看 | 少妇精品视频 | 国产另类精品 | 久久入口 | 日韩免费一二三区 | 三级网站国产 | 欧美成人一级 | 日本a一级片 | 91免费看片 | 久久综合激情网 | 免费在线观看视频a | 黄色一级片免费在线观看 | 阿的白色内裤hd中文 | 尤物综合网 | 在线视频日韩精品 | 九九热精品免费视频 | 香蕉国产在线观看 | 天堂在线视频免费观看 | 国产ts变态重口人妖hd |