日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?

發布時間:2023/12/10 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

優先級隊列

方式一:可以通過RabbitMQ管理界面配置隊列的優先級屬性,如下圖的x-max-priority

方式二:代碼設置

Map<String,Object> args = new HashMap<String,Object>();

args.put("x-max-priority", 10);

channel.queueDeclare("queue_priority", true, false, false, args);

這里設置的是一個隊列queue的最大優先級,之后要在發送的消息中設置消息本身的優先級,設置代碼:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

builder.priority(5);

AMQP.BasicProperties properties = builder.build();

channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());

完整代碼:生產者

public class Producer {

??? public static final String ip = "10.0.40.127";

??? public static final int port = 5672;

??? public static final String username = "admin";

??? public static final String password = "123456";

?

??? public static void main(String[] args) throws IOException{

????? ??ConnectionFactory connectionFactory = new ConnectionFactory();

??????? connectionFactory.setPassword(password);

??????? connectionFactory.setUsername(username);

??????? connectionFactory.setPort(port);

??????? connectionFactory.setHost(ip);

?

???? /*?? Connection connection = connectionFactory.newConnection();

??????? Channel channel = connection.createChannel();

?

??????? //create exchange

??????? channel.exchangeDeclare("exchange_priority", "direct", true);

?

??????? //create queue with priority

??????? Map<String, Object> params = new HashMap<>();

??????? params.put("x-max-priority", 10);

??????? channel.queueDeclare("queue_priority", true, false, false, params);

??????? channel.queueBind("queue_priority", "exchange_priority", "rk_priority");

?

??????? //send message with priority

??????? for (int i = 0; i < 10; i++) {

??????????? AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

??????????? if (i % 2 == 0) {

??????????????? builder.priority(5);

??????????? }

??????????? AMQP.BasicProperties properties = builder.build();

??????????? channel.basicPublish("exchange_priority", "rk_priority", properties, ("produce messages-" + i).getBytes());

??????? }

?

??????? channel.close();

??????? connection.close();*/

??? }

}

消費者

public class Consumer {

??? public static final String ip = "10.0.40.127";

??? public static final int port = 5672;

??? public static final String username = "admin";

??? public static final String password = "123456";

?

??? public static void main(String[] args) throws IOException, InterruptedException {

??????? ConnectionFactory connectionFactory = new ConnectionFactory();

??????? connectionFactory.setPassword(password);

??????? connectionFactory.setUsername(username);

??????? connectionFactory.setPort(port);

??????? connectionFactory.setHost(ip);

?

/*??????? Connection connection = connectionFactory.newConnection();

??????? Channel channel = connection.createChannel();

?

??????? QueueingConsumer consumer = new QueueingConsumer(channel);

??????? channel.basicConsume("queue_priority",consumer);

??????? while (true) {

??????????? QueueingConsumer.Delivery delivery = consumer.nextDelivery();

??????????? String msg = new String(delivery.getBody());

??????????? System.out.println(msg);

??????????? channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

??????? }*/

??? }

}

打印輸出:先輸出偶數,后輸出奇數

如何限流?

1、為什么要對消費端限流?

如果Rabbitmq 服務器積壓了有上萬條未處理的消息,如果這時候連上了一個消費端,那么巨量的消息瞬間全部推送過來,但是單個客戶端無法同時處理這么多。當數據量特別大的時候對消費端限流,用于保持消費端的穩定,當消息數量激增的時候很有可能造成資源耗盡,以及影響服務的性能,導致系統的卡頓甚至直接崩潰。

2、限流的實現方式—限流api

RabbitMQ 提供了一種 qos (服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息(通過基于 consume 或者 channel 設置 Qos 的值)未被確認前,不進行消費新的消息。

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

  • prefetchSize:0,單條消息大小限制,0代表不限制
  • prefetchCount:一次性消費的消息數量。告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,即一旦有 N 個消息還沒有 ack,則該 consumer 將 block 掉,直到有消息 ack。
  • global:true、false 是否將上面設置應用于 channel,就是上面限制 channel 級別還是 consumer 級別。當我們設置為 false 的時候生效
  • prefetchCount 在 no_ask=false 的情況下才生效,即在自動應答的情況下這兩個值是不生效的

3、如何進行限流?

  • 首先第一步,使用消費端限流需要關閉自動 ack,將 autoAck 設置為 falsechannel.basicConsume(queueName, false, consumer);
  • 第二步設置具體的限流大小以及數量。channel.basicQos(0, 15, false);
  • 第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設置批量處理 ack 回應為 truechannel.basicAck(envelope.getDeliveryTag(), true);

?

?消息確認機制

1如果沒有開啟ack消息確認,rabbitmq會認為這條消息沒有被消費,會將消息再次放入到隊列中,再次讓你消費,形成死循環;

2、消費端配置了手動ack,但是在異常捕獲中設置了消息重新入隊,那么還是會出現死循環

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

因為最后一個參數requeue一般都會為true,此次沒調用到數據,把這個消息返回到隊列中再消費,如果代碼中出現了int a=1/0,那么還是會造成死循環。

?消息重試機制

當你開啟了手動ack的時候再消費端如果在消費的時候出現異常也會導致循環消費,所以要啟動消息重試機制,默認是3次重試去消費一條消息,如果沒有消費完成,則丟棄(刪除)該消息或者放入死信隊列中或者進行人工補償。

erver.port=8889

?

spring.rabbitmq.host=192.168.221.150

spring.rabbitmq.port=5672

spring.rabbitmq.username=zl

spring.rabbitmq.password=123

#開啟消息確認機制

spring.rabbitmq.publisher-confirms=true

#支持消息發送失敗返回隊列

spring.rabbitmq.publisher-returns=true

?

#設置為 true 后 消費者在消息沒有被路由到合適隊列情況下會被return監聽,而不會自動刪除

spring.rabbitmq.template.mandatory=true

?

spring.rabbitmq.connection-timeout=15000

#用戶虛擬機權限名稱

spring.rabbitmq.virtual-host=/

?

#設置消費端手動 ack?? none不確認? auto自動確認? manual手動確認

spring.rabbitmq.listener.simple.acknowledge-mode=manual

#消費者最小數量

spring.rabbitmq.listener.simple.concurrency=1

#消費之最大數量

spring.rabbitmq.listener.simple.max-concurrency=1

?

#開啟消費者重試機制(為false時關閉消費者重試,這時消費端代碼異常會一直重復收到消息)

spring.rabbitmq.listener.simple.retry.enabled=true

#重試次數5

spring.rabbitmq.listener.simple.retry.max-attempts=5

#重試時間間隔

spring.rabbitmq.listener.simple.retry.initial-interval=5000

?

#重試次數超過上面的設置之后是否丟棄(false不丟棄時需要寫相應代碼將該消息加入死信隊列)

spring.rabbitmq.listener.simple.default-requeue-rejected=true

?

#在單個請求中處理的消息個數,他應該大于等于事務數量(unack的最大數量)

spring.rabbitmq.listener.simple.prefetch=2

?

1、觸發重試機制需要消費者拋出異常,而不能try/catch捕捉異常,不然會死循環

2、對于重試之后仍然異常的消息,mq默認的處理類是RejectAndDontRequeueRecoverer

見名知意。

SimpleRabbitListenerContainerFactoryConfigurer——>>RejectAndDontRequeueRecoverer(實現了MessageRecoverer接口

?

?

MessageRecoverer接口實現類

RejectAndDontRequeueRecoverer

RepublishMessageRecoverer

ImmediateRequeueMessageRecoverer

?

優化處理一對于重試之后仍然異常的消息,可以采用RepublishMessageRecoverer,將消息發送到其他的隊列中,再專門針對新的隊列進行處理

?

優化處理二:采用死信隊列的方式處理重試失敗的消息

/**

?* 死信交換機

?* @return

?*/

@Bean

public DirectExchange dlxExchange(){

??? return new DirectExchange(dlxExchangeName);

}

?

/**

?* 死信隊列

?* @return

?*/

@Bean

public Queue dlxQueue(){

??? return new Queue(dlxQueueName);

}

?

/**

?* 死信隊列綁定死信交換機

?* @param dlxQueue

?* @param dlxExchange

?* @return

?*/

@Bean

public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){

??? return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);

}

業務代碼添加死信交換機、死信路由配置

/**

?* 業務隊列

?* @return

?*/

@Bean

public Queue queue(){

??? Map<String,Object> params = new HashMap<>();

??? params.put("x-dead-letter-exchange",dlxExchangeName);//聲明當前隊列綁定的死信交換機

??? params.put("x-dead-letter-routing-key",dlxRoutingKey);//聲明當前隊列的死信路由鍵

??? return QueueBuilder.durable(queueName).withArguments(params).build();

??? //return new Queue(queueName,true);

}

?

注意點:

1消費者在重試5次后,由于MessageCover默認的實現類是RejectAndDontRequeueRecoverer,也就是requeue=false,因為業務隊列綁定了死信隊列,消息會從業務隊列中刪除,同時發送到死信隊列中。

2如果ack模式是手動ack,那么需要調用channe.nack方法,同時設置requeue=false才會將異常消息發送到死信隊列中

重試使用場景:

對于消費端異常的消息,如果在有限次重試過程中消費成功是最好,如果有限次重試之后仍然失敗的消息,不管是采用RejectAndDontRequeueRecoverer還是使用死信隊列都是可以的,同時也可以采用折中的方法,先將消息從業務隊列中ack掉,再將消息發送到另外的一個隊列中,后續再單獨處理異常數據的隊列

考慮下面兩個場景:

1http下載視頻或者圖片或者調用第三方接口

2空指針異常或者類型轉換異常(其他的受檢查的運行時異常)

第一種重試有意義,第二種重試無意義,需要記錄日志以及人工處理或者輪詢任務方式處理。

重試的使用方式:

1、自動ack模式,不能catch異常

2、手動ack模式,不能try—catch異常

建議自動ack模式使用重試機制,如果一定要在手動ack模式下使用retry功能,最好還是確認在有限次重試過程中可以重試成功,否則超過重試次數,又沒辦法執行nack,會出現消息一直unack死循環

消息冪等性

問題

解決方案

消息重復消費問題:消費者消息處理了,沒來的及提交offset,再重啟可能導致重復消費

方式一:使用全局MessageID判斷消費方使用同一個,解決冪等性。

方式二:用一個消息消費表來記錄每一條消息,給每個一個消息設置一個id(uuid),消費了就保存到表中去。消息過來的時候先查詢是否已經消費。

?

總結

以上是生活随笔為你收集整理的Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。