日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty 5用户指南

發(fā)布時間:2025/3/15 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty 5用户指南 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

問題

現(xiàn)如今我們使用通用的應用程序或者類庫來實現(xiàn)系統(tǒng)之間地互相訪問,比如我們經(jīng)常使用一個HTTP客戶端來從web服務器上獲取信息,或者通過web service來執(zhí)行一個遠程的調(diào)用。

然而,有時候一個通用的協(xié)議和他的實現(xiàn)并沒有覆蓋一些場景。比如我們無法使用一個通用的HTTP服務器來處理大文件、電子郵件、近實時消息比如財務信息和多人游戲數(shù)據(jù)。我們需要一個合適的協(xié)議來處理一些特殊的場景。例如你可以實現(xiàn)一個優(yōu)化的Ajax的聊天應用、媒體流傳輸或者是大文件傳輸?shù)腍TTP服務器,你甚至可以自己設(shè)計和實現(xiàn)一個新的協(xié)議來準確地實現(xiàn)你的需求。

?

另外不可避免的事情是你不得不處理這些私有協(xié)議來確保和原有系統(tǒng)的互通。這個例子將會展示如何快速實現(xiàn)一個不影響應用程序穩(wěn)定性和性能的協(xié)議。

解決方案

Netty是一個提供異步事件驅(qū)動的網(wǎng)絡應用框架,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡服務器和客戶端程序。

換句話說,Netty是一個NIO框架,使用它可以簡單快速地開發(fā)網(wǎng)絡應用程序,比如客戶端和服務端的協(xié)議。Netty大大簡化了網(wǎng)絡程序的開發(fā)過程比如TCP和UDP的 Socket的開發(fā)。

“快速和簡單”并不意味著應用程序會有難維護和性能低的問題,Netty是一個精心設(shè)計的框架,它從許多協(xié)議的實現(xiàn)中吸收了很多的經(jīng)驗比如FTP、SMTP、HTTP、許多二進制和基于文本的傳統(tǒng)協(xié)議,Netty在不降低開發(fā)效率、性能、穩(wěn)定性、靈活性情況下,成功地找到了解決方案。

有一些用戶可能已經(jīng)發(fā)現(xiàn)其他的一些網(wǎng)絡框架也聲稱自己有同樣的優(yōu)勢,所以你可能會問是Netty和它們的不同之處。答案就是Netty的哲學設(shè)計理念。Netty從第一天開始就為用戶提供了用戶體驗最好的API以及實現(xiàn)設(shè)計。正是因為Netty的設(shè)計理念,才讓我們得以輕松地閱讀本指南并使用Netty

入門指南

這個章節(jié)會介紹Netty核心的結(jié)構(gòu),并通過一些簡單的例子來幫助你快速入門。當你讀完本章節(jié)你馬上就可以用Netty寫出一個客戶端和服務端。

如果你在學習的時候喜歡“自頂向下(top-down)”的方法,那你可能需要要從第二章《架構(gòu)概述》開始,然后再回到這里。

開始之前

運行本章節(jié)中的兩個例子最低要求是:Netty的最新版本(Netty5)和JDK1.6及以上。最新的Netty版本在項目下載頁面可以找到。為了下載到正確的JDK版本,請到你喜歡的網(wǎng)站下載。

閱讀本章節(jié)過程中,你可能會對相關(guān)類有疑惑,關(guān)于這些類的詳細的信息請請參考API說明文檔。為了方便,所有文檔中涉及到的類名字都會被關(guān)聯(lián)到一個在線的API說明。當然如果有任何錯誤信息、語法錯誤或者你有任何好的建議來改進文檔說明,那么請聯(lián)系Netty社區(qū)。

DISCARD服務(丟棄服務,指的是會忽略所有接收的數(shù)據(jù)的一種協(xié)議)

世界上最簡單的協(xié)議不是”Hello,World!”,是DISCARD,他是一種丟棄了所有接受到的數(shù)據(jù),并不做有任何的響應的協(xié)議。

為了實現(xiàn)DISCARD協(xié)議,你唯一需要做的就是忽略所有收到的數(shù)據(jù)。讓我們從處理器的實現(xiàn)開始,處理器是由Netty生成用來處理I/O事件的。

package io.netty.example.discard;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerAdapter;/*** Handles a server-side channel.*/ public class DiscardServerHandler extends ChannelHandlerAdapter { // (1)@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)// Discard the received data silently.((ByteBuf) msg).release(); // (3)}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();} }
  • DisCardServerHandler 繼承自?ChannelHandlerAdapter,這個類實現(xiàn)了ChannelHandler接口,ChannelHandler提供了許多事件處理的接口方法,然后你可以覆蓋這些方法。現(xiàn)在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現(xiàn)接口方法。
  • 這里我們覆蓋了chanelRead()事件處理方法。每當從客戶端收到新的數(shù)據(jù)時,這個方法會在收到消息時被調(diào)用,這個例子中,收到的消息的類型是ByteBuf
  • 為了實現(xiàn)DISCARD協(xié)議,處理器不得不忽略所有接受到的消息。ByteBuf是一個引用計數(shù)對象,這個對象必須顯示地調(diào)用release()方法來釋放。請記住處理器的職責是釋放所有傳遞到處理器的引用計數(shù)對象。通常,channelRead()方法的實現(xiàn)就像下面的這段代碼: @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {try {// Do something with msg} finally {ReferenceCountUtil.release(msg);} }
  • exceptionCaught()事件處理方法是當出現(xiàn)Throwable對象才會被調(diào)用,即當Netty由于IO錯誤或者處理器在處理事件時拋出的異常時。在大部分情況下,捕獲的異常應該被記錄下來并且把關(guān)聯(lián)的channel給關(guān)閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不同的實現(xiàn),比如你可能想在關(guān)閉連接之前發(fā)送一個錯誤碼的響應消息。
  • 到目前為止一切都還比較順利,我們已經(jīng)實現(xiàn)了DISCARD服務的一半功能,剩下的需要編寫一個main()方法來啟動服務端的DiscardServerHandler。

    package io.netty.example.discard;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;/*** Discards any incoming data.*/ public class DiscardServer {private int port;public DiscardServer(int port) {this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap(); // (2)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3).childHandler(new ChannelInitializer<SocketChannel>() { // (4)@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new DiscardServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128) // (5).childOption(ChannelOption.SO_KEEPALIVE, true); // (6)// Bind and start to accept incoming connections.ChannelFuture f = b.bind(port).sync(); // (7)// Wait until the server socket is closed.// In this example, this does not happen, but you can do that to gracefully// shut down your server.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port;if (args.length > 0) {port = Integer.parseInt(args[0]);} else {port = 8080;}new DiscardServer(port).run();} }
  • NioEventLoopGroup?是用來處理I/O操作的多線程事件循環(huán)器,Netty提供了許多不同的EventLoopGroup的實現(xiàn)用來處理不同傳輸協(xié)議。在這個例子中我們實現(xiàn)了一個服務端的應用,因此會有2個NioEventLoopGroup會被使用。第一個經(jīng)常被叫做‘boss’,用來接收進來的連接。第二個經(jīng)常被叫做‘worker’,用來處理已經(jīng)被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。如何知道多少個線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的Channels上都需要依賴于EventLoopGroup的實現(xiàn),并且可以通過構(gòu)造函數(shù)來配置他們的關(guān)系。
  • ServerBootstrap?是一個啟動NIO服務的輔助啟動類。你可以在這個服務中直接使用Channel,但是這會是一個復雜的處理過程,在很多情況下你并不需要這樣做。
  • 這里我們指定使用NioServerSocketChannel類來舉例說明一個新的Channel如何接收進來的連接。
  • 這里的事件處理類經(jīng)常會被用來處理一個最近的已經(jīng)接收的Channel。ChannelInitializer是一個特殊的處理類,他的目的是幫助使用者配置一個新的Channel。也許你想通過增加一些處理類比如DiscardServerHandle來配置一個新的Channel或者其對應的ChannelPipeline來實現(xiàn)你的網(wǎng)絡程序。當你的程序變的復雜時,可能你會增加更多的處理類到pipline上,然后提取這些匿名類到最頂層的類上。
  • 你可以設(shè)置這里指定的通道實現(xiàn)的配置參數(shù)。我們正在寫一個TCP/IP的服務端,因此我們被允許設(shè)置socket的參數(shù)選項比如tcpNoDelay和keepAlive。請參考ChannelOption和詳細的ChannelConfig實現(xiàn)的接口文檔以此可以對ChannelOptions的有一個大概的認識。
  • 你關(guān)注過option()和childOption()嗎?option()是提供給NioServerSocketChannel用來接收進來的連接。childOption()是提供給由父管道ServerChannel接收到的連接,在這個例子中也是NioServerSocketChannel。
  • 我們繼續(xù),剩下的就是綁定端口然后啟動服務。這里我們在機器上綁定了機器所有網(wǎng)卡上的8080端口。當然現(xiàn)在你可以多次調(diào)用bind()方法(基于不同綁定地址)。
  • 恭喜!你已經(jīng)完成熟練地完成了第一個基于Netty的服務端程序。

    觀察接收到的數(shù)據(jù)

    現(xiàn)在我們已經(jīng)編寫出我們第一個服務端,我們需要測試一下他是否真的可以運行。最簡單的測試方法是用telnet 命令。例如,你可以在命令行上輸入telnet localhost 8080或者其他類型參數(shù)。

    然而我們能說這個服務端是正常運行了嗎?事實上我們也不知道因為他是一個discard服務,你根本不可能得到任何的響應。為了證明他仍然是在工作的,讓我們修改服務端的程序來打印出他到底接收到了什么。

    我們已經(jīng)知道channelRead()方法是在數(shù)據(jù)被接收的時候調(diào)用。讓我們放一些代碼到DiscardServerHandler類的channelRead()方法。

    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf in = (ByteBuf) msg;try {while (in.isReadable()) { // (1)System.out.print((char) in.readByte());System.out.flush();}} finally {ReferenceCountUtil.release(msg); // (2)} }
  • 這個低效的循環(huán)事實上可以簡化為:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  • 或者,你可以在這里調(diào)用in.release()。
  • 如果你再次運行telnet命令,你將會看到服務端打印出了他所接收到的消息。
    完整的discard server代碼放在了io.netty.example.discard包下面。

    ECHO服務(響應式協(xié)議)

    到目前為止,我們雖然接收到了數(shù)據(jù),但沒有做任何的響應。然而一個服務端通常會對一個請求作出響應。讓我們學習怎樣在ECHO協(xié)議的實現(xiàn)下編寫一個響應消息給客戶端,這個協(xié)議針對任何接收的數(shù)據(jù)都會返回一個響應。

    和discard server唯一不同的是把在此之前我們實現(xiàn)的channelRead()方法,返回所有的數(shù)據(jù)替代打印接收數(shù)據(jù)到控制臺上的邏輯。因此,需要把channelRead()方法修改如下:

    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.write(msg); // (1)ctx.flush(); // (2) }

    1.?ChannelHandlerContext對象提供了許多操作,使你能夠觸發(fā)各種各樣的I/O事件和操作。這里我們調(diào)用了write(Object)方法來逐字地把接受到的消息寫入。請注意不同于DISCARD的例子我們并沒有釋放接受到的消息,這是因為當寫入的時候Netty已經(jīng)幫我們釋放了。
    2. ctx.write(Object)方法不會使消息寫入到通道上,他被緩沖在了內(nèi)部,你需要調(diào)用ctx.flush()方法來把緩沖區(qū)中數(shù)據(jù)強行輸出。或者你可以用更簡潔的cxt.writeAndFlush(msg)以達到同樣的目的。

    如果你再一次運行telnet命令,你會看到服務端會發(fā)回一個你已經(jīng)發(fā)送的消息。
    完整的echo服務的代碼放在了io.netty.example.echo包下面。

    TIME服務(時間協(xié)議的服務)

    在這個部分被實現(xiàn)的協(xié)議是TIME協(xié)議。和之前的例子不同的是在不接受任何請求時他會發(fā)送一個含32位的整數(shù)的消息,并且一旦消息發(fā)送就會立即關(guān)閉連接。在這個例子中,你會學習到如何構(gòu)建和發(fā)送一個消息,然后在完成時主動關(guān)閉連接。

    因為我們將會忽略任何接收到的數(shù)據(jù),而只是在連接被創(chuàng)建發(fā)送一個消息,所以這次我們不能使用channelRead()方法了,代替他的是,我們需要覆蓋channelActive()方法,下面的就是實現(xiàn)的內(nèi)容:

    package io.netty.example.time;public class TimeServerHandler extends ChannelHandlerAdapter {@Overridepublic void channelActive(final ChannelHandlerContext ctx) { // (1)final ByteBuf time = ctx.alloc().buffer(4); // (2)time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));final ChannelFuture f = ctx.writeAndFlush(time); // (3)f.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {assert f == future;ctx.close();}}); // (4)}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }
  • channelActive()方法將會在連接被建立并且準備進行通信時被調(diào)用。因此讓我們在這個方法里完成一個代表當前時間的32位整數(shù)消息的構(gòu)建工作。
  • 為了發(fā)送一個新的消息,我們需要分配一個包含這個消息的新的緩沖。因為我們需要寫入一個32位的整數(shù),因此我們需要一個至少有4個字節(jié)的ByteBuf。通過ChannelHandlerContext.alloc()得到一個當前的ByteBufAllocator,然后分配一個新的緩沖。
  • 和往常一樣我們需要編寫一個構(gòu)建好的消息。但是等一等,flip在哪?難道我們使用NIO發(fā)送消息時不是調(diào)用java.nio.ByteBuffer.flip()嗎?ByteBuf之所以沒有這個方法因為有兩個指針,一個對應讀操作一個對應寫操作。當你向ByteBuf里寫入數(shù)據(jù)的時候?qū)懼羔樀乃饕蜁黾?#xff0c;同時讀指針的索引沒有變化。讀指針索引和寫指針索引分別代表了消息的開始和結(jié)束。比較起來,NIO緩沖并沒有提供一種簡潔的方式來計算出消息內(nèi)容的開始和結(jié)尾,除非你調(diào)用flip方法。當你忘記調(diào)用flip方法而引起沒有數(shù)據(jù)或者錯誤數(shù)據(jù)被發(fā)送時,你會陷入困境。這樣的一個錯誤不會發(fā)生在Netty上,因為我們對于不同的操作類型有不同的指針。你會發(fā)現(xiàn)這樣的使用方法會讓你過程變得更加的容易,因為你已經(jīng)習慣一種沒有使用flip的方式。另外一個點需要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法會返回一個ChannelFuture對象,一個ChannelFuture代表了一個還沒有發(fā)生的I/O操作。這意味著任何一個請求操作都不會馬上被執(zhí)行,因為在Netty里所有的操作都是異步的。舉個例子下面的代碼中在消息被發(fā)送之前可能會先關(guān)閉連接。 Channel ch = ...; ch.writeAndFlush(message); ch.close();

    因此你需要在write()方法返回的ChannelFuture完成后調(diào)用close()方法,然后當他的寫操作已經(jīng)完成他會通知他的監(jiān)聽者。請注意,close()方法也可能不會立馬關(guān)閉,他也會返回一個ChannelFuture。

  • 當一個寫請求已經(jīng)完成是如何通知到我們?這個只需要簡單地在返回的ChannelFuture上增加一個ChannelFutureListener。這里我們構(gòu)建了一個匿名的ChannelFutureListener類用來在操作完成時關(guān)閉Channel。或者,你可以使用簡單的預定義監(jiān)聽器代碼: f.addListener(ChannelFutureListener.CLOSE);
  • 為了測試我們的time服務如我們期望的一樣工作,你可以使用UNIX的rdate命令

    $ rdate -o <port> -p <host>

    Port是你在main()函數(shù)中指定的端口,host使用locahost就可以了。

    Time客戶端

    不像DISCARD和ECHO的服務端,對于TIME協(xié)議我們需要一個客戶端因為人們不能把一個32位的二進制數(shù)據(jù)翻譯成一個日期或者日歷。在這一部分,我們將會討論如何確保服務端是正常工作的,并且學習怎樣用Netty編寫一個客戶端。

    Netty中,編寫服務端和客戶端最大的并且唯一不同的使用了不同的BootStrap和Channel的實現(xiàn)。請看一下下面的代碼:

    package io.netty.example.time;public class TimeClient {public static void main(String[] args) throws Exception {String host = args[0];int port = Integer.parseInt(args[1]);EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap(); // (1)b.group(workerGroup); // (2)b.channel(NioSocketChannel.class); // (3)b.option(ChannelOption.SO_KEEPALIVE, true); // (4)b.handler(new ChannelInitializer&lt;SocketChannel&gt;() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new TimeClientHandler());}});// Start the client.ChannelFuture f = b.connect(host, port).sync(); // (5)// Wait until the connection is closed.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}} }
  • BootStrap和ServerBootstrap類似,不過他是對非服務端的channel而言,比如客戶端或者無連接傳輸模式的channel。
  • 如果你只指定了一個EventLoopGroup,那他就會即作為一個‘boss’線程,也會作為一個‘workder’線程,盡管客戶端不需要使用到‘boss’線程。
  • 代替NioServerSocketChannel的是NioSocketChannel,這個類在客戶端channel被創(chuàng)建時使用。
  • 不像在使用ServerBootstrap時需要用childOption()方法,因為客戶端的SocketChannel沒有父channel的概念。
  • 我們用connect()方法代替了bind()方法。
  • 正如你看到的,他和服務端的代碼是不一樣的。ChannelHandler是如何實現(xiàn)的?他應該從服務端接受一個32位的整數(shù)消息,把他翻譯成人們能讀懂的格式,并打印翻譯好的時間,最后關(guān)閉連接:

    package io.netty.example.time;import java.util.Date;public class TimeClientHandler extends ChannelHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf m = (ByteBuf) msg; // (1)try {long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;System.out.println(new Date(currentTimeMillis));ctx.close();} finally {m.release();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }
  • 在TCP/IP中,NETTY會把讀到的數(shù)據(jù)放到ByteBuf的數(shù)據(jù)結(jié)構(gòu)中。
  • 這樣看起來非常簡單,并且和服務端的那個例子的代碼也相差不多。然而,處理器有時候會因為拋出IndexOutOfBoundsException而拒絕工作。在下個部分我們會討論為什么會發(fā)生這種情況。

    流數(shù)據(jù)的傳輸處理

    一個小的Socket Buffer問題

    在基于流的傳輸里比如TCP/IP,接收到的數(shù)據(jù)會先被存儲到一個socket接收緩沖里。不幸的是,基于流的傳輸并不是一個數(shù)據(jù)包隊列,而是一個字節(jié)隊列。即使你發(fā)送了2個獨立的數(shù)據(jù)包,操作系統(tǒng)也不會作為2個消息處理而僅僅是作為一連串的字節(jié)而言。因此這是不能保證你遠程寫入的數(shù)據(jù)就會準確地讀取。舉個例子,讓我們假設(shè)操作系統(tǒng)的TCP/TP協(xié)議棧已經(jīng)接收了3個數(shù)據(jù)包:

    由于基于流傳輸?shù)膮f(xié)議的這種普通的性質(zhì),在你的應用程序里讀取數(shù)據(jù)的時候會有很高的可能性被分成下面的片段。

    因此,一個接收方不管他是客戶端還是服務端,都應該把接收到的數(shù)據(jù)整理成一個或者多個更有意思并且能夠讓程序的業(yè)務邏輯更好理解的數(shù)據(jù)。在上面的例子中,接收到的數(shù)據(jù)應該被構(gòu)造成下面的格式:

    第一個解決方案

    現(xiàn)在讓我們回到TIME客戶端的例子上。這里我們遇到了同樣的問題,一個32字節(jié)數(shù)據(jù)是非常小的數(shù)據(jù)量,他并不見得會被經(jīng)常拆分到到不同的數(shù)據(jù)段內(nèi)。然而,問題是他確實可能會被拆分到不同的數(shù)據(jù)段內(nèi),并且拆分的可能性會隨著通信量的增加而增加。

    最簡單的方案是構(gòu)造一個內(nèi)部的可積累的緩沖,直到4個字節(jié)全部接收到了內(nèi)部緩沖。下面的代碼修改了TimeClientHandler的實現(xiàn)類修復了這個問題

    package io.netty.example.time;import java.util.Date;public class TimeClientHandler extends ChannelHandlerAdapter {private ByteBuf buf;@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {buf = ctx.alloc().buffer(4); // (1)}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {buf.release(); // (1)buf = null;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf m = (ByteBuf) msg;buf.writeBytes(m); // (2)m.release();if (buf.readableBytes() &gt;= 4) { // (3)long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;System.out.println(new Date(currentTimeMillis));ctx.close();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }
  • ChannelHandler有2個生命周期的監(jiān)聽方法:handlerAdded()和handlerRemoved()。你可以完成任意初始化任務只要他不會被阻塞很長的時間。
  • 首先,所有接收的數(shù)據(jù)都應該被累積在buf變量里。
  • 然后,處理器必須檢查buf變量是否有足夠的數(shù)據(jù),在這個例子中是4個字節(jié),然后處理實際的業(yè)務邏輯。否則,Netty會重復調(diào)用channelRead()當有更多數(shù)據(jù)到達直到4個字節(jié)的數(shù)據(jù)被積累。
  • 第二個解決方案

    盡管第一個解決方案已經(jīng)解決了Time客戶端的問題了,但是修改后的處理器看起來不那么的簡潔,想象一下如果由多個字段比如可變長度的字段組成的更為復雜的協(xié)議時,你的ChannelHandler的實現(xiàn)將很快地變得難以維護。

    正如你所知的,你可以增加多個ChannelHandler到ChannelPipeline?,因此你可以把一整個ChannelHandler拆分成多個模塊以減少應用的復雜程度,比如你可以把TimeClientHandler拆分成2個處理器:

    • TimeDecoder處理數(shù)據(jù)拆分的問題
    • TimeClientHandler原始版本的實現(xiàn)

    幸運地是,Netty提供了一個可擴展的類,幫你完成TimeDecoder的開發(fā)。

    package io.netty.example.time;public class TimeDecoder extends ByteToMessageDecoder { // (1)@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List&lt;Object&gt; out) { // (2)if (in.readableBytes() &lt; 4) {return; // (3)}out.add(in.readBytes(4)); // (4)} }
  • ByteToMessageDecoder是ChannelHandler的一個實現(xiàn)類,他可以在處理數(shù)據(jù)拆分的問題上變得很簡單。
  • 每當有新數(shù)據(jù)接收的時候,ByteToMessageDecoder都會調(diào)用decode()方法來處理內(nèi)部的那個累積緩沖。
  • Decode()方法可以決定當累積緩沖里沒有足夠數(shù)據(jù)時可以往out對象里放任意數(shù)據(jù)。當有更多的數(shù)據(jù)被接收了ByteToMessageDecoder會再一次調(diào)用decode()方法。
  • 如果在decode()方法里增加了一個對象到out對象里,這意味著解碼器解碼消息成功。ByteToMessageDecoder將會丟棄在累積緩沖里已經(jīng)被讀過的數(shù)據(jù)。請記得你不需要對多條消息調(diào)用decode(),ByteToMessageDecoder會持續(xù)調(diào)用decode()直到不放任何數(shù)據(jù)到out里。
  • 現(xiàn)在我們有另外一個處理器插入到ChannelPipeline里,我們應該在TimeClient里修改ChannelInitializer?的實現(xiàn):

    b.handler(new ChannelInitializer&lt;SocketChannel&gt;() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());} });

    如果你是一個大膽的人,你可能會嘗試使用更簡單的解碼類ReplayingDecoder。不過你還是需要參考一下API文檔來獲取更多的信息。

    public class TimeDecoder extends ReplayingDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<object width="300" height="150">out) {out.add(in.readBytes(4));}}

    此外,Netty還提供了更多可以直接拿來用的解碼器使你可以更簡單地實現(xiàn)更多的協(xié)議,幫助你避免開發(fā)一個難以維護的處理器實現(xiàn)。請參考下面的包以獲取更多更詳細的例子:

    • 對于二進制協(xié)議請看io.netty.example.factorial
    • 對于基于文本協(xié)議請看io.netty.example.telnet

    用POJO代替ByteBuf

    我們已經(jīng)討論了所有的例子,到目前為止一個消息的消息都是使用ByteBuf作為一個基本的數(shù)據(jù)結(jié)構(gòu)。在這一部分,我們會改進TIME協(xié)議的客戶端和服務端的例子,用POJO替代ByteBuf。在你的ChannelHandlerS中使用POJO優(yōu)勢是比較明顯的。通過從ChannelHandler中提取出ByteBuf的代碼,將會使ChannelHandler的實現(xiàn)變得更加可維護和可重用。在TIME客戶端和服務端的例子中,我們讀取的僅僅是一個32位的整形數(shù)據(jù),直接使用ByteBuf不會是一個主要的問題。然后,你會發(fā)現(xiàn)當你需要實現(xiàn)一個真實的協(xié)議,分離代碼變得非常的必要。首先,讓我們定義一個新的類型叫做UnixTime。

    package io.netty.example.time;import java.util.Date;public class UnixTime {private final int value;public UnixTime() {this((int) (System.currentTimeMillis() / 1000L + 2208988800L));}public UnixTime(int value) {this.value = value;}public int value() {return value;}@Overridepublic String toString() {return new Date((value() - 2208988800L) * 1000L).toString();} }

    現(xiàn)在我們可以修改下TimeDecoder類,返回一個UnixTime,以替代ByteBuf

    @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) {return;}out.add(new UnixTime(in.readInt())); }

    下面是修改后的解碼器,TimeClientHandler不再有任何的ByteBuf代碼了。

    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {UnixTime m = (UnixTime) msg;System.out.println(m);ctx.close(); }

    是不是變得更加簡單和優(yōu)雅了?相同的技術(shù)可以被運用到服務端。讓我們修改一下TimeServerHandler的代碼。

    @Override public void channelActive(ChannelHandlerContext ctx) {ChannelFuture f = ctx.writeAndFlush(new UnixTime());f.addListener(ChannelFutureListener.CLOSE); }

    現(xiàn)在,僅僅需要修改的是ChannelHandler的實現(xiàn),這里需要把UnixTime對象重新轉(zhuǎn)化為一個ByteBuf。不過這已經(jīng)是非常簡單了,因為當你對一個消息編碼的時候,你不需要再處理拆包和組裝的過程。

    package io.netty.example.time;public class TimeEncoder extends ChannelHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {UnixTime m = (UnixTime) msg;ByteBuf encoded = ctx.alloc().buffer(4);encoded.writeInt(m.value());ctx.write(encoded, promise); // (1)} }
  • 在這幾行代碼里還有幾個重要的事情。第一, 通過ChannelPromise,當編碼后的數(shù)據(jù)被寫到了通道上Netty可以通過這個對象標記是成功還是失敗。第二, 我們不需要調(diào)用cxt.flush()。因為處理器已經(jīng)單獨分離出了一個方法void flush(ChannelHandlerContext cxt),如果像自己實現(xiàn)flush方法內(nèi)容可以自行覆蓋這個方法。
  • 進一步簡化操作,你可以使用MessageToByteEncode:

    public class TimeEncoder extends MessageToByteEncoder<UnixTime> {@Overrideprotected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {out.writeInt(msg.value());} }

    最后的任務就是在TimeServerHandler之前把TimeEncoder插入到ChannelPipeline。但這是不那么重要的工作。

    關(guān)閉你的應用

    關(guān)閉一個Netty應用往往只需要簡單地通過shutdownGracefully()方法來關(guān)閉你構(gòu)建的所有的NioEventLoopGroupS.當EventLoopGroup被完全地終止,并且對應的所有channels都已經(jīng)被關(guān)閉時,Netty會返回一個Future對象。

    概述

    在這一章節(jié)中,我們會快速地回顧下如果在熟練掌握Netty的情況下編寫出一個健壯能運行的網(wǎng)絡應用程序。在Netty接下去的章節(jié)中還會有更多更相信的信息。我們也鼓勵你去重新復習下在io.netty.example包下的例子。請注意社區(qū)一直在等待你的問題和想法以幫助Netty的持續(xù)改進,Netty的文檔也是基于你們的快速反饋上。

    總結(jié)

    以上是生活随笔為你收集整理的Netty 5用户指南的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。