日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMq 发布订阅 Publish/Subscribe fanout/direct

發布時間:2025/3/15 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMq 发布订阅 Publish/Subscribe fanout/direct 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

?

概述

交換機

臨時隊列

代碼


概述

在上篇中了解到rabbitmq 生產者生產消息到隊列,多個消費者可以接受。這篇文章主要記錄廣播類型為fanout。生產者不在將產生的消息發送到隊列,而是將消息發送到交換機exchange,交換機會根據不同的交換規則,將消息發送到不同的隊列。交換器必須知道她所接收的消息是什么?它應該將消息放到哪個隊列中或者還是應該丟棄?這些規則都是按照交換機的規則來確定的。

? ? ? ? ? ?

交換機

Exchange(交換機):生產者會將消息發送到交換機,然后交換機通過路由策略(規則)將消息路由到匹配的隊列中去

交換規則:

Fanout 不處理路由。需要簡單的將隊列綁定到交換機上。一個發送到該類型交換機的消息都會被廣播到與該交換機綁定的所有隊列上。(本篇文章)

direct:它會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中。

channel.basicPublish(“direct”, “warn”, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

我們定義direct交換機,綁定路由warn 這時候發送消息只能發送的綁定的隊列中 如隊列1 隊列2 但是如果綁定路由為info 則只有隊列2可以收到。

topic:direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似。

定義

//參數1 名稱 參數2 類型 channel.exchangeDeclare("fanout", "fanout");

臨時隊列

在生產者和消費者之間創建一個新的隊列,這時候又不想使用原來的隊列,臨時隊列就是為這個場景而生的:

首先,每當我們連接到RabbitMQ,我們需要一個新的空隊列,我們可以用一個隨機名稱來創建,或者說讓服務器選擇一個隨機隊列名稱給我們,一旦我們斷開消費者,隊列應該立即被刪除。

在Java客戶端,提供queuedeclare()為我們創建一個非持久化、獨立、自動刪除的隊列名稱。

隊列綁定

BindOk com.rabbitmq.client.Channel.queueBind(String queue, String exchange, String routingKey) throws IOExceptionBind a queue to an exchange, with no extra arguments.Parameters:queue the name of the queueexchange the name of the exchangeroutingKey the routine key to use for the binding Returns:a binding-confirm method if the binding was successfully created Throws:java.io.IOException - if an error is encountered

代碼

該代碼為fanout模式

生產者

package com.ll.mq.hellomq.queue;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;/*** * @author ll 生產者**/ public class Producer {private static final String EXCHANGE_NAME = "fanoutStudy";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 分發消息for(int i = 0 ; i < 5; i++){String message = "Hello World! " + i;channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" send'" + message + "'");}channel.close();connection.close();}}

消費者1

package com.ll.mq.hellomq.fanout;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /*** * @author ll 消費者1**/ public class ConsumerOne {private static final String EXCHANGE_NAME = "fanoutStudy";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 聲明交換機類型channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 獲取臨時隊列String queueName = channel.queueDeclare().getQueue();// 綁定channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" ConsumerOne '" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }

消費者2

package com.ll.mq.hellomq.fanout;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope;public class ConsumerTwo {private static final String EXCHANGE_NAME = "fanoutStudy";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明交換機類型channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//獲取臨時隊列String queueName = channel.queueDeclare().getQueue();//綁定channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" ConsumerTwo '" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }

結果:

生產者:?send'Hello World! 0'
? ? ? ? ? ? ? ?send'Hello World! 1'
? ? ? ? ? ? ? ?send'Hello World! 2'
? ? ? ? ? ? ? ?send'Hello World! 3'
? ? ? ? ? ? ? ?send'Hello World! 4'

消費者1? ??ConsumerOne 'Hello World! 0'
? ? ? ? ? ? ? ?ConsumerOne 'Hello World! 1'
? ? ? ? ? ? ? ?ConsumerOne 'Hello World! 2'
? ? ? ? ? ? ? ConsumerOne 'Hello World! 3'
? ? ? ? ? ? ? ConsumerOne 'Hello World! 4'

消費者2? ?ConsumerTwo'Hello World! 0'
? ? ? ? ? ? ? ?ConsumerTwo'Hello World! 1'
? ? ? ? ? ? ? ?ConsumerTwo'Hello World! 2'
? ? ? ? ? ? ? ConsumerTwo'Hello World! 3'
? ? ? ? ? ? ? ConsumerTwo'Hello World! 4'

rabbitmq結果:

?

下一篇?https://blog.csdn.net/lilongwangyamin/article/details/105117288?rabbitmq topic

總結

以上是生活随笔為你收集整理的RabbitMq 发布订阅 Publish/Subscribe fanout/direct的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。