日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ使用手册

發布時間:2024/10/6 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ使用手册 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. RabbitMQ配置

1.1RabbitMQ管理命令行

# 1.服務器啟動相關命令行 systemctl start|restart|stop|status rabbitmq-server # 2.管理命令行 rabbitmqctl help #查看更多命令 # 3.插件管理命令行 rabbitmq-plugins enable|disable|list

1.2 Web管理界面介紹

1.2.1 OverView概覽

2. 消息隊列模式

第一種模型(直連)

在上圖所示的模型中有以下概念

  • P:生產者
  • C:消費者
  • queue:消息隊列

1.開發生產者

connection = RabbitmqUtils.getConnection(); channel = connection.createChannel(); /*** 參數一:隊列名字(不存在會自動創建)、參數二:是否要持久化、* 參數三:是否獨占隊列、參數四:是否自動刪除隊列* 參數五:額外參數*/ channel.queueDeclare("hello", false, false, false, null); /*** 參數一:交換機名稱、參數二:隊列名稱、* 參數三:額外消息設置、參數四:消息具體內容*/ channel.basicPublish("", "hello", false, false, null, "hello rabbitmq".getBytes());

2.消費者

Connection connection = RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); //通道綁定隊列 channel.queueDeclare("hello",false,false,false,null); /*** 參數一:隊列名稱、參數二:消息自動確認*/ channel.basicConsume("hello",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));} });

工具類RabbitMQUtils.java

package com.rabbitmq.study;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @FileName: RabbitMQUtils* @Author Steven* @Date: 2020/11/14*/ public class RabbitMQUtils {private static ConnectionFactory factory;static {factory = new ConnectionFactory();factory.setHost("xxx.xxx.xxx.xxx");factory.setPort(5672);factory.setVirtualHost("/cms");//這只訪問虛擬主機的用戶名、密碼factory.setUsername("ems");factory.setPassword("123");}public static Connection getConnection() {try {return factory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}return null;}public static void closeChannelAndConn(Channel channel, Connection conn) {try {if (channel != null) {channel.close();}if (conn != null) {conn.close();}} catch (Exception e) {e.printStackTrace();}} }

3.參數說明

/** * 參數一:隊列名字(不存在會自動創建)、參數二:是否要持久化、 * 參數三:是否獨占隊列、參數四:是否自動刪除隊列 * 參數五:額外參數 */ channel.queueDeclare("aa", true, false, false, null);

第二種模型(work queue)

work queue,也被稱為(task queue),任務模型,多個消費者綁定一個隊列共同消費,隊列中的消息一旦被消費就會消失,因此消息不回被重復消費

角色:

  • P:生產者,任務發布
  • queue:隊列存放消息
  • C1:消費者1,消費消息較慢
  • C2:消費者2,消費消息較快

1.生產者Send.java

Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); /*** 參數一:隊列名字(不存在會自動創建)、參數二:是否要持久化、* 參數三:是否獨占隊列、參數四:是否自動刪除隊列* 參數五:額外參數*/ channel.queueDeclare("work", true, false, false, null); /*** 參數一:交換機名稱、參數二:隊列名稱、* 參數三:額外消息設置、參數四:消息具體內容*/ for (int i = 1; i <=100 ; i++) {channel.basicPublish("", "work", false, false, MessageProperties.PERSISTENT_TEXT_PLAIN, (i+"hello rabbitmq").getBytes()); } RabbitMQUtils.closeChannelAndConn(channel,connection);

2.消費者:Recv1.java

Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1:"+new String(body));try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}} });

3.消費者:Recv2.java

Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2:"+new String(body));try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}} });

消息確認機制

1.消費者:Recv1.java

Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //每次只能消費一個消息 channel.basicQos(1); channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2:" + new String(body));//參數1:確認具體為隊列中的那個消息,參數2:是否同時確認多個消息channel.basicAck(envelope.getDeliveryTag(),false);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}} });

2.消費者:Recv2.java

Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); //每次只能消費一個消息 channel.basicQos(1); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2:"+new String(body));//參數1:確認具體為隊列中的那個消息,參數2:是否同時確認多個消息channel.basicAck(envelope.getDeliveryTag(),false);} });

第三種模型(fanout)

fanout:扇出,又稱為廣播

在廣播模式下,消息發送流程

  • 多個消費者
  • 每個消費者都綁定這自己的隊列(queue)
  • 每個隊列都要綁定到交換機(exchange)上
  • 生產者只將消息發送給交換機
  • 交換機(exchange)決定將消息發送給哪個隊列

1.開發生產者:send.java

Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); /*** 通道聲明為交換機* 參數1:交換機名稱、參數2:交換機類型 fanout:代表廣播類型*/ channel.exchangeDeclare("logs","fanout"); channel.basicPublish("logs","",null,"RabbitMq fanout".getBytes());

2.消費者:Recv.java

channel.exchangeDeclare("logs","fanout"); //創建臨時隊列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,"logs",""); channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1:"+new String(body));} });

第四種模型(Routing)

1.Routing之訂閱模型-Direct(直連)

在fanout模式中,一條消息,會被所有訂閱的消費者消費,但在某些情景下,我們希望不同的消息被不同的消費者消費,這時就用到Direct類型的Exchange。

在Direct模型下:

  • 隊列與交換機綁定,不能任意綁定,而是指定一個RoutingKey(路由key)
  • 生產者向Exchange(交換機)發送消息時需要指定RoutingKey?(路由key)
  • Exchange不在把消息交給綁定的隊列,而是根據消息的Routing Key進行判斷,只有當消息中的Routing Key隊列中的Routing Key完全一致才會收到消息

流程

圖解

  • P:生產者,向exchange發送消息時會指定一個Routing Key
  • X:Exchange(交換機),接受生產者消息,然后把消息發送給與Routing Key相同的隊列
  • C1:消費者,指定需要接受Routing Key為error的消息
  • C2:消費者,指定需要接受Routing key為error、info、warning的消息

1.生產者

Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("direct.logs", "direct"); String queueName = channel.queueDeclare().getQueue(); String routingKey = "error"; channel.queueBind(queueName, "direct.logs", routingKey); channel.basicPublish("direct.logs", routingKey, null, (routingKey + " Rabbit Direct").getBytes("utf-8")); RabbitMQUtils.closeChannelAndConn(channel, connection);

2.消費者-1

channel.exchangeDeclare("direct.logs", "direct"); String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, "direct.logs", "info"); channel.queueBind(queueName, "direct.logs", "warning"); channel.queueBind(queueName, "direct.logs", "error"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Recv1日志:" + new String(body));} });

3.消費者-2

Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); /*** 參數一:交換機名稱、參數二:交換機的類型:(direct)直連式*/ channel.exchangeDeclare("direct.logs", "direct"); /*** 聲明一個臨時隊列*/ String queueName = channel.queueDeclare().getQueue(); /*** 參數一:隊列名稱、參數二:交換機名稱、參數三:RoutingKey(路由key)*/ channel.queueBind(queueName, "direct.logs", "info"); channel.queueBind(queueName, "direct.logs", "warning"); channel.queueBind(queueName, "direct.logs", "error"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Recv1日志:" + new String(body));} });

2.Routing之訂閱模型-Topic(直連)

topic類型的exchange與direct相比,都可以根據RoutingKey把消息路由到不同的隊列中,只不過topic在綁定隊列時RoutingKey可以使用通配符,這種模型RoutingKey一般有一個或多個單詞組成多個單詞間用“.”分割類如:item.insert

# 通配符*(start):匹配一個單詞#(start):匹配多個單詞 #如:hello.*: hello.java、hello.worldhello.#: hello.java.world

1.生產者:send.java

Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 參數一:交換機名稱 channel.exchangeDeclare("topics","topic"); String routingKey="user.save.findAll"; channel.basicPublish("topics",routingKey,null,("RabbitMQ Topic RoutingKey:"+routingKey).getBytes()); RabbitMQUtils.closeChannelAndConn(channel,connection);

2.消費者:Recv.java

Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topics","topic"); String queueName = channel.queueDeclare().getQueue(); String routingKey="user.#"; channel.queueBind(queueName,"topics",routingKey); channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("RabbitMQ tutorials Topic:"+new String(body));} });

3.SpringBoot整合RabbitMQ

3.1 初始環境單間

1.引入依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2.在application.yml中添加相關配置

server:address: 8080 spring:rabbitmq:addresses: 47.101.36.177username: emspassword: 123virtual-host: /cmsport: 5672application:name: rabbitmq-study

RabbitMQ簡化操作對象,使用時直接在項目中注入即可

3.2 第一種Hello World模型使用

1.生產者:Send.java

//注入RabbitMQ模板對象 @Autowired private RabbitTemplate rabbitTemplate;@Test public void testHelloWord() {rabbitTemplate.convertAndSend("hello","hello world"); }

2.消費者:HelloConsumer.java

@Component // 持久化,非獨占、非自動刪除的隊列 @RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false",autoDelete = "false")) public class HelloConsumer {@RabbitHandlerpublic void recive(String message){System.out.println("消息隊列中的消息:"+message);} }

3.3 第二種work queue模型使用

1.生產者:Send.java

@Test public void testWork(){for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("work","Work模型");}}

2.消費者:WorkConsumer.java

@Component public class WorkConsumer {@RabbitListener(queuesToDeclare=@Queue("work"))public void receive1(String message){System.out.println("work model1:"+message);}@RabbitListener(queuesToDeclare=@Queue("work"))public void receive2(String message){System.out.println("work model2:"+message);} }

3.4第三種廣播模型(fanout)使用

1.生產者:send.java

/*** 廣播模型fanout*/ @Test public void fanoutModel(){rabbitTemplate.convertAndSend("logs","","fanout廣播模型"); }

2.消費者:FanoutConsumer

@Component public class FanCoutModel {@RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定時,創建臨時隊列exchange =@Exchange(name = "logs",type = "fanout"))})public void receive1(String message){System.out.println("message1:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,//不指定時,創建臨時隊列exchange =@Exchange(name = "logs",type = "fanout"))})public void receive2(String message){System.out.println("message1:"+message);}}

3.5第三種路由(RoutingKey)模型使用

1.生產者:send.java

/*** route模式*/ @Test public void testRoute(){rabbitTemplate.convertAndSend("directs","error","通過路由發送error信息"); }

2.消費者:RouteConsumer.java

@Component public class RouteConsumer {@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "directs",type = "direct"),key = {"info","error","warning"})})public void receive1(String message){System.out.println("receive1:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "directs",type = "direct"),key ={"error"})})public void receive2(String message){System.out.println("receive2:"+message);}}

3.6第三種動態路由模型(Topic)使用

1.生產者:send.java

/*** topic模式(動態路由)*/ @Test public void testTopic(){rabbitTemplate.convertAndSend("topics","user.save","topic動態路由模式"); }

2.消費者:TopicConsumer.java

@Component public class TopicConsumer {@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "topics",type = "topic"),key = {"user.*"})})public void receive1(String message){System.out.println("receive1:"+message);}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topics",type = "topic"),key = {"user.#"})})public void receive2(String message){System.out.println("receive2:"+message);} }

4.RabbitMQ集群

4.1集群架構

4.1.1普通集群(副本集群)

總結

以上是生活随笔為你收集整理的RabbitMQ使用手册的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。