java连接rabbitmq_没用过消息队列?一文带你体验RabbitMQ收发消息
楔子
先給大家說聲抱歉,最近一周都沒有發(fā)文,有一些比較要緊重要的事需要處理。
今天正好得空,本來說準(zhǔn)備寫SpringIOC相關(guān)的東西,但是發(fā)現(xiàn)想要梳理一遍,還是需要很多時間,所以我打算慢慢寫,先把MQ給寫了,再慢慢寫其他相關(guān)的,畢竟偏理論的東西一遍要比較難寫,像MQ這種偏實(shí)戰(zhàn)的大家可以clone代碼去玩一玩,還是比較方便的。
同時MQ也是Java進(jìn)階不必可少的技術(shù)棧之一,所以Java開發(fā)從業(yè)者對它是必須要了解的。
現(xiàn)在市面上有三種消息隊(duì)列比較火分別是:RabbitMQ,RocketMQ和Kafka。
今天要講的消息隊(duì)列中我會以RabbitMQ作為案例來入門,因?yàn)镾pringBoot的amqp中默認(rèn)只集成了RabbitMQ,用它來講會方便許多,且RabbitMQ的性能和穩(wěn)定性都很不錯,是一款經(jīng)過時間考驗(yàn)的開源組件。
祝有好收獲。
本文代碼: 碼云地址?GitHub地址
1. 消息隊(duì)列?
消息隊(duì)列(MQ)全稱為Message Queue,是一種應(yīng)用程序?qū)?yīng)用程序的通信方法。
翻譯一下就是:在應(yīng)用之間放一個消息組件,然后應(yīng)用雙方通過這個消息組件進(jìn)行通信。
好端端的為啥要在中間放個組件呢?
小系統(tǒng)其實(shí)是用不到消息隊(duì)列的,一般分布式系統(tǒng)才會引入消息隊(duì)列,因?yàn)榉植际较到y(tǒng)需要抗住高并發(fā),需要多系統(tǒng)解耦,更需要對用戶比較友好的響應(yīng)速度,而消息隊(duì)列的特性可以天然解耦,方便異步更能起到一個頂住高并發(fā)的削峰作用,完美解決上面的三個問題。
然萬物抱陽負(fù)陰,系統(tǒng)之間突然加了個中間件,提高系統(tǒng)復(fù)雜度的同時也增加了很多問題:
- 消息丟失怎么辦?
- 消息重復(fù)消費(fèi)怎么辦?
- 某些任務(wù)需要消息的順序消息,順序消費(fèi)怎么保證?
- 消息隊(duì)列組件的可用性如何保證?
這些都是使用消息隊(duì)列過程中需要思考需要考慮的地方,消息隊(duì)列能給你帶來很大的便利,也能給你帶來一些對應(yīng)的麻煩。
上面說了消息隊(duì)列帶來的好處以及問題,而這些不在我們今天這篇的討論范圍之內(nèi),我打算之后再寫這些,我們今天要做的是搭建出一個消息隊(duì)列環(huán)境,讓大家感受一下基礎(chǔ)的發(fā)消息與消費(fèi)消息,更高級的問題會放在以后討論。
2. RabbitMQ一覽
RabbitMQ是一個消息組件,是一個erlang開發(fā)的AMQP(Advanced Message Queue)的開源實(shí)現(xiàn)。
AMQP,即Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。
RabbitMQ采用了AMQP協(xié)議,至于這協(xié)議怎么怎么樣,我們關(guān)心的是RabbitMQ結(jié)構(gòu)如何且怎么用。
還是那句話,學(xué)東西需要先觀其大貌,我們要用RabbitMQ首先要知道它整體是怎么樣,這樣才有利于我們接下來的學(xué)習(xí)。
我們先來看看我剛畫的架構(gòu)圖,因?yàn)镽abbitMQ實(shí)現(xiàn)了AMQP協(xié)議,所以這些概念也是AMQP中共有的。
- Broker: 中間件本身。接收和分發(fā)消息的應(yīng)用,這里指的就是RabbitMQ Server。
- Virtual host: 虛擬主機(jī)。出于多租戶和安全因素設(shè)計(jì)的,把AMQP的基本組件劃分到一個虛擬的分組中,類似于網(wǎng)絡(luò)中的namespace概念。當(dāng)多個不同的用戶使用同一個RabbitMQ server提供的服務(wù)時,可以劃分出多個vhost,每個用戶在自己的vhost創(chuàng)建exchange/queue等。
- Connection: 連接。publisher/consumer和broker之間的TCP連接。斷開連接的操作只會在client端進(jìn)行,Broker不會斷開連接,除非出現(xiàn)網(wǎng)絡(luò)故障或broker服務(wù)出現(xiàn)問題。
- Channel: 渠道。如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷會比較大且效率也較低。Channel是在connection內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個thread創(chuàng)建單獨(dú)的channel進(jìn)行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷。
- Exchange: 路由。根據(jù)分發(fā)規(guī)則,匹配查詢表中的routing key,分發(fā)消息到queue中去。
- Queue: 消息的隊(duì)列。消息最終被送到這里等待消費(fèi),一個message可以被同時拷貝到多個queue中。
- Binding: 綁定。exchange和queue之間的虛擬連接,binding中可以包含routing key。Binding信息被保存到exchange中的查詢表中,用于message的分發(fā)依據(jù)。
看完了這些概念,我再給大家梳理一遍其流程:
當(dāng)我們的生產(chǎn)者端往Broker(RabbitMQ)中發(fā)送了一條消息,Broker會根據(jù)其消息的標(biāo)識送往不同的Virtual host,然后Exchange會根據(jù)消息的路由key和交換器類型將消息分發(fā)到自己所屬的Queue中去。
然后消費(fèi)者端會通過Connection中的Channel獲取剛剛推送的消息,拉取消息進(jìn)行消費(fèi)。
Tip:某個Exchange有哪些屬于自己的Queue,是由Binding綁定關(guān)系決定的。
3. RabbitMQ環(huán)境
上面講了RabbitMQ大概的結(jié)構(gòu)圖和一個消息的運(yùn)行流程,講完了理論,這里我們就準(zhǔn)備實(shí)操一下吧,先進(jìn)行RabbitMQ安裝。
官網(wǎng)下載地址:www.rabbitmq.com/download.ht…
由于我還沒有屬于自己MAC電腦,所以這里的演示就按照Windows的來了,不過大家都是程序員,安裝個東西總歸是難不倒大家的吧
Windows下載地址:www.rabbitmq.com/install-win…
進(jìn)去之后可以直接找到Direct Downloads,下載相關(guān)EXE程序進(jìn)行安裝就可以了。
由于RabbitMQ是由erlang語言編寫的,所以安裝之前我們還需要安裝erlang環(huán)境,你下載RabbitMQ之后直接點(diǎn)擊安裝,如果沒有相關(guān)環(huán)境,安裝程序會提示你,然后會讓你的瀏覽器打開erlang的下載頁面,在這個頁面上根據(jù)自己的系統(tǒng)類型點(diǎn)擊下載安裝即可,安裝完畢后再去安裝RabbitMQ。
這兩者的安裝都只需要一直NEXT下一步就可以了。
安裝完成之后可以按一下Windows鍵看到效果如下:
Tip:其中Rabbit-Command后面會用到,是RabbitMQ的命令行操作臺。
安裝完RabbitMQ我們需要對我們的開發(fā)環(huán)境也導(dǎo)入RabbitMQ相關(guān)的JAR包。
為了方便起見,我們可以直接使用Spring-boot-start的方式導(dǎo)入,這里面也會包含所有我們需要用到的RabbitMQ相關(guān)的JAR包。
org.springframework.boot spring-boot-starter-amqp直接引入spring-boot-starter-amqp即可。
4. ?Hello World
搭建好環(huán)境之后,我們就可以上手了。
考慮到這是一個入門文章,讀者很多可能沒有接觸過RabbitMQ,直接使用自動配置的方式可能會令大家很迷惑,因?yàn)樽詣优渲脮帘魏芏嗉?xì)節(jié),導(dǎo)致大家只看到了被封裝后的樣子,不利于大家理解。
所以在本節(jié)Hello World這里,我會直接使用最原始的連接方式進(jìn)行演示,讓大家看到最原始的連接的樣子。
Tip:這種方式演示的代碼我都在放在prototype包下面。
4.1 生產(chǎn)者
先來看看生產(chǎn)者代碼,也就是我們push消息的代碼:
public static final String QUEUE_NAME = "erduo"; // 創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 連接到本地server connectionFactory.setHost("127.0.0.1"); // 通過連接工廠創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 通過連接創(chuàng)建通道 Channel channel = connection.createChannel(); // 創(chuàng)建一個名為耳朵的隊(duì)列,該隊(duì)列非持久(RabbitMQ重啟后會消失)、非獨(dú)占(非僅用于此鏈接)、非自動刪除(服務(wù)器將不再使用的隊(duì)列刪除) channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello, 我是耳朵。" + LocalDateTime.now().toString(); // 發(fā)布消息 // 四個參數(shù)為:指定路由器,指定key,指定參數(shù),和二進(jìn)制數(shù)據(jù)內(nèi)容 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("生產(chǎn)者發(fā)送消息結(jié)束,發(fā)送內(nèi)容為:" + msg); channel.close(); connection.close();代碼我都給了注釋,但是我還是要給大家講解一遍,梳理一下。
先通過RabbitMQ中的ConnectionFactory配置一下將要連接的server-host,然后創(chuàng)建一個新連接,再通過此連接創(chuàng)建通道(Channel),通過這個通道創(chuàng)建隊(duì)列和發(fā)送消息。
這里看上去還是很好理解的,我需要把創(chuàng)建隊(duì)列和發(fā)送消息這里再拎出來說一下。
創(chuàng)建隊(duì)列
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException;創(chuàng)建隊(duì)列的方法里面有五個參數(shù),第一個是參數(shù)是隊(duì)列的名稱,往后的三個參數(shù)代表不同的配置,最后一個參數(shù)是額外參數(shù)。
- durable:代表是否將此隊(duì)列持久化。
- exclusive:代表是否獨(dú)占,如果設(shè)置為獨(dú)占隊(duì)列則此隊(duì)列僅對首次聲明它的連接可見,并在連接斷開時自動刪除。
- autoDelete:代表斷開連接后是否自動刪除此隊(duì)列。
- arguments:代表其他額外參數(shù)。
這些參數(shù)中durable經(jīng)常會用到,它代表了我們可以對隊(duì)列做持久化,以保證RabbitMQ宕機(jī)恢復(fù)后此隊(duì)列也可以自行恢復(fù)。
發(fā)送消息
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;發(fā)送消息的方法里是四個參數(shù),第一個是必須的指定exchange,上面的示例代碼中我們傳入了一個空字符串,這代表我們交由默認(rèn)的匿名exchange去幫我們路由消息。
第二個參數(shù)是路由key,exchange會根據(jù)此key對消息進(jìn)行路由轉(zhuǎn)發(fā),第三個參數(shù)是額外參數(shù),講消息持久化時會用到一下,最后一個參數(shù)就是我們要發(fā)送的數(shù)據(jù)了,需要將我們的數(shù)據(jù)轉(zhuǎn)成字節(jié)數(shù)組的方式傳入。
測試
講完了這些API之后,我們可以測試一下我們的代碼了,run一下之后,會在控制臺打出如下:
這樣之后我們就把消息發(fā)送到了RabbitMQ中去,此時可以打開RabbitMQ控制臺(前文安裝時提到過)去使用命令rabbitmqctl.bat list_queues去查看消息隊(duì)列現(xiàn)在的情況:
可以看到有一條message在里面,這就代表我們的消息已經(jīng)發(fā)送成功了,接下來我們可以編寫一個消費(fèi)者對里面的message進(jìn)行消費(fèi)了。
4.2 消費(fèi)者
消費(fèi)者代碼和生產(chǎn)者的差不多,都需要建立連接建立通道:
// 創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 連接到本地server connectionFactory.setHost("127.0.0.1"); // 通過連接工廠創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 通過連接創(chuàng)建通道 Channel channel = connection.createChannel(); // 創(chuàng)建消費(fèi)者,阻塞接收消息 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("-------------------------------------------"); System.out.println("consumerTag : " + consumerTag); System.out.println("exchangeName : " + envelope.getExchange()); System.out.println("routingKey : " + envelope.getRoutingKey()); String msg = new String(body, StandardCharsets.UTF_8); System.out.println("消息內(nèi)容 : " + msg); } }; // 啟動消費(fèi)者消費(fèi)指定隊(duì)列 channel.basicConsume(Producer.QUEUE_NAME, consumer);// channel.close();// connection.close();建立完通道之后,我們需要創(chuàng)建一個消費(fèi)者對象,然后用這個消費(fèi)者對象去消費(fèi)指定隊(duì)列中的消息。
這個示例中我們就是新建了一個consumer,然后用它去消費(fèi)隊(duì)列-erduo中的消息。
最后兩句代碼我給注釋掉了,因?yàn)橐坏┌堰B接也關(guān)閉了,那我們的消費(fèi)者就不能保持消費(fèi)狀態(tài)了,所以要開著連接,監(jiān)聽此隊(duì)列。
ok,運(yùn)行這段程序,然后我們的消費(fèi)者會去隊(duì)列-erduo拿到里面的消息,效果如下:
- consumerTag:是這個消息的標(biāo)識。
- exchangeName:是這個消息所發(fā)送exchange的名字,我們先前傳入的是空字符串,所以這里也是空字符串。
- exchangeName:是這個消息所發(fā)送路由key。
這樣我們的程序就處在一個監(jiān)聽的狀態(tài)下,你再次調(diào)用生產(chǎn)者發(fā)送消息消費(fèi)者就會實(shí)時的在控制上打印消息內(nèi)容。
5. 消息接收確認(rèn)(ACK)
上面我們演示了生產(chǎn)者和消費(fèi)者,我們生產(chǎn)者發(fā)送一條消息,消費(fèi)者消費(fèi)一條信息,這個時候我們的RabbitMQ應(yīng)該有多少消息?
理論上來說發(fā)送一條,消費(fèi)一條,現(xiàn)在里面應(yīng)該是0才對,但是現(xiàn)在的情況并不是:
消息隊(duì)列里面還是有1條信息,我們重啟一下消費(fèi)者,又打印了一遍我們消費(fèi)過的那條消息,通過消息上面的時間我們可以看出來還是當(dāng)時我們發(fā)送的那條信息,也就是說我們消費(fèi)者消費(fèi)過了之后這條信息并沒有被刪除。
這種狀況出現(xiàn)的原因是因?yàn)镽abbitMQ消息接收確認(rèn)機(jī)制,也就是說一條信息被消費(fèi)者接收到了之后,需要進(jìn)行一次確認(rèn)操作,這條消息才會被刪除。
RabbitMQ中默認(rèn)消費(fèi)確認(rèn)是手動的,也可以將其設(shè)置為自動刪除,自動刪除模式消費(fèi)者接收到消息之后就會自動刪除這條消息,如果消息處理過程中發(fā)生了異常,這條消息就等于沒被處理完但是也被刪除掉了,所以這里我們會一直使用手動確認(rèn)模式。
消息接受確認(rèn)(ACK)的代碼很簡單,只要在原來消費(fèi)者的代碼里加上一句就可以了:
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("-------------------------------------------"); System.out.println("consumerTag : " + consumerTag); System.out.println("exchangeName : " + envelope.getExchange()); System.out.println("routingKey : " + envelope.getRoutingKey()); String msg = new String(body, StandardCharsets.UTF_8); System.out.println("消息內(nèi)容 : " + msg); // 消息確認(rèn) channel.basicAck(envelope.getDeliveryTag(), false); System.out.println("消息已確認(rèn)"); } };我們將代碼改成如此之后,可以再run一次消費(fèi)者,可以看看效果:
再來看看RabbitMQ中的隊(duì)列情況:
從圖中我們可以看出消息消費(fèi)后已經(jīng)成功被刪除了,其實(shí)大膽猜一猜,自動刪除應(yīng)該是在我們的代碼還沒執(zhí)行之前就幫我們返回了確認(rèn),所以這就導(dǎo)致了消息丟失的可能性。
我們采用手動確認(rèn)的方式之后,可以先將邏輯處理完畢之后(可能出現(xiàn)異常的地方可以try-catch起來),把手動確認(rèn)的代碼放到最后一行,這樣如果出現(xiàn)異常情況導(dǎo)致這條消息沒有被確認(rèn),那么這條消息會在之后被重新消費(fèi)一遍。
后記
今天的內(nèi)容就到這里,下一篇將會我們將會撇棄傳統(tǒng)的手動建立連接的方式進(jìn)行發(fā)消息收消息,而轉(zhuǎn)用Spring幫我們定義好的注解和Spring提供的RabbitTemplate,更方便的收發(fā)消息。
消息隊(duì)列呢,其實(shí)用法都是一樣的,只是各個開源消息隊(duì)列的側(cè)重點(diǎn)稍有不同,我們應(yīng)該根據(jù)我們自己的項(xiàng)目需求來決定我們應(yīng)該選取什么樣的消息隊(duì)列來為我們的項(xiàng)目服務(wù),這個項(xiàng)目選型的工作一般都是開發(fā)組長幫你們做了,一般是輪不到我們來做的,但是面試的時候可能會考察相關(guān)知識,所以這幾種消息隊(duì)列我們都應(yīng)該有所涉獵。
好了,以上就是本期的全部內(nèi)容,感謝你能看到這里,歡迎對本文點(diǎn)贊收藏與評論,你們的每個點(diǎn)贊都是我創(chuàng)作的最大動力。
作者:和耳朵
鏈接:https://juejin.im/post/6856571028496351239
來源:掘金
總結(jié)
以上是生活随笔為你收集整理的java连接rabbitmq_没用过消息队列?一文带你体验RabbitMQ收发消息的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: restful post请求_猿蜕变9—
- 下一篇: 两波形相位差的计算值_正弦交流电的相位差