日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ 交换器、持久化

發(fā)布時間:2024/4/17 编程问答 57 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 交换器、持久化 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?一、 交換器

  RabbitMQ交換器(Exchange)分為四種

  •   direct    ? ?
  •   fanout
  •   topic
  •   headers
    • ?direct

      ?默認的交換器類型,消息的RoutingKey與隊列的bindingKey匹配,消息就投遞到相應(yīng)的隊列

    • ?fanout

      一種發(fā)布/訂閱模式的交換器,發(fā)布一條消息時,fanout把消息廣播附加到fanout交換器的隊列上? 

      接收類(訂閱): 

    import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//一旦創(chuàng)建exchange,RabbitMQ不允許對其改變,否則報錯String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");//綁定是交換器與隊列之間的關(guān)系,可以理解為,隊列對此交換器的消息感興趣System.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }

      發(fā)布類: 

    import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;public class ReceiveLog {private static final String EXCHANGE_NAME = "log";public static void main(String[] argv)throws java.io.IOException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "hi";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }
    • topic

      topic類似于fanout交換器,但更加具體化,用routingKey進行規(guī)則匹配,更靈活的匹配出用戶想要接收的消息

      routingKey形如:com.company.module.demo,具體匹配規(guī)則:

        "*"與"#"可以匹配任意字符,區(qū)別是"*"只能匹配由"."分割的一段字符,而"#"可以匹配所有字符   

    ?  發(fā)布一條"com.abc.test.push"的消息,能匹配的routingKey:

    com.abc.test.* #.test.push #

      不能匹配的:

    com.abc.* *.test.push *

    ?發(fā)布類:

      聲明隊列時,需要注意隊列的屬性,雖然隊列的聲明由消費者或生產(chǎn)者完成都可以,但如果由消費者聲明,由于生產(chǎn)者生產(chǎn)消息時,可能隊列還沒有聲明,會造成消息丟失,所以推薦由生產(chǎn)者聲明隊列

    import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;public class RabbitMqSendTest {private static String queue = "test_queue";private static String exchange = "TestExchange";private static String routingKey = "abc.test";public static void main(String[] args) {ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();factory.setHost("172.16.67.60");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");Connection mqConnection = null;try {mqConnection = factory.newConnection();Channel mqChannel = mqConnection.createChannel();if (null != mqChannel && mqChannel.isOpen()) {mqChannel.exchangeDeclare(exchange, "topic"); // String queueName = mqChannel.queueDeclare().getQueue(); // mqChannel.queueBind(queueName, exchange, routingKey); //聲明隊列名稱與屬性 //durable持久隊列,mq重啟隊列可恢復(fù) exclusive獨占隊列,僅限于聲明它的連接使用操作 //autoDelete 自動刪除 arguments 其他屬性mqChannel.queueDeclare(queue, false, false, false, null);mqChannel.queueBind(queue, exchange, routingKey);//*******************************************mqChannel.basicPublish(exchange, routingKey, null,("hello").getBytes());}} catch (Exception e) {e.printStackTrace();}finally {try {mqConnection.close();} catch (IOException e) {e.printStackTrace();}}} }

    接收類

    import com.rabbitmq.client.*; import java.io.IOException;public class ReceiveTopic {private static String queue = "consume_queue";private static String exchange = "TestExchange";private static String routingKey = "*.test";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("172.16.67.60");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// channel聲明Exchange,名稱與類型channel.exchangeDeclare(exchange, "topic"); // String queuename = channel.queueDeclare().getQueue();channel.queueDeclare(queue, false, false, false, null);channel.queueBind(queue, exchange, "*.test"); //消費者指定消息隊列,并選擇特定的RoutingKeySystem.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer client = new DefaultConsumer(channel) {public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String msgString = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + msgString + "'");}};channel.basicConsume(queue, true,client);System.out.println();} }

    二、持久化

      RabbitMQ默認情況下重啟消息服務(wù)器時,會丟失消息,為了盡量保證消息在服務(wù)器宕機時不丟失,就需要把消息持久化,但是也只是盡量不丟失,由于涉及磁盤寫入,當(dāng)消息量巨大時,mq性能也會被嚴重拉低。

    ?

    轉(zhuǎn)載于:https://www.cnblogs.com/castielangel/p/9952069.html

    總結(jié)

    以上是生活随笔為你收集整理的RabbitMQ 交换器、持久化的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。