日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty源码解析8-ChannelHandler实例之CodecHandler

發布時間:2025/6/17 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty源码解析8-ChannelHandler实例之CodecHandler 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

請戳GitHub原文: github.com/wangzhiwubi…

更多文章關注:多線程/集合/分布式/Netty/NIO/RPC

  • Java高級特性增強-集合
  • Java高級特性增強-多線程
  • Java高級特性增強-Synchronized
  • Java高級特性增強-volatile
  • Java高級特性增強-并發集合框架
  • Java高級特性增強-分布式
  • Java高級特性增強-Zookeeper
  • Java高級特性增強-JVM
  • Java高級特性增強-NIO
  • RPC
  • zookeeper
  • JVM
  • NIO
  • 其他更多

編解碼處理器作為Netty編程時必備的ChannelHandler,每個應用都必不可少。Netty作為網絡應用框架,在網絡上的各個應用之間不斷進行數據交互。而網絡數據交換的基本單位是字節,所以需要將本應用的POJO對象編碼為字節數據發送到其他應用,或者將收到的其他應用的字節數據解碼為本應用可使用的POJO對象。這一部分,又和JAVA中的序列化和反序列化對應。幸運的是,有很多其他的開源工具(protobuf,thrift,json,xml等等)可方便的處理POJO對象的序列化,可參見這個鏈接。 在互聯網中,Netty使用TCP/UDP協議傳輸數據。由于Netty基于異步事件處理以及TCP的一些特性,使得TCP數據包會發生粘包現象。想象這樣的情況,客戶端與服務端建立連接后,連接發送了兩條消息:

+------+ +------+ | MSG1 | | MSG2 | +------+ +------+ 復制代碼

在互聯網上傳輸數據時,連續發送的兩條消息,在服務端極有可能被合并為一條:

+------------+ | MSG1 MSG2 | +------------+ 復制代碼

這還不是最壞的情況,由于路由器的拆包和重組,可能收到這樣的兩個數據包:

+----+ +---------+ +-------+ +-----+ | MS | | G1MSG2 | 或者 | MSG1M | | SG2 | +----+ +---------+ +-------+ +-----+ 復制代碼

而服務端要正確的識別出這樣的兩條消息,就需要編碼器的正確工作。為了正確的識別出消息,業界有以下幾種做法:

使用定界符分割消息,一個特例是使用換行符分隔每條消息。 使用定長的消息。 在消息的某些字段指明消息長度。

明白了這些,進入正題,分析Netty的編碼框架ByteToMessageDecoder。

ByteToMessageDecoder

在分析之前,需要說明一點:ByteToMessage容易引起誤解,解碼結果Message會被認為是JAVA對象POJO,但實際解碼結果是消息幀。也就是說該解碼器處理TCP的粘包現象,將網絡發送的字節流解碼為具有確定含義的消息幀,之后的解碼器再將消息幀解碼為實際的POJO對象。 明白了這點,再次回顧兩條消息發送的最壞情況,可知要正確取得兩條消息,需要一個內存區域存儲消息,當收到MS時繼續等待第二個包G1MSG2到達再進行解碼操作。在ByteToMessageDecoder中,這個內存區域被抽象為Cumulator,直譯累積器,可自動擴容累積字節數據,Netty將其定義為一個接口:

public interface Cumulator {ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);} 復制代碼

其中,兩個ByteBuf參數cumulation指已經累積的字節數據,in表示該次channelRead()讀取到的新數據。返回ByteBuf為累積數據后的新累積區(必要時候自動擴容)。自動擴容的代碼如下:

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int newReadBytes) {ByteBuf oldCumulation = cumulation;// 擴容后新的緩沖區cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);cumulation.writeBytes(oldCumulation);// 舊的緩沖區釋放oldCumulation.release();return cumulation;} 復制代碼

自動擴容的方法簡單粗暴,直接使用大容量的Bytebuf替換舊的ByteBuf。Netty定義了兩個累積器,一個為MERGE_CUMULATOR:

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {ByteBuf buffer;// 1.累積區容量不夠容納數據// 2.用戶使用了slice().retain()或duplicate().retain()使refCnt增加if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()|| cumulation.refCnt() > 1) {buffer = expandCumulation(alloc, cumulation, in.readableBytes());} else {buffer = cumulation;}buffer.writeBytes(in);in.release();return buffer;}}; 復制代碼

可知,兩種情況下會擴容:

  • 累積區容量不夠容納新讀入的數據
  • 用戶使用了slice().retain()或duplicate().retain()使refCnt增加并且大于1,此時擴容返回一個新的累積區ByteBuf,方便用戶對老的累積區ByteBuf進行后續處理。
  • 另一個累積器為COMPOSITE_CUMULATOR:

    public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {ByteBuf buffer;if (cumulation.refCnt() > 1) {buffer = expandCumulation(alloc, cumulation, in.readableBytes());buffer.writeBytes(in);in.release();} else {CompositeByteBuf composite;if (cumulation instanceof CompositeByteBuf) {composite = (CompositeByteBuf) cumulation;} else {composite = alloc.compositeBuffer(Integer.MAX_VALUE);composite.addComponent(true, cumulation);}composite.addComponent(true, in);buffer = composite;}return buffer;}}; 復制代碼

    這個累積器只在第二種情況refCnt>1時擴容,除此之外處理和MERGE_CUMULATOR一致,不同的是當cumulation不是CompositeByteBuf時會創建新的同類CompositeByteBuf,這樣最后返回的ByteBuf必定是CompositeByteBuf。使用這個累積器后,當容量不夠時并不會進行內存復制,只會講新讀入的in加到CompositeByteBuf中。需要注意的是:此種情況下雖然不需內存復制,卻要求用戶維護復雜的索引,在某些使用中可能慢于MERGE_CUMULATOR。故Netty默認使用MERGE_CUMULATOR累積器。 累積器分析完畢,步入正題ByteToMessageDecoder,首先看類簽名:

    public abstract class ByteToMessageDecoder extendsChannelInboundHandlerAdapter 復制代碼

    該類是一個抽象類,其中的抽象方法只有一個decode():

    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception; 復制代碼

    用戶使用了該解碼框架后,只需實現該方法就可定義自己的解碼器。參數in表示累積器已累積的數據,out表示本次可從累積數據解碼出的結果列表,結果可為POJO對象或者ByteBuf等等Object。 關注一下成員變量,以便更好的分析:

    ByteBuf cumulation; // 累積區private Cumulator cumulator = MERGE_CUMULATOR; // 累積器// 設置為true后每個channelRead事件只解碼出一個結果private boolean singleDecode; // 某些特殊協議使用private boolean decodeWasNull; // 解碼結果為空private boolean first; // 是否首個消息// 累積區不丟棄字節的最大次數,16次后開始丟棄private int discardAfterReads = 16;private int numReads; // 累積區不丟棄字節的channelRead次數 復制代碼

    下面,直接進入channelRead()事件處理:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 只對ByteBuf處理即只對字節數據進行處理if (msg instanceof ByteBuf) {// 解碼結果列表CodecOutputList out = CodecOutputList.newInstance();try {ByteBuf data = (ByteBuf) msg;first = cumulation == null; // 累積區為空表示首次解碼if (first) {// 首次解碼直接使用讀入的ByteBuf作為累積區cumulation = data;} else {// 非首次需要進行字節數據累積cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);}callDecode(ctx, cumulation, out); // 解碼操作} catch (DecoderException e) {throw e;} catch (Throwable t) {throw new DecoderException(t);} finally {if (cumulation != null && !cumulation.isReadable()) {// 此時累積區不再有字節數據,已被處理完畢numReads = 0;cumulation.release();cumulation = null;} else if (++ numReads >= discardAfterReads) {// 連續discardAfterReads次后// 累積區還有字節數據,此時丟棄一部分數據numReads = 0;discardSomeReadBytes(); // 丟棄一些已讀字節}int size = out.size();// 本次沒有解碼出數據,此時size=0decodeWasNull = !out.insertSinceRecycled();fireChannelRead(ctx, out, size); // 觸發事件out.recycle(); // 回收解碼結果}} else {ctx.fireChannelRead(msg);}} 復制代碼

    解碼結果列表CodecOutputList是Netty定制的一個特殊列表,該列表在線程中被緩存,可循環使用來存儲解碼結果,減少不必要的列表實例創建,從而提升性能。由于解碼結果需要頻繁存儲,普通的ArrayList難以滿足該需求,故定制化了一個特殊列表,由此可見Netty對優化的極致追求。 注意finally塊的第一個if情況滿足時,即累積區的數據已被讀取完畢,請考慮釋放累積區的必要性。想象這樣的情況,當一條消息被解碼完畢后,如果客戶端長時間不發送消息,那么,服務端保存該條消息的累積區將一直占據服務端內存浪費資源,所以必須釋放該累積區。 第二個if情況滿足時,即累積區的數據一直在channelRead讀取數據進行累積和解碼,直到達到了discardAfterReads次(默認16),此時累積區依然還有數據。在這樣的情況下,Netty主動丟棄一些字節,這是為了防止該累積區占用大量內存甚至耗盡內存引發OOM。 處理完這些情況后,最后統一觸發ChannelRead事件,將解碼出的數據傳遞給下一個處理器。注意:當out=0時,統一到一起被處理了。 再看細節的discardSomeReadBytes()和fireChannelRead():

    protected final void discardSomeReadBytes() {if (cumulation != null && !first && cumulation.refCnt() == 1) {cumulation.discardSomeReadBytes();}}static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {for (int i = 0; i < numElements; i ++) {ctx.fireChannelRead(msgs.getUnsafe(i));}} 復制代碼

    代碼比較簡單,只需注意discardSomeReadBytes中,累積區的refCnt() == 1時才丟棄數據是因為:如果用戶使用了slice().retain()和duplicate().retain()使refCnt>1,表明該累積區還在被用戶使用,丟棄數據可能導致用戶的困惑,所以須確定用戶不再使用該累積區的已讀數據,此時才丟棄。 下面分析解碼核心方法callDecode():

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {while (in.isReadable()) {int outSize = out.size();if (outSize > 0) {// 解碼出消息就立即處理,防止消息等待fireChannelRead(ctx, out, outSize);out.clear();// 用戶主動刪除該Handler,繼續操作in是不安全的if (ctx.isRemoved()) {break;}outSize = 0;}int oldInputLength = in.readableBytes();decode(ctx, in, out); // 子類需要實現的具體解碼步驟// 用戶主動刪除該Handler,繼續操作in是不安全的if (ctx.isRemoved()) {break; }// 此時outSize都==0(這的代碼容易產生誤解 應該直接使用0)if (outSize == out.size()) {if (oldInputLength == in.readableBytes()) {// 沒有解碼出消息,且沒讀取任何in數據break;} else {// 讀取了一部份數據但沒有解碼出消息// 說明需要更多的數據,故繼續continue;}}// 運行到這里outSize>0 說明已經解碼出消息if (oldInputLength == in.readableBytes()) {// 解碼出消息但是in的讀索引不變,用戶的decode方法有Bugthrow new DecoderException("did not read anything but decoded a message.");}// 用戶設定一個channelRead事件只解碼一次if (isSingleDecode()) {break; }}} catch (DecoderException e) {throw e;} catch (Throwable cause) {throw new DecoderException(cause);}} 復制代碼

    循環中的第一個if分支,檢查解碼結果,如果已經解碼出消息則立即將消息傳播到下一個處理器進行處理,這樣可使消息得到及時處理。在調用decode()方法的前后,都檢查該Handler是否被用戶從ChannelPipeline中刪除,如果刪除則跳出解碼步驟不對輸入緩沖區in進行操作,因為繼續操作in已經不安全。解碼完成后,對in解碼前后的讀索引進行了檢查,防止用戶的錯誤使用,如果用戶錯誤使用將拋出異常。 至此,核心的解碼框架已經分析完畢,再看最后的一些邊角處理。首先是channelReadComplete()讀事件完成后的處理:

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {numReads = 0; // 連續讀次數置0discardSomeReadBytes(); // 丟棄已讀數據,節約內存if (decodeWasNull) {// 沒有解碼出結果,則期待更多數據讀入decodeWasNull = false;if (!ctx.channel().config().isAutoRead()) {ctx.read();}}ctx.fireChannelReadComplete();}復制代碼

    如果channelRead()中沒有解碼出消息,極有可能是數據不夠,由此調用ctx.read()期待讀入更多的數據。如果設置了自動讀取,將會在HeadHandler中調用ctx.read();沒有設置自動讀取,則需要此處顯式調用。 最后再看Handler從ChannelPipelien中移除的處理handlerRemoved():

    public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {ByteBuf buf = cumulation;if (buf != null) {cumulation = null; // 釋放累積區,GC回收int readable = buf.readableBytes();if (readable > 0) {ByteBuf bytes = buf.readBytes(readable);buf.release();// 解碼器已被刪除故不再解碼,只將數據傳播到下一個Handlerctx.fireChannelRead(bytes);} else {buf.release();}numReads = 0; // 置0,有可能被再次添加ctx.fireChannelReadComplete();}handlerRemoved0(ctx); // 用戶可進行的自定義處理} 復制代碼

    當解碼器被刪除時,如果還有沒被解碼的數據,則將數據傳播到下一個處理器處理,防止丟失數據。此外,當連接不再有效觸發channelInactive事件或者觸發ChannelInputShutdownEvent時,則會調用callDecode()解碼,如果解碼出消息,傳播到下一個處理器。這部分的代碼不再列出。 至此,ByteToMessageDecoder解碼框架已分析完畢,下面,我們選用具體的實例進行分析。

    LineBasedFrameDecoder

    基于行分隔的解碼器LineBasedFrameDecoder是一個特殊的分隔符解碼器,該解碼器使用的分隔符為:windows的\r\n和類linux的\n。 首先看該類定義的成員變量:

    // 最大幀長度,超過此長度將拋出異常TooLongFrameExceptionprivate final int maxLength;// 是否快速失敗,true-檢測到幀長度過長立即拋出異常不在讀取整個幀// false-檢測到幀長度過長依然讀完整個幀再拋出異常private final boolean failFast;// 是否略過分隔符,true-解碼結果不含分隔符private final boolean stripDelimiter;// 超過最大幀長度是否丟棄字節private boolean discarding;private int discardedBytes; // 丟棄的字節數 復制代碼

    其中,前三個變量可由用戶根據實際情況配置,后兩個變量解碼時使用。 該子類覆蓋的解碼方法如下:

    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object decoded = decode(ctx, in);if (decoded != null) {out.add(decoded);}} 復制代碼

    其中又定義了decode(ctx, in)解碼出單個消息幀,事實上這也是其他編碼子類使用的方法。decode(ctx, in)方法處理很繞彎,只給出偽代碼:

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {final int eol = findEndOfLine(buffer);if (!discarding) {if (eol >= 0) {// 此時已找到換行符if(!checkMaxLength()) {return getFrame().retain();} // 超過最大長度拋出異常} else {if (checkMaxLength()) {// 設置true表示下一次解碼需要丟棄字節discarding = true; if (failFast) {// 拋出異常}} }} else {if (eol >= 0) {// 丟棄換行符以及之前的字節buffer.readerIndex(eol + delimLength);} else {// 丟棄收到的所有字節buffer.readerIndex(buffer.writerIndex());}}} 復制代碼

    該方法需要結合解碼框架的while循環反復理解,每個if情況都是一次while循環,而變量discarding就成為控制每次解碼流程的狀態量,注意其中的狀態轉移。(想法:使用狀態機實現,則流程更清晰)

    DelimiterBasedFrameDecoder

    該解碼器是更通用的分隔符解碼器,可支持多個分隔符,每個分隔符可為一個或多個字符。如果定義了多個分隔符,并且可解碼出多個消息幀,則選擇產生最小幀長的結果。例如,使用行分隔符\r\n和\n分隔:

    +--------------+| ABC\nDEF\r\n |+--------------+ 復制代碼

    可有兩種結果:

    +-----+-----+ +----------+ | ABC | DEF | (√) 和 | ABC\nDEF | (×) +-----+-----+ +----------+ 復制代碼

    該編碼器可配置的變量與LineBasedFrameDecoder類似,只是多了一個ByteBuf[] delimiters用于配置具體的分隔符。 Netty在Delimiters類中定義了兩種默認的分隔符,分別是NULL分隔符和行分隔符:

    public static ByteBuf[] nulDelimiter() {return new ByteBuf[] {Unpooled.wrappedBuffer(new byte[] { 0 }) };}public static ByteBuf[] lineDelimiter() {return new ByteBuf[] {Unpooled.wrappedBuffer(new byte[] { '\r', '\n' }),Unpooled.wrappedBuffer(new byte[] { '\n' }),};} 復制代碼

    FixedLengthFrameDecoder

    該解碼器十分簡單,按照固定長度frameLength解碼出消息幀。如下的數據幀解碼為固定長度3的消息幀示例如下:

    +---+----+------+----+ +-----+-----+-----+ | A | BC | DEFG | HI | -> | ABC | DEF | GHI | +---+----+------+----+ +-----+-----+-----+ 復制代碼

    其中的解碼方法也十分簡單:

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {if (in.readableBytes() < frameLength) {return null;} else {return in.readSlice(frameLength).retain();}} 復制代碼

    LengthFieldBasedFrameDecoder

    基于長度字段的消息幀解碼器,該解碼器可根據數據包中的長度字段動態的解碼出消息幀。一個推薦的二進制傳輸協議可設計為如下格式:

    +----------+------+----------+------+ | 頭部長度 | 頭部 | 數據長度 | 數據 | +----------+------+----------+------+ 復制代碼

    這樣的協議可滿足大多數場景使用,但不幸的是:很多情況下并不可以設計新的協議,往往要在老舊的協議上傳輸數據。由此,Netty將該解碼器設計的十分通用,只要有類似的長度字段便能正確解碼出消息幀。當然前提是:正確使用解碼器。 沒有什么是完美的,由于該解碼器十分通用,所以有大量的配置變量:

    private final ByteOrder byteOrder;private final int maxFrameLength;private final boolean failFast;private final int lengthFieldOffset;private final int lengthFieldLength;private final int lengthAdjustment;private final int initialBytesToStrip; 復制代碼

    變量byteOrder表示長度字段的字節序:大端或小端,默認為大端。如果對字節序有疑問,請查閱其他資料,不再贅述。maxFrameLength和failFast與其他解碼器相同,控制最大幀長度和快速失敗拋異常,注意:該解碼器failFast默認為true。 接下來將重點介紹其它四個變量:

    • lengthFieldOffset表示長度字段偏移量即在一個數據包中長度字段的具體下標位置。標準情況,該長度字段為數據部分長度。

    • lengthFieldLength表示長度字段的具體字節數,如一個int占4字節。該解碼器支持的字節數有:1,2,3,4和8,其他則會拋出異常。另外,還需要注意的是:長度字段的結果為無符號數。

    • lengthAdjustment是一個長度調節量,當數據包的長度字段不是數據部分長度而是總長度時,可將此值設定為頭部長度,便能正確解碼出包含整個數據包的結果消息幀。注意:某些情況下,該值可設定為負數。

    • initialBytesToStrip表示需要略過的字節數,如果我們只關心數據部分而不關心頭部,可將此值設定為頭部長度從而丟棄頭部。 下面我們使用具體的例子來說明:

    • 需求1:如下待解碼數據包,正確解碼為消息幀,其中長度字段在最前面的2字節,數據部分為12字節的字符串"HELLO, WORLD",長度字段0x000C=12 表示數據部分長度,數據包總長度則為14字節。

      解碼前(14 bytes) 解碼后(14 bytes)+--------+----------------+ +--------+----------------+| Length | Actual Content |----->| Length | Actual Content || 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |+--------+----------------+ +--------+----------------+ 復制代碼

    正確配置(只列出四個值中不為0的值):

    lengthFieldLength = 2; 復制代碼
    • 需求2:需求1的數據包不變,消息幀中去除長度字段。

      解碼前(14 bytes) 解碼后(12 bytes)+--------+----------------+ +----------------+| Length | Actual Content |----->| Actual Content || 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |+--------+----------------+ +----------------+ 復制代碼

    正確配置:

    lengthFieldLength = 2;initialBytesToStrip = 2; 復制代碼

    需求3:需求1數據包中長度字段表示數據包總長度。

    解碼前(14 bytes) 解碼后(14 bytes)+--------+----------------+ +--------+----------------+| Length | Actual Content |----->| Length | Actual Content || 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |+--------+----------------+ +--------+----------------+ 復制代碼

    正確配置:

    lengthFieldLength = 2;lengthAdjustment = -2; // 調整長度字段的2字節 復制代碼

    需求4:綜合難度,數據包有兩個頭部HDR1和HDR2,長度字段以及數據部分組成,其中長度字段值表示數據包總長度。結果消息幀需要第二個頭部HDR2和數據部分。請先給出答案再與標準答案比較,結果正確說明你已完全掌握了該解碼器的使用。

    解碼前 (16 bytes) 解碼后 (13 bytes) +------+--------+------+----------------+ +------+----------------+ | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | +------+--------+------+----------------+ +------+----------------+ 復制代碼

    正確配置:

    lengthFieldOffset = 1;lengthFieldLength = 2;lengthAdjustment = -3;initialBytesToStrip = 3; 復制代碼

    本解碼器的解碼過程總體上較為復雜,由于解碼的代碼是在while循環里面,decode方法return或者拋出異常時可看做一次循環結束,直到in中數據被解析完或者in的readerIndex讀索引不再增加才會從while循環跳出。使用狀態的思路理解,每個return或者拋出異常看為一個狀態:

    狀態1:丟棄過長幀狀態,可能是用戶設置了錯誤的幀長度或者實際幀過長。

    if (discardingTooLongFrame) {long bytesToDiscard = this.bytesToDiscard;int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());in.skipBytes(localBytesToDiscard); // 丟棄實際的字節數bytesToDiscard -= localBytesToDiscard;this.bytesToDiscard = bytesToDiscard;failIfNecessary(false);} 復制代碼

    變量localBytesToDiscard取得實際需要丟棄的字節數,由于過長幀有兩種情況:a.用戶設置了錯誤的長度字段,此時in中并沒有如此多的字節;b.in中確實有如此長度的幀,這個幀確實超過了設定的最大長度。bytesToDiscard的計算是為了failIfNecessary()確定異常的拋出,其值為0表示當次丟棄狀態已經丟棄了in中的所有數據,可以對新讀入in的數據進行處理;否則,還處于異常狀態。

    private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {if (bytesToDiscard == 0) {long tooLongFrameLength = this.tooLongFrameLength;this.tooLongFrameLength = 0;// 由于已經丟棄所有數據,關閉丟棄模式discardingTooLongFrame = false;// 已經丟棄了所有字節,當非快速失敗模式拋異常if (!failFast || firstDetectionOfTooLongFrame) {fail(tooLongFrameLength);}} else {if (failFast && firstDetectionOfTooLongFrame) {// 幀長度異常,快速失敗模式檢測到即拋異常fail(tooLongFrameLength);}}}復制代碼

    可見,首次檢測到幀長度是一種特殊情況,在之后的一個狀態進行分析。請注意該狀態并不是都拋異常,還有可能進入狀態2。

    狀態2:in中數據不足夠組成消息幀,此時直接返回null等待更多數據到達。

    if (in.readableBytes() < lengthFieldEndOffset) {return null;} 復制代碼

    狀態3:幀長度錯誤檢測,檢測長度字段為負值得幀以及加入調整長度后總長小于長度字段的幀,均拋出異常。

    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;// 該方法取出長度字段的值,不再深入分析long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);if (frameLength < 0) {in.skipBytes(lengthFieldEndOffset);throw new CorruptedFrameException("...");}frameLength += lengthAdjustment + lengthFieldEndOffset;if (frameLength < lengthFieldEndOffset) {in.skipBytes(lengthFieldEndOffset);throw new CorruptedFrameException("..."); 復制代碼

    狀態4:幀過長,由前述可知:可能是用戶設置了錯誤的幀長度或者實際幀過長

    if (frameLength > maxFrameLength) {long discard = frameLength - in.readableBytes();tooLongFrameLength = frameLength;if (discard < 0) {in.skipBytes((int) frameLength);} else {discardingTooLongFrame = true;bytesToDiscard = discard;in.skipBytes(in.readableBytes());}failIfNecessary(true);return null;} 復制代碼

    變量discard<0表示當前收到的數據足以確定是實際的幀過長,所以直接丟棄過長的幀長度;>0表示當前in中的數據并不足以確定是用戶設置了錯誤的幀長度,還是正確幀的后續數據字節還沒有到達,但無論何種情況,將丟棄狀態discardingTooLongFrame標記設置為true,之后后續數據字節進入狀態1處理。==0時,在failIfNecessary(true)無論如何都將拋出異常,><0時,只有設置快速失敗才會拋出異常。還需注意一點:failIfNecessary()的參數firstDetectionOfTooLongFrame的首次是指正確解析數據后發生的第一次發生的幀過長,可知會有很多首次。

    狀態5:正確解碼出消息幀。

    int frameLengthInt = (int) frameLength;if (in.readableBytes() < frameLengthInt) {return null; // 到達的數據還達不到幀長}if (initialBytesToStrip > frameLengthInt) {in.skipBytes(frameLengthInt); // 跳過字節數錯誤throw new CorruptedFrameException("...");}in.skipBytes(initialBytesToStrip);// 正確解碼出數據幀int readerIndex = in.readerIndex();int actualFrameLength = frameLengthInt - initialBytesToStrip;ByteBuf frame = in.slice(readerIndex, actualFrameLength).retain();in.readerIndex(readerIndex + actualFrameLength);return frame; 復制代碼

    代碼中混合了兩個簡單狀態,到達的數據還達不到幀長和用戶設置的忽略字節數錯誤。由于較為簡單,故合并到一起。 至此解碼框架分析完畢。可見,要正確的寫出基于長度字段的解碼器還是較為復雜的,如果開發時確有需求,特別要注意狀態的轉移。下面介紹較為簡單的編碼框架。

    請戳GitHub原文: https://github.com/wangzhiwubigdata/God-Of-BigData關注公眾號,內推,面試,資源下載,關注更多大數據技術~大數據成神之路~預計更新500+篇文章,已經更新60+篇~ 復制代碼

    總結

    以上是生活随笔為你收集整理的Netty源码解析8-ChannelHandler实例之CodecHandler的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。