(RabbitMQ) Java Client API Guide
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
本篇翻譯的是RabbitMQ官方文檔關(guān)于API的內(nèi)容,原文鏈接:http://www.rabbitmq.com/api-guide.html。博主對(duì)其內(nèi)容進(jìn)行大體上的翻譯,有些許部分會(huì)保留英文,個(gè)人覺(jué)得這樣更加有韻味,如果全部翻譯成中文,會(huì)存在偏差,文不達(dá)意(主要是功力淺薄~~)。文章也對(duì)部分內(nèi)容進(jìn)行一定的解釋,增強(qiáng)對(duì)相關(guān)知識(shí)點(diǎn)的理解。
Overview
RabbitMQ java client uses com.rabbitmq.client as its top-level package, 關(guān)鍵的classes和interface如下:
- Channel
- Connection
- ConnectionFactory
- Consumer
AMQP協(xié)議層面的操作通過(guò)Channel接口實(shí)現(xiàn)。Connection是用來(lái)open Channels的,可以注冊(cè)event handlers,也可以在結(jié)束是close connections. Connection是通過(guò)ConnectionFactory來(lái)進(jìn)行初始化操作的,當(dāng)然也需要配置不同的connection設(shè)置,比如vhost或者username等。
Connections and Channels
關(guān)鍵的API如Connection和Channel,分別代表了AMQP-0-9-1的connection和channel。典型的包導(dǎo)入如下:
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;Connecting to a broker
下面的代碼用來(lái)在給定的參數(shù)(hostname, port number等)下連接一個(gè)AMQP broker:
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); Connection conn = factory.newConnection();也可以選擇使用URI來(lái)實(shí)現(xiàn),示例如下:
ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost"); Connection conn = factory.newConnection();Connection接口被用來(lái)open一個(gè)channel:
Channel channel = conn.createChannel();這樣在創(chuàng)建之后,Channel可以用來(lái)發(fā)送或者接受消息了。
在使用完之后,關(guān)閉連接:
顯示的關(guān)閉channel是一個(gè)很好的習(xí)慣,但這不是必須的,在基本的connection關(guān)閉的時(shí)候channel也會(huì)自動(dòng)的關(guān)閉。
Using Exchanges and Queues
AMQP的high-level構(gòu)建模塊exchanges和queues是Client端應(yīng)用所必須的。在使用之前必須先“declared”(聲明),確保在使用之前已經(jīng)存在,如果不存在則創(chuàng)建它,這些操作都包含在declare里。
下面的代碼是演示如何declare一個(gè)exchange和queue:
上面創(chuàng)建了一個(gè)durable, non-autodelete并且綁定類型為direct的exchange以及一個(gè)non-durable, exclusive,autodelete的queue(此queue的名稱由broker端自動(dòng)生成)。這里的exchange和queue也都沒(méi)有設(shè)置特殊的arguments。
上面的代碼也展示了如果使用routing key將queue和exchange綁定起來(lái)。上面聲明的queue具備如下特性:排他的(只對(duì)當(dāng)前client同一個(gè)Connection可用, 同一個(gè)Connection的不同的Channel可共用),并且也會(huì)在client連接斷開(kāi)時(shí)自動(dòng)刪除。
如果要在client共享一個(gè)queue,可以做如下聲明:
channel.exchangeDeclare(exchangeName, "direct", true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey);這里的queue是durable的,非排他的,non-autodelete, 而且也有一個(gè)確定的已知的名稱(又Client指定而非broker端自動(dòng)生成)。
注意:Channel的API方法都是可以重載的,比如exchangeDeclare,queueDeclare根據(jù)參數(shù)的不同,可以有不同的重載形式,根據(jù)自身的需要去進(jìn)行調(diào)用。
Publish messages
如果要發(fā)送一個(gè)消息可以采用Channel.basicPublish的方式:
byte[] messageBodyBytes = "Hello, world!".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);為了更好的控制,你也可以使用mandatory這個(gè)屬性,或者可以發(fā)送一些特定屬性的消息:
channel.basicPublish(exchangeName, routingKey, mandatory,MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);這個(gè)方法發(fā)送了一條消息,這條消息的delivery mode為2,即消息需要被持久化在broker中,同時(shí)priority優(yōu)先級(jí)為1,content-type為text/plain。你可以可以自己設(shè)定消息的屬性:
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("bob").build()),messageBodyBytes);你也可以發(fā)送一條帶有header的消息:
Map<String, Object> headers = new HashMap<String, Object>(); headers.put("latitude", 51.5252949); headers.put("longitude", -0.0905493);channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);你也可以發(fā)送一條帶有超時(shí)時(shí)間expiration的消息:
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);以上只是舉例,由于篇幅關(guān)系,這里就不一一列舉所有的可能情形了。
Channel#basicPublish方法在以下兩種情形下會(huì)被阻塞,具體可以參考http://www.rabbitmq.com/alarms.html:
- When memory use goes above the configured limit.(內(nèi)存不夠)
- When disk space drops below the configured limit.(磁盤空間不足)
Channles and Concurrency Consideration(Thread Safaty)
Channel實(shí)例不能在線程建共享,應(yīng)用程序應(yīng)該為每一個(gè)線程開(kāi)辟一個(gè)Channel, 而不是在多線程建共享Channel。某些情況下Channel的操作可以并發(fā)運(yùn)行,但是某些情況下并發(fā)會(huì)導(dǎo)致在網(wǎng)絡(luò)上錯(cuò)誤的幀交叉,同時(shí)也會(huì)影響publisher confirm, 故多線程共享Channel是非線程安全的。
Receiving messages by subscription
import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;接受消息一般是通過(guò)實(shí)現(xiàn)Consumer接口或者繼承DefaultConsumer來(lái)實(shí)現(xiàn)。當(dāng)調(diào)用與Consumer相關(guān)的API方法時(shí),不同的訂閱采用consumer tags以作彼此的區(qū)分,在同一個(gè)Channel中的Consumer也需要通過(guò)唯一的consumer tags以作區(qū)分。。
消費(fèi)消息demo如下:
boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{String routingKey = envelope.getRoutingKey();String contentType = properties.getContentType();long deliveryTag = envelope.getDeliveryTag();// (process the message components here ...)channel.basicAck(deliveryTag, false);}});注意到上面代碼我們顯示的設(shè)置autoAck=false, 對(duì)于Consumer來(lái)說(shuō)這個(gè)設(shè)置是非常必要的。(譯者注:具體可以參考RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)中Consumer確認(rèn)那一章節(jié)。)
同時(shí)對(duì)于Consumer來(lái)說(shuō)重寫handleDelivery方法也是十分方便的。更復(fù)雜的Consumer會(huì)重寫(override)更多的方法,比如handleShutdownSignal當(dāng)channels和connections close的時(shí)候會(huì)調(diào)用,handleConsumeOk在其他callback方法之前調(diào)用,返回consumer tags.
Consumer同樣可以override handleCancelOk和handleCancel方法,這樣在顯示的或者隱式的取消的時(shí)候調(diào)用。
你可以通過(guò)Channel.basicCancel方法顯示的cancel一個(gè)指定的Consumer:
channel.basicCancel(consumerTag);(譯者注:這句代碼首先觸發(fā)handleConsumerOk,之后觸發(fā)handleDelivery方法,最后觸發(fā)handleCancelOk方法。)
單個(gè)Consumer在Connection上都分配單個(gè)的線程來(lái)調(diào)用這些callback的方法,也就是說(shuō)Consumer這里安全的調(diào)用阻塞式的方法,比如queueDeclare, txCommit, basicCancel或者basicPublish。
每個(gè)Channel都有自己的獨(dú)立的線程。最常用的用法是一個(gè)Channel對(duì)應(yīng)一個(gè)Consumer, 也就是意味著Consumers彼此間沒(méi)有任何關(guān)聯(lián)。當(dāng)然你也可以在一個(gè)Channel中維持多個(gè)Consumers, 但是要注意一個(gè)問(wèn)題,如果在Channel的一個(gè)Consumer一直在運(yùn)行,那么對(duì)于其他Consumer的callbacks而言會(huì)被hold up(耽擱)。
Retrieving individual messages
通過(guò)Channel.basicGet可以一個(gè)一個(gè)的獲取消息,其返回值是GetResponse(from which the header information(properties) and message body can be extracted)。
示例Demo如下:
如果設(shè)置autoAck為false,那么你同樣需要顯示的調(diào)用Channel.basicAck來(lái)確認(rèn)消息已經(jīng)被成功的接受了:
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message(譯者注:有關(guān)RabbitMQ的消費(fèi)端的更多信息可以參考:RabbitMQ之Consumer消費(fèi)模式(Push & Pull))
Handing unroutable messages
如果一個(gè)消息在publish的時(shí)候設(shè)置了mandatory標(biāo)記,如果消息沒(méi)有成功的路由到某個(gè)隊(duì)列的時(shí)候,broker端會(huì)通過(guò)Basic.Return返回回來(lái)。
這時(shí)候客戶端需要實(shí)現(xiàn)ReturnListener這個(gè)接口,并且調(diào)用Channel.setReturnListener。 如果client沒(méi)有配置相關(guān)的return listener那么相應(yīng)的需要被returned的消息就會(huì)被drop掉。
channel.setReturnListener(new ReturnListener() {public void handleBasicReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body)throws IOException {...} });(譯者注:有關(guān)mandatory的更多內(nèi)容可以參考:RabbitMQ之mandatory和immediate。)
Shutdown Protocol
Overview of the AMQP client shutdown
AMQP-0-9-1的connection和channel采用同樣的方式來(lái)管理網(wǎng)絡(luò)失敗,內(nèi)部錯(cuò)誤以及顯示的local shutdown。
AMQP-0-9-1的connection和channel具備如下的生命周期狀態(tài)(lifecycle states):
- open: the object is ready to use.
- closing:當(dāng)前對(duì)象被顯示的通知調(diào)用shutdown,這樣就產(chǎn)生了一個(gè)shutdown的請(qǐng)求至lower-layer的對(duì)象進(jìn)行相應(yīng)的操作,并等待這些shutdown操作的完成。
- closed:當(dāng)前對(duì)象已經(jīng)接受到所有的shutdown完成的通知,并且也shutdown了自身。
這些對(duì)象最終成closed的狀態(tài),而不管是由于什么原因引起的,或者是一個(gè)applicatin request,或者是內(nèi)部client library的失敗,或者是a remote network request, 亦或者是network failure。
AMQP的connecton和channel對(duì)象控制(possess)了shutdown-related的方法:addShutdownListener(ShutdownListener listener)和removeShutdownListener(ShutdownListener listener)。當(dāng)connection和channel轉(zhuǎn)向closed狀態(tài)時(shí)會(huì)調(diào)用ShutdownListener, 而且如果將一個(gè)ShutdownListener注冊(cè)到一個(gè)已經(jīng)處于closed狀態(tài)的object(特指connection或者channel的對(duì)象)時(shí),會(huì)立刻調(diào)用ShutdownListener。
- getCloseReason():可以讓你知道the object’s shutdown的原因。
- isOpen():檢測(cè)the objects當(dāng)前的是否處于open狀態(tài)。
- close(int closeCode, String closeMessage):顯示的通知the object執(zhí)行shutdown。
示例代碼:
import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.ShutdownListener;connection.addShutdownListener(new ShutdownListener() {public void shutdownCompleted(ShutdownSignalException cause){...} });Information about the ircumstances of a shutdown
當(dāng)觸發(fā)ShutdownListener的時(shí)候,就可以獲取到ShutdownSignalException,這個(gè)ShutdownSignalException包含了close的原因,這個(gè)原因也可以通過(guò)getCloseReason()方法獲取。
ShutdownSignalException提供了多個(gè)方法用來(lái)分析shutdown的原因。isHardError()方法可以知道是connection還是channel的error,getReason()方法可以獲取cause相關(guān)的信息(以AMQP method的形式,com.rabbitmq.client.Method:AMQP.Channel.Close or AMQP.Connection.Close):
public void shutdownCompleted(ShutdownSignalException cause) {if (cause.isHardError()){Connection conn = (Connection)cause.getReference();if (!cause.isInitiatedByApplication()){Method reason = cause.getReason();...}...} else {Channel ch = (Channel)cause.getReference();...} }Atomicity and use of the isOpen() method
我們并不推薦在生產(chǎn)環(huán)境的代碼上使用channel或者connection的isOpen()方法,這個(gè)isOpen()方法的返回值依賴于shutdown cause的存在,有可能會(huì)產(chǎn)生競(jìng)爭(zhēng)。
(譯者添加:關(guān)于isOpen依賴于shutdown cause, isOpen的實(shí)現(xiàn)代碼如下:)
錯(cuò)誤的使用方式如下:
public void brokenMethod(Channel channel) {if (channel.isOpen()){// The following code depends on the channel being in open state.// However there is a possibility of the change in the channel state// between isOpen() and basicQos(1) call...channel.basicQos(1);} }正確的使用方式:
public void validMethod(Channel channel) {try {...channel.basicQos(1);} catch (ShutdownSignalException sse) {// possibly check if channel was closed// by the time we started action and reasons for// closing it...} catch (IOException ioe) {// check why connection was closed...} }Advanced Connection options
Consumer thread pool
默認(rèn)情況下客戶端會(huì)自動(dòng)分配一個(gè)ExecutorService給Consumer線程,同樣你也可以使用自定義的線程池,比如:
ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es);當(dāng)connection關(guān)閉的時(shí)候,默認(rèn)的ExecutorService會(huì)被shutdown,但是如果是自定義的ExecutorService將不會(huì)被自動(dòng)的shutdown,所以Clients程序需要在最終關(guān)閉的時(shí)候手動(dòng)的去執(zhí)行shutdown(),否則將會(huì)阻止JVM的正常關(guān)閉。
同一個(gè)executor service可以被多個(gè)connections共用。除非有明顯的證據(jù)證明默認(rèn)的ExecutorService不能滿足當(dāng)前Consumer callbacks的需要,否則不建議使用自定義的ExecutorService.
Using Lists of Hosts
可以通過(guò)使用Address來(lái)執(zhí)行newConnection(). com.rabbitmq.client.Address的使用是比較方便的,例如:
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1), new Address(hostname2, portnumber2)}; Connection conn = factory.newConnection(addrArr);如果hostname1:portnumber1成功了連接,而hostname2:portnumber2連接失敗了,connection照樣會(huì)成功returned, 也不會(huì)跑出IOException。這個(gè)和你重復(fù)設(shè)置host,port然后調(diào)用factory.newConnection()直到有一組成功為止一個(gè)效果。
同樣可以指定自定義的ExecutorService, 比如:factory.newConnection(es, addrArr)。
If you want moew control over the host to connect to, see the support for service discovery.
Service discovery with the AddressResolver interface
在版本3.6.6開(kāi)始,可以通過(guò)AddressResolver接口的實(shí)現(xiàn)來(lái)創(chuàng)建connection:
Connection conn = factory.newConnection(addressResolver);AddressResolver接口如下:
public interface AddressResolver {List<Address> getAddresses() throws IOException; }使用AddressResolver可以更好的實(shí)現(xiàn)custom service discovery邏輯,和“automatic recovery”組合使用,客戶端可以自動(dòng)的和broker nodes連接.AddressResolver也可以有效的配合負(fù)載均衡策略。
AddressResolver有兩個(gè)實(shí)現(xiàn):DnsRecordIpAddressResolver和DnsSrvRecordAddressResolver.(博主沒(méi)用過(guò)AddressResolver,這里就不多做解釋了)
Heartbeat Timeout
有關(guān)Heartbeat的內(nèi)容請(qǐng)參考Heatbeats guide。(原文就是這么說(shuō)的。)
Custom Thread Factories
略。和Google App Engine有關(guān)。
Support for Java non-blocking IO
4.0版本開(kāi)始客戶端引入了java的NIO,這里引入NIO的目的不是為了比BIO的更快,而是是的更加容易的控制資源。
對(duì)于默認(rèn)的BIO模式,每個(gè)connection都需要一個(gè)獨(dú)立的線程來(lái)進(jìn)行網(wǎng)絡(luò)通訊。但在NIO模式下,你可以控制網(wǎng)絡(luò)通訊讀寫線程的數(shù)量。
如果你的java程序需要許多的connections(幾十個(gè)或者幾百個(gè)),那么使用NIO模式是一個(gè)很好的選擇。相比BIO而言,你所使用的線程數(shù)很少,通過(guò)設(shè)置合理的線程數(shù),你可以不必?fù)?dān)心性能的損耗,尤其是在connections不怎么busy的時(shí)候。
NIO必須被顯示的設(shè)置:
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.useNio();你也可以設(shè)置NIO的參數(shù):
connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));NIO模式下使用合理的默認(rèn)值,同時(shí)你也可以根據(jù)自身的負(fù)載情況來(lái)進(jìn)行合理的變換。
Automatic Recovery From Network Failures
Connection Recovery
客戶端和broker之間的網(wǎng)絡(luò)通訊可能會(huì)失敗。RabbitMQ java client支持connections和拓?fù)鋞opology(指queues, exchanges, bindings and consumers)的自動(dòng)回復(fù)。自動(dòng)恢復(fù)過(guò)程有如下幾個(gè)步驟:
- Reconnect
- Restore connection listeners
- Re-open channels
- Restore channel listeners
- Restore channel basic.qos setting, publisher confirms and transaction settings
topology的恢復(fù)包括如下行為,performed for every channel:
- Re-declare exchange (exception for predefined ones)
- Re-declare queues
- Recover all bindings
- Recover all consumers
在版本4.0.0開(kāi)始,自動(dòng)回復(fù)默認(rèn)是開(kāi)啟的。你也通過(guò)factory.setAutomaticRecoveryEnabled(boolean)可以手動(dòng)的設(shè)置automatic onnection recovery.
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(true); // connection that will recover automatically Connection conn = factory.newConnection();如果由于某些異常(比如RabbitMQ節(jié)點(diǎn)始終連接不上)而導(dǎo)致的恢復(fù)失敗。那么會(huì)在某個(gè)特定的時(shí)間間隔內(nèi)重試,默認(rèn)此間隔為5s,當(dāng)然此值可配:
ConnectionFactory factory = new ConnectionFactory(); // attempt recovery every 10 seconds factory.setNetworkRecoveryInterval(10000);Recovery Listeners
It is possible to register one or more recovery listeners on recoverable connections and channels. 當(dāng)connection recovery啟用的時(shí)候,通過(guò)調(diào)用ConnectionFactory#newConnection和Connection#createChannel返回的connections實(shí)現(xiàn)com.rabbitmq.client.Recoverable. 這里提供了兩個(gè)方法:addRecoveryListener和removerRecoveryListener.
/*** Provides a way to register (network, AMQP 0-9-1) connection recovery* callbacks.** When connection recovery is enabled via {@link ConnectionFactory},* {@link ConnectionFactory#newConnection()} and {@link Connection#createChannel()}* return {@link Recoverable} connections and channels.** @see com.rabbitmq.client.impl.recovery.AutorecoveringConnection* @see com.rabbitmq.client.impl.recovery.AutorecoveringChannel*/ public interface Recoverable {/*** Registers a connection recovery callback.** @param listener Callback function*/void addRecoveryListener(RecoveryListener listener);void removeRecoveryListener(RecoveryListener listener); }當(dāng)然你必須將connections和channels強(qiáng)制轉(zhuǎn)換為Recoverable的才能使用這些方法。
Effects on Publishing
消息通過(guò)Channel,basicPublish發(fā)布,如果connection down了那么消息就會(huì)丟失。客戶端不會(huì)在connection恢復(fù)之后重新delivery這些消息。為了確保消息的可靠性,可以參考Publisher Confims.(或者可以參考博主的博文:RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm))。
Topology Recovery
Topology recovery涉及到exchanges, queues, bindings and consumer.當(dāng)automatic recovery可用時(shí)topology recovery默認(rèn)也可用。當(dāng)然topology也可顯示的設(shè)置為disabled:
ConnectionFactory factory = new ConnectionFactory();Connection conn = factory.newConnection(); // enable automatic recovery (e.g. Java client prior 4.0.0) factory.setAutomaticRecoveryEnabled(true); // disable topology recovery factory.setTopologyRecoveryEnabled(false);Manual Acknowledgements and Automatic Recovery
當(dāng)autoAck設(shè)置為false的時(shí)候,在消息delivery和ack的時(shí)候有可能會(huì)由于網(wǎng)絡(luò)原因故障,在connection recovery之后,RabbitMQ會(huì)將所有的channels的delivery tags進(jìn)行重置。這就意味著basic.ack, basic,nack以及basic.reject帶有old delivery tags的將會(huì)引起channel exception。為了解決這個(gè)為題,RabbitMQ java client會(huì)記錄和更新相應(yīng)的delivery tags來(lái)確保在恢復(fù)期間保持單調(diào)遞增。帶有過(guò)時(shí)的delivery tags的ack將不會(huì)被發(fā)送。采用manual ack和automatic recovery的應(yīng)用必須具備處理redeliveries的能力。
Unhandled Exceptions
在connection, channel, recovery, consumer生命周期內(nèi)涉及的未被處理的異常可以委托給exception handler. Exception handler實(shí)現(xiàn)了ExceptionHandler這個(gè)接口,默認(rèn)情況下使用的是DefaultExceptionHandler, 只是在標(biāo)準(zhǔn)輸出流中打印一些exception的細(xì)節(jié)。
你可以使用ConnectionFactory#setExceptionHandler來(lái)override這個(gè)handler,這個(gè)handler可以被ConnectionFactory創(chuàng)建的所有的Connections所使用:
ConnectionFactory factory = new ConnectionFactory(); factory.setExceptionHandler(customHandler);Metrics and monitoring
RabbitMQ Java Client on Google App Engine
Cavets and Limitations
The RPC Pattern
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
總結(jié)
以上是生活随笔為你收集整理的(RabbitMQ) Java Client API Guide的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: RabbitMQ之镜像队列
- 下一篇: 如何获取Kafka的消费者详情——从Sc