日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

java selector 源码_Java NIO——Selector机制源码分析---转

發(fā)布時間:2024/3/7 java 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java selector 源码_Java NIO——Selector机制源码分析---转 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一直不明白pipe是如何喚醒selector的,所以又去看了jdk的源碼(openjdk下載),整理了如下:

以Java nio自帶demo : OperationServer.java???OperationClient.java(見附件)

其中server端的核心代碼:

public voidinitSelector() {try{

selector=SelectorProvider.provider().openSelector();this.serverChannel1 =ServerSocketChannel.open();

serverChannel1.configureBlocking(false);

InetSocketAddress isa= new InetSocketAddress("localhost", this.port1);

serverChannel1.socket().bind(isa);

serverChannel1.register(selector, SelectionKey.OP_ACCEPT);

}catch(IOException e) {//TODO Auto-generated catch block

e.printStackTrace();

}

}

從頭開始,

先看看SelectorProvider.provider()做了什么:

public staticSelectorProvider provider() {synchronized(lock) {if (provider != null)returnprovider;returnAccessController.doPrivileged(new PrivilegedAction() {publicSelectorProvider run() {if(loadProviderFromProperty())returnprovider;if(loadProviderAsService())returnprovider;

provider=sun.nio.ch.DefaultSelectorProvider.create();returnprovider;

}

});

}

}

其中provider?= sun.nio.ch.DefaultSelectorProvider.create();會根據(jù)操作系統(tǒng)來返回不同的實現(xiàn)類,windows平臺就返回WindowsSelectorProvider;

而if?(provider?!=?null)?returnprovider;

保證了整個server程序中只有一個WindowsSelectorProvider對象;

再看看WindowsSelectorProvider.?openSelector():

public AbstractSelector openSelector() throwsIOException {return new WindowsSelectorImpl(this);

}newWindowsSelectorImpl(SelectorProvider)代碼:

WindowsSelectorImpl(SelectorProvider sp)throwsIOException {super(sp);

pollWrapper= newPollArrayWrapper(INIT_CAP);

wakeupPipe=Pipe.open();

wakeupSourceFd=((SelChImpl)wakeupPipe.source()).getFDVal();//Disable the Nagle algorithm so that the wakeup is more immediate

SinkChannelImpl sink =(SinkChannelImpl)wakeupPipe.sink();

(sink.sc).socket().setTcpNoDelay(true);

wakeupSinkFd=((SelChImpl)sink).getFDVal();

pollWrapper.addWakeupSocket(wakeupSourceFd,0);

}

其中Pipe.open()是關(guān)鍵,這個方法的調(diào)用過程是:

Java代碼

public static Pipe open() throwsIOException {returnSelectorProvider.provider().openPipe();

}

SelectorProvider 中:public Pipe openPipe() throwsIOException {return new PipeImpl(this);

}

再看看怎么new PipeImpl()的:

Java代碼

PipeImpl(SelectorProvider sp) {long pipeFds = IOUtil.makePipe(true);int readFd = (int) (pipeFds >>> 32);int writeFd = (int) pipeFds;

FileDescriptor sourcefd= newFileDescriptor();

IOUtil.setfdVal(sourcefd, readFd);

source= newSourceChannelImpl(sp, sourcefd);

FileDescriptor sinkfd= newFileDescriptor();

IOUtil.setfdVal(sinkfd, writeFd);

sink= newSinkChannelImpl(sp, sinkfd);

}

其中IOUtil.makePipe(true)是個native方法:

/**

* Returns two file descriptors for a pipe encoded in a long.

* The read end of the pipe is returned in the high 32 bits,

* while the write end is returned in the low 32 bits.

*/

staticnativelong?makePipe(boolean?blocking);

具體實現(xiàn):

JNIEXPORT jlong JNICALL

Java_sun_nio_ch_IOUtil_makePipe(JNIEnv*env, jobject this, jboolean blocking)

{int fd[2];if (pipe(fd) < 0) {

JNU_ThrowIOExceptionWithLastError(env,"Pipe failed");return 0;

}if (blocking ==JNI_FALSE) {if ((configureBlocking(fd[0], JNI_FALSE) < 0)|| (configureBlocking(fd[1], JNI_FALSE) < 0)) {

JNU_ThrowIOExceptionWithLastError(env,"Configure blocking failed");

close(fd[0]);

close(fd[1]);return 0;

}

}return ((jlong) fd[0] << 32) | (jlong) fd[1];

}static intconfigureBlocking(intfd, jboolean blocking)

{int flags =fcntl(fd, F_GETFL);int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags |O_NONBLOCK);return (flags == newflags) ? 0: fcntl(fd, F_SETFL, newflags);

}

正如這段注釋:

/**

* Returns two file descriptors for a pipe encoded in a long.

* The read end of the pipe is returned in the high 32 bits,

* while the write end is returned in the low 32 bits.

*/

High32位存放的是通道read端的文件描述符FD(file descriptor),low 32 bits存放的是write端的文件描述符。所以取到makepipe()返回值后要做移位處理。

pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

這行代碼把返回的pipe的write端的FD放在了pollWrapper中(后面會發(fā)現(xiàn),這么做是為了實現(xiàn)selector的wakeup())

ServerSocketChannel.open()的實現(xiàn):

public static ServerSocketChannel open() throwsIOException {returnSelectorProvider.provider().openServerSocketChannel();

}

SelectorProvider:public ServerSocketChannel openServerSocketChannel() throwsIOException {return new ServerSocketChannelImpl(this);

}

可見創(chuàng)建的ServerSocketChannelImpl也有WindowsSelectorImpl的引用。

ServerSocketChannelImpl(SelectorProvider sp) throwsIOException {super(sp);this.fd = Net.serverSocket(true); //打開一個socket,返回FD

this.fdVal =IOUtil.fdVal(fd);this.state =ST_INUSE;

}

然后通過serverChannel1.register(selector, SelectionKey.OP_ACCEPT);把selector和channel綁定在一起,也就是把new ServerSocketChannel時創(chuàng)建的FD與selector綁定在了一起。

到此,server端已啟動完成了,主要創(chuàng)建了以下對象:

WindowsSelectorProvider:單例

WindowsSelectorImpl中包含:

pollWrapper:保存selector上注冊的FD,包括pipe的write端FD和ServerSocketChannel所用的FD

wakeupPipe:通道(其實就是兩個FD,一個read,一個write)

再到Server?中的run():

selector.select();主要調(diào)用了WindowsSelectorImpl中的這個方法:

protected int doSelect(long timeout) throwsIOException {if (channelArray == null)throw newClosedSelectorException();this.timeout = timeout; //set selector timeout

processDeregisterQueue();if(interruptTriggered) {

resetWakeupSocket();return 0;

}//Calculate number of helper threads needed for poll. If necessary//threads are created here and start waiting on startLock

adjustThreadsCount();

finishLock.reset();//reset finishLock//Wakeup helper threads, waiting on startLock, so they start polling.//Redundant threads will exit here after wakeup.

startLock.startThreads();//do polling in the main thread. Main thread is responsible for//first MAX_SELECTABLE_FDS entries in pollArray.

try{

begin();try{

subSelector.poll();

}catch(IOException e) {

finishLock.setException(e);//Save this exception

}//Main thread is out of poll(). Wakeup others and wait for them

if (threads.size() > 0)

finishLock.waitForHelperThreads();

}finally{

end();

}//Done with poll(). Set wakeupSocket to nonsignaled for the next run.

finishLock.checkForException();

processDeregisterQueue();int updated =updateSelectedKeys();//Done with poll(). Set wakeupSocket to nonsignaled for the next run.

resetWakeupSocket();returnupdated;

}

其中subSelector.poll()是核心,也就是輪訓(xùn)pollWrapper中保存的FD;具體實現(xiàn)是調(diào)用native方法poll0:

private int poll() throws IOException{ //poll for the main thread

returnpoll0(pollWrapper.pollArrayAddress,

Math.min(totalChannels, MAX_SELECTABLE_FDS),

readFds, writeFds, exceptFds, timeout);

}private native int poll0(long pollAddress, intnumfds,int[] readFds, int[] writeFds, int[] exceptFds, longtimeout);//These arrays will hold result of native select().//The first element of each array is the number of selected sockets.//Other elements are file descriptors of selected sockets.

private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//保存發(fā)生read的FD

private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; //保存發(fā)生write的FD

private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; //保存發(fā)生except的FD

這個poll0()會監(jiān)聽pollWrapper中的FD有沒有數(shù)據(jù)進(jìn)出,這會造成IO阻塞,直到有數(shù)據(jù)讀寫事件發(fā)生。比如,由于pollWrapper中保存的也有ServerSocketChannel的FD,所以只要ClientSocket發(fā)一份數(shù)據(jù)到ServerSocket,那么poll0()就會返回;又由于pollWrapper中保存的也有pipe的write端的FD,所以只要pipe的write端向FD發(fā)一份數(shù)據(jù),也會造成poll0()返回;如果這兩種情況都沒有發(fā)生,那么poll0()就一直阻塞,也就是selector.select()會一直阻塞;如果有任何一種情況發(fā)生,那么selector.select()就會返回,所有在OperationServer的run()里要用while?(true) {,這樣就可以保證在selector接收到數(shù)據(jù)并處理完后繼續(xù)監(jiān)聽poll();

這時再來看看WindowsSelectorImpl.?Wakeup():

publicSelector wakeup() {synchronized(interruptLock) {if (!interruptTriggered) {

setWakeupSocket();

interruptTriggered= true;

}

}return this;

}//Sets Windows wakeup socket to a signaled state.

private voidsetWakeupSocket() {

setWakeupSocket0(wakeupSinkFd);

}private native void setWakeupSocket0(intwakeupSinkFd);

JNIEXPORTvoidJNICALL

Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv*env, jclass this,

jint scoutFd)

{/*Write one byte into the pipe*/

const char byte = 1;

send(scoutFd,&byte, 1, 0);

}

可見wakeup()是通過pipe的write?端send(scoutFd, &byte, 1, 0),發(fā)生一個字節(jié)1,來喚醒poll()。所以在需要的時候就可以調(diào)用selector.wakeup()來喚醒selector。

原文:http://goon.iteye.com/blog/1775421

補充linux操作系統(tǒng)下的DefaultSelectorProvider的實現(xiàn),可以看到,如果內(nèi)核版本>=2.6則,具體的SelectorProvider為EPollSelectorProvider,否則為默認(rèn)的PollSelectorProvider

//sun.nio.ch.DefaultSelectorProvider

public staticSelectorProvider create() {

PrivilegedAction pa= new GetPropertyAction("os.name");

String osname=(String) AccessController.doPrivileged(pa);if ("SunOS".equals(osname)) {return newsun.nio.ch.DevPollSelectorProvider();

}//use EPollSelectorProvider for Linux kernels >= 2.6

if ("Linux".equals(osname)) {

pa= new GetPropertyAction("os.version");

String osversion=(String) AccessController.doPrivileged(pa);

String[] vers= osversion.split("\\.", 0);if (vers.length >= 2) {try{int major = Integer.parseInt(vers[0]);int minor = Integer.parseInt(vers[1]);if (major > 2 || (major == 2 && minor >= 6)) {return newsun.nio.ch.EPollSelectorProvider();

}

}catch(NumberFormatException x) {//format not recognized

}

}

}return newsun.nio.ch.PollSelectorProvider();

}

總結(jié)

以上是生活随笔為你收集整理的java selector 源码_Java NIO——Selector机制源码分析---转的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。