日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ异步发布确认

發布時間:2024/4/13 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ异步发布确认 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

異步確認雖然編程邏輯比上兩個要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功,下面就讓我們來詳細講解異步確認是怎么實現的。

?

/* * 發布確認模式, * 1、單個確認 * 2、批量確認 * 3、異步批量確認 * */ public class ComfirmMessage {// 批量發消息的個數public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {//3、異步批量確認// 發布1000個異步確認消息,耗時36msComfirmMessage.publicMessageAsync();}public static void publicMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);// 開啟發布確認channel.confirmSelect();// 開始時間long begin = System.currentTimeMillis();// 消息確認成功回調函數ConfirmCallback ackCallback = (deliveryTag,multiply) -> {System.out.println("確認的消息:"+deliveryTag);};// 消息確認失敗回調函數/** 參數1:消息的標記* 參數2:是否為批量確認* */ConfirmCallback nackCallback = (deliveryTag,multiply) -> {System.out.println("未確認的消息:"+deliveryTag);};// 準備消息的監聽器,監聽哪些消息成功,哪些消息失敗/** 參數1:監聽哪些消息成功* 參數2:監聽哪些消息失敗* */channel.addConfirmListener(ackCallback,nackCallback);// 批量發送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));}// 結束時間long end = System.currentTimeMillis();System.out.println("發布"+MESSAGE_COUNT+"個異步確認消息,耗時"+ (end - begin) + "ms");} }

如何處理異步未確認信息?

最好的解決方案就是把未確認的消息放到一個基于內存的能被發布線程訪問的隊列,比如說用ConcurrentLinkedQueue這個隊列在confirm callbacks與發布線程之間進行消息的傳遞

public class ComfirmMessage {// 批量發消息的個數public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {//3、異步批量確認// 發布1000個異步確認消息,耗時36msComfirmMessage.publicMessageAsync();}public static void publicMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);// 開啟發布確認channel.confirmSelect();/** 線程安全有序的一個哈希表 適用于高并發的情況下* 1、輕松地將序號與消息進行關聯* 2、輕松地批量刪除,只要給到序號* 3、支持高并發* */ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();// 消息確認成功回調函數ConfirmCallback ackCallback = (deliveryTag,multiply) -> {// 刪除到已經確認的消息,剩下的就是未確認的消息if(multiply){ConcurrentNavigableMap<Long, String> confiremed = outstandingConfirms.headMap(deliveryTag);confiremed.clear();}else {outstandingConfirms.remove(deliveryTag);}System.out.println("確認的消息:"+deliveryTag);};// 消息確認失敗回調函數/** 參數1:消息的標記* 參數2:是否為批量確認* */ConfirmCallback nackCallback = (deliveryTag,multiply) -> {// 打印一下未確認的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未確認的消息是:" + message +"未確認的消息tag:" + deliveryTag);};// 準備消息的監聽器,監聽哪些消息成功,哪些消息失敗/** 參數1:監聽哪些消息成功* 參數2:監聽哪些消息失敗* */channel.addConfirmListener(ackCallback,nackCallback);// 開始時間long begin = System.currentTimeMillis();// 批量發送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));// 此處記錄下所有要發送的消息的總和outstandingConfirms.put(channel.getNextPublishSeqNo(),message);}// 結束時間long end = System.currentTimeMillis();System.out.println("發布"+MESSAGE_COUNT+"個異步確認消息,耗時"+ (end - begin) + "ms");} }

總結

以上是生活随笔為你收集整理的RabbitMQ异步发布确认的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。