日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ教程大全看这一篇就够了-java版本

發(fā)布時間:2023/12/29 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ教程大全看这一篇就够了-java版本 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

目錄

什么是RabbitMQ?

RabbitMQ 核心概念

Docker 安裝 RabbitMQ?

RabbitMQ 控制臺頁面介紹

RabbitMQ 交換機(jī)?Exchange 介紹

Direct Exchange 定向、直連交換機(jī)

Fanout Exchange 發(fā)布/訂閱、廣播、扇形交換機(jī)

Topic Exchange 主題、通配符交換機(jī)

Headers Exchanges(少用)

RabbitMQ 代碼 Java版

1:簡單隊列

2:工作隊列?Work Queues?

3:Fanout 發(fā)布/訂閱 交換機(jī)模式

4:Routing 路由 交換機(jī)模式

5:Topic?主題 交換機(jī)模式

SpringBoot 整合 RabbitMQ

RabbitMQ 消息可靠性投遞

生產(chǎn)者到交換機(jī) 開啟ACK確認(rèn)可靠消息投遞

交換機(jī)到隊列 可靠消息投遞

RabbitMQ 消息確認(rèn)消費ACK

消息的可靠消費

SpringBoot 配置 RabbitMQ 廣播 發(fā)布/訂閱

RabbitMQ TTL死信隊列

什么是 TTL?

什么是死信隊列?

什么是死信交換機(jī)?

消息什么情況下會成為死信消息?

如何設(shè)置消息的TTL存活時間?

RabbitMQ 控制臺 操作 死信隊列綁定死信交換機(jī)

RabbitMQ 延遲隊列

什么是延遲隊列?

定時消息的使用場景

RabbitMQ 實現(xiàn)延遲消息

SpringBoot 實現(xiàn)延遲隊列

RabbitMQ 的集群環(huán)境

RabbitMQ 普通集群模式的介紹

RabbitMQ 搭建普通集群環(huán)境

1:準(zhǔn)備節(jié)點環(huán)境

2:節(jié)點配置成集群

3:SpringBoot 整合 RabbitMQ 普通集群

RabbitMQ 鏡像集群模式的介紹(推薦)

策略policy介紹


官網(wǎng):https://www.rabbitmq.com/

什么是RabbitMQ?

RabbitMQ是一個開源的AMQP實現(xiàn),服務(wù)器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、C、用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不錯,與SpringAMQP完美的整合、API豐富易用

RabbitMQ 核心概念

  • Broker

    RabbitMQ的服務(wù)端程序,可以認(rèn)為一個mq節(jié)點就是一個broker。

  • Producer 生產(chǎn)者

    創(chuàng)建消息Message,然后發(fā)布到RabbitMQ隊列中

  • Consumer 消費者

    消費隊列中的消息

  • Message 消息

    生產(chǎn)消費的內(nèi)容,有消息頭和消息體,也包括多個屬性配置,比如routingKey路由鍵

  • Queue 隊列

    是RabbitMQ的內(nèi)部對象,用于存儲消息,消息都只能存儲在隊列中

  • Channel 信道

    一條支持多路復(fù)用的通道,獨立的雙向數(shù)據(jù)流通道,可以發(fā)布、訂閱、接收消息。信道是建立在真實的TCP連接內(nèi)的虛擬連接,復(fù)用TCP連接的通道

  • Connection 連接

    是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關(guān)部分邏輯,一個連接上可以有多個channel進(jìn)行通信

  • Exchange 交換器

    生產(chǎn)者將消息發(fā)送到Exchange,交換器將消息路由到一個或者多個隊列中,隊列和交換機(jī)是多對多的關(guān)系。

  • RoutingKey 路由鍵

    生產(chǎn)者將消息發(fā)給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規(guī)則

    最大長度255 字節(jié)

  • Binding 綁定

    通過綁定將交換器與隊列關(guān)聯(lián)起來,在綁定的時候一般會指定一個綁定鍵 (BindingKey),這樣 RabbitMQ 就知道如何正確地將消息路由到隊列了

  • Virtual host 虛擬主機(jī)

    用于不同業(yè)務(wù)模塊的邏輯隔離,一個Virtual Host里面可以有若干個Exchange和Queue,同一個VirtualHost里面不能有相同名稱的Exchange或Queue

    默認(rèn)是 / ,可以使用 /dev /test /pro


Docker 安裝 RabbitMQ?

使用源碼安裝需要的依賴多、且版本和維護(hù)相對復(fù)雜,需要erlang環(huán)境、版本也有要求。

linux 上安裝 docker

https://github.com/docker-library/docs/tree/master/rabbitmq

docker pull rabbitmq:management? ? ? ? ? ? ?

// 拉取遠(yuǎn)程鏡像,management 帶 后臺管理頁面的版本

docker images? ? ? ? ? ? ? ? ? ? ? ? ????????????????

// 查看本機(jī)存在的鏡像

docker run -d -h rabbitmq_1 --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management?

// 運行 docker 鏡像

參數(shù)說明:

  • run -d :? run 運行鏡像 -d 后臺運行
  • -h :自定義容器的主機(jī)名,它會被寫到容器內(nèi)的 /etc/hostname 和 /etc/hosts,作為容器主機(jī)IP的別名,并且將顯示在容器的bash中
  • --name:自定義容器名稱
  • -p 15672:15672 :management 界面管理訪問端口
  • -p 5672:5672? ? ?:amqp 訪問端口
  • -e rabbitma參數(shù)

rabbitmq 訪問地址:http://ip:15672? ? ? ? ? ? ? ? // 如果訪問不了,請查看防火墻端口是否開放

rabbitmq 默認(rèn)登錄賬號和密碼:guest/guest

開機(jī)自動啟動 rabbitmq

docker update 容器ID?--restart=always

rabbitma 的主要端口

4369 ????????erlang 發(fā)現(xiàn)口
5672 ????????client 端通信口
15672? ? ? ?管理界面 ui 端口
25672 ??????server 間內(nèi)部通信口


RabbitMQ 控制臺頁面介紹

RabbitMQ控制面板介紹 - 簡書


RabbitMQ 交換機(jī)?Exchange 介紹

  • 生產(chǎn)者將消息發(fā)送到 Exchange,交換器將消息路由到一個或者多個隊列中,交換機(jī)有多個類型,隊列和交換機(jī)是多對多的關(guān)系。
  • 交換機(jī)只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,如果沒有隊列和exchange綁定,或者沒有符合的路由規(guī)則,則消息會被丟失。
  • RabbitMQ有四種交換機(jī)類型,分別是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后的不常用。
  • Direct Exchange 定向、直連交換機(jī)

  • 將一個隊列綁定到交換機(jī)上,要求該消息與一個特定的路由鍵完全匹配
  • 例子:如果一個隊列綁定到該交換機(jī)上要求路由鍵 “aabb”,則只有被標(biāo)記為“aabb”的消息才被轉(zhuǎn)發(fā),不會轉(zhuǎn)發(fā)aabb.cc,也不會轉(zhuǎn)發(fā)gg.aabb,只會轉(zhuǎn)發(fā)aabb
  • 處理路由健
  • Fanout Exchange 發(fā)布/訂閱、廣播、扇形交換機(jī)

  • 只需要簡單的將隊列綁定到交換機(jī)上,一個發(fā)送到交換機(jī)的消息都會被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊列上。很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息
  • Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的,用于發(fā)布訂閱,廣播形式,中文是扇形
  • 不處理路由健
  • Topic Exchange 主題、通配符交換機(jī)

  • 主題交換機(jī)是一種發(fā)布/訂閱的模式,結(jié)合了直連交換機(jī)與扇形交換機(jī)的特點
  • 將路由鍵和某模式進(jìn)行匹配。此時隊列需要綁定在一個模式上
  • 符號“#”匹配一個或多個詞,符號“*”匹配只匹配一個詞
  • 例子:因此“abc.#”能夠匹配到“abc.def.ghi”,但是“abc.*” 只會匹配到“abc.def”
  • Headers Exchanges(少用)

  • 根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配, 在綁定Queue與Exchange時指定一組鍵值對
  • 當(dāng)消息發(fā)送到RabbitMQ時會取到該消息的headers與Exchange綁定時指定的鍵值對進(jìn)行匹配
  • 如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列
  • 不處理路由鍵

RabbitMQ 代碼 Java版

maven項目中依賴rabbitmq的包

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version> </dependency>

1:簡單隊列

官網(wǎng)教程:RabbitMQ tutorial - "Hello World!" — RabbitMQ

  • 發(fā)送消息
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;// 發(fā)送消息 public class Send {// 隊列名稱private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 創(chuàng)建連接工廠對象ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75"); // rabbit server 所在IP地址factory.setPort(5672); // rabbit server amqp端口號factory.setUsername("guest"); // rabbit server 登錄賬號factory.setPassword("guest"); // rabbit server 登錄密碼factory.setVirtualHost("/dev"); // 指定連接到哪個虛擬主機(jī)try (// 創(chuàng)建連接Connection connection = factory.newConnection();// 創(chuàng)建信道Channel channel = connection.createChannel()) {/** queueDeclare:隊列不存在時自動創(chuàng)建隊列,如果存在使用存在的* 參數(shù)1:隊列名稱* 參數(shù)2:是否持久化* 參數(shù)3:是否獨占* 參數(shù)4:沒有消費者的時候是否自動刪除隊列* 參數(shù)5:其他*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息String message = "Hello World!";/** 發(fā)布消息* 參數(shù)1:交換機(jī)* 參數(shù)2:隊列* 參數(shù)3:其他額外的參數(shù)* 參數(shù)4:要發(fā)送的消息,byte[]類型*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.err.println(" [x] Sent '" + message + "'");}} }

執(zhí)行代碼,可以在 rabbitmq 控制臺上看到隊列已經(jīng)被創(chuàng)建了,并且有一條未被消費的消息

  • 消費消息
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;// 消費消息 public class Recv {// 隊列名稱private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.err.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + message + "'");};/** basicConsume:監(jiān)聽隊列* 參數(shù)1:監(jiān)聽的隊列名稱* 參數(shù)2:autoAck:是否在收到消息后自動確認(rèn)(消費端拿到消息后,自動告訴 rabbitmq server 我已經(jīng)收到消息了)* 參數(shù)3:回調(diào),處理消息*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});// 第2個監(jiān)聽隊列的方法 // Consumer consumer = new DefaultConsumer(channel) { // @Override // public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // String message = new String(body, StandardCharsets.UTF_8); // System.out.println(" [x] Received '" + message + "'"); // } // }; // channel.basicConsume(QUEUE_NAME, true, consumer);} }

?執(zhí)行代碼,可以在 rabbitmq 控制臺上看到隊列的消息已經(jīng)被消費了,并且可以看到 連接信息


2:工作隊列?Work Queues?

官網(wǎng)教程:RabbitMQ tutorial - Work Queues — RabbitMQ

例如:生產(chǎn)者一秒可以生產(chǎn) 一萬個消息,消費者一秒可以消費 一千個消息,這種情況如果只有一個消費者,消息就會堆積在隊列中。這時就需要部署多個消費者節(jié)點。

多個消費者負(fù)載均衡策略是 輪詢。

  • 兩個消費者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;// 第一個消費節(jié)點 public class Recv1 {// 隊列名稱private final static String QUEUE_NAME = "work_mq";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.err.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");/** 處理完消息后,手動確認(rèn) Ack* 參數(shù)1:消息標(biāo)簽* 參數(shù)2:是否批量 Ack*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 參數(shù)2:關(guān)閉自動 ack 確認(rèn)channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});} } import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;// 第二個消費節(jié)點 public class Recv2 {// 隊列名稱private final static String QUEUE_NAME = "work_mq";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.err.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者2: '" + message + "'");/** 處理完消息后,手動確認(rèn) Ack* 參數(shù)1:消息標(biāo)簽* 參數(shù)2:是否批量 Ack*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 參數(shù)2:關(guān)閉自動 ack 確認(rèn)channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});} }
  • 一個生產(chǎn)者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;// 發(fā)送消息 public class Send {// 隊列名稱private final static String QUEUE_NAME = "work_mq";public static void main(String[] argv) throws Exception {// 創(chuàng)建連接工廠對象ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");try (// 創(chuàng)建連接Connection connection = factory.newConnection();// 創(chuàng)建信道Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 批量發(fā)送消息for (int i = 0; i < 10; i++) {// 消息String message = "Hello Work! ___ " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.err.println(" [x] 生產(chǎn)者: '" + message + "'");}}} }

先啟動2個消費者監(jiān)聽隊列,再啟動生產(chǎn)者生產(chǎn)消息。可以看到消息被輪詢消費

設(shè)置 多節(jié)點消費者負(fù)載均衡策略為:公平策略 (能者多勞)

Channel channel = connection.createChannel(); // 消費者設(shè)置 qos為 1, 一個消費完后繼續(xù)消費 channel.basicQos(1);


3:Fanout 發(fā)布/訂閱 交換機(jī)模式

官網(wǎng)教程:https://www.rabbitmq.com/tutorials/tutorial-three-python.html

作用:生產(chǎn)者發(fā)布消息后,所有監(jiān)聽廣播類型指定交換機(jī)的的消費者都可以消費此消息。

  • 2個或多個消費者
import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第一個消費節(jié)點 public class Recv1 {// 交換機(jī)名稱private final static String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 廣播類型 的交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機(jī)進(jìn)行綁定,fanout交換機(jī)不需要routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調(diào)String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監(jiān)聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} } import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第二個消費節(jié)點 public class Recv2 {// 交換機(jī)名稱private final static String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 廣播類型 的交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機(jī)進(jìn)行綁定,fanout交換機(jī)不需要routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調(diào)String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者2: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監(jiān)聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} }
  • 一個生產(chǎn)者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;// 發(fā)送消息 public class Send {// 交換機(jī)名稱private final static String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] argv) throws Exception {// 創(chuàng)建連接工廠對象ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.189.75");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 綁定 廣播類型 的交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 消息String message = "廣播消息。。。";// 發(fā)送消息channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));}} }

4:Routing 路由 交換機(jī)模式

官網(wǎng)教程:https://www.rabbitmq.com/tutorials/tutorial-four-java.html

  • 隊列和交換機(jī)(Direct)綁定,需要指定一個routingKey(也叫Bingding Key)
  • 消息生產(chǎn)者發(fā)送消息給交換機(jī),需要指定routingKey
  • 交換機(jī)根據(jù)消息的routingKey,轉(zhuǎn)發(fā)給對應(yīng)的隊列

示例:日志收集系統(tǒng)

  • 一個隊列收集 error 日志
  • 一個隊列收集 全部 日志
  • 2個消費者,一個消費 error 消息,一個消費 全部 消息
import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第一個隊列,消費所有消息 public class Recv1 {// 交換機(jī)名稱private final static String EXCHANGE_NAME = "exchange_direct";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 直連類型 的交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機(jī)進(jìn)行綁定,direct交換機(jī) 需要指定 routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey");channel.queueBind(queueName, EXCHANGE_NAME, "debugRoutingKey");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調(diào)String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監(jiān)聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} } import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第二個隊列,消費error消息 public class Recv2 {// 交換機(jī)名稱private final static String EXCHANGE_NAME = "exchange_direct";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 直連類型 的交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機(jī)進(jìn)行綁定,direct交換機(jī) 需要指定 routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調(diào)String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監(jiān)聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} }
  • ?一個生產(chǎn)者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;// 發(fā)送消息 public class Send {// 交換機(jī)名稱private final static String EXCHANGE_NAME = "exchange_direct";public static void main(String[] argv) throws Exception {// 創(chuàng)建連接工廠對象ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 綁定 廣播類型 的交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 消息String errorMsg = "error消息";String infoMsg = "info消息";String debugMsg = "debug消息";// 發(fā)送消息channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, errorMsg.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, infoMsg.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debugMsg.getBytes(StandardCharsets.UTF_8));}} }


5:Topic?主題 交換機(jī)模式

官網(wǎng)教程:RabbitMQ tutorial - Topics — RabbitMQ

  • Topic?可以實現(xiàn)發(fā)布訂閱模式Fanout 和 路由模式Direct 的功能,更加靈活,支持模式匹配,通配符等。
  • 交換機(jī)通過通配符進(jìn)行轉(zhuǎn)發(fā)到對應(yīng)的隊列,* 代表一個詞,#代表1個或多個詞,一般用#作為通配符居多,比如 #.order, 會匹配 info.order 、sys.error.order, 而 *.order ,只會匹配 info.order, 使用.進(jìn)行分割多個詞。
  • 注意:
    • 交換機(jī)和隊列綁定時用的binding使用通配符的路由健
    • 生產(chǎn)者發(fā)送消息時需要使用具體的路由健

示例:日志收集系統(tǒng)

  • 一個隊列收集 error 日志
  • 一個隊列收集 全部 日志
  • 2個消費者,一個消費 error 消息,一個消費 全部 消息
import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第一個隊列,消費所有消息 public class Recv1 {// 交換機(jī)名稱private final static String EXCHANGE_NAME = "exchange_topic";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 主題類型 的交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機(jī)進(jìn)行綁定,topic交換機(jī) 需要指定 routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "*.log.*");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調(diào)String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監(jiān)聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} } import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;// 第二個隊列,消費error消息 public class Recv2 {// 交換機(jī)名稱private final static String EXCHANGE_NAME = "exchange_topic";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 綁定 主題類型 的交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 獲取隊列String queueName = channel.queueDeclare().getQueue();// 隊列和交換機(jī)進(jìn)行綁定,topic交換機(jī) 需要指定 routingKeychannel.queueBind(queueName, EXCHANGE_NAME, "*.log.error");DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消費消息回調(diào)String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] 消費者1: '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 監(jiān)聽消息隊列channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});} }
  • ?一個生產(chǎn)者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;// 發(fā)送消息 public class Send {// 交換機(jī)名稱private final static String EXCHANGE_NAME = "exchange_topic";public static void main(String[] argv) throws Exception {// 創(chuàng)建連接工廠對象ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.31.71");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/dev");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 綁定 主題類型 的交換機(jī)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 消息String errorMsg = "訂單服務(wù) error消息";String infoMsg = "訂單服務(wù) info消息";String debugMsg = "用戶服務(wù) debug消息";// 發(fā)送消息channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, errorMsg.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, infoMsg.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "user.log.debug", null, debugMsg.getBytes(StandardCharsets.UTF_8));}} }


SpringBoot 整合 RabbitMQ

pom 文件中添加依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.5</version> </dependency>

application.yml 文件配置 rabbitmq

注意:1:guest 賬號只能連本機(jī)的mq服務(wù),實際開發(fā)的時候請創(chuàng)建一個新的賬號。2:rabbitmq集成在maven聚合組件中,然后這個組件被其他服務(wù)依賴以此達(dá)到整合mq的方式的時候,application 文件的后綴名要是 .properties (rabbitmq 讀取不到 yml 后綴的配置)

spring:rabbitmq:host: 192.168.31.71port: 5672username: guestpassword: guestvirtual-host: /dev

配置 交換機(jī)和隊列綁定的 Bean

import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;@Component public class RabbitmqConfig {// 自定義交換機(jī)名稱public static final String EXCHANGE_NAME = "order_exchange";// 自定義隊列名稱public static final String QUEUE_NAME = "order_queue";/*** 創(chuàng)建 topic 交換機(jī)*/@Bean(EXCHANGE_NAME) // 多個交換機(jī)時要指定交換機(jī)的Bean名稱public Exchange orderExchange() {return ExchangeBuilder// 指定 主題類型的交換機(jī) 名稱.topicExchange(EXCHANGE_NAME)// 是否持久化.durable(true).build();}/*** 創(chuàng)建持久化隊列*/@Bean(QUEUE_NAME) // 多個隊列時要指定隊里的Bean名稱public Queue orderQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}/*** 隊列和交換機(jī)綁定*/@Beanpublic Binding orderBinding(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {return BindingBuilder// 綁定的隊列.bind(queue)// 隊列綁定到 指定的交換機(jī).to(exchange)// 綁定的 routingKey.with("order.#")// 沒有其他參數(shù).noargs();} }

發(fā)送消息

import com.lxx.rabbitmq.config.RabbitmqConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {/** 發(fā)送消息* 參數(shù)1:要發(fā)送的交換機(jī)* 參數(shù)2:指定匹配的 routingKey* 參數(shù)3:要發(fā)送的消息*/rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "訂單 error消息");} }

消費者監(jiān)聽隊列

import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "order_queue") // 消費者監(jiān)聽這個隊列 public class OrderMqListener {/*** 監(jiān)聽到隊列有消息后,RabbitHandler 處理指定類型的消息** @param body 消息*/@RabbitHandlerpublic void messageHandler(String body, Message message) {System.err.println(message.getMessageProperties().getMessageId());System.err.println(message.getMessageProperties().getDeliveryTag());System.err.println(message.toString());System.err.println(" X 字符串消費者:" + body);}/*** 監(jiān)聽到隊列有消息后,RabbitHandler 處理指定類型的消息** @param body 消息*/@RabbitHandlerpublic void messageHandler(Integer body, Message message) {System.err.println(" X 數(shù)字消費者:" + body);} }

RabbitMQ 消息可靠性投遞

什么是消息的可靠投遞?

保證消息百分百發(fā)送到消息隊列中

如果確保消息的可靠投遞

消息生產(chǎn)者 需要接受到mq服務(wù)端 接受到消息的確認(rèn)應(yīng)答
完善的消息補(bǔ)償機(jī)制,發(fā)送失敗的消息可以再感知并二次處理

RabbitMQ消息投遞路徑:生產(chǎn)者->交換機(jī)->隊列->消費者

通過兩個的點控制消息的可靠性投遞

  • 生產(chǎn)者到交換機(jī)
    • 通過confirmCallback
  • 交換機(jī)到隊列
    • 通過returnCallback
  • 生產(chǎn)者到交換機(jī) 開啟ACK確認(rèn)可靠消息投遞

appliction.yml 配置

spring:rabbitmq:# 開啟消息 confirm 二次確認(rèn)publisher-confirm-type: correlated

消息監(jiān)聽代碼沒變化

發(fā)送消息,代碼如下:

import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.err.println("ConfirmCallback ================");System.err.println(" ================ correlationData = " + correlationData);System.err.println(" ================ ack = " + ack);System.err.println(" ================ cause = " + cause);if (ack) {System.out.println("發(fā)送成功");// 更新數(shù)據(jù)庫消息狀態(tài)為 成功} else {System.err.println("發(fā)送失敗");// 更新數(shù)據(jù)庫消息狀態(tài)為 失敗}}});// 發(fā)送消息之前 ,數(shù)據(jù)庫新增一條消息記錄狀態(tài),狀態(tài)是 發(fā)送 TODO// 發(fā)送消息rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "訂單 ConfirmCallback 消息");// 模擬投遞失敗 // rabbitTemplate.convertAndSend("不存在的交換機(jī)", "order.error", "訂單 ConfirmCallback消息");} }
  • 交換機(jī)到隊列 可靠消息投遞

appliction.yml 配置

spring:rabbitmq:# 開啟 交換機(jī)到 隊列publisher-returns: truetemplate:# 為true,則交換機(jī)處理消息到路由失敗后,則會返回給生產(chǎn)者。 或者代碼 rabbitTemplate.setMandatory(true) 是一樣的效果mandatory: true

消息監(jiān)聽代碼沒變化

發(fā)送消息,代碼如下:

package com.lxx.rabbitmq;import com.lxx.rabbitmq.config.RabbitmqConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {int replyCode = returned.getReplyCode();System.err.println("ReturnsCallback ================");System.err.println(" ================ code = " + replyCode);System.err.println(" ================ returned = " + returned.toString());}});// 發(fā)送消息rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "訂單 ReturnsCallback 消息");// 模擬投遞失敗 // rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "不存在的routingKey", "訂單 ReturnsCallback 消息");}}

RabbitMQ 消息確認(rèn)消費ACK

消費者從broker中監(jiān)聽到消息,要確保消息被正常處理。

RabbitMQ 消費者ACK介紹

  • 消費者從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊列中刪除
  • 消費者在處理消息出現(xiàn)了網(wǎng)絡(luò)不穩(wěn)定、服務(wù)器異常等現(xiàn)象,那么就不會有ACK反饋,RabbitMQ會認(rèn)為這個消息沒有正常消費,會將消息重新放入隊列中Ready
  • 只有當(dāng)消費者正確發(fā)送ACK反饋,RabbitMQ確認(rèn)收到后,消息才會從RabbitMQ服務(wù)器的數(shù)據(jù)中刪除。
  • 消息的ACK確認(rèn)機(jī)制默認(rèn)是開啟狀態(tài)自動ACK,消息如未被進(jìn)行ACK的消息確認(rèn)機(jī)制,這條消息被鎖定Unacked

  • 消息的可靠消費

appliction.yml 配置

spring:rabbitmq:listener:simple:#開啟手動確認(rèn)消息,如果消息重新入隊,進(jìn)行重試acknowledge-mode: manualretry:enabled: true #是否開啟消費者重試max-attempts: 5 #最大重試次數(shù)initial-interval: 5000ms #重試間隔時間(單位毫秒)max-interval: 1200000ms #重試最大時間間隔(單位毫秒)multiplier: 2 #間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設(shè)置的最大間隔時間

發(fā)送消息代碼沒變化

消息監(jiān)聽,代碼如下:

import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;@Component @RabbitListener(queues = "order_queue") public class OrderMqListener {@RabbitHandlerpublic void messageHandler(String body, Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.err.println(deliveryTag);System.err.println(message.toString());System.err.println(" X 字符串消費者:" + body);// 告訴 broker 消息被正常消費 確認(rèn)ACKchannel.basicAck(deliveryTag, false);/** 告訴 broker,消息被消費后 拒絕確認(rèn)ACK* 參數(shù)一:deliveryTag,消息被投遞的次數(shù)* 參數(shù)二:是否批量 拒絕ACK,false 一條一條的拒絕ack* 參數(shù)上:是否重新投遞到隊列中*///channel.basicNack(deliveryTag, false, true); // 一次可以拒絕接收0個或多個//channel.basicReject(deliveryTag, true); // 一次只能拒絕接收一個消息} }

SpringBoot 配置 RabbitMQ 廣播 發(fā)布/訂閱

一個消息生產(chǎn)者,多個消費者節(jié)點,共同消費同一條消息

配置 廣播 交換機(jī)和隊列綁定的 Bean

import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitmqConfig {// 自定義交換機(jī)名稱public static final String EXCHANGE_NAME = "order_exchange";// 自定義隊列名稱public static final String QUEUE_NAME = "order_queue";/*** 創(chuàng)建 廣播 交換機(jī)*/@Bean(EXCHANGE_NAME)public FanoutExchange orderExchange() {return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();}/*** 創(chuàng)建持久化隊列*/@Bean(QUEUE_NAME)public Queue orderQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}/*** 隊列和交換機(jī)綁定*/@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange());} }

發(fā)送消息

import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {/** 發(fā)送消息* 參數(shù)1:要發(fā)送的交換機(jī)* 參數(shù)2:廣播不要指定路由key* 參數(shù)3:要發(fā)送的消息*/rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "", "廣播消息");} }

消費者監(jiān)聽,消費者可以多節(jié)點/集群部署,多節(jié)點可以消費同一條消息

import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component;@Component public class OrderMqListener {/*** 監(jiān)聽到隊列有消息后,RabbitHandler 處理指定類型的消息** @param body 消息*/@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(), // 注意:此處不能指定隊列名稱。 如果指定隊列只能被一個消費者節(jié)點消費exchange = @Exchange(value = RabbitmqConfig.EXCHANGE_NAME, type = ExchangeTypes.FANOUT)))public void messageHandler(String body) {System.err.println(" X 消息 :" + body);} }


RabbitMQ TTL死信隊列

什么是 TTL?

  • time to live 消息存活時間的意思。
  • 如果消息在存活時間內(nèi)未被消費,則會被清除。
  • RabbitMQ支持兩種ttl設(shè)置
    • 整個隊列進(jìn)行配置ttl(居多)
    • 單獨消息進(jìn)行配置ttl

什么是死信隊列?

用來存放 在存活時間內(nèi)未被消費消息 的隊列? ? ? ? // 過期消息不清楚,存放在此隊列?

什么是死信交換機(jī)?

Dead Letter Exchange(死信交換機(jī),縮寫:DLX)當(dāng)消息成為死信后,會被重新發(fā)送到另一個交換機(jī),這個交換機(jī)就是DLX死信交換機(jī)。

注意:死信隊列和死信交換機(jī) 與 普通隊列普通交換機(jī)沒區(qū)別。

消息什么情況下會成為死信消息?

  • 消費者拒收消息(basic.reject/basic.nack),并且沒有重新入隊 requeue=false
  • 消息在隊列中未被消費,且超過隊列或者消息本身的過期時間TTL(time-to-live)
  • 隊列的消息長度達(dá)到極限

成為死信的結(jié)果:如果該隊列綁定了死信交換機(jī),則消息會被死信交換機(jī)重新路由到死信隊列

如何設(shè)置消息的TTL存活時間?

方式一:隊列過期,對整個隊列消息設(shè)置統(tǒng)一過期時間

x-message-ttl? ? ? 單位:ms毫秒

方式二:消息過期,對單個消息進(jìn)行設(shè)置

expiration??? ? ? ? ?單位:ms毫秒

注意:兩者都配置的話,時間短的先觸發(fā)。


RabbitMQ 控制臺 操作 死信隊列綁定死信交換機(jī)

-- 代碼操作和普通操作沒有不同,這里學(xué)習(xí)控制面板的操作

創(chuàng)建死信交換機(jī)

創(chuàng)建死信隊列

死信隊列和死信交換機(jī)綁定

新建普通隊列,設(shè)置隊列的過期時間。指定普通隊列對應(yīng)的死信交換機(jī)

向普通隊列 里發(fā)送消息,過期后,消息路由到 死信隊列


RabbitMQ 延遲隊列

什么是延遲隊列?

一種帶有延遲功能的消息隊列,Producer 將消息發(fā)送到消息隊列 服務(wù)端,但并不期望這條消息立馬投遞,而是推遲到在當(dāng)前時間點之后的某一個時間投遞到 Consumer 進(jìn)行消費,該消息即定時消息。

定時消息的使用場景

  • 通過消息觸發(fā)一些定時任務(wù),比如在某一固定時間點向用戶發(fā)送提醒消息
  • 用戶登錄之后5分鐘給用戶做分類推送、用戶多少天未登錄給用戶做召回推送;
  • 消息生產(chǎn)和消費有時間窗口要求:比如在天貓電商交易中超時未支付關(guān)閉訂單的場景,在訂單創(chuàng)建時會發(fā)送一條 延時消息。這條消息將會在 30 分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應(yīng)的訂單是否已完成支付。 如支付未完成,則關(guān)閉訂單。如已完成支付則忽略

RabbitMQ 實現(xiàn)延遲消息

RabbitMQ本身是不支持延遲隊列的。需要結(jié)合死信隊列的特性,達(dá)到延遲消息的目的。

  • 消息生產(chǎn)者
    • 消息投遞到普通的交換機(jī)
    • 消息過期,進(jìn)入死信隊列
  • 消費消費者
    • 消費者監(jiān)聽死信交換機(jī)的隊列

SpringBoot 實現(xiàn)延遲隊列

配置 死信交換機(jī)和死信隊列,配置普通交換機(jī)和普通隊列,配置普通隊列綁定到死信交換機(jī)

import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;@Component public class OrderTimeoutCloseConfig {// ==================================================死信隊列 start========================================================/*** 死信交換機(jī),訂單超時關(guān)閉*/public static final String ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE = "order_timeout_close_dead_exchange";/*** 死信隊列,訂單超時關(guān)閉*/public static final String ORDER_TIMEOUT_CLOSE_DEAD_QUEUE = "order_timeout_close_dead_queue";/*** 進(jìn)入死信隊列的路由key,訂單超時關(guān)閉*/public static final String ORDER_TIMEOUT_CLOSE_ROUTING_KEY = "order_timeout_close_routing_key";/*** 創(chuàng)建 死信 交換機(jī)*/@Bean(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE)public TopicExchange orderTimeoutCloseDeadExchange() {return ExchangeBuilder.topicExchange(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE).durable(true).build();}/*** 創(chuàng)建 死信 隊列*/@Bean(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE)public Queue orderTimeoutCloseDeadQueue() {return QueueBuilder.durable(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE).build();}/*** 死信隊列和死信交換機(jī)綁定*/@Beanpublic Binding deadOrderTimeoutBinding(@Qualifier(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE) Queue queue, @Qualifier(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_TIMEOUT_CLOSE_ROUTING_KEY).noargs();}/** 死信隊列和死信交換機(jī)綁定,方式二*/ // @Bean // public Binding deadOrderTimeoutBinding() { // return new Binding( // ORDER_TIMEOUT_CLOSE_DEAD_QUEUE, // Binding.DestinationType.QUEUE, // ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE, // ORDER_TIMEOUT_CLOSE_ROUTING_KEY, // null // ); // }// ==================================================死信隊列 end========================================================// ==================================================普通隊列 start========================================================/*** 普通交換機(jī),訂單超時,用于進(jìn)入死信隊列*/public static final String ORDER_TIMEOUT_INTO_DEAD_EXCHANGE = "order_timeout_into_dead_exchange";/*** 普通隊列,訂單超時,用于進(jìn)入死信隊列*/public static final String ORDER_TIMEOUT_INTO_DEAD_QUEUE = "order_timeout_into_dead_queue";/*** 進(jìn)入普通隊列的路由key,訂單超時關(guān)閉*/public static final String ORDER_TIMEOUT_INTO_ROUTING_KEY = "order_timeout_into_routing_key";/*** 創(chuàng)建 普通 交換機(jī)*/@Bean(ORDER_TIMEOUT_INTO_DEAD_EXCHANGE)public TopicExchange orderTimeoutIntoDeadExchange() {return ExchangeBuilder.topicExchange(ORDER_TIMEOUT_INTO_DEAD_EXCHANGE).durable(true).build();}/*** 創(chuàng)建 普通 隊列,普通隊列和死信隊列進(jìn)行綁定*/@Bean(ORDER_TIMEOUT_INTO_DEAD_QUEUE)public Queue orderTimeoutIntoDeadQueue() {/* // 方式一Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE); // 要綁定的死信交換機(jī)args.put("x-dead-letter-routing-key", ORDER_TIMEOUT_CLOSE_ROUTING_KEY); // 要綁定的死信 binding keyargs.put("x-message-ttl", 10000); // 普通隊列的消息過期時間,過期后 消息進(jìn)入死信隊列,單位:ms毫秒return QueueBuilder.durable(ORDER_TIMEOUT_INTO_DEAD_QUEUE).withArguments(args).build();*/// 方式二return QueueBuilder.durable(ORDER_TIMEOUT_INTO_DEAD_QUEUE)// 要綁定的死信交換機(jī).deadLetterExchange(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE)// 要綁定的死信 binding key.deadLetterRoutingKey(ORDER_TIMEOUT_CLOSE_ROUTING_KEY)// 普通隊列的消息過期時間,過期后 消息進(jìn)入死信隊列,單位:ms毫秒.ttl(10000) // 這里測試指定10秒,正式情況可以指定30分鐘.build();}/** 普通隊列和普通交換機(jī)綁定*/@Beanpublic Binding orderTimeoutBinding() {return new Binding(ORDER_TIMEOUT_INTO_DEAD_QUEUE,Binding.DestinationType.QUEUE,ORDER_TIMEOUT_INTO_DEAD_EXCHANGE,ORDER_TIMEOUT_INTO_ROUTING_KEY,null);}// ==================================================普通隊列 end======================================================== }

消費者,監(jiān)聽死信隊列

import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;@Component public class OrderTimeoutCloseMQListener {@RabbitHandler@RabbitListener(queues = OrderTimeoutCloseConfig.ORDER_TIMEOUT_CLOSE_DEAD_QUEUE) // 監(jiān)聽死信隊列public void messageHandler(String body, Message message, Channel channel) throws IOException {// 1:監(jiān)聽到 訂單消息,拿到訂單idSystem.err.println(" X 監(jiān)聽死信隊列收到消息 body = " + body);// 2:用訂單id,查詢數(shù)據(jù)庫訂單信息,如果訂單狀態(tài)是 已支付,這里不做操作// 3:如果訂單狀態(tài)是 未支付,把訂單狀態(tài)設(shè)置成 超時未支付channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} }

生產(chǎn)者,向普通隊列發(fā)送消息

import net.minidev.json.JSONObject; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;import java.util.HashMap; import java.util.Map;@SpringBootTest class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 模擬下單成功*/@Testpublic void testBuy() {// 1:用戶下單把訂單信息存入數(shù)據(jù)庫,返回訂單id// 2:發(fā)送訂單id到 普通消息隊列Map<String, String> map = new HashMap<>();map.put("orderId", "123456789");rabbitTemplate.convertAndSend(OrderTimeoutCloseConfig.ORDER_TIMEOUT_INTO_DEAD_EXCHANGE, OrderTimeoutCloseConfig.ORDER_TIMEOUT_INTO_ROUTING_KEY, JSONObject.toJSONString(map));} }

RabbitMQ 的集群環(huán)境

RabbitMQ 普通集群模式的介紹

????????集群有 3 個節(jié)點,node1、node2、node3,三個節(jié)點有相同的元數(shù)據(jù)(交換機(jī)、隊列結(jié)構(gòu)),一個消息只存在一個節(jié)點上,不在其他節(jié)點同時存在。

例如:

????????A消息,存在node1節(jié)點上。A消息在node2、node3節(jié)點上不存在。

消費者監(jiān)聽node1節(jié)點可以直接消費到 A消息。假如消費者監(jiān)聽的是node2節(jié)點,那么rabbitmq 會把A消息被消費的時候才從 node1 節(jié)點取出放入到node2節(jié)點,然后node2節(jié)點再把消息轉(zhuǎn)發(fā)給消費者。

問題:

? ? ? ? 1:假如node1節(jié)點故障,那么node2無法獲取node1節(jié)點上未被消費的消息。

? ? ? ? 2:如果node1持久化后發(fā)生故障,消息需要等到node1恢復(fù)正常后才可以正常消費。

? ? ? ? 3:如果node1未做持久化發(fā)生故障,那么node1節(jié)點上的消息將會丟失。

應(yīng)用場景:

? ? ? ? 該模式適用于消息無需持久化的場景,例如日志傳輸隊列。

注意:集群需要保證每個節(jié)點有相同的token令牌。

消息持久化


RabbitMQ 搭建普通集群環(huán)境

1:準(zhǔn)備節(jié)點環(huán)境

3個節(jié)點的訪問 web控制臺訪問端口分別是:15671、15672、15673

準(zhǔn)備3個目錄,用于放 3個節(jié)點

/usr/local/rabbitmq/1 /usr/local/rabbitmq/2 /usr/local/rabbitmq/3

創(chuàng)建 節(jié)點1

sudo docker run -d \ --name rabbitmq_1 \ -h rabbitmq_host1 \ -p 4361:4369 \ -p 5671:5672 \ -p 15671:15672 \ -p 25671:25672 \ -e RABBITMQ_NODENAME='rabbit' \ -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \ --privileged=true \ -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq/ \ rabbitmq:management

創(chuàng)建 節(jié)點2

sudo docker run -d \ --name rabbitmq_2 \ -h rabbitmq_host2 \ -p 4362:4369 \ -p 5672:5672 \ -p 15672:15672 \ -p 25672:25672 \ -e RABBITMQ_NODENAME='rabbit' \ -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \ --link rabbitmq_1:rabbitmq_host1 \ --privileged=true \ -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq/ \ rabbitmq:management

創(chuàng)建 節(jié)點3

sudo docker run -d \ --name rabbitmq_3 \ -h rabbitmq_host3 \ -p 4363:4369 \ -p 5673:5672 \ -p 15673:15672 \ -p 25673:25672 \ -e RABBITMQ_NODENAME='rabbit' \ -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \ --link rabbitmq_1:rabbitmq_host1 \ --link rabbitmq_2:rabbitmq_host2 \ --privileged=true \ -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq/ \ rabbitmq:management

參數(shù)說明:

  • -e RABBITMQ_ERLANG_COOKIE:指定集群節(jié)點的cookie,節(jié)點的cookie要配置相同。?
  • --link:容器互聯(lián),讓容器之前可以相互ping通
  • --privileged:讓容器內(nèi)部的用戶有root權(quán)限,不然用戶對容器內(nèi)部的文件沒有操作權(quán)限permission denied
  • -v :讓物理機(jī)路徑與容器里的路徑映射,容器里的路徑的數(shù)據(jù)會存儲在物理機(jī)上

節(jié)點完成后,可以訪問 http://ip:端口,查看2個節(jié)點是否創(chuàng)建成功。

如果容器啟動失敗,可以使用 docker logs 容器id 命令查看啟動日志。


2:節(jié)點配置成集群

配置之前查看節(jié)點狀態(tài),進(jìn)入容器內(nèi),使用命令:rabbitmqctl cluster_status

配置節(jié)點1

docker exec -it 節(jié)點1的容器名稱 /bin/bash? // 進(jìn)入啟動的docker容器內(nèi) rabbitmqctl stop_app? ? ? ? ? ? ? ? ? ? ? // 停止 rabbitmq 服務(wù),rabbitmqctl是rabbitmq的操作命令? rabbitmqctl reset // 重置 rabbitmq rabbitmqctl start_app // 啟動 rabbitmq 服務(wù) exit // 退出 docker 容器

配置節(jié)點2,加入集群

docker exec -it 節(jié)點2的容器名稱 /bin/bash? ? ? ? // 進(jìn)入啟動的docker容器內(nèi) rabbitmqctl stop_app? ? ? ? ? ? ? ? ? ? ? // 停止 rabbitmq 服務(wù),rabbitmqctl是rabbitmq的操作命令? rabbitmqctl reset // 重置 rabbitmq rabbitmqctl join_cluster --ram 節(jié)點1的hostname // 加入集群 --ram 參數(shù)是以內(nèi)存的方式加入,不帶此參數(shù)默認(rèn)是磁盤的方式,節(jié)點1的hostname是:rabbit@rabbitmq_host1 rabbitmqctl start_app // 啟動 rabbitmq 服務(wù) exit // 退出 docker 容器

配置節(jié)點3,加入集群

docker exec -it 節(jié)點3的容器名稱 /bin/bash? ? ? ? // 進(jìn)入啟動的docker容器內(nèi) rabbitmqctl stop_app? ? ? ? ? ? ? ? ? ? ? // 停止 rabbitmq 服務(wù),rabbitmqctl是rabbitmq的操作命令? rabbitmqctl reset // 重置 rabbitmq rabbitmqctl join_cluster --ram 節(jié)點1的hostname // 加入集群 --ram 參數(shù)是以內(nèi)存的方式加入,不帶此參數(shù)默認(rèn)是磁盤的方式,節(jié)點1的hostname是:rabbit@rabbitmq_host1 rabbitmqctl start_app // 啟動 rabbitmq 服務(wù) exit // 退出 docker 容器

配置完成之后,可以在容器內(nèi)使用命令:rabbitmqctl cluster_status,查看集群狀態(tài),可以看到集群現(xiàn)在有3個節(jié)點在運行。1個磁盤節(jié)點,2個內(nèi)存節(jié)點

訪問網(wǎng)頁,可以看到有3個節(jié)點

消息隊列和交換機(jī)在所有節(jié)點上存在。消息只在自己的節(jié)點上存在,當(dāng)一個節(jié)點宕機(jī)后,宕機(jī)節(jié)點上的消息無法被消費(消息不可用)


3:SpringBoot 整合 RabbitMQ 普通集群

application.yml 文件配置 rabbitmq 集群地址,其他配置不變

spring:rabbitmq:listener:simple:acknowledge-mode: manualpublisher-returns: truetemplate:mandatory: truepublisher-confirm-type: correlated # host: 192.168.189.75 # port: 5672virtual-host: /devpassword: guestusername: guest### 配置節(jié)點地址addresses: 192.168.189.75:5671,192.168.189.75:5672,192.168.189.75:5673

代碼操作,和上面的單節(jié)點的一樣正常的生產(chǎn)監(jiān)聽消息就行了,這里就不重復(fù)貼代碼了


RabbitMQ 鏡像集群模式的介紹(推薦)

????????隊列做成鏡像隊列,鏡像隊列中的消息在各個節(jié)點之間同步(A消息在各個節(jié)點中都存在)。

好處:
?? ?實現(xiàn)了高可用,部分節(jié)點宕機(jī)后,不影響消息的正常消費。
?? ?鏡像集群模式可以保證消息100%不丟失。適用于高可用要求高的需求,例如訂單服務(wù)。

缺點:
?? ?消息數(shù)量過多,大量的消息同步會加大網(wǎng)絡(luò)寬帶的消耗。節(jié)點越多服務(wù)器性能受影響越大
?? ?
注意:集群需要保證每個節(jié)點有相同的token令牌。

策略policy介紹

policy是用來控制和修改集群的vhost隊列和Exchange復(fù)制行為。哪些Exchange或者queue的數(shù)據(jù)需要復(fù)制、同步,以及如何復(fù)制同步。

  • 創(chuàng)建一個policy策略

路徑:進(jìn)入rabbitmq控制臺 -> Admin -> Policies -> Add / update a policy

參數(shù)介紹:

Name:自定義策略名稱,建議不要使用空格

Pattern:用于匹配隊列/交換機(jī)的正則表達(dá)式,^ 符號,表示匹配所有

Apply to:應(yīng)用到交換機(jī)和隊列

Priority:優(yōu)先級。一個隊列/交換機(jī)只會有一個生效的 Policy,如果匹配多個 Policy,則優(yōu)先級數(shù)值最大的 Policy 生效。

Definition:JSON格式的一組鍵值對,表示設(shè)置的屬性,會被注入匹配隊列/交換機(jī)

  • ha-mode:
    • all:表示在集群中所有的節(jié)點上進(jìn)行鏡像同步(一般都用這個參數(shù))
    • exactly:表示在指定個數(shù)的節(jié)點上進(jìn)行鏡像同步,節(jié)點的個數(shù)由ha-params指定
    • nodes:表示在指定的節(jié)點上進(jìn)行鏡像同步,節(jié)點名稱通過ha-params指定
  • ha-sync-mode:鏡像消息同步方式
    • automatic: 自動(默認(rèn))
    • manually:手動

policy策略創(chuàng)建完成后,鏡像隊列就配置成功了。可以看到隊列發(fā)生了如下變化

鏡像集群注意點:

  • 集群啟動順序:先啟動磁盤節(jié)點 => 再啟動內(nèi)存節(jié)點
  • 集群關(guān)閉順序:先關(guān)閉內(nèi)存節(jié)點 => 再關(guān)閉磁盤節(jié)點
  • 最后關(guān)閉必須是磁盤節(jié)點,否則容易造成集群啟動失敗、數(shù)據(jù)丟失等異常情況

可以看到節(jié)點宕掉一個后,消息還是存在的

總結(jié)

以上是生活随笔為你收集整理的RabbitMQ教程大全看这一篇就够了-java版本的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

91看片成人 | 亚洲精品自在在线观看 | 久久精品国产成人 | 色网站黄| 欧美国产日韩一区二区三区 | 啪嗒啪嗒免费观看完整版 | 国产成人精品一区二区在线 | 久久久电影网站 | 激情伊人五月天 | 蜜臀aⅴ国产精品久久久国产 | 欧洲精品在线视频 | 欧美日本三级 | 天天躁日日躁狠狠躁 | .精品久久久麻豆国产精品 亚洲va欧美 | 成人午夜黄色 | 黄色三级免费 | 日韩免费视频网站 | 二区三区精品 | 视频在线播放国产 | 中文字幕 国产精品 | 精品国产乱码久久久久久1区二区 | 免费aa大片 | 97人人模人人爽人人喊网 | 国产黄色片久久 | 国产高清久久久久 | 亚洲涩涩网 | 人人看人人做人人澡 | 韩国av一区二区三区在线观看 | av一本久道久久波多野结衣 | 九九热在线播放 | 久久与婷婷 | 西西444www大胆高清视频 | 中文字幕在线观看第一页 | 开心丁香婷婷深爱五月 | 成人黄色在线视频 | 91看国产 | 99情趣网视频 | 免费日韩一区二区 | 美女福利视频网 | 亚洲涩涩涩涩涩涩 | 月丁香婷婷| 国产剧情一区二区在线观看 | 婷婷丁香视频 | 91一区二区在线 | 99精品国产福利在线观看免费 | 一区二区三区免费网站 | 免费婷婷 | 久九视频| 91精品国产91久久久久 | 久草在线中文视频 | 91网页版在线观看 | 欧美污污网站 | 国产 欧美 日本 | 国产又粗又猛又爽又黄的视频免费 | 91精品毛片 | 亚洲成人精品 | 人人狠狠综合久久亚洲婷 | 西西4444www大胆视频 | 久久视频这里有久久精品视频11 | 日韩视频在线观看视频 | 精品日本视频 | 成人午夜电影久久影院 | 超碰人人草 | 国产日韩在线视频 | 欧美日韩精品免费观看视频 | 人人要人人澡人人爽人人dvd | 黄色av电影在线观看 | 91在线公开视频 | 国产黄色一级片在线 | av网站有哪些 | 2019中文在线观看 | 成人免费观看完整版电影 | 国产女人40精品一区毛片视频 | 999久久久久久久久 69av视频在线观看 | 91精品国产九九九久久久亚洲 | 日韩欧美精品在线视频 | 成人a视频片观看免费 | 9999亚洲| 国产福利一区二区在线 | 国产伦理一区二区 | 四虎最新入口 | 成 人 黄 色 视频播放1 | 亚洲精品在线电影 | 97在线观看视频免费 | 日韩在线 | 粉嫩av一区二区三区四区在线观看 | 亚洲激情 在线 | 久久精品中文视频 | 91av视频网站 | 久久久久久久久久久久久影院 | 久久成熟 | 精品一区二区精品 | 久草在在线视频 | 91视频 - 114av| 久久久久久国产精品久久 | 精品一区二区视频 | 亚洲精品乱码 | 91久久国产自产拍夜夜嗨 | 国产一级黄 | 久久综合婷婷综合 | 麻豆视频免费播放 | 久久午夜色播影院免费高清 | 国产精品毛片一区 | 午夜精品视频免费在线观看 | 开心色停停 | 欧美日韩国产精品爽爽 | 午夜精品久久久久99热app | 免费欧美精品 | 久久在线免费 | 国产成人专区 | 96香蕉视频 | av大全在线看| 五月婷网 | 国产一级黄色免费看 | 日本三级在线观看中文字 | 国产精品久久久亚洲 | 韩国一区二区三区在线观看 | 一区二区三区免费在线 | 日韩免费精品 | 五月婷婷丁香综合 | 国产手机在线观看视频 | 911香蕉| 久热色超碰| 国产黄色在线网站 | 天天操夜| 狠狠狠狠狠狠 | 中文伊人 | 91夫妻自拍 | 亚洲午夜电影网 | 狠狠色噜噜狠狠狠狠2021天天 | 欧美色图另类 | 亚洲精品国产区 | 在线观看成人福利 | 国产短视频在线播放 | 国产色就色 | 久久久亚洲网站 | 狠狠色丁香久久婷婷综合五月 | 欧美地下肉体性派对 | 一级黄色大片在线观看 | 亚洲a网 | 日日日操操 | 久久国产精品一区二区 | 国产精品久久一区二区三区, | 91丨精品丨蝌蚪丨白丝jk | 久久中文字幕导航 | 国产免费作爱视频 | 成人av电影网址 | 国产免费专区 | 国产麻豆视频免费观看 | 三级av在线免费观看 | 97精品超碰一区二区三区 | 久色婷婷| 国产精品网站一区二区三区 | 国产精品视频专区 | 在线看片91 | 五月婷婷天堂 | 国产精品久久久久久久妇 | 亚洲日本一区二区在线 | 99久热在线精品视频观看 | 成人视屏免费看 | 婷婷久久久久 | 国产69熟| 国产乱对白刺激视频在线观看女王 | 二区视频在线 | 欧美日韩性视频在线 | 青青河边草免费 | 国产天天综合 | 精品视频97| 国产精品一区二区 91 | 国产精品入口麻豆 | 日韩精品视频第一页 | 亚洲精品视频网站在线观看 | 在线观看黄污 | 中文国产字幕 | 免费在线观看成人av | 中国一区二区视频 | 国产一区二区三区在线 | 国产一级二级在线观看 | 国产亚洲精品久久久久久久久久 | 国产一区二区三区久久久 | 色一级片 | 久久人人爽 | 中文字幕一区二区三区久久蜜桃 | 狠狠狠狠狠狠天天爱 | 你操综合| 国产成人精品午夜在线播放 | 人人爽人人爽人人片av免 | 99精品视频在线免费观看 | 日韩视频www | 超碰97.com | 欧美精品久久久久久久久久 | 国产不卡在线观看视频 | 中文国产成人精品久久一 | 成人小视频在线免费观看 | 欧美午夜a| 最新av中文字幕 | 99av国产精品欲麻豆 | 国产偷v国产偷∨精品视频 在线草 | 久久伊人综合 | 在线播放亚洲 | 精品国产视频一区 | 欧美一二三区在线观看 | 免费亚洲黄色 | www91在线观看 | 免费精品人在线二线三线 | 久久国产精品免费一区 | 91高清免费 | 亚洲在线视频免费 | av蜜桃在线 | 99精品色 | 中文字幕在线观看的网站 | 久久最新 | 国产在线污 | 国产高清一级 | 在线观看黄a| 狠狠狠色丁香婷婷综合激情 | 中文久久精品 | 欧美孕妇视频 | 亚洲专区免费观看 | 伊人影院99 | 91av手机在线观看 | 在线影院 国内精品 | 国产精品美女久久久免费 | 黄污视频网站大全 | 国产99久久久国产精品 | 欧美久久精品 | 成人av动漫在线观看 | 亚州av免费 | 国产福利a | 成年人在线播放视频 | 国产成人精品亚洲日本在线观看 | 国产18精品乱码免费看 | 亚洲精品在线网站 | 国产成人99av超碰超爽 | 一区二区三区免费在线观看视频 | 91av社区| 99性视频| 人人爽人人爽人人爽人人爽 | 人人澡av | 国产精品久久久久久久久久三级 | 欧美-第1页-屁屁影院 | 偷拍精品一区二区三区 | 国产资源在线播放 | 国产精品mm | 成人一级免费视频 | 狠狠操夜夜 | 久久久久久久久久亚洲精品 | 在线观看成人一级片 | 国产成人福利在线观看 | 在线观看亚洲成人 | 国产精品网站 | 久久免费视频4 | aav在线| 久草在线中文视频 | 国产亚洲精品日韩在线tv黄 | 日韩午夜三级 | 亚洲精品在线观看av | 热久精品 | 国产在线 一区二区三区 | 色综合综合 | 欧美91精品久久久久国产性生爱 | 久久久久久久久久国产精品 | 国产精品久久久久久久久搜平片 | 午夜视频在线瓜伦 | 色综合欧洲 | 久久精品电影网 | 91在线视频免费 | 免费网站黄 | 有码中文字幕在线观看 | 麻豆手机在线 | 四虎影视成人 | 欧美日韩性视频在线 | 国产精品中文字幕在线 | 国产一区二区在线播放 | 一级免费观看 | 黄色av成人在线 | 亚洲国产69 | 在线免费看片 | 天天躁日日躁狠狠躁av中文 | 最近中文字幕免费大全 | 亚洲 中文 在线 精品 | 97免费在线观看 | 欧美日韩国产伦理 | 青青草在久久免费久久免费 | 香蕉影院在线 | 蜜臀av性久久久久蜜臀aⅴ流畅 | 天天射天天干天天插 | 一区二区三区高清在线观看 | 亚洲精品9| 911国产在线观看 | 免费看污黄网站 | 亚洲成人资源 | 免费在线观看一级片 | 在线观看网站你懂的 | 最新成人av| 午夜久久影院 | 人人爽人人爽人人片av免 | 免费h漫在线观看 | 久久免费的精品国产v∧ | 人人精久| 国产一级a毛片视频爆浆 | 精品成人久久 | 日日激情 | 国产又黄又硬又爽 | 欧美坐爱视频 | 天天色天天射天天干 | 黄色小说在线观看视频 | 婷婷亚洲综合五月天小说 | 天天操天天干天天爽 | 4438全国亚洲精品观看视频 | 黄色小网站在线观看 | 精品国产一区二区三区不卡 | av在线网站免费观看 | 欧美巨乳网 | 久久黄色免费视频 | av一级片| 九九热在线播放 | 国产成人精品在线播放 | 丁香花在线视频观看免费 | 亚洲精品一区二区在线观看 | 欧美怡红院视频 | 婷婷色视频 | 97热久久免费频精品99 | 色天天综合网 | 亚洲美女在线一区 | 国产精品久久久久久电影 | 国产成人精品免高潮在线观看 | 国产成人免费在线 | 欧美成人按摩 | 日日干 天天干 | 黄色大片网 | 视频成人免费 | 久久综合激情 | 97超碰人人模人人人爽人人爱 | jizz999| 日韩午夜在线 | 日韩在线精品视频 | 亚洲国产理论片 | 中文字幕一区二区三区在线播放 | 国产福利91精品一区 | 中文字幕亚洲欧美日韩2019 | 日韩精品资源 | 国产成人精品一二三区 | 麻豆小视频在线观看 | 中文字幕高清有码 | 中文一区二区三区在线观看 | 亚洲精品美女在线观看播放 | 国产精品视频不卡 | 伊人黄色网 | 夜夜视频 | 天天透天天插 | 久久人人97超碰精品888 | 亚洲高清av在线 | 久草久| 亚洲国产成人在线播放 | 精品毛片在线 | 中文字幕精品在线 | 亚洲片在线资源 | 精品一区精品二区 | 日韩久久久久久 | 久久99久久99精品免观看软件 | av观看免费在线 | 亚洲婷婷丁香 | 成全在线视频免费观看 | 看国产黄色片 | 99 视频 高清 | 香蕉影视在线观看 | 国产精品毛片久久久 | 久久久久久久久久久黄色 | 毛片网免费 | 一区二区视频在线播放 | 亚洲电影毛片 | 日韩一区二区三区高清在线观看 | 91丝袜美腿 | 波多野结衣小视频 | 国产特级毛片aaaaaa | 色婷婷亚洲婷婷 | 在线观看精品一区 | 免费黄色在线播放 | 日韩一区在线播放 | 在线黄色av电影 | 狠狠操狠狠操 | 欧美一级电影 | 91丨九色丨国产女 | 三级av免费 | 黄色av一级 | 天天操天天干天天玩 | 亚洲国产视频a | 麻豆成人精品视频 | 黄网站色视频免费观看 | 国产这里只有精品 | 日韩av影片在线观看 | 五月婷婷黄色 | 黄色a在线观看 | 欧美激情视频一区 | 在线免费色 | 免费一级片在线 | 天天天干天天天操 | 久久久色 | 国产精品爽爽久久久久久蜜臀 | 免费精品在线观看 | 日韩精品免费在线播放 | 日韩av高清在线观看 | 97看片 | 欧美成人理伦片 | 97成人在线观看视频 | 久久的色 | 久久久久久久99精品免费观看 | 高清中文字幕av | 久久综合给合久久狠狠色 | 欧美最新另类人妖 | 免费观看久久 | 欧美 日韩 国产 中文字幕 | 欧美疯狂性受xxxxx另类 | 91丨九色丨国产在线观看 | 黄色毛片电影 | 81精品国产乱码久久久久久 | 国产精品欧美激情在线观看 | 精品国产99 | 欧美日韩一区二区在线观看 | 亚洲欧洲精品久久 | 中文字幕高清在线 | 在线观看视频中文字幕 | 中文字幕亚洲不卡 | 日韩精品不卡在线 | 一级性av| av丁香花 | 精品国偷自产国产一区 | 国产在线超碰 | 国产伦精品一区二区三区… | 欧美视频网址 | 久久精品欧美 | 亚洲电影av在线 | 日本中文字幕一二区观 | 久久国产精品久久精品国产演员表 | 国产精品久久久久久久久久久久久久 | 久草剧场 | a√资源在线 | 中文字幕 成人 | 最新中文字幕 | 美女免费黄视频网站 | 人人爽人人香蕉 | 人人搞人人干 | 欧美日韩一区二区三区免费视频 | 国产最新在线观看 | 天天爽天天碰狠狠添 | 91桃色在线免费观看 | 国产一区福利在线 | 国产精品午夜8888 | 国产日韩在线视频 | 久久综合影音 | 国产老熟 | 欧美 亚洲 另类 激情 另类 | 天堂av官网 | 国产高清视频 | 一本之道乱码区 | 成人在线电影观看 | 天天干天天拍天天操天天拍 | 99久久精品免费看国产四区 | 久久五月精品 | 亚洲综合一区二区精品导航 | 超碰在线免费97 | 久久午夜免费观看 | 在线观看久久 | 久久视频在线 | 国产视频18 | sesese图片| 91黄色成人 | 日韩三级.com| 免费高清看电视网站 | 国产视频一区二区在线 | 国产 一区二区三区 在线 | av片中文字幕| 欧美日韩一区二区三区在线观看视频 | 92中文资源在线 | 探花视频在线观看免费版 | 欧美另类交在线观看 | 国产一区二区视频在线 | 久久久综合九色合综国产精品 | 精品久久久免费视频 | 97超碰中文字幕 | 日日麻批40分钟视频免费观看 | 六月婷婷网| 免费毛片一区二区三区久久久 | 欧美一区日韩精品 | 久久国产精品小视频 | 一级一片免费视频 | 天天操天天射天天插 | 日韩高清激情 | 色婷婷综合视频在线观看 | 欧美激情综合色 | 久久成人18免费网站 | 91手机电视 | 中文字幕首页 | 久草久草久草久草 | 色噜噜狠狠狠狠色综合久不 | 国产精品一区二区av日韩在线 | 国产一级特黄毛片在线毛片 | 亚洲视频资源在线 | 播五月综合 | 一区二区三区日韩精品 | 一区三区视频在线观看 | 91网页版免费观看 | 精品夜夜嗨av一区二区三区 | 99久久日韩精品免费热麻豆美女 | 国产免费国产 | 韩日精品在线观看 | 国产午夜在线观看视频 | 黄色成人影院 | 91麻豆精品国产91久久久更新时间 | 亚洲精品免费视频 | 日韩av黄 | 91精品999 | 欧美一区二区在线免费看 | 最新中文字幕在线播放 | 91九色在线观看视频 | 国产精品第一视频 | 中文字幕国产在线 | 97视频网址 | 国产精品 中文字幕 亚洲 欧美 | 日韩视频免费观看高清完整版在线 | 一级成人免费 | 精品欧美一区二区在线观看 | 欧美天天综合网 | 国产精品视频免费观看 | 成人久久久久久久久久 | 九九精品视频在线 | 久久免费视频4 | 深夜视频久久 | 日韩欧美视频在线观看免费 | 日韩免费一二三区 | 免费亚洲精品 | 国产精品日韩在线观看 | 免费一级片视频 | 欧美a免费 | 99欧美| 亚洲精品国产精品久久99 | 五月天婷亚洲天综合网精品偷 | 国产一区二区三区免费观看视频 | 99久久久国产精品 | 国产精品久久久久婷婷二区次 | 国产久视频| 精品久久五月天 | 久久久久麻豆v国产 | 国产精品自产拍在线观看网站 | 四虎成人精品永久免费av | 免费观看av网站 | 国产精品大片免费观看 | av理论电影 | 激情喷水| 久久躁日日躁aaaaxxxx | 91网页版免费观看 | 日日夜夜91 | 亚洲全部视频 | 91看片看淫黄大片 | 91免费在线播放 | 在线免费观看视频a | av官网| 成年人黄色免费网站 | 日韩成人中文字幕 | 美女黄频网站 | 国产 日韩 在线 亚洲 字幕 中文 | 国产福利av | 日韩一级成人av | 久久av免费观看 | 国产一区二区精品91 | 国产精品成人aaaaa网站 | 国产一区二区精品久久 | 在线播放精品一区二区三区 | 黄av在线 | 人人盈棋牌 | 精品国产aⅴ一区二区三区 在线直播av | 欧美极度另类 | 欧美精品你懂的 | 日韩免费一区二区在线观看 | 亚洲视频分类 | 免费观看av | 免费看一及片 | 热久久最新地址 | 在线电影日韩 | 欧美亚洲国产精品久久高清浪潮 | 久久久久久久久网站 | 久久久综合电影 | 亚洲精品美女在线 | 久久香蕉电影网 | 麻豆一区二区三区视频 | a天堂一码二码专区 | 亚洲国产资源 | 色婷婷福利视频 | 久久婷婷精品视频 | 成年免费在线视频 | 综合在线观看 | 欧美一级免费黄色片 | 亚洲国产中文字幕在线观看 | 国产日产欧美在线观看 | 美女视频黄网站 | 三级黄色免费片 | 国产日韩欧美精品在线观看 | www.av小说| 特黄色大片 | a级国产乱理伦片在线观看 亚洲3级 | 激情开心站 | 在线亚洲日本 | 欧美色图另类 | 在线观看国产一区 | 久久99最新地址 | 日韩免费在线观看视频 | 网站免费黄| 99这里只有 | 日韩不卡高清视频 | 亚洲一区日韩精品 | 亚洲手机天堂 | 婷婷亚洲综合五月天小说 | 精品视频久久 | 91麻豆精品国产91久久久久久久久 | 五月婷婷久 | 91色在线观看视频 | 日韩一区视频在线 | 深夜国产福利 | 国产伦理精品一区二区 | 国产精品久久久久久久久久久久午夜 | 最新av在线网站 | 在线播放 日韩专区 | 亚洲精品免费在线观看视频 | 天堂av在线网 | 亚洲视频免费 | 亚洲美女在线一区 | 蜜桃视频成人在线观看 | 精品久久网 | 一级一级一片免费 | 欧美精品亚洲精品日韩精品 | 日韩在线观看小视频 | 精品国产乱子伦一区二区 | 不卡av在线免费观看 | 91精品视频播放 | 91大神精品视频在线观看 | 黄色电影网站在线观看 | 国产特级毛片 | 亚洲精品视频网址 | 国产精品永久在线 | 久久久久国产精品免费免费搜索 | www.夜夜| 99se视频在线观看 | 久久成人午夜 | 久久精品一区二区三区中文字幕 | 精品成人a区在线观看 | 91亚洲精品久久久中文字幕 | 999精品 | 91免费视频黄 | 日韩一区精品 | 国产小视频免费观看 | 丁香在线观看完整电影视频 | 亚洲综合欧美精品电影 | 美女久久久 | 日韩精品视频网站 | 天天操人 | 亚洲一级二级三级 | 国产精品aⅴ | 成人av资源网 | 日韩美女免费线视频 | 免费看一级黄色 | 欧美欧美 | 国产无遮挡又黄又爽在线观看 | 亚洲视频axxx | 日韩中文字幕在线观看 | 欧美色就是色 | 天天天色综合 | 69国产精品成人在线播放 | 日韩在线视频一区 | 日韩一区二区三区在线看 | 欧美日韩一级在线 | 天天草夜夜 | 精品一区av | 狠狠色丁香久久婷婷综合五月 | 欧美一级片播放 | 香蕉视频在线视频 | 911久久香蕉国产线看观看 | 久久99精品国产 | 伊人五月天.com | 人人干人人爽 | 婷婷在线综合 | 成人免费视频播放 | 久久艹影院 | 久久视频在线看 | 国产免费资源 | 国产精品99久久久久久小说 | 久久国产高清视频 | 久久久久久久久久电影 | 亚洲亚洲精品在线观看 | 免费观看丰满少妇做爰 | 欧美久久久 | 久久久久久黄色 | 一区 二区电影免费在线观看 | av.com在线| 久久久久久毛片精品免费不卡 | 久久久免费播放 | 二区三区毛片 | 欧美日韩伦理一区 | 亚洲精品久久激情国产片 | 狠狠88综合久久久久综合网 | 欧美日比视频 | 日本久久久久久久久久 | 精品国产成人在线 | 伊人婷婷综合 | 91精彩视频在线观看 | 蜜臀av.com| 久久人人爽人人 | 一区二区视频在线看 | 国产精品第一页在线 | 成人国产精品入口 | 99电影 | 国语麻豆 | 一区在线播放 | 国语精品免费视频 | 亚洲爽爽网 | 亚洲成人精品av | 五月天欧美精品 | 一区二区精品视频 | 国产999视频| 午夜国产一区二区 | 日韩在线资源 | 国产精品一区一区三区 | 亚洲高清视频一区二区三区 | 国产精品嫩草影视久久久 | 国产精品99精品久久免费 | 久久伊人色综合 | 日韩毛片在线播放 | 人人干狠狠干 | 亚洲精品小视频在线观看 | 韩国精品福利一区二区三区 | 色婷婷亚洲精品 | 最新日韩精品 | 综合久久影院 | 免费福利在线观看 | 久99久精品 | 久久久私人影院 | 91精品视频免费看 | 欧美少妇xx | 成人亚洲网 | 91精品国产91热久久久做人人 | 狠狠地操| 激情久久久久 | 日韩久久久久久 | 人人玩人人添人人 | 又黄又色又爽 | 超级碰99| 在线观看av大片 | 九九免费精品视频 | 在线91视频 | 黄色软件网站在线观看 | 九九九热 | 日韩精品视频第一页 | 亚洲精品乱码久久久久久蜜桃不爽 | 在线国产激情视频 | 在线免费观看视频 | 色妞久久福利网 | 超碰在线日本 | 亚洲天天在线 | 中文字幕免费高清在线观看 | 一区二区中文字幕在线观看 | 天天插综合 | 欧美日韩在线观看不卡 | www国产亚洲精品久久网站 | 色婷婷国产精品 | 国产精品一区二区你懂的 | 欧美日韩久久一区 | 亚洲视频aaa| 国产精品一区二区无线 | 国产馆在线播放 | 中文字幕在线播放一区二区 | 日韩欧美视频一区 | 国产99久| 日韩在线观看一区二区 | 国产精品福利小视频 | 日韩专区在线播放 | 911久久香蕉国产线看观看 | 日韩高清免费在线 | 中文字幕日韩国产 | 日韩久久久久久久 | 久久成人国产精品 | 亚洲免费在线观看视频 | 91porny九色在线播放 | 欧美日韩一区二区在线观看 | 亚洲高清不卡av | 免费观看黄色12片一级视频 | 国产精品午夜在线观看 | 国产区精品在线 | 国产精品日韩久久久久 | 日韩欧美一区二区在线播放 | 99精品国产99久久久久久福利 | 日韩免费一级a毛片在线播放一级 | 国产精品乱码高清在线看 | 亚洲精品在线观看av | 国产黑丝一区二区三区 | 色婷婷狠狠五月综合天色拍 | 91资源在线视频 | a级国产乱理论片在线观看 特级毛片在线观看 | 国产精品黄 | www.夜色321.com | 亚洲精品视频免费在线观看 | 亚洲国产播放 | 久久看看 | 精品久久久久_ | 久久精品资源 | 国产精品一区二区在线观看免费 | av黄色免费看 | 免费人成网ww44kk44 | av一区在线 | 久久成人视屏 | 99热精品视 | 国产一区二区日本 | 久久精品爱爱视频 | 国产69精品久久久久久久久久 | 欧美日韩国产亚洲乱码字幕 | 在线观看视频一区二区三区 | 久久国产精品一区二区 | 午夜精品一二区 | 91高清视频 | 综合精品久久 | 特级毛片在线 | 丁香综合 | 99免费精品视频 | 久久超碰免费 | 久久久精选 | 91精品第一页 | 四虎在线免费观看视频 | 亚洲爱爱视频 | 夜夜躁日日躁 | 久久99精品一区二区三区三区 | 精品a级片| 国产中文字幕视频在线观看 | 亚洲人人精品 | 亚洲狠狠婷婷综合久久久 | 黄色小网站免费看 | 国产精品亚洲人在线观看 | 91中文在线 | 国产精品黄色av | 激情丁香婷婷 | 国产视频在线观看一区二区 | 久久在现 | 中文字幕在线影院 | 91精品国产三级a在线观看 | 丰满少妇一级片 | 国产亚洲精品久久久久久无几年桃 | 国产精品一区二区av麻豆 | 国产成人精品av | 日本久久影视 | 国产成人三级在线 | 国产h片在线观看 | 黄色的视频网站 | 99精品国产aⅴ| 免费视频91蜜桃 | 在线一区二区三区 | 中文字幕中文字幕 | 亚洲欧美视频 | 久久免费观看视频 | 亚洲精品久久久蜜桃直播 | 久保带人 | 久久成熟 | 免费黄色一区 | av理论电影 | 丁香婷婷成人 | 亚洲综合在线播放 | 91视频国产高清 | 国产小视频在线免费观看视频 | 日韩av不卡在线播放 | 久99久久| 天天激情站 | 国产小视频国产精品 | 欧美日韩在线网站 | 亚洲国产午夜 | 四虎在线免费观看 | 国内偷拍精品视频 | 91大神精品视频 | 人人爽人人爽人人片av免 | 国产精品av在线 | 一本一道久久a久久精品蜜桃 | 日韩欧美电影在线 | 欧美日韩在线观看一区二区三区 | 久久免费视频国产 | 精品久久久久久久 | 91看片淫黄大片在线播放 | 婷婷色av| 狠狠色狠狠色合久久伊人 | 久久国色夜色精品国产 | 国产免费黄视频在线观看 | 亚洲欧洲成人精品av97 | 婷婷成人亚洲综合国产xv88 | 91在线播 | 国产在线观看免费观看 | 成人在线免费观看网站 | 黄色三级免费 | 国产在线一线 | 精品免费国产一区二区三区四区 | 麻豆久久一区二区 | 96超碰在线 | av电影免费在线播放 | 欧美三级在线播放 | 伊香蕉大综综综合久久啪 | 欧美大码xxxx | 毛片永久免费 | 91久久精品一区二区二区 | 偷拍精偷拍精品欧洲亚洲网站 | 看黄色91| 国产精品永久免费在线 | 丁香婷婷久久久综合精品国产 | 久久开心激情 | 99视频国产在线 | 色婷婷一区| 亚洲黄色在线免费观看 | 五月婷婷在线视频观看 | 免费黄在线观看 | 亚洲人人av | 国产亚洲免费的视频看 | 精品国产视频在线观看 | 亚洲国产精品免费 | www.国产精品 | 中文av日韩 | 婷婷婷国产在线视频 | 九九九九色 | 日本公妇色中文字幕 | 国产中文欧美日韩在线 | 久久精品视频18 | 日韩美女免费线视频 | 麻豆国产电影 | 日韩在线观看一区 | 国产精品久久久久永久免费观看 | 999久久国产 | 99久久精品费精品 | 成人久久视频 | 在线精品一区二区 | 亚洲精选在线 | 免费瑟瑟网站 | 91av成人 | 在线电影 你懂得 | 天天夜夜狠狠操 | 亚洲专区中文字幕 | 91精品高清 | 五月综合色 | 成人va在线观看 | 99热这里只有精品在线观看 | h视频在线看 | 久久久久 | 国产午夜精品久久久久久久久久 | 欧美精品在线一区二区 | 国产亚洲情侣一区二区无 | 国产亚洲成人网 | 婷婷色吧| 国产一级高清视频 | 久草色在线观看 | 狠狠色丁香婷婷综合橹88 | 精品亚洲免a | aaawww| 国产一区二区在线播放 | 一区二区三区日韩在线观看 | 日韩毛片在线一区二区毛片 | 91精品推荐 | 亚洲精品理论 | 精品极品在线 | 国产精品久久久久永久免费 | 中文资源在线观看 | 国产欧美三级 | 二区三区在线观看 | 国产亚洲欧美精品久久久久久 | 色综久久 | 国产伦理精品一区二区 | 国精产品满18岁在线 | 香蕉视频国产在线观看 | 久久高清国产视频 | 国产在线观看高清视频 | 又爽又黄又无遮挡网站动态图 | 色七七亚洲影院 | 美女免费网视频 | 亚洲精品乱码久久久久 | 欧美成人h版在线观看 | 最新中文字幕视频 | 国产一区二区视频在线 | 国产精品99久久久久久久久 | 国产又黄又爽无遮挡 | 免费观看一级特黄欧美大片 | 亚洲男男gaygayxxxgv | 国产在线观看你懂得 | 国产99免费 | 97免费在线观看视频 | 久久精品婷婷 | 精品96久久久久久中文字幕无 | 日p在线观看 | 91成人在线网站 | 欧美成人a在线 | 久草在线视频免赞 | 色99之美女主播在线视频 | 日韩爱爱网站 | 亚洲最新av网址 | 91免费观看视频网站 | 国产精品久一 | 国产视频精品视频 | 狠狠综合久久 | 久久久精品欧美一区二区免费 |