消息队列入门案例-编码
生活随笔
收集整理的這篇文章主要介紹了
消息队列入门案例-编码
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
我們就來編寫里面的代碼,那么我們應該怎么去編寫這個代碼呢,先把步驟說一下,在使用消息隊列的時候呢,你是不是得有一個隊列,那我們是不是先把隊列給創建出來,然后有消息的發送者,消息的接收者,就是Provider和Consumer,那么我們先來看第一個環節,先去創建一個隊列,那么這個隊列在什么時候創建呢,應該在項目啟動的時候就將項目創建好,先要一個創建隊列的一個類,這樣的一個對象,幫我們把隊列給創建出來,那我們現在用的是SpringBoot,如果沒有用SpringBoot的時候,可以用XML文件去配置bean標簽,然后將創建隊列的這個類,放到bean標簽當中,然后隨著SpringBoot啟動,就可以幫我們把隊列對象實例化出來,那我們用SpringBoot怎么去做呢,大家還有印象,因為SpringBoot里面是沒有XML的,我們可以通過一個class,加上相應的注解,來解決初始化的問題,所以我現在寫一個class,我叫QueueConfig,隊列的一個配置,這個類的作用就是,創建消息隊列,那么這個類是不是要在我們項目啟動時,初始化出來,所以我們要用一個@Configuration的注解,然后接下來,讓他執行這個類的方法,那么這個方法我們要用一個bean的注解,這個方法他返回一個什么呢,返回一個Queue,別導錯包了,我們用的是org.springframework.amqp.core.Queue;然后我們給他取一個名叫createQueue,然后我們return,new一個Queue就可以了,然后我們要給我們的隊列起一個名字,比如叫hello-queue,這個方法的作用是,創建隊列,所以我們的SpringBoot在啟動時,它會對所有標記了@Configuration的類做初始化,同時會加有@Bean的方法,這個方法一被執行,就返回Queue,這個Queue就放在IOC容器當中,這是比較簡單的,這樣我們一個消息隊列的類就寫完了@Configuration
public class QueueConfig {/*** 創建隊列* @return*/@Beanpublic Queue createQueue(){return new Queue("hello-queue");}
}我們隊列的名稱叫hello-queue,接下來我們編寫一個發送消息的Class,我們起個名比如叫Sender,這里是消息發送者,然后這個消息發送者,在我們程序運行的時候,是不是讓Spring初始化出來,他既不屬于WEB層的框架,也不屬于業務層的框架,也不屬于持久層的框架,所以我們這里要用哪個注解呢,我們可以用@Component,直接加一個組件的注解,然后這里我先提供一個方法,這個方法用于專門發送消息的,發送消息的方法,這個方法沒有返回值,我們定義成void的就可以了,可以根據自己的需要來決定返回什么,然后我們叫send,然后send方法我們給他一個入參,這個入參,就是你給我方法什么,我就給消息隊列發送什么,是不是得給消息隊列里發送消息了,可是我怎么去拿消息隊列呢,在這里SpringBoot已經給我們封裝了,我們用的Queue是org.springframework.amqp.core.Queue是封裝完的,既然把它封裝了,包括對于隊列的操作方式,Spring也做封裝了,都是在amqp的包下,我們現在想要操作RabbitMQ怎么辦呢,封裝了一個類,叫rabbitAmqpTemplate,Spring把template的技術發揮到至極的狀態,只要他對什么東西做整合,最基本的會提供一個操作模板類,我們的模板就是什么什么template,整合hibnate就是hibernateTemplate,整合JDBC就是jdbcTemplate,那么他在整合Rabbbit的時候呢,提供一個RabbitTemplate,提供操作Rabbit的封裝,那么我先需要將他注入進來@Autowired
private AmqpTemplate rabbitAmqpTemplate;我們看,它是一個接口,然后我們起個名叫rabbitAmqpTemplate,然后我們可以通過注入進來的這個對象,來進行對消息隊列的一個操作,比如我們現在要發送怎么辦呢,下面有一個convertAndSend的方法,然后我們調用哪個呢,有兩個參數的,這兩個參數,我們來說一下,第一個參數表示什么意思呢,參數一表示隊列的名稱,就是你要往哪個隊列發送消息,參數二什么呢,消息,我們之前講過,消息隊列里我們是可以創建多個隊列的,我們可以在RabbitMQ里面創建多個消息隊列,然后生產者和消費者可以創建不同的隊列,我們在創建隊列的時候給他起了一個名稱,然后我們在發送消息的時候呢,根據隊列的名稱來做一個指定,所以我們要注意的就是,這個名稱是不能有重復的了,就是這塊需要注意,第一個隊列名稱,我們要往哪個隊列發送消息呢,我們這里的隊列的名稱叫"hello-queue",然后要發送什么消息呢,是不是傳遞過來的msg,這樣我們的一個消息發送者就編寫完了,還是比較簡單的,然后我們還得編寫一個消息的接收者@Component
public class Sender {@Autowiredprivate AmqpTemplate rabbitAmqpTemplate;/** 發送消息的方法*/public void send(String msg){//向消息隊列發送消息//參數一:隊列的名稱。//參數二:消息this.rabbitAmqpTemplate.convertAndSend("hello-queue", msg);}
}這個我叫什么呢,叫Receive,這個是消息接收者,然后他也得是一個組件,@Component,然后在這里我們加一個方法,比如叫process,叫什么都行,那么消息接收者是怎么接收消息的呢,他采用的是一個消息隊列的監聽器,也就是說如果你的隊列里一旦有變化了,就會觸發方法的一個接收,換句話說,你怎么讓我們加的方法去接收消息呢,就依賴于消息隊列的事件,我們就在消息接收者的方法上,加一個注解,叫@RabbitListener,那么這個注解什么意思呢,就是我剛剛說的,哪個隊列一旦有變化了,或者有發送者發送消息了,馬上就會執行這個方法,因為我們把這個注解加在這個方法上,采用失效通知的方法,從隊列當中接收消息,是這樣的一個過程,那么去讓他接收哪個隊列呢,我們可以創建好多隊列是不是,所以這一塊他有一個屬性,queues,他是一個數組,他可以對多個隊列進行一個監聽,那我們現在只有一個隊列,只要讓他監聽一個隊列就可以了,所以我們就不用寫數組格式了,直接寫一個字符串,你要是寫數組,里面要的也是字符串,要的是你隊列的名稱,隊列的名稱叫什么,叫hello-queue,所以把這個隊列的名稱拿過來,往這里一放,這里就表示去監聽這個隊列,一旦這個隊列里有消息進來了,馬上會觸發這個方法,這個方法我們要做的事就是,在隊列里把消息拿出來就可以了,他也會把消息傳遞過來,它會通過一個參數,String msg就可以了,這個是接收消息的方法,采用消息隊列監聽機制,然后會把這個消息傳遞過來,我們在這里直接打印他就可以了前面可以加上一個標記,這里是什么呢,是receiver,這樣我們消息接收方也就搞定了,@Component
public class Receiver {/*** 接收消息的方法。采用消息隊列監聽機制* @param msg*/@RabbitListener(queues="hello-queue")public void process(String msg){System.out.println("receiver: "+msg);}
}然后我們接下來我們再看一下,消息接收方和發送方我們都有了,我們是不是就可以編寫測試代碼了,回過來找到我們測試的這個包,我們叫com.learn.test,然后在測試包里我們建立一個測試類,我們建立一個QueueTest,這個是消息隊列測試類,既然是測試類,我們需要加一些注解,@RunWith,然后SpringRunner.classorg.springframework.test.context.junit4.SpringRunner然后我們還需要加一個@SpringBootTest,然后這里給一個啟動類classes=SpringBootMQApplication.class我們這個啟動類叫SpringBootMQApplication,接下來我們寫測試方法,測試消息隊列,其實我們要做的事你看一下,我們只要調用send方法就可以了,send方法一旦給消息隊列發送消息了,監聽事件馬上會觸發,然后會調用我們的process方法,然后將消息傳遞到參數當中,然后我們將消息打印出來,所以我們操作的就是發送者,所以我們要將消息的發送者注入進來@RunWith(SpringRunner.class)
@SpringBootTest(classes=SpringBootMQApplication.class)
public class QueueTest {@Autowiredprivate Sender sender;/** 測試消息隊列*/@Testpublic void test1(){this.sender.send("Hello RabbitMQ" + new Date());}
}接下來我們來做一個測試,所以RabbitMQ,或者消息隊列,他使用起來還是比較簡單的,無論是RabbitMQ也好,還是ActiveMQ也好,RocketMQ也好,其實他們的MQ原理上都是一樣的,操作上也大致相同,然后不同的是什么呢,就是他們的API可能會有一些變化,我們根據不同的消息隊列去熟悉他的API就可以了,編寫代碼,第一步創建隊列,現在我們隊列產生了以后,還記得這個管理界面吧59.110.158.145:15672http://59.110.158.145:15672/#/queues這里會顯示你隊列里的信息,有多少個這里會顯示多少個,然后我們在這里可以看到,我們是不是有一個hello-queue,這個隊列就叫hello-queue,然后狀態是什么,然后其他的信息在這里,消息的總量,還有消息讀取的流量,在這都會有一些顯示,所以我們通過他,管理我們的消息隊列就會更容易一些,當然你也可以刪掉,這里有一個delete,我們可以把它刪掉,這樣你的消息隊列就從MQ中刪除了,除非你再運行一遍他才會有,比如我再測試一下,成功了,然后我們再來看,這個隊列又出來了,顯示的是創建出來的隊列的信息,然后創建生產者,或者消息提供者,就是我們的Sender,然后是消息接收者,然后是啟動類,然后是我們的測試代碼
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>springcloud-mq</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.12.RELEASE</version><relativePath/> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><!-- 這個插件,可以將應用打包成一個可執行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
#server.port=8010
spring.application.name=springcloud-mq
spring.rabbitmq.host=59.110.158.145
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
package com.learn.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 創建消息隊列* @author Administrator**/
@Configuration
public class QueueConfig {/*** 創建隊列* @return*/@Beanpublic Queue createQueue(){return new Queue("hello-queue");}
}
package com.learn.config;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收者* @author Administrator**/
@Component
public class Receiver {/*** 接收消息的方法。采用消息隊列監聽機制* @param msg*/@RabbitListener(queues="hello-queue")public void process(String msg){System.out.println("receiver: "+msg);}
}
package com.learn.config;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 消息發送者* @author Administrator**/
@Component
public class Sender {@Autowiredprivate AmqpTemplate rabbitAmqpTemplate;/** 發送消息的方法*/public void send(String msg){//向消息隊列發送消息//參數一:隊列的名稱。//參數二:消息this.rabbitAmqpTemplate.convertAndSend("hello-queue", msg);}
}
package com.learn;import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** 自動配置* @author Leon.Sun*/
@SpringBootApplication
@EnableRabbit
public class SpringBootMQApplication {public static void main(String[] args) {// Spring應用啟動起來SpringApplication.run(SpringBootMQApplication.class,args);}}
package com.learn.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import com.learn.SpringBootMQApplication;
import com.learn.config.Sender;/*** 消息隊列測試類* @author Administrator*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes=SpringBootMQApplication.class)
public class QueueTest {@Autowiredprivate Sender sender;/** 測試消息隊列*/@Testpublic void test1(){for(int i=0;i<20;i++) {this.sender.send("Hello RabbitMQ" + i);}}
}
?
總結
以上是生活随笔為你收集整理的消息队列入门案例-编码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 消息队列入门案例-环境搭建
- 下一篇: RabbitMQ原理讲解