日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ事务和Confirm发送方消息确认——深入解读

發布時間:2025/3/11 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ事务和Confirm发送方消息确认——深入解读 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

引言

根據前面的知識(深入了解RabbitMQ工作原理及簡單使用、Rabbit的幾種工作模式介紹與實踐)我們知道,如果要保證消息的可靠性,需要對消息進行持久化處理,然而消息持久化除了需要代碼的設置之外,還有一個重要步驟是至關重要的,那就是保證你的消息順利進入Broker(代理服務器),如圖所示:

正常情況下,如果消息經過交換器進入隊列就可以完成消息的持久化,但如果消息在沒有到達broker之前出現意外,那就造成消息丟失,有沒有辦法可以解決這個問題?

RabbitMQ有兩種方式來解決這個問題:

  • 通過AMQP提供的事務機制實現;
  • 使用發送者確認模式實現;
  • 一、事務使用

    事務的實現主要是對信道(Channel)的設置,主要的方法有三個:

  • channel.txSelect()聲明啟動事務模式;

  • channel.txComment()提交事務;

  • channel.txRollback()回滾事務;

  • 從上面的可以看出事務都是以tx開頭的,tx應該是transaction extend(事務擴展模塊)的縮寫,如果有準確的解釋歡迎在博客下留言。

    我們來看具體的代碼實現:

    // 創建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(_queueName, true, false, false, null); String message = String.format("時間 => %s", new Date().getTime()); try {channel.txSelect(); // 聲明事務// 發送消息channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));channel.txCommit(); // 提交事務 } catch (Exception e) {channel.txRollback(); } finally {channel.close();conn.close(); }

    注意:用戶需把config.xx配置成自己Rabbit的信息。

    從上面的代碼我們可以看出,在發送消息之前的代碼和之前介紹的都是一樣的,只是在發送消息之前,需要聲明channel為事務模式,提交或者回滾事務即可。

    了解了事務的實現之后,那么事務究竟是怎么執行的,讓我們來使用wireshark抓個包看看,如圖所示:

    輸入ip.addr==rabbitip && amqp查看客戶端和rabbit之間的通訊,可以看到交互流程:

    • 客戶端發送給服務器Tx.Select(開啟事務模式)
    • 服務器端返回Tx.Select-Ok(開啟事務模式ok)
    • 推送消息
    • 客戶端發送給事務提交Tx.Commit
    • 服務器端返回Tx.Commit-Ok

    以上就完成了事務的交互流程,如果其中任意一個環節出現問題,就會拋出IoException移除,這樣用戶就可以攔截異常進行事務回滾,或決定要不要重復消息。

    那么,既然已經有事務了,沒什么還要使用發送方確認模式呢,原因是因為事務的性能是非常差的。事務性能測試

    事務模式,結果如下:

    • 事務模式,發送1w條數據,執行花費時間:14197s
    • 事務模式,發送1w條數據,執行花費時間:13597s
    • 事務模式,發送1w條數據,執行花費時間:14216s

    非事務模式,結果如下:

    • 非事務模式,發送1w條數據,執行花費時間:101s
    • 非事務模式,發送1w條數據,執行花費時間:77s
    • 非事務模式,發送1w條數據,執行花費時間:106s

    從上面可以看出,非事務模式的性能是事務模式的性能高149倍,我的電腦測試是這樣的結果,不同的電腦配置略有差異,但結論是一樣的,事務模式的性能要差很多,那有沒有既能保證消息的可靠性又能兼顧性能的解決方案呢?那就是接下來要講的Confirm發送方確認模式。

    擴展知識

    我們知道,消費者可以使用消息自動或手動發送來確認消費消息,那如果我們在消費者模式中使用事務(當然如果使用了手動確認消息,完全用不到事務的),會發生什么呢?

    消費者模式使用事務

    假設消費者模式中使用了事務,并且在消息確認之后進行了事務回滾,那么RabbitMQ會產生什么樣的變化?

    結果分為兩種情況:

  • autoAck=false手動應對的時候是支持事務的,也就是說即使你已經手動確認了消息已經收到了,但在確認消息會等事務的返回解決之后,在做決定是確認消息還是重新放回隊列,如果你手動確認現在之后,又回滾了事務,那么已事務回滾為主,此條消息會重新放回隊列;
  • autoAck=true如果自定確認為true的情況是不支持事務的,也就是說你即使在收到消息之后在回滾事務也是于事無補的,隊列已經把消息移除了;
  • 二、Confirm發送方確認模式

    Confirm發送方確認模式使用和事務類似,也是通過設置Channel進行發送方確認的。

    Confirm的三種實現方式:

    方式一:channel.waitForConfirms()普通發送方確認模式;

    方式二:channel.waitForConfirmsOrDie()批量確認模式;

    方式三:channel.addConfirmListener()異步監聽發送方確認模式;

    方式一:普通Confirm模式

    // 創建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啟發送方確認模式 channel.confirmSelect(); String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); if (channel.waitForConfirms()) {System.out.println("消息發送成功" ); }

    看代碼可以知道,我們只需要在推送消息之前,channel.confirmSelect()聲明開啟發送方確認模式,再使用channel.waitForConfirms()等待消息被服務器確認即可。

    方式二:批量Confirm模式

    // 創建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啟發送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) {String message = String.format("時間 => %s", new Date().getTime());channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } channel.waitForConfirmsOrDie(); //直到所有信息都發布,只要有一個未確認就會IOException System.out.println("全部執行完成");

    以上代碼可以看出來channel.waitForConfirmsOrDie(),使用同步方式等所有的消息發送之后才會執行后面代碼,只要有一個消息未被確認就會拋出IOException異常。

    方式三:異步Confirm模式

    // 創建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啟發送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) {String message = String.format("時間 => %s", new Date().getTime());channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } //異步監聽確認和未確認的消息 channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("未確認消息,標識:" + deliveryTag);}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(String.format("已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple));} });

    異步模式的優點,就是執行效率高,不需要等待消息執行完,只需要監聽消息即可,以上異步返回的信息如下:

    可以看出,代碼是異步執行的,消息確認有可能是批量確認的,是否批量確認在于返回的multiple的參數,此參數為bool值,如果true表示批量執行了deliveryTag這個值以前的所有消息,如果為false的話表示單條確認。

    Confirm性能測試

    測試前提:與事務一樣,我們發送1w條消息。

    方式一:Confirm普通模式

    • 執行花費時間:2253s
    • 執行花費時間:2018s
    • 執行花費時間:2043s

    方式二:Confirm批量模式

    • 執行花費時間:1576s
    • 執行花費時間:1400s
    • 執行花費時間:1374s

    方式三:Confirm異步監聽方式

    • 執行花費時間:1498s
    • 執行花費時間:1368s
    • 執行花費時間:1363s

    總結

    綜合總體測試情況來看:Confirm批量確定和Confirm異步模式性能相差不大,Confirm模式要比事務快10倍左右。

    長按二維碼關注我的技術公眾號

    總結

    以上是生活随笔為你收集整理的RabbitMQ事务和Confirm发送方消息确认——深入解读的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 永久免费未网 | 曰本黄色片 | 人妻精品无码一区二区三区 | 成人动漫视频在线观看 | 噼里啪啦国语电影 | 国产八区 | 国产同性人妖ts口直男 | 久草视频这里只有精品 | 男女羞羞动态图 | 人人爱国产 | www.四虎在线 | 一级日韩一级欧美 | 亚洲天堂avav | 国产成人精品一区二区三区免费 | 午夜在线视频播放 | 精品免费视频一区二区 | 精品久久久中文字幕人妻 | 成人免费国产 | 国产区一区二区三区 | 日韩欧美精品一区二区三区 | 亚洲色图17p | 国产在线拍揄自揄拍无码 | 星铁乱淫h侵犯h文 | 亚洲免费在线视频 | 日批在线播放 | 国产调教打屁股xxxx网站 | a天堂在线视频 | 亚洲欧美日韩第一页 | 日本欧美韩国国产精品 | 国产性色av | 欧美成人高清视频 | 精品欧美一区二区三区 | 久色在线 | 亚洲av无码一区二区乱子伦 | 中文字幕亚洲欧美日韩在线不卡 | 国产精品秘 | 九色丨蝌蚪丨成人 | 国产在线观看网站 | a资源在线观看 | 一级片一级| 日韩精品伦理 | 椎名由奈在线观看 | 日韩精品在线观看一区二区 | 亚久久| 国产午夜一区二区三区 | 一级黄色大片免费观看 | 国产二区三区视频 | 胖女人毛片 | 国产中文网 | 国产精品porn| 欧美国产综合 | 我和公激情中文字幕 | 国产精品无码无卡无需播放器 | 国精产品99永久一区一区 | 欧美日韩高清免费 | 97色爱 | 国产一级特黄视频 | 中文日韩 | 成人久久国产 | 国产日韩欧美精品一区 | 久操资源网| 亚洲精品av在线 | 看个毛片| 懂色av一区二区三区在线播放 | 在线国产观看 | 91免费播放 | 美日韩成人 | 激情一区二区三区 | 亚洲AV无码一区二区三区蜜桃 | 一区二区视频免费观看 | 色爽交 | 国产人妻777人伦精品hd | 久久久久久中文字幕 | 欧美第一精品 | 国产精品一线天粉嫩av | www麻豆 | 国产精品无圣光 | 蜜臀av午夜精品 | 欧美大尺度床戏做爰 | 欧美激情免费看 | 18禁一区二区 | 久久精品亚洲一区二区 | 天天综合视频 | 国产成人精品免费看视频 | 免费黄色小说视频 | 青青草国产在线 | 91传媒视频在线观看 | 在哪里看毛片 | 日本公与丰满熄 | 亚洲经典在线观看 | 奶水喷溅虐奶乳奴h文 | 咪咪色影院 | 亚洲xxxxx| 久久电影一区二区 | 色综合天天操 | 欧美美女一区二区 | 香蕉视频日本 | 日本大尺度电影免费观看全集中文版 | 亚洲婷婷综合网 |