java确认rabbitmq_RabbitMQ 消息确认机制
生產(chǎn)端 Confirm 消息確認(rèn)機(jī)制
消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果 Broker 收到消息,則會(huì)給我們生產(chǎn)者一個(gè)應(yīng)答。生產(chǎn)者進(jìn)行接收應(yīng)答,用來(lái)確定這條消息是否正常的發(fā)送到 Broker ,這種方式也是消息的可靠性投遞的核心保障!
Confirm 確認(rèn)機(jī)制流程圖
如何實(shí)現(xiàn)Confirm確認(rèn)消息?
第一步:在 channel 上開(kāi)啟確認(rèn)模式: channel.confirmSelect()
第二步:在 channel 上添加監(jiān)聽(tīng): channel.addConfirmListener(ConfirmListener listener);, 監(jiān)聽(tīng)成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對(duì)消息進(jìn)行重新發(fā)送、或記錄日志等后續(xù)處理!
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class ConfirmProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "item.update";
//指定消息的投遞模式:confirm 確認(rèn)模式
channel.confirmSelect();
//發(fā)送
final long start = System.currentTimeMillis();
for (int i = 0; i < 5 ; i++) {
String msg = "this is confirm msg ";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
System.out.println("Send message : " + msg);
}
//添加一個(gè)確認(rèn)監(jiān)聽(tīng), 這里就不關(guān)閉連接了,為了能保證能收到監(jiān)聽(tīng)消息
channel.addConfirmListener(new ConfirmListener() {
/**
* 返回成功的回調(diào)函數(shù)
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("succuss ack");
System.out.println(multiple);
System.out.println("耗時(shí):" + (System.currentTimeMillis() - start) + "ms");
}
/**
* 返回失敗的回調(diào)函數(shù)
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("defeat ack");
System.out.println("耗時(shí):" + (System.currentTimeMillis() - start) + "ms");
}
});
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConfirmConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String queueName = "test_confirm_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代碼綁定,在管理界面手動(dòng)綁定
channel.queueBind(queueName, exchangeName, routingKey);
//創(chuàng)建消費(fèi)者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
channel.basicConsume(queueName, true, consumer);
}
}
我們此處只關(guān)注生產(chǎn)端輸出消息
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
succuss ack
true
耗時(shí):3ms
succuss ack
true
耗時(shí):4ms
注意事項(xiàng)
我們采用的是異步 confirm 模式:提供一個(gè)回調(diào)方法,服務(wù)端 confirm 了一條或者多條消息后 Client 端會(huì)回調(diào)這個(gè)方法。除此之外還有單條同步 confirm 模式、批量同步 confirm 模式,由于現(xiàn)實(shí)場(chǎng)景中很少使用我們?cè)诖瞬蛔鼋榻B,如有興趣直接參考官方文檔。
我們運(yùn)行生產(chǎn)端會(huì)發(fā)現(xiàn)每次運(yùn)行結(jié)果都不一樣,會(huì)有多種情況出現(xiàn),因?yàn)?Broker 會(huì)進(jìn)行優(yōu)化,有時(shí)會(huì)批量一次性 confirm ,有時(shí)會(huì)分開(kāi)幾條 confirm。
succuss ack
true
耗時(shí):3ms
succuss ack
false
耗時(shí):4ms
或者
succuss ack
true
耗時(shí):3ms
Return 消息機(jī)制
Return Listener 用于處理一-些不可路 由的消息!
消息生產(chǎn)者,通過(guò)指定一個(gè) Exchange 和 Routingkey,把消息送達(dá)到某一個(gè)隊(duì)列中去,然后我們的消費(fèi)者監(jiān)聽(tīng)隊(duì)列,進(jìn)行消費(fèi)處理操作!
但是在某些情況下,如果我們?cè)诎l(fā)送消息的時(shí)候,當(dāng)前的 exchange 不存在或者指定的路由 key 路由不到,這個(gè)時(shí)候如果我們需要監(jiān)聽(tīng)這種不可達(dá)的消息,就要使用 Return Listener !
在基礎(chǔ)API中有一個(gè)關(guān)鍵的配置項(xiàng):Mandatory:如果為 true,則監(jiān)聽(tīng)器會(huì)接收到路由不可達(dá)的消息,然后進(jìn)行后續(xù)處理,如果為 false,那么 broker 端自動(dòng)刪除該消息!
Return 消息機(jī)制流程圖
Return 消息示例
首先我們需要發(fā)送三條消息,并且故意將第 0 條消息的 routing Key設(shè)置為錯(cuò)誤的,讓他無(wú)法正常路由到消費(fèi)端。
mandatory 設(shè)置為 true 路由不可達(dá)的消息會(huì)被監(jiān)聽(tīng)到,不會(huì)被自動(dòng)刪除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
最后添加監(jiān)聽(tīng)即可監(jiān)聽(tīng)到不可路由到消費(fèi)端的消息channel.addReturnListener(ReturnListener r))
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReturnListeningProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "item.update";
String errRoutingKey = "error.update";
//指定消息的投遞模式:confirm 確認(rèn)模式
channel.confirmSelect();
//發(fā)送
for (int i = 0; i < 3 ; i++) {
String msg = "this is return——listening msg ";
//@param mandatory 設(shè)置為 true 路由不可達(dá)的消息會(huì)被監(jiān)聽(tīng)到,不會(huì)被自動(dòng)刪除
if (i == 0) {
channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
} else {
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
}
System.out.println("Send message : " + msg);
}
//添加一個(gè)確認(rèn)監(jiān)聽(tīng), 這里就不關(guān)閉連接了,為了能保證能收到監(jiān)聽(tīng)消息
channel.addConfirmListener(new ConfirmListener() {
/**
* 返回成功的回調(diào)函數(shù)
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("succuss ack");
}
/**
* 返回失敗的回調(diào)函數(shù)
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("defeat ack");
}
});
//添加一個(gè) return 監(jiān)聽(tīng)
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("return relyCode: " + replyCode);
System.out.println("return replyText: " + replyText);
System.out.println("return exchange: " + exchange);
System.out.println("return routingKey: " + routingKey);
System.out.println("return properties: " + properties);
System.out.println("return body: " + new String(body));
}
});
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReturnListeningConsumer {
public static void main(String[] args) throws Exception {
//1. 創(chuàng)建一個(gè) ConnectionFactory 并進(jìn)行設(shè)置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通過(guò)連接工廠來(lái)創(chuàng)建連接
Connection connection = factory.newConnection();
//3. 通過(guò) Connection 來(lái)創(chuàng)建 Channel
Channel channel = connection.createChannel();
//4. 聲明
String exchangeName = "test_return_exchange";
String queueName = "test_return_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代碼綁定,在管理界面手動(dòng)綁定
channel.queueBind(queueName, exchangeName, routingKey);
//5. 創(chuàng)建消費(fèi)者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
channel.basicConsume(queueName, true, consumer);
}
}
我們只關(guān)注生產(chǎn)端結(jié)果,消費(fèi)端只收到兩條消息。
Send message : this is return——listening msg
Send message : this is return——listening msg
Send message : this is return——listening msg
return relyCode: 312
return replyText: NO_ROUTE
return exchange: test_return_exchange
return routingKey: error.update
return properties: #contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
return body: this is return——listening msg
succuss ack
succuss ack
succuss ack
消費(fèi)端 Ack 和 Nack 機(jī)制
消費(fèi)端進(jìn)行消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償!如果由于服務(wù)器宕機(jī)等嚴(yán)重問(wèn)題,那我們就需要手工進(jìn)行ACK保障消費(fèi)端消費(fèi)成功!消費(fèi)端重回隊(duì)列是為了對(duì)沒(méi)有處理成功的消息,把消息重新會(huì)遞給Broker!一般我們?cè)趯?shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列,也就是設(shè)置為False。
參考 api
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
void basicAck(long deliveryTag, boolean multiple) throws IOException;
如何設(shè)置手動(dòng) Ack 、Nack 以及重回隊(duì)列
首先我們發(fā)送五條消息,將每條消息對(duì)應(yīng)的循環(huán)下標(biāo) i 放入消息的 properties 中作為標(biāo)記,以便于我們?cè)诤竺娴幕卣{(diào)方法中識(shí)別。
其次, 我們將消費(fèi)端的 ·channel.basicConsume(queueName, false, consumer); 中的 autoAck屬性設(shè)置為 false,如果設(shè)置為true的話(huà) 將會(huì)正常輸出五條消息。
我們通過(guò) Thread.sleep(2000)來(lái)延時(shí)一秒,用以看清結(jié)果。我們獲取到properties中的num之后,通過(guò)channel.basicNack(envelope.getDeliveryTag(), false, true);將 num為0的消息設(shè)置為 nack,即消費(fèi)失敗,并且將 requeue屬性設(shè)置為true,即消費(fèi)失敗的消息重回隊(duì)列末端。
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
public class AckAndNackProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String routingKey = "item.update";
String msg = "this is ack msg";
for (int i = 0; i < 5; i++) {
Map headers = new HashMap();
headers.put("num" ,i);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.headers(headers)
.build();
String tem = msg + ":" + i;
channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());
System.out.println("Send message : " + msg);
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class AckAndNackConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代碼綁定,在管理界面手動(dòng)綁定
channel.queueBind(queueName, exchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if ((Integer) properties.getHeaders().get("num") == 0) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
channel.basicConsume(queueName, false, consumer);
}
}
我們此處只關(guān)心消費(fèi)端輸出,可以看到第 0 條消費(fèi)失敗重新回到隊(duì)列尾部消費(fèi)。
[x] Received 'this is ack msg:1'
[x] Received 'this is ack msg:2'
[x] Received 'this is ack msg:3'
[x] Received 'this is ack msg:4'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
總結(jié)
以上是生活随笔為你收集整理的java确认rabbitmq_RabbitMQ 消息确认机制的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 章鱼和八爪鱼有什么区别 章鱼和八爪鱼的区
- 下一篇: java换成中文_如果我们的编程替换成中