日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

消息确认机制---confirm异步

發(fā)布時間:2025/6/17 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 消息确认机制---confirm异步 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一:介紹

1.異步模式介紹

  Channel對象提供ConfirmListener()回調(diào)方法只包含deliverTag(當(dāng)前Channel發(fā)出的序列號),我們需要自己為每一個Channel維護一個unconfirm的消息序列集合,沒publish一條數(shù)據(jù),集合就加1,每回調(diào)一次handleAck方法,unconfirm集合刪掉相應(yīng)的一條(multiple=false)或者多條(multiple=true)記錄。從程序運行效率上看,這個unconfirm集合最好采用有序集合SortedSet存儲結(jié)構(gòu)。

?

二:程序

1.生產(chǎn)者

1 package com.mq.AsynConfirm; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.ConfirmListener; 6 import com.rabbitmq.client.Connection; 7 8 import java.io.IOException; 9 import java.util.Collections; 10 import java.util.SortedSet; 11 import java.util.TreeSet; 12 13 public class Send { 14 private static final String QUEUE_NAME="test_queue_confirm_asyn"; 15 public static void main(String[] args)throws Exception{ 16 Connection connection= ConnectionUtil.getConnection(); 17 Channel channel=connection.createChannel(); 18 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 19 //生產(chǎn)者調(diào)用confirmSelect將channel設(shè)置為nconfirm模式 20 channel.confirmSelect(); 21 final SortedSet<Long> confirmSet= Collections.synchronizedSortedSet(new TreeSet<Long>()); 22 channel.addConfirmListener(new ConfirmListener() { 23 //沒有問題 24 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 25 if (multiple){ 26 System.out.println("handleAck multiple"); 27 confirmSet.headSet(deliveryTag+1).clear(); 28 }else{ 29 System.out.println("handleAck false"); 30 confirmSet.remove(deliveryTag); 31 } 32 } 33 //有問題 34 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 35 if (multiple){ 36 System.out.println("handleNack multiple"); 37 confirmSet.headSet(deliveryTag+1).clear(); 38 }else{ 39 System.out.println("handleNack false"); 40 confirmSet.remove(deliveryTag); 41 } 42 } 43 }); 44 String msg="success"; 45 while (true){ 46 long seqNo=channel.getNextPublishSeqNo(); 47 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); 48 confirmSet.add(seqNo); 49 } 50 51 } 52 }

?

2.消費者

1 package com.mq.AsynConfirm; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class Receive { 9 private static final String QUEUE_NAME="test_queue_confirm_asyn"; 10 public static void main(String[] args)throws Exception { 11 Connection connection = ConnectionUtil.getConnection(); 12 Channel channel = connection.createChannel(); 13 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 14 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ 15 @Override 16 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 17 System.out.println(new String(body,"utf-8")); 18 } 19 }); 20 } 21 }

?

3.現(xiàn)象

  Send:

  

?

轉(zhuǎn)載于:https://www.cnblogs.com/juncaoit/p/8635633.html

總結(jié)

以上是生活随笔為你收集整理的消息确认机制---confirm异步的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。