日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

java确认rabbitmq_RabbitMQ 消息确认机制

發(fā)布時(shí)間:2023/12/2 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java确认rabbitmq_RabbitMQ 消息确认机制 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

生產(chǎn)端 Confirm 消息確認(rèn)機(jī)制

消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果 Broker 收到消息,則會(huì)給我們生產(chǎn)者一個(gè)應(yīng)答。生產(chǎn)者進(jìn)行接收應(yīng)答,用來(lái)確定這條消息是否正常的發(fā)送到 Broker ,這種方式也是消息的可靠性投遞的核心保障!

Confirm 確認(rèn)機(jī)制流程圖

如何實(shí)現(xiàn)Confirm確認(rèn)消息?

第一步:在 channel 上開(kāi)啟確認(rèn)模式: channel.confirmSelect()

第二步:在 channel 上添加監(jiān)聽(tīng): channel.addConfirmListener(ConfirmListener listener);, 監(jiān)聽(tīng)成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對(duì)消息進(jìn)行重新發(fā)送、或記錄日志等后續(xù)處理!

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.ConfirmListener;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class ConfirmProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_confirm_exchange";

String routingKey = "item.update";

//指定消息的投遞模式:confirm 確認(rèn)模式

channel.confirmSelect();

//發(fā)送

final long start = System.currentTimeMillis();

for (int i = 0; i < 5 ; i++) {

String msg = "this is confirm msg ";

channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

System.out.println("Send message : " + msg);

}

//添加一個(gè)確認(rèn)監(jiān)聽(tīng), 這里就不關(guān)閉連接了,為了能保證能收到監(jiān)聽(tīng)消息

channel.addConfirmListener(new ConfirmListener() {

/**

* 返回成功的回調(diào)函數(shù)

*/

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.out.println("succuss ack");

System.out.println(multiple);

System.out.println("耗時(shí):" + (System.currentTimeMillis() - start) + "ms");

}

/**

* 返回失敗的回調(diào)函數(shù)

*/

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.out.printf("defeat ack");

System.out.println("耗時(shí):" + (System.currentTimeMillis() - start) + "ms");

}

});

}

}

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConfirmConsumer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_confirm_exchange";

String queueName = "test_confirm_queue";

String routingKey = "item.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

channel.queueDeclare(queueName, false, false, false, null);

//一般不用代碼綁定,在管理界面手動(dòng)綁定

channel.queueBind(queueName, exchangeName, routingKey);

//創(chuàng)建消費(fèi)者并接收消息

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

}

};

//設(shè)置 Channel 消費(fèi)者綁定隊(duì)列

channel.basicConsume(queueName, true, consumer);

}

}

我們此處只關(guān)注生產(chǎn)端輸出消息

Send message : this is confirm msg

Send message : this is confirm msg

Send message : this is confirm msg

Send message : this is confirm msg

Send message : this is confirm msg

succuss ack

true

耗時(shí):3ms

succuss ack

true

耗時(shí):4ms

注意事項(xiàng)

我們采用的是異步 confirm 模式:提供一個(gè)回調(diào)方法,服務(wù)端 confirm 了一條或者多條消息后 Client 端會(huì)回調(diào)這個(gè)方法。除此之外還有單條同步 confirm 模式、批量同步 confirm 模式,由于現(xiàn)實(shí)場(chǎng)景中很少使用我們?cè)诖瞬蛔鼋榻B,如有興趣直接參考官方文檔。

我們運(yùn)行生產(chǎn)端會(huì)發(fā)現(xiàn)每次運(yùn)行結(jié)果都不一樣,會(huì)有多種情況出現(xiàn),因?yàn)?Broker 會(huì)進(jìn)行優(yōu)化,有時(shí)會(huì)批量一次性 confirm ,有時(shí)會(huì)分開(kāi)幾條 confirm。

succuss ack

true

耗時(shí):3ms

succuss ack

false

耗時(shí):4ms

或者

succuss ack

true

耗時(shí):3ms

Return 消息機(jī)制

Return Listener 用于處理一-些不可路 由的消息!

消息生產(chǎn)者,通過(guò)指定一個(gè) Exchange 和 Routingkey,把消息送達(dá)到某一個(gè)隊(duì)列中去,然后我們的消費(fèi)者監(jiān)聽(tīng)隊(duì)列,進(jìn)行消費(fèi)處理操作!

但是在某些情況下,如果我們?cè)诎l(fā)送消息的時(shí)候,當(dāng)前的 exchange 不存在或者指定的路由 key 路由不到,這個(gè)時(shí)候如果我們需要監(jiān)聽(tīng)這種不可達(dá)的消息,就要使用 Return Listener !

在基礎(chǔ)API中有一個(gè)關(guān)鍵的配置項(xiàng):Mandatory:如果為 true,則監(jiān)聽(tīng)器會(huì)接收到路由不可達(dá)的消息,然后進(jìn)行后續(xù)處理,如果為 false,那么 broker 端自動(dòng)刪除該消息!

Return 消息機(jī)制流程圖

Return 消息示例

首先我們需要發(fā)送三條消息,并且故意將第 0 條消息的 routing Key設(shè)置為錯(cuò)誤的,讓他無(wú)法正常路由到消費(fèi)端。

mandatory 設(shè)置為 true 路由不可達(dá)的消息會(huì)被監(jiān)聽(tīng)到,不會(huì)被自動(dòng)刪除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());

最后添加監(jiān)聽(tīng)即可監(jiān)聽(tīng)到不可路由到消費(fèi)端的消息channel.addReturnListener(ReturnListener r))

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReturnListeningProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_return_exchange";

String routingKey = "item.update";

String errRoutingKey = "error.update";

//指定消息的投遞模式:confirm 確認(rèn)模式

channel.confirmSelect();

//發(fā)送

for (int i = 0; i < 3 ; i++) {

String msg = "this is return——listening msg ";

//@param mandatory 設(shè)置為 true 路由不可達(dá)的消息會(huì)被監(jiān)聽(tīng)到,不會(huì)被自動(dòng)刪除

if (i == 0) {

channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());

} else {

channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());

}

System.out.println("Send message : " + msg);

}

//添加一個(gè)確認(rèn)監(jiān)聽(tīng), 這里就不關(guān)閉連接了,為了能保證能收到監(jiān)聽(tīng)消息

channel.addConfirmListener(new ConfirmListener() {

/**

* 返回成功的回調(diào)函數(shù)

*/

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.out.println("succuss ack");

}

/**

* 返回失敗的回調(diào)函數(shù)

*/

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.out.printf("defeat ack");

}

});

//添加一個(gè) return 監(jiān)聽(tīng)

channel.addReturnListener(new ReturnListener() {

public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("return relyCode: " + replyCode);

System.out.println("return replyText: " + replyText);

System.out.println("return exchange: " + exchange);

System.out.println("return routingKey: " + routingKey);

System.out.println("return properties: " + properties);

System.out.println("return body: " + new String(body));

}

});

}

}

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReturnListeningConsumer {

public static void main(String[] args) throws Exception {

//1. 創(chuàng)建一個(gè) ConnectionFactory 并進(jìn)行設(shè)置

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

//2. 通過(guò)連接工廠來(lái)創(chuàng)建連接

Connection connection = factory.newConnection();

//3. 通過(guò) Connection 來(lái)創(chuàng)建 Channel

Channel channel = connection.createChannel();

//4. 聲明

String exchangeName = "test_return_exchange";

String queueName = "test_return_queue";

String routingKey = "item.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

channel.queueDeclare(queueName, false, false, false, null);

//一般不用代碼綁定,在管理界面手動(dòng)綁定

channel.queueBind(queueName, exchangeName, routingKey);

//5. 創(chuàng)建消費(fèi)者并接收消息

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

}

};

//6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列

channel.basicConsume(queueName, true, consumer);

}

}

我們只關(guān)注生產(chǎn)端結(jié)果,消費(fèi)端只收到兩條消息。

Send message : this is return——listening msg

Send message : this is return——listening msg

Send message : this is return——listening msg

return relyCode: 312

return replyText: NO_ROUTE

return exchange: test_return_exchange

return routingKey: error.update

return properties: #contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)

return body: this is return——listening msg

succuss ack

succuss ack

succuss ack

消費(fèi)端 Ack 和 Nack 機(jī)制

消費(fèi)端進(jìn)行消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償!如果由于服務(wù)器宕機(jī)等嚴(yán)重問(wèn)題,那我們就需要手工進(jìn)行ACK保障消費(fèi)端消費(fèi)成功!消費(fèi)端重回隊(duì)列是為了對(duì)沒(méi)有處理成功的消息,把消息重新會(huì)遞給Broker!一般我們?cè)趯?shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列,也就是設(shè)置為False。

參考 api

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

void basicAck(long deliveryTag, boolean multiple) throws IOException;

如何設(shè)置手動(dòng) Ack 、Nack 以及重回隊(duì)列

首先我們發(fā)送五條消息,將每條消息對(duì)應(yīng)的循環(huán)下標(biāo) i 放入消息的 properties 中作為標(biāo)記,以便于我們?cè)诤竺娴幕卣{(diào)方法中識(shí)別。

其次, 我們將消費(fèi)端的 ·channel.basicConsume(queueName, false, consumer); 中的 autoAck屬性設(shè)置為 false,如果設(shè)置為true的話(huà) 將會(huì)正常輸出五條消息。

我們通過(guò) Thread.sleep(2000)來(lái)延時(shí)一秒,用以看清結(jié)果。我們獲取到properties中的num之后,通過(guò)channel.basicNack(envelope.getDeliveryTag(), false, true);將 num為0的消息設(shè)置為 nack,即消費(fèi)失敗,并且將 requeue屬性設(shè)置為true,即消費(fèi)失敗的消息重回隊(duì)列末端。

import com.rabbitmq.client.*;

import java.util.HashMap;

import java.util.Map;

public class AckAndNackProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_ack_exchange";

String routingKey = "item.update";

String msg = "this is ack msg";

for (int i = 0; i < 5; i++) {

Map headers = new HashMap();

headers.put("num" ,i);

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()

.deliveryMode(2)

.headers(headers)

.build();

String tem = msg + ":" + i;

channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());

System.out.println("Send message : " + msg);

}

channel.close();

connection.close();

}

}

import com.rabbitmq.client.*;

import java.io.IOException;

public class AckAndNackConsumer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

Connection connection = factory.newConnection();

final Channel channel = connection.createChannel();

String exchangeName = "test_ack_exchange";

String queueName = "test_ack_queue";

String routingKey = "item.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

channel.queueDeclare(queueName, false, false, false, null);

//一般不用代碼綁定,在管理界面手動(dòng)綁定

channel.queueBind(queueName, exchangeName, routingKey);

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

if ((Integer) properties.getHeaders().get("num") == 0) {

channel.basicNack(envelope.getDeliveryTag(), false, true);

} else {

channel.basicAck(envelope.getDeliveryTag(), false);

}

}

};

//6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列

channel.basicConsume(queueName, false, consumer);

}

}

我們此處只關(guān)心消費(fèi)端輸出,可以看到第 0 條消費(fèi)失敗重新回到隊(duì)列尾部消費(fèi)。

[x] Received 'this is ack msg:1'

[x] Received 'this is ack msg:2'

[x] Received 'this is ack msg:3'

[x] Received 'this is ack msg:4'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

總結(jié)

以上是生活随笔為你收集整理的java确认rabbitmq_RabbitMQ 消息确认机制的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。