Netty 5用户指南
問題
現(xiàn)如今我們使用通用的應用程序或者類庫來實現(xiàn)系統(tǒng)之間地互相訪問,比如我們經(jīng)常使用一個HTTP客戶端來從web服務器上獲取信息,或者通過web service來執(zhí)行一個遠程的調(diào)用。
然而,有時候一個通用的協(xié)議和他的實現(xiàn)并沒有覆蓋一些場景。比如我們無法使用一個通用的HTTP服務器來處理大文件、電子郵件、近實時消息比如財務信息和多人游戲數(shù)據(jù)。我們需要一個合適的協(xié)議來處理一些特殊的場景。例如你可以實現(xiàn)一個優(yōu)化的Ajax的聊天應用、媒體流傳輸或者是大文件傳輸?shù)腍TTP服務器,你甚至可以自己設(shè)計和實現(xiàn)一個新的協(xié)議來準確地實現(xiàn)你的需求。
?
另外不可避免的事情是你不得不處理這些私有協(xié)議來確保和原有系統(tǒng)的互通。這個例子將會展示如何快速實現(xiàn)一個不影響應用程序穩(wěn)定性和性能的協(xié)議。
解決方案
Netty是一個提供異步事件驅(qū)動的網(wǎng)絡應用框架,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡服務器和客戶端程序。
換句話說,Netty是一個NIO框架,使用它可以簡單快速地開發(fā)網(wǎng)絡應用程序,比如客戶端和服務端的協(xié)議。Netty大大簡化了網(wǎng)絡程序的開發(fā)過程比如TCP和UDP的 Socket的開發(fā)。
“快速和簡單”并不意味著應用程序會有難維護和性能低的問題,Netty是一個精心設(shè)計的框架,它從許多協(xié)議的實現(xiàn)中吸收了很多的經(jīng)驗比如FTP、SMTP、HTTP、許多二進制和基于文本的傳統(tǒng)協(xié)議,Netty在不降低開發(fā)效率、性能、穩(wěn)定性、靈活性情況下,成功地找到了解決方案。
有一些用戶可能已經(jīng)發(fā)現(xiàn)其他的一些網(wǎng)絡框架也聲稱自己有同樣的優(yōu)勢,所以你可能會問是Netty和它們的不同之處。答案就是Netty的哲學設(shè)計理念。Netty從第一天開始就為用戶提供了用戶體驗最好的API以及實現(xiàn)設(shè)計。正是因為Netty的設(shè)計理念,才讓我們得以輕松地閱讀本指南并使用Netty。
入門指南
這個章節(jié)會介紹Netty核心的結(jié)構(gòu),并通過一些簡單的例子來幫助你快速入門。當你讀完本章節(jié)你馬上就可以用Netty寫出一個客戶端和服務端。
如果你在學習的時候喜歡“自頂向下(top-down)”的方法,那你可能需要要從第二章《架構(gòu)概述》開始,然后再回到這里。
開始之前
運行本章節(jié)中的兩個例子最低要求是:Netty的最新版本(Netty5)和JDK1.6及以上。最新的Netty版本在項目下載頁面可以找到。為了下載到正確的JDK版本,請到你喜歡的網(wǎng)站下載。
閱讀本章節(jié)過程中,你可能會對相關(guān)類有疑惑,關(guān)于這些類的詳細的信息請請參考API說明文檔。為了方便,所有文檔中涉及到的類名字都會被關(guān)聯(lián)到一個在線的API說明。當然如果有任何錯誤信息、語法錯誤或者你有任何好的建議來改進文檔說明,那么請聯(lián)系Netty社區(qū)。
DISCARD服務(丟棄服務,指的是會忽略所有接收的數(shù)據(jù)的一種協(xié)議)
世界上最簡單的協(xié)議不是”Hello,World!”,是DISCARD,他是一種丟棄了所有接受到的數(shù)據(jù),并不做有任何的響應的協(xié)議。
為了實現(xiàn)DISCARD協(xié)議,你唯一需要做的就是忽略所有收到的數(shù)據(jù)。讓我們從處理器的實現(xiàn)開始,處理器是由Netty生成用來處理I/O事件的。
package io.netty.example.discard;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerAdapter;/*** Handles a server-side channel.*/ public class DiscardServerHandler extends ChannelHandlerAdapter { // (1)@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)// Discard the received data silently.((ByteBuf) msg).release(); // (3)}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();} }到目前為止一切都還比較順利,我們已經(jīng)實現(xiàn)了DISCARD服務的一半功能,剩下的需要編寫一個main()方法來啟動服務端的DiscardServerHandler。
package io.netty.example.discard;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;/*** Discards any incoming data.*/ public class DiscardServer {private int port;public DiscardServer(int port) {this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap(); // (2)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3).childHandler(new ChannelInitializer<SocketChannel>() { // (4)@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new DiscardServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128) // (5).childOption(ChannelOption.SO_KEEPALIVE, true); // (6)// Bind and start to accept incoming connections.ChannelFuture f = b.bind(port).sync(); // (7)// Wait until the server socket is closed.// In this example, this does not happen, but you can do that to gracefully// shut down your server.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port;if (args.length > 0) {port = Integer.parseInt(args[0]);} else {port = 8080;}new DiscardServer(port).run();} }恭喜!你已經(jīng)完成熟練地完成了第一個基于Netty的服務端程序。
觀察接收到的數(shù)據(jù)
現(xiàn)在我們已經(jīng)編寫出我們第一個服務端,我們需要測試一下他是否真的可以運行。最簡單的測試方法是用telnet 命令。例如,你可以在命令行上輸入telnet localhost 8080或者其他類型參數(shù)。
然而我們能說這個服務端是正常運行了嗎?事實上我們也不知道因為他是一個discard服務,你根本不可能得到任何的響應。為了證明他仍然是在工作的,讓我們修改服務端的程序來打印出他到底接收到了什么。
我們已經(jīng)知道channelRead()方法是在數(shù)據(jù)被接收的時候調(diào)用。讓我們放一些代碼到DiscardServerHandler類的channelRead()方法。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf in = (ByteBuf) msg;try {while (in.isReadable()) { // (1)System.out.print((char) in.readByte());System.out.flush();}} finally {ReferenceCountUtil.release(msg); // (2)} }如果你再次運行telnet命令,你將會看到服務端打印出了他所接收到的消息。
完整的discard server代碼放在了io.netty.example.discard包下面。
ECHO服務(響應式協(xié)議)
到目前為止,我們雖然接收到了數(shù)據(jù),但沒有做任何的響應。然而一個服務端通常會對一個請求作出響應。讓我們學習怎樣在ECHO協(xié)議的實現(xiàn)下編寫一個響應消息給客戶端,這個協(xié)議針對任何接收的數(shù)據(jù)都會返回一個響應。
和discard server唯一不同的是把在此之前我們實現(xiàn)的channelRead()方法,返回所有的數(shù)據(jù)替代打印接收數(shù)據(jù)到控制臺上的邏輯。因此,需要把channelRead()方法修改如下:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.write(msg); // (1)ctx.flush(); // (2) }1.?ChannelHandlerContext對象提供了許多操作,使你能夠觸發(fā)各種各樣的I/O事件和操作。這里我們調(diào)用了write(Object)方法來逐字地把接受到的消息寫入。請注意不同于DISCARD的例子我們并沒有釋放接受到的消息,這是因為當寫入的時候Netty已經(jīng)幫我們釋放了。
2. ctx.write(Object)方法不會使消息寫入到通道上,他被緩沖在了內(nèi)部,你需要調(diào)用ctx.flush()方法來把緩沖區(qū)中數(shù)據(jù)強行輸出。或者你可以用更簡潔的cxt.writeAndFlush(msg)以達到同樣的目的。
如果你再一次運行telnet命令,你會看到服務端會發(fā)回一個你已經(jīng)發(fā)送的消息。
完整的echo服務的代碼放在了io.netty.example.echo包下面。
TIME服務(時間協(xié)議的服務)
在這個部分被實現(xiàn)的協(xié)議是TIME協(xié)議。和之前的例子不同的是在不接受任何請求時他會發(fā)送一個含32位的整數(shù)的消息,并且一旦消息發(fā)送就會立即關(guān)閉連接。在這個例子中,你會學習到如何構(gòu)建和發(fā)送一個消息,然后在完成時主動關(guān)閉連接。
因為我們將會忽略任何接收到的數(shù)據(jù),而只是在連接被創(chuàng)建發(fā)送一個消息,所以這次我們不能使用channelRead()方法了,代替他的是,我們需要覆蓋channelActive()方法,下面的就是實現(xiàn)的內(nèi)容:
package io.netty.example.time;public class TimeServerHandler extends ChannelHandlerAdapter {@Overridepublic void channelActive(final ChannelHandlerContext ctx) { // (1)final ByteBuf time = ctx.alloc().buffer(4); // (2)time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));final ChannelFuture f = ctx.writeAndFlush(time); // (3)f.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {assert f == future;ctx.close();}}); // (4)}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }因此你需要在write()方法返回的ChannelFuture完成后調(diào)用close()方法,然后當他的寫操作已經(jīng)完成他會通知他的監(jiān)聽者。請注意,close()方法也可能不會立馬關(guān)閉,他也會返回一個ChannelFuture。
為了測試我們的time服務如我們期望的一樣工作,你可以使用UNIX的rdate命令
$ rdate -o <port> -p <host>Port是你在main()函數(shù)中指定的端口,host使用locahost就可以了。
Time客戶端
不像DISCARD和ECHO的服務端,對于TIME協(xié)議我們需要一個客戶端因為人們不能把一個32位的二進制數(shù)據(jù)翻譯成一個日期或者日歷。在這一部分,我們將會討論如何確保服務端是正常工作的,并且學習怎樣用Netty編寫一個客戶端。
在Netty中,編寫服務端和客戶端最大的并且唯一不同的使用了不同的BootStrap和Channel的實現(xiàn)。請看一下下面的代碼:
package io.netty.example.time;public class TimeClient {public static void main(String[] args) throws Exception {String host = args[0];int port = Integer.parseInt(args[1]);EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap(); // (1)b.group(workerGroup); // (2)b.channel(NioSocketChannel.class); // (3)b.option(ChannelOption.SO_KEEPALIVE, true); // (4)b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new TimeClientHandler());}});// Start the client.ChannelFuture f = b.connect(host, port).sync(); // (5)// Wait until the connection is closed.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}} }正如你看到的,他和服務端的代碼是不一樣的。ChannelHandler是如何實現(xiàn)的?他應該從服務端接受一個32位的整數(shù)消息,把他翻譯成人們能讀懂的格式,并打印翻譯好的時間,最后關(guān)閉連接:
package io.netty.example.time;import java.util.Date;public class TimeClientHandler extends ChannelHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf m = (ByteBuf) msg; // (1)try {long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;System.out.println(new Date(currentTimeMillis));ctx.close();} finally {m.release();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }這樣看起來非常簡單,并且和服務端的那個例子的代碼也相差不多。然而,處理器有時候會因為拋出IndexOutOfBoundsException而拒絕工作。在下個部分我們會討論為什么會發(fā)生這種情況。
流數(shù)據(jù)的傳輸處理
一個小的Socket Buffer問題
在基于流的傳輸里比如TCP/IP,接收到的數(shù)據(jù)會先被存儲到一個socket接收緩沖里。不幸的是,基于流的傳輸并不是一個數(shù)據(jù)包隊列,而是一個字節(jié)隊列。即使你發(fā)送了2個獨立的數(shù)據(jù)包,操作系統(tǒng)也不會作為2個消息處理而僅僅是作為一連串的字節(jié)而言。因此這是不能保證你遠程寫入的數(shù)據(jù)就會準確地讀取。舉個例子,讓我們假設(shè)操作系統(tǒng)的TCP/TP協(xié)議棧已經(jīng)接收了3個數(shù)據(jù)包:
由于基于流傳輸?shù)膮f(xié)議的這種普通的性質(zhì),在你的應用程序里讀取數(shù)據(jù)的時候會有很高的可能性被分成下面的片段。
因此,一個接收方不管他是客戶端還是服務端,都應該把接收到的數(shù)據(jù)整理成一個或者多個更有意思并且能夠讓程序的業(yè)務邏輯更好理解的數(shù)據(jù)。在上面的例子中,接收到的數(shù)據(jù)應該被構(gòu)造成下面的格式:
第一個解決方案
現(xiàn)在讓我們回到TIME客戶端的例子上。這里我們遇到了同樣的問題,一個32字節(jié)數(shù)據(jù)是非常小的數(shù)據(jù)量,他并不見得會被經(jīng)常拆分到到不同的數(shù)據(jù)段內(nèi)。然而,問題是他確實可能會被拆分到不同的數(shù)據(jù)段內(nèi),并且拆分的可能性會隨著通信量的增加而增加。
最簡單的方案是構(gòu)造一個內(nèi)部的可積累的緩沖,直到4個字節(jié)全部接收到了內(nèi)部緩沖。下面的代碼修改了TimeClientHandler的實現(xiàn)類修復了這個問題
package io.netty.example.time;import java.util.Date;public class TimeClientHandler extends ChannelHandlerAdapter {private ByteBuf buf;@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {buf = ctx.alloc().buffer(4); // (1)}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {buf.release(); // (1)buf = null;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf m = (ByteBuf) msg;buf.writeBytes(m); // (2)m.release();if (buf.readableBytes() >= 4) { // (3)long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;System.out.println(new Date(currentTimeMillis));ctx.close();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();} }第二個解決方案
盡管第一個解決方案已經(jīng)解決了Time客戶端的問題了,但是修改后的處理器看起來不那么的簡潔,想象一下如果由多個字段比如可變長度的字段組成的更為復雜的協(xié)議時,你的ChannelHandler的實現(xiàn)將很快地變得難以維護。
正如你所知的,你可以增加多個ChannelHandler到ChannelPipeline?,因此你可以把一整個ChannelHandler拆分成多個模塊以減少應用的復雜程度,比如你可以把TimeClientHandler拆分成2個處理器:
- TimeDecoder處理數(shù)據(jù)拆分的問題
- TimeClientHandler原始版本的實現(xiàn)
幸運地是,Netty提供了一個可擴展的類,幫你完成TimeDecoder的開發(fā)。
package io.netty.example.time;public class TimeDecoder extends ByteToMessageDecoder { // (1)@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)if (in.readableBytes() < 4) {return; // (3)}out.add(in.readBytes(4)); // (4)} }現(xiàn)在我們有另外一個處理器插入到ChannelPipeline里,我們應該在TimeClient里修改ChannelInitializer?的實現(xiàn):
b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());} });如果你是一個大膽的人,你可能會嘗試使用更簡單的解碼類ReplayingDecoder。不過你還是需要參考一下API文檔來獲取更多的信息。
public class TimeDecoder extends ReplayingDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<object width="300" height="150">out) {out.add(in.readBytes(4));}}此外,Netty還提供了更多可以直接拿來用的解碼器使你可以更簡單地實現(xiàn)更多的協(xié)議,幫助你避免開發(fā)一個難以維護的處理器實現(xiàn)。請參考下面的包以獲取更多更詳細的例子:
- 對于二進制協(xié)議請看io.netty.example.factorial
- 對于基于文本協(xié)議請看io.netty.example.telnet
用POJO代替ByteBuf
我們已經(jīng)討論了所有的例子,到目前為止一個消息的消息都是使用ByteBuf作為一個基本的數(shù)據(jù)結(jié)構(gòu)。在這一部分,我們會改進TIME協(xié)議的客戶端和服務端的例子,用POJO替代ByteBuf。在你的ChannelHandlerS中使用POJO優(yōu)勢是比較明顯的。通過從ChannelHandler中提取出ByteBuf的代碼,將會使ChannelHandler的實現(xiàn)變得更加可維護和可重用。在TIME客戶端和服務端的例子中,我們讀取的僅僅是一個32位的整形數(shù)據(jù),直接使用ByteBuf不會是一個主要的問題。然后,你會發(fā)現(xiàn)當你需要實現(xiàn)一個真實的協(xié)議,分離代碼變得非常的必要。首先,讓我們定義一個新的類型叫做UnixTime。
package io.netty.example.time;import java.util.Date;public class UnixTime {private final int value;public UnixTime() {this((int) (System.currentTimeMillis() / 1000L + 2208988800L));}public UnixTime(int value) {this.value = value;}public int value() {return value;}@Overridepublic String toString() {return new Date((value() - 2208988800L) * 1000L).toString();} }現(xiàn)在我們可以修改下TimeDecoder類,返回一個UnixTime,以替代ByteBuf
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) {return;}out.add(new UnixTime(in.readInt())); }下面是修改后的解碼器,TimeClientHandler不再有任何的ByteBuf代碼了。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {UnixTime m = (UnixTime) msg;System.out.println(m);ctx.close(); }是不是變得更加簡單和優(yōu)雅了?相同的技術(shù)可以被運用到服務端。讓我們修改一下TimeServerHandler的代碼。
@Override public void channelActive(ChannelHandlerContext ctx) {ChannelFuture f = ctx.writeAndFlush(new UnixTime());f.addListener(ChannelFutureListener.CLOSE); }現(xiàn)在,僅僅需要修改的是ChannelHandler的實現(xiàn),這里需要把UnixTime對象重新轉(zhuǎn)化為一個ByteBuf。不過這已經(jīng)是非常簡單了,因為當你對一個消息編碼的時候,你不需要再處理拆包和組裝的過程。
package io.netty.example.time;public class TimeEncoder extends ChannelHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {UnixTime m = (UnixTime) msg;ByteBuf encoded = ctx.alloc().buffer(4);encoded.writeInt(m.value());ctx.write(encoded, promise); // (1)} }進一步簡化操作,你可以使用MessageToByteEncode:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {@Overrideprotected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {out.writeInt(msg.value());} }最后的任務就是在TimeServerHandler之前把TimeEncoder插入到ChannelPipeline。但這是不那么重要的工作。
關(guān)閉你的應用
關(guān)閉一個Netty應用往往只需要簡單地通過shutdownGracefully()方法來關(guān)閉你構(gòu)建的所有的NioEventLoopGroupS.當EventLoopGroup被完全地終止,并且對應的所有channels都已經(jīng)被關(guān)閉時,Netty會返回一個Future對象。
概述
在這一章節(jié)中,我們會快速地回顧下如果在熟練掌握Netty的情況下編寫出一個健壯能運行的網(wǎng)絡應用程序。在Netty接下去的章節(jié)中還會有更多更相信的信息。我們也鼓勵你去重新復習下在io.netty.example包下的例子。請注意社區(qū)一直在等待你的問題和想法以幫助Netty的持續(xù)改進,Netty的文檔也是基于你們的快速反饋上。
總結(jié)
以上是生活随笔為你收集整理的Netty 5用户指南的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SaaS 中 6 种常见 UI 入职模式
- 下一篇: 【动态规划】关于转移方程的简单理解