日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

NIO之ByteBuffer_NIO之网络IO_与ChannelNetty初窥门径

發(fā)布時(shí)間:2024/7/5 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 NIO之ByteBuffer_NIO之网络IO_与ChannelNetty初窥门径 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

NIO之ByteBuffer與Channel

傳統(tǒng)IO:byte[] <= inputStream <= 文件 => outputStream => byte[] NIO:文件 => inputChannel <=> buffer <=> outputChannel => 文件文件 <= inputChannel <=> outputChannel => 文件
  • 文件復(fù)制, 并測(cè)試ByteBuffer常用API
    position: 當(dāng)前指針位置; limit: 當(dāng)前內(nèi)容的最大位置(例如: buffer內(nèi)容為"hello", 容量為20, limit就是5); capacity: 最大容量
/*** 測(cè)試Buffer的position, limit, capacity, clear, flip* @author regotto*/ public class NioTest1 {/*** 下面的代碼中, clear與flip效果一樣.* 沒有clear,flip,position的位置將會(huì)一直等于limit, 根據(jù)各屬性之間的大小關(guān)系, position一定不會(huì)大于limit, * 所以在下面的read中, 讀取到的值將一直都是0(代表當(dāng)前讀取到的位置), read一直不會(huì)等于-1, 代碼出現(xiàn)死循環(huán)* @param args* @throws Exception*/public static void main(String[] args) throws Exception {FileChannel inputChannel = new FileInputStream("input.txt").getChannel();FileChannel outputChannel = new FileOutputStream("output.txt").getChannel();ByteBuffer buffer = ByteBuffer.allocate(4);while(true){//此處不進(jìn)行clear, 此時(shí)position還是處于limit的位置, read將一直保持當(dāng)前位置出現(xiàn)//死循環(huán)buffer.clear();int read = inputChannel.read(buffer);System.out.println("read: " + read);if (-1 == read){break;}//重置bufferbuffer.flip();outputChannel.write(buffer);}inputChannel.close();outputChannel.close();} }

clear, flip源碼如下:

public final Buffer clear() {position = 0;limit = capacity;mark = -1;return this;}public final Buffer flip() {limit = position;position = 0;mark = -1;return this;}
  • DirectByteBuffer
    ByteBuffer.allocate(1024) => HeapByteBuffer, 內(nèi)部使用的就是byte[], 底層源碼如下
public static ByteBuffer allocate(int capacity) {if (capacity < 0)throw new IllegalArgumentException();return new HeapByteBuffer(capacity, capacity);}//HeapByteBuffer extends ByteBuffer//ByteBuffer構(gòu)造函數(shù)如下:ByteBuffer(int mark, int pos, int lim, int cap, // package-privatebyte[] hb, int offset){super(mark, pos, lim, cap);this.hb = hb;this.offset = offset;}

HeapByteBuffer位于JVM堆空間, 當(dāng)使用HeapByteBuffer進(jìn)行內(nèi)容復(fù)制時(shí), 存在2個(gè)復(fù)制過程: 應(yīng)用程序 => 應(yīng)用程序緩沖區(qū) => 內(nèi)核緩沖區(qū) => 文件; 這種情況下, 2個(gè)復(fù)制過程存在一定的性能問題;

ByteBuffer.allocateDirect(1024) => DirectByteBuffer, DirectByteBuffer使用native方法創(chuàng)建數(shù)組, 數(shù)組不再位于JVM的Heap中, 而是位于內(nèi)核內(nèi)存中, 這樣就避免了一次數(shù)據(jù)拷貝(拷貝的原因在于JVM中數(shù)據(jù)的地址會(huì)改變, 在GC下): 應(yīng)用程序緩沖區(qū) -> 內(nèi)核緩沖區(qū); 所謂的零拷貝, 加快速度, C/C++開辟的數(shù)組空間都是位于內(nèi)核緩沖區(qū)
DirectByteBuffer底層源碼:

public static ByteBuffer allocateDirect(int capacity) {return new DirectByteBuffer(capacity);}//unsafe中底層內(nèi)存分配base = unsafe.allocateMemory(size);public native long allocateMemory(long var1);

使用DirectByteBuffer進(jìn)行文件復(fù)制:

/*** 測(cè)試DirectByteBuffer* @author regotto*/ public class NioTest2 {public static void main(String[] args) throws Exception {FileChannel inputChannel = new FileInputStream("input.txt").getChannel();FileChannel outputChannel = new FileOutputStream("output.txt").getChannel();ByteBuffer buffer = ByteBuffer.allocateDirect(4);while(true){buffer.clear();int read = inputChannel.read(buffer);System.out.println("read: " + read);if (-1 == read){break;}buffer.flip();outputChannel.write(buffer);// buffer.flip();}inputChannel.close();outputChannel.close();}/*** 進(jìn)行文件復(fù)制*/public void test() throws Exception {FileChannel fisChannel = new FileInputStream("text1.txt").getChannel();FileChannel fosChannel = new FileOutputStream("text2.txt").getChannel();//transferTo與transferFrom效果一樣fisChannel.transferTo(0, fisChannel.size(), fosChannel);fisChannel.close();fosChannel.close();} }
  • 使用堆外內(nèi)存進(jìn)行文件內(nèi)容復(fù)制(使用塊內(nèi)存提高性能)
/*** 測(cè)試MappedByteBuffer* 使用堆外內(nèi)存對(duì)文件內(nèi)容進(jìn)行修改* @author regotto*/ public class NioTest3 {public static void main(String[] args) throws Exception{//下面的0, 4代表從0號(hào)位開始,將4個(gè)大小的空間映射到堆外內(nèi)存MappedByteBuffer mappedByteBuffer = new RandomAccessFile("input.txt", "rw").getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 4);mappedByteBuffer.put(0, (byte) 'a');mappedByteBuffer.put(0, (byte) 'a');mappedByteBuffer.put(0, (byte) 'a');} }

NIO之網(wǎng)絡(luò)IO

使用NIO進(jìn)行網(wǎng)絡(luò)非阻塞式編程, NIO編程模型圖:

Selector: 檢測(cè)Channel是否存在事件發(fā)生 ServerSocketChannel: 服務(wù)器 Channel: 管道 Client: 客戶端

NIO網(wǎng)絡(luò)編程結(jié)構(gòu)圖:

selectionKey的4種狀態(tài):OP_ACCEPT: 網(wǎng)絡(luò)已連接 value = 16OP_CONNECT: 連接已建立 value = 8OP_READ OP_WRITE: 讀/寫操作, value = 1 或value = 4

根據(jù)上面結(jié)構(gòu)圖編寫簡(jiǎn)單案例代碼:
NioServer

/*** server*/ public class NioServer {public static void main(String[] args) throws IOException, InterruptedException {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(9999));Selector selector = Selector.open();SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {//設(shè)置selector監(jiān)控, 監(jiān)控Channel的連接情況, 此處除去第一次的ServerSocketChannel注冊(cè), 監(jiān)控時(shí)長(zhǎng)2sif (selector.select(2000) == 0) {System.out.println("------當(dāng)前沒有要處理的Channel, 我去處理其他事------");TimeUnit.SECONDS.sleep(1);continue;}//此時(shí)有Channel進(jìn)行連接, 獲取selectedKeys, 處理每一個(gè)Channel的對(duì)象事件Set<SelectionKey> selectionKeys = selector.selectedKeys();selectionKeys.forEach(key -> {if (key.isAcceptable()) {System.out.println("OP_ACCEPT");//說明此處的key對(duì)應(yīng)的是最開始前面register的ServerSocketChannelServerSocketChannel server = (ServerSocketChannel) key.channel();//此處對(duì)象的hash值相同System.out.println("(ServerSocketChannel) key.channel(): " + server.hashCode());System.out.println("ServerSocketChannel.open(): " + serverSocketChannel.hashCode());try {SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);//此處注冊(cè)的時(shí)候就附加一個(gè)緩沖區(qū), 可用于傳輸對(duì)象//當(dāng)執(zhí)行register的時(shí)候, 就會(huì)生成一個(gè)對(duì)應(yīng)的SelectionKey事件, 當(dāng)前的selectionKeys遍歷完成, 就會(huì)將該selectionKey添加到set集合中SelectionKey socketChannelReadRegister = socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));} catch (IOException e) {e.printStackTrace();}}if (key.isReadable()) {SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = (ByteBuffer) key.attachment();try {socketChannel.read(byteBuffer);System.out.println("客戶端內(nèi)容: " + new String(byteBuffer.array(), StandardCharsets.UTF_8));} catch (IOException e) {e.printStackTrace();}}//這里使用remove與下面使用clear效果一樣, 都是清除當(dāng)前已經(jīng)執(zhí)行過的Channel, 避免重復(fù)執(zhí)行//當(dāng)前的Channel對(duì)應(yīng)的事件被處理過, 就不再被處理, 使用這種寫法較為恰當(dāng)selectionKeys.remove(key);}); // selectionKeys.clear();}} }

NioClient

/*** client*/ public class NioClient {public static void main(String[] args) throws IOException, InterruptedException {SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999))) {//連接服務(wù)端失敗, 使用finishConnect進(jìn)行連接, 此處finishConnect非阻塞while (!socketChannel.finishConnect()) {System.out.println("連接同時(shí), 干其他事情");TimeUnit.SECONDS.sleep(2);}}//return new HeapByteBuffer(capacity, capacity); // ByteBuffer.allocate(1024);//return new HeapByteBuffer(array, offset, length);ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8));socketChannel.write(buffer);//此處阻塞, 若關(guān)閉連接, server會(huì)拋出異常System.out.println("進(jìn)入睡眠");Thread.currentThread().join();} }

Netty初窺門徑

  • Netty模型:
BossGroup: 處理客戶端連接請(qǐng)求 WorkGroup: 處理網(wǎng)絡(luò)讀寫操作 二者使用NioEventLoopGroup不斷循環(huán)處理任務(wù)線程, NioEventLoopGroup內(nèi)部都有一個(gè)selector, 監(jiān)聽每個(gè)Channel連接情況NioEventLoopGroup內(nèi)部使用串行化設(shè)計(jì): 消息讀取->解碼->處理->編碼->發(fā)送
  • NioEventLoopGroup模型:
一個(gè)NioEventLoopGroup包含多個(gè)NioEventLoop 一個(gè)NioEventLoop包含一個(gè)Selector, 一個(gè)任務(wù)隊(duì)列 每個(gè)NioChannel都會(huì)綁定一個(gè)自己的ChannelPipeline
  • ChannelPipeline模型:
ChannelPipeline: Handler集合, 負(fù)責(zé)處理/攔截inbound, outbound操作 ChannelHandlerContext: 事件處理器上下文對(duì)象, 內(nèi)部包含每個(gè)具體的ChannelHandler, 也綁定對(duì)應(yīng)Channel, pipeline信息
  • 簡(jiǎn)單入門案例:
    NettyServer:
/*** 服務(wù)器端*/ public class NettyServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workGroup = new NioEventLoopGroup();//線程池中任務(wù)隊(duì)列數(shù): ChannelOption.SO_BACKLOG, 128//讓連接保持活動(dòng)狀態(tài): ChannelOption.SO_KEEPALIVEServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();channelFuture.channel().closeFuture().sync();bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}

NettyServerHandler:

public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println("客戶端發(fā)送的內(nèi)容: " + byteBuf.toString(CharsetUtil.UTF_8));}/*** 數(shù)據(jù)讀取完成* @param ctx Channel上下文對(duì)象* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("就是沒錢", CharsetUtil.UTF_8));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//出現(xiàn)異常, 直接關(guān)閉上下文對(duì)象ctx.close();} }

NettyClient

/*** 客戶端*/ public class NettyClient {public static void main(String[] args) throws InterruptedException {EventLoopGroup workGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap().group(workGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyClientHandler());}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();channelFuture.channel().closeFuture().sync();} }

NettyClientHandler:

public class NettyClientHandler extends ChannelInboundHandlerAdapter {/*** 通道準(zhǔn)備就緒, 當(dāng)前已經(jīng)連接* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("老板, 還錢吧".getBytes(CharsetUtil.UTF_8)));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println(byteBuf.toString(CharsetUtil.UTF_8));} }

總結(jié)

以上是生活随笔為你收集整理的NIO之ByteBuffer_NIO之网络IO_与ChannelNetty初窥门径的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。