日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

给你1分钟,回答下RabbitMQ如何保证消息不丢?

發(fā)布時(shí)間:2025/3/16 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 给你1分钟,回答下RabbitMQ如何保证消息不丢? 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一條消費(fèi)成功被消費(fèi)經(jīng)歷了生產(chǎn)者->MQ->消費(fèi)者,因此在這三個(gè)步驟中都有可能造成消息丟失。

一 消息生產(chǎn)者沒(méi)有把消息成功發(fā)送到MQ

1.1 事務(wù)機(jī)制

AMQP協(xié)議提供了事務(wù)機(jī)制,在投遞消息時(shí)開啟事務(wù)支持,如果消息投遞失敗,則回滾事務(wù)。

自定義事務(wù)管理器

@Configuration public?class?RabbitTranscation?{@Beanpublic?RabbitTransactionManager?rabbitTransactionManager(ConnectionFactory?connectionFactory){return?new?RabbitTransactionManager(connectionFactory);}@Beanpublic?RabbitTemplate?rabbitTemplate(ConnectionFactory?connectionFactory){return?new?RabbitTemplate(connectionFactory);} }

修改yml

spring:rabbitmq:#?消息在未被隊(duì)列收到的情況下返回publisher-returns:?true

開啟事務(wù)支持

rabbitTemplate.setChannelTransacted(true);

消息未接收時(shí)調(diào)用ReturnCallback

rabbitTemplate.setMandatory(true);

生產(chǎn)者投遞消息

@Service public?class?ProviderTranscation?implements?RabbitTemplate.ReturnCallback?{@AutowiredRabbitTemplate?rabbitTemplate;@PostConstructpublic?void?init(){//?設(shè)置channel開啟事務(wù)rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setReturnCallback(this);}@Overridepublic?void?returnedMessage(Message?message,?int?replyCode,?String?replyText,?String?exchange,?String?routingKey)?{System.out.println("這條消息發(fā)送失敗了"+message+",請(qǐng)?zhí)幚?#34;);}@Transactional(rollbackFor?=?Exception.class,transactionManager?=?"rabbitTransactionManager")public?void?publishMessage(String?message)?throws?Exception?{rabbitTemplate.setMandatory(true);rabbitTemplate.convertAndSend("javatrip",message);} }

但是,很少有人這么干,因?yàn)檫@是同步操作,一條消息發(fā)送之后會(huì)使發(fā)送端阻塞,以等待RabbitMQ-Server的回應(yīng),之后才能繼續(xù)發(fā)送下一條消息,生產(chǎn)者生產(chǎn)消息的吞吐量和性能都會(huì)大大降低。

1.2 發(fā)送方確認(rèn)機(jī)制

發(fā)送消息時(shí)將信道設(shè)置為confirm模式,消息進(jìn)入該信道后,都會(huì)被指派給一個(gè)唯一ID,一旦消息被投遞到所匹配的隊(duì)列后,RabbitMQ就會(huì)發(fā)送給生產(chǎn)者一個(gè)確認(rèn)。

開啟消息確認(rèn)機(jī)制

spring:rabbitmq:#?消息在未被隊(duì)列收到的情況下返回publisher-returns:?true#?開啟消息確認(rèn)機(jī)制publisher-confirm-type:?correlated

消息未接收時(shí)調(diào)用ReturnCallback

rabbitTemplate.setMandatory(true);

生產(chǎn)者投遞消息

@Service public?class?ConfirmProvider?implements?RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback?{@AutowiredRabbitTemplate?rabbitTemplate;@PostConstructpublic?void?init()?{rabbitTemplate.setReturnCallback(this);rabbitTemplate.setConfirmCallback(this);}@Overridepublic?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{if(ack){System.out.println("確認(rèn)了這條消息:"+correlationData);}else{System.out.println("確認(rèn)失敗了:"+correlationData+";出現(xiàn)異常:"+cause);}}@Overridepublic?void?returnedMessage(Message?message,?int?replyCode,?String?replyText,?String?exchange,?String?routingKey)?{System.out.println("這條消息發(fā)送失敗了"+message+",請(qǐng)?zhí)幚?#34;);}public?void?publisMessage(String?message){rabbitTemplate.setMandatory(true);rabbitTemplate.convertAndSend("javatrip",message);} }

如果消息確認(rèn)失敗后,我們可以進(jìn)行消息補(bǔ)償,也就是消息的重試機(jī)制。當(dāng)未收到確認(rèn)信息時(shí)進(jìn)行消息的重新投遞。設(shè)置如下配置即可完成。

spring:rabbitmq:#?支持消息發(fā)送失敗后重返隊(duì)列publisher-returns:?true#?開啟消息確認(rèn)機(jī)制publisher-confirm-type:?correlatedlistener:simple:retry:#?開啟重試enabled:?true#?最大重試次數(shù)max-attempts:?5#?重試時(shí)間間隔initial-interval:?3000

二 消息發(fā)送到MQ后,MQ宕機(jī)導(dǎo)致內(nèi)存中的消息丟失

消息在MQ中有可能發(fā)生丟失,這時(shí)候我們就需要將隊(duì)列和消息都進(jìn)行持久化。

@Queue注解為我們提供了隊(duì)列相關(guān)的一些屬性,具體如下:

  • name: 隊(duì)列的名稱;

  • durable: 是否持久化;

  • exclusive: 是否獨(dú)享、排外的;

  • autoDelete: 是否自動(dòng)刪除;

  • arguments:隊(duì)列的其他屬性參數(shù),有如下可選項(xiàng),可參看圖2的arguments:

    • x-message-ttl:消息的過(guò)期時(shí)間,單位:毫秒;

    • x-expires:隊(duì)列過(guò)期時(shí)間,隊(duì)列在多長(zhǎng)時(shí)間未被訪問(wèn)將被刪除,單位:毫秒;

    • x-max-length:隊(duì)列最大長(zhǎng)度,超過(guò)該最大值,則將從隊(duì)列頭部開始刪除消息;

    • x-max-length-bytes:隊(duì)列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過(guò)該閾值則從隊(duì)列頭部開始刪除消息;

    • x-overflow:設(shè)置隊(duì)列溢出行為。這決定了當(dāng)達(dá)到隊(duì)列的最大長(zhǎng)度時(shí)消息會(huì)發(fā)生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁隊(duì)列類型僅支持drop-head;

    • x-dead-letter-exchange:死信交換器名稱,過(guò)期或被刪除(因隊(duì)列長(zhǎng)度超長(zhǎng)或因空間超出閾值)的消息可指定發(fā)送到該交換器中;

    • x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時(shí)會(huì)使用該路由鍵,如果不設(shè)置,則使用消息的原來(lái)的路由鍵值

    • x-single-active-consumer:表示隊(duì)列是否是單一活動(dòng)消費(fèi)者,true時(shí),注冊(cè)的消費(fèi)組內(nèi)只有一個(gè)消費(fèi)者消費(fèi)消息,其他被忽略,false時(shí)消息循環(huán)分發(fā)給所有消費(fèi)者(默認(rèn)false)

    • x-max-priority:隊(duì)列要支持的最大優(yōu)先級(jí)數(shù);如果未設(shè)置,隊(duì)列將不支持消息優(yōu)先級(jí);

    • x-queue-mode(Lazy mode):將隊(duì)列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使用;如果未設(shè)置,隊(duì)列將保留內(nèi)存緩存以盡可能快地傳遞消息;

    • x-queue-master-locator:在集群模式下設(shè)置鏡像隊(duì)列的主節(jié)點(diǎn)信息。

    持久化隊(duì)列

    創(chuàng)建隊(duì)列的時(shí)候?qū)⒊志没瘜傩詃urable設(shè)置為true,同時(shí)要將autoDelete設(shè)置為false

    @Queue(value?=?"javatrip",durable?=?"true",autoDelete?=?"false")

    持久化消息

    發(fā)送消息的時(shí)候?qū)⑾⒌膁eliveryMode設(shè)置為2,在Spring Boot中消息默認(rèn)就是持久化的。

    三 消費(fèi)者消費(fèi)消息的時(shí)候,未消費(fèi)完畢就出現(xiàn)了異常

    消費(fèi)者剛消費(fèi)了消息,還沒(méi)有處理業(yè)務(wù),結(jié)果發(fā)生異常。這時(shí)候就需要關(guān)閉自動(dòng)確認(rèn),改為手動(dòng)確認(rèn)消息。

    修改yml為手動(dòng)簽收模式

    spring:rabbitmq:listener:simple:#?手動(dòng)簽收模式acknowledge-mode:?manual#?每次簽收一條消息prefetch:?1

    消費(fèi)者手動(dòng)簽收

    @Component @RabbitListener(queuesToDeclare?=?@Queue(value?=?"javatrip",?durable?=?"true")) public?class?Consumer?{@RabbitHandlerpublic?void?receive(String?message,?@Headers?Map<String,Object>?headers,?Channel?channel)?throws?Exception{System.out.println(message);//?唯一的消息IDLong?deliverTag?=?(Long)?headers.get(AmqpHeaders.DELIVERY_TAG);//?確認(rèn)該條消息if(...){channel.basicAck(deliverTag,false);}else{//?消費(fèi)失敗,消息重返隊(duì)列channel.basicNack(deliverTag,false,true);}} }

    四 總結(jié)

    消息丟失的原因?

    生產(chǎn)者、MQ、消費(fèi)者都有可能造成消息丟失

    如何保證消息的可靠性?

    • 發(fā)送方采取發(fā)送者確認(rèn)模式

    • MQ進(jìn)行隊(duì)列及消息的持久化

    • 消費(fèi)者消費(fèi)成功后手動(dòng)確認(rèn)消息

    有道無(wú)術(shù),術(shù)可成;有術(shù)無(wú)道,止于術(shù)

    歡迎大家關(guān)注Java之道公眾號(hào)

    好文章,我在看??

    總結(jié)

    以上是生活随笔為你收集整理的给你1分钟,回答下RabbitMQ如何保证消息不丢?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。