二.java下使用RabbitMQ实现hello world
上一篇文章介紹了windows環境下的安裝和配置rabbitMQ,具體戳這邊,一.windows環境下rabbit的的安裝和配置。
現在我們可以著手編寫hello world程序了,一窺RabbitMQ的效用,從rabbitmq的官網的get start進入rabbitMQ文檔學習區,即這個頁面https://www.rabbitmq.com/getstarted.html。
由于網上關于rabbitMQ的中文材料和教程不是很多,所以只好硬著頭皮看官網文檔了。
可以看到官網主要從6個步驟來介紹學習軌跡,并且每個步驟均有多種編程語言的版本。由于本人采用的是java語言,所以就從一個java版本的hello world開始rabbitMQ的學習吧。
一.Introduction(簡介)
1.可以將RabbitMQ理解為一個消息代理,它接收、存儲、和分發數據信息。
2.RabbitMQ主要由三個元素組成,producer(生產者),隊列(queue),和消費者(Consumer).
3.生產者生產消息,隊列存儲消息,消費者接收消息。他們之間的關系是多對多的,即多個生產者可以向一個隊列中存放消息,多個消費者可以從一個隊列中獲取消息。
4.值得注意的是,RabbitMQ代理器和生產者、消費者并不需要在同一個服務器上,他們可以是分布式的。
?
二.hello world
現在我們可以進入正題,用RabbitMQ來寫一個hello world 的demo,以對RabbitMQ這個中間件有個直觀的認識。
在這個demo中,我們將編寫兩個類,一個是生產者類,一個是消費者類,生產者類負責發送一個簡單的message,而消費者類負責接收這個消息并且打印出來。
1.首先添加maven依賴包,如下。
<!-- rabbitMQ --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>4.0.2</version></dependency>2.新建Send類,如下所示。
package com.xdx.learn;import java.io.IOException; import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Send {private final static String QUEUE_NAME="hello";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.1.195");Connection connection=factory.newConnection();Channel channel=connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message="hello world";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Sent '"+message+"'");channel.close();connection.close();}}
運行上述代碼,報錯如下。
這是因為我照抄官網的代碼,官網的demo是基于本地的連接,而我是遠程連接,所以必須顯式地指定連接端口,用戶名,密碼之類的信息,修改上述代碼,修改后如下。
import java.io.IOException; import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Send {private final static String QUEUE_NAME="hello";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.1.195");//服務器ipfactory.setPort(5672);//端口factory.setUsername("xdx");//登錄名factory.setPassword("xxxxxx");//密碼Connection connection=factory.newConnection();Channel channel=connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message="hello world";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Sent '"+message+"'");channel.close();connection.close();} }
然后再運行,這次可以運行成功了。
然后我們去RabbitMQ的管理后臺,就可以看到隊列中有一個queue了,名字就叫做hello。如下圖所示。
如果我再執行以下剛才那段代碼,就會發現messages的數量又多了一個,如下所示。
?
3.接下來是Recv.java類,用于接收消息,不同意發布消息的類,接收消息的類必須一直保持運行的狀態,以便監聽消息的到來。
package com.xdx.learn;import java.io.IOException; import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope;public class Recv {private final static String QUEUE_NAME="hello";public static void main(String[] args) throws IOException, TimeoutException {//下面的配置與生產者相對應ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.1.195");//服務器ipfactory.setPort(5672);//端口factory.setUsername("xdx");//登錄名factory.setPassword("xxxx");//密碼Connection connection=factory.newConnection();//連接Channel channel=connection.createChannel();//頻道channel.queueDeclare(QUEUE_NAME, false, false, false, null);//隊列System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//defaultConsumer實現了Consumer,我們將使用它來緩存生產者發送過來儲存在隊列中的消息。當我們可以接收消息的時候,從中獲取。Consumer consumer=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};//接收到消息以后,推送給RabbitMQ,確認收到了消息。channel.basicConsume(QUEUE_NAME, true, consumer);}}
運行結果如下:
此時我們再去RabbitMQ的控制臺查看,發現hello隊列中已經沒有message了。
? 注意到消費者的代碼,有一個實現了DefaultConsumer接口的Consumer對象,去查看Consumer的源碼,我們可以知道它的handleDelivery方法被一個一直存在的線程(該線程不是Connection所在的線程)調用,當有消息的時候,就會被執行。
以上就是一個簡單的生產者和消費者的例子,其實RabbitMQ在這個過程中充當了一個消息存儲器的角色,它負責接收,分配消息,而發送,接收消息的工作由我們編程來實現。經過這個例子,我們對RabbitMQ有了一個直觀的簡單的理解。更多的細節將在下面的文章中來學習。
轉載于:https://www.cnblogs.com/roy-blog/p/8023791.html
總結
以上是生活随笔為你收集整理的二.java下使用RabbitMQ实现hello world的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 打包下载文件_java下载打包
- 下一篇: 修改java启动参数_如何修改jvm启动