
Source code analysis: creating a Netty server vs. creating a Java NIO server

Published: 2025/4/5

1. Creating a Java NIO server

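First, a sequence diagram shows how to create an NIO server, start listening, accept connections from multiple clients, and read and write messages asynchronously.

[Sequence diagram: NIO server creation (image not preserved)]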

Sample code (reference [2]):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

/**
 * User: mihasya
 * Date: Jul 25, 2010
 * Time: 9:09:03 AM
 */
public class JServer {
    public static void main(String[] args) {
        ServerSocketChannel sch = null;
        Selector sel = null;
        try {
            // setup the socket we're listening for connections on.
            InetSocketAddress addr = new InetSocketAddress(8400);
            sch = ServerSocketChannel.open();
            sch.configureBlocking(false);
            sch.socket().bind(addr);
            // setup our selector and register the main socket on it
            sel = Selector.open();
            sch.register(sel, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            System.out.println("Couldn't setup server socket");
            System.out.println(e.getMessage());
            System.exit(1);
        }
        // fire up the listener thread, pass it our selector
        ListenerThread listener = new ListenerThread(sel);
        // run() (rather than start()) executes the loop on the main thread
        listener.run();
    }

    /*
     * the thread is completely unnecessary, it could all just happen
     * in main()
     */
    // declared static so that it can be instantiated from the static main()
    static class ListenerThread extends Thread {
        Selector sel = null;

        ListenerThread(Selector sel) {
            this.sel = sel;
        }

        public void run() {
            while (true) {
                // our canned response for now
                ByteBuffer resp = ByteBuffer.wrap("got it\n".getBytes());
                try {
                    // loop over all the sockets that are ready for some activity
                    while (this.sel.select() > 0) {
                        Set<SelectionKey> keys = this.sel.selectedKeys();
                        Iterator<SelectionKey> i = keys.iterator();
                        while (i.hasNext()) {
                            SelectionKey key = i.next();
                            if (key.isAcceptable()) {
                                // this means that a new client has hit the port our main
                                // socket is listening on, so we need to accept the connection
                                // and add the new client socket to our select pool for reading
                                // a command later
                                System.out.println("Accepting connection!");
                                // this will be the ServerSocketChannel we initially registered
                                // with the selector in main()
                                ServerSocketChannel sch = (ServerSocketChannel) key.channel();
                                SocketChannel ch = sch.accept();
                                ch.configureBlocking(false);
                                ch.register(this.sel, SelectionKey.OP_READ);
                            } else if (key.isReadable()) {
                                // one of our client sockets has received a command and
                                // we're now ready to read it in
                                System.out.println("Accepting command!");
                                SocketChannel ch = (SocketChannel) key.channel();
                                ByteBuffer buf = ByteBuffer.allocate(200);
                                if (ch.read(buf) < 0) {
                                    // the client closed the connection: close the channel
                                    // instead of spinning on a permanently readable key
                                    ch.close();
                                } else {
                                    buf.flip();
                                    Charset charset = Charset.forName("UTF-8");
                                    CharsetDecoder decoder = charset.newDecoder();
                                    CharBuffer cbuf = decoder.decode(buf);
                                    System.out.print(cbuf.toString());
                                    // re-register this socket with the selector, this time
                                    // for writing since we'll want to write something to it
                                    // on the next go-around
                                    ch.register(this.sel, SelectionKey.OP_WRITE);
                                }
                            } else if (key.isWritable()) {
                                // we are ready to send a response to one of the client sockets
                                // we had read a command from previously
                                System.out.println("Sending response!");
                                SocketChannel ch = (SocketChannel) key.channel();
                                ch.write(resp);
                                resp.rewind();
                                // we may get another command from this guy, so prepare
                                // to read again. We could also close the channel, but
                                // that sort of defeats the whole purpose of doing async
                                ch.register(this.sel, SelectionKey.OP_READ);
                            }
                            i.remove();
                        }
                    }
                } catch (IOException e) {
                    System.out.println("Error in poll loop");
                    System.out.println(e.getMessage());
                    System.exit(1);
                }
            }
        }
    }
}

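The code above shows the general steps for Java NIO:

1. Open a ServerSocketChannel to listen for client connections. It is the parent channel of all client connections. Bind the listening port and set the connection mode to non-blocking.

2. Open the multiplexer and start the server listener thread. Register the ServerSocketChannel with the Selector (the multiplexer) of the Reactor thread and listen for the ACCEPT event.

3. When the multiplexer detects a new client, it handles the connection request. Once the TCP three-way handshake completes, the physical link to the client is established.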
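The three steps can be condensed into a minimal sketch that runs once against a loopback client. The class name `NioStepsSketch`, the method `acceptOne`, the ephemeral port (0) and the 5-second select timeout are all assumptions made for illustration; they do not come from the sample above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical helper class, for illustration only.
class NioStepsSketch {
    /** Runs steps 1-3 once; returns true if one client connection was accepted. */
    static boolean acceptOne() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            // Step 1: bind the listening port (0 = any free port) and go non-blocking.
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(false);
            // Step 2: register the server channel with the multiplexer for ACCEPT.
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            // A blocking loopback client stands in for a real remote peer.
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                // Step 3: the multiplexer reports the pending connection.
                if (selector.select(5000) == 0) {
                    return false;
                }
                SelectionKey key = selector.selectedKeys().iterator().next();
                if (!key.isAcceptable()) {
                    return false;
                }
                try (SocketChannel accepted = server.accept()) {
                    return accepted != null && accepted.isConnected();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptOne());
    }
}
```

A real server would keep looping over `select()` and register each accepted channel for `OP_READ`, as the sample above does.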

2. Creating a Netty server

2.1 Opening the ServerSocketChannel

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)

How a ServerSocketChannel is created:

/**
 * Create a new instance
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

This calls the newSocket method:

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         * See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

Here provider.openServerSocketChannel() is the Java NIO implementation. Non-blocking mode is set in the parent class:

/**
 * Create a new instance
 *
 * @param parent         the parent {@link Channel} by which this instance was created. May be {@code null}
 * @param ch             the underlying {@link SelectableChannel} on which it operates
 * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
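Using only the JDK, the same two moves (open the channel through a cached SelectorProvider, then switch it to non-blocking mode and close it if that fails) might look like the sketch below. `ProviderOpenSketch`, `openNonBlocking` and `opensNonBlocking` are hypothetical names, and `UncheckedIOException` stands in for Netty's `ChannelException`; this is an approximation of the pattern, not Netty's code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical helper class, for illustration only.
class ProviderOpenSketch {
    // Looked up once and reused, similar in spirit to DEFAULT_SELECTOR_PROVIDER.
    private static final SelectorProvider PROVIDER = SelectorProvider.provider();

    /** Opens a server channel via the cached provider and makes it non-blocking. */
    static ServerSocketChannel openNonBlocking() {
        ServerSocketChannel ch;
        try {
            ch = PROVIDER.openServerSocketChannel();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to open a server socket.", e);
        }
        try {
            ch.configureBlocking(false);
            return ch;
        } catch (IOException e) {
            // Do not leak a half-initialized channel.
            try {
                ch.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw new UncheckedIOException("Failed to enter non-blocking mode.", e);
        }
    }

    /** Returns true if the helper yields an open, non-blocking channel. */
    static boolean opensNonBlocking() {
        try (ServerSocketChannel ch = openNonBlocking()) {
            return ch.isOpen() && !ch.isBlocking();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("open and non-blocking: " + opensNonBlocking());
    }
}
```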

2.2 Opening the multiplexer

NioEventLoop schedules and runs the Selector polling and selects the set of Channels that are ready. The relevant code:

@Override
protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);
                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();
                processSelectedKeys();
                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);
            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
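The expression passed to runAllTasks in the loop above turns the time just spent on I/O into a time budget for queued tasks. A small worked sketch of that arithmetic follows; `IoRatioSketch` and `taskBudgetNanos` are hypothetical names, and the range check is an added assumption, since the quoted loop handles ioRatio == 100 in a separate branch.

```java
// Hypothetical helper class, for illustration only.
class IoRatioSketch {
    /**
     * Mirrors the expression ioTime * (100 - ioRatio) / ioRatio from the loop above.
     * ioRatio is the percentage of loop time meant for I/O.
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99, got " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        // 8 ms of I/O at ioRatio 80 leaves 8 * 20 / 80 = 2 ms for tasks.
        System.out.println(taskBudgetNanos(8_000_000L, 80));
        // At ioRatio 50 the task budget equals the I/O time.
        System.out.println(taskBudgetNanos(1_000L, 50));
    }
}
```

A higher ioRatio therefore shrinks the share of each loop iteration given to non-I/O tasks.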

2.2.1 Processing the selected keys

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

Taking processSelectedKeysPlain as an example:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }
    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (!i.hasNext()) {
            break;
        }
        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();
            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}
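The method above dispatches on whatever object is attached to each SelectionKey. The same idea can be tried with plain JDK NIO, as in the sketch below. The `KeyHandler` interface and the class `AttachmentDispatchSketch` are hypothetical stand-ins for AbstractNioChannel and NioTask, and the loopback client and 5-second timeout are assumptions for the demo.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper class, for illustration only.
class AttachmentDispatchSketch {
    /** Hypothetical handler type carried as the key's attachment. */
    interface KeyHandler {
        String handle(SelectionKey key) throws IOException;
    }

    /** Registers a handler as attachment, triggers one event, and dispatches it. */
    static List<String> runOnce() {
        List<String> events = new ArrayList<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(false);
            KeyHandler acceptHandler = key -> {
                try (SocketChannel ch = ((ServerSocketChannel) key.channel()).accept()) {
                    return ch != null ? "accepted" : "spurious";
                }
            };
            // The third argument is stored on the key and returned by attachment().
            server.register(selector, SelectionKey.OP_ACCEPT, acceptHandler);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                if (selector.select(5000) > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        Object attachment = key.attachment();
                        it.remove(); // the selector never removes keys from this set itself
                        if (attachment instanceof KeyHandler) {
                            events.add(((KeyHandler) attachment).handle(key));
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

Keeping the handler on the key avoids a separate channel-to-handler lookup table on every selection round.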

2.3 Binding the port and accepting requests:

// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(PORT).sync();

The bind call ends up in doBind:

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

The final bind is performed by doBind0:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new OneTimeTask() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
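The ordering that doBind and doBind0 enforce (register first, then run the bind as a task on the channel's event loop, and fail the promise if registration failed) can be imitated with JDK futures. The sketch below is only an analogy, not Netty's API: a single-thread executor plays the event loop, CompletableFuture plays ChannelFuture and ChannelPromise, and the class name and log strings are made up.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper class, for illustration only.
class BindAfterRegisterSketch {
    /** Returns the order in which the two simulated steps ran. */
    static List<String> run() {
        // One thread stands in for the channel's event loop.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            // Simulated registration, executed on the event loop.
            CompletableFuture<Void> regFuture =
                    CompletableFuture.runAsync(() -> log.add("registered"), eventLoop);
            CompletableFuture<Void> bindPromise = new CompletableFuture<>();
            // Like the listener in doBind: act only once registration has finished.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    // Registration failed, so fail the bind promise directly.
                    bindPromise.completeExceptionally(cause);
                } else {
                    // Like doBind0: the bind itself is queued as an event-loop task.
                    eventLoop.execute(() -> {
                        log.add("bound");
                        bindPromise.complete(null);
                    });
                }
            });
            bindPromise.join(); // comparable to calling sync() on the returned future
            return log;
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because both steps run on the same single thread, the bind can never overtake the registration, whichever thread happens to fire the completion callback.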

The implementation of processSelectedKey, which handles each ready key:

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }
    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
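The branching in processSelectedKey is plain bit-mask arithmetic on readyOps and interestOps. The sketch below isolates that arithmetic so it can be tried without a live channel. `ReadyOpsSketch`, `actionsFor` and `withoutConnect` are hypothetical names, and the sketch simplifies one detail: it ignores the early return taken when the channel is closed after the read.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, for illustration only.
class ReadyOpsSketch {
    /** Lists, in order, the actions the code above would take for a readyOps value. */
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        // READ and ACCEPT share one branch; readyOps == 0 is also treated as a read.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        return actions;
    }

    /** Clears only the OP_CONNECT bit and leaves every other interest untouched. */
    static int withoutConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println(withoutConnect(SelectionKey.OP_CONNECT | SelectionKey.OP_READ));
    }
}
```

This shows why a server channel's ACCEPT event ends up in unsafe.read(): accepting a connection is handled as a read on the parent channel.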

References

【1】http://www.infoq.com/cn/articles/netty-server-create

【2】https://github.com/mihasya/sample-java-nio-server/blob/master/src/JServer.java

Reposted from: https://www.cnblogs.com/davidwang456/p/5050607.html
