从NIO到Netty开发
轉(zhuǎn)載自??從NIO到Netty開發(fā)
1. 從傳統(tǒng)BIO到NIO的升級(jí)
2. NIO新特性
?
3. NIO服務(wù)端實(shí)現(xiàn)
根據(jù)上圖,發(fā)現(xiàn)服務(wù)端的事件有2個(gè),一是接受連接事件,二是讀取數(shù)據(jù):
public class NIOServer {private ByteBuffer readBuffer; ? ?private Selector selector; ? ?private ServerSocket serverSocket; ? ?public static void main(String[] args) {NIOServer server = new NIOServer();server.init();System.out.println("server started:8383");server.listener();} ? ?public void init() { ? ? ? ?//1. 創(chuàng)建臨時(shí)緩沖區(qū)readBuffer = ByteBuffer.allocate(1024); ? ? ? ?//2. 創(chuàng)建服務(wù)端Socket非阻塞通道ServerSocketChannel serverSocketChannel; ? ? ? ?try {serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false); ? ? ? ? ? ?//3. 指定內(nèi)部Socket綁定的服務(wù)端地址 并支持重用端口,因?yàn)橛锌赡芏鄠€(gè)客戶端同時(shí)訪問同一端口serverSocket=serverSocketChannel.socket();serverSocket.setReuseAddress(true);serverSocket.bind(new InetSocketAddress(8383)); ? ? ? ? ? ?//4. 創(chuàng)建輪詢器 并綁定到管道上,開始監(jiān)聽客戶端請(qǐng)求selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (Exception e) {e.printStackTrace();}} ? ?private void listener() { ? ? ? ?while (true) { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?//5. 開始監(jiān)聽事件,不斷取出事件的key,假如存在事件,則直接處理。selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); ? ? ? ? ? ? ? ?while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove();handleKey(key);}} catch (Exception e) {e.printStackTrace();}}} ? ?private void handleKey(SelectionKey key) throws IOException {SocketChannel channel = null; ? ? ? ?try { ? ? ? ? ? ?//6. 如果有客戶端要連接 這里則處理是否接受連接事件if (key.isAcceptable()) {ServerSocketChannel severChannel = (ServerSocketChannel) key.channel();channel = severChannel.accept();channel.configureBlocking(false); ? ? ? ? ? ? ? ?// 告訴輪詢器 接下來關(guān)心的是讀取客戶端數(shù)據(jù)這件事channel.register(selector, SelectionKey.OP_READ);} else if (key.isReadable()) { //7. 如果客戶端發(fā)送數(shù)據(jù),則這里讀取數(shù)據(jù)。channel = (SocketChannel) key.channel(); ? ? ? ? ? ? ? ?// 清空緩沖區(qū)readBuffer.clear(); ? ? ? ? ? ? ? ?// 當(dāng)客戶端關(guān)閉channel后,會(huì)不斷收到read事件,此刻read方法返回-1 所以對(duì)應(yīng)的服務(wù)器端也需要關(guān)閉channelint readCount = channel.read(readBuffer); ? ? ? ? ? ? ? ?if (readCount > 0) {readBuffer.flip();String question = CharsetHelper.decode(readBuffer).toString();System.out.println("server get the question:" + question);String answer = getAnswer(question);channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));} else {channel.close();}} } catch (Exception e) {e.printStackTrace();}finally { ? ? ? ? ? ?//8. 斷開連接通道if (channel!=null) {channel.close();}}} ? ?public static String getAnswer(String question) {String answer = null; ? ? ? ?switch (question) { ? ? ? ?case "who":answer = "我是小娜\n"; ? ? ? ? ? ?break; ? ? ? ?case "what":answer = "我是來幫你解悶的\n"; ? ? ? ? ? ?break; ? ? ? ?case "where":answer = "我來自外太空\n"; ? ? ? ? ? ?break; ? ? ? ?case "hi":answer = "hello\n"; ? ? ? ? ? ?break; ? ? ? ?case "bye":answer = "88\n"; ? ? ? ? ? ?break; ? ? ? ?default:answer = "請(qǐng)輸入 who, 或者what, 或者where";} ? ? ? ?return answer;}}4. NIO客戶端實(shí)現(xiàn):
客戶端的實(shí)現(xiàn)有3個(gè)步驟:1.請(qǐng)求連接。2.當(dāng)連接成功,寫數(shù)據(jù)。3.讀取服務(wù)端結(jié)果。
public class NIOClient implements Runnable {private BlockingQueue<String> words; ? ?private Random random; ? ?public static void main(String[] args) { ? ? ? ?// 多個(gè)線程發(fā)起Socket客戶端連接請(qǐng)求for (int i = 0; i < 5; i++) {NIOClient c = new NIOClient();c.init(); ? ? ? ? ? ?new Thread(c).start();}} ? ?//1. 初始化要發(fā)送的數(shù)據(jù)private void init() {words = new ArrayBlockingQueue<String>(5);random = new Random(); ? ? ? ?try {words.put("hi");words.put("who");words.put("what");words.put("where");words.put("bye");} catch (Exception e) { ? ? ? ? ? ?// TODO: handle exception}} ? ?//2. 啟動(dòng)子線程代碼@Overridepublic void run() {SocketChannel channel = null;Selector selector = null; ? ? ? ?try { ? ? ? ? ? ?//3. 創(chuàng)建連接服務(wù)端的通道 并設(shè)置為阻塞方法,這里需要指定服務(wù)端的ip和端口號(hào)channel = SocketChannel.open();channel.configureBlocking(false);channel.connect(new InetSocketAddress("localhost", 8383));selector = Selector.open(); ? ? ? ? ? ?//4. 請(qǐng)求關(guān)心連接事件channel.register(selector, SelectionKey.OP_CONNECT); ? ? ? ? ? ?boolean isOver = false; ? ? ? ? ? ?while (!isOver) {selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); ? ? ? ? ? ? ? ?while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove(); ? ? ? ? ? ? ? ? ? ?if (key.isConnectable()) { //5. 當(dāng)通道連接準(zhǔn)備完畢,發(fā)送請(qǐng)求并指定接收允許獲取服務(wù)端返回信息if (channel.isConnectionPending()) { ? ? ? ? ? ? ? ? ? ? ? ? ? ?if (channel.finishConnect()) {key.interestOps(SelectionKey.OP_READ);channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));sleep();} else {key.cancel();}}} else if (key.isReadable()) {//6. 開始讀取服務(wù)端返回?cái)?shù)據(jù)ByteBuffer byteBuffer = ByteBuffer.allocate(512);channel.read(byteBuffer);byteBuffer.flip();String answer = CharsetHelper.decode(byteBuffer).toString();System.out.println("client get the answer:" + answer);String word = getWord(); ? ? ? ? ? ? ? ? ? ? ? ?if (word != null) {channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));} else {isOver = true;}sleep();}}}} catch (Exception e) {e.printStackTrace();} finally { ? ? ? ? ? ?//7. 關(guān)閉通道if (channel != null) { ? ? ? ? ? ? ? ?try {channel.close();} catch (IOException e) {e.printStackTrace();}}}} ? ?public String getWord() { ? ? ? ?return words.poll();} ? ?private void sleep() { ? ? ? ?try {TimeUnit.SECONDS.sleep(random.nextInt(3));} catch (InterruptedException e) {e.printStackTrace();}}}-
?
5.Netty開發(fā)簡介
上面提到,NIO可以實(shí)現(xiàn)同步非阻塞的數(shù)據(jù)交互,但是對(duì)于NIO來說,一個(gè)普通的請(qǐng)求數(shù)據(jù)需要太多的開發(fā)步驟,不利于推廣,這里主要介紹NIO的實(shí)現(xiàn)框架Netty.
Netty是由JBOSS提供的一個(gè)java開源框架。Netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。
也就是說,Netty 是一個(gè)基于NIO的客戶、服務(wù)器端編程框架,使用Netty 可以確保你快速和簡單的開發(fā)出一個(gè)網(wǎng)絡(luò)應(yīng)用,例如實(shí)現(xiàn)了某種協(xié)議的客戶,服務(wù)端應(yīng)用。Netty相當(dāng)簡化和流線化了網(wǎng)絡(luò)應(yīng)用的編程開發(fā)過程,例如,TCP和UDP的socket服務(wù)開發(fā)。
?
6. Netty服務(wù)端實(shí)現(xiàn):
public class EchoServer {private final int port; ? ?public EchoServer(int port) { ? ? ? ?this.port = port;} ? ?public void start() throws Exception { ? ? ? ?//1.創(chuàng)建線程組EventLoopGroup group=new NioEventLoopGroup(); ? ? ? ?try { ? ? ? ? ? ?//2.創(chuàng)建服務(wù)端啟動(dòng)對(duì)象 裝配線程組&交互通道&服務(wù)器端口&網(wǎng)絡(luò)請(qǐng)求處理器鏈ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(group).channel(NioServerSocketChannel.class) .localAddress("localhost", port) ? ?.childHandler(new ChannelInitializer<Channel>() { @Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new EchoOutHandler1());ch.pipeline().addLast(new EchoOutHandler2());ch.pipeline().addLast(new EchoInHandler1());ch.pipeline().addLast(new EchoInHandler2());}}); ? ? ? ? ? ?// 3.開始監(jiān)聽客戶端請(qǐng)求ChannelFuture channelFuture = serverBootstrap.bind().sync();System.out.println("開始監(jiān)聽,端口號(hào)為:"+channelFuture.channel().localAddress()); ? ? ? ? ? ?// 4.等待所有請(qǐng)求執(zhí)行完畢后,關(guān)閉通道;如請(qǐng)求還沒執(zhí)行完,這里為阻塞狀態(tài)。channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally { ? ? ? ? ? ?//5.停止所有線程組內(nèi)部代碼的執(zhí)行g(shù)roup.shutdownGracefully().sync();}} ? ?public static void main(String[] args) throws Exception { ? ? ? ?new EchoServer(2000).start();}}7.Netty客戶端實(shí)現(xiàn):
public class EchoClient {public static void main(String[] args) throws InterruptedException { ? ? ? ?new EchoClient("localhost", 2000).start();} ? ?private final String host; ? ?private final int port; ? ?public EchoClient(String host, int port) { ? ? ? ?this.host = host; ? ? ? ?this.port = port;} ? ?private void start() throws InterruptedException { ? ? ? ?//1.創(chuàng)建線程組EventLoopGroup group = new NioEventLoopGroup(); ? ? ? ?try { ? ? ? ? ? ?//2. 創(chuàng)建客戶端啟動(dòng)對(duì)象,同樣需要裝配線程組,通道,綁定遠(yuǎn)程地址,請(qǐng)求處理器鏈。Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).remoteAddress(host, port).handler(new ChannelInitializer<Channel>() { ? ? ? ? ? ? ? ? ? ? ? ?@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new EchoClientHandler());}}); ? ? ? ? ? ?//3.開始請(qǐng)求連接ChannelFuture future = bootstrap.connect().sync(); ? ? ? ? ? ?//4.當(dāng)請(qǐng)求操作結(jié)后,關(guān)閉通道。future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally { ? ? ? ? ? ?if (group != null) {group.shutdownGracefully().sync();}}}}8.Netty處理器鏈
對(duì)于向服務(wù)端發(fā)送一個(gè)請(qǐng)求,并得到一個(gè)響應(yīng)來說。如果使用Netty來說,需要實(shí)現(xiàn)兩種不同的處理器,一個(gè)是讀的一個(gè)是寫的。他們共同組成一個(gè)鏈?zhǔn)秸{(diào)用,如下圖:
?
對(duì)于服務(wù)端來說,上面我們創(chuàng)建了4個(gè)處理器,他們組成一條鏈,分別是:EchoInHandler1 -> EchoInHandler2 -> EchoOutHandler2 -> EchoOutHandler1.
public class EchoInHandler1 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("EchoInHandler1 ?channelRead..."); ? ? ? ?//將消息傳遞到新的鏈。。。ctx.fireChannelRead(msg);} ? ?@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();} }public class EchoInHandler2 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("EchoInHandler2 ?channelRead..."); ? ? ? ?// Object msg 為Netty的一種緩存對(duì)象ByteBuf buffer = (ByteBuf) msg; ? ? ? ?byte[] req = new byte[buffer.readableBytes()];buffer.readBytes(req);String reqBody = new String(req, "UTF-8");System.out.println("獲取到的客戶端請(qǐng)求:" + reqBody); ? ? ? ?// 往客戶端寫數(shù)據(jù)String date = new Date().toString();ByteBuf returnBuf = Unpooled.copiedBuffer(date.getBytes());ctx.write(returnBuf);} ? ?@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("EchoOutHandler2 write...");ctx.write(msg);}}public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("EchoOutHandler1 write...");System.out.println("write msg:" + msg);ctx.write(msg);ctx.flush();// 最后將數(shù)據(jù)刷新到客戶端}}客戶端的處理器主要是當(dāng)連接成功后,請(qǐng)求獲取當(dāng)前時(shí)間,并讀取返回結(jié)果:
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{ ? ?//客戶端連接服務(wù)器的時(shí)候調(diào)用@Override ? ?public void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客戶端連接服務(wù)器。。。。"); ? ? ? ?byte[] req = "QUERY TIME ORDER".getBytes();ByteBuf copiedBuffer = Unpooled.buffer(req.length);copiedBuffer.writeBytes(req);ctx.writeAndFlush(copiedBuffer);} ? ?//讀取服務(wù)端數(shù)據(jù)@Override ? ?protected void channelRead0(ChannelHandlerContext ctx, ByteBuf bytbuf) throws Exception {System.out.println("client get the server's data"); ? ? ? ?byte[] resp=new byte[bytbuf.readableBytes()];bytbuf.readBytes(resp);String respContent = new String(resp,"UTF-8");System.out.println("返回的數(shù)據(jù):"+respContent);} ? ?//強(qiáng)制關(guān)閉服務(wù)器的連接也會(huì)造成異常:遠(yuǎn)程主機(jī)強(qiáng)迫關(guān)閉了一個(gè)現(xiàn)有的連接。@Override ? ?public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println(cause.getLocalizedMessage());ctx.close();}}總結(jié)
以上是生活随笔為你收集整理的从NIO到Netty开发的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电视剧装台大结局介绍 装台简介
- 下一篇: 图解elasticsearch原理转载自