日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

[六]RabbitMQ-客户端源码之AMQCommand

發(fā)布時(shí)間:2024/4/11 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [六]RabbitMQ-客户端源码之AMQCommand 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。

歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-amqcommand/


AMQCommand是用來處理AMQ命令的,其包含了Method, Content Heaeder和Content Body.
下面是通過wireshark抓包的AMQP協(xié)議

上圖中的Basic.Publish命令就包含Method, Content header以及Content body。

AMQCommand不是直接包含Method等成員變量的,而是通過CommandAssembler又做了一次封裝。
接下來先看下CommandAssembler類。此類中有這些成員變量:

/** Current state, used to decide how to handle each incoming frame. */ private enum CAState {EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE } private CAState state;/** The method for this command */ private Method method;/** The content header for this command */ private AMQContentHeader contentHeader;/** The fragments of this command's content body - a list of byte[] */ private final List<byte[]> bodyN; /** sum of the lengths of all fragments */ private int bodyLength;/** No bytes of content body not yet accumulated */ private long remainingBodyBytes;
  • CAState state標(biāo)識(shí)這此Command目前的狀態(tài),是準(zhǔn)備處理Method(EXPECTING_METHOD),還是處理Content header(EXPECTING_CONTENT_HEADER),還是準(zhǔn)備處理Content body(EXPECTING_CONTENT_BODY),還是以及完成了(COMPLETE)。
  • Method method代表type=Method的AMQP幀
  • AMQContentHeader contentHeader代表type=Content header的AMQP幀
  • final List<byte[]> bodyN代表type=Content body的AMQP幀,就是真正的消息體(Message body)。
  • bodyLength就是消息體大小

這個(gè)類中除了構(gòu)造函數(shù),getMethod, getContentHeader, getContentBody,isComplete這個(gè)幾個(gè)方法,最關(guān)鍵的方法就是:

public synchronized boolean handleFrame(Frame f) throws IOException {switch (this.state) {case EXPECTING_METHOD: consumeMethodFrame(f); break;case EXPECTING_CONTENT_HEADER: consumeHeaderFrame(f); break;case EXPECTING_CONTENT_BODY: consumeBodyFrame(f); break;default:throw new AssertionError("Bad Command State " + this.state);}return isComplete(); }

這個(gè)方法主要是處理AQMP幀的,根據(jù)CAState state來處理相應(yīng)狀態(tài)類型的幀,然后賦值給相應(yīng)的成員變量。
采用consumeMethodFrame(Frame f)方法舉個(gè)例子:

private void consumeMethodFrame(Frame f) throws IOException {if (f.type == AMQP.FRAME_METHOD) {this.method = AMQImpl.readMethodFrom(f.getInputStream());this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;} else {throw new UnexpectedFrameError(f, AMQP.FRAME_METHOD);} }

這個(gè)方法首先判斷當(dāng)前幀是否是Method幀(AMQP.FRAME_METHOD),然后調(diào)用AMQPImp.readMethodFrom的方法。就那Connection.Start這個(gè)真來將,它會(huì)從socket的輸入流中讀取:

public Start(MethodArgumentReader rdr) throws IOException {this(rdr.readOctet(), rdr.readOctet(), rdr.readTable(), rdr.readLongstr(), rdr.readLongstr()); }

對(duì)應(yīng)于下圖:

  • 第一個(gè)rdr.readOctet()是指Version-Magor:0
  • 第二個(gè)rdr.readOctet()是指Version-Minor:9
  • 第三個(gè)rdr.readTable()是指Server-Properties
  • 第四個(gè)rdr.readLongstr()是指Mechanisms
  • 第五個(gè)rdr.readLongstr()是指Locales

而MethodArgumentReader.readOctet()就是:

public final int readOctet()throws IOException {clearBits();return in.readOctet();//in對(duì)象是DataInputStream對(duì)象 }

寫到這里,思路再跳回來,知道了底層其實(shí)是Socket的DataInputStream,其上只是做了封裝再封裝
CommandAssembler 中的handleFrame這個(gè)方法只在AMQCommand中的:

private final CommandAssembler assembler; public boolean handleFrame(Frame f) throws IOException {return this.assembler.handleFrame(f); }

只在這個(gè)方法中調(diào)用。CommandAssembler只是對(duì)Method,Content-Header,Content-Body做了一下封裝。下面繼續(xù)回到AMQCommand這個(gè)類中來。
仔細(xì)閱讀源碼的同學(xué)會(huì)發(fā)現(xiàn)在handleFrame方法當(dāng)遇到類似Basic.Publish時(shí)會(huì)有Method,Content-Header,Content-Body一起的報(bào)文,那么handleFrame處理完Method之后就直接返回了,沒有完全處理完,這該如何是好?
這個(gè)就又要聯(lián)系到AMQConnection中的MainLoop的內(nèi)部類了。此類中的關(guān)鍵代碼如下:

while (_running) {Frame frame = _frameHandler.readFrame();if (frame != null) {_missedHeartbeats = 0;if (frame.type == AMQP.FRAME_HEARTBEAT) {// Ignore it: we've already just reset the heartbeat counter.} else {if (frame.channel == 0) { // the special channel_channel0.handleFrame(frame);} else {if (isOpen()) {// If we're still _running, but not isOpen(), then we// must be quiescing, which means any inbound frames// for non-zero channels (and any inbound commands on// channel zero that aren't Connection.CloseOk) must// be discarded.ChannelManager cm = _channelManager;if (cm != null) {cm.getChannel(frame.channel).handleFrame(frame);}}}}} else {// Socket timeout waiting for a frame.// Maybe missed heartbeat.handleSocketTimeout();} }

可以看到這是一個(gè)一直輪詢讀取Frame并處理Frame的過程。在遇到類似Basic.Publish這種帶Method, Content-Header, Content-Body的類型的報(bào)文時(shí),會(huì)循環(huán)處理,直到處理完成。注意這里的Method, Content-Header以及Content-Body都是看成單個(gè)Frame的,也就是這個(gè)while循環(huán)要三次,而不是將Basic.Publish看成一個(gè)幀。
上面調(diào)用的handleFrame方法是AMQChannel類中的(詳細(xì)可以參考([五]RabbitMQ-客戶端源碼之AMQChannel)):

public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);} }

可以看到只有當(dāng)AMQCommand的handleFrame方法返回true時(shí),即執(zhí)行完成之后才會(huì)繼續(xù)處理。


AMQCommand也有g(shù)etMethod, getContentHeader, getContentBody等方法,這些都是間接調(diào)用CommandAssembler類中相應(yīng)的方法的。
AMQCommand中也有個(gè)特別重要的方法:

/*** Sends this command down the named channel on the channel's* connection, possibly in multiple frames.* @param channel the channel on which to transmit the command* @throws IOException if an error is encountered*/ public void transmit(AMQChannel channel) throws IOException {int channelNumber = channel.getChannelNumber();AMQConnection connection = channel.getConnection();synchronized (assembler) {Method m = this.assembler.getMethod();connection.writeFrame(m.toFrame(channelNumber));if (m.hasContent()) {byte[] body = this.assembler.getContentBody();connection.writeFrame(this.assembler.getContentHeader().toFrame(channelNumber, body.length));int frameMax = connection.getFrameMax();int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax- EMPTY_FRAME_SIZE;for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {int remaining = body.length - offset;int fragmentLength = (remaining < bodyPayloadMax) ? remaining: bodyPayloadMax;Frame frame = Frame.fromBodyFragment(channelNumber, body,offset, fragmentLength);connection.writeFrame(frame);}}}connection.flush(); }

這段主要通過傳輸AMQP幀的,通過AMQChannel獲取到通信鏈路connection,然后將AMQCommand對(duì)象自身的method成員變量(或者包括content-header以及content-body)傳送給broker。這段方法里還有判斷payload大小是否超過broker端所設(shè)置的最大幀大小frameMax,即(frameMax == 0) ? body.length : frameMax - EMPTY_FRAME_SIZE這段代碼。當(dāng)frameMax=0時(shí)則沒有大小限制,當(dāng)frameMax不為0時(shí)則按照payload拆分成若干的payload然后發(fā)送多個(gè)FRAME_BODY幀。


附:本系列全集

  • [Conclusion]RabbitMQ-客戶端源碼之總結(jié)
  • [一]RabbitMQ-客戶端源碼之ConnectionFactory
  • [二]RabbitMQ-客戶端源碼之AMQConnection
  • [三]RabbitMQ-客戶端源碼之ChannelManager
  • [四]RabbitMQ-客戶端源碼之Frame
  • [五]RabbitMQ-客戶端源碼之AMQChannel
  • [六]RabbitMQ-客戶端源碼之AMQCommand
  • [七]RabbitMQ-客戶端源碼之AMQPImpl+Method
  • [八]RabbitMQ-客戶端源碼之ChannelN
  • [九]RabbitMQ-客戶端源碼之Consumer
  • 歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-amqcommand/


    歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。


    總結(jié)

    以上是生活随笔為你收集整理的[六]RabbitMQ-客户端源码之AMQCommand的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。