[五]RabbitMQ-客户端源码之AMQChannel
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-amqchannel/
AMQChannel是一個抽象類,是ChannelN的父類。其中包含唯一的抽象方法:
/*** Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method* returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as* usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close.* @param command the command to handle asynchronously* @return true if we handled the command; otherwise the caller should consider it "unhandled"*/ public abstract boolean processAsync(Command command) throws IOException;有關processAsync()這個方法的會在介紹ChannelN類的時候詳細闡述([八]RabbitMQ-客戶端源碼之ChannelN)。
首先來說下AMQChannel的成員變量:
protected final Object _channelMutex = new Object(); /** The connection this channel is associated with. */ private final AMQConnection _connection; /** This channel's channel number. */ private final int _channelNumber; /** Command being assembled */ private AMQCommand _command = new AMQCommand(); /** The current outstanding RPC request, if any. (Could become a queue in future.) */ private RpcContinuation _activeRpc = null; /** Whether transmission of content-bearing methods should be blocked */ public volatile boolean _blockContent = false;- _channelMutex這個是內部用來當對象鎖的,沒有實際的意義,可忽略
- _connection是指AMQConnection這個對象。
- _channelNumber是指channel number, 這個應該不用多解釋了吧。通道編號為0的代表全局連接中的所有幀,1-65535代表特定通道的幀.
- _command是內部處理使用的對象,調用AMQCommand的方法來處理一些東西。
- _activeRpc是指當前未處理完的rpc請求(the current outstanding rpc request)。
- _blockContent 是在Channel.Flow里用到的,其余情況都是false
在AMQChannel的構造函數中,只有兩個參數:AMQConnection connection以及int channelNumber.
AMQChannel中有個handleFrame方法:
/*** Private API - When the Connection receives a Frame for this* channel, it passes it to this method.* @param frame the incoming frame* @throws IOException if an error is encountered*/ public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);} }/*** Private API - handle a command which has been assembled* @throws IOException if there's any problem** @param command the incoming command* @throws IOException*/ public void handleCompleteInboundCommand(AMQCommand command) throws IOException {// First, offer the command to the asynchronous-command// handling mechanism, which gets to act as a filter on the// incoming command stream. If processAsync() returns true,// the command has been dealt with by the filter and so should// not be processed further. It will return true for// asynchronous commands (deliveries/returns/other events),// and false for commands that should be passed on to some// waiting RPC continuation.if (!processAsync(command)) {// The filter decided not to handle/consume the command,// so it must be some reply to an earlier RPC.nextOutstandingRpc().handleCommand(command);markRpcFinished();} }這個在[六]RabbitMQ-客戶端源碼之AMQCommand有所介紹,主要是用來處理Frame幀的,當調用AMQCommand的handleFrame處理之后返回為true是,即處理完畢時繼續調用handleCompleteInboundCommand方法。這其中也牽涉到AMQConnection的MainLoop內部類,具體可以看看:[六]RabbitMQ-客戶端源碼之AMQCommand。
AMQChannel中有很多方法帶有rpc的字樣,這來做一個整理。
首先是:
這個方法在AMQConnection.start()方法中有過使用:_channel0.enqueueRpc(conStartBroker)。這個方法就是將參數付給成員變量_activeRpc,至于這個RpcContinuation到底是個什么gui,我們下面再講。
繼續下一個方法:
這個方法是判斷一下當前的_activeRpc是否為null,為null則為false,否則為true。看方法的名字應該猜出大半。
下面一個方法:
方法將當前的_activeRpc返回,并置AQMChannel的_activeRpc為null。
接下來幾個方法聯系性很強:
/*** Protected API - sends a {@link Method} to the broker and waits for the* next in-bound Command from the broker: only for use from* non-connection-MainLoop threads!*/ public AMQCommand rpc(Method m)throws IOException, ShutdownSignalException {return privateRpc(m); }public AMQCommand rpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {return privateRpc(m, timeout); }private AMQCommand privateRpc(Method m)throws IOException, ShutdownSignalException {SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);// At this point, the request method has been sent, and we// should wait for the reply to arrive.//// Calling getReply() on the continuation puts us to sleep// until the connection's reader-thread throws the reply over// the fence.return k.getReply(); }private AMQCommand privateRpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);return k.getReply(timeout); }public void rpc(Method m, RpcContinuation k)throws IOException {synchronized (_channelMutex) {ensureIsOpen();quiescingRpc(m, k);} }public void quiescingRpc(Method m, RpcContinuation k)throws IOException {synchronized (_channelMutex) {enqueueRpc(k);quiescingTransmit(m);} }主要是看最后一個方法——quiescingRpc.這個方法說白就兩行代碼:
enqueueRpc(k);是將由privateRpc等方法內部創建的SimpleBlockingRpcContinuation對象附給當前的AQMChannel對象的成員變量_activeRpc
關于quiescingTransmit(m)就要接下去看了:
上面代碼只需要看: c.transmit(this);這一句,其余的都是擺設。看到這里,就調用了AMQCommand的transmit方法,這個transmit方法就是講AMQChannel中封裝的內容發給broker,然后等待broker返回,進而通過之前附值的_activeRpc來處理回傳的幀。
雖然之前在AMQConnection([二]RabbitMQ-客戶端源碼之AMQConnection)中詳細講述了start()方法,但是這里還是要來拿這個來舉例這個AMQChannel中的rpc怎么使用
在AMQConnection中有這么一段代碼:
客戶端將Method封裝成Connection.StartOk幀之后等待broker返回Connection.Tune幀。
此時調用了AMQChannel的rpc(Method m, int timeout)方法,其間接調用了AMQChannel的privateRpc(Method, int timeout)方法。代碼詳情上面已經羅列出來。
注意privateRpc(Method, int timeout)方法的最有一句返回:return k.getReply(timeout);這句代碼的意思是SimpleBlockingRpcContinuation對象在等待broker的返回,確切的來說是MainLoop線程處理之后返回,即AMQChannel類中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)這行代碼。
AQMChannel還有些其他的內容,都是邊緣性的東西,這里還剩下個RpcContinuation要著重闡述下的:
public interface RpcContinuation {void handleCommand(AMQCommand command);void handleShutdownSignal(ShutdownSignalException signal); }public static abstract class BlockingRpcContinuation<T> implements RpcContinuation {public final BlockingValueOrException<T, ShutdownSignalException> _blocker =new BlockingValueOrException<T, ShutdownSignalException>();public void handleCommand(AMQCommand command) {_blocker.setValue(transformReply(command));}public void handleShutdownSignal(ShutdownSignalException signal) {_blocker.setException(signal);}public T getReply() throws ShutdownSignalException{return _blocker.uninterruptibleGetValue();}public T getReply(int timeout)throws ShutdownSignalException, TimeoutException{return _blocker.uninterruptibleGetValue(timeout);}public abstract T transformReply(AMQCommand command); }public static class SimpleBlockingRpcContinuationextends BlockingRpcContinuation<AMQCommand> {public AMQCommand transformReply(AMQCommand command) {return command;} }RPCContinuation只是一個接口,而BlockingRpcContinuation這個抽象類缺似乎略有門道。而SimpleBlockingRpcContinuation只是將BlockingRpcContinuation中的handleCommand方法便成為:
_blocker.setValue(command);BlockingRpcContinuation類主要操縱了BlockingValueOrException _blocker這個成員變量。再接下深究BlockingValueOrException其實是繼承了BlockingCell,對其做了一下簡單的封裝。最后來看下BlockingCell是個什么鬼, 截取部分代碼如下:
public class BlockingCell<T> {private boolean _filled = false;private T _value;public synchronized T get() throws InterruptedException {while (!_filled) {wait();}return _value;}其實這個就是capacity為1的BlockingQueue,顧美其名曰BlockingCell,繞了大半圈,原來AMQChannel中的_activeRpc就是個這么玩意兒~
附:本系列全集
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-amqchannel/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的[五]RabbitMQ-客户端源码之AMQChannel的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [四]RabbitMQ-客户端源码之Fr
- 下一篇: [六]RabbitMQ-客户端源码之AM