RabbitMQ消息持久化处理
生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ消息持久化处理
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
我們來看一下RabbitMQ的消息處理,我們先來看第一個(gè)知識點(diǎn),關(guān)于RabbitMQ持久化的消息處理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保證消息的可靠性的呢,就是靠他的消息持久化,那么什么是消息持久化處理呢,在這里我給大家做一個(gè)簡單的介紹,比如我現(xiàn)在的消息提供者,不停的向我的消息消費(fèi)者發(fā)送消息,在某一個(gè)時(shí)間段內(nèi),或者在某一個(gè)時(shí)間點(diǎn)上,突然我的消息消費(fèi)者宕機(jī)了,然后經(jīng)過搶修以后,這個(gè)服務(wù)又立即啟動了,那么就在宕機(jī)和啟動的時(shí)間軸上,這個(gè)時(shí)候的消息,時(shí)間段所發(fā)出的消息,那么對于消費(fèi)者來講,對于消息消費(fèi)者來講,他是否還能夠接收得到這個(gè)消息了呢,答案肯定是可以接收的到的,最主要是依賴于Rabbit消息持久化的技術(shù),接下來我們就通過編寫這樣一個(gè)案例,來展示消息持久化的一個(gè)體現(xiàn)
首先我們?nèi)?chuàng)建一個(gè)Provider,創(chuàng)建一個(gè)Consumer,我們就拿direct交換器來作為案例交換器,你用fanout,topic都可以,我就那direct,我就先去做一個(gè)Provider,在這里我們給他起一個(gè)名字,在項(xiàng)目名當(dāng)中,我們加一個(gè)durable,表示是一個(gè)持久的,rabbitmq-durable-direct-provider然后我們再去創(chuàng)建一個(gè)Consumerrabbitmq-durable-direct-consumer兩個(gè)項(xiàng)目創(chuàng)建好以后呢,我們先去修改一下他的pom文件,在pom文件當(dāng)中呢, 把a(bǔ)rtifactId和name,做一個(gè)更改,這樣我們兩個(gè)項(xiàng)目就創(chuàng)建好了,創(chuàng)建項(xiàng)目,我們先看我們拷貝過來的項(xiàng)目,在這里我們兩個(gè)消息的接收者,一個(gè)是ErrorReceiver,一個(gè)是InfoReceiver,然后他們都是根據(jù)自己的路由器,決定從哪個(gè)隊(duì)列里去取消息,然后我們?nèi)タ聪⒌奶峁┱?Sender,現(xiàn)在我們用的是error的路由key@Value("${mq.config.queue.error.routing.key}")
private String routingkey;我們看配置文件mq.config.queue.error.routing.key=log.error.routing.key那么按照這個(gè)的發(fā)現(xiàn)規(guī)則來看呢,我們應(yīng)該是ErrorReceiver,接收到消息,然后我們再看一下測試代碼,測試代碼當(dāng)中呢,我們這里有一個(gè)while死循環(huán),然后里面休眠了一秒,一秒發(fā)送一條消息,這個(gè)時(shí)候應(yīng)該會被誰接收呢,被我們的ErrorReceiver接收,我們發(fā)送是有他來發(fā)送的,代碼的一個(gè)整體結(jié)構(gòu),現(xiàn)在我在這里做一個(gè)代碼的修改,我首先讓他休眠兩秒,讓他慢一點(diǎn),這樣看的清楚一點(diǎn),然后我在外面設(shè)置一個(gè)int的變量,是0,我在這里flag++,加到我們的消息當(dāng)中,什么意思呢,給我們的消息加一個(gè)編號,這樣知道是第幾條消息,然后我們再打開我們的管理頁面59.110.158.145:15672大家還記得這個(gè)管理頁面吧,然后我們看這個(gè)queues,現(xiàn)在這里是沒有任何隊(duì)列的,我們接下來先去啟動消費(fèi)者,消費(fèi)者啟動成功,我們再來啟動消息的發(fā)送者,找到他的測試代碼,我們來運(yùn)行他,這個(gè)時(shí)候就兩秒發(fā)送一次消息,兩秒發(fā)送一次消息,這個(gè)時(shí)候ErrorReceiver已經(jīng)接受消息了,兩秒鐘發(fā)送一次,兩秒鐘接收一次,然后我們再看管理頁面,這個(gè)時(shí)候queues就直接有變化了,這個(gè)時(shí)候是不是出現(xiàn)了兩個(gè)隊(duì)列,一個(gè)好似log.error,一個(gè)是log.info,只不過我們現(xiàn)在發(fā)送消息是log.error,不是log.info,所以現(xiàn)在只有一個(gè)隊(duì)列里有數(shù)據(jù)流,然后注意看他的Features,特征,現(xiàn)在特征是一個(gè)AD,鼠標(biāo)往那里一放,auto-delete:true,那么auto-delete是什么含義呢,別著急,我們一會再來講解他,給大家演示一下消息丟失的現(xiàn)象,怎么辦呢,現(xiàn)在顯示的消費(fèi)者的線程,我把消息消費(fèi)者關(guān)掉,你看我在哪里關(guān),點(diǎn),然后我們的消息在第51條停止了,宕機(jī)了,消費(fèi)者宕機(jī)了,然后我的發(fā)送者這一側(cè),由于它是在一個(gè)死循環(huán)里,所以消息還是源源不斷的在發(fā)送,我們這個(gè)時(shí)候再點(diǎn)queues,這里就已經(jīng)動態(tài)更新了是不是又變成了no queues的動態(tài)了
現(xiàn)在沒有隊(duì)列了,你那個(gè)原來的log.error,log.info,隊(duì)列都沒有了,然后現(xiàn)在再看,我再把消費(fèi)者啟動,注意我們是在51條停的,運(yùn)行走,這個(gè)時(shí)候我們就接收到消息了,第208條,這里這個(gè)隊(duì)列就出來了,那么也就是說,從51到208之間的消息,就沒有了,丟失了,這就是一個(gè)典型的消息丟失現(xiàn)象,那我們現(xiàn)在之所以會出現(xiàn)丟失消息的原因是什么呢,關(guān)鍵在這,看我們的Error Receiver,我們現(xiàn)在在設(shè)置這個(gè)Queue的時(shí)候,我們就是用了一個(gè)@Queue的一個(gè)注解,這里有一個(gè)autoDelete屬性,然后我們給的是true,然后當(dāng)時(shí)我們給的屬性的時(shí)候,你就可以理解為一個(gè)臨時(shí)隊(duì)列,那么什么叫臨時(shí)隊(duì)列呢,如果我們在Queue里,autoDelete除了可以在@Queue的注解當(dāng)中,在@Exchange這里也有一個(gè)autoDelete,也有這個(gè)屬性,他里面賦的值也是true或者false,那么我們就把a(bǔ)utoDelete這個(gè)屬性設(shè)置一下,如果我們在@Queue注解上,加了autoDelete,這里是什么含義呢,當(dāng)所有消費(fèi)客戶端,鏈接斷開后,是否自動刪除隊(duì)列,如果設(shè)置成true,表示刪除,如果設(shè)置成false,就表示不刪除,看到了嗎,我們現(xiàn)在只要所有的客戶端,都不再連接這個(gè)隊(duì)列了,這個(gè)隊(duì)列就會被自動的刪除,如果你設(shè)置true的話,如果設(shè)置成false呢,即便是所有的客戶端都斷開連接了,除了@Queue這一塊可以設(shè)置,在@Exchange的注解里,也有autoDelete屬性,如果我們要在這個(gè)注解里設(shè)置了,表示什么含義呢,當(dāng)綁定隊(duì)列都不再使用時(shí),是否自動刪除交換器,同樣的道理,true表示刪除,false表示不刪除,其實(shí)你在哪個(gè)注解里設(shè)置,刪隊(duì)列還是刪交換器,明白這個(gè)意思吧,所以@Queue作為我們講解的重點(diǎn),比如我們要解決丟失消息的問題,怎么辦呢,我們剛才也看到了,其實(shí)之所以會產(chǎn)生消息丟失的原因就是,當(dāng)我們的消費(fèi)者服務(wù)一停掉,那么對于RabbitMQ來講,會自動的把這個(gè)queue刪除掉,所以我們再次啟動的時(shí)候,在另一個(gè)Queue是沒法拿到另一個(gè)消息的,所以我們要解決這個(gè)問題,很簡單,當(dāng)你的消費(fèi)客戶端,斷開連接以后,RabbitMQ也不會刪除這個(gè)Queue,那當(dāng)我再次啟動的時(shí)候,由于這個(gè)Queue還存在,那我們是不是還可以繼續(xù)從這個(gè)Queue里去獲取剛才我沒有獲取的消息,我們再來演示一下,現(xiàn)在我把這個(gè)true改成false@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.error}",autoDelete="false"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),key="${mq.config.queue.error.routing.key}")
)然后我們先去啟動消費(fèi)者,然后再去啟動消息的提供者,我們看這個(gè)時(shí)候消息就來了,我們再去看管理界面,這個(gè)時(shí)候已經(jīng)刷新了,然后注意看這個(gè)Features,由于我們只是在log.error的消息隊(duì)列里,我們只改了這個(gè)的autoDelete,沒有改log.info的,所以log.info還是AD狀態(tài)的,就是當(dāng)客戶端,所有客戶端一斷開以后,而現(xiàn)在在log.error這一塊,他的Feature里面,里面沒有AD了,表示他是一個(gè)持久的隊(duì)列,他并不會隨著你客戶端的關(guān)閉而刪除,我們來看一下是不是這樣的,我們現(xiàn)在還是把服務(wù)停掉,在第28條消息給關(guān)掉了,我們把它清空一下,然后我們的消息提供者還是在不斷地發(fā)送消息,回過來,我們再去請求消費(fèi)者,等一下我們先去看這個(gè)隊(duì)列,看這個(gè)管理界面,是不是log.info沒有了,但是log.error一直存在,而且還有數(shù)據(jù)向隊(duì)列里進(jìn)行輸入,然后注意看這個(gè)ready,這個(gè)Ready是什么呢,其實(shí)就是對于未接收到的數(shù)據(jù)的一個(gè)顯示,也就是說RabbitMQ,在隊(duì)列里存放的消息,如果消息并沒有被消費(fèi)者所消費(fèi),那么他就會給這個(gè)消息加一個(gè)標(biāo)記,表示當(dāng)前這個(gè)消息是未被消費(fèi)的,那我們現(xiàn)在看到的這個(gè)Ready里面,顯示當(dāng)前有多少條消息沒有被消費(fèi),現(xiàn)在已經(jīng)變成27條了,然后我們現(xiàn)在再去啟動消費(fèi)者,運(yùn)行,觀察他的控制臺,看到了嗎,他啟動以后,我們從28條我們停了,從第29條開始讀到消息,一直往后,讀到現(xiàn)在,現(xiàn)在ready是不是沒有了,現(xiàn)在已經(jīng)沒有被消費(fèi)的消息,都被消費(fèi)到了,所以我們解決了丟失消息的現(xiàn)象,只要是你的服務(wù)停了,只要你把a(bǔ)utoDelete設(shè)置成false了,服務(wù)再啟動的時(shí)候,由于隊(duì)列,還存在,所以這些消息我們?nèi)匀豢梢越邮盏?我么之所以接收到的原因就是因?yàn)殛?duì)列存在了,如果我們關(guān)閉以后,我們的消費(fèi)者關(guān)閉以后,其實(shí)發(fā)送者發(fā)送的消息一直在那里存著呢,就是在RabbitMQ服務(wù)器的內(nèi)存當(dāng)中存放著,因?yàn)殛?duì)列也在內(nèi)存當(dāng)中,隊(duì)列里包含了這些數(shù)據(jù),都在內(nèi)存里存放著,但是我的RabbitMQ,整個(gè)服務(wù)都關(guān)掉了,服務(wù)都關(guān)掉了,那隊(duì)列肯定也沒了,消息肯定也沒了,這就是我們采用autoDelete來對知識點(diǎn)的講解,消息持久化的內(nèi)容就到這
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>rabbitmq-durable-direct-consumer</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.12.RELEASE</version><relativePath/> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><!-- 這個(gè)插件,可以將應(yīng)用打包成一個(gè)可執(zhí)行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
spring.application.name=rabbitmq-durable-direct-consumerspring.rabbitmq.host=59.110.158.145
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
mq.config.exchange=log.direct
mq.config.queue.info=log.info
mq.config.queue.info.routing.key=log.info.routing.key
mq.config.queue.error=log.error
mq.config.queue.error.routing.key=log.error.routing.key
package com.learn;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收者* @author Administrator* @RabbitListener bindings:綁定隊(duì)列* @QueueBinding value:綁定隊(duì)列的名稱* exchange:配置交換器* * @Queue value:配置隊(duì)列名稱* autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列* * @Exchange value:為交換器起個(gè)名稱* type:指定具體的交換器類型*/
@Component
@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.error}",autoDelete="false"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),key="${mq.config.queue.error.routing.key}"))
public class ErrorReceiver {/*** 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("Error..........receiver: "+msg);}
}
package com.learn;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收者* @author Administrator* @RabbitListener bindings:綁定隊(duì)列* @QueueBinding value:綁定隊(duì)列的名稱* exchange:配置交換器* * @Queue value:配置隊(duì)列名稱* autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列* * @Exchange value:為交換器起個(gè)名稱* type:指定具體的交換器類型*/
@Component
@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),key="${mq.config.queue.info.routing.key}"))
public class InfoReceiver {/*** 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("Info........receiver: "+msg);}
}
package com.learn;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitDurableDirectConsumerApplication {public static void main(String[] args) {SpringApplication.run(RabbitDurableDirectConsumerApplication.class, args);}
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>rabbitmq-durable-direct-provider</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.12.RELEASE</version><relativePath/> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><!-- 這個(gè)插件,可以將應(yīng)用打包成一個(gè)可執(zhí)行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
spring.application.name=rabbitmq-durable-direct-providerspring.rabbitmq.host=59.110.158.145
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guestmq.config.exchange=log.direct
mq.config.queue.info.routing.key=log.info.routing.key
mq.config.queue.error.routing.key=log.error.routing.key
mq.config.queue.error=log.error
package com.learn;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** 消息發(fā)送者* @author Administrator**/
@Component
public class Sender {@Autowiredprivate AmqpTemplate rabbitAmqpTemplate;//exchange 交換器名稱@Value("${mq.config.exchange}")private String exchange;//routingkey 路由鍵@Value("${mq.config.queue.error.routing.key}")private String routingkey;/** 發(fā)送消息的方法*/public void send(String msg){//向消息隊(duì)列發(fā)送消息//參數(shù)一:交換器名稱。//參數(shù)二:路由鍵//參數(shù)三:消息this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);}
}
package com.learn;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitDurableDirectProviderApplication {public static void main(String[] args) {SpringApplication.run(RabbitDurableDirectProviderApplication.class, args);}
}
?
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ消息持久化处理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用RabbitMQ实现松耦合设计
- 下一篇: 什么是服务注册中心