日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

[RabbitMQ]消息应答概念_消息手动应答代码

發布時間:2023/12/4 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [RabbitMQ]消息应答概念_消息手动应答代码 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

消息應答

概念

消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并僅只完成了部分突然它掛掉了,會發生什么情況。RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消
息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續發送給該消費這的消息,因為它無法接收到。

為了保證消息在發送過程中不丟失,rabbitmq 引入消息應答機制,消息應答就是:消費者在接收到消息并且處理該消息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該消息刪除了。

自動應答

消息發送后立即被認為已經傳送成功,這種模式需要在高吞吐量和數據傳輸安全性方面做權衡,因為這種模式如果消息在接收到之前,消費者那邊出現連接或者 channel 關閉,那么消息就丟失了,當然另一方面這種模式消費者那邊可以傳遞過載的消息,沒有對傳遞的消息數量進行限制,當然這樣有可能使得消費者這邊由于接收太多還來不及處理的消息,導致這些消息的積壓,最終使得內存耗盡,最終這些消費者線程被操作系統殺死,所以這種模式僅適用在消費者可以高效并以某種速率能夠處理這些消息的情況下使用。

消息應答的方法

A.Channel.basicAck(用于肯定確認)
RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了

B.Channel.basicNack(用于否定確認)

C.Channel.basicReject(用于否定確認)
與 Channel.basicNack 相比少一個參數(批量處理參數)
不處理該消息了直接拒絕,可以將其丟棄了

Multiple 的解釋

手動應答的好處是可以批量應答并且減少網絡擁堵

multiple 的 true 和 false 代表不同意思

true 代表批量應答 channel 上未應答的消息

  • 比如說 channel 上有傳送 tag 的消息 5,6,7,8 當前 tag 是 8 那么此時5-8 的這些還未應答的消息都會被確認收到消息應答

false 同上面相比

  • 只會應答 tag=8 的消息 5,6,7 這三個消息依然不會被確認收到消息應答

消息應答自動重新入隊

如果消費者由于某些原因失去連接(其通道已關閉,連接已關閉或 TCP 連接丟失),導致消息未發送 ACK 確認,RabbitMQ 將了解到消息未完全處理,并將對其重新排隊。如果此時其他消費者可以處理,它將很快將其重新分發給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確保不會丟失任何消息

消息手動應答代碼

默認消息采用的是自動應答,所以我們要想實現消息消費過程中不丟失,需要把自動應答改為手動應答,消費者在上面代碼的基礎上增加下面畫紅色部分代碼。

消息生產者

package com.atguigu.three;import com.atguigu.utils.RabbitMqUtils; import com.rabbitmq.client.Channel;import java.util.Scanner;/*** 消息在手動應答時是不丟失,放回隊列中重新消費**/ public class Task02 {//隊列名稱public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//聲明隊列channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);//從控制臺中輸入信息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("生產者發出消息:"+message);}}}

睡眠工具類

public class SleepUtils {public static void sleep(int second){try {Thread.sleep(1000*second);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}} }

消費者

package com.atguigu.three;import com.atguigu.utils.RabbitMqUtils; import com.atguigu.utils.SleepUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;/**** 消息在手動應答時是不丟失,放回隊列中重新消費*/ public class Worker03 {//隊列名稱public static final String TASK_QUEUE_NAME = "ack_queue";//接收消息public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1等待接收消息處理時間較短");DeliverCallback deliverCallback = (cousumerTag,message)->{//沉睡1SSleepUtils.sleep(1);System.out.println("接收到的消息:" + new String(message.getBody(),"UTF-8" ));//手動應答/*** 1.消息的標記 tag* 2.是否批量應答 false:不批量應答信道中的消息 true:批量**/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手動應答boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag ->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");}));}} package com.atguigu.three;import com.atguigu.utils.RabbitMqUtils; import com.atguigu.utils.SleepUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;/**** 消息在手動應答時是不丟失,放回隊列中重新消費*/ public class Worker04 {//隊列名稱public static final String TASK_QUEUE_NAME = "ack_queue";//接收消息public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2等待接收消息處理時間較短");DeliverCallback deliverCallback = (cousumerTag,message)->{//沉睡1SSleepUtils.sleep(30);System.out.println("接收到的消息:" + new String(message.getBody(),"UTF-8" ));//手動應答/*** 1.消息的標記 tag* 2.是否批量應答 false:不批量應答信道中的消息 true:批量**/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手動應答boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag ->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");}));}}

手動應答效果演示

正常情況下消息發送方發送兩個消息 C1 和 C2 分別接收到消息并進行處理

在發送者發送消息 dd,發出消息之后的把 C2 消費者停掉,按理說該 C2 來處理該消息,但是由于它處理時間較長,在還未處理完,也就是說 C2 還沒有執行 ack 代碼的時候,C2 被停掉了,此時會看到消息被 C1 接收到了,說明消息 dd 被重新入隊,然后分配給能處理消息的 C1 處理了

總結

以上是生活随笔為你收集整理的[RabbitMQ]消息应答概念_消息手动应答代码的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。