日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

[五]RabbitMQ-客户端源码之AMQChannel

發布時間:2024/4/11 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [五]RabbitMQ-客户端源码之AMQChannel 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。

歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-amqchannel/


AMQChannel是一個抽象類,是ChannelN的父類。其中包含唯一的抽象方法:

/*** Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method* returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as* usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close.* @param command the command to handle asynchronously* @return true if we handled the command; otherwise the caller should consider it "unhandled"*/ public abstract boolean processAsync(Command command) throws IOException;

有關processAsync()這個方法的會在介紹ChannelN類的時候詳細闡述([八]RabbitMQ-客戶端源碼之ChannelN)。


首先來說下AMQChannel的成員變量:

protected final Object _channelMutex = new Object(); /** The connection this channel is associated with. */ private final AMQConnection _connection; /** This channel's channel number. */ private final int _channelNumber; /** Command being assembled */ private AMQCommand _command = new AMQCommand(); /** The current outstanding RPC request, if any. (Could become a queue in future.) */ private RpcContinuation _activeRpc = null; /** Whether transmission of content-bearing methods should be blocked */ public volatile boolean _blockContent = false;
  • _channelMutex這個是內部用來當對象鎖的,沒有實際的意義,可忽略
  • _connection是指AMQConnection這個對象。
  • _channelNumber是指channel number, 這個應該不用多解釋了吧。通道編號為0的代表全局連接中的所有幀,1-65535代表特定通道的幀.
  • _command是內部處理使用的對象,調用AMQCommand的方法來處理一些東西。
  • _activeRpc是指當前未處理完的rpc請求(the current outstanding rpc request)。
  • _blockContent 是在Channel.Flow里用到的,其余情況都是false
    在AMQChannel的構造函數中,只有兩個參數:AMQConnection connection以及int channelNumber.

AMQChannel中有個handleFrame方法:

/*** Private API - When the Connection receives a Frame for this* channel, it passes it to this method.* @param frame the incoming frame* @throws IOException if an error is encountered*/ public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);} }/*** Private API - handle a command which has been assembled* @throws IOException if there's any problem** @param command the incoming command* @throws IOException*/ public void handleCompleteInboundCommand(AMQCommand command) throws IOException {// First, offer the command to the asynchronous-command// handling mechanism, which gets to act as a filter on the// incoming command stream. If processAsync() returns true,// the command has been dealt with by the filter and so should// not be processed further. It will return true for// asynchronous commands (deliveries/returns/other events),// and false for commands that should be passed on to some// waiting RPC continuation.if (!processAsync(command)) {// The filter decided not to handle/consume the command,// so it must be some reply to an earlier RPC.nextOutstandingRpc().handleCommand(command);markRpcFinished();} }

這個在[六]RabbitMQ-客戶端源碼之AMQCommand有所介紹,主要是用來處理Frame幀的,當調用AMQCommand的handleFrame處理之后返回為true是,即處理完畢時繼續調用handleCompleteInboundCommand方法。這其中也牽涉到AMQConnection的MainLoop內部類,具體可以看看:[六]RabbitMQ-客戶端源碼之AMQCommand。


AMQChannel中有很多方法帶有rpc的字樣,這來做一個整理。
首先是:

public void enqueueRpc(RpcContinuation k) {synchronized (_channelMutex) {boolean waitClearedInterruptStatus = false;while (_activeRpc != null) {try {_channelMutex.wait();} catch (InterruptedException e) {waitClearedInterruptStatus = true;}}if (waitClearedInterruptStatus) {Thread.currentThread().interrupt();}_activeRpc = k;} }

這個方法在AMQConnection.start()方法中有過使用:_channel0.enqueueRpc(conStartBroker)。這個方法就是將參數付給成員變量_activeRpc,至于這個RpcContinuation到底是個什么gui,我們下面再講。
繼續下一個方法:

public boolean isOutstandingRpc() {synchronized (_channelMutex) {return (_activeRpc != null);} }

這個方法是判斷一下當前的_activeRpc是否為null,為null則為false,否則為true。看方法的名字應該猜出大半。
下面一個方法:

public RpcContinuation nextOutstandingRpc() {synchronized (_channelMutex) {RpcContinuation result = _activeRpc;_activeRpc = null;_channelMutex.notifyAll();return result;} }

方法將當前的_activeRpc返回,并置AQMChannel的_activeRpc為null。

接下來幾個方法聯系性很強:

/*** Protected API - sends a {@link Method} to the broker and waits for the* next in-bound Command from the broker: only for use from* non-connection-MainLoop threads!*/ public AMQCommand rpc(Method m)throws IOException, ShutdownSignalException {return privateRpc(m); }public AMQCommand rpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {return privateRpc(m, timeout); }private AMQCommand privateRpc(Method m)throws IOException, ShutdownSignalException {SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);// At this point, the request method has been sent, and we// should wait for the reply to arrive.//// Calling getReply() on the continuation puts us to sleep// until the connection's reader-thread throws the reply over// the fence.return k.getReply(); }private AMQCommand privateRpc(Method m, int timeout)throws IOException, ShutdownSignalException, TimeoutException {SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();rpc(m, k);return k.getReply(timeout); }public void rpc(Method m, RpcContinuation k)throws IOException {synchronized (_channelMutex) {ensureIsOpen();quiescingRpc(m, k);} }public void quiescingRpc(Method m, RpcContinuation k)throws IOException {synchronized (_channelMutex) {enqueueRpc(k);quiescingTransmit(m);} }

主要是看最后一個方法——quiescingRpc.這個方法說白就兩行代碼:
enqueueRpc(k);是將由privateRpc等方法內部創建的SimpleBlockingRpcContinuation對象附給當前的AQMChannel對象的成員變量_activeRpc
關于quiescingTransmit(m)就要接下去看了:

public void quiescingTransmit(Method m) throws IOException {synchronized (_channelMutex) {quiescingTransmit(new AMQCommand(m));} } public void quiescingTransmit(AMQCommand c) throws IOException {synchronized (_channelMutex) {if (c.getMethod().hasContent()) {while (_blockContent) {try {_channelMutex.wait();} catch (InterruptedException e) {}// This is to catch a situation when the thread wakes up during// shutdown. Currently, no command that has content is allowed// to send anything in a closing state.ensureIsOpen();}}c.transmit(this);} }

上面代碼只需要看: c.transmit(this);這一句,其余的都是擺設。看到這里,就調用了AMQCommand的transmit方法,這個transmit方法就是講AMQChannel中封裝的內容發給broker,然后等待broker返回,進而通過之前附值的_activeRpc來處理回傳的幀。

雖然之前在AMQConnection([二]RabbitMQ-客戶端源碼之AMQConnection)中詳細講述了start()方法,但是這里還是要來拿這個來舉例這個AMQChannel中的rpc怎么使用
在AMQConnection中有這么一段代碼:

Method method = (challenge == null)? new AMQP.Connection.StartOk.Builder().clientProperties(_clientProperties).mechanism(sm.getName()).response(response).build(): new AMQP.Connection.SecureOk.Builder().response(response).build();try {Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod();if (serverResponse instanceof AMQP.Connection.Tune) {connTune = (AMQP.Connection.Tune) serverResponse;} else {challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();response = sm.handleChallenge(challenge, this.username, this.password);}

客戶端將Method封裝成Connection.StartOk幀之后等待broker返回Connection.Tune幀。
此時調用了AMQChannel的rpc(Method m, int timeout)方法,其間接調用了AMQChannel的privateRpc(Method, int timeout)方法。代碼詳情上面已經羅列出來。

注意privateRpc(Method, int timeout)方法的最有一句返回:return k.getReply(timeout);這句代碼的意思是SimpleBlockingRpcContinuation對象在等待broker的返回,確切的來說是MainLoop線程處理之后返回,即AMQChannel類中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)這行代碼。


AQMChannel還有些其他的內容,都是邊緣性的東西,這里還剩下個RpcContinuation要著重闡述下的:

public interface RpcContinuation {void handleCommand(AMQCommand command);void handleShutdownSignal(ShutdownSignalException signal); }public static abstract class BlockingRpcContinuation<T> implements RpcContinuation {public final BlockingValueOrException<T, ShutdownSignalException> _blocker =new BlockingValueOrException<T, ShutdownSignalException>();public void handleCommand(AMQCommand command) {_blocker.setValue(transformReply(command));}public void handleShutdownSignal(ShutdownSignalException signal) {_blocker.setException(signal);}public T getReply() throws ShutdownSignalException{return _blocker.uninterruptibleGetValue();}public T getReply(int timeout)throws ShutdownSignalException, TimeoutException{return _blocker.uninterruptibleGetValue(timeout);}public abstract T transformReply(AMQCommand command); }public static class SimpleBlockingRpcContinuationextends BlockingRpcContinuation<AMQCommand> {public AMQCommand transformReply(AMQCommand command) {return command;} }

RPCContinuation只是一個接口,而BlockingRpcContinuation這個抽象類缺似乎略有門道。而SimpleBlockingRpcContinuation只是將BlockingRpcContinuation中的handleCommand方法便成為:

_blocker.setValue(command);

BlockingRpcContinuation類主要操縱了BlockingValueOrException _blocker這個成員變量。再接下深究BlockingValueOrException其實是繼承了BlockingCell,對其做了一下簡單的封裝。最后來看下BlockingCell是個什么鬼, 截取部分代碼如下:

public class BlockingCell<T> {private boolean _filled = false;private T _value;public synchronized T get() throws InterruptedException {while (!_filled) {wait();}return _value;}

其實這個就是capacity為1的BlockingQueue,顧美其名曰BlockingCell,繞了大半圈,原來AMQChannel中的_activeRpc就是個這么玩意兒~


附:本系列全集

  • [Conclusion]RabbitMQ-客戶端源碼之總結
  • [一]RabbitMQ-客戶端源碼之ConnectionFactory
  • [二]RabbitMQ-客戶端源碼之AMQConnection
  • [三]RabbitMQ-客戶端源碼之ChannelManager
  • [四]RabbitMQ-客戶端源碼之Frame
  • [五]RabbitMQ-客戶端源碼之AMQChannel
  • [六]RabbitMQ-客戶端源碼之AMQCommand
  • [七]RabbitMQ-客戶端源碼之AMQPImpl+Method
  • [八]RabbitMQ-客戶端源碼之ChannelN
  • [九]RabbitMQ-客戶端源碼之Consumer
  • 歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-amqchannel/


    歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


    總結

    以上是生活随笔為你收集整理的[五]RabbitMQ-客户端源码之AMQChannel的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。