Java IO 之 管道流 原理分析
概述
管道流是用來(lái)在多個(gè)線程之間進(jìn)行信息傳遞的Java流。
管道流分為字節(jié)流管道流和字符管道流。
字節(jié)管道流:PipedOutputStream 和 PipedInputStream。
字符管道流:PipedWriter 和 PipedReader。
PipedOutputStream、PipedWriter 是寫(xiě)入者/生產(chǎn)者/發(fā)送者;
PipedInputStream、PipedReader 是讀取者/消費(fèi)者/接收者。
字節(jié)管道流
這里我們只分析字節(jié)管道流,字符管道流原理跟字節(jié)管道流一樣,只不過(guò)底層一個(gè)是 byte 數(shù)組存儲(chǔ) 一個(gè)是 char 數(shù)組存儲(chǔ)的。
java的管道輸入與輸出實(shí)際上使用的是一個(gè)循環(huán)緩沖數(shù)來(lái)實(shí)現(xiàn)的。輸入流PipedInputStream從這個(gè)循環(huán)緩沖數(shù)組中讀數(shù)據(jù),輸出流PipedOutputStream往這個(gè)循環(huán)緩沖數(shù)組中寫(xiě)入數(shù)據(jù)。當(dāng)這個(gè)緩沖數(shù)組已滿的時(shí)候,輸出流PipedOutputStream所在的線程將阻塞;當(dāng)這個(gè)緩沖數(shù)組為空的時(shí)候,輸入流PipedInputStream所在的線程將阻塞。
注意事項(xiàng)
在使用管道流之前,需要注意以下要點(diǎn):
* 管道流僅用于多個(gè)線程之間傳遞信息,若用在同一個(gè)線程中可能會(huì)造成死鎖;
* 管道流的輸入輸出是成對(duì)的,一個(gè)輸出流只能對(duì)應(yīng)一個(gè)輸入流,使用構(gòu)造函數(shù)或者connect函數(shù)進(jìn)行連接;
* 一對(duì)管道流包含一個(gè)緩沖區(qū),其默認(rèn)值為1024個(gè)字節(jié),若要改變緩沖區(qū)大小,可以使用帶有參數(shù)的構(gòu)造函數(shù);
* 管道的讀寫(xiě)操作是互相阻塞的,當(dāng)緩沖區(qū)為空時(shí),讀操作阻塞;當(dāng)緩沖區(qū)滿時(shí),寫(xiě)操作阻塞;
* 管道依附于線程,因此若線程結(jié)束,則雖然管道流對(duì)象還在,仍然會(huì)報(bào)錯(cuò)“read dead end”;
* 管道流的讀取方法與普通流不同,只有輸出流正確close時(shí),輸出流才能讀到-1值。
示例
public class PipedStreamDemo {public static void main(String[] args) {//創(chuàng)建一個(gè)線程池ExecutorService executorService = Executors.newCachedThreadPool();try {//創(chuàng)建輸入和輸出管道流PipedOutputStream pos = new PipedOutputStream();PipedInputStream pis = new PipedInputStream(pos);//創(chuàng)建發(fā)送線程和接收線程Sender sender = new Sender(pos);Reciever reciever = new Reciever(pis);//提交給線程池運(yùn)行發(fā)送線程和接收線程executorService.execute(sender);executorService.execute(reciever);} catch (IOException e) {e.printStackTrace();}//通知線程池,不再接受新的任務(wù),并執(zhí)行完成當(dāng)前正在運(yùn)行的線程后關(guān)閉線程池。executorService.shutdown();try {//shutdown 后可能正在運(yùn)行的線程很長(zhǎng)時(shí)間都運(yùn)行不完成,這里設(shè)置超過(guò)1小時(shí),強(qiáng)制執(zhí)行 Interruptor 結(jié)束線程。executorService.awaitTermination(1, TimeUnit.HOURS);} catch (InterruptedException e) {e.printStackTrace();}}static class Sender extends Thread {private PipedOutputStream pos;public Sender(PipedOutputStream pos) {super();this.pos = pos;}@Overridepublic void run() {try {String s = "hello world, amazing java !";System.out.println("Sender:" + s);byte[] buf = s.getBytes();pos.write(buf, 0, buf.length);pos.close();TimeUnit.SECONDS.sleep(5);} catch (Exception e) {e.printStackTrace();}}}static class Reciever extends Thread {private PipedInputStream pis;public Reciever(PipedInputStream pis) {super();this.pis = pis;}@Overridepublic void run() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();byte[] buf = new byte[1024];int len = 0;while ((len = pis.read(buf)) != -1) {baos.write(buf, 0, len);}byte[] result = baos.toByteArray();String s = new String(result, 0, result.length);System.out.println("Reciever:" + s);} catch (IOException e) {e.printStackTrace();}}} }輸出結(jié)果:
源碼分析
因?yàn)閿?shù)據(jù)是從 PipedOutputStream 寫(xiě)入,然后通過(guò) PipedInputStream 讀取的,所以下面我們先來(lái)分析下 生產(chǎn)者 PipedOutputStream 的源碼。
PipedOutputStream 源碼分析
初始化
1、定義了一個(gè) PipedInputStream 成員變量 sink。用來(lái)保存需要寫(xiě)入到的目標(biāo)管道流中。
2、一個(gè)代參數(shù)的構(gòu)造,一個(gè)無(wú)參的構(gòu)造。
* 有參的構(gòu)造調(diào)用 connect() 方法把兩個(gè)管道流連接在一起,
* 無(wú)參的構(gòu)造函數(shù)更靈活,不必在創(chuàng)建一個(gè) PipedOutputStream 的對(duì)象時(shí)指定 PipedInputStream 對(duì)象,可以在后面代碼,自己調(diào)用 connect() 自己指定。使用方式如下:
write 方法
write 方法就是調(diào)用 PipedInputStream的 receive 的方法,把要寫(xiě)入的數(shù)據(jù)寫(xiě)入進(jìn)去。
PipedOutputStream 總結(jié)
通過(guò)源碼分析,發(fā)現(xiàn)該類沒(méi)有什么特別的,通過(guò)構(gòu)造或者 connect() 方法接收一個(gè) PipedInputStream對(duì)象,然后把要輸出信息,交給 PipedInputStream.receive() 方法去接收。
PipedInputStream 源碼分析
打開(kāi)該類后發(fā)現(xiàn)比 PipedInputStream 類復(fù)雜了好多。
類結(jié)構(gòu)
PipedInputStream 中定義了很多成員變量
1、closedByWriter 是否關(guān)閉 PipedOutputStream 流。
2、closedByReader 是否關(guān)閉 PipedInputStream 流。
3、connected 輸入輸出管道流是否成功連接了。
4、readSide、writeSide 讀線程和寫(xiě)線程
5、DEFAULT_PIPE_SIZE 默認(rèn)讀寫(xiě)的緩沖區(qū)大小為 1024.
6、PIPE_SIZE 對(duì)外暴露管道流的讀寫(xiě)緩沖區(qū)大小(當(dāng)前包可見(jiàn))
7、buffer 緩沖區(qū)大小
8、in 寫(xiě)入緩沖區(qū)下標(biāo)
9、out 寫(xiě)出緩沖區(qū)下標(biāo)
PipedInputStream 構(gòu)造及初始化
PipedInputStream 支持有4種構(gòu)造方法。
1、public PipedInputStream(PipedOutputStream src)
傳入一個(gè) PipedOutputStream 參數(shù),并調(diào)用 initPipe() 方法創(chuàng)建默認(rèn)大小(1024)的 buffer。
2、public PipedInputStream(PipedOutputStream src, int pipeSize)
傳入一個(gè) PipedOutputStream 參數(shù)和 pipeSize參數(shù),調(diào)用 initPipe() 方法創(chuàng)建指定大小的 buffer
3、public PipedInputStream()
調(diào)用 initPipe() 方法,創(chuàng)建一個(gè)默認(rèn)大小的buffer
4、public PipedInputStream(int pipeSize)
調(diào)用 initPipe() 方法,創(chuàng)建一個(gè)指定大小的bufferinitPipe 方法
private void initPipe(int pipeSize)
根據(jù) pipeSize 創(chuàng)建 buffer 。connect 方法
public void connect(PipedOutputStream src)
connect方法其實(shí)還是調(diào)用的 PipedOutputStream 類種的 connect 方法。
所以下面這樣寫(xiě)法,是等價(jià)的,都是調(diào)用 PipedOutputStream 類種的 connect 方法。
receive 方法
通過(guò)分析 PipedOutputStream 的源碼,我們知道,該方法是在 PipedOutputStream.write() 方法種調(diào)用的。
* 1、checkStateForReceive()檢查是否可以接受數(shù)據(jù)。(是否可向 buffer 種寫(xiě)入數(shù)據(jù));
* 2、獲取寫(xiě)線程。PipedOutputStream.write() 中調(diào)用的,所以獲取的是PipedOutStream 所在的線程;
* 3、判斷 in==out。如果相等說(shuō)明,已經(jīng)緩沖區(qū)已經(jīng)被填充滿數(shù)據(jù)了。這時(shí)調(diào)用 awaitSpace() 方法,喚醒讀線程(讀線程可能 wait 狀態(tài)),讓當(dāng)前線程 wait ,如果沒(méi)有讀線程喚醒寫(xiě)線程,那么寫(xiě)線程會(huì)在 awaitSpace() 方法種每隔1秒檢查一次是否可寫(xiě);
為什么 in == out 的時(shí)候就是寫(xiě)滿緩沖區(qū)呢?
比如: buffer 長(zhǎng)度為10,現(xiàn)在寫(xiě)了5個(gè)字節(jié),又讀了5個(gè)字節(jié),是不是 in 也等于 out?
其實(shí)不會(huì)的,為什么?
因?yàn)樽x的時(shí)候如果 in==out時(shí),他把 in 的值置為了 -1。詳見(jiàn) read() 方法。
* 4、如果 in<0,就是第一次寫(xiě)或者已經(jīng)讀完 buffer 中已寫(xiě)的數(shù)據(jù),這是,把 in 和 out 置為0;
* 5、向buffer 種寫(xiě)入數(shù)據(jù)。
* 6、如果 in 達(dá)到 buffer 的最大長(zhǎng)度,則把in 置為 0, 下次開(kāi)始從0 開(kāi)始填充。(這里,可以把 buffer 當(dāng)成一個(gè)環(huán)形隊(duì)列)。
awaitSpace() 源碼
read() 方法
1、執(zhí)行各種檢查,是否可讀。
2、獲取讀線程并賦值給 readSide 變量。
3、while 循環(huán)監(jiān)聽(tīng)判斷是否有寫(xiě)線程寫(xiě)數(shù)據(jù),如果沒(méi)有則等待(每秒檢查一次),并喚醒寫(xiě)線程(寫(xiě)線程可能 wait )。
4、讀取 buffer 中的數(shù)據(jù)。 如果讀到 buffer 的最后一個(gè)元素,則把 out 置為0,下次從下標(biāo)0開(kāi)始繼續(xù)讀(循環(huán)隊(duì)列表)。
5、如果 in == out,則把 in 置為 -1 。置為初始狀態(tài)。相當(dāng)于清空了緩沖區(qū),從緩沖區(qū)的下標(biāo) 0 開(kāi)始讀寫(xiě)。
available() 方法
獲取當(dāng)前可讀的字節(jié)數(shù)
1、如果 in<0; 說(shuō)明當(dāng)前沒(méi)有可讀的數(shù)據(jù)
2、如果 in == out; 說(shuō)明數(shù)據(jù)已經(jīng)填充滿了。
3、如果 in > out; 那么in - out 就是 可寫(xiě)的字節(jié)數(shù)。
4、否則,就是 in < out 的情況。因?yàn)樗黔h(huán)形寫(xiě)入的,可能出現(xiàn) in < out 的情況,所以需要 in + buffer.length - out,才能獲取可讀字節(jié)長(zhǎng)度。
PipedInputStream 總結(jié)
PipedInputStream 原理其實(shí)也很簡(jiǎn)單,但代碼看起來(lái)有點(diǎn)懵,它就是通過(guò) wait() 和 notifyAll() 來(lái)控制 buffer 是否可讀,或可寫(xiě)的。
管道流,做開(kāi)發(fā)這么多年,現(xiàn)在都沒(méi)有遇到可用的場(chǎng)景。管道流能用到的場(chǎng)景,在并發(fā)包種,很多方式都可以實(shí)現(xiàn)或代替。比如 java.util.concurrent.Exchanger 類。
java.util.concurrent.Exc
hanger 的使用場(chǎng)景比管道流使用場(chǎng)景更廣泛些。
想了解更多精彩內(nèi)容請(qǐng)關(guān)注我的公眾號(hào)
本人簡(jiǎn)書(shū)blog地址:http://www.jianshu.com/u/1f0067e24ff8????
點(diǎn)擊這里快速進(jìn)入簡(jiǎn)書(shū)
GIT地址:http://git.oschina.net/brucekankan/
點(diǎn)擊這里快速進(jìn)入GIT
總結(jié)
以上是生活随笔為你收集整理的Java IO 之 管道流 原理分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Java IO 之 SequenceIn
- 下一篇: Java使用SFTP和FTP两种连接服务