日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

rabbitmq的死信队列(四)

發布時間:2023/12/15 综合教程 25 生活家
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq的死信队列(四) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

死信隊列

死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息成為Dead message后,可以被重新發送到另一個交換機,這個交換機就是DLX。

消息成為死信的三種情況:

  1. 隊列消息長度到達限制;

  2. 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;

  3. 原隊列存在消息過期設置,消息到達超時時間未被消費;

隊列綁定死信交換機:

給隊列設置參數: x-dead-letter-exchange 和 x-dead-letter-routing-key和x-message-ttl和x-max-length

    x-dead-letter-exchange:綁定的死信交換機名稱

   x-dead-letter-routing-key:綁定正常隊列和死信交換機的路由

   x-dead-letter-routing-key:ttl過期時間

   x-max-length:設置正常隊列長度限制

rabbitmq-high-producer項目

application.properties文件

server.port=8081
# ip
spring.rabbitmq.host=127.0.0.1
#默認5672
spring.rabbitmq.port=5672
#用戶名
spring.rabbitmq.username=guest
#密碼
spring.rabbitmq.password=guest
#連接到代理時用的虛擬主機
spring.rabbitmq.virtual-host=/
#是否啟用【發布確認】,默認false
#spring.rabbitmq.publisher-confirm-type=correlated替換spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
#是否啟用【發布返回】,默認false
spring.rabbitmq.publisher-returns=true
#表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#rabbitmq限流,必須在ack確認才能使用
#消費者最小數量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消費者數量
spring.rabbitmq.listener.simple.max-concurrency=10
#在單個請求中處理的消息個數,他應該大于等于事務數量(unack的最大數量)
spring.rabbitmq.listener.simple.prefetch=2
        
DlxQueueRabbitConfig類
package com.qingfeng.rabbitmqhighproducer.dlx.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信隊列
 */
@Configuration
public class DlxQueueRabbitConfig {

    //正常隊列名稱
    public static final String NORMAL_DLX_QUEUE = "normal_dlx_queue";
    //正常交換機名稱
    public static final String NORMAL_DLX_Exchange = "normal_dlx_exchange";

    //ttl過期時間毫秒
    private static final int NORMAL_DLX_EXPIRATION = 10000;

    //設置正常隊列長度限制
    private static final int NORMAL_DLX_LENGTH = 10;

    //死信隊列名稱
    public static final String DLX_QUEUE = "dlx_queue";
    //死信交換機名稱
    public static final String DLX_Exchange = "dlx_exchange";


    //聲明正常交換機
    @Bean("normalDlxExchange")
    public TopicExchange normalDlxExchange(){
        return new TopicExchange(NORMAL_DLX_Exchange);
    }

    //聲明正常隊列綁定死信隊列的交換機
    @Bean("normalDlxQueue")
    public Queue normalDlxQueue(){
        return QueueBuilder.durable(NORMAL_DLX_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_Exchange)
                .withArgument("x-dead-letter-routing-key", "dlx.wq")
                .withArgument("x-message-ttl", NORMAL_DLX_EXPIRATION)
                .withArgument("x-max-length",NORMAL_DLX_LENGTH)
                .build();
    }

    //聲明正常隊列和正常交換機的綁定
    @Bean
    public Binding normalDlxBinding(){
        return BindingBuilder.bind(normalDlxQueue()).to(normalDlxExchange()).with("test.dlx.#");
    }

//=========================================================================

    //聲明死信隊列
    @Bean
    public Queue dlxQueue(){
        return new Queue(DLX_QUEUE);
    }
    //聲明死信交換機
    @Bean
    public TopicExchange dlxExchange(){
        return new TopicExchange(DLX_Exchange);
    }
    //聲明死信隊列和死信交換機的綁定
    @Bean
    public Binding dlxBinding(){
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.#");
    }


}
DlxController類
package com.qingfeng.rabbitmqhighproducer.dlx;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("dlx")
public class DlxController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //http://127.0.0.1:8081/dlx/testTimeDLX
    //測試時間過期
    @GetMapping("/testTimeDLX")
    public String testTimeDLX() {
        String messageId = String.valueOf(UUID.randomUUID());
        //normal_dlx_exchange正常交換機  test.dlx.wq:正常交換機與正常綁定的隊列的路由
        rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"變成死信隊列消息");
        return "ok";
    }


}

啟動rabbitmq-high-producer項目

1.測試原隊列存在消息過期設置,消息到達超時時間未被消費

http://127.0.0.1:8081/dlx/testTimeDLX

我們在設置的ttl過期時間10000毫秒過后,也就是10秒后,正常隊列的消息會轉到死信隊列里面去

2.測試隊列消息長度到達限制

DlxController類
package com.qingfeng.rabbitmqhighproducer.dlx;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("dlx")
public class DlxController {
    @Autowired
    private RabbitTemplate rabbitTemplate;


    //http://127.0.0.1:8081/dlx/veroLengthDLX
    //2.測試隊列超出長度
    @GetMapping("/veroLengthDLX")
    public String veroLengthDLX() {
        for (int i=0;i<20;i++){
            String messageId = String.valueOf(UUID.randomUUID());
            rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"變成死信隊列消息");
        }
        return "ok";
    }



}

啟動rabbitmq-high-producer項目

訪問:http://127.0.0.1:8081/dlx/veroLengthDLX
設置正常隊列長度限制為10,我們生產者發送了20個消息,正常隊列只能保存10個

我們在設置的ttl過期時間10000毫秒過后,也就是10秒后,正常隊列的消息會全部轉到死信隊列里面去

3消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;

在rabbitmq-high-producer項目的DlxController類添加

package com.qingfeng.rabbitmqhighproducer.dlx;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("dlx")
public class DlxController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //3.測試消息被消費者拒收

//http://127.0.0.1:8081/dlx/rejectionDLX
    @GetMapping("/rejectionDLX")
    public String rejectionDLX() {
        String messageId = String.valueOf(UUID.randomUUID());
        rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"變成死信隊列消息");
        return "ok";
    }

}

在rabbitmq-high-consumer項目
DxlListener類  開啟int i = 1/0;//出現錯誤
package com.qingfeng.rabbitmqhighconsumer.dxl;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer ACK機制:
 *  1. 設置手動簽收。 spring.rabbitmq.listener.simple.acknowledge-mode=manual
 *  2. 讓監聽器類實現ChannelAwareMessageListener接口
 *  3. 如果消息成功處理,則調用channel的 basicAck()簽收
 *  4. 如果消息處理失敗,則調用channel的basicNack()拒絕簽收,broker重新發送給consumer
 */

@Component
public class DxlListener {

    //手動簽收
    @RabbitHandler
    @RabbitListener(queues = "normal_dlx_queue")
    public void onMessage(Message message, Channel channel) throws Exception {
        //Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收轉換消息
            System.out.println("接受到的消息為"+new String(message.getBody()));

            //2. 處理業務邏輯
            System.out.println("處理業務邏輯...");
            int i = 1/0;//出現錯誤
            //3. 手動簽收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            /**
             * 4.有異常就拒絕簽收
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * 第三個參數:requeue:重回隊列。如果設置為true,則消息重新回到queue,broker會重新發送該消息給消費
             * requeue:true為將消息重返當前消息隊列,還可以重新發送給消費者;
             * alse:將消息丟棄
             */
            System.out.println("有異常就拒絕簽收");
            //拒絕簽收,不重回隊列,requeue為false,這樣才能到死信隊列里面去
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

啟動rabbitmq-high-producer和rabbitmq-high-consumer項目


測試:http://127.0.0.1:8081/dlx/rejectionDLX

在rabbitmq-high-consumer項目consumer拒絕接收消息,直接轉到死信隊列去了

小結:

  1. 死信交換機和死信隊列和普通的沒有區別

  2. 當消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列

  3. 消息成為死信的三種情況:
    1. 隊列消息長度到達限制;
    2. 消費者拒接消費消息,并且不重回隊列;
    3. 原隊列存在消息過期設置,消息到達超時時間未被消費;




總結

以上是生活随笔為你收集整理的rabbitmq的死信队列(四)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 中文字幕第三页 | 成人传媒 | 在线能看的av网站 | 国v精品久久久网 | 69av视频在线观看 | 国产a黄 | 天天想你在线观看完整版高清 | 日本一区二区在线看 | 色九九 | www.浪潮av.com | 亚洲图片88 | 最新最近中文字幕 | 国产理论片在线观看 | 蜜臀在线观看 | 日韩色道 | 亚洲熟女少妇一区二区 | 一本色道久久hezyo加勒比 | 中文在线观看免费视频 | 国产情侣久久 | 日韩高清在线观看 | 国产吞精囗交免费视频网站 | 美日韩三级 | 少妇69xx| 99久久这里只有精品 | 91麻豆免费看| 一久久久久 | 国产精品99精品久久免费 | 天天综合一区 | av首页在线观看 | 欧美日韩国产激情 | 韩漫动漫免费大全在线观看 | 国产精品第5页 | 1024av在线| 一级黄色片免费播放 | 99热99| 亚洲国产电影在线观看 | 成人免费xxxxx在线观看 | 久草福利免费 | 福利姬在线观看 | 国产精品99久久免费黑人人妻 | 亚洲九九爱 | 国产一区二区三区在线观看视频 | 亚洲国产精品毛片av不卡在线 | 天天干狠狠干 | 亚洲福利网站 | 久草视频精品在线 | 日韩欧美三级视频 | av秋霞| 日本少妇videos高潮 | 在线观看高清视频 | 偷拍久久久 | 欧美肉丝袜videos办公室 | 日本人三级 | 麻豆短视频| 波多野结衣电影免费观看 | 毛片网站在线 | 中文字幕av久久爽一区 | 女人又爽又黄免费女仆 | 日韩国产片 | 神马午夜久久 | 亚洲精品中文字幕成人片 | 91成人国产 | 黄色片一区二区三区 | 久久久夜精品 | 最新福利在线 | 中文字幕色片 | 国产精品露脸视频 | 91欧美一区二区 | 亚洲在线免费视频 | 日本少妇一级片 | 熟妇的味道hd中文字幕 | 色综合天天综合网天天看片 | 国产av 一区二区三区 | 国产精品一区二区三区免费视频 | 夜夜欢视频 | 老妇裸体性激交老太视频 | 性一交一乱一伧老太 | 琪琪电影午夜理论片八戒八戒 | 99这里只有 | 国产一区二区内射 | jjzzjjzz欧美69巨大 | 亚洲成人精品网 | 天天射天天拍 | 欧美一区二区三区在线免费观看 | 人妻夜夜爽天天爽 | 国产美女明星三级做爰 | 瑟瑟视频在线观看 | 逼逼爱插插网站 | 黑人和白人做爰 | 中文字幕国产日韩 | 免费成人美女女 | 男人天堂avav | 久久无码高潮喷水 | 亚洲一区二区中文字幕 | 成人av激情| 波多野结衣免费看 | 中国一级片黄色一级片黄 | 91嫩草网 | 91精品久久久久久久久久入口 |