日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ(一):Hello World程序

發(fā)布時間:2024/9/30 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ(一):Hello World程序 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

內(nèi)容翻譯自:RabbitMQ Tutorials Java版


RabbitMQ(一):Hello World程序

RabbitMQ(二):Work Queues、循環(huán)分發(fā)、消息確認、持久化、公平分發(fā)

RabbitMQ(三):Exchange交換器--fanout

RabbitMQ(四):Exchange交換器--direct

RabbitMQ(五):Exchange交換器--topic

RabbitMQ(六):回調(diào)隊列callback queue、關聯(lián)標識correlation id、實現(xiàn)簡單的RPC系統(tǒng)

RabbitMQ(七):常用方法說明 與 學習小結(jié)


介紹:

RabbitMQ是一個消息代理:它接受并轉(zhuǎn)發(fā)消息。你可以把它當成一個郵局:當你想郵寄信件的時候,你會把信件放在投遞箱中,并確信郵遞員最終會將信件送到收件人的手里。在這個例子中,RabbitMQ就相當與投遞箱、郵局和郵遞員。

RabbitMQ與郵局的區(qū)別在于:RabbitMQ并不處理紙質(zhì)信件,而是接受、存儲并轉(zhuǎn)發(fā)二進制數(shù)據(jù)---消息。

談到RabbitMQ的消息,通常有幾個術語:

(1)生產(chǎn)者:是指發(fā)送消息的程序

(2)隊列:相當于RabbitMQ的投遞箱。盡管消息在RabbitMQ和你的應用之間傳遞,但是消息僅僅會在隊列之中存儲。隊列只能存儲在內(nèi)存或磁盤中,本質(zhì)上是一個大的消息緩沖區(qū)。不同的生產(chǎn)者可以發(fā)送消息到同一個對隊列,不同的消費者也可以從同一個隊列中獲取消息。

(3)消費者:等待接受消息的程序。

注意,生產(chǎn)者、消費者以及RabbitMQ并不一定要在同一個主機上,在絕大部分的應用中它們都不在同一主機上。

在開始教程之前,請確保:你已經(jīng)安裝了RabbitMQ,并且在localhost上運行起來(默認端口5672)。如果你使用了不同的主機或端口,請在下文中的連接設置中
更改相應的參數(shù)。


一、Hello World:

在這一部分,我們將會使用Java編寫兩個小程序:一個發(fā)送單個消息的生產(chǎn)者、一個接受消息并打印出消息的消費者。這個消息就是Hello World。

下圖中,P代表生產(chǎn)者,C代表消費者,中間紅色的小箱子就代表隊列--RabbitMQ為了讓消費者收到消息而保持的消息緩沖區(qū)。

在這一部分,只需要引入Java客戶端依賴即可:amqp-client.jar,也可以通過maven的方式引入:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>4.1.0</version> </dependency>

1、生產(chǎn)者:

我們將消息的發(fā)布者(生產(chǎn)者)命名為Send,將消息的消費者命名為Recv。發(fā)布者將會連接到RabbitMQ,并且發(fā)送一條消息,然后退出。

在Send.java中,首先引入相關類:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;

再定義隊列的名字:

private final static String QUEUE_NAME = "hello";

然后,創(chuàng)建一個連接到Rabbit服務器的連接:

ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();

上面的代碼中,connection是socket連接的抽象,為我們處理了通信協(xié)議版本協(xié)商以及認證等。這樣,我們就連接到了本地機器上的一個消息代理(broker)。如果想連接到其他機器上的broker,只要修改IP即可。

之后,我們又創(chuàng)建了一個通道(channel),大部分的API操作均在這里完成。

對于Send來說,必須指明消息要發(fā)到哪個隊列:

channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");

隊列的定義是冪等的,它僅僅在不存在時才會創(chuàng)建。消息的內(nèi)容是一個字節(jié)數(shù)組,所以你可以隨意編碼(encode)。

最后,必須將通道和連接關閉。

channel.close(); connection.close();

完整代碼

//引入相關Class文件 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Send {//定義隊列名字private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//創(chuàng)建連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//為通道指明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";//發(fā)布消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");//關閉連接channel.close();connection.close();} }

2、接收者(消費者):

消費者從RabbitMQ中取出消息。不同于發(fā)布者只發(fā)送一條消息就退出,這里我們讓消費者一直監(jiān)聽消息,并把接受到的消息打印出來。

與Send.java類似,首先引入相關類:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;

上面引入的DefaultConsumer是Consumer接口的實現(xiàn)類,我們使用它來緩沖從服務器push來的消息。
接下來的設置與發(fā)布者類似,打開連接和通道,聲明我們想消費的隊列。注意,這里的隊列的名字要與發(fā)布者中聲明的隊列的名字一致。

public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] argv)throws java.io.IOException,java.lang.InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, fasle, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");...} }

注意,消費者同樣聲明了隊列。這是因為,我們可能在啟動生產(chǎn)者之前啟動了消費者應用,我們想確保在從一個隊列消費消息之前,這個隊列是存在的。

接下來,告訴服務器(RabbitMQ)把隊列中的消息發(fā)過來。因為這個過程是異步的,可以通過DefaultConsumer來進行回調(diào)。

Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");} }; channel.basicConsume(QUEUE_NAME, true, consumer);

Consumer的完整代碼如下:

package com.maxwell.rabbitdemo;import com.rabbitmq.client.*;import java.io.IOException;public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//建立連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明要消費的隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//回調(diào)消費消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(QUEUE_NAME, true, consumer);} }

這樣,消費者就會一直監(jiān)聽聲明的隊列。運行一次生產(chǎn)者(即Send.java中的main方法),消費者就會打印出接受到的消息。

?


說明:

①與原文略有出入,如有疑問,請參考原文。

②RabbitMQ的官方rabbitmq-tutorials的java示例中,amqp-client版本為3.5,我改為了4.1,否則后續(xù)的示例教程中會報錯說找不到文件。

總結(jié)

以上是生活随笔為你收集整理的RabbitMQ(一):Hello World程序的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。