Java源码解析——Java IO包
一、基礎知識:
1. Java IO一般包含兩個部分:1)java.io包中阻塞型IO;2)java.nio包中的非阻塞型IO,通常稱為New IO。這里只考慮到java.io包中堵塞型IO;
2. Java.io包簡單地分類。
2.1 Java的IO主要包含三個部分:
1)流式部分――IO的主體部分;
2)非流式部分――主要包含一些輔助流式部分的類,如:File類、RandomAccessFile類和FileDescriptor等類;
3)文件讀取部分的與安全相關的類,如:SerializablePermission類。以及與本地操作系統相關的文件系統的類,如:FileSystem類和Win32FileSystem類和WinNTFileSystem類。
2.2 流式部分可以概括:
1)字節流(Byte Stream)和字符流(Char Stream)的對應;
2)輸入和輸出的對應。
3)從字節流到字符流的橋梁。對應于輸入和輸出為InputStreamReader和OutputStreamWriter。
2.3?流的具體類中又可以具體分為:
?????? 1)介質流(Media Stream或者稱為原始流Raw Stream)――主要指一些基本的流,他們主要是從具體的介質上,如:文件、內存緩沖區(Byte數組、Char數組、StringBuffer對象)等,讀取數據;
?????? 2)過濾流(Filter Stream)――主要指所有FilterInputStream/FilterOutputStream和FilterReader/FilterWriter的子類,主要是對其包裝的類進行某些特定的處理,如:緩存等。
2.4?節點流和處理流
1)節點流是FileInputStream、ByteArrayInputStream這些直接從某個地方獲取流的類;
?????? 2)處理流則是BufferedInputStream這種可以裝飾節點流,來實現特定功能的類。因此,節點流可以理解為裝飾者模式中的被裝飾者,處理流則是裝飾者。
?
類圖可以參考博客:https://blog.csdn.net/u013063153/article/category/6399747
?
二、類的分析
?
1、輸入字節流
?
IO中輸入字節流的繼承圖:
-InputStream-ByteArrayInputStream //將內存中的Byte數組適配為一個InputStream-FileInputStream //最基本的文件輸入流。主要用于從文件中讀取信息-FilterInputStream //給其它被裝飾對象提供額外功能的抽象類-BufferedInputStream //使用該對象阻止每次讀取一個字節都會頻繁操作IO。將字節讀取一個緩存區,從緩存區讀取。-DataInputStream //使用它可以讀出基本數據類型-LineNumberInputStream //跟蹤輸入流中的行號。可以得到和設置行號。-PushbackInputStream //可以在讀取最后一個byte 后將其放回到緩存中。-ObjectInputStream-PipedInputStream //在流中實現了管道的概念讀取PipedOutputStream寫入的數據。-SequenceInputStream //將2個或者多個InputStream 對象轉變為一個InputStream-StringBufferInputStream //將內存中的字符串適配為一個InputStream(廢棄)1)InputStream是抽象類,是所有字節輸入流的超類。
2)ByteArrayInputStream、StringBufferInputStream、FileInputStream是三種基本的介質流,它們分別將Byte數組、StringBuffer、和本地文件中讀取數據。PipedInputStream是從與其它線程共用的管道中讀取數據;
3)ObjectInputStream和所有FilterInputStream的子類都是裝飾流(裝飾器模式的主角)。
4)FileInputStream 文件輸入流,用于讀取本地文件中的字節數據。
?
2. IO中的輸出字節流
?
?IO中輸出字節流的繼承圖:
-OutputStream-ByteArrayOutputStream //在內存中創建一個buffer。所有寫入此流中的數據都被放入到此buffer中。-FileOutputStream //將信息寫入文件中。-FilterOutputStream //實現OutputStream裝飾器功能的抽象類。 -BufferedOutputStream //使用該對象阻止每次讀取一個字節都會頻繁操作IO。將字節讀取一個緩存區,從緩存區讀取。-DataOutputStream //使用它可以寫入基本數據類型。 -PrintStream //產生具有格式的輸出信息。(一般地在java程序中DataOutputStream用于數據的存儲,即J2EE中持久層完成的功能,PrintStream完成顯示的功能,類似于J2EE中表現層的功能)-BufferedOutputStream //使用它可以避免頻繁地向IO寫入數據,數據一般都寫入一個緩存區,在調用flush方法后會清空緩存、一次完成數據的寫入。 -PipedOutputStream //任何寫入此對象的信息都被放入對應PipedInputStream 對象的緩存中,從而完成線程的通信,實現了“管道”的概念。1)OutputStream是所有的輸出字節流的父類,它是一個抽象類。
2)ByteArrayOutputStream、FileOutputStream是兩種基本的介質流,它們分別向Byte數組、和本地文件中寫入數據。PipedOutputStream是向與其它線程共用的管道中寫入數據。
3)ObjectOutputStream和所有FilterOutputStream的子類都是裝飾流。
?
3.?字節流的輸入與輸出的對應
1)LineNumberInputStream主要完成從流中讀取數據時,會得到相應的行號,至于什么時候分行、在哪里分行是由改類主動確定的,并不是在原始中有這樣一個行號。在輸出部分沒有對應的部分,我們完全可以自己建立一個LineNumberOutputStream,在最初寫入時會有一個基準的行號,以后每次遇到換行時會在下一行添加一個行號,看起來也是可以的。
2)PushbackInputStream的功能是查看最后一個字節,不滿意就放入緩沖區。主要用在編譯器的語法、詞法分析部分。輸出部分的BufferedOutputStream幾乎實現相近的功能。
3)SequenceInputStream可以認為是一個工具類,將兩個或者多個輸入流當成一個輸入流依次讀取。完全可以從IO包中去除,還完全不影響IO包的結構,卻讓其更“純潔”――純潔的Decorator模式。
4)PrintStream也可以認為是一個輔助工具。主要可以向其他輸出流,或者FileInputStream寫入數據,本身內部實現還是帶緩沖的。本質上是對其它流的綜合運用的一個工具而已。一樣可以踢出IO包!System.out和System.out就是PrintStream的實例!
5)ObjectInputStream/ObjectOutputStream和DataInputStream/DataOutputStream主要是要求寫對象/數據和讀對象/數據的次序要保持一致,否則輕則不能得到正確的數據,重則拋出異常;
6)PipedInputStream/PipedOutputStream在創建時一般就一起創建,調用它們的讀寫方法時會檢查對方是否存在,或者關閉。
?
4. 輸入字符流
IO中輸入字符流的繼承圖:
-Reader-BufferedReader-LineNumberReader-CharArrayReader-FilterReader-PushbackReader-InputStreamReader-FileReader-PipedReader-StringReader1)Reader是所有的輸入字符流的父類,它是一個抽象類。
2)CharReader、StringReader是兩種基本的介質流,它們分別將Char數組、String中讀取數據。PipedReader是從與其它線程共用的管道中讀取數據。
3)BufferedReader很明顯就是一個裝飾器,它和其子類負責裝飾其它Reader對象。
4)FilterReader是所有自定義具體裝飾流的父類,其子類PushbackReader對Reader對象進行裝飾,會增加一個行號。
5)InputStreamReader是字節流向字符流轉化的橋梁,它使用指定的 charset 讀取字節并將其解碼為字符。它使用的字符集可以由名稱指定或顯式給定,或者可以接受平臺默認的字符集。其構造方法的默認參數為InputStream 對象。
使用方法:
InputStreamReader(InputStream in),InputStreamReader(Inputstreamin, charset cs)
為了達到最高效率,InputStreamReader通常用法為:
BufferedReader in = new BufferedReader(newInputStreamReader(System.in));BufferedReader:緩沖輸入流,包裝其他字符輸入流,提高讀取效率,從字符輸入流中讀取文本,緩沖各個字符,從而實現字符、數組和行的高效讀取。Reader的讀取操作開銷大,為提高效率使用BufferedReader包裝其他Reader(如FileReader和InputStreamReader)
?
5.?輸出字符流:
IO中輸出字符流的繼承圖:
-Writer-BufferedWriter-CharArrayWriter-FilterWriter-OutputStreamWriter-FileWriter-PipedWriter-PrintWriter-StringWriter1)Writer是所有的輸出字符流的父類,它是一個抽象類。
2)CharArrayWriter、StringWriter是兩種基本的介質流,它們分別向Char數組、String中寫入數據。PipedWriter是向與其它線程共用的管道中寫入數據。
3) BufferedWriter是一個裝飾器為Writer提供緩沖功能。
4)PrintWriter和PrintStream極其類似,功能和使用也非常相似。
5)OutputStreamWriter:是字符流通向字節流的橋梁,它使用指定的 charset 讀取字符并將其解碼為字節。它使用的字符集可以由名稱指定或顯式給定,或者可以接受平臺默認的字符集。
使用方法:
其構造方法的默認參數為OutputStream 對象,
OutputStreamReader(InputStream in), OutputStreamReader(Inputstream in, charset cs)為了達到最高效率,OutputStreamReader通常用法為:
BufferedWriter out = new BufferedWriter(newOutputStreamWriter(System.in));BufferedWriter:緩沖輸出流,包裝其他字符輸出流,提高讀取效率,將文本寫入字符輸出流,緩沖各個字符,從而提供單個字符、數組和字符串的高效寫入Writer的讀取操作開銷大,為提高效率使用BufferedWriter包裝其他Writer(如FileWriter和InputStreamWriter)
?
6. 序列化與反序列化:ObjectInputStream,ObjectOutputStream
JAVA提出序列化是為了將對象在ObjectOutputStream:對象輸出流,它的writeObject(Object obj)方法可以對參數指定的obj對象進行序列化,把得到的字節序列寫到一個目標輸出流中。
序列化過程:
File file = new File(“path”); OutputStream fos = new FileOutputStream(file); ObjectOutputStream oos = new ObjectOutputStream(“fos”);將輸出流對象輸出到file對象中。 oos.writeObject(Object obj); oos.flush(); oos.close();ObjectInputStream:對象輸入流,它的readObject()方法可以序列化文件進行反序列化,把字節序列文件轉化為對象。
反序列化過程:
File file = new File(“path”) InputStream fis = new FileInputStream(file); ObjectInputStream ois = new ObjectInputStream(fis); Class obj = (class)ois.readObject(); Ois.close();實現序列化的兩種方式:
1)類實現Serializable接口,類只有實現了serializable接口,ObjectOutputstream才會去將類的對象序列化,否則會拋出NotSerializableException異常
2)類繼承Externalizable類。
?
三、主要源碼實現:
1.InputStream:
public abstract class InputStream implements Closeable {private static final int SKIP_BUFFER_SIZE = 2048; //用于skip方法,和skipBuffer相關private static byte[] skipBuffer; // skipBuffer is initialized in skip(long), if needed.//從輸入流中讀取下一個字節,//正常返回0-255,到達文件的末尾返回-1//在流中還有數據,但是沒有讀到時該方法會阻塞(block)//Java IO和New IO的區別就是阻塞流和非阻塞流//抽象方法!不同的子類不同的實現!public abstract int read() throws IOException; //將流中的數據讀入放在byte數組的第off個位置先后的len個位置中//放回值為放入字節的個數。//這個方法在利用抽象方法read,某種意義上簡單的Templete模式。public int read(byte b[], int off, int len) throws IOException {//檢查輸入是否正常。一般情況下,檢查輸入是方法設計的第一步if (b == null) { throw new NullPointerException();} else if (off < 0 || len < 0 || len > b.length - off) {throw new IndexOutOfBoundsException();} else if (len == 0) {return 0;} //讀取下一個字節int c = read();//到達文件的末端返回-1if (c == -1) { return -1; }//放回的字節downcast b[off] = (byte)c;//已經讀取了一個字節 int i = 1; try {//最多讀取len個字節,所以要循環len次for (; i < len ; i++) {//每次循環從流中讀取一個字節//由于read方法阻塞,//所以read(byte[],int,int)也會阻塞c = read();//到達末尾,理所當然放回-1 if (c == -1) { break; } //讀到就放入byte數組中b[off + i] = (byte)c;}} catch (IOException ee) { }return i;}//利用上面的方法read(byte[] b)public int read(byte b[]) throws IOException {return read(b, 0, b.length);} //方法內部使用的、表示要跳過的字節數目,public long skip(long n) throws IOException {long remaining = n; int nr;if (skipBuffer == null)//初始化一個跳轉的緩存skipBuffer = new byte[SKIP_BUFFER_SIZE]; //本地化的跳轉緩存byte[] localSkipBuffer = skipBuffer; //檢查輸入參數,應該放在方法的開始 if (n <= 0) { return 0; } //一共要跳過n個,每次跳過部分,循環 while (remaining > 0) { nr = read(localSkipBuffer, 0, (int) Math.min(SKIP_BUFFER_SIZE, remaining));//利用上面的read(byte[],int,int)方法盡量讀取n個字節 //讀到流的末端,則返回if (nr < 0) { break; }//沒有完全讀到需要的,則繼續循環remaining -= nr; } return n - remaining;//返回時要么全部讀完,要么因為到達文件末端,讀取了部分 }//查詢流中還有多少可以讀取的字節//該方法不會block。在java中抽象類方法的實現一般有以下幾種方式://1.拋出異常(java.util);2.“弱”實現。像上面這種。子類在必要的時候覆蓋它。//3.“空”實現。public int available() throws IOException { return 0;}//關閉當前流、同時釋放與此流相關的資源//關閉當前流、同時釋放與此流相關的資源public void close() throws IOException {}//markSupport可以查詢當前流是否支持markpublic synchronized void mark(int readlimit) {}//對mark過的流進行復位。只有當流支持mark時才可以使用此方法。public synchronized void reset() throws IOException {throw new IOException("mark/reset not supported");} //查詢是否支持mark//絕大部分不支持,因此提供默認實現,返回false。子類有需要可以覆蓋。public boolean markSupported() { return false;} }?
2.FilterInputStream
?這是字節輸入流部分裝飾器模式的核心。是在裝飾器模式中的Decorator對象,主要完成對其它流裝飾的基本功能:
public class FilterInputStream extends InputStream {//裝飾器的代碼特征:被裝飾的對象一般是裝飾器的成員變量protected volatile InputStream in; //將要被裝飾的字節輸入流protected FilterInputStream(InputStream in) { //通過構造方法傳入此被裝飾的流this.in = in;}//下面這些方法,完成最小的裝飾――0裝飾,只是調用被裝飾流的方法而已public int read() throws IOException {return in.read();}public int read(byte b[]) throws IOException {return read(b, 0, b.length);}public int read(byte b[], int off, int len) throws IOException {return in.read(b, off, len);}public long skip(long n) throws IOException {return in.skip(n);}public int available() throws IOException {return in.available();}public void close() throws IOException {in.close();}public synchronized void mark(int readlimit) {in.mark(readlimit);}public synchronized void reset() throws IOException {in.reset();}public boolean markSupported() {return in.markSupported();} }ByteArray到ByteArrayInputStream的適配:
ByteArrayInputStream內部有一個byte類型的buffer。很典型的適配器模式的應用――將byte數組適配流的接口。
public class ByteArrayInputStream extends InputStream {protected byte buf[]; //內部的buffer,一般通過構造器輸入protected int pos; //當前位置的cursor。從0至byte數組的長度。//byte[pos]就是read方法讀取的字節protected int mark = 0; //mark的位置。protected int count; //流中字節的數目。//構造器,從一個byte[]創建一個ByteArrayInputStreampublic ByteArrayInputStream(byte buf[]) {//初始化流中的各個成員變量this.buf = buf; this.pos = 0;this.count = buf.length;}//構造器public ByteArrayInputStream(byte buf[], int offset, int length) { this.buf = buf;this.pos = offset; //與上面不同this.count = Math.min(offset + length, buf.length);this.mark = offset; //與上面不同 }//從流中讀取下一個字節public synchronized int read() { //返回下一個位置的字節//流中沒有數據則返回-1return (pos < count) ? (buf[pos++] & 0xff) : -1; }// ByteArrayInputStream要覆蓋InputStream中可以看出其提供了該方法的實現//某些時候,父類不能完全實現子類的功能,父類的實現一般比較通用。//當子類有更有效的方法時,我們會覆蓋這些方法。public synchronized int read(byte b[], int off, int len) {//首先檢查輸入參數的狀態是否正確if(b==null){ throw new NullPointerException();} else if (off < 0 || len < 0 || len > b.length - off) {throw new IndexOutOfBoundsException();}if (pos >= count) { return -1; }if (pos + len > count) { len = count - pos; }if (len <= 0) { return 0; }//java中提供數據復制的方法//出于速度的原因!他們都用到System.arraycopy方法 System.arraycopy(buf, pos, b, off, len); pos += len;return len;}//下面這個方法,在InputStream中也已經實現了。//但是當時是通過將字節讀入一個buffer中實現的,好像效率低了一點。//比InputStream中的方法簡單、高效public synchronized long skip(long n) {//當前位置,可以跳躍的字節數目if (pos + n > count) { n = count - pos; } //小于0,則不可以跳躍if (n < 0) { return 0; } //跳躍后,當前位置變化 pos += n; return n;} //查詢流中還有多少字節沒有讀取。 public synchronized int available() {return count - pos;}//ByteArrayInputStream支持mark所以返回truepublic boolean markSupported() { return true;} //在流中當前位置mark。 public void mark(int readAheadLimit) { mark = pos;}//重置流。即回到mark的位置。public synchronized void reset() {pos = mark;}//關閉ByteArrayInputStream不會產生任何動作。public void close() throws IOException { } }?
3.BufferedInputStream//該類主要完成對被包裝流,加上一個緩存的功能
public class BufferedInputStream extends FilterInputStream {private static int defaultBufferSize = 8192; //默認緩存的大小protected volatile byte buf[]; //內部的緩存protected int count; //buffer的大小protected int pos; //buffer中cursor的位置protected int markpos = -1; //mark的位置protected int marklimit; //mark的范圍//原子性更新。和一致性編程相關private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =AtomicReferenceFieldUpdater.newUpdater (BufferedInputStream.class, byte[].class,"buf");//檢查輸入流是否關閉,同時返回被包裝流private InputStream getInIfOpen() throws IOException {InputStream input = in;if (input == null) throw new IOException("Stream closed");return input;}//檢查buffer的狀態,同時返回緩存private byte[] getBufIfOpen() throws IOException { byte[] buffer = buf;//不太可能發生的狀態if (buffer == null) throw new IOException("Stream closed"); return buffer;}//構造器 public BufferedInputStream(InputStream in) {//指定默認長度的bufferthis(in, defaultBufferSize); }//構造器public BufferedInputStream(InputStream in, int size) { super(in);//檢查輸入參數if(size<=0){throw new IllegalArgumentException("Buffer size <= 0");}//創建指定長度的bufferbuf = new byte[size]; }//從流中讀取數據,填充如緩存中。private void fill() throws IOException {//得到bufferbyte[] buffer = getBufIfOpen(); if (markpos < 0)//mark位置小于0,此時pos為0pos = 0;//pos大于buffer的長度 else if (pos >= buffer.length) if (markpos > 0) { int sz = pos - markpos;System.arraycopy(buffer, markpos, buffer, 0, sz);pos = sz;markpos = 0;} else if (buffer.length >= marklimit) { //buffer的長度大于marklimit時,mark失效markpos = -1; //丟棄buffer中的內容
pos = 0; }else{ //buffer的長度小于marklimit時對buffer擴容int nsz = pos * 2;if (nsz > marklimit) nsz = marklimit;//擴容為原來的2倍,太大則為marklimit大小byte nbuf[] = new byte[nsz]; //將buffer中的字節拷貝如擴容后的buf中 System.arraycopy(buffer, 0, nbuf, 0, pos); if (!bufUpdater.compareAndSet(this, buffer, nbuf)) { //在buffer在被操作時,不能取代此bufferthrow new IOException("Stream closed");}//將新buf賦值給bufferbuffer = nbuf; }count = pos;int n = getInIfOpen().read(buffer, pos, buffer.length - pos);if (n > 0) count = n + pos;}//讀取下一個字節public synchronized int read() throws IOException { //到達buffer的末端 if (pos >= count) { //就從流中讀取數據,填充buffer fill();
//讀過一次,沒有數據則返回-1if (pos >= count) return -1; }//返回buffer中下一個位置的字節return getBufIfOpen()[pos++] & 0xff; }//將數據從流中讀入buffer中private int read1(byte[] b, int off, int len) throws IOException { int avail = count - pos; //buffer中還剩的可讀字符 //buffer中沒有可以讀取的數據時if(avail<=0){ //將輸入流中的字節讀入b中if (len >= getBufIfOpen().length && markpos < 0) { return getInIfOpen().read(b, off, len);}fill();//填充 avail = count - pos;if (avail <= 0) return -1;}//從流中讀取后,檢查可以讀取的數目int cnt = (avail < len) ? avail : len; //將當前buffer中的字節放入b的末端 System.arraycopy(getBufIfOpen(), pos, b, off, cnt); pos += cnt;return cnt;}public synchronized int read(byte b[], int off, int len)throws IOException {getBufIfOpen();
// 檢查buffer是否open//檢查輸入參數是否正確if ((off | len | (off + len) | (b.length - (off + len))) < 0) { throw new IndexOutOfBoundsException();} else if (len == 0) {return 0;}int n = 0;for (;;) {int nread = read1(b, off + n, len - n);if (nread <= 0) return (n == 0) ? nread : n;n += nread;if (n >= len) return n;InputStream input = in;if (input != null && input.available() <= 0) return n;}}public synchronized long skip(long n) throws IOException {// 檢查buffer是否關閉 getBufIfOpen(); //檢查輸入參數是否正確if (n <= 0) { return 0; } //buffered中可以讀取字節的數目long avail = count - pos; //可以讀取的小于0,則從流中讀取if (avail <= 0) { //mark小于0,則mark在流中 if (markpos <0) return getInIfOpen().skip(n); // 從流中讀取數據,填充緩沖區。 fill(); //可以讀的取字節為buffer的容量減當前位置avail = count - pos; if (avail <= 0) return 0;} long skipped = (avail < n) ? avail : n; pos += skipped;
//當前位置改變return skipped;}//該方法不會block!返回流中可以讀取的字節的數目。//該方法的返回值為緩存中的可讀字節數目加流中可讀字節數目的和public synchronized int available() throws IOException {return getInIfOpen().available() + (count - pos); }//當前位置處為mark位置public synchronized void mark(int readlimit) { marklimit = readlimit;markpos = pos;}public synchronized void reset() throws IOException {// 緩沖去關閉了,肯定就拋出異常!程序設計中經常的手段 getBufIfOpen();if (markpos < 0) throw new IOException("Resetting to invalid mark");pos = markpos;}//該流和ByteArrayInputStream一樣都支持markpublic boolean markSupported() { return true;}//關閉當前流同時釋放相應的系統資源。public void close() throws IOException {byte[] buffer;while ( (buffer = buf) != null) {if (bufUpdater.compareAndSet(this, buffer, null)) {InputStream input = in;in = null;if (input != null) input.close();return;}// Else retry in case a new buf was CASed in fill() }} }
?
4.PipedOutputStream
?
PipedOutputStream一般必須和一個PipedInputStream連接。共同構成一個pipe。即必須連接輸入部分。
其原理為:PipedInputStream內部有一個Buffer, PipedInputStream可以使用InputStream的方法讀取其Buffer中的字節。PipedInputStream中Buffer中的字節是PipedOutputStream調用PipedInputStream的方法放入的。
public class PipedOutputStream extends OutputStream {//包含一個PipedInputStreamprivate PipedInputStream sink; //帶有目的地的構造器public PipedOutputStream(PipedInputStream snk)throws IOException { connect(snk);}//默認構造器,必須使用下面的connect方法連接public PipedOutputStream() { }public synchronized void connect(PipedInputStream snk) throws IOException {//檢查輸入參數的正確性if(snk==null){ throw new NullPointerException();} else if (sink != null || snk.connected) {throw new IOException("Already connected");}//一系列初始化工作sink = snk; snk.in = -1;snk.out = 0;snk.connected = true;} //向流中寫入數據public void write(int b) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); }//本質上是,調用PipedInputStream的receive方法接受此字節 sink.receive(b); }public void write(byte b[], int off, int len) throws IOException {//首先檢查輸入參數的正確性if (sink == null) { throw new IOException("Pipe not connected");} else if (b == null) {throw new NullPointerException();} else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {throw new IndexOutOfBoundsException();} else if (len == 0) {return;}//調用PipedInputStream的receive方法接受 sink.receive(b, off, len); }//flush輸出流public synchronized void flush() throws IOException { if (sink != null) {//本質是通知輸入流,可以讀取synchronized (sink) { sink.notifyAll(); } }}//關閉流同時釋放相關資源public void close() throws IOException { if (sink != null) { sink.receivedLast(); }} }PipedInputStream
public class PipedInputStream extends InputStream {//標識有讀取方或寫入方關閉boolean closedByWriter = false; volatile boolean closedByReader = false; //是否建立連接boolean connected = false; //標識哪個線程 Thread readSide; Thread writeSide;//緩沖區的默認大小protected static final int PIPE_SIZE = 1024; //緩沖區protected byte buffer[] = new byte[PIPE_SIZE]; //下一個寫入字節的位置。0代表空,in==out代表滿protected int in = -1; //下一個讀取字節的位置 protected int out = 0; //給定源的輸入流public PipedInputStream(PipedOutputStream src) throws IOException { connect(src);}//默認構造器,下部一定要connect源public PipedInputStream() { } //連接輸入源public void connect(PipedOutputStream src) throws IOException { //調用源的connect方法連接當前對象src.connect(this); }//只被PipedOuputStream調用protected synchronized void receive(int b) throws IOException { //檢查狀態,寫入 checkStateForReceive(); //永遠是PipedOuputStreamwriteSide = Thread.currentThread(); //輸入和輸出相等,等待空間if (in == out) awaitSpace(); if (in < 0) {in = 0;out = 0;}//放入buffer相應的位置buffer[in++] = (byte)(b & 0xFF); //in為0表示buffer已空if (in >= buffer.length) { in = 0; } }synchronized void receive(byte b[], int off, int len) throws IOException {checkStateForReceive();//從PipedOutputStream可以看出writeSide = Thread.currentThread(); int bytesToTransfer = len;while (bytesToTransfer > 0) {//滿了,會通知讀取的;空會通知寫入if (in == out) awaitSpace(); int nextTransferAmount = 0;if (out < in) {nextTransferAmount = buffer.length - in;} else if (in < out) {if (in == -1) {in = out = 0;nextTransferAmount = buffer.length - in;} else {nextTransferAmount = out - in;}}if (nextTransferAmount > bytesToTransfer) nextTransferAmount = bytesToTransfer;assert(nextTransferAmount > 0);System.arraycopy(b, off, buffer, in, nextTransferAmount);bytesToTransfer -= nextTransferAmount;off += nextTransferAmount;in += nextTransferAmount;if (in >= buffer.length) { in = 0; }}}//檢查當前狀態,等待輸入private void checkStateForReceive() throws IOException { if (!connected) {throw new IOException("Pipe not connected");} else if (closedByWriter || closedByReader) {throw new IOException("Pipe closed");} else if (readSide != null && !readSide.isAlive()) {throw new IOException("Read end dead");}}//Buffer已滿,等待一段時間private void awaitSpace() throws IOException { //in==out表示滿了,沒有空間while (in == out) { //檢查接受端的狀態 checkStateForReceive(); //通知讀取端 notifyAll(); try {wait(1000);} catch (InterruptedException ex) {throw new java.io.InterruptedIOException();}}}//通知所有等待的線程()已經接受到最后的字節synchronized void receivedLast() { closedByWriter = true; // notifyAll();}public synchronized int read() throws IOException {//檢查一些內部狀態if (!connected) { throw new IOException("Pipe not connected");} else if (closedByReader) {throw new IOException("Pipe closed");} else if (writeSide != null && !writeSide.isAlive()&& !closedByWriter && (in < 0)) {throw new IOException("Write end dead");}//當前線程讀取readSide = Thread.currentThread(); //重復兩次???int trials = 2; while (in < 0) {//輸入斷關閉返回-1if (closedByWriter) { return -1; } //狀態錯誤if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken");}notifyAll(); // 空了,通知寫入端可以寫入 try {wait(1000);} catch (InterruptedException ex) {throw new java.io.InterruptedIOException();}}int ret = buffer[out++] & 0xFF; if (out >= buffer.length) { out = 0; }//沒有任何字節if (in == out) { in = -1; } return ret;}public synchronized int read(byte b[], int off, int len) throws IOException {//檢查輸入參數的正確性if (b == null) { throw new NullPointerException();} else if (off < 0 || len < 0 || len > b.length - off) {throw new IndexOutOfBoundsException();} else if (len == 0) {return 0;}//讀取下一個int c = read(); //已經到達末尾了,返回-1if (c < 0) { return -1; } //放入外部buffer中b[off] = (byte) c; //return-lenint rlen = 1; //下一個in存在,且沒有到達lenwhile ((in >= 0) && (--len > 0)) { //依次放入外部bufferb[off + rlen] = buffer[out++]; rlen++;//讀到buffer的末尾,返回頭部if (out >= buffer.length) { out = 0; } //讀、寫位置一致時,表示沒有數據if (in == out) { in = -1; } }//返回填充的長度return rlen; }//返回還有多少字節可以讀取public synchronized int available() throws IOException { //到達末端,沒有字節if(in < 0)return 0; else if(in == out)//寫入的和讀出的一致,表示滿return buffer.length; else if (in > out)//寫入的大于讀出return in - out; else//寫入的小于讀出的return in + buffer.length - out; }//關閉當前流,同時釋放與其相關的資源public void close() throws IOException { //表示由輸入流關閉closedByReader = true; //同步化當前對象,in為-1synchronized (this) { in = -1; } }}?
轉載于:https://www.cnblogs.com/winterfells/p/8745297.html
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生總結
以上是生活随笔為你收集整理的Java源码解析——Java IO包的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java笔记(3):String(2)
- 下一篇: 20165234 《Java程序设计》第