RabbitMQ(一):Hello World程序
內(nèi)容翻譯自:RabbitMQ Tutorials Java版
RabbitMQ(一):Hello World程序
RabbitMQ(二):Work Queues、循環(huán)分發(fā)、消息確認(rèn)、持久化、公平分發(fā)
RabbitMQ(三):Exchange交換器--fanout
RabbitMQ(四):Exchange交換器--direct
RabbitMQ(五):Exchange交換器--topic
RabbitMQ(六):回調(diào)隊(duì)列callback queue、關(guān)聯(lián)標(biāo)識(shí)correlation id、實(shí)現(xiàn)簡(jiǎn)單的RPC系統(tǒng)
RabbitMQ(七):常用方法說(shuō)明 與 學(xué)習(xí)小結(jié)
介紹:
RabbitMQ是一個(gè)消息代理:它接受并轉(zhuǎn)發(fā)消息。你可以把它當(dāng)成一個(gè)郵局:當(dāng)你想郵寄信件的時(shí)候,你會(huì)把信件放在投遞箱中,并確信郵遞員最終會(huì)將信件送到收件人的手里。在這個(gè)例子中,RabbitMQ就相當(dāng)與投遞箱、郵局和郵遞員。
RabbitMQ與郵局的區(qū)別在于:RabbitMQ并不處理紙質(zhì)信件,而是接受、存儲(chǔ)并轉(zhuǎn)發(fā)二進(jìn)制數(shù)據(jù)---消息。
談到RabbitMQ的消息,通常有幾個(gè)術(shù)語(yǔ):
(1)生產(chǎn)者:是指發(fā)送消息的程序
(2)隊(duì)列:相當(dāng)于RabbitMQ的投遞箱。盡管消息在RabbitMQ和你的應(yīng)用之間傳遞,但是消息僅僅會(huì)在隊(duì)列之中存儲(chǔ)。隊(duì)列只能存儲(chǔ)在內(nèi)存或磁盤中,本質(zhì)上是一個(gè)大的消息緩沖區(qū)。不同的生產(chǎn)者可以發(fā)送消息到同一個(gè)對(duì)隊(duì)列,不同的消費(fèi)者也可以從同一個(gè)隊(duì)列中獲取消息。
(3)消費(fèi)者:等待接受消息的程序。
注意,生產(chǎn)者、消費(fèi)者以及RabbitMQ并不一定要在同一個(gè)主機(jī)上,在絕大部分的應(yīng)用中它們都不在同一主機(jī)上。
在開始教程之前,請(qǐng)確保:你已經(jīng)安裝了RabbitMQ,并且在localhost上運(yùn)行起來(lái)(默認(rèn)端口5672)。如果你使用了不同的主機(jī)或端口,請(qǐng)?jiān)谙挛闹械倪B接設(shè)置中
更改相應(yīng)的參數(shù)。
一、Hello World:
在這一部分,我們將會(huì)使用Java編寫兩個(gè)小程序:一個(gè)發(fā)送單個(gè)消息的生產(chǎn)者、一個(gè)接受消息并打印出消息的消費(fèi)者。這個(gè)消息就是Hello World。
下圖中,P代表生產(chǎn)者,C代表消費(fèi)者,中間紅色的小箱子就代表隊(duì)列--RabbitMQ為了讓消費(fèi)者收到消息而保持的消息緩沖區(qū)。
在這一部分,只需要引入Java客戶端依賴即可:amqp-client.jar,也可以通過(guò)maven的方式引入:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>4.1.0</version> </dependency>1、生產(chǎn)者:
我們將消息的發(fā)布者(生產(chǎn)者)命名為Send,將消息的消費(fèi)者命名為Recv。發(fā)布者將會(huì)連接到RabbitMQ,并且發(fā)送一條消息,然后退出。
在Send.java中,首先引入相關(guān)類:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;再定義隊(duì)列的名字:
private final static String QUEUE_NAME = "hello";然后,創(chuàng)建一個(gè)連接到Rabbit服務(wù)器的連接:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();上面的代碼中,connection是socket連接的抽象,為我們處理了通信協(xié)議版本協(xié)商以及認(rèn)證等。這樣,我們就連接到了本地機(jī)器上的一個(gè)消息代理(broker)。如果想連接到其他機(jī)器上的broker,只要修改IP即可。
之后,我們又創(chuàng)建了一個(gè)通道(channel),大部分的API操作均在這里完成。
對(duì)于Send來(lái)說(shuō),必須指明消息要發(fā)到哪個(gè)隊(duì)列:
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");隊(duì)列的定義是冪等的,它僅僅在不存在時(shí)才會(huì)創(chuàng)建。消息的內(nèi)容是一個(gè)字節(jié)數(shù)組,所以你可以隨意編碼(encode)。
最后,必須將通道和連接關(guān)閉。
channel.close(); connection.close();完整代碼
//引入相關(guān)Class文件 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Send {//定義隊(duì)列名字private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//創(chuàng)建連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//為通道指明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";//發(fā)布消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");//關(guān)閉連接channel.close();connection.close();} }2、接收者(消費(fèi)者):
消費(fèi)者從RabbitMQ中取出消息。不同于發(fā)布者只發(fā)送一條消息就退出,這里我們讓消費(fèi)者一直監(jiān)聽消息,并把接受到的消息打印出來(lái)。
與Send.java類似,首先引入相關(guān)類:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;上面引入的DefaultConsumer是Consumer接口的實(shí)現(xiàn)類,我們使用它來(lái)緩沖從服務(wù)器push來(lái)的消息。
接下來(lái)的設(shè)置與發(fā)布者類似,打開連接和通道,聲明我們想消費(fèi)的隊(duì)列。注意,這里的隊(duì)列的名字要與發(fā)布者中聲明的隊(duì)列的名字一致。
注意,消費(fèi)者同樣聲明了隊(duì)列。這是因?yàn)?#xff0c;我們可能在啟動(dòng)生產(chǎn)者之前啟動(dòng)了消費(fèi)者應(yīng)用,我們想確保在從一個(gè)隊(duì)列消費(fèi)消息之前,這個(gè)隊(duì)列是存在的。
接下來(lái),告訴服務(wù)器(RabbitMQ)把隊(duì)列中的消息發(fā)過(guò)來(lái)。因?yàn)檫@個(gè)過(guò)程是異步的,可以通過(guò)DefaultConsumer來(lái)進(jìn)行回調(diào)。
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");} }; channel.basicConsume(QUEUE_NAME, true, consumer);Consumer的完整代碼如下:
package com.maxwell.rabbitdemo;import com.rabbitmq.client.*;import java.io.IOException;public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//建立連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明要消費(fèi)的隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//回調(diào)消費(fèi)消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(QUEUE_NAME, true, consumer);} }這樣,消費(fèi)者就會(huì)一直監(jiān)聽聲明的隊(duì)列。運(yùn)行一次生產(chǎn)者(即Send.java中的main方法),消費(fèi)者就會(huì)打印出接受到的消息。
?
說(shuō)明:
①與原文略有出入,如有疑問(wèn),請(qǐng)參考原文。
②RabbitMQ的官方rabbitmq-tutorials的java示例中,amqp-client版本為3.5,我改為了4.1,否則后續(xù)的示例教程中會(huì)報(bào)錯(cuò)說(shuō)找不到文件。
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ(一):Hello World程序的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Java虚拟机:深入详细分析Java C
- 下一篇: RabbitMQ(二):Work Que