java nio2 iocp_基于JDK7 NIO2的高性能web服务器实践之二(转)
前一篇博客,我簡單提了下怎么為NIO2增加TransmitFile支持,文件傳送吞吐量是一個性能關注點,此外,并發連接數也是重要的關注點。
不過JDK7中又一次做了簡單的實現,不支持同時投遞多個AcceptEx請求,只支持一次一個,返回后再投遞。這樣,客戶端連接的接受速度必然大打折扣。不知道為什么sun會做這樣的實現:WSASend()/WSAReceive()一次只允許一個還是可以理解的,畢竟簡化了編程,不用考慮封包亂序問題,也降低了內存耗盡的風險;AcceptEx卻沒有這樣的理由了。
于是再一次為了性能,我增加了同時投遞多個的支持。
另外,在JDK7的默認實現中,AcceptEx返回后,為了設置遠程和本地InetSocketAddress也采用了效率很低的方法。4次通過JNI調用getsockname,2次為了取sockaddr,2次為了取port. 這些操作本人采用GetAcceptExSockaddrs一次完成,進一步提高效率。
先看Java部分的代碼,框架跟JDK7的一樣,細節處理不一樣:
/**
*
*/
package?sun.nio.ch;
import?java.io.IOException;
import?java.lang.reflect.Field;
import?java.lang.reflect.Method;
import?java.net.InetAddress;
import?java.net.InetSocketAddress;
import?java.nio.channels.AcceptPendingException;
import?java.nio.channels.AsynchronousCloseException;
import?java.nio.channels.AsynchronousServerSocketChannel;
import?java.nio.channels.AsynchronousSocketChannel;
import?java.nio.channels.ClosedChannelException;
import?java.nio.channels.CompletionHandler;
import?java.nio.channels.NotYetBoundException;
import?java.nio.channels.ShutdownChannelGroupException;
import?java.security.AccessControlContext;
import?java.security.AccessController;
import?java.security.PrivilegedAction;
import?java.util.Queue;
import?java.util.concurrent.ConcurrentLinkedQueue;
import?java.util.concurrent.Future;
import?java.util.concurrent.atomic.AtomicBoolean;
import?java.util.concurrent.atomic.AtomicInteger;
import?sun.misc.Unsafe;
/**
*?This?class?enable?multiple?'AcceptEx'?post?on?the?completion?port,?hence?improve?the?concurrent?connection?number.
*?@author?Yvon
*
*/
public?class?WindowsMultiAcceptSupport?{
WindowsAsynchronousServerSocketChannelImpl?schannel;
private?static?final?Unsafe?unsafe?=?Unsafe.getUnsafe();
//?2?*?(sizeof(SOCKET_ADDRESS)?+?16)
private?static?final?int?ONE_DATA_BUFFER_SIZE?=?88;
private?long?handle;
private?Iocp?iocp;
//?typically?there?will?be?zero,?or?one?I/O?operations?pending.?In?rare
//?cases?there?may?be?more.?These?rare?cases?arise?when?a?sequence?of?accept
//?operations?complete?immediately?and?handled?by?the?initiating?thread.
//?The?corresponding?OVERLAPPED?cannot?be?reused/released?until?the?completion
//?event?has?been?posted.
private?PendingIoCache?ioCache;
private?Queue?dataBuffers;
//?the?data?buffer?to?receive?the?local/remote?socket?address
//????????private?final?long?dataBuffer;
private?AtomicInteger?pendingAccept;
private?int?maxPending;
Method?updateAcceptContextM;
Method?acceptM;
WindowsMultiAcceptSupport()?{
//dummy?for?JNI?code.
}
public?void?close()?throws?IOException?{
schannel.close();
for?(int?i?=?0;?i?
{
long?addr?=?dataBuffers.poll();
//?release??resources
unsafe.freeMemory(addr);
}
}
/**
*
*/
public?WindowsMultiAcceptSupport(AsynchronousServerSocketChannel?ch,?int?maxPost)?{
if?(maxPost?<=?0?||?maxPost?>?1024)
throw?new?IllegalStateException("maxPost?can't?less?than?1?and?greater?than?1024");
this.schannel?=?(WindowsAsynchronousServerSocketChannelImpl)?ch;
maxPending?=?maxPost;
dataBuffers?=?new?ConcurrentLinkedQueue();
for?(int?i?=?0;?i?
dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));
}
pendingAccept?=?new?AtomicInteger(0);
try?{
Field?f?=?WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("handle");
f.setAccessible(true);
handle?=?f.getLong(schannel);
f?=?WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("iocp");
f.setAccessible(true);
iocp?=?(Iocp)?f.get(schannel);
f?=?WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("ioCache");
f.setAccessible(true);
ioCache?=?(PendingIoCache)?f.get(schannel);
f?=?WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("accepting");
f.setAccessible(true);
AtomicBoolean?accepting?=?(AtomicBoolean)?f.get(schannel);
accepting.set(true);//disable?accepting?by?origin?channel.
}?catch?(Exception?e)?{
e.printStackTrace();
}
}
@SuppressWarnings("unchecked")
public?final??void?accept(A?attachment,
CompletionHandler?handler)?{
if?(handler?==?null)
throw?new?NullPointerException("'handler'?is?null");
implAccept(attachment,?(CompletionHandler)?handler);
}
/**
*?Task?to?initiate?accept?operation?and?to?handle?result.
*/
private?class?AcceptTask?implements?Runnable,?Iocp.ResultHandler?{
private?final?WindowsAsynchronousSocketChannelImpl?channel;
private?final?AccessControlContext?acc;
private?final?PendingFuture?result;
private?final?long?dataBuffer;
AcceptTask(WindowsAsynchronousSocketChannelImpl?channel,?AccessControlContext?acc,
long?dataBuffer,?PendingFuture?result)?{
this.channel?=?channel;
this.acc?=?acc;
this.result?=?result;
this.dataBuffer?=?dataBuffer;
}
void?enableAccept()?{
pendingAccept.decrementAndGet();
dataBuffers.add(dataBuffer);
}
void?closeChildChannel()?{
try?{
channel.close();
}?catch?(IOException?ignore)?{
}
}
//?caller?must?have?acquired?read?lock?for?the?listener?and?child?channel.
void?finishAccept()?throws?IOException?{
/**
*?JDK7?use?4?calls?to?getsockname??to?setup
*?local&?remote?address,?this?is?very?inefficient.
*
*?I?change?this?to?use?GetAcceptExSockaddrs
*/
InetAddress[]?socks?=?new?InetAddress[2];
int[]?ports?=?new?int[2];
updateAcceptContext(handle,?channel.handle(),?socks,?ports,?dataBuffer);
InetSocketAddress?local?=?new?InetSocketAddress(socks[0],?ports[0]);
final?InetSocketAddress?remote?=?new?InetSocketAddress(socks[1],?ports[1]);
channel.setConnected(local,?remote);
//?permission?check?(in?context?of?initiating?thread)
if?(acc?!=?null)?{
AccessController.doPrivileged(new?PrivilegedAction()?{
public?Void?run()?{
SecurityManager?sm?=?System.getSecurityManager();
sm.checkAccept(remote.getAddress().getHostAddress(),?remote.getPort());
return?null;
}
},?acc);
}
}
/**
*?Initiates?the?accept?operation.
*/
@Override
public?void?run()?{
long?overlapped?=?0L;
try?{
//?begin?usage?of?listener?socket
schannel.begin();
try?{
//?begin?usage?of?child?socket?(as?it?is?registered?with
//?completion?port?and?so?may?be?closed?in?the?event?that
//?the?group?is?forcefully?closed).
channel.begin();
synchronized?(result)?{
overlapped?=?ioCache.add(result);
int?n?=?accept0(handle,?channel.handle(),?overlapped,?dataBuffer);//Be?careful?for?the?buffer?address
if?(n?==?IOStatus.UNAVAILABLE)?{
return;
}
//?connection?accepted?immediately
finishAccept();
//?allow?another?accept?before?the?result?is?set
enableAccept();
result.setResult(channel);
}
}?finally?{
//?end?usage?on?child?socket
channel.end();
}
}?catch?(Throwable?x)?{
//?failed?to?initiate?accept?so?release?resources
if?(overlapped?!=?0L)
ioCache.remove(overlapped);
closeChildChannel();
if?(x?instanceof?ClosedChannelException)
x?=?new?AsynchronousCloseException();
if?(!(x?instanceof?IOException)?&&?!(x?instanceof?SecurityException))
x?=?new?IOException(x);
enableAccept();
result.setFailure(x);
}?finally?{
//?end?of?usage?of?listener?socket
schannel.end();
}
//?accept?completed?immediately?but?may?not?have?executed?on
//?initiating?thread?in?which?case?the?operation?may?have?been
//?cancelled.
if?(result.isCancelled())?{
closeChildChannel();
}
//?invoke?completion?handler
Invoker.invokeIndirectly(result);
}
/**
*?Executed?when?the?I/O?has?completed
*/
@Override
public?void?completed(int?bytesTransferred,?boolean?canInvokeDirect)?{
try?{
//?connection?accept?after?group?has?shutdown
if?(iocp.isShutdown())?{
throw?new?IOException(new?ShutdownChannelGroupException());
}
//?finish?the?accept
try?{
schannel.begin();
try?{
channel.begin();
finishAccept();
}?finally?{
channel.end();
}
}?finally?{
schannel.end();
}
//?allow?another?accept?before?the?result?is?set
enableAccept();
result.setResult(channel);
}?catch?(Throwable?x)?{
enableAccept();
closeChildChannel();
if?(x?instanceof?ClosedChannelException)
x?=?new?AsynchronousCloseException();
if?(!(x?instanceof?IOException)?&&?!(x?instanceof?SecurityException))
x?=?new?IOException(x);
result.setFailure(x);
}
//?if?an?async?cancel?has?already?cancelled?the?operation?then
//?close?the?new?channel?so?as?to?free?resources
if?(result.isCancelled())?{
closeChildChannel();
}
//?invoke?handler?(but?not?directly)
Invoker.invokeIndirectly(result);
}
@Override
public?void?failed(int?error,?IOException?x)?{
enableAccept();
closeChildChannel();
//?release?waiters
if?(schannel.isOpen())?{
result.setFailure(x);
}?else?{
result.setFailure(new?AsynchronousCloseException());
}
Invoker.invokeIndirectly(result);
}
}
Future?implAccept(Object?attachment,
final?CompletionHandler?handler)?{
if?(!schannel.isOpen())?{
Throwable?exc?=?new?ClosedChannelException();
if?(handler?==?null)
return?CompletedFuture.withFailure(exc);
Invoker.invokeIndirectly(schannel,?handler,?attachment,?null,?exc);
return?null;
}
if?(schannel.isAcceptKilled())
throw?new?RuntimeException("Accept?not?allowed?due?to?cancellation");
//?ensure?channel?is?bound?to?local?address
if?(schannel.localAddress?==?null)
throw?new?NotYetBoundException();
//?create?the?socket?that?will?be?accepted.?The?creation?of?the?socket
//?is?enclosed?by?a?begin/end?for?the?listener?socket?to?ensure?that
//?we?check?that?the?listener?is?open?and?also?to?prevent?the?I/O
//?port?from?being?closed?as?the?new?socket?is?registered.
WindowsAsynchronousSocketChannelImpl?ch?=?null;
IOException?ioe?=?null;
try?{
schannel.begin();
ch?=?new?WindowsAsynchronousSocketChannelImpl(iocp,?false);
}?catch?(IOException?x)?{
ioe?=?x;
}?finally?{
schannel.end();
}
if?(ioe?!=?null)?{
if?(handler?==?null)
return?CompletedFuture.withFailure(ioe);
Invoker.invokeIndirectly(this.schannel,?handler,?attachment,?null,?ioe);
return?null;
}
//?need?calling?context?when?there?is?security?manager?as
//?permission?check?may?be?done?in?a?different?thread?without
//?any?application?call?frames?on?the?stack
AccessControlContext?acc?=
(System.getSecurityManager()?==?null)???null?:?AccessController.getContext();
PendingFuture?result?=
new?PendingFuture(schannel,?handler,?attachment);
//?check?and?set?flag?to?prevent?concurrent?accepting
if?(pendingAccept.get()?>=?maxPending)
throw?new?AcceptPendingException();
pendingAccept.incrementAndGet();
AcceptTask?task?=?new?AcceptTask(ch,?acc,?dataBuffers.poll(),?result);
result.setContext(task);
//?initiate?I/O
if?(Iocp.supportsThreadAgnosticIo())?{
task.run();
}?else?{
Invoker.invokeOnThreadInThreadPool(this.schannel,?task);
}
return?result;
}
//????//reimplements?for?performance
static?native?void?updateAcceptContext(long?listenSocket,?long?acceptSocket,
InetAddress[]?addresses,?int[]?ports,?long?dataBuffer)?throws?IOException;
static?native?int?accept0(long?handle,?long?handle2,?long?overlapped,?long?dataBuffer);
}
對應的CPP代碼如下:
/*
 * Class:     sun_nio_ch_WindowsMultiAcceptSupport
 * Method:    updateAcceptContext
 * Signature: (JJ[Ljava/net/InetAddress;[IJ)V
 *
 * Updates the accepted socket's context from the listening socket, then
 * extracts the local and remote addresses from the AcceptEx output buffer.
 * On return sockArray[0]/portArray[0] hold the local address/port and
 * sockArray[1]/portArray[1] the remote address/port.
 * An IOException is raised when the addresses cannot be obtained.
 */
JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext
  (JNIEnv *env, jclass clazz, jlong listenSocket, jlong acceptSocket,
   jobjectArray sockArray, jintArray portArray, jlong buf)
{
    SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);
    SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);
    PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);
    INT iLocalAddrLen = 0;
    INT iRemoteAddrLen = 0;
    SOCKETADDRESS* lpLocalAddr = NULL;
    SOCKETADDRESS* lpRemoteAddr = NULL;
    jobject localAddr;
    jobject remoteAddr;
    jint ports[2] = {0};

    /* result deliberately not checked (best effort), as in the original code */
    setsockopt(s2, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&s1, sizeof(s1));

    /* the two address lengths must match the ones passed to AcceptEx in accept0 */
    (lpGetAcceptExSockaddrs)(outputBuffer,
                             0,
                             sizeof(SOCKETADDRESS) + 16,
                             sizeof(SOCKETADDRESS) + 16,
                             (LPSOCKADDR*)&lpLocalAddr,
                             &iLocalAddrLen,
                             (LPSOCKADDR*)&lpRemoteAddr,
                             &iRemoteAddrLen);
    if (lpLocalAddr == NULL || lpRemoteAddr == NULL) {
        JNU_ThrowByName(env, "java/io/IOException", "GetAcceptExSockaddrs failed");
        return;
    }

    localAddr = lpNET_SockaddrToInetAddress(env, (struct sockaddr *)lpLocalAddr, (int *)ports);
    if (localAddr == NULL) {
        /* NOTE(review): the converter presumably leaves a Java exception pending
         * on failure - confirm; raise our own only if it did not. */
        if (!env->ExceptionCheck()) {
            JNU_ThrowByName(env, "java/io/IOException", "can't convert local socket address");
        }
        return;
    }
    remoteAddr = lpNET_SockaddrToInetAddress(env, (struct sockaddr *)lpRemoteAddr, (int *)(ports + 1));
    if (remoteAddr == NULL) {
        if (!env->ExceptionCheck()) {
            JNU_ThrowByName(env, "java/io/IOException", "can't convert remote socket address");
        }
        return;
    }

    env->SetObjectArrayElement(sockArray, 0, localAddr);
    env->SetObjectArrayElement(sockArray, 1, remoteAddr);
    env->SetIntArrayRegion(portArray, 0, 2, ports);
}
/*
*?Class:?????sun_nio_ch_WindowsMultiAcceptSupport
*?Method:????accept0
*?Signature:?(JJJJ)I
*/
jint?JNICALL?Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0
(JNIEnv?*env,?jclass?clazz,?jlong?listenSocket,?jlong?acceptSocket,?jlong?ov,?jlong?buf)
{
BOOL?res;
SOCKET?s1?=?(SOCKET)jlong_to_ptr(listenSocket);
SOCKET?s2?=?(SOCKET)jlong_to_ptr(acceptSocket);
PVOID?outputBuffer?=?(PVOID)jlong_to_ptr(buf);
DWORD?nread?=?0;
OVERLAPPED*?lpOverlapped?=?(OVERLAPPED*)jlong_to_ptr(ov);
ZeroMemory((PVOID)lpOverlapped,?sizeof(OVERLAPPED));
//why?use?SOCKETADDRESS?
//because?client?may?use?IPv6?to?connect?to?server.
res?=?(lpAcceptEx)(s1,
s2,
outputBuffer,
0,
sizeof(SOCKETADDRESS)+16,
sizeof(SOCKETADDRESS)+16,
&nread,
lpOverlapped);
if?(res?==?0)?{
int?error?=?WSAGetLastError();
if?(error?==?ERROR_IO_PENDING)?{
return?NIO2_IOS_UNAVAILABLE;
}
return?NIO2_THROWN;
}
return?0;
}
這里用到的lpNET_SockaddrToInetAddress是JDK7中NET.DLL暴露的方法,從DLL里加載。相應代碼如下:
*
*?Class:?????com_yovn_jabhttpd_utilities_SunPackageFixer
*?Method:????initFds
*?Signature:?()V
*/
JNIEXPORT?void?JNICALL?Java_com_yovn_jabhttpd_utilities_SunPackageFixer_initFds
(JNIEnv?*env,?jclass?clazz)
{
GUID?GuidAcceptEx?=?WSAID_ACCEPTEX;
GUID?GuidTransmitFile?=?WSAID_TRANSMITFILE;
GUID?GuidGetAcceptExSockAddrs?=?WSAID_GETACCEPTEXSOCKADDRS;
SOCKET?s;
int?rv;
DWORD?dwBytes;
HMODULE?hModule;
s?=?socket(AF_INET,?SOCK_STREAM,?0);
if?(s?==?INVALID_SOCKET)?{
JNU_ThrowByName(env,"java/io/IOException",?"socket?failed");
return;
}
rv?=?WSAIoctl(s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
(LPVOID)&GuidAcceptEx,
sizeof(GuidAcceptEx),
&lpAcceptEx,
sizeof(lpAcceptEx),
&dwBytes,
NULL,
NULL);
if?(rv?!=?0)
{
JNU_ThrowByName(env,?"java/io/IOException","WSAIoctl?failed?on?get?AcceptEx?");
goto?_ret;
}
rv?=?WSAIoctl(s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
(LPVOID)&GuidTransmitFile,
sizeof(GuidTransmitFile),
&lpTransmitFile,
sizeof(lpTransmitFile),
&dwBytes,
NULL,
NULL);
if?(rv?!=?0)
{
JNU_ThrowByName(env,?"java/io/IOException","WSAIoctl?failed?on?get?TransmitFile");
goto?_ret;
}
rv?=?WSAIoctl(s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
(LPVOID)&GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs),
&lpGetAcceptExSockaddrs,
sizeof(lpGetAcceptExSockaddrs),
&dwBytes,
NULL,
NULL);
if?(rv?!=?0)
{
JNU_ThrowByName(env,?"java/io/IOException","WSAIoctl?failed?on?get?GetAcceptExSockaddrs");
goto?_ret;
}
hModule=LoadLibrary("net.dll");
if(hModule==NULL)
{
JNU_ThrowByName(env,?"java/io/IOException","can't?load?java?net.dll");
goto?_ret;
}
lpNET_SockaddrToInetAddress=(NET_SockaddrToInetAddress_t)GetProcAddress(hModule,"_NET_SockaddrToInetAddress@12");
if(lpNET_SockaddrToInetAddress==NULL)
{
JNU_ThrowByName(env,?"java/io/IOException","can't?resolve?_NET_SockaddrToInetAddress?function?");
}
_ret:
closesocket(s);
return;
}
細心的同學可能會發現,在創建socket之前沒有初始化WinSock庫,因為在這段代碼前,我初始化了一個InetSocketAddress對象,這樣JVM會加載NET.DLL并初始化WinSock庫了。
OK,現在,你可以在支持類上同時發起多個AcceptEx請求了。
PS:基于這個我簡單測試了下我的服務器,同時開5000個線程,每個下載3M多點的文件,一分鐘內能夠全部正確完成。
服務器正在開發中,有興趣的請加入:http://code.google.com/p/jabhttpd
總結
以上是生活随笔為你收集整理的java nio2 iocp_基于JDK7 NIO2的高性能web服务器实践之二(转)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java发送get post请求_【工具
- 下一篇: button layui 点击事件_La