Topic交换器-编写消费者
生活随笔
收集整理的這篇文章主要介紹了
Topic交换器-编写消费者
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
我們接著來編寫Consumer,回到我們的代碼當中,打開我們的topic-consumer,配置文件我們已經改完了,先放到這里,看我們的代碼,我們先改第一個InfoReceiver,InfoReceive這一塊我們需要改什么呢,還是@RabbitListener這,首先第一個,像隊列的名稱我們不需要動,隊列名稱我們還是叫log.info,我們給他準備了三個隊列,log.info,log.error,和全日志的隊列,這兩個我們都不要動,所以這里不用動,交換器我們也不用動,我們已經把后面的value改成log.topic了mq.config.exchange=log.topic但是這里得改,這個type得改type=ExchangeTypes.TOPIC現在這個type里面給定的還是direct交換器,我們要把它改成TOPIC,所以一定要注意,然后接下來就是key,我們用的是完全匹配的路由key,那么我們在這個交換器,在這個隊列當中,接收什么樣的消息呢,只要是你log.info的,不管你是user的還是product的,還是order的,只要是log.info的,都會進入到這里,那么前面的前綴,就得去匹配,所以我們這個key就是*.log.info看到了嗎,也就是我們這個路由key,其實給的就是*.log.info,能明白吧,這樣消息隊列就會根據后綴進行模糊匹配,只要是后綴相同的,就把消息發送到這個隊列當中,那么這個隊列是誰呢,就是我們的log.info了,這樣我們就把一個infoReceive就改好了,接下來還有Error,Error也是這樣的,首先我們要把交換器類型改成Topic,其次是這個key,這個key我們看一下,是*.log.error,擁有log.error的key,就會進入到這個隊列當中key="*.log.error"注意這一塊我是直接寫死到里面的,我之所以這么做的目的呢,就是可以直觀的看到怎么去寫他的通配,其實如果我們是在真正的開發過程當中,這些也是可以放到properties文件當中的,這樣我們就設置了第二個消息隊列的這樣的一個路由key,接下來我們是不是還有一個全日志隊列,我們再copy一個,這個我們叫LogsReceiver,然后打開他,這里我們有什么要改的呢,看一下,配置文件,這個隊列名稱,是不是已經改了mq.config.queue.logs=log.all這個在原本拷貝的例子里是沒有的,要換成這個隊列名稱,然后其次就是他的交換器,交換器不用動,還是Topic,然后就是這個路由key,在全日志的處理當中,只要你路由key含有log的消息,是不是都會進入到這里,所以這塊我們得改成*.log.*key="*.log.*"只要你的路由key中含有log,都會進入到我們的全隊列當中,這樣我們這塊就改完了,然后我們還有什么需要改動,現在這一塊,這塊到底打印的是什么信息,這塊我們還得改一下,這個我們是叫all receiverpublic class LogsReceiver {/*** 接收消息的方法。采用消息隊列監聽機制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("......All........receiver: "+msg);}
}這表示消息隊列打印的前綴,再加后面的消息,我們把這個copy一下,我們把這幾個改一下public class ErrorReceiver {/*** 接收消息的方法。采用消息隊列監聽機制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("......Error........receiver: "+msg);}
}這個是error,然后infopublic class InfoReceiver {/*** 接收消息的方法。采用消息隊列監聽機制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("......Info........receiver: "+msg);}
}這樣我們的Consumer的代碼就寫好了,然后我們把它整理到筆記當中,這個是info里面的代碼,然后這里需要注意的,一個是這個,type=ExchangeTypes.TOPIC,還有一個是key="*.log.info"這個,然后第二個,ErrorReceiver,把它copy下來放到我們的筆記當中,然后這里需要注意的是,type=ExchangeTypes.TOPIC,還有一個是key,key="*.log.error",然后最后一個是LogsReceiver,這里也是type=ExchangeTypes.TOPIC,還有key="*.log.*",這樣我們的consumer代碼就寫好了,接下來我們就測試我們的是否可用,回到我們的Provider當中,我們要去修改一下我們的測試代碼,現在我們的代碼就不用while循環了,我們直接發送消息就行了,現在這個是UserSender,我們還有OrderSender,還有ProductSender,都得注入進來,@Autowired
private UserSender usersender;@Autowired
private ProductSender productsender;@Autowired
private OrderSender ordersender;我們需要把ProductSender注入進來,然后還有一個,OrderSender,然后我們讓這三個服務都去發送消息,@Test
public void test1(){this.usersender.send("UserSender.....");this.productsender.send("ProductSender....");this.ordersender.send("OrderSender......");
}這樣是12條消息,應該是12條消息,每個里面有四個,然后我們三個都發一遍,接下來我們就來測試一下,代碼是否可用,我們把接收者啟動,就是topic-consumer,然后我們再去運行我們的測試類,運行我們的測試代碼,我們觀察控制臺,這個時候我們可以看到,消息拿到了,我們先看info的,這是我的info Receiver,......All........receiver: user.log.debug.....UserSender.....
......Info........receiver: user.log.info.....UserSender.....
......Error........receiver: user.log.error.....UserSender.....
......Info........receiver: product.log.info.....ProductSender....
......Error........receiver: product.log.error.....ProductSender....
......All........receiver: user.log.info.....UserSender.....
......Info........receiver: order.log.info.....OrderSender......
......Error........receiver: order.log.error.....OrderSender......
......All........receiver: user.log.warn.....UserSender.....
......All........receiver: user.log.error.....UserSender.....
......All........receiver: product.log.debug.....ProductSender....
......All........receiver: product.log.info.....ProductSender....
......All........receiver: product.log.warn.....ProductSender....
......All........receiver: product.log.error.....ProductSender....
......All........receiver: order.log.debug.....OrderSender......
......All........receiver: order.log.info.....OrderSender......
......All........receiver: order.log.warn.....OrderSender......
......All........receiver: order.log.error.....OrderSender......由于我們ALL當中配的是什么,我們配的是key="*.log.*",那么也就意味著,只要你的路由key當中,含有log,是不是都會進入到這個隊列當中,是不是都會進入到這個隊列當中,那么這里的信息肯定是最多的,應該有幾個,12條信息就對了,所以說呢,在*.log.*里面,他就是把所有的消息都放在全日志的級別當中,所以以上我們通過這個案例,就演示了對Topic的一個使用,topic最大的一個特點是什么呢,可以節省我們的隊列,是某一部分的消息,可以對應一個隊列,這樣可以降低我們隊列的一個數量,其實你細想這個問題也是這樣的,我這個隊列只是記錄log.info的信息,我管你是用戶的log.info,還是商品的log.info,還是訂單的log.info,只要是你產生的log.info的信息,都放到我這個隊列里就可以了,所以對于這樣的一個需求呢,我們就得通過topic交換器來實現了,并且配置這種通配的路由key,所以說topic的交換器呢,他也叫規則匹配,那個星號就是規則,測試代碼我們也加進去了,那么對于topic的交換器我們就講解完了
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>rabbitmq-topic-consumer</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.12.RELEASE</version><relativePath/> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><!-- 這個插件,可以將應用打包成一個可執行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
spring.application.name=rabbitmq-topic-consumerspring.rabbitmq.host=59.110.158.145
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guestmq.config.exchange=log.topic
mq.config.queue.info=log.info
mq.config.queue.error=log.error
mq.config.queue.logs=log.all
package com.learn;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收者* @author Administrator* @RabbitListener bindings:綁定隊列* @QueueBinding value:綁定隊列的名稱* exchange:配置交換器* * @Queue value:配置隊列名稱* autoDelete:是否是一個可刪除的臨時隊列* * @Exchange value:為交換器起個名稱* type:指定具體的交換器類型*/
@Component
@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),key="*.log.info"))
public class InfoReceiver {/*** 接收消息的方法。采用消息隊列監聽機制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("......Info........receiver: "+msg);}
}
package com.learn;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收者* @author Administrator* @RabbitListener bindings:綁定隊列* @QueueBinding value:綁定隊列的名稱* exchange:配置交換器* * @Queue value:配置隊列名稱* autoDelete:是否是一個可刪除的臨時隊列* * @Exchange value:為交換器起個名稱* type:指定具體的交換器類型*/
@Component
@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.error}",autoDelete="true"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),key="*.log.error"))
public class ErrorReceiver {/*** 接收消息的方法。采用消息隊列監聽機制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("......Error........receiver: "+msg);}
}
package com.learn;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收者* @author Administrator* @RabbitListener bindings:綁定隊列* @QueueBinding value:綁定隊列的名稱* exchange:配置交換器* * @Queue value:配置隊列名稱* autoDelete:是否是一個可刪除的臨時隊列* * @Exchange value:為交換器起個名稱* type:指定具體的交換器類型*/
@Component
@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.logs}",autoDelete="true"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),key="*.log.*"))
public class LogsReceiver {/*** 接收消息的方法。采用消息隊列監聽機制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("......All........receiver: "+msg);}
}
package com.learn;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitTopicConsumerApplication {public static void main(String[] args) {SpringApplication.run(RabbitTopicConsumerApplication.class, args);}
}
?
總結
以上是生活随笔為你收集整理的Topic交换器-编写消费者的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Topic交换器-编写生产者
- 下一篇: Fanout交换器-搭建环境