日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ入门中篇

發布時間:2024/1/8 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ入门中篇 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本篇博文目錄

      • 一.Spring整合RabbitMQ
        • 1.導入依賴
        • 2.生產者
        • 3.消費者
        • 4.測試
      • 二.SpringBoot整合RabbitMQ
        • 1.導入依賴
        • 2.生產者
        • 3.消費者
        • 4.測試
      • 三.代碼下載

一.Spring整合RabbitMQ

在spring項目中使用RabbitMQ的Exchange模式的Topics,項目分為消費者spring項目和生產者spring項目,其中都導入amqp-client和spring-rabbit依賴,前者為RabbitMQ的client依賴,后者為spring整合RabbitMQ的依賴。

1.導入依賴

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.0.5.RELEASE</version></dependency></dependencies>

備注:amqp-client 最新版本為5.16.0;spring-rabbit最新版本3.0.1。

2.生產者

  • 在resources包中創建applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 設置連接工廠,配置基本參數 --><rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"virtual-host="/test"></rabbit:connection-factory><!--fanout-exchange | direct-exchange | topic-exchange聲明一個名為topicExchange的topic交換機,如果這個交換機不存在,則自動創建--><rabbit:topic-exchange name="topicExchange" auto-declare="true"></rabbit:topic-exchange><!-- Spring為我們封裝了RabbitTemplate對象來簡化生產者發送數據的過程,對常用的方法進行了封裝。 --><rabbit:template id="template" connection-factory="connectionFactory" exchange="topicExchange"></rabbit:template><!--在生產者中配置template對象,用于發送數據--><bean id="newsProducer" class="com.itlaoqi.rabbit.exchange.NewsProducer"><property name="rabbitTemplate" ref="template"/></bean><!-- RabbitAdmin對象用于創建、綁定、管理隊列與交換機 --><rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/> </beans>

生產者的配置信息其實就是將Rabbit-client代碼中的代碼配置轉換為配置文件的配置,會簡單的進行修改即可,下來對上面相關配置信息進行解釋。

<rabbit:connection-factory 標簽是設置連接工廠,用來初始化Rabbitmq的連接對象,根據自己的Rabbitmq的配置信息填入連接對象的參數。

  • id:bean的名稱
  • host:主機地址
  • port:主機端口號
  • username:登入的用戶名
  • password:登入的密碼
  • virtual-host:虛擬路徑

<rabbit:topic-exchange 標簽用來初始化Topics類型的交換機信息,詳細參數解釋如下:

  • id:bean的名稱
  • name:交換機的名稱
  • auto-declare:自動創建

備注:交換機有三種方式:fanout-exchange(Sub/Pub) | direct-exchange(Routing) | topic-exchange(Topics)

<rabbit:template Spring為我們封裝了RabbitTemplate對象來簡化生產者發送數據的過程,對常用的方法進行了封裝。

  • id:bean的名稱
  • connection-factory:連接工廠
  • exchange:交換機

既然有了上面的RabbitTemplate對象,我們可以通過RabbitTemplate對象來發送數據,創建一個NewsProducer類,用來處理數據的發送:

NewsProducer類:用來處理生產者的數據發送(該類是一個封裝類,數據的發送通過該類的sendNews()進行實現)

package com.itlaoqi.rabbit.exchange;import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;import java.util.Date;public class NewsProducer {private RabbitTemplate rabbitTemplate = null;public RabbitTemplate getRabbitTemplate() {return rabbitTemplate;}public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendNews(String routingKey , News news){//convertAndSend 用于向exchange發送數據//第一個參數是routingkey//第二個參數是要傳遞的對象,可以是字符串、byte【】或者任何實現了【序列化接口】的對象rabbitTemplate.convertAndSend(routingKey , news);System.out.println("新聞發送成功");}public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");NewsProducer np = (NewsProducer)ctx.getBean("newsProducer");np.sendNews("us.20190101" , new News("新華社" , "特朗普又又又退群啦" , new Date() , "國際新聞內容"));np.sendNews("china.20190101" , new News("鳳凰TV" , "XXX企業榮登世界500強" , new Date() , "國內新聞內容"));} }

上面的NewsProducer類,通過外部傳遞一個RabbitTemplate對象進行初始化,然后通過 sendNews()方法中的 rabbitTemplate.convertAndSend(routingKey , news); 來發送數據,routingKey 表示路由主鍵,news表示數據對象,詳細的news代碼如下,注意該數據對象需要實現Serializable不然會因為無法序列化而拋出異常信息。

News類:傳遞的數據對象

package com.itlaoqi.rabbit.exchange;import java.io.Serializable; import java.util.Date;public class News implements Serializable{private String source;private String title;private Date createTime;private String content;public News(String source, String title, Date createTime, String content) {this.source = source;this.title = title;this.createTime = createTime;this.content = content;}public String getSource() {return source;}public void setSource(String source) {this.source = source;}public String getTitle() {return title;}public void setTitle(String title) {this.title = title;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}}

<bean id="newsProducer" 創建NewsProducer類的bean對象,并傳入初始化參數rabbitTemplate,通過該bean對象實現數據的發送,詳細代碼如下:

public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");NewsProducer np = (NewsProducer)ctx.getBean("newsProducer");np.sendNews("us.20190101" , new News("新華社" , "特朗普又又又退群啦" , new Date() , "國際新聞內容"));np.sendNews("china.20190101" , new News("鳳凰TV" , "XXX企業榮登世界500強" , new Date() , "國內新聞內容"));}

<rabbit:admin RabbitAdmin對象用于創建、綁定、管理隊列與交換機

  • id:bean的名稱
  • connection-factory:連接工廠

通過上面的<rabbit:admin,我們可以通過RabbitAdmin對象來實現隊列和交換機的創建,綁定和管理,詳細的測試代碼如下:

package com.itlaoqi.rabbit.exchange;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import javax.annotation.Resource; import java.util.HashMap;@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:applicationContext.xml"}) public class RabbitAdminTestor {@Resource(name="rabbitAdmin")private RabbitAdmin rabbitAdmin;@Resourceprivate RabbitTemplate rabbitTemplate;/*** 創建交換機*/@Testpublic void testCreateExchange(){rabbitAdmin.declareExchange(new FanoutExchange("test.exchange.fanout" , true ,false));rabbitAdmin.declareExchange(new DirectExchange("test.exchange.direct" , true ,false));rabbitAdmin.declareExchange(new TopicExchange("test.exchange.topic" , true ,false));}/*** 創建隊列,綁定交換機并通過生產者發送數據*/@Testpublic void testQueueAndBind(){rabbitAdmin.declareQueue(new Queue("test.queue"));rabbitAdmin.declareBinding(new Binding("test.queue", Binding.DestinationType.QUEUE,"test.exchange.topic", "#", new HashMap<String, Object>()));rabbitTemplate.convertAndSend("test.exchange.topic" , "abc" , "abc123");}/*** 刪除隊列和交換機*/@Testpublic void testDelete(){rabbitAdmin.deleteQueue("test.queue");rabbitAdmin.deleteExchange("test.exchange.fanout");rabbitAdmin.deleteExchange("test.exchange.direct");rabbitAdmin.deleteExchange("test.exchange.topic");}}

3.消費者

  • 在resources包中創建applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"virtual-host="/test"></rabbit:connection-factory><rabbit:admin connection-factory="connectionFactory"></rabbit:admin><!--創建隊列--><rabbit:queue name="topicQueue" auto-declare="true" auto-delete="false" durable="false" exclusive="false"/><!--交換機與隊列綁定,并指明篩選條件--><rabbit:topic-exchange name="topicExchange" auto-declare="true"><rabbit:bindings><rabbit:binding queue="topicQueue" pattern="us.*"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--啟動消費者后,Spring底層自動監聽對應的topicQueue數據,一旦有新的消息進來,自動傳入到consumer Bean的recv的News參數中,之后再程序對News進一步處理--><rabbit:listener-container connection-factory="connectionFactory"><rabbit:listener ref="consumer" method="recv" queue-names="topicQueue"/></rabbit:listener-container><bean id="consumer" class="com.itlaoqi.rabbitmq.NewsConsumer"></bean> </beans>

消費者的配置信息和生產者的存在很多相同,下來主要對沒有出現過的配置信息進行解釋。

<rabbit:queue 創建隊列

  • id:bean的名稱
  • name:隊列的名稱
  • auto-declare:自動創建
  • auto-delete:自動刪除
  • exclusive:是否獨占
  • durable:是否持久化

<rabbit:bindings 交換機與隊列綁定,并指明篩選條件

  • queue:隊列
  • pattern:篩選條件,*表示任意一個字符,#表示任意多個字符

<rabbit:listener-container 啟動消費者后,Spring底層自動監聽對應的topicQueue數據,一旦有新的消息進來,自動傳入到consumer Bean的recv的News參數中,之后再程序對News進一步處理

  • connection-factory:連接工廠

<rabbit:listener 具體的監聽對象

  • ref:監聽對象的bean
  • method:監聽方法
  • queue-names:隊列名稱

<bean id="consumer" 監聽的bean對象,具體的監聽方法recv

package com.itlaoqi.rabbitmq;import com.itlaoqi.rabbit.exchange.News; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;public class NewsConsumer {public void recv(News news){System.out.println("接收到最新新聞:" + news.getTitle() + ":" + news.getSource());}public static void main(String[] args) {//初始化IOC容器ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");} }

4.測試


二.SpringBoot整合RabbitMQ

1.導入依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.生產者

  • 添加一個名為springboot-exchange的交換機

  • application.properties配置
# rabbitmq連接配置 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/test spring.rabbitmq.connection-timeout=1000ms#producer #confirmlistener # 在springboot2.2.0.RELEASE版本之后該屬性已經過時了,不能在使用了 #spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-confirm-type=CORRELATED #returnlistener spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true

spring.rabbitmq.publisher-confirms 配置信息在springboot2.2.0.RELEASE版本之后該屬性已經過時了,不能在使用了,而是采用spring.rabbitmq.publisher-confirm-type,該屬性一共有三種值(來源于:https://blog.csdn.net/qingyuan2014/article/details/113916449):

  • NONE值是禁用發布確認模式,是默認值
  • CORRELATED值是發布消息成功到交換器后會觸發回調方法
  • SIMPLE值經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在發布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點返回發送結果,根據返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發送消息到broker;

spring.rabbitmq.publisher-returns 開啟rabbitmq確認機制的return
spring.rabbitmq.template.mandatory mandatory true代表如果消息無法正常投遞則return回生產者,如果false,則直接將消息放棄

  • 生產者數據發送對象

MessageProducer類:用于生產者數據發送,通過sendMsg()里的rabbitTemplate.convertAndSend()方法發送數據。

package com.itlaoqi.rabbitmq.springboot;import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;import javax.annotation.Resource; import java.util.Date;@Component public class MessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate ;RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {@Override/*** CorrelationData 消息的附加信息,即自定義id* isack 代表消息是否被broker(MQ)接收 true 代表接收 false代表拒收。* cause 如果拒收cause則說明拒收的原因,幫助我們進行后續處理*/public void confirm(CorrelationData correlationData, boolean isack, String cause) {System.out.println(correlationData);System.out.println("ack:" + isack);if(isack == false){System.err.println(cause);}}};RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingkey) {System.err.println("Code:" + replyCode + ",Text:" + replyText );System.err.println("Exchange:" + exchange + ",RoutingKey:" + routingkey );}};public void sendMsg(Employee emp){//CorrelationData對象的作用是作為消息的附加信息傳遞,通常我們用它來保存消息的自定義idCorrelationData cd = new CorrelationData(emp.getEmpno() + "-" + new Date().getTime());rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);rabbitTemplate.convertAndSend("springboot-exchange" , "hr.employee" , emp , cd);}}

和spring相比springboot使用起來就比較簡單了,只需要在application.properties中進行配置(沒有進行配置的參數,采用默認參數),就可以使用Rabbitmq提供的RabbitTemplate進行操作,上文中的confirmCallback和returnCallback是Rabbitmq消息確認機制里面對Confirm和return進行監聽,通過rabbitTemplate.setConfirmCallback()和rabbitTemplate.setReturnCallback()進行設置。在springboot2.2.0.RELEASE版本之后ReturnCallback和setReturnCallback 已經過時了,建議采用ReturnsCallback和setReturnsCallback(),ReturnsCallback和ReturnCallback的區別就是后者對參數進行了封裝。

ReturnsCallback中returnedMessage的參數為ReturnedMessage

ReturnCallback中returnedMessage的參數為(message, replyCode, replyText, exchange, routingkey)

convertAndSend方法中第四個參數CorrelationData 對象作用是作為消息的附加信息傳遞,通常我們用它來保存消息的自定義id

  • 數據對象

Employee 類:傳遞的數據對象,注意需要實現Serializable接口不然會因為無法序列化而產生異常。

package com.itlaoqi.rabbitmq.springboot;import java.io.Serializable;public class Employee implements Serializable{private String empno;private String name;private Integer age;public Employee(String empno, String name, Integer age) {this.empno = empno;this.name = name;this.age = age;}public String getEmpno() {return empno;}public void setEmpno(String empno) {this.empno = empno;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;} }
  • 發送數據

  • 添加一個名為springboot-queue的隊列,并綁定到交換機springboot-exchange上,如果不怎么做的話會進入到return中,如下:

添加后,再次運行:

3.消費者

  • application.properties配置
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/test spring.rabbitmq.connection-timeout=1000msspring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.concurrency=1 spring.rabbitmq.listener.simple.max-concurrency=5

spring.rabbitmq.listener.simple.acknowledge-mode 表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto,下面對各種參數進行了解釋。來源于(https://juejin.cn/post/7029232312197840904)

  • NONE : 不確認 :

1、默認所有消息消費成功,會不斷的向消費者推送消息
2、因為 rabbitmq 認為所有消息都被消費成功。所以隊列中存在丟失消息風險。

  • AUTO:自動確認

1、根據消息處理邏輯是否拋出異常自動發送 ack(正常)和nack(異常)給服務端,如果消費者本身邏輯沒有處理好這條數據就存在丟失消息的風險。
2、使用自動確認模式時,需要考慮的另一件事情就是消費者過載。

  • MANUAL:手動確認

1、手動確認在業務失敗后進行一些操作,消費者調用 ack、nack、reject 幾種方法進行確認,如果消息未被 ACK 則發送到下一個消費者或重回隊列。
2、ack 用于肯定確認;nack 用于 否定確認 ;reject 用于否定確認(一次只能拒絕單條消息)

spring.rabbitmq.listener.simple.concurrency : 最小的消費者數量
spring.rabbitmq.listener.simple.max-concurrency : 最大的消費者數量
來源于:https://blog.csdn.net/ystyaoshengting/article/details/105267542

  • 消費者接收類
package com.itlaoqi.rabbitmq.springboot;import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;import java.io.IOException; import java.util.Map;@Component public class MessageConsumer {//@RabbitListener注解用于聲明式定義消息接受的隊列與exhcange綁定的信息//在SpringBoot中,消費者這端使用注解獲取消息@RabbitListener(bindings = @QueueBinding(value = @Queue(value="springboot-queue" , durable="true"),exchange = @Exchange(value = "springboot-exchange" , durable = "true" , type = "topic") ,key = "#"))//用于接收消息的方法@RabbitHandler //通知SpringBoot下面的方法用于接收消息。// 這個方法運行后將處于等待的狀態,有新的消息進來就會自動觸發下面的方法處理消息//@Payload 代表運行時將消息反序列化后注入到后面的參數中public void handleMessage(@Payload Employee employee , Channel channel ,@Headers Map<String,Object> headers) {System.out.println("=========================================");System.out.println("接收到" + employee.getEmpno() + ":" + employee.getName());//所有消息處理后必須進行消息的ack,channel.basicAck()Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);try {channel.basicAck(tag , false);} catch (IOException e) {e.printStackTrace();}System.out.println("=========================================");} }

使用 @RabbitListener 注解用于聲明式定義消息接受的隊列與exhcange綁定的信息,其中 value = @Queue()指向隊列, exchange = @Exchange()指向交換機,key表示條件。

@RabbitListener(bindings = @QueueBinding(value = @Queue(value="springboot-queue" , durable="true"),exchange = @Exchange(value = "springboot-exchange" , durable = "true" , type = "topic") ,key = "#"))

使用 @RabbitHandler 注解用于表明該方法為接收消息的方法,運行后將處于等待的狀態,有新的消息進來就會自動觸發下面的方法進行處理,使用 @Payload 注解表示運行時將消息反序列化后注入到后面的參數中, 使用 @Headers 注解表示消息頭的信息,包括描述的輔助信息可以用來進行消息確認。

@RabbitHandler //通知SpringBoot下面的方法用于接收消息。// 這個方法運行后將處于等待的狀態,有新的消息進來就會自動觸發下面的方法處理消息//@Payload 代表運行時將消息反序列化后注入到后面的參數中public void handleMessage(@Payload Employee employee , Channel channel ,@Headers Map<String,Object> headers) {System.out.println("=========================================");System.out.println("接收到" + employee.getEmpno() + ":" + employee.getName());//所有消息處理后必須進行消息的ack,channel.basicAck()Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);try {channel.basicAck(tag , false);} catch (IOException e) {e.printStackTrace();}System.out.println("=========================================");} }

headers中的內容:

{amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=springboot-exchange, amqp_deliveryTag=3, amqp_consumerQueue=springboot-queue, amqp_redelivered=false, amqp_receivedRoutingKey=hr.employee, spring_listener_return_correlation=eac10f9e-b1e0-4b5a-92a1-c6aa8ea1f959, spring_returned_message_correlation=3306-1674918565983, id=6e5d0290-77c6-58c3-6fe7-7ed554a4667a, amqp_consumerTag=amq.ctag-mt44wHOpGCeH_e69j7zFwg, amqp_lastInBatch=false, contentType=application/x-java-serialized-object, timestamp=1674918566121}

其中headers里的 spring_returned_message_correlation=3306-1674918565983 就是生產者中CorrelationData的自定義id

4.測試


三.代碼下載

在我的微信公眾號后臺回復 rabbitmq 就可以獲取本篇博文相關的源代碼了,如果有什么疑問后臺給為留言,我看見會第一時間回復你的。

總結

以上是生活随笔為你收集整理的RabbitMQ入门中篇的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。