RabbitMQ入门中篇
本篇博文目錄
- 一.Spring整合RabbitMQ
- 1.導入依賴
- 2.生產者
- 3.消費者
- 4.測試
- 二.SpringBoot整合RabbitMQ
- 1.導入依賴
- 2.生產者
- 3.消費者
- 4.測試
- 三.代碼下載
一.Spring整合RabbitMQ
在spring項目中使用RabbitMQ的Exchange模式的Topics,項目分為消費者spring項目和生產者spring項目,其中都導入amqp-client和spring-rabbit依賴,前者為RabbitMQ的client依賴,后者為spring整合RabbitMQ的依賴。
1.導入依賴
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.0.5.RELEASE</version></dependency></dependencies>備注:amqp-client 最新版本為5.16.0;spring-rabbit最新版本3.0.1。
2.生產者
- 在resources包中創建applicationContext.xml
生產者的配置信息其實就是將Rabbit-client代碼中的代碼配置轉換為配置文件的配置,會簡單的進行修改即可,下來對上面相關配置信息進行解釋。
<rabbit:connection-factory 標簽是設置連接工廠,用來初始化Rabbitmq的連接對象,根據自己的Rabbitmq的配置信息填入連接對象的參數。
- id:bean的名稱
- host:主機地址
- port:主機端口號
- username:登入的用戶名
- password:登入的密碼
- virtual-host:虛擬路徑
<rabbit:topic-exchange 標簽用來初始化Topics類型的交換機信息,詳細參數解釋如下:
- id:bean的名稱
- name:交換機的名稱
- auto-declare:自動創建
備注:交換機有三種方式:fanout-exchange(Sub/Pub) | direct-exchange(Routing) | topic-exchange(Topics)
<rabbit:template Spring為我們封裝了RabbitTemplate對象來簡化生產者發送數據的過程,對常用的方法進行了封裝。
- id:bean的名稱
- connection-factory:連接工廠
- exchange:交換機
既然有了上面的RabbitTemplate對象,我們可以通過RabbitTemplate對象來發送數據,創建一個NewsProducer類,用來處理數據的發送:
NewsProducer類:用來處理生產者的數據發送(該類是一個封裝類,數據的發送通過該類的sendNews()進行實現)
package com.itlaoqi.rabbit.exchange;import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;import java.util.Date;public class NewsProducer {private RabbitTemplate rabbitTemplate = null;public RabbitTemplate getRabbitTemplate() {return rabbitTemplate;}public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendNews(String routingKey , News news){//convertAndSend 用于向exchange發送數據//第一個參數是routingkey//第二個參數是要傳遞的對象,可以是字符串、byte【】或者任何實現了【序列化接口】的對象rabbitTemplate.convertAndSend(routingKey , news);System.out.println("新聞發送成功");}public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");NewsProducer np = (NewsProducer)ctx.getBean("newsProducer");np.sendNews("us.20190101" , new News("新華社" , "特朗普又又又退群啦" , new Date() , "國際新聞內容"));np.sendNews("china.20190101" , new News("鳳凰TV" , "XXX企業榮登世界500強" , new Date() , "國內新聞內容"));} }上面的NewsProducer類,通過外部傳遞一個RabbitTemplate對象進行初始化,然后通過 sendNews()方法中的 rabbitTemplate.convertAndSend(routingKey , news); 來發送數據,routingKey 表示路由主鍵,news表示數據對象,詳細的news代碼如下,注意該數據對象需要實現Serializable不然會因為無法序列化而拋出異常信息。
News類:傳遞的數據對象
package com.itlaoqi.rabbit.exchange;import java.io.Serializable; import java.util.Date;public class News implements Serializable{private String source;private String title;private Date createTime;private String content;public News(String source, String title, Date createTime, String content) {this.source = source;this.title = title;this.createTime = createTime;this.content = content;}public String getSource() {return source;}public void setSource(String source) {this.source = source;}public String getTitle() {return title;}public void setTitle(String title) {this.title = title;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}}<bean id="newsProducer" 創建NewsProducer類的bean對象,并傳入初始化參數rabbitTemplate,通過該bean對象實現數據的發送,詳細代碼如下:
public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");NewsProducer np = (NewsProducer)ctx.getBean("newsProducer");np.sendNews("us.20190101" , new News("新華社" , "特朗普又又又退群啦" , new Date() , "國際新聞內容"));np.sendNews("china.20190101" , new News("鳳凰TV" , "XXX企業榮登世界500強" , new Date() , "國內新聞內容"));}<rabbit:admin RabbitAdmin對象用于創建、綁定、管理隊列與交換機
- id:bean的名稱
- connection-factory:連接工廠
通過上面的<rabbit:admin,我們可以通過RabbitAdmin對象來實現隊列和交換機的創建,綁定和管理,詳細的測試代碼如下:
package com.itlaoqi.rabbit.exchange;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import javax.annotation.Resource; import java.util.HashMap;@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:applicationContext.xml"}) public class RabbitAdminTestor {@Resource(name="rabbitAdmin")private RabbitAdmin rabbitAdmin;@Resourceprivate RabbitTemplate rabbitTemplate;/*** 創建交換機*/@Testpublic void testCreateExchange(){rabbitAdmin.declareExchange(new FanoutExchange("test.exchange.fanout" , true ,false));rabbitAdmin.declareExchange(new DirectExchange("test.exchange.direct" , true ,false));rabbitAdmin.declareExchange(new TopicExchange("test.exchange.topic" , true ,false));}/*** 創建隊列,綁定交換機并通過生產者發送數據*/@Testpublic void testQueueAndBind(){rabbitAdmin.declareQueue(new Queue("test.queue"));rabbitAdmin.declareBinding(new Binding("test.queue", Binding.DestinationType.QUEUE,"test.exchange.topic", "#", new HashMap<String, Object>()));rabbitTemplate.convertAndSend("test.exchange.topic" , "abc" , "abc123");}/*** 刪除隊列和交換機*/@Testpublic void testDelete(){rabbitAdmin.deleteQueue("test.queue");rabbitAdmin.deleteExchange("test.exchange.fanout");rabbitAdmin.deleteExchange("test.exchange.direct");rabbitAdmin.deleteExchange("test.exchange.topic");}}3.消費者
- 在resources包中創建applicationContext.xml
消費者的配置信息和生產者的存在很多相同,下來主要對沒有出現過的配置信息進行解釋。
<rabbit:queue 創建隊列
- id:bean的名稱
- name:隊列的名稱
- auto-declare:自動創建
- auto-delete:自動刪除
- exclusive:是否獨占
- durable:是否持久化
<rabbit:bindings 交換機與隊列綁定,并指明篩選條件
- queue:隊列
- pattern:篩選條件,*表示任意一個字符,#表示任意多個字符
<rabbit:listener-container 啟動消費者后,Spring底層自動監聽對應的topicQueue數據,一旦有新的消息進來,自動傳入到consumer Bean的recv的News參數中,之后再程序對News進一步處理
- connection-factory:連接工廠
<rabbit:listener 具體的監聽對象
- ref:監聽對象的bean
- method:監聽方法
- queue-names:隊列名稱
<bean id="consumer" 監聽的bean對象,具體的監聽方法recv
package com.itlaoqi.rabbitmq;import com.itlaoqi.rabbit.exchange.News; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;public class NewsConsumer {public void recv(News news){System.out.println("接收到最新新聞:" + news.getTitle() + ":" + news.getSource());}public static void main(String[] args) {//初始化IOC容器ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");} }4.測試
二.SpringBoot整合RabbitMQ
1.導入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>2.生產者
- 添加一個名為springboot-exchange的交換機
- application.properties配置
spring.rabbitmq.publisher-confirms 配置信息在springboot2.2.0.RELEASE版本之后該屬性已經過時了,不能在使用了,而是采用spring.rabbitmq.publisher-confirm-type,該屬性一共有三種值(來源于:https://blog.csdn.net/qingyuan2014/article/details/113916449):
- NONE值是禁用發布確認模式,是默認值
- CORRELATED值是發布消息成功到交換器后會觸發回調方法
- SIMPLE值經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在發布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點返回發送結果,根據返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發送消息到broker;
spring.rabbitmq.publisher-returns 開啟rabbitmq確認機制的return
spring.rabbitmq.template.mandatory mandatory true代表如果消息無法正常投遞則return回生產者,如果false,則直接將消息放棄
- 生產者數據發送對象
MessageProducer類:用于生產者數據發送,通過sendMsg()里的rabbitTemplate.convertAndSend()方法發送數據。
package com.itlaoqi.rabbitmq.springboot;import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;import javax.annotation.Resource; import java.util.Date;@Component public class MessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate ;RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {@Override/*** CorrelationData 消息的附加信息,即自定義id* isack 代表消息是否被broker(MQ)接收 true 代表接收 false代表拒收。* cause 如果拒收cause則說明拒收的原因,幫助我們進行后續處理*/public void confirm(CorrelationData correlationData, boolean isack, String cause) {System.out.println(correlationData);System.out.println("ack:" + isack);if(isack == false){System.err.println(cause);}}};RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingkey) {System.err.println("Code:" + replyCode + ",Text:" + replyText );System.err.println("Exchange:" + exchange + ",RoutingKey:" + routingkey );}};public void sendMsg(Employee emp){//CorrelationData對象的作用是作為消息的附加信息傳遞,通常我們用它來保存消息的自定義idCorrelationData cd = new CorrelationData(emp.getEmpno() + "-" + new Date().getTime());rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);rabbitTemplate.convertAndSend("springboot-exchange" , "hr.employee" , emp , cd);}}和spring相比springboot使用起來就比較簡單了,只需要在application.properties中進行配置(沒有進行配置的參數,采用默認參數),就可以使用Rabbitmq提供的RabbitTemplate進行操作,上文中的confirmCallback和returnCallback是Rabbitmq消息確認機制里面對Confirm和return進行監聽,通過rabbitTemplate.setConfirmCallback()和rabbitTemplate.setReturnCallback()進行設置。在springboot2.2.0.RELEASE版本之后ReturnCallback和setReturnCallback 已經過時了,建議采用ReturnsCallback和setReturnsCallback(),ReturnsCallback和ReturnCallback的區別就是后者對參數進行了封裝。
ReturnsCallback中returnedMessage的參數為ReturnedMessage
ReturnCallback中returnedMessage的參數為(message, replyCode, replyText, exchange, routingkey)
convertAndSend方法中第四個參數CorrelationData 對象作用是作為消息的附加信息傳遞,通常我們用它來保存消息的自定義id
- 數據對象
Employee 類:傳遞的數據對象,注意需要實現Serializable接口不然會因為無法序列化而產生異常。
package com.itlaoqi.rabbitmq.springboot;import java.io.Serializable;public class Employee implements Serializable{private String empno;private String name;private Integer age;public Employee(String empno, String name, Integer age) {this.empno = empno;this.name = name;this.age = age;}public String getEmpno() {return empno;}public void setEmpno(String empno) {this.empno = empno;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;} }- 發送數據
- 添加一個名為springboot-queue的隊列,并綁定到交換機springboot-exchange上,如果不怎么做的話會進入到return中,如下:
添加后,再次運行:
3.消費者
- application.properties配置
spring.rabbitmq.listener.simple.acknowledge-mode 表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto,下面對各種參數進行了解釋。來源于(https://juejin.cn/post/7029232312197840904)
- NONE : 不確認 :
1、默認所有消息消費成功,會不斷的向消費者推送消息
2、因為 rabbitmq 認為所有消息都被消費成功。所以隊列中存在丟失消息風險。
- AUTO:自動確認
1、根據消息處理邏輯是否拋出異常自動發送 ack(正常)和nack(異常)給服務端,如果消費者本身邏輯沒有處理好這條數據就存在丟失消息的風險。
2、使用自動確認模式時,需要考慮的另一件事情就是消費者過載。
- MANUAL:手動確認
1、手動確認在業務失敗后進行一些操作,消費者調用 ack、nack、reject 幾種方法進行確認,如果消息未被 ACK 則發送到下一個消費者或重回隊列。
2、ack 用于肯定確認;nack 用于 否定確認 ;reject 用于否定確認(一次只能拒絕單條消息)
spring.rabbitmq.listener.simple.concurrency : 最小的消費者數量
spring.rabbitmq.listener.simple.max-concurrency : 最大的消費者數量
來源于:https://blog.csdn.net/ystyaoshengting/article/details/105267542
- 消費者接收類
使用 @RabbitListener 注解用于聲明式定義消息接受的隊列與exhcange綁定的信息,其中 value = @Queue()指向隊列, exchange = @Exchange()指向交換機,key表示條件。
@RabbitListener(bindings = @QueueBinding(value = @Queue(value="springboot-queue" , durable="true"),exchange = @Exchange(value = "springboot-exchange" , durable = "true" , type = "topic") ,key = "#"))使用 @RabbitHandler 注解用于表明該方法為接收消息的方法,運行后將處于等待的狀態,有新的消息進來就會自動觸發下面的方法進行處理,使用 @Payload 注解表示運行時將消息反序列化后注入到后面的參數中, 使用 @Headers 注解表示消息頭的信息,包括描述的輔助信息可以用來進行消息確認。
@RabbitHandler //通知SpringBoot下面的方法用于接收消息。// 這個方法運行后將處于等待的狀態,有新的消息進來就會自動觸發下面的方法處理消息//@Payload 代表運行時將消息反序列化后注入到后面的參數中public void handleMessage(@Payload Employee employee , Channel channel ,@Headers Map<String,Object> headers) {System.out.println("=========================================");System.out.println("接收到" + employee.getEmpno() + ":" + employee.getName());//所有消息處理后必須進行消息的ack,channel.basicAck()Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);try {channel.basicAck(tag , false);} catch (IOException e) {e.printStackTrace();}System.out.println("=========================================");} }headers中的內容:
{amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=springboot-exchange, amqp_deliveryTag=3, amqp_consumerQueue=springboot-queue, amqp_redelivered=false, amqp_receivedRoutingKey=hr.employee, spring_listener_return_correlation=eac10f9e-b1e0-4b5a-92a1-c6aa8ea1f959, spring_returned_message_correlation=3306-1674918565983, id=6e5d0290-77c6-58c3-6fe7-7ed554a4667a, amqp_consumerTag=amq.ctag-mt44wHOpGCeH_e69j7zFwg, amqp_lastInBatch=false, contentType=application/x-java-serialized-object, timestamp=1674918566121}
其中headers里的 spring_returned_message_correlation=3306-1674918565983 就是生產者中CorrelationData的自定義id
4.測試
三.代碼下載
在我的微信公眾號后臺回復 rabbitmq 就可以獲取本篇博文相關的源代碼了,如果有什么疑問后臺給為留言,我看見會第一時間回復你的。
總結
以上是生活随笔為你收集整理的RabbitMQ入门中篇的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 摩托罗拉Android系统,摩托罗拉官方
- 下一篇: 读史鉴今:女人的12种结局