中间件系列「三」netty之NIO基础
Java NIO系統的核心在于:通道(Channel)和緩沖區(Buffer)。通道表示打開到 IO 設備(例如:文件、套接字)的連接。若需要使用 NIO 系統,需要獲取用于連接 IO 設備的通道以及用于容納數據的緩沖區。然后操作緩沖區,對數據進行處理
簡而言之,通道負責傳輸,緩沖區負責存儲
常見的Channel有以下四種,其中FileChannel主要用于文件傳輸,其余三種用于網絡通信
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
Buffer有以下幾種,其中使用較多的是ByteBuffer
- ByteBuffer
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
1、Selector
在使用Selector之前,處理socket連接還有以下兩種方法
使用多線程技術
為每個連接分別開辟一個線程,分別去處理對應的socke連接
這種方法存在以下幾個問題
- 內存占用高
- 每個線程都需要占用一定的內存,當連接較多時,會開辟大量線程,導致占用大量內存
- 線程上下文切換成本高
- 只適合連接數少的場景
- 連接數過多,會導致創建很多線程,從而出現問題
使用線程池技術
使用線程池,讓線程池中的線程去處理連接
這種方法存在以下幾個問題
- 阻塞模式下,線程僅能處理一個連接
- 線程池中的線程獲取任務(task)后,只有當其執行完任務之后(斷開連接后),才會去獲取并執行下一個任務
- 若socke連接一直未斷開,則其對應的線程無法處理其他socke連接
- 僅適合短連接場景
- 短連接即建立連接發送請求并響應后就立即斷開,使得線程池中的線程可以快速處理其他連接
使用選擇器(這也叫響應式編程)
selector 的作用就是配合一個線程來管理多個 channel(fileChannel因為是阻塞式的,所以無法使用selector),獲取這些 channel 上發生的事件,這些 channel 工作在非阻塞模式下,當一個channel中沒有執行任務時,可以去執行其他channel中的任務。適合連接數多,但流量較少的場景
若事件未就緒,調用 selector 的 select() 方法會阻塞線程,直到 channel 發生了就緒事件。這些事件就緒后,select 方法就會返回這些事件交給 thread 來處理
2、ByteBuffer
使用方式
- 向 buffer 寫入數據,例如調用 channel.read(buffer)
- 調用 flip() 切換至讀模式
- flip會使得buffer中的limit變為position,position變為0
- 從 buffer 讀取數據,例如調用 buffer.get()
- 調用 clear() 或者compact()切換至寫模式
- 調用clear()方法時position=0,limit變為capacity
- 調用compact()方法時,會將緩沖區中的未讀數據壓縮到緩沖區前面
- 重復以上步驟
使用ByteBuffer讀取文件中的內容
public class TestByteBuffer {public static void main(String[] args) {// 獲得FileChanneltry (FileChannel channel = new FileInputStream("stu.txt").getChannel()) {// 獲得緩沖區ByteBuffer buffer = ByteBuffer.allocate(10);int hasNext = 0;StringBuilder builder = new StringBuilder();while((hasNext = channel.read(buffer)) > 0) {// 切換模式 limit=position, position=0buffer.flip();// 當buffer中還有數據時,獲取其中的數據while(buffer.hasRemaining()) {builder.append((char)buffer.get());}// 切換模式 position=0, limit=capacitybuffer.clear();}System.out.println(builder.toString());} catch (IOException e) {}} }核心屬性
字節緩沖區的父類Buffer中有幾個核心屬性,如下
// Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity;- capacity:緩沖區的容量。通過構造函數賦予,一旦設置,無法更改
- limit:緩沖區的界限。位于limit 后的數據不可讀寫。緩沖區的限制不能為負,并且不能大于其容量
- position:下一個讀寫位置的索引(類似PC)。緩沖區的位置不能為負,并且不能大于limit
- mark:記錄當前position的值。position被改變后,可以通過調用reset() 方法恢復到mark的位置。
以上四個屬性必須滿足以下要求
mark <= position <= limit <= capacity
核心方法
put()方法
- put()方法可以將一個數據放入到緩沖區中。
- 進行該操作后,postition的值會+1,指向下一個可以放入的位置。capacity = limit ,為緩沖區容量的值。
flip()方法
- flip()方法會切換對緩沖區的操作模式,由寫->讀 / 讀->寫
- 進行該操作后
- 如果是寫模式->讀模式,position = 0 , limit 指向最后一個元素的下一個位置,capacity不變
- 如果是讀->寫,則恢復為put()方法中的值
get()方法
- get()方法會讀取緩沖區中的一個值
- 進行該操作后,position會+1,如果超過了limit則會拋出異常
- 注意:get(i)方法不會改變position的值
rewind()方法
- 該方法只能在讀模式下使用
- rewind()方法后,會恢復position、limit和capacity的值,變為進行get()前的值
clean()方法
- clean()方法會將緩沖區中的各個屬性恢復為最初的狀態,position = 0, capacity = limit
- 此時緩沖區的數據依然存在,處于“被遺忘”狀態,下次進行寫操作時會覆蓋這些數據
mark()和reset()方法
- mark()方法會將postion的值保存到mark屬性中
- reset()方法會將position的值改為mark中保存的值
compact()方法
此方法為ByteBuffer的方法,而不是Buffer的方法
- compact會把未讀完的數據向前壓縮,然后切換到寫模式
- 數據前移后,原位置的值并未清零,寫時會覆蓋之前的值
clear() VS compact()
clear只是對position、limit、mark進行重置,而compact在對position進行設置,以及limit、mark進行重置的同時,還涉及到數據在內存中拷貝(會調用arraycopy)。所以compact比clear更耗性能。但compact能保存你未讀取的數據,將新數據追加到為讀取的數據之后;而clear則不行,若你調用了clear,則未讀取的數據就無法再讀取到了
所以需要根據情況來判斷使用哪種方法進行模式切換
方法調用及演示
ByteBuffer調試工具類
需要先導入netty依賴
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.51.Final</version> </dependency> import java.nio.ByteBuffer;import io.netty.util.internal.MathUtil; import io.netty.util.internal.StringUtil; import io.netty.util.internal.MathUtil.*;/*** @author Panwen Chen* @date 2021/4/12 15:59*/ public class ByteBufferUtil {private static final char[] BYTE2CHAR = new char[256];private static final char[] HEXDUMP_TABLE = new char[256 * 4];private static final String[] HEXPADDING = new String[16];private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];private static final String[] BYTE2HEX = new String[256];private static final String[] BYTEPADDING = new String[16];static {final char[] DIGITS = "0123456789abcdef".toCharArray();for (int i = 0; i < 256; i++) {HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];}int i;// Generate the lookup table for hex dump paddingsfor (i = 0; i < HEXPADDING.length; i++) {int padding = HEXPADDING.length - i;StringBuilder buf = new StringBuilder(padding * 3);for (int j = 0; j < padding; j++) {buf.append(" ");}HEXPADDING[i] = buf.toString();}// Generate the lookup table for the start-offset header in each row (up to 64KiB).for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {StringBuilder buf = new StringBuilder(12);buf.append(StringUtil.NEWLINE);buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));buf.setCharAt(buf.length() - 9, '|');buf.append('|');HEXDUMP_ROWPREFIXES[i] = buf.toString();}// Generate the lookup table for byte-to-hex-dump conversionfor (i = 0; i < BYTE2HEX.length; i++) {BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);}// Generate the lookup table for byte dump paddingsfor (i = 0; i < BYTEPADDING.length; i++) {int padding = BYTEPADDING.length - i;StringBuilder buf = new StringBuilder(padding);for (int j = 0; j < padding; j++) {buf.append(' ');}BYTEPADDING[i] = buf.toString();}// Generate the lookup table for byte-to-char conversionfor (i = 0; i < BYTE2CHAR.length; i++) {if (i <= 0x1f || i >= 0x7f) {BYTE2CHAR[i] = '.';} else {BYTE2CHAR[i] = (char) i;}}}/*** 打印所有內容* @param buffer*/public static void debugAll(ByteBuffer buffer) {int oldlimit = buffer.limit();buffer.limit(buffer.capacity());StringBuilder origin = new StringBuilder(256);appendPrettyHexDump(origin, buffer, 0, buffer.capacity());System.out.println("+--------+-------------------- all ------------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);System.out.println(origin);buffer.limit(oldlimit);}/*** 打印可讀取內容* @param buffer*/public static void debugRead(ByteBuffer buffer) {StringBuilder builder = new StringBuilder(256);appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());System.out.println("+--------+-------------------- read -----------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());System.out.println(builder);}private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {throw new IndexOutOfBoundsException("expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length+ ") <= " + "buf.capacity(" + buf.capacity() + ')');}if (length == 0) {return;}dump.append(" +-------------------------------------------------+" +StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");final int startIndex = offset;final int fullRows = length >>> 4;final int remainder = length & 0xF;// Dump the rows which have 16 bytes.for (int row = 0; row < fullRows; row++) {int rowStartIndex = (row << 4) + startIndex;// Per-row prefix.appendHexDumpRowPrefix(dump, row, rowStartIndex);// Hex dumpint rowEndIndex = rowStartIndex + 16;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(" |");// ASCII dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append('|');}// Dump the last row which has less than 16 bytes.if (remainder != 0) {int rowStartIndex = (fullRows << 4) + startIndex;appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);// Hex dumpint rowEndIndex = rowStartIndex + remainder;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(HEXPADDING[remainder]);dump.append(" |");// Ascii dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append(BYTEPADDING[remainder]);dump.append('|');}dump.append(StringUtil.NEWLINE +"+--------+-------------------------------------------------+----------------+");}private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {if (row < HEXDUMP_ROWPREFIXES.length) {dump.append(HEXDUMP_ROWPREFIXES[row]);} else {dump.append(StringUtil.NEWLINE);dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));dump.setCharAt(dump.length() - 9, '|');dump.append('|');}}public static short getUnsignedByte(ByteBuffer buffer, int index) {return (short) (buffer.get(index) & 0xFF);} } public class TestByteBuffer {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);// 向buffer中寫入1個字節的數據buffer.put((byte)97);// 使用工具類,查看buffer狀態ByteBufferUtil.debugAll(buffer);// 向buffer中寫入4個字節的數據buffer.put(new byte[]{98, 99, 100, 101});ByteBufferUtil.debugAll(buffer);// 獲取數據buffer.flip();ByteBufferUtil.debugAll(buffer);System.out.println(buffer.get());System.out.println(buffer.get());ByteBufferUtil.debugAll(buffer);// 使用compact切換模式buffer.compact();ByteBufferUtil.debugAll(buffer);// 再次寫入buffer.put((byte)102);buffer.put((byte)103);ByteBufferUtil.debugAll(buffer);} }運行結果
// 向緩沖區寫入了一個字節的數據,此時postition為1 +--------+-------------------- all ------------------------+----------------+ position: [1], limit: [10]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 00 00 00 00 00 00 00 00 00 |a......... | +--------+-------------------------------------------------+----------------+// 向緩沖區寫入四個字節的數據,此時position為5 +--------+-------------------- all ------------------------+----------------+ position: [5], limit: [10]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 62 63 64 65 00 00 00 00 00 |abcde..... | +--------+-------------------------------------------------+----------------+// 調用flip切換模式,此時position為0,表示從第0個數據開始讀取 +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 62 63 64 65 00 00 00 00 00 |abcde..... | +--------+-------------------------------------------------+----------------+ // 讀取兩個字節的數據 97 98// position變為2 +--------+-------------------- all ------------------------+----------------+ position: [2], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 62 63 64 65 00 00 00 00 00 |abcde..... | +--------+-------------------------------------------------+----------------+// 調用compact切換模式,此時position及其后面的數據被壓縮到ByteBuffer前面去了 // 此時position為3,會覆蓋之前的數據 +--------+-------------------- all ------------------------+----------------+ position: [3], limit: [10]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 63 64 65 64 65 00 00 00 00 00 |cdede..... | +--------+-------------------------------------------------+----------------+// 再次寫入兩個字節的數據,之前的 0x64 0x65 被覆蓋 +--------+-------------------- all ------------------------+----------------+ position: [5], limit: [10]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 63 64 65 66 67 00 00 00 00 00 |cdefg..... | +--------+-------------------------------------------------+----------------+方法二
編碼:通過StandardCharsets的encode方法獲得ByteBuffer,此時獲得的ByteBuffer為讀模式,無需通過flip切換模式
解碼:通過StandardCharsets的decoder方法解碼
public class Translate {public static void main(String[] args) {// 準備兩個字符串String str1 = "hello";String str2 = "";// 通過StandardCharsets的encode方法獲得ByteBuffer// 此時獲得的ByteBuffer為讀模式,無需通過flip切換模式ByteBuffer buffer1 = StandardCharsets.UTF_8.encode(str1);ByteBufferUtil.debugAll(buffer1);// 將緩沖區中的數據轉化為字符串// 通過StandardCharsets解碼,獲得CharBuffer,再通過toString獲得字符串str2 = StandardCharsets.UTF_8.decode(buffer1).toString();System.out.println(str2);ByteBufferUtil.debugAll(buffer1);} }運行結果 +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+ hello +--------+-------------------- all ------------------------+----------------+ position: [5], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+方法三編碼:字符串調用getByte()方法獲得字節數組,將字節數組傳給ByteBuffer的wrap()方法,通過該方法獲得ByteBuffer。同樣無需調用flip方法切換為讀模式
解碼:通過StandardCharsets的decoder方法解碼
public class Translate {public static void main(String[] args) {// 準備兩個字符串String str1 = "hello";String str2 = "";// 通過StandardCharsets的encode方法獲得ByteBuffer// 此時獲得的ByteBuffer為讀模式,無需通過flip切換模式ByteBuffer buffer1 = ByteBuffer.wrap(str1.getBytes());ByteBufferUtil.debugAll(buffer1);// 將緩沖區中的數據轉化為字符串// 通過StandardCharsets解碼,獲得CharBuffer,再通過toString獲得字符串str2 = StandardCharsets.UTF_8.decode(buffer1).toString();System.out.println(str2);ByteBufferUtil.debugAll(buffer1);} }運行結果 +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+ hello +--------+-------------------- all ------------------------+----------------+ position: [5], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+
???????粘包與半包
現象
網絡上有多條數據發送給服務端,數據之間使用 \n 進行分隔
但由于某種原因這些數據在接收時,被進行了重新組合,例如原始數據有3條為
- Hello,world\n
- I’m Nyima\n
- How are you?\n
變成了下面的兩個 byteBuffer (粘包,半包)
- Hello,world\nI’m Nyima\nHo
- w are you?\n
出現原因
粘包
發送方在發送數據時,并不是一條一條地發送數據,而是將數據整合在一起,當數據達到一定的數量后再一起發送。這就會導致多條信息被放在一個緩沖區中被一起發送出去
半包
接收方的緩沖區的大小是有限的,當接收方的緩沖區滿了以后,就需要將信息截斷,等緩沖區空了以后再繼續放入數據。這就會發生一段完整的數據最后被截斷的現象
解決辦法
- 通過get(index)方法遍歷ByteBuffer,遇到分隔符時進行處理。注意:get(index)不會改變position的值
- 記錄該段數據長度,以便于申請對應大小的緩沖區
- 將緩沖區的數據通過get()方法寫入到target中
- 調用compact方法切換模式,因為緩沖區中可能還有未讀的數據
二、文件編程
1、FileChannel
工作模式
FileChannel只能在阻塞模式下工作,所以無法搭配Selector
獲取
不能直接打開 FileChannel,必須通過 FileInputStream、FileOutputStream 或者 RandomAccessFile 來獲取 FileChannel,它們都有 getChannel 方法
- 通過 FileInputStream 獲取的 channel?只能讀
- 通過 FileOutputStream 獲取的 channel?只能寫
- 通過 RandomAccessFile 是否能讀寫根據構造 RandomAccessFile 時的讀寫模式決定
讀取
通過 FileInputStream 獲取channel,通過read方法將數據寫入到ByteBuffer中
read方法的返回值表示讀到了多少字節,若讀到了文件末尾則返回-1
int readBytes = channel.read(buffer);可根據返回值判斷是否讀取完畢Copy while(channel.read(buffer) > 0) {// 進行對應操作... }Copy寫入
因為channel也是有大小的,所以 write 方法并不能保證一次將 buffer 中的內容全部寫入 channel。必須需要按照以下規則進行寫入
// 通過hasRemaining()方法查看緩沖區中是否還有數據未寫入到通道中 while(buffer.hasRemaining()) {channel.write(buffer); }Copy關閉
通道需要close,一般情況通過try-with-resource進行關閉,最好使用以下方法獲取strea以及channel,避免某些原因使得資源未被關閉
public class TestChannel {public static void main(String[] args) throws IOException {try (FileInputStream fis = new FileInputStream("stu.txt");FileOutputStream fos = new FileOutputStream("student.txt");FileChannel inputChannel = fis.getChannel();FileChannel outputChannel = fos.getChannel()) {// 執行對應操作...}} }position
channel也擁有一個保存讀取數據位置的屬性,即position
long pos = channel.position();可以通過position(int pos)設置channel中position的值 long newPos = ...; channel.position(newPos);設置當前位置時,如果設置為文件的末尾
- 這時讀取會返回 -1
- 這時寫入,會追加內容,但要注意如果 position 超過了文件末尾,再寫入時在新內容和原末尾之間會有空洞(00)
強制寫入
操作系統出于性能的考慮,會將數據緩存,不是立刻寫入磁盤,而是等到緩存滿了以后將所有數據一次性的寫入磁盤。可以調用?force(true)?方法將文件內容和元數據(文件的權限等信息)立刻寫入磁盤
2、兩個Channel傳輸數據
transferTo方法
使用transferTo方法可以快速、高效地將一個channel中的數據傳輸到另一個channel中,但一次只能傳輸2G的內容
transferTo底層使用了零拷貝技術
public class TestChannel {public static void main(String[] args){try (FileInputStream fis = new FileInputStream("stu.txt");FileOutputStream fos = new FileOutputStream("student.txt");FileChannel inputChannel = fis.getChannel();FileChannel outputChannel = fos.getChannel()) {// 參數:inputChannel的起始位置,傳輸數據的大小,目的channel// 返回值為傳輸的數據的字節數// transferTo一次只能傳輸2G的數據inputChannel.transferTo(0, inputChannel.size(), outputChannel);} catch (IOException e) {e.printStackTrace();}} }當傳輸的文件大于2G時,需要使用以下方法進行多次傳輸
public class TestChannel {public static void main(String[] args){try (FileInputStream fis = new FileInputStream("stu.txt");FileOutputStream fos = new FileOutputStream("student.txt");FileChannel inputChannel = fis.getChannel();FileChannel outputChannel = fos.getChannel()) {long size = inputChannel.size();long capacity = inputChannel.size();// 分多次傳輸while (capacity > 0) {// transferTo返回值為傳輸了的字節數capacity -= inputChannel.transferTo(size-capacity, capacity, outputChannel);}} catch (IOException e) {e.printStackTrace();}} }3、Path與Paths
- Path 用來表示文件路徑
- Paths 是工具類,用來獲取 Path 實例
- .?代表了當前路徑
- ..?代表了上一級路徑
例如目錄結構如下
d:|- data|- projects|- a|- b代碼
Path path = Paths.get("d:\\data\\projects\\a\\..\\b"); System.out.println(path); System.out.println(path.normalize()); // 正常化路徑 會去除 . 以及 ..輸出結果為
d:\data\projects\a\..\b d:\data\projects\b4、Files
查找
檢查文件是否存在
Path path = Paths.get("helloword/data.txt"); System.out.println(Files.exists(path));創建
創建一級目錄
Path path = Paths.get("helloword/d1"); Files.createDirectory(path);- 如果目錄已存在,會拋異常 FileAlreadyExistsException
- 不能一次創建多級目錄,否則會拋異常 NoSuchFileException
創建多級目錄用
Path path = Paths.get("helloword/d1/d2"); Files.createDirectories(path);Copy拷貝及移動拷貝文件
Path source = Paths.get("helloword/data.txt"); Path target = Paths.get("helloword/target.txt");Files.copy(source, target);Copy- 如果文件已存在,會拋異常 FileAlreadyExistsException
如果希望用 source?覆蓋掉 target,需要用 StandardCopyOption 來控制
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);Copy移動文件
Path source = Paths.get("helloword/data.txt"); Path target = Paths.get("helloword/data.txt");Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);StandardCopyOption.ATOMIC_MOVE 保證文件移動的原子性刪除
刪除文件
Path target = Paths.get("helloword/target.txt");三、網絡編程
1、阻塞
- 阻塞模式下,相關方法都會導致線程暫停
- ServerSocketChannel.accept 會在沒有連接建立時讓線程暫停
- SocketChannel.read 會在通道中沒有數據可讀時讓線程暫停
- 阻塞的表現其實就是線程暫停了,暫停期間不會占用 cpu,但線程相當于閑置
- 單線程下,阻塞方法之間相互影響,幾乎不能正常工作,需要多線程支持
- 但多線程下,有新的問題,體現在以下方面
- 32 位 jvm 一個線程 320k,64 位 jvm 一個線程 1024k,如果連接數過多,必然導致 OOM,并且線程太多,反而會因為頻繁上下文切換導致性能降低
- 可以采用線程池技術來減少線程數和線程上下文切換,但治標不治本,如果有很多連接建立,但長時間 inactive,會阻塞線程池中所有線程,因此不適合長連接,只適合短連接
服務端代碼
public class Server {public static void main(String[] args) {// 創建緩沖區ByteBuffer buffer = ByteBuffer.allocate(16);// 獲得服務器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {// 為服務器通道綁定端口server.bind(new InetSocketAddress(8080));// 用戶存放連接的集合ArrayList<SocketChannel> channels = new ArrayList<>();// 循環接收連接while (true) {System.out.println("before connecting...");// 沒有連接時,會阻塞線程SocketChannel socketChannel = server.accept();System.out.println("after connecting...");channels.add(socketChannel);// 循環遍歷集合中的連接for(SocketChannel channel : channels) {System.out.println("before reading");// 處理通道中的數據// 當通道中沒有數據可讀時,會阻塞線程channel.read(buffer);buffer.flip();ByteBufferUtil.debugRead(buffer);buffer.clear();System.out.println("after reading");}}} catch (IOException e) {e.printStackTrace();}} }?客戶端代碼
public class Client {public static void main(String[] args) {try (SocketChannel socketChannel = SocketChannel.open()) {// 建立連接socketChannel.connect(new InetSocketAddress("localhost", 8080));System.out.println("waiting...");} catch (IOException e) {e.printStackTrace();}} }運行結果
- 客戶端-服務器建立連接前:服務器端因accept阻塞
- 客戶端-服務器建立連接后,客戶端發送消息前:服務器端因通道為空被阻塞
- 客戶端發送數據后,服務器處理通道中的數據。再次進入循環時,再次被accept阻塞
- 之前的客戶端再次發送消息,服務器端因為被accept阻塞,無法處理之前客戶端發送到通道中的信息
2、非阻塞
-
可以通過ServerSocketChannel的configureBlocking(false)方法將獲得連接設置為非阻塞的。此時若沒有連接,accept會返回null
-
可以通過SocketChannel的configureBlocking(false)方法將從通道中讀取數據設置為非阻塞的。若此時通道中沒有數據可讀,read會返回-1
服務器代碼如下
public class Server {public static void main(String[] args) {// 創建緩沖區ByteBuffer buffer = ByteBuffer.allocate(16);// 獲得服務器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {// 為服務器通道綁定端口server.bind(new InetSocketAddress(8080));// 用戶存放連接的集合ArrayList<SocketChannel> channels = new ArrayList<>();// 循環接收連接while (true) {// 設置為非阻塞模式,沒有連接時返回null,不會阻塞線程server.configureBlocking(false);SocketChannel socketChannel = server.accept();// 通道不為空時才將連接放入到集合中if (socketChannel != null) {System.out.println("after connecting...");channels.add(socketChannel);}// 循環遍歷集合中的連接for(SocketChannel channel : channels) {// 處理通道中的數據// 設置為非阻塞模式,若通道中沒有數據,會返回0,不會阻塞線程channel.configureBlocking(false);int read = channel.read(buffer);if(read > 0) {buffer.flip();ByteBufferUtil.debugRead(buffer);buffer.clear();System.out.println("after reading");}}}} catch (IOException e) {e.printStackTrace();}} }這樣寫存在一個問題,因為設置為了非阻塞,會一直執行while(true)中的代碼,CPU一直處于忙碌狀態,會使得性能變低,所以實際情況中不使用這種方法處理請求
3、Selector
多路復用
單線程可以配合 Selector 完成對多個 Channel 可讀寫事件的監控,這稱之為多路復用
- 多路復用僅針對網絡 IO,普通文件 IO?無法利用多路復用
- 如果不用 Selector 的非阻塞模式,線程大部分時間都在做無用功,而 Selector 能夠保證
- 有可連接事件時才去連接
- 有可讀事件才去讀取
- 有可寫事件才去寫入
- 限于網絡傳輸能力,Channel 未必時時可寫,一旦 Channel 可寫,會觸發 Selector 的可寫事件
4、使用及Accpet事件
要使用Selector實現多路復用,服務端代碼如下改進
public class SelectServer {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(16);// 獲得服務器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {server.bind(new InetSocketAddress(8080));// 創建選擇器Selector selector = Selector.open();// 通道必須設置為非阻塞模式server.configureBlocking(false);// 將通道注冊到選擇器中,并設置感興趣的事件server.register(selector, SelectionKey.OP_ACCEPT);while (true) {// 若沒有事件就緒,線程會被阻塞,反之不會被阻塞。從而避免了CPU空轉// 返回值為就緒的事件個數int ready = selector.select();System.out.println("selector ready counts : " + ready);// 獲取所有事件Set<SelectionKey> selectionKeys = selector.selectedKeys();// 使用迭代器遍歷事件Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 判斷key的類型if(key.isAcceptable()) {// 獲得key對應的channelServerSocketChannel channel = (ServerSocketChannel) key.channel();System.out.println("before accepting...");// 獲取連接并處理,而且是必須處理,否則需要取消SocketChannel socketChannel = channel.accept();System.out.println("after accepting...");// 處理完畢后移除iterator.remove();}}}} catch (IOException e) {e.printStackTrace();}} }步驟解析
- 獲得選擇器Selector
- 將通道設置為非阻塞模式,并注冊到選擇器中,并設置感興趣的事件
- channel 必須工作在非阻塞模式
- FileChannel 沒有非阻塞模式,因此不能配合 selector 一起使用
- 綁定的事件類型可以有
- connect - 客戶端連接成功時觸發
- accept - 服務器端成功接受連接時觸發
- read - 數據可讀入時觸發,有因為接收能力弱,數據暫不能讀入的情況
- write - 數據可寫出時觸發,有因為發送能力弱,數據暫不能寫出的情況
-
通過Selector監聽事件,并獲得就緒的通道個數,若沒有通道就緒,線程會被阻塞
-
阻塞直到綁定事件發生
int count = selector.select();阻塞直到綁定事件發生,或是超時(時間單位為 ms)???????
-
- int count = selector.select(long timeout);
-
不會阻塞,也就是不管有沒有事件,立刻返回,自己根據返回值檢查是否有事件
int count = selector.selectNow();
-
獲取就緒事件并得到對應的通道,然后進行處理
事件發生后,要么處理,要么取消(cancel),不能什么都不做,否則下次該事件仍會觸發,這是因為 nio 底層使用的是水平觸發
Level_triggered(水平觸發):當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。如果這次沒有把數據一次性全部讀寫完(如讀寫緩沖區太小),那么下次調用 epoll_wait()時,它還會通知你在上沒讀寫完的文件描述符上繼續讀寫,當然如果你一直不去讀寫,它會一直通知你!!!如果系統中有大量你不需要讀寫的就緒文件描述符,而它們每次都會返回,這樣會大大降低處理程序檢索自己關心的就緒文件描述符的效率!!!
Edge_triggered(邊緣觸發):當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。如果這次沒有把數據全部讀寫完(如讀寫緩沖區太小),那么下次調用epoll_wait()時,它不會通知你,也就是它只會通知你一次,直到該文件描述符上出現第二次可讀寫事件才會通知你!!!這種模式比水平觸發效率高,系統不會充斥大量你不關心的就緒文件描述符!!!
5、Read事件
- 在Accept事件中,若有客戶端與服務器端建立了連接,需要將其對應的SocketChannel設置為非阻塞,并注冊到選擇其中
- 添加Read事件,觸發后進行讀取操作
???????刪除事件
當處理完一個事件后,一定要調用迭代器的remove方法移除對應事件,否則會出現錯誤。原因如下
以我們上面的?Read事件?的代碼為例
-
當調用了 server.register(selector, SelectionKey.OP_ACCEPT)后,Selector中維護了一個集合,用于存放SelectionKey以及其對應的通道
// WindowsSelectorImpl 中的 SelectionKeyImpl數組 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[8]; public class SelectionKeyImpl extends AbstractSelectionKey {// Key對應的通道final SelChImpl channel;... } -
當選擇器中的通道對應的事件發生后,selecionKey會被放到另一個集合中,但是selecionKey不會自動移除,所以需要我們在處理完一個事件后,通過迭代器手動移除其中的selecionKey。否則會導致已被處理過的事件再次被處理,就會引發錯誤
-
斷開處理
當客戶端與服務器之間的連接斷開時,會給服務器端發送一個讀事件,對異常斷開和正常斷開需要加以不同的方式進行處理
-
正常斷開
-
正常斷開時,服務器端的channel.read(buffer)方法的返回值為-1,所以當結束到返回值為-1時,需要調用key的cancel方法取消此事件,并在取消后移除該事件
int read = channel.read(buffer); // 斷開連接時,客戶端會向服務器發送一個寫事件,此時read的返回值為-1 if(read == -1) {// 取消該事件的處理key.cancel();channel.close(); } else {... } // 取消或者處理,都需要移除key iterator.remove();異常斷開
-
-
- 異常斷開時,會拋出IOException異常, 在try-catch的catch塊中捕獲異常并調用key的cancel方法即可
消息邊界
不處理消息邊界存在的問題
將緩沖區的大小設置為4個字節,發送2個漢字(你好),通過decode解碼并打印時,會出現亂碼
ByteBuffer buffer = ByteBuffer.allocate(4); // 解碼并打印 System.out.println(StandardCharsets.UTF_8.decode(buffer)); 你� ��這是因為UTF-8字符集下,1個漢字占用3個字節,此時緩沖區大小為4個字節,一次讀時間無法處理完通道中的所有數據,所以一共會觸發兩次讀事件。這就導致?你好?的?好?字被拆分為了前半部分和后半部分發送,解碼時就會出現問題
處理消息邊界
傳輸的文本可能有以下三種情況
- 文本大于緩沖區大小
- 此時需要將緩沖區進行擴容
- 發生半包現象
- 發生粘包現象
解決思路大致有以下三種
- 固定消息長度,數據包大小一樣,服務器按預定長度讀取,當發送的數據較少時,需要將數據進行填充,直到長度與消息規定長度一致。缺點是浪費帶寬
- 另一種思路是按分隔符拆分,缺點是效率低,需要一個一個字符地去匹配分隔符
- TLV 格式,即 Type 類型、Length 長度、Value 數據(也就是在消息開頭用一些空間存放后面數據的長度),如HTTP請求頭中的Content-Type與Content-Length。類型和長度已知的情況下,就可以方便獲取消息大小,分配合適的 buffer,缺點是 buffer 需要提前分配,如果內容過大,則影響 server 吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
下文的消息邊界處理方式為第二種:按分隔符拆分
附件與擴容
Channel的register方法還有第三個參數:附件,可以向其中放入一個Object類型的對象,該對象會與登記的Channel以及其對應的SelectionKey綁定,可以從SelectionKey獲取到對應通道的附件
public final SelectionKey register(Selector sel, int ops, Object att)可通過SelectionKey的attachment()方法獲得附件 ByteBuffer buffer = (ByteBuffer) key.attachment();我們需要在Accept事件發生后,將通道注冊到Selector中時,對每個通道添加一個ByteBuffer附件,讓每個通道發生讀事件時都使用自己的通道,避免與其他通道發生沖突而導致問題
// 設置為非阻塞模式,同時將連接的通道也注冊到選擇其中,同時設置附件 socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(16); // 添加通道對應的Buffer附件 socketChannel.register(selector, SelectionKey.OP_READ, buffer);當Channel中的數據大于緩沖區時,需要對緩沖區進行擴容操作。此代碼中的擴容的判定方法:Channel調用compact方法后,的position與limit相等,說明緩沖區中的數據并未被讀取(容量太小),此時創建新的緩沖區,其大小擴大為兩倍。同時還要將舊緩沖區中的數據拷貝到新的緩沖區中,同時調用SelectionKey的attach方法將新的緩沖區作為新的附件放入SelectionKey中
// 如果緩沖區太小,就進行擴容 if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);// 將舊buffer中的內容放入新的buffer中ewBuffer.put(buffer);// 將新buffer作為附件放到key中key.attach(newBuffer); }改造后的服務器代碼如下 public class SelectServer {public static void main(String[] args) {// 獲得服務器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {server.bind(new InetSocketAddress(8080));// 創建選擇器Selector selector = Selector.open();// 通道必須設置為非阻塞模式server.configureBlocking(false);// 將通道注冊到選擇器中,并設置感興趣的事件server.register(selector, SelectionKey.OP_ACCEPT);// 為serverKey設置感興趣的事件while (true) {// 若沒有事件就緒,線程會被阻塞,反之不會被阻塞。從而避免了CPU空轉// 返回值為就緒的事件個數int ready = selector.select();System.out.println("selector ready counts : " + ready);// 獲取所有事件Set<SelectionKey> selectionKeys = selector.selectedKeys();// 使用迭代器遍歷事件Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 判斷key的類型if(key.isAcceptable()) {// 獲得key對應的channelServerSocketChannel channel = (ServerSocketChannel) key.channel();System.out.println("before accepting...");// 獲取連接SocketChannel socketChannel = channel.accept();System.out.println("after accepting...");// 設置為非阻塞模式,同時將連接的通道也注冊到選擇其中,同時設置附件socketChannel.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16);socketChannel.register(selector, SelectionKey.OP_READ, buffer);// 處理完畢后移除iterator.remove();} else if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();System.out.println("before reading...");// 通過key獲得附件(buffer)ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer);if(read == -1) {key.cancel();channel.close();} else {// 通過分隔符來分隔buffer中的數據split(buffer);// 如果緩沖區太小,就進行擴容if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);// 將舊buffer中的內容放入新的buffer中buffer.flip();newBuffer.put(buffer);// 將新buffer放到key中作為附件key.attach(newBuffer);}}System.out.println("after reading...");// 處理完畢后移除iterator.remove();}}}} catch (IOException e) {e.printStackTrace();}}private static void split(ByteBuffer buffer) {buffer.flip();for(int i = 0; i < buffer.limit(); i++) {// 遍歷尋找分隔符// get(i)不會移動positionif (buffer.get(i) == '\n') {// 緩沖區長度int length = i+1-buffer.position();ByteBuffer target = ByteBuffer.allocate(length);// 將前面的內容寫入target緩沖區for(int j = 0; j < length; j++) {// 將buffer中的數據寫入target中target.put(buffer.get());}// 打印結果ByteBufferUtil.debugAll(target);}}// 切換為寫模式,但是緩沖區可能未讀完,這里需要使用compactbuffer.compact();} }CopyByteBuffer的大小分配
- 每個 channel 都需要記錄可能被切分的消息,因為?ByteBuffer 不能被多個 channel 共同使用,因此需要為每個 channel 維護一個獨立的 ByteBuffer
- ByteBuffer 不能太大,比如一個 ByteBuffer 1Mb 的話,要支持百萬連接就要 1Tb 內存,因此需要設計大小可變的 ByteBuffer
- 分配思路可以參考
- 一種思路是首先分配一個較小的 buffer,例如 4k,如果發現數據不夠,再分配 8k 的 buffer,將 4k buffer 內容拷貝至 8k buffer,優點是消息連續容易處理,缺點是數據拷貝耗費性能
- 參考實現?Java Resizable Array
- 另一種思路是用多個數組組成 buffer,一個數組不夠,把多出來的內容寫入新的數組,與前面的區別是消息存儲不連續解析復雜,優點是避免了拷貝引起的性能損耗
- 一種思路是首先分配一個較小的 buffer,例如 4k,如果發現數據不夠,再分配 8k 的 buffer,將 4k buffer 內容拷貝至 8k buffer,優點是消息連續容易處理,缺點是數據拷貝耗費性能
6、Write事件
服務器通過Buffer向通道中寫入數據時,可能因為通道容量小于Buffer中的數據大小,導致無法一次性將Buffer中的數據全部寫入到Channel中,這時便需要分多次寫入,具體步驟如下
-
執行一次寫操作,向將buffer中的內容寫入到SocketChannel中,然后判斷Buffer中是否還有數據
-
若Buffer中還有數據,則需要將SockerChannel注冊到Seletor中,并關注寫事件,同時將未寫完的Buffer作為附件一起放入到SelectionKey中
int write = socket.write(buffer); // 通道中可能無法放入緩沖區中的所有數據 if (buffer.hasRemaining()) {// 注冊到Selector中,關注可寫事件,并將buffer添加到key的附件中socket.configureBlocking(false);socket.register(selector, SelectionKey.OP_WRITE, buffer); } -
添加寫事件的相關操作key.isWritable(),對Buffer再次進行寫操作
- 每次寫后需要判斷Buffer中是否還有數據(是否寫完)。若寫完,需要移除SelecionKey中的Buffer附件,避免其占用過多內存,同時還需移除對寫事件的關注
整體代碼如下
public class WriteServer {public static void main(String[] args) {try(ServerSocketChannel server = ServerSocketChannel.open()) {server.bind(new InetSocketAddress(8080));server.configureBlocking(false);Selector selector = Selector.open();server.register(selector, SelectionKey.OP_ACCEPT);while (true) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 處理后就移除事件iterator.remove();if (key.isAcceptable()) {// 獲得客戶端的通道SocketChannel socket = server.accept();// 寫入數據StringBuilder builder = new StringBuilder();for(int i = 0; i < 500000000; i++) {builder.append("a");}ByteBuffer buffer = StandardCharsets.UTF_8.encode(builder.toString());// 先執行一次Buffer->Channel的寫入,如果未寫完,就添加一個可寫事件int write = socket.write(buffer);System.out.println(write);// 通道中可能無法放入緩沖區中的所有數據if (buffer.hasRemaining()) {// 注冊到Selector中,關注可寫事件,并將buffer添加到key的附件中socket.configureBlocking(false);socket.register(selector, SelectionKey.OP_WRITE, buffer);}} else if (key.isWritable()) {SocketChannel socket = (SocketChannel) key.channel();// 獲得bufferByteBuffer buffer = (ByteBuffer) key.attachment();// 執行寫操作int write = socket.write(buffer);System.out.println(write);// 如果已經完成了寫操作,需要移除key中的附件,同時不再對寫事件感興趣if (!buffer.hasRemaining()) {key.attach(null);key.interestOps(0);}}}}} catch (IOException e) {e.printStackTrace();}} }7、優化
多線程優化
充分利用多核CPU,分兩組選擇器
- 單線程配一個選擇器(Boss),專門處理 accept 事件
- 創建 cpu 核心數的線程(Worker),每個線程配一個選擇器,輪流處理 read 事件
實現思路
-
創建一個負責處理Accept事件的Boss線程,與多個負責處理Read事件的Worker線程
-
Boss線程執行的操作
-
接受并處理Accepet事件,當Accept事件發生后,調用Worker的register(SocketChannel socket)方法,讓Worker去處理Read事件,其中需要根據標識robin去判斷將任務分配給哪個Worker
// 創建固定數量的Worker Worker[] workers = new Worker[4]; // 用于負載均衡的原子整數 AtomicInteger robin = new AtomicInteger(0); // 負載均衡,輪詢分配Worker workers[robin.getAndIncrement()% workers.length].register(socket);Copy -
register(SocketChannel socket)方法會通過同步隊列完成Boss線程與Worker線程之間的通信,讓SocketChannel的注冊任務被Worker線程執行。添加任務后需要調用selector.wakeup()來喚醒被阻塞的Selector
public void register(final SocketChannel socket) throws IOException {// 只啟動一次if (!started) {// 初始化操作}// 向同步隊列中添加SocketChannel的注冊事件// 在Worker線程中執行注冊事件queue.add(new Runnable() {@Overridepublic void run() {try {socket.register(selector, SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}});// 喚醒被阻塞的Selector// select類似LockSupport中的park,wakeup的原理類似LockSupport中的unparkselector.wakeup(); }Copy
-
-
Worker線程執行的操作
- 從同步隊列中獲取注冊任務,并處理Read事件
實現代碼
public class ThreadsServer {public static void main(String[] args) {try (ServerSocketChannel server = ServerSocketChannel.open()) {// 當前線程為Boss線程Thread.currentThread().setName("Boss");server.bind(new InetSocketAddress(8080));// 負責輪詢Accept事件的SelectorSelector boss = Selector.open();server.configureBlocking(false);server.register(boss, SelectionKey.OP_ACCEPT);// 創建固定數量的WorkerWorker[] workers = new Worker[4];// 用于負載均衡的原子整數AtomicInteger robin = new AtomicInteger(0);for(int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-"+i);}while (true) {boss.select();Set<SelectionKey> selectionKeys = boss.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();// BossSelector負責Accept事件if (key.isAcceptable()) {// 建立連接SocketChannel socket = server.accept();System.out.println("connected...");socket.configureBlocking(false);// socket注冊到Worker的Selector中System.out.println("before read...");// 負載均衡,輪詢分配Workerworkers[robin.getAndIncrement()% workers.length].register(socket);System.out.println("after read...");}}}} catch (IOException e) {e.printStackTrace();}}static class Worker implements Runnable {private Thread thread;private volatile Selector selector;private String name;private volatile boolean started = false;/*** 同步隊列,用于Boss線程與Worker線程之間的通信*/private ConcurrentLinkedQueue<Runnable> queue;public Worker(String name) {this.name = name;}public void register(final SocketChannel socket) throws IOException {// 只啟動一次if (!started) {thread = new Thread(this, name);selector = Selector.open();queue = new ConcurrentLinkedQueue<>();thread.start();started = true;}// 向同步隊列中添加SocketChannel的注冊事件// 在Worker線程中執行注冊事件queue.add(new Runnable() {@Overridepublic void run() {try {socket.register(selector, SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}});// 喚醒被阻塞的Selector// select類似LockSupport中的park,wakeup的原理類似LockSupport中的unparkselector.wakeup();}@Overridepublic void run() {while (true) {try {selector.select();// 通過同步隊列獲得任務并運行Runnable task = queue.poll();if (task != null) {// 獲得任務,執行注冊操作task.run();}Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();// Worker只負責Read事件if (key.isReadable()) {// 簡化處理,省略細節SocketChannel socket = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(16);socket.read(buffer);buffer.flip();ByteBufferUtil.debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}} }四、NIO與BIO
1、Stream與Channel
- stream 不會自動緩沖數據,channel 會利用系統提供的發送緩沖區、接收緩沖區(更為底層)
- stream 僅支持阻塞 API,channel 同時支持阻塞、非阻塞 API,網絡 channel 可配合 selector 實現多路復用
- 二者均為全雙工,即讀寫可以同時進行
- 雖然Stream是單向流動的,但是它也是全雙工的
2、IO模型
- 同步:線程自己去獲取結果(一個線程)
- 例如:線程調用一個方法后,需要等待方法返回結果
- 異步:線程自己不去獲取結果,而是由其它線程返回結果(至少兩個線程)
- 例如:線程A調用一個方法后,繼續向下運行,運行結果由線程B返回
當調用一次 channel.read?或 stream.read?后,會由用戶態切換至操作系統內核態來完成真正數據讀取,而讀取又分為兩個階段,分別為:
-
等待數據階段
-
復制數據階段
根據UNIX 網絡編程 - 卷 I,IO模型主要有以下幾種
阻塞IO
- 用戶線程進行read操作時,需要等待操作系統執行實際的read操作,此期間用戶線程是被阻塞的,無法執行其他操作
非阻塞IO
- 用戶線程在一個循環中一直調用read方法,若內核空間中還沒有數據可讀,立即返回
- 只是在等待階段非阻塞
- 用戶線程發現內核空間中有數據后,等待內核空間執行復制數據,待復制結束后返回結果
多路復用
Java中通過Selector實現多路復用
- 當沒有事件是,調用select方法會被阻塞住
- 一旦有一個或多個事件發生后,就會處理對應的事件,從而實現多路復用
多路復用與阻塞IO的區別
- 阻塞IO模式下,若線程因accept事件被阻塞,發生read事件后,仍需等待accept事件執行完成后,才能去處理read事件
- 多路復用模式下,一個事件發生后,若另一個事件處于阻塞狀態,不會影響該事件的執行
異步IO
- 線程1調用方法后理解返回,不會被阻塞也不需要立即獲取結果
- 當方法的運行結果出來以后,由線程2將結果返回給線程1
3、零拷貝
零拷貝指的是數據無需拷貝到 JVM 內存中,同時具有以下三個優點
- 更少的用戶態與內核態的切換
- 不利用 cpu 計算,減少 cpu 緩存偽共享
- 零拷貝適合小文件傳輸
傳統 IO 問題
傳統的 IO 將一個文件通過 socket 寫出
File f = new File("helloword/data.txt"); RandomAccessFile file = new RandomAccessFile(file, "r");byte[] buf = new byte[(int)f.length()]; file.read(buf);Socket socket = ...; socket.getOutputStream().write(buf);Copy內部工作流如下
-
Java 本身并不具備 IO 讀寫能力,因此 read 方法調用后,要從 Java 程序的用戶態切換至內核態,去調用操作系統(Kernel)的讀能力,將數據讀入內核緩沖區。這期間用戶線程阻塞,操作系統使用 DMA(Direct Memory Access)來實現文件讀,其間也不會使用 CPU
-
DMA 也可以理解為硬件單元,用來解放 cpu 完成文件 IO
-
從內核態切換回用戶態,將數據從內核緩沖區讀入用戶緩沖區(即 byte[] buf),這期間?CPU 會參與拷貝,無法利用 DMA
-
調用 write 方法,這時將數據從用戶緩沖區(byte[] buf)寫入?socket 緩沖區,CPU 會參與拷貝
-
接下來要向網卡寫數據,這項能力 Java 又不具備,因此又得從用戶態切換至內核態,調用操作系統的寫能力,使用 DMA 將?socket 緩沖區的數據寫入網卡,不會使用 CPU
可以看到中間環節較多,java 的 IO 實際不是物理設備級別的讀寫,而是緩存的復制,底層的真正讀寫是操作系統來完成的
- 用戶態與內核態的切換發生了 3 次,這個操作比較重量級
- 數據拷貝了共 4 次
NIO 優化
通過?DirectByteBuf
- ByteBuffer.allocate(10)
- 底層對應 HeapByteBuffer,使用的還是 Java 內存
- ByteBuffer.allocateDirect(10)
- 底層對應DirectByteBuffer,使用的是操作系統內存
大部分步驟與優化前相同,唯有一點:Java 可以使用 DirectByteBuffer 將堆外內存映射到 JVM 內存中來直接訪問使用
- 這塊內存不受 JVM 垃圾回收的影響,因此內存地址固定,有助于 IO 讀寫
- Java 中的 DirectByteBuf 對象僅維護了此內存的虛引用,內存回收分成兩步
- DirectByteBuffer 對象被垃圾回收,將虛引用加入引用隊列
- 當引用的對象ByteBuffer被垃圾回收以后,虛引用對象Cleaner就會被放入引用隊列中,然后調用Cleaner的clean方法來釋放直接內存
- DirectByteBuffer 的釋放底層調用的是 Unsafe 的 freeMemory 方法
- 通過專門線程訪問引用隊列,根據虛引用釋放堆外內存
- DirectByteBuffer 對象被垃圾回收,將虛引用加入引用隊列
- 減少了一次數據拷貝,用戶態與內核態的切換次數沒有減少
進一步優化1
以下兩種方式都是零拷貝,即無需將數據拷貝到用戶緩沖區中(JVM內存中)
底層采用了?linux 2.1?后提供的?sendFile?方法,Java 中對應著兩個 channel 調用?transferTo/transferFrom?方法拷貝數據
-
Java 調用 transferTo 方法后,要從 Java 程序的用戶態切換至內核態,使用 DMA將數據讀入內核緩沖區,不會使用 CPU
-
數據從內核緩沖區傳輸到?socket 緩沖區,CPU 會參與拷貝
-
最后使用 DMA 將?socket 緩沖區的數據寫入網卡,不會使用 CPU
這種方法下
- 只發生了1次用戶態與內核態的切換
- 數據拷貝了 3 次
進一步優化2
linux 2.4?對上述方法再次進行了優化
-
Java 調用 transferTo 方法后,要從 Java 程序的用戶態切換至內核態,使用 DMA將數據讀入內核緩沖區,不會使用 CPU
-
只會將一些 offset 和 length 信息拷入?socket 緩沖區,幾乎無消耗
-
使用 DMA 將?內核緩沖區的數據寫入網卡,不會使用 CPU
整個過程僅只發生了1次用戶態與內核態的切換,數據拷貝了 2 次
4、AIO
AIO 用來解決數據復制階段的阻塞問題
- 同步意味著,在進行讀寫操作時,線程需要等待結果,還是相當于閑置
- 異步意味著,在進行讀寫操作時,線程不必等待結果,而是將來由操作系統來通過回調方式由另外的線程來獲得結果
異步模型需要底層操作系統(Kernel)提供支持
- Windows 系統通過 IOCP?實現了真正的異步 IO
- Linux 系統異步 IO 在 2.6 版本引入,但其底層實現還是用多路復用模擬了異步 IO,性能沒有優勢
總結
以上是生活随笔為你收集整理的中间件系列「三」netty之NIO基础的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Java注解系列】内置注解与AOP实现
- 下一篇: 【计算机是如何通信 四】Web服务器/S