日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

rabbitmq java教程_GitHub - maxwellyue/rabbitmq-tutorial-java: RabbitMQ官方教程的翻译和说明--Java版...

發(fā)布時(shí)間:2024/10/8 java 54 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq java教程_GitHub - maxwellyue/rabbitmq-tutorial-java: RabbitMQ官方教程的翻译和说明--Java版... 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

說明

主要通過Hello Word對RabbitMQ有初步認(rèn)識

工作隊(duì)列,即一個(gè)生產(chǎn)者對多個(gè)消費(fèi)者

循環(huán)分發(fā)、消息確認(rèn)、消息持久、公平分發(fā)

如何同一個(gè)消息同時(shí)發(fā)給多個(gè)消費(fèi)者

開始引入RabbitMQ消息模型中的重要概念路由器Exchange以及綁定等

使用了fanout類型的路由器

如何選擇性地接收消息

使用了direct路由器

如何通過多重標(biāo)準(zhǔn)接收消息

使用了topic路由器,可通過靈活的路由鍵和綁定鍵的設(shè)置,

進(jìn)一步增強(qiáng)消息選擇的靈活性

如何使用RabbitMQ實(shí)現(xiàn)一個(gè)簡單的RPC系統(tǒng)

回調(diào)隊(duì)列callback queue和關(guān)聯(lián)標(biāo)識correlation id

RabbitMQ 一般工作流程

生產(chǎn)者和RabbitMQ服務(wù)器建立連接和通道,聲明路由器,同時(shí)為消息設(shè)置路由鍵,這樣,所有的消息就會(huì)以特定的路由鍵發(fā)給路由器,具體路由器會(huì)發(fā)送到哪個(gè)或哪幾個(gè)隊(duì)列,生產(chǎn)者在大部分場景中都不知道。(1個(gè)路由器,但不同的消息可以有不同的路由鍵)。

消費(fèi)者和RabbitMQ服務(wù)器建立連接和通道,然后聲明隊(duì)列,聲明路由器,然后通過設(shè)置綁定鍵(或叫路由鍵)為隊(duì)列和路由器指定綁定關(guān)系,這樣,消費(fèi)者就可以根據(jù)綁定鍵的設(shè)置來接收消息。(1個(gè)路由器,1個(gè)隊(duì)列,但不同的消費(fèi)者可以設(shè)置不同的綁定關(guān)系)。

主要方法

聲明隊(duì)列(創(chuàng)建隊(duì)列):可以生產(chǎn)者和消費(fèi)者都聲明,也可以消費(fèi)者聲明生產(chǎn)者不聲明,也可以生產(chǎn)者聲明而消費(fèi)者不聲明。最好是都聲明。(生產(chǎn)者未聲明,消費(fèi)者聲明這種情況如果生產(chǎn)者先啟動(dòng),會(huì)出現(xiàn)消息丟失的情況,因?yàn)殛?duì)列未創(chuàng)建)

channel.queueDeclare(String queue, //隊(duì)列的名字

boolean durable, //該隊(duì)列是否持久化(即是否保存到磁盤中)

boolean exclusive,//該隊(duì)列是否為該通道獨(dú)占的,即其他通道是否可以消費(fèi)該隊(duì)列

boolean autoDelete,//該隊(duì)列不再使用的時(shí)候,是否讓RabbitMQ服務(wù)器自動(dòng)刪除掉

Map arguments)//其他參數(shù)

聲明路由器(創(chuàng)建路由器):生產(chǎn)者、消費(fèi)者都要聲明路由器---如果聲明了隊(duì)列,可以不聲明路由器。

channel.exchangeDeclare(String exchange,//路由器的名字

String type,//路由器的類型:topic、direct、fanout、header

boolean durable,//是否持久化該路由器

boolean autoDelete,//是否自動(dòng)刪除該路由器

boolean internal,//是否是內(nèi)部使用的,true的話客戶端不能使用該路由器

Map arguments) //其他參數(shù)

綁定隊(duì)列和路由器:只用在消費(fèi)者

channel.queueBind(String queue, //隊(duì)列

String exchange, //路由器

String routingKey, //路由鍵,即綁定鍵

Map arguments) //其他綁定參數(shù)

發(fā)布消息:只用在生產(chǎn)者

channel.basicPublish(String exchange, //路由器的名字,即將消息發(fā)到哪個(gè)路由器

String routingKey, //路由鍵,即發(fā)布消息時(shí),該消息的路由鍵是什么

BasicProperties props, //指定消息的基本屬性

byte[] body)//消息體,也就是消息的內(nèi)容,是字節(jié)數(shù)組

BasicProperties props:指定消息的基本屬性,如deliveryMode為2時(shí)表示消息持久,2以外的值表示不持久化消息

//BasicProperties介紹

String corrId = "";

String replyQueueName = "";

Integer deliveryMode = 2;

String contentType = "application/json";

AMQP.BasicProperties props = new AMQP.BasicProperties

.Builder()

.correlationId(corrId)

.replyTo(replyQueueName)

.deliveryMode(deliveryMode)

.contentType(contentType)

.build();

接收消息:只用在消費(fèi)者

channel.basicConsume(String queue, //隊(duì)列名字,即要從哪個(gè)隊(duì)列中接收消息

boolean autoAck, //是否自動(dòng)確認(rèn),默認(rèn)true

Consumer callback)//消費(fèi)者,即誰接收消息

消費(fèi)者中一般會(huì)有回調(diào)方法來消費(fèi)消息

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, //該消費(fèi)者的標(biāo)簽

Envelope envelope,//字面意思為信封:packaging data for the message

AMQP.BasicProperties properties, //message content header data

byte[] body) //message body

throws IOException {

//獲取消息示例

String message = new String(body, "UTF-8");

//接下來就可以根據(jù)消息處理一些事情

}

};

路由器類型

fanout:會(huì)忽視綁定鍵,每個(gè)消費(fèi)者都可以接受到所有的消息(前提是每個(gè)消費(fèi)者都要有各自單獨(dú)的隊(duì)列,而不是共有同一隊(duì)列)。

direct:只有綁定鍵和路由鍵完全匹配時(shí),才可以接受到消息。

topic:可以設(shè)置多個(gè)關(guān)鍵詞作為路由鍵,在綁定鍵中可以使用*和#來匹配

headers:(可以忽視它的存在)

看主要代碼

//生產(chǎn)者

channel.queueDeclare(QUEUE_NAME, false, false, false, null); ----①

String message = "Hello World!";

channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

//消費(fèi)者

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

channel.basicConsume(QUEUE_NAME, true, consumer);

這里,生產(chǎn)者和消費(fèi)者都沒有聲明路由器,而是聲明了同名的隊(duì)列。生產(chǎn)者發(fā)布消息時(shí),使用了默認(rèn)的無名路由器(""),并以隊(duì)列的名字作為了路由鍵。消費(fèi)者在消費(fèi)時(shí),由于沒有聲明路由器,這并不表示沒有路由器的存在,消費(fèi)者此時(shí)使用的是默認(rèn)的路由器,即Default exchange,該路由器和所有的隊(duì)列都進(jìn)行綁定,并且使用隊(duì)列的名字作為了路由鍵進(jìn)行綁定。所以,生產(chǎn)者使用默認(rèn)路由器以隊(duì)列的名字作為了綁定鍵進(jìn)行了消息發(fā)布,而消費(fèi)者也使用了默認(rèn)的路由器,并以隊(duì)列的名字作為綁定鍵進(jìn)行了綁定。而默認(rèn)路由器是direct類型,路由鍵和綁定鍵完全匹配時(shí),消費(fèi)者才能接受到消息,所以教程1中的消費(fèi)者可以接收到消息。(為了認(rèn)證這一點(diǎn),可以將代碼①去掉,然后先運(yùn)行消費(fèi)者,讓它等待監(jiān)聽,然后啟動(dòng)生產(chǎn)者,發(fā)送消息,消費(fèi)者同樣會(huì)收到消息。這里的生產(chǎn)者聲明隊(duì)列,只是讓RabbitMQ服務(wù)器先創(chuàng)建這個(gè)隊(duì)列,以免發(fā)送的消息因?yàn)檎也坏疥?duì)列而丟失。)

看主要代碼

//生產(chǎn)者

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

String message = "1.";

channel.basicPublish("", TASK_QUEUE_NAME,

MessageProperties.PERSISTENT_TEXT_PLAIN,

message.getBytes("UTF-8"));

//消費(fèi)者

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

channel.basicQos(1);---①

...channel.basicAck(envelope.getDeliveryTag(), false);...---③

boolean autoAck = false;---②

channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

這里也使用了默認(rèn)的direct路由器。假如啟動(dòng)多個(gè)工作者(消費(fèi)者),按道理這些工作者應(yīng)該可以接收到所有的消息啊,但是不要忘了這幾個(gè)工作者都是從同一個(gè)隊(duì)列中取消息,消息取出一個(gè),隊(duì)列中就少一個(gè),所以每個(gè)工作者都只是收到的消息的一部分。既然這幾個(gè)工作者都從同一個(gè)隊(duì)列中取消息,那每個(gè)工作者應(yīng)該怎么取呢?

如果沒有代碼①,并且②設(shè)置為true,即自動(dòng)確認(rèn)收到消息,RabbitMQ只要發(fā)出消息就認(rèn)為消費(fèi)者收到了,此時(shí)RabbitMQ采取的是循環(huán)分發(fā)的策略,在這幾個(gè)工作者中循環(huán)輪流分發(fā)消息。每個(gè)工作者接受到的消息數(shù)量都是相同的。

如果有代碼①,并且②設(shè)置為false,則RabbitMQ會(huì)采取公平分發(fā)策略,即將消息發(fā)給空閑的工作者(空閑,工作者將消息處理完畢,執(zhí)行了代碼③;不空閑,即工作者還在處理消息,還沒有給RabbitMQ發(fā)回確認(rèn)信息,即還沒有執(zhí)行代碼③)。

代碼①中的參數(shù)1:(prefetchCount)maximum number of messages that the server will deliver。

為了防止隊(duì)列丟失,在聲明隊(duì)列的時(shí)候指定了durable為true。為了防止消息丟失,設(shè)置了消息屬性BasicProperties為MessageProperties.PERSISTENT_TEXT_PLAIN,讓我們看看值是什么:

可以看出里面包含了deliveryMode=2。從這張圖也可以看到BasicProperties屬性的全貌。

如果想讓多個(gè)消費(fèi)者共同消費(fèi)某些消息,只要讓他們共用同一隊(duì)列即可(當(dāng)然前提是你得保證消息可以都進(jìn)到這個(gè)隊(duì)列中來,如本例中使用direct路由器,消息的路由鍵和隊(duì)列的綁定鍵設(shè)為一致,當(dāng)然也可以使用fanout路由器,路由鍵和綁定鍵隨意設(shè)置,不一致也能收到,因?yàn)閒anout路由器會(huì)忽略路由鍵的設(shè)置)。

看主要代碼

//生產(chǎn)者

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

//消費(fèi)者

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

String queueName = channel.queueDeclare().getQueue();---①

channel.queueBind(queueName, EXCHANGE_NAME, "");

channel.basicConsume(queueName, true, consumer);

教程三才引出路由器的概念。生產(chǎn)者和消費(fèi)者聲明了同樣的路由,并指明路由類型為fanout,該路由器會(huì)忽視路由鍵,將消息發(fā)布到所有綁定的隊(duì)列中(仍需要綁定,只是綁定時(shí)綁定鍵任意就行了)。

假如啟動(dòng)多個(gè)消費(fèi)者,因?yàn)榇a①中調(diào)用無參的聲明去惡劣方法channel.queueDeclare(),就會(huì)創(chuàng)建了一個(gè)非持久、獨(dú)特的、自動(dòng)刪除的隊(duì)列,并返回一個(gè)自動(dòng)生成的名字。所以多個(gè)消費(fèi)者取消息時(shí)使用的是各自的隊(duì)列,不會(huì)存在多個(gè)消費(fèi)者從同一個(gè)隊(duì)列取消息的情況。

這樣多個(gè)消費(fèi)者就可以接收到同一消息。

如果想實(shí)現(xiàn)多個(gè)消費(fèi)者都可以接收到所有的消息,只要讓他們各自使用單獨(dú)的隊(duì)列即可(當(dāng)然前提是保證路由鍵和綁定鍵的設(shè)置可以讓消息都進(jìn)入到隊(duì)列,如本例中使用fanout路由器,無需考慮綁定鍵和路由鍵)。

看主要代碼:

//生產(chǎn)者

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));

//消費(fèi)者

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

String queueName = channel.queueDeclare().getQueue();

String[] severities = {"info", "warning", "error"};

for (String severity : severities) {

channel.queueBind(queueName, EXCHANGE_NAME, severity);

}

channel.basicConsume(queueName, true, consumer);

可以看出,教程3使用了direct路由器,該路由器的特點(diǎn)是可以設(shè)定路由鍵和綁定鍵,消費(fèi)者只能從隊(duì)列中取出兩者匹配的消息。

在生產(chǎn)者發(fā)消息時(shí),為消息設(shè)置不同的路由鍵(如例子中severity可以設(shè)為info、warn、error)。

消費(fèi)者在通過為隊(duì)列設(shè)置多個(gè)綁定關(guān)系,來選擇想要接收的消息。

這里有一個(gè)概念叫做多重綁定,即多個(gè)隊(duì)列以相同的綁定鍵binding key綁定到同一個(gè)路由器上,此時(shí)direct路由器就會(huì)像fanout路由器一樣,將消息廣播給所有符合路由規(guī)則的隊(duì)列。

看主要代碼:

//生產(chǎn)者

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

String routingKey = "";

String message = "msg...";

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

//消費(fèi)者

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

String queueName = channel.queueDeclare().getQueue();

String bingingKeys[] = {""};

for (String bindingKey : bingingKeys) {

channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

}

channel.basicConsume(queueName, true, consumer);

這里使用了topic路由器,它與direct路由器類似,不同在于,topic路由器可以為路由鍵設(shè)置多重標(biāo)準(zhǔn)。一個(gè)消息有一個(gè)路由鍵,direct路由器只能為路由鍵指定一個(gè)關(guān)鍵字,但是topic路由器可以在路由鍵中通過點(diǎn)號分割多個(gè)單詞來組成路由鍵,消費(fèi)者在綁定的時(shí)候,可以設(shè)置多重標(biāo)準(zhǔn)來選擇接受。

舉個(gè)例子:假如日志根據(jù)嚴(yán)重級別info、warn、error,也可以根據(jù)來源分為cron、kern、auth。某個(gè)日志消息設(shè)置路由鍵為kern.info,表示來自kern的info級別的日志。想要選擇接收消息的時(shí)候,direct路由器就辦不到,它要么可以根據(jù)嚴(yán)重級別來篩選,要么根據(jù)來源來篩選,而topic路由器則可以輕松應(yīng)對,只要將綁定鍵設(shè)置為kern.info就可以精準(zhǔn)獲取該類型的日志。

教程6屬于RabbitMQ在RPC領(lǐng)域的應(yīng)用,與上面幾個(gè)教程的內(nèi)容沒有多大銜接,可以直接閱讀原文。

總結(jié)

以上是生活随笔為你收集整理的rabbitmq java教程_GitHub - maxwellyue/rabbitmq-tutorial-java: RabbitMQ官方教程的翻译和说明--Java版...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。