javaweb网关_Java网关服务-AIO(三)
Java網(wǎng)關(guān)服務(wù)-AIO(三)
概述
前兩節(jié)中,我們已經(jīng)獲取了body的總長度,剩下的就是讀出body,處理請(qǐng)求
ChannelServerHandler
ChannelServerHandler即從channel中讀取請(qǐng)求,也向channle輸出結(jié)果,因此它實(shí)現(xiàn)了InboundHandler, OutboundHandler
/**
* 讀取請(qǐng)求的內(nèi)容,業(yè)務(wù)處理
*/
public class ChannelServerHandler implements CompletionHandler, InboundHandler, OutboundHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(ChannelServerHandler.class);
private AsynchronousSocketChannel channel;
public ChannelServerHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
public void completed(Integer result, ByteBuffer attachment) {
//如果條件成立,說明客戶端主動(dòng)終止了TCP套接字,這時(shí)服務(wù)端終止就可以了
if (result == -1) {
System.out.println("remote is close");
closeChannel();
return;
}
Object resultData;
String req = (String) read(channel, attachment);
if (req == null) {
closeChannel();
return;
}
try {
LOGGER.info("socket:{}", channel.getRemoteAddress());
//同步處理請(qǐng)求
RequestHandler requestHandler = ApplicationUtils.getBean(RequestHandler.class);
resultData = requestHandler.execute(req);
} catch (Throwable t) {
resultData = Result.error("ERROR", Utils.error(t));
LOGGER.error("調(diào)用接口失敗", t);
}
if (resultData == null) {
resultData = Result.failure("FAILURE", "調(diào)用失敗,數(shù)據(jù)為空.");
}
try {
String resultContent = resultData instanceOf String ? (String) resultData : JSON.toJSONString(resultData);
byte[] bytes = resultContent.getBytes("UTF-8");
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
write(channel, writeBuffer);
} catch (Exception e) {
LOGGER.error("對(duì)象轉(zhuǎn)JSON失敗,對(duì)象:{}", resultData, e);
}
closeChannel();
}
@Override
public Object read(AsynchronousSocketChannel socketChannel, ByteBuffer in) {
in.flip();
byte[] body = new byte[in.remaining()];
in.get(body);
String req = null;
try {
req = new String(body, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return req;
}
@Override
public Object write(AsynchronousSocketChannel socketChannel, ByteBuffer out) {
//write,write操作結(jié)束后關(guān)閉通道
channel.write(out, out, new CompletionHandler() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
closeChannel();
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel();
}
});
return null;
}
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel();
}
private void closeChannel() {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
讀取body
in.flip();
byte[] body = new byte[in.remaining()];
in.get(body);
String req = null;
try {
req = new String(body, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return req;
in.remaining()
buffer中含有的字節(jié)數(shù)
客戶端、服務(wù)端由于跨語言和經(jīng)驗(yàn)問題,沒有使用復(fù)雜的跨語言序列化技術(shù),雙方約定使用UTF-8編碼,通過將body轉(zhuǎn)換為String,最終獲得了客戶端傳遞的字符串。
處理請(qǐng)求
經(jīng)過自定義的請(qǐng)求處理邏輯,同步處理,最終將響應(yīng)編碼后,發(fā)送給客戶端,write操作結(jié)束后,關(guān)閉連接
總結(jié)
使用AIO開發(fā)服務(wù)端時(shí),主要涉及
配置I/O事件完成的回調(diào)線程池
從accept -> read 到 向client端響應(yīng) write -> close,盡量使用CompletionHanlder來異步處理,不要在處理某個(gè)事件完成的線程中,同步的調(diào)用,如future.get()
如果是短連接,則需在write操作時(shí)注冊(cè)write結(jié)束后的handler,在handler中關(guān)閉連接
擴(kuò)展
長連接該如何處理
長連接意味著client可以發(fā)多次請(qǐng)求,由于多次請(qǐng)求被server執(zhí)行的順序是不可控的,可能后發(fā)的請(qǐng)求先響應(yīng),因此需要在請(qǐng)求和響應(yīng)時(shí),加requestId,據(jù)此對(duì)應(yīng)到請(qǐng)求的結(jié)果
長連接不需要在write后關(guān)閉連接
長連接需要開發(fā)定時(shí)的ping-pong心跳消息
長連接在響應(yīng)時(shí)比現(xiàn)在更復(fù)雜,也需要一個(gè)和請(qǐng)求類似或相同的協(xié)議來標(biāo)識(shí)body長度
測試
測試用例
/**
* mvn -Dtest=com.jd.jshop.web.sdk.test.ClientTest#pingReqSocket test
*
* @throws IOException
*/
@Test
@PerfTest(invocations = 20000, threads = 50)
public void pingReqSocket() throws IOException {
byte[] content = "ping".getBytes("UTF-8");
String result = sendReq(content);
//斷言 是否和預(yù)期一致
Assert.assertEquals("pong", result);
}
private String sendReq(byte[] content) throws IOException {
ByteBuffer writeBuffer = ByteBuffer.allocate(4 + content.length);
writeBuffer.putInt(content.length);
writeBuffer.put(content);
writeBuffer.flip();
Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1", 9801));
socket.getOutputStream().write(writeBuffer.array());
socket.getOutputStream().flush();
byte[] buf = new byte[1024];
int len = 0;
String result = null;
while ((len = socket.getInputStream().read(buf)) != -1) {
result = new String(buf, 0, len);
System.out.println(result);
}
return result;
}
測試的方法是,在服務(wù)器上建立socket連接,向server發(fā)送ping,server返回pong
測試服務(wù)器:centos, 2個(gè)物理核,4個(gè)邏輯核,內(nèi)存16G
分析aio的實(shí)現(xiàn):
在ping-pong測試中性能極高,優(yōu)于并甩開netty
以下是使用Netty開發(fā)的server端的測試用例,可以和上面的圖片對(duì)比一下
Measured invocations:10,000
Thread Count:10
Measured
(system)Required
Execution time:1,646 ms
Throughput:6,075 / s
Min. latency:0 ms
Average latency:1 ms
Median:2 ms
90%:2 ms
Max latency:26 ms
============================
Started at:Oct 16, 2018 5:27:03 PM
Measured invocations:20,000
Thread Count:20
Measured
(system)Required
Execution time:3,293 ms
Throughput:6,073 / s
Min. latency:0 ms
Average latency:3 ms
Median:3 ms
90%:5 ms
Max latency:54 ms
============================
Started at:Oct 16, 2018 5:28:24 PM
Measured invocations:20,000
Thread Count:10
Measured
(system)Required
Execution time:3,051 ms
Throughput:6,555 / s
Min. latency:0 ms
Average latency:1 ms
Median:1 ms
90%:2 ms
Max latency:44 ms
============================
Started at:Oct 16, 2018 5:30:06 PM
Measured invocations:20,000
Thread Count:50
Measured
(system)Required
Execution time:3,167 ms
Throughput:6,315 / s
Min. latency:0 ms
Average latency:7 ms
Median:7 ms
90%:10 ms
Max latency:64 ms
分析基于Netty的實(shí)現(xiàn):
吞吐量:6000+/s
10個(gè)線程時(shí):90%低于2ms,平均1ms
20個(gè)線程時(shí):90%低于5ms,平均3ms
50個(gè)線程時(shí):90%低于10ms,平均7ms
線程越多,性能越低
當(dāng)前測試用例不太依賴內(nèi)存
執(zhí)行10000+次請(qǐng)求,建立10000+連接,要求服務(wù)器對(duì)單個(gè)進(jìn)程fd限制打開,防止報(bào)too many open files導(dǎo)致測試用例執(zhí)行失敗
ulimit -n 20240
總結(jié)
以上是生活随笔為你收集整理的javaweb网关_Java网关服务-AIO(三)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ipython怎么安装numpy_在Te
- 下一篇: java画笔覆盖在界面_Java学习笔记