日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ 从入门到精通 (一)

發(fā)布時(shí)間:2023/12/18 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 从入门到精通 (一) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

目錄

  • 1. 初識(shí)RabbitMQ
  • 2. AMQP
  • 3.RabbitMQ的極速入門
  • 4. Exchange(交換機(jī))詳解
    • 4.1 Direct Exchange
    • 4.2 Topic Exchange
    • 4.3 Fanout Exchange
  • 5. Message 消息

1. 初識(shí)RabbitMQ

RabbitMQ 是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,用來通過普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù),RabbitMQ是使用 Erlang語言來編寫的,并且RabbitMQ是基于AMQP協(xié)議的

RabbitMQ的優(yōu)點(diǎn):

  • 開源、性能優(yōu)秀、穩(wěn)定性保障
  • 提供可靠性消息投遞模式(confirm)、返回模式(return)
  • 與SpringAMQP完美的整合、API豐富
  • 集群模式豐富,表達(dá)式配置,HA模式,鏡像隊(duì)列模型
  • 保證數(shù)據(jù)不丟失的前提下做到高可靠性、可用性

RabbitMQ官網(wǎng)

RabbitMQ的整體架構(gòu):

?
RabbitMQ的消息流轉(zhuǎn):

?

?

2. AMQP

AMQP全稱: Advanced Message Queuing Protocol

AMQP翻譯: 高級(jí)消息隊(duì)列協(xié)議

AMQP定義: 是具有現(xiàn)代特征的二進(jìn)制協(xié)議。是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)

?
?

AMQP核心概念:

  • Server:又稱Broker,接受客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)
  • Connection:連接,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接
  • Channel:網(wǎng)絡(luò)信道,幾乎所有的操作都在Channel中進(jìn)行,Channel是進(jìn)行消息讀寫的通道。客戶端可建立多個(gè)Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)
  • Message:消息,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由Properties和Body組成。Properties可以對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性;Body則是消息體的內(nèi)容
  • Virtual host:虛擬地址,用于進(jìn)行邏輯隔離,最上層的消息路由。同一個(gè)Virtual Host里面不能有相同名稱的Exchange或Queue
  • Exchange:交換機(jī),接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列
  • Binding:Exchange和Queue之間的虛擬連接,binding中可以包含routing key
  • Routing key:一個(gè)路由規(guī)則,虛擬機(jī)可用它確定如何路由一個(gè)特定消息
  • Queue:也稱為Message Queue,消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者

?

?

3.RabbitMQ的極速入門

后臺(tái)啟動(dòng): ./rabbitmq start &

關(guān)閉: ./rabbitmqctl stop

節(jié)點(diǎn)狀態(tài): ./rabbitmqctl status

管控臺(tái): http://ip:15672

?

?

RabbitMQ生產(chǎn)消費(fèi)快速入門:

環(huán)境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依賴配置)

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.9.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency></dependencies>

?

public class Procuder {public static void main(String[] args) throws Exception {//1.創(chuàng)建一個(gè)ConnectionFactory 并進(jìn)行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通過連接工廠創(chuàng)建連接Connection connection = connectionFactory.newConnection();//3.通過Connection 創(chuàng)建一個(gè) ChannelChannel channel = connection.createChannel();/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* exchange:指定交換機(jī) 不指定 則默認(rèn) (AMQP default交換機(jī)) 通過routingkey進(jìn)行匹配 * props 消息屬性* body 消息體*///4.通過Channel發(fā)送數(shù)據(jù)for(int i = 0; i < 5; i++){System.out.println("生產(chǎn)消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", null, msg.getBytes());}//5.記得關(guān)閉相關(guān)的連接channel.close();connection.close();} }

?

public class Consumer {public static void main(String[] args) throws Exception{//1.創(chuàng)建一個(gè)ConnectionFactory 并進(jìn)行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通過連接工廠創(chuàng)建連接Connection connection = connectionFactory.newConnection();//3.通過Connection 創(chuàng)建一個(gè) ChannelChannel channel = connection.createChannel();//4. 聲明創(chuàng)建一個(gè)隊(duì)列String queueName = "test";/*** durable 是否持久化* exclusive 獨(dú)占的 相當(dāng)于加了一把鎖*/channel.queueDeclare(queueName,true,false,false,null);//5.創(chuàng)建消費(fèi)者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.設(shè)置channel/*** ACK: 當(dāng)一條消息從生產(chǎn)端發(fā)到消費(fèi)端,消費(fèi)端接收到消息后會(huì)馬上回送一個(gè)ACK信息給broker,告訴它這條消息收到了* autoack: * true 自動(dòng)簽收 當(dāng)消費(fèi)者一收到消息就表示消費(fèi)者收到了消息,消費(fèi)者收到了消息就會(huì)立即從隊(duì)列中刪除。* false 手動(dòng)簽收 當(dāng)消費(fèi)者收到消息在合適的時(shí)候來顯示的進(jìn)行確認(rèn),說我已經(jīng)接收到了該消息了,RabbitMQ可以從隊(duì)列中刪除該消息了* */channel.basicConsume(queueName, true, queueingConsumer);//7.獲取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消費(fèi)端:" + msg);//Envelope envelope = delivery.getEnvelope();}} }

?

4. Exchange(交換機(jī))詳解

Exchange: 接收消息,并根據(jù)路由鍵轉(zhuǎn)發(fā)消息所綁定的隊(duì)列

?

交換機(jī)屬性:

  • Name: 交換機(jī)名稱
  • Type: 交換機(jī)類型 diect、topic、fanout、headers
  • Durability: 是否需要持久化,true為持久化
  • AutoDelete: 當(dāng)最后一個(gè)綁定到Exchange的隊(duì)列刪除后,自動(dòng)刪除該Exchange
  • Internal: 當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為false (百分之99的情況默認(rèn)為false 除非對(duì)Erlang語言較了解,做一些擴(kuò)展)
  • Arguments: 擴(kuò)展參數(shù), 用于擴(kuò)展AMQP協(xié)議可自定化使用

?

4.1 Direct Exchange

所有發(fā)送到Direct Exchange的消息被轉(zhuǎn)發(fā)到RouteKey指定的Queue

注意:Direct模式可以使用RabbitMQ自帶的Exchange: default Exchange,所以不需要將Exchange進(jìn)行任何綁定(binding)操作,消息傳遞時(shí),RoutingKey必須完全匹配才會(huì)被隊(duì)列接收,否則該消息會(huì)被拋棄

?

public class ProducerDirectExchange {public static void main(String[] args) throws Exception {//1.創(chuàng)建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2.創(chuàng)建ConnectionConnection connection = connectionFactory.newConnection();//3.創(chuàng)建ChannelChannel channel = connection.createChannel();//4.聲明String exchangeName = "test_direct_exchange";String routingKey = "test.direct";//5.發(fā)送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());} }

?

public class ConsumerDirectExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//聲明String exchangeName = "test_direct_exchange";String exchangeType = "direct";String queueName = "test_direct_queue";String routingKey = "test.direct";//表示聲明了一個(gè)交換機(jī)channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示聲明了一個(gè)隊(duì)列channel.queueDeclare(queueName,false,false,false,null);//建立一個(gè)綁定關(guān)系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//參數(shù):隊(duì)列名稱,是否自動(dòng)ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循環(huán)獲取消息while(true){//獲取消息,如果沒有消息,這一步將會(huì)一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}} }

?

4.2 Topic Exchange

所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上

Exchange將RouteKey和某Topic進(jìn)行模糊匹配,此時(shí)隊(duì)列需要綁定一個(gè)Topic

注意:可以使用通配符進(jìn)行匹配

符號(hào) # 匹配一個(gè)或多個(gè)詞

符號(hào) * 匹配不多不少一個(gè)詞

例如: "log.#" 能夠匹配到 “l(fā)og.info.oa”

? "log.*" 只會(huì)匹配到 "log.err"

public class ProducerTopicExchange {public static void main(String[] args) throws Exception {//1.創(chuàng)建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.創(chuàng)建ConnectionConnection connection = connectionFactory.newConnection();//3.創(chuàng)建ChannelChannel channel = connection.createChannel();//4.聲明String exchangeName = "test_topic_exchange";String routingKey1 = "user.save";String routingKey2 = "user.update";String routingKey3 = "user.delete.abc";//5.發(fā)送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());} }

?

public class ConsumerTopicExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//聲明String exchangeName = "test_topic_exchange";String exchangeType = "topic";String queueName = "test_topic_queue";String routingKey = "user.#";//表示聲明了一個(gè)交換機(jī)channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示聲明了一個(gè)隊(duì)列channel.queueDeclare(queueName,false,false,false,null);//建立一個(gè)綁定關(guān)系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//參數(shù):隊(duì)列名稱,是否自動(dòng)ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循環(huán)獲取消息while(true){//獲取消息,如果沒有消息,這一步將會(huì)一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}} }

?

4.3 Fanout Exchange

不處理路由鍵,只需要簡單的將隊(duì)列綁定到交換機(jī)上
發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上
所以Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的

?

public class ProducerFanoutExchange {public static void main(String[] args) throws Exception {//1.創(chuàng)建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.創(chuàng)建ConnectionConnection connection = connectionFactory.newConnection();//3.創(chuàng)建ChannelChannel channel = connection.createChannel();//4.聲明String exchangeName = "test_fanout_exchange";//5.發(fā)送for(int i = 0; i < 10 ; i++){String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, "", null, msg.getBytes());}channel.close();connection.close();} }

?

public class ConsumerFanoutExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//聲明String exchangeName = "test_fanout_exchange";String exchangeType = "fanout";String queueName = "test_topic_queue";//無需指定路由key String routingKey = "";//表示聲明了一個(gè)交換機(jī)channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示聲明了一個(gè)隊(duì)列channel.queueDeclare(queueName,false,false,false,null);//建立一個(gè)綁定關(guān)系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//參數(shù):隊(duì)列名稱,是否自動(dòng)ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循環(huán)獲取消息while(true){//獲取消息,如果沒有消息,這一步將會(huì)一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}} }

?

5. Message 消息

服務(wù)器與應(yīng)用程序之間傳遞的數(shù)據(jù),本質(zhì)上就是一段數(shù)據(jù),由Properties和Body組成

常用屬性:delivery mode、headers (自定義屬性)

其他屬性:content_type、content_encoding、priority、expiration

消息的properties屬性用法示例:

public class Procuder {public static void main(String[] args) throws Exception {//1.創(chuàng)建一個(gè)ConnectionFactory 并進(jìn)行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通過連接工廠創(chuàng)建連接Connection connection = connectionFactory.newConnection();//3.通過Connection 創(chuàng)建一個(gè) ChannelChannel channel = connection.createChannel();Map<String,Object> headers = new HashMap<>();headers.put("my1", "111");headers.put("my2", "222");//10秒不消費(fèi) 消息過期移除消息隊(duì)列AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();//4.通過Channel發(fā)送數(shù)據(jù)for(int i = 0; i < 5; i++){System.out.println("生產(chǎn)消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", properties, msg.getBytes());}//5.記得關(guān)閉相關(guān)的連接channel.close();connection.close();} }

?

public class Consumer {public static void main(String[] args) throws Exception{//1.創(chuàng)建一個(gè)ConnectionFactory 并進(jìn)行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通過連接工廠創(chuàng)建連接Connection connection = connectionFactory.newConnection();//3.通過Connection 創(chuàng)建一個(gè) ChannelChannel channel = connection.createChannel();//4. 聲明創(chuàng)建一個(gè)隊(duì)列String queueName = "test";channel.queueDeclare(queueName,true,false,false,null);//5.創(chuàng)建消費(fèi)者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.設(shè)置channelchannel.basicConsume(queueName, true, queueingConsumer);//7.獲取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消費(fèi)端:" + msg);Map<String, Object> headers = delivery.getProperties().getHeaders();System.err.println("headers value:" + headers.get("my1"));}} }

轉(zhuǎn)載于:https://www.cnblogs.com/dwlovelife/p/10982735.html

總結(jié)

以上是生活随笔為你收集整理的RabbitMQ 从入门到精通 (一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。