基于JDK7 NIO2的高性能web服务器实践之二(转)
前一篇博客,我簡單提了下怎么為NIO2增加TransmitFile支持,文件傳送吞吐量是一個性能關注點,此外,并發連接數也是重要的關注點。?
不過JDK7中又一次做了簡單的實現,不支持同時投遞多個AcceptEx請求,只支持一次一個,返回后再投遞。這樣,客戶端連接的接受速度必然大打折扣。不知道為什么sun會做這樣的實現,WSASend()/WSAReceive()一次只允許一個還是可以理解,畢竟簡化了編程,不用考慮封包亂序問題。
也降低了內存耗盡的風險。AcceptEx卻沒有這樣的理由了。
于是再一次為了性能,我增加了同時投遞多個的支持。
另外,在JDK7的默認實現中,AcceptEx返回后,為了設置遠程和本地InetSocketAddress也采用了效率很低的方法。4次通過JNI調用getsockname,2次為了取sockaddr,2次為了取port. 這些操作本人采用GetAcceptExSockaddrs一次完成,進一步提高效率。
先看Java部分的代碼,框架跟JDK7的一樣,細節處理不一樣:
?*?
?*/
package?sun.nio.ch;
import?java.io.IOException;
import?java.lang.reflect.Field;
import?java.lang.reflect.Method;
import?java.net.InetAddress;
import?java.net.InetSocketAddress;
import?java.nio.channels.AcceptPendingException;
import?java.nio.channels.AsynchronousCloseException;
import?java.nio.channels.AsynchronousServerSocketChannel;
import?java.nio.channels.AsynchronousSocketChannel;
import?java.nio.channels.ClosedChannelException;
import?java.nio.channels.CompletionHandler;
import?java.nio.channels.NotYetBoundException;
import?java.nio.channels.ShutdownChannelGroupException;
import?java.security.AccessControlContext;
import?java.security.AccessController;
import?java.security.PrivilegedAction;
import?java.util.Queue;
import?java.util.concurrent.ConcurrentLinkedQueue;
import?java.util.concurrent.Future;
import?java.util.concurrent.atomic.AtomicBoolean;
import?java.util.concurrent.atomic.AtomicInteger;
import?sun.misc.Unsafe;
/**
?*?This?class?enable?multiple?'AcceptEx'?post?on?the?completion?port,?hence?improve?the?concurrent?connection?number.
?*?@author?Yvon
?*
?*/
public?class?WindowsMultiAcceptSupport?{
????WindowsAsynchronousServerSocketChannelImpl?schannel;
????private?static?final?Unsafe?unsafe?=?Unsafe.getUnsafe();
????//?2?*?(sizeof(SOCKET_ADDRESS)?+?16)
????private?static?final?int?ONE_DATA_BUFFER_SIZE?=?88;
????private?long?handle;
????private?Iocp?iocp;
????//?typically?there?will?be?zero,?or?one?I/O?operations?pending.?In?rare
????//?cases?there?may?be?more.?These?rare?cases?arise?when?a?sequence?of?accept
????//?operations?complete?immediately?and?handled?by?the?initiating?thread.
????//?The?corresponding?OVERLAPPED?cannot?be?reused/released?until?the?completion
????//?event?has?been?posted.
????private?PendingIoCache?ioCache;
????private?Queue<Long>?dataBuffers;
????//?the?data?buffer?to?receive?the?local/remote?socket?address
????//????????private?final?long?dataBuffer;
????private?AtomicInteger?pendingAccept;
????private?int?maxPending;
????Method?updateAcceptContextM;
????Method?acceptM;
????WindowsMultiAcceptSupport()?{
????????//dummy?for?JNI?code.
????}
????public?void?close()?throws?IOException?{
????????schannel.close();
????????for?(int?i?=?0;?i?<?maxPending?+?1;?i++)//assert?there?is?maxPending+1?buffer?in?the?queue
????????{
????????????long?addr?=?dataBuffers.poll();
????????????//?release??resources
????????????unsafe.freeMemory(addr);
????????}
????}
????/**
?????*?
?????*/
????public?WindowsMultiAcceptSupport(AsynchronousServerSocketChannel?ch,?int?maxPost)?{
????????if?(maxPost?<=?0?||?maxPost?>?1024)
????????????throw?new?IllegalStateException("maxPost?can't?less?than?1?and?greater?than?1024");
????????this.schannel?=?(WindowsAsynchronousServerSocketChannelImpl)?ch;
????????maxPending?=?maxPost;
????????dataBuffers?=?new?ConcurrentLinkedQueue<Long>();
????????for?(int?i?=?0;?i?<?maxPending?+?1;?i++)?{
????????????dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));
????????}
????????pendingAccept?=?new?AtomicInteger(0);
????????try?{
????????????Field?f?=?WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("handle");
????????????f.setAccessible(true);
????????????handle?=?f.getLong(schannel);
????????????f?=?WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("iocp");
????????????f.setAccessible(true);
????????????iocp?=?(Iocp)?f.get(schannel);
????????????f?=?WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("ioCache");
????????????f.setAccessible(true);
????????????ioCache?=?(PendingIoCache)?f.get(schannel);
????????????f?=?WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("accepting");
????????????f.setAccessible(true);
????????????AtomicBoolean?accepting?=?(AtomicBoolean)?f.get(schannel);
????????????accepting.set(true);//disable?accepting?by?origin?channel.
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}
????@SuppressWarnings("unchecked")
????public?final?<A>?void?accept(A?attachment,
????????CompletionHandler<AsynchronousSocketChannel,???super?A>?handler)?{
????????if?(handler?==?null)
????????????throw?new?NullPointerException("'handler'?is?null");
????????implAccept(attachment,?(CompletionHandler<AsynchronousSocketChannel,?Object>)?handler);
????}
????/**
?????*?Task?to?initiate?accept?operation?and?to?handle?result.
?????*/
????private?class?AcceptTask?implements?Runnable,?Iocp.ResultHandler?{
????????private?final?WindowsAsynchronousSocketChannelImpl?channel;
????????private?final?AccessControlContext?acc;
????????private?final?PendingFuture<AsynchronousSocketChannel,?Object>?result;
????????private?final?long?dataBuffer;
????????AcceptTask(WindowsAsynchronousSocketChannelImpl?channel,?AccessControlContext?acc,
????????????long?dataBuffer,?PendingFuture<AsynchronousSocketChannel,?Object>?result)?{
????????????this.channel?=?channel;
????????????this.acc?=?acc;
????????????this.result?=?result;
????????????this.dataBuffer?=?dataBuffer;
????????}
????????void?enableAccept()?{
????????????pendingAccept.decrementAndGet();
????????????dataBuffers.add(dataBuffer);
????????}
????????void?closeChildChannel()?{
????????????try?{
????????????????channel.close();
????????????}?catch?(IOException?ignore)?{
????????????}
????????}
????????//?caller?must?have?acquired?read?lock?for?the?listener?and?child?channel.
????????void?finishAccept()?throws?IOException?{
????????????/**
?????????????*?JDK7?use?4?calls?to?getsockname??to?setup
?????????????*?local&?remote?address,?this?is?very?inefficient.
?????????????*?
?????????????*?I?change?this?to?use?GetAcceptExSockaddrs
?????????????*/
????????????InetAddress[]?socks?=?new?InetAddress[2];
????????????int[]?ports?=?new?int[2];
????????????updateAcceptContext(handle,?channel.handle(),?socks,?ports,?dataBuffer);
????????????InetSocketAddress?local?=?new?InetSocketAddress(socks[0],?ports[0]);
????????????final?InetSocketAddress?remote?=?new?InetSocketAddress(socks[1],?ports[1]);
????????????channel.setConnected(local,?remote);
????????????//?permission?check?(in?context?of?initiating?thread)
????????????if?(acc?!=?null)?{
????????????????AccessController.doPrivileged(new?PrivilegedAction<Void>()?{
????????????????????public?Void?run()?{
????????????????????????SecurityManager?sm?=?System.getSecurityManager();
????????????????????????sm.checkAccept(remote.getAddress().getHostAddress(),?remote.getPort());
????????????????????????return?null;
????????????????????}
????????????????},?acc);
????????????}
????????}
????????/**
?????????*?Initiates?the?accept?operation.
?????????*/
????????@Override
????????public?void?run()?{
????????????long?overlapped?=?0L;
????????????try?{
????????????????//?begin?usage?of?listener?socket
????????????????schannel.begin();
????????????????try?{
????????????????????//?begin?usage?of?child?socket?(as?it?is?registered?with
????????????????????//?completion?port?and?so?may?be?closed?in?the?event?that
????????????????????//?the?group?is?forcefully?closed).
????????????????????channel.begin();
????????????????????synchronized?(result)?{
????????????????????????overlapped?=?ioCache.add(result);
??????????????????????
????????????????????????int?n?=?accept0(handle,?channel.handle(),?overlapped,?dataBuffer);//Be?careful?for?the?buffer?address
????????????????????????if?(n?==?IOStatus.UNAVAILABLE)?{
????????????????????????????return;
????????????????????????}
????????????????????????//?connection?accepted?immediately
????????????????????????finishAccept();
????????????????????????//?allow?another?accept?before?the?result?is?set
????????????????????????enableAccept();
????????????????????????result.setResult(channel);
????????????????????}
????????????????}?finally?{
????????????????????//?end?usage?on?child?socket
????????????????????channel.end();
????????????????}
????????????}?catch?(Throwable?x)?{
????????????????//?failed?to?initiate?accept?so?release?resources
????????????????if?(overlapped?!=?0L)
????????????????????ioCache.remove(overlapped);
????????????????closeChildChannel();
????????????????if?(x?instanceof?ClosedChannelException)
????????????????????x?=?new?AsynchronousCloseException();
????????????????if?(!(x?instanceof?IOException)?&&?!(x?instanceof?SecurityException))
????????????????????x?=?new?IOException(x);
????????????????enableAccept();
????????????????result.setFailure(x);
????????????}?finally?{
????????????????//?end?of?usage?of?listener?socket
????????????????schannel.end();
????????????}
????????????//?accept?completed?immediately?but?may?not?have?executed?on
????????????//?initiating?thread?in?which?case?the?operation?may?have?been
????????????//?cancelled.
????????????if?(result.isCancelled())?{
????????????????closeChildChannel();
????????????}
????????????//?invoke?completion?handler
????????????Invoker.invokeIndirectly(result);
????????}
????????/**
?????????*?Executed?when?the?I/O?has?completed
?????????*/
????????@Override
????????public?void?completed(int?bytesTransferred,?boolean?canInvokeDirect)?{
????????????try?{
????????????????//?connection?accept?after?group?has?shutdown
????????????????if?(iocp.isShutdown())?{
????????????????????throw?new?IOException(new?ShutdownChannelGroupException());
????????????????}
????????????????//?finish?the?accept
????????????????try?{
????????????????????schannel.begin();
????????????????????try?{
????????????????????????channel.begin();
????????????????????????finishAccept();
????????????????????}?finally?{
????????????????????????channel.end();
????????????????????}
????????????????}?finally?{
????????????????????schannel.end();
????????????????}
????????????????//?allow?another?accept?before?the?result?is?set
????????????????enableAccept();
????????????????result.setResult(channel);
????????????}?catch?(Throwable?x)?{
????????????????enableAccept();
????????????????closeChildChannel();
????????????????if?(x?instanceof?ClosedChannelException)
????????????????????x?=?new?AsynchronousCloseException();
????????????????if?(!(x?instanceof?IOException)?&&?!(x?instanceof?SecurityException))
????????????????????x?=?new?IOException(x);
????????????????result.setFailure(x);
????????????}
????????????//?if?an?async?cancel?has?already?cancelled?the?operation?then
????????????//?close?the?new?channel?so?as?to?free?resources
????????????if?(result.isCancelled())?{
????????????????closeChildChannel();
????????????}
????????????//?invoke?handler?(but?not?directly)
????????????Invoker.invokeIndirectly(result);
????????}
????????@Override
????????public?void?failed(int?error,?IOException?x)?{
????????????enableAccept();
????????????closeChildChannel();
????????????//?release?waiters
????????????if?(schannel.isOpen())?{
????????????????result.setFailure(x);
????????????}?else?{
????????????????result.setFailure(new?AsynchronousCloseException());
????????????}
????????????Invoker.invokeIndirectly(result);
????????}
????}
????Future<AsynchronousSocketChannel>?implAccept(Object?attachment,
????????final?CompletionHandler<AsynchronousSocketChannel,?Object>?handler)?{
????????if?(!schannel.isOpen())?{
????????????Throwable?exc?=?new?ClosedChannelException();
????????????if?(handler?==?null)
????????????????return?CompletedFuture.withFailure(exc);
????????????Invoker.invokeIndirectly(schannel,?handler,?attachment,?null,?exc);
????????????return?null;
????????}
????????if?(schannel.isAcceptKilled())
????????????throw?new?RuntimeException("Accept?not?allowed?due?to?cancellation");
????????//?ensure?channel?is?bound?to?local?address
????????if?(schannel.localAddress?==?null)
????????????throw?new?NotYetBoundException();
????????//?create?the?socket?that?will?be?accepted.?The?creation?of?the?socket
????????//?is?enclosed?by?a?begin/end?for?the?listener?socket?to?ensure?that
????????//?we?check?that?the?listener?is?open?and?also?to?prevent?the?I/O
????????//?port?from?being?closed?as?the?new?socket?is?registered.
????????WindowsAsynchronousSocketChannelImpl?ch?=?null;
????????IOException?ioe?=?null;
????????try?{
????????????schannel.begin();
????????????ch?=?new?WindowsAsynchronousSocketChannelImpl(iocp,?false);
????????}?catch?(IOException?x)?{
????????????ioe?=?x;
????????}?finally?{
????????????schannel.end();
????????}
????????if?(ioe?!=?null)?{
????????????if?(handler?==?null)
????????????????return?CompletedFuture.withFailure(ioe);
????????????Invoker.invokeIndirectly(this.schannel,?handler,?attachment,?null,?ioe);
????????????return?null;
????????}
????????//?need?calling?context?when?there?is?security?manager?as
????????//?permission?check?may?be?done?in?a?different?thread?without
????????//?any?application?call?frames?on?the?stack
????????AccessControlContext?acc?=
????????????(System.getSecurityManager()?==?null)???null?:?AccessController.getContext();
????????PendingFuture<AsynchronousSocketChannel,?Object>?result?=
????????????new?PendingFuture<AsynchronousSocketChannel,?Object>(schannel,?handler,?attachment);
????????//?check?and?set?flag?to?prevent?concurrent?accepting
????????if?(pendingAccept.get()?>=?maxPending)
????????????throw?new?AcceptPendingException();
????????pendingAccept.incrementAndGet();
????????AcceptTask?task?=?new?AcceptTask(ch,?acc,?dataBuffers.poll(),?result);
????????result.setContext(task);
????????//?initiate?I/O
????????if?(Iocp.supportsThreadAgnosticIo())?{
????????????task.run();
????????}?else?{
????????????Invoker.invokeOnThreadInThreadPool(this.schannel,?task);
????????}
????????return?result;
????}
????//????//reimplements?for?performance
????static?native?void?updateAcceptContext(long?listenSocket,?long?acceptSocket,
????????InetAddress[]?addresses,?int[]?ports,?long?dataBuffer)?throws?IOException;
????static?native?int?accept0(long?handle,?long?handle2,?long?overlapped,?long?dataBuffer);
}
對應的CPP代碼如下:
/*
?*?Class:?????sun_nio_ch_WindowsMultiAcceptSupport
?*?Method:????updateAcceptContext
?*?Signature:?(JJ[Ljava/net/InetAddress;[IJ)V
?*/
JNIEXPORT?void?JNICALL?Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext
(JNIEnv?*env?,?jclass?clazz,?jlong?listenSocket,?jlong?acceptSocket,?jobjectArray?sockArray,jintArray?portArray,jlong?buf)
{
????SOCKET?s1?=?(SOCKET)jlong_to_ptr(listenSocket);
????SOCKET?s2?=?(SOCKET)jlong_to_ptr(acceptSocket);
????PVOID?outputBuffer?=?(PVOID)jlong_to_ptr(buf);
????INT?iLocalAddrLen=0;
????INT?iRemoteAddrLen=0;
????SOCKETADDRESS*?lpLocalAddr;
????SOCKETADDRESS*?lpRemoteAddr;
????jobject?localAddr;
????jobject?remoteAddr;
????jint?ports[2]={0};
????
????setsockopt(s2,?SOL_SOCKET,?SO_UPDATE_ACCEPT_CONTEXT,?(char?*)&s1,?sizeof(s1));
????(lpGetAcceptExSockaddrs)(outputBuffer,
????????0,
????????sizeof(SOCKETADDRESS)+16,
????????sizeof(SOCKETADDRESS)+16,
????????(LPSOCKADDR*)&lpLocalAddr,
????????&iLocalAddrLen,
????????(LPSOCKADDR*)&lpRemoteAddr,
????????&iRemoteAddrLen);
????localAddr=lpNET_SockaddrToInetAddress(env,(struct?sockaddr?*)lpLocalAddr,(int?*)ports);
????remoteAddr=lpNET_SockaddrToInetAddress(env,(struct?sockaddr?*)lpRemoteAddr,(int?*)(ports+1));
????env->SetObjectArrayElement(sockArray,0,localAddr);
????env->SetObjectArrayElement(sockArray,1,remoteAddr);
????env->SetIntArrayRegion(portArray,0,2,ports);
}
/*
?*?Class:?????sun_nio_ch_WindowsMultiAcceptSupport
?*?Method:????accept0
?*?Signature:?(JJJJ)I
?*/
jint?JNICALL?Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0
??(JNIEnv?*env,?jclass?clazz,?jlong?listenSocket,?jlong?acceptSocket,?jlong?ov,?jlong?buf)
{
????BOOL?res;
????SOCKET?s1?=?(SOCKET)jlong_to_ptr(listenSocket);
????SOCKET?s2?=?(SOCKET)jlong_to_ptr(acceptSocket);
????PVOID?outputBuffer?=?(PVOID)jlong_to_ptr(buf);
????DWORD?nread?=?0;
????OVERLAPPED*?lpOverlapped?=?(OVERLAPPED*)jlong_to_ptr(ov);
????ZeroMemory((PVOID)lpOverlapped,?sizeof(OVERLAPPED));
????
????//why?use?SOCKETADDRESS?
????//because?client?may?use?IPv6?to?connect?to?server.
????res?=?(lpAcceptEx)(s1,
????????s2,
????????outputBuffer,
????????0,
????????sizeof(SOCKETADDRESS)+16,
????????sizeof(SOCKETADDRESS)+16,
????????&nread,
????????lpOverlapped);
????
????if?(res?==?0)?{
????????int?error?=?WSAGetLastError();
????????
????????if?(error?==?ERROR_IO_PENDING)?{
????????????
????????????return?NIO2_IOS_UNAVAILABLE;
????????}
????
????
????????return?NIO2_THROWN;
????}
????
????return?0;
}
這里用到的lpNET_SockaddrToInetAddress是JDK7中NET.DLL暴露的方法,從DLL里加載。相應代碼如下:
?*?Class:?????com_yovn_jabhttpd_utilities_SunPackageFixer
?*?Method:????initFds
?*?Signature:?()V
?*/
JNIEXPORT?void?JNICALL?Java_com_yovn_jabhttpd_utilities_SunPackageFixer_initFds
??(JNIEnv?*env,?jclass?clazz)
{
????GUID?GuidAcceptEx?=?WSAID_ACCEPTEX;
????GUID?GuidTransmitFile?=?WSAID_TRANSMITFILE;
????GUID?GuidGetAcceptExSockAddrs?=?WSAID_GETACCEPTEXSOCKADDRS;
????SOCKET?s;
????int?rv;
????DWORD?dwBytes;
????HMODULE?hModule;
????s?=?socket(AF_INET,?SOCK_STREAM,?0);
????if?(s?==?INVALID_SOCKET)?{
????????JNU_ThrowByName(env,"java/io/IOException",?"socket?failed");
????????return;
????}
????rv?=?WSAIoctl(s,
????????SIO_GET_EXTENSION_FUNCTION_POINTER,
????????(LPVOID)&GuidAcceptEx,
????????sizeof(GuidAcceptEx),
????????&lpAcceptEx,
????????sizeof(lpAcceptEx),
????????&dwBytes,
????????NULL,
????????NULL);
????if?(rv?!=?0)
????{
????????JNU_ThrowByName(env,?"java/io/IOException","WSAIoctl?failed?on?get?AcceptEx?");
????????goto?_ret;
????}
????rv?=?WSAIoctl(s,
????????SIO_GET_EXTENSION_FUNCTION_POINTER,
????????(LPVOID)&GuidTransmitFile,
????????sizeof(GuidTransmitFile),
????????&lpTransmitFile,
????????sizeof(lpTransmitFile),
????????&dwBytes,
????????NULL,
????????NULL);
????if?(rv?!=?0)
????{
????????JNU_ThrowByName(env,?"java/io/IOException","WSAIoctl?failed?on?get?TransmitFile");
????????goto?_ret;
????}
????rv?=?WSAIoctl(s,
????????SIO_GET_EXTENSION_FUNCTION_POINTER,
????????(LPVOID)&GuidGetAcceptExSockAddrs,
????????sizeof(GuidGetAcceptExSockAddrs),
????????&lpGetAcceptExSockaddrs,
????????sizeof(lpGetAcceptExSockaddrs),
????????&dwBytes,
????????NULL,
????????NULL);
????if?(rv?!=?0)
????{
????????JNU_ThrowByName(env,?"java/io/IOException","WSAIoctl?failed?on?get?GetAcceptExSockaddrs");
????????goto?_ret;
????}
????hModule=LoadLibrary("net.dll");
????if(hModule==NULL)
????{
????????JNU_ThrowByName(env,?"java/io/IOException","can't?load?java?net.dll");
????????goto?_ret;
????}
????lpNET_SockaddrToInetAddress=(NET_SockaddrToInetAddress_t)GetProcAddress(hModule,"_NET_SockaddrToInetAddress@12");
????if(lpNET_SockaddrToInetAddress==NULL)
????{
????????JNU_ThrowByName(env,?"java/io/IOException","can't?resolve?_NET_SockaddrToInetAddress?function?");
????????
????????
????}
_ret:
????closesocket(s);
????return;
}
細心的同學可能會發現,在創建socket之前沒有初始化WinSock庫,因為在這段代碼前,我初始化了一個InetSocketAddress對象,這樣JVM會加載NET.DLL并初始化WinSock庫了。
OK,現在,你可以在支持類上同時發起多個AcceptEx請求了。
PS:基于這個我簡單測試了下我的服務器,同時開5000個線程,每個下載3M多點的文件,一分鐘內能夠全部正確完成。
服務器正在開發中,有興趣的請加入:http://code.google.com/p/jabhttpd
轉載于:https://www.cnblogs.com/HJIN/p/3919071.html
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的基于JDK7 NIO2的高性能web服务器实践之二(转)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Log4j配置文件解读和模板页收藏
- 下一篇: java多线程之线程的同步与锁定(转)