RabbitMQ异步发布确认
生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ异步发布确认
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
異步確認雖然編程邏輯比上兩個要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功,下面就讓我們來詳細講解異步確認是怎么實現的。
?
/* * 發布確認模式, * 1、單個確認 * 2、批量確認 * 3、異步批量確認 * */ public class ComfirmMessage {// 批量發消息的個數public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {//3、異步批量確認// 發布1000個異步確認消息,耗時36msComfirmMessage.publicMessageAsync();}public static void publicMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);// 開啟發布確認channel.confirmSelect();// 開始時間long begin = System.currentTimeMillis();// 消息確認成功回調函數ConfirmCallback ackCallback = (deliveryTag,multiply) -> {System.out.println("確認的消息:"+deliveryTag);};// 消息確認失敗回調函數/** 參數1:消息的標記* 參數2:是否為批量確認* */ConfirmCallback nackCallback = (deliveryTag,multiply) -> {System.out.println("未確認的消息:"+deliveryTag);};// 準備消息的監聽器,監聽哪些消息成功,哪些消息失敗/** 參數1:監聽哪些消息成功* 參數2:監聽哪些消息失敗* */channel.addConfirmListener(ackCallback,nackCallback);// 批量發送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));}// 結束時間long end = System.currentTimeMillis();System.out.println("發布"+MESSAGE_COUNT+"個異步確認消息,耗時"+ (end - begin) + "ms");} }如何處理異步未確認信息?
最好的解決方案就是把未確認的消息放到一個基于內存的能被發布線程訪問的隊列,比如說用ConcurrentLinkedQueue這個隊列在confirm callbacks與發布線程之間進行消息的傳遞
public class ComfirmMessage {// 批量發消息的個數public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {//3、異步批量確認// 發布1000個異步確認消息,耗時36msComfirmMessage.publicMessageAsync();}public static void publicMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);// 開啟發布確認channel.confirmSelect();/** 線程安全有序的一個哈希表 適用于高并發的情況下* 1、輕松地將序號與消息進行關聯* 2、輕松地批量刪除,只要給到序號* 3、支持高并發* */ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();// 消息確認成功回調函數ConfirmCallback ackCallback = (deliveryTag,multiply) -> {// 刪除到已經確認的消息,剩下的就是未確認的消息if(multiply){ConcurrentNavigableMap<Long, String> confiremed = outstandingConfirms.headMap(deliveryTag);confiremed.clear();}else {outstandingConfirms.remove(deliveryTag);}System.out.println("確認的消息:"+deliveryTag);};// 消息確認失敗回調函數/** 參數1:消息的標記* 參數2:是否為批量確認* */ConfirmCallback nackCallback = (deliveryTag,multiply) -> {// 打印一下未確認的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未確認的消息是:" + message +"未確認的消息tag:" + deliveryTag);};// 準備消息的監聽器,監聽哪些消息成功,哪些消息失敗/** 參數1:監聽哪些消息成功* 參數2:監聽哪些消息失敗* */channel.addConfirmListener(ackCallback,nackCallback);// 開始時間long begin = System.currentTimeMillis();// 批量發送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));// 此處記錄下所有要發送的消息的總和outstandingConfirms.put(channel.getNextPublishSeqNo(),message);}// 結束時間long end = System.currentTimeMillis();System.out.println("發布"+MESSAGE_COUNT+"個異步確認消息,耗時"+ (end - begin) + "ms");} }總結
以上是生活随笔為你收集整理的RabbitMQ异步发布确认的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ批量确认发布
- 下一篇: RabbitMQ交换机简介