rabbitmq java文档_《RabbitMQ官方文档》订阅与发布
之前的教程中,我們創建了一個工作隊列。在一個工作隊列背后的假設是將每個任務都準確地交付給一個工作人員。在這個環節我們要做些完全不同的事情—我們將要把一個消息傳遞給多個消費者。這種模式被稱為“發布/訂閱”。
為了闡述這種模式,我們打算構建一個簡單的日志系統。它由兩個程序組成—第一個發出日志消息,第二個接收消息并打印出來。
在我們的日志系統中,每次運行接收者程序的副本都將收到消息。這使我們能夠運行一個接收者程序,將日志導向磁盤;同時我們可以運行另一個接收者程序查看屏幕上的日志。
實際上,已發布的日志消息將被廣播到所有接收者。
交換
在教程的前面部分中,我們發送和接受消息都來自一個隊列,現在是時候引進一個完整的Rabbit消息模型了。
讓我們快速地回顧一下在之前的教程里介紹的內容:
生產者是發送消息的用戶程序。
隊列是儲存消息的緩存。
消費者是接收消息的用戶程序。
在Rabbit消息模型中核心的思想是,生產者從不直接向一個隊列發送任何消息。事實上,通常生產者甚至不知道將一個消息是否傳遞到了某一隊列。
相反的,生產者只能向交換器(exchange)發送消息。交換器是一個非常簡單的東西。它在一端從生產者接收消息,在另一端將消息發布到隊列。交換器必須準確地知道它收到的消息要做什么。它應該被添加到一個特殊的隊列?它應該被添加到多個隊列?或者它應該被廢棄?這些規則由交換類型定義。
有許多交換類型是可用的:direct,tpic,headers和fanout。我們來關注最后一種–fanout,我們來創建一個這種類型的交換器命名為logs:
channel.exchangeDeclare("logs", "fanout");
這個fanout交換器非常簡單。就像你可以從它的名字中猜到的,它僅僅是將它收到的所有消息傳遞到它所知的隊列中去。這正是我們的日志系統所需要的。
列出交換器
你可以用有用的rabbitmqctl列出交換器:
sudo rabbitmqctl list_exchanges
在列表里會有一些名字為 amq.*的默認(未命名的),但是現在你可能不需要使用它們。
匿名交換器
在教程的前面部分中,我們還不知道交換器,但仍能夠將消息發送到對了,這是因為我們可能用了一個默認的交換器,我們用空字符串(””)來識別它。
想想我們之前是怎樣發布一條消息的:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數是交換器的名字。空字符串指明是默認或匿名的交換器:消息通過“routingKey”選擇指定的名稱路由到隊列,如果它存在的話。
現在我們可以發布到我們命名的交換器了。
channel.basicPublish( "logs", "", null, message.getBytes());
臨時隊列
也許你記得我們之前使用的隊列都有一個指定的名字(記得hello和task——queue嗎?)。能夠為一個隊列指定名字對于我們來說是很重要的—我們需要將工作者指向相同的隊列。當你想要在生產者和消費者之間同用一個隊列的時候,給一個隊列賦予名字是很重要的。
但這不適用于我們的日志系統。我們想要監聽所有的消息,而不只是其中的一部分。我們也僅只對當前活動的消息感興趣而不是舊的。為了解決這個問題我們需要兩個東西。
首先,每當我們連接Rabbit我們需要一個刷新過的空隊列。我們可以創建一個隊列并使用一個隨機的名字,甚至更好的做法是讓服務器為我們選擇一個隨機的名字:
其次,一旦我們使消費者從隊列斷開后,隊列應該自動被刪掉。
在Java客戶端,當我們使用一個不帶參數的queueDeclare()時,我們使用自動生成的名字創建了一個,不持久的,專用的,自動刪除的隊列。
String queueName = channel.queueDeclare().getQueue();
此時queueName包含一個隨機的隊列名字。例如它可能是amq.gen-JzTY20BRgKO-HjmUJj0wLg.
綁定
我們已經創建了一個fanout交換器和一個隊列。現在我們需要讓交換器將消息發送到我們的隊列里。交換器和一個隊列之間的關系稱為一個綁定。
channel.queueBind(queueName, "logs", "");
現在開始logs交換器會添加消息到我們的隊列。
列出綁定
你可以列出所有存在的綁定,你可以猜到這條命令:
rabbitmqctl list_bindings
將他們聯系在一起
用于發出消息的生產者程序,看起來和之前的教程里沒有太大差異。最重要的改變是現在我們要發布消息到我們的logs交換器而不是匿名的。我們需要在發送的時候提供一個routingKey,但對于fanout交換器來說,這個值會被忽略掉。
以下是EmitLog.java程序的代碼:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
正如你所見,在建立一個連接后我們聲明了一個交換器,這個步驟是必須的因為發布到一個不存在的交換器是禁止的。
如果沒有隊列綁定到交換器上消息將丟失,但這對于我們來說沒有影響;如果沒有消費者在監聽,我們可以安全地丟棄消息。
這是ReceiveLogs.java:的代碼:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
像我們之前那樣編譯它。
javac -cp $CP EmitLog.java ReceiveLogs.java
如果你想要把日志保存到文件,只需打開控制臺并輸入:
java -cp $CP ReceiveLogs > logs_from_rabbit.log
如果你想要在屏幕里看到日志,spawn一個新的終端并且運行:
java -cp?$CP ReceiveLogs
當然,要發出日志類型:
java -cp $CP EmitLog
使用rabbitmqctl list_bindings 你可以驗證代碼實際上是根據需要創建綁定和隊列。在兩個ReceiveLogs.java程序中你應該可以看到這些東西:
sudo rabbitmqctl list_bindings
# => Listing bindings …
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => …done.
結果解釋很直接:數據從交換器logs而來去到兩個系統分配名字的隊列。這正是我們想要的。
要了解如何監聽消息的一個子集,我們繼續閱讀教程4。
總結
以上是生活随笔為你收集整理的rabbitmq java文档_《RabbitMQ官方文档》订阅与发布的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java只有值传递_面试官:为什么 Ja
- 下一篇: 官媒发文质疑增高针滥用,长春高新股价跌停