RabbitMQ 交换器、持久化
?一、 交換器
RabbitMQ交換器(Exchange)分為四種
- ?direct
?默認的交換器類型,消息的RoutingKey與隊列的bindingKey匹配,消息就投遞到相應(yīng)的隊列
- ?fanout
一種發(fā)布/訂閱模式的交換器,發(fā)布一條消息時,fanout把消息廣播附加到fanout交換器的隊列上?
接收類(訂閱):
import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//一旦創(chuàng)建exchange,RabbitMQ不允許對其改變,否則報錯String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");//綁定是交換器與隊列之間的關(guān)系,可以理解為,隊列對此交換器的消息感興趣System.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }發(fā)布類:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;public class ReceiveLog {private static final String EXCHANGE_NAME = "log";public static void main(String[] argv)throws java.io.IOException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "hi";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }
- topic
topic類似于fanout交換器,但更加具體化,用routingKey進行規(guī)則匹配,更靈活的匹配出用戶想要接收的消息
routingKey形如:com.company.module.demo,具體匹配規(guī)則:
"*"與"#"可以匹配任意字符,區(qū)別是"*"只能匹配由"."分割的一段字符,而"#"可以匹配所有字符
? 發(fā)布一條"com.abc.test.push"的消息,能匹配的routingKey:
com.abc.test.* #.test.push #不能匹配的:
com.abc.* *.test.push *?發(fā)布類:
聲明隊列時,需要注意隊列的屬性,雖然隊列的聲明由消費者或生產(chǎn)者完成都可以,但如果由消費者聲明,由于生產(chǎn)者生產(chǎn)消息時,可能隊列還沒有聲明,會造成消息丟失,所以推薦由生產(chǎn)者聲明隊列
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;public class RabbitMqSendTest {private static String queue = "test_queue";private static String exchange = "TestExchange";private static String routingKey = "abc.test";public static void main(String[] args) {ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();factory.setHost("172.16.67.60");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");Connection mqConnection = null;try {mqConnection = factory.newConnection();Channel mqChannel = mqConnection.createChannel();if (null != mqChannel && mqChannel.isOpen()) {mqChannel.exchangeDeclare(exchange, "topic"); // String queueName = mqChannel.queueDeclare().getQueue(); // mqChannel.queueBind(queueName, exchange, routingKey); //聲明隊列名稱與屬性 //durable持久隊列,mq重啟隊列可恢復(fù) exclusive獨占隊列,僅限于聲明它的連接使用操作 //autoDelete 自動刪除 arguments 其他屬性mqChannel.queueDeclare(queue, false, false, false, null);mqChannel.queueBind(queue, exchange, routingKey);//*******************************************mqChannel.basicPublish(exchange, routingKey, null,("hello").getBytes());}} catch (Exception e) {e.printStackTrace();}finally {try {mqConnection.close();} catch (IOException e) {e.printStackTrace();}}} }接收類
import com.rabbitmq.client.*; import java.io.IOException;public class ReceiveTopic {private static String queue = "consume_queue";private static String exchange = "TestExchange";private static String routingKey = "*.test";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("172.16.67.60");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// channel聲明Exchange,名稱與類型channel.exchangeDeclare(exchange, "topic"); // String queuename = channel.queueDeclare().getQueue();channel.queueDeclare(queue, false, false, false, null);channel.queueBind(queue, exchange, "*.test"); //消費者指定消息隊列,并選擇特定的RoutingKeySystem.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer client = new DefaultConsumer(channel) {public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String msgString = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + msgString + "'");}};channel.basicConsume(queue, true,client);System.out.println();} }二、持久化
RabbitMQ默認情況下重啟消息服務(wù)器時,會丟失消息,為了盡量保證消息在服務(wù)器宕機時不丟失,就需要把消息持久化,但是也只是盡量不丟失,由于涉及磁盤寫入,當(dāng)消息量巨大時,mq性能也會被嚴重拉低。
?
轉(zhuǎn)載于:https://www.cnblogs.com/castielangel/p/9952069.html
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ 交换器、持久化的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql5.7.23版本环境配置
- 下一篇: @Column