java如何阻塞和同步_同步与异步,阻塞与非阻塞
今天早上關(guān)注了這個(gè)問(wèn)題,剛抽出時(shí)間大概整理下,以下僅是個(gè)人理解:
一定要多看幾遍代碼并結(jié)合文字理解下
引0、從I/O說(shuō)起
這些概念之所以容易令人迷惑,在于很多人對(duì)I/O就沒(méi)有清晰準(zhǔn)確的理解,后面的理解自然不可能正確。我想用一個(gè)具體的例子來(lái)說(shuō)明一下I/O。
設(shè)想自己是一個(gè)進(jìn)程,就叫小進(jìn)吧。小進(jìn)需要接收一個(gè)輸入,我們不管這個(gè)輸入是從網(wǎng)絡(luò)套接字來(lái),還是鍵盤(pán),鼠標(biāo)來(lái),輸入的來(lái)源可以千千萬(wàn)萬(wàn)。但是,都必須由內(nèi)核來(lái)幫小進(jìn)完成,為啥內(nèi)核這么霸道?因?yàn)橛?jì)算機(jī)上運(yùn)行的可不只是咱小進(jìn)一個(gè)進(jìn)程,還有很多進(jìn)程。這些進(jìn)程兄弟也可能需要從這些輸入設(shè)備接收輸入,沒(méi)有內(nèi)核居中協(xié)調(diào),豈不是亂套。
從小進(jìn)的角度看,內(nèi)核幫助它完成輸入,其實(shí)包括三個(gè)步驟:內(nèi)核替小進(jìn)接收好數(shù)據(jù),這些數(shù)據(jù)暫時(shí)存在內(nèi)核的內(nèi)存空間
內(nèi)核將數(shù)據(jù)從自己的內(nèi)存空間復(fù)制到小進(jìn)的內(nèi)存空間
告訴小進(jìn),輸入數(shù)據(jù)來(lái)了,趕快讀吧
這三步看似挺簡(jiǎn)單,其實(shí)在具體實(shí)現(xiàn)時(shí),有很多地方需要考慮:小進(jìn)如何告訴內(nèi)核自己要接收一個(gè)輸入?
內(nèi)核接到小進(jìn)的請(qǐng)求,替小進(jìn)接收好數(shù)據(jù)這段時(shí)間, 小進(jìn)咋辦?
內(nèi)核在將數(shù)據(jù)復(fù)制到小進(jìn)的內(nèi)存空間這段時(shí)間,小進(jìn)咋辦?
到底什么時(shí)候告訴小進(jìn)數(shù)據(jù)準(zhǔn)備好了,是在內(nèi)核接收好數(shù)據(jù)之后就告訴小進(jìn),還是在將數(shù)據(jù)復(fù)制到小進(jìn)的內(nèi)存空間之后再告訴他?
內(nèi)核以什么樣的方式告訴小進(jìn),數(shù)據(jù)準(zhǔn)備好了?
1、阻塞式I/O模型
對(duì)上面5個(gè)問(wèn)題,最簡(jiǎn)單的解決方案就是阻塞式I/O模型,它的過(guò)程是這樣的:
小進(jìn):內(nèi)核內(nèi)核,我要接收一個(gè)鍵盤(pán)輸入,快點(diǎn)幫我完成!
內(nèi)核:好咧!biubiu!一個(gè)阻塞丟給小進(jìn),小進(jìn)頓時(shí)石化,就像被孫悟空點(diǎn)了定一樣。
就這樣,小進(jìn)在石化中,時(shí)間一點(diǎn)點(diǎn)流逝。終于,內(nèi)核收到了數(shù)據(jù)。
內(nèi)核:數(shù)據(jù)終于來(lái)了,我要開(kāi)干了!duang duang duang,先把數(shù)據(jù)存在自己的內(nèi)核空間,然后又復(fù)制到小進(jìn)的用戶空間。
內(nèi)核:biubiu!一個(gè)解除阻塞丟給小進(jìn),小進(jìn)瞬間復(fù)活,小進(jìn)的記憶還是停留在讓內(nèi)核幫他接收輸入時(shí)。
小進(jìn):哇!內(nèi)核真靠譜,數(shù)據(jù)已經(jīng)有了!干活去!
我們可以看到,小進(jìn)發(fā)出接收輸入的請(qǐng)求給內(nèi)核開(kāi)始,就處于阻塞狀態(tài),直到內(nèi)核將數(shù)據(jù)復(fù)制到小進(jìn)的用戶空間,小進(jìn)才解除阻塞。
2、非阻塞式I/O
小進(jìn)發(fā)現(xiàn),阻塞式I/O中,自己總要被阻塞好久,好不爽啊,于是小進(jìn)改用了非阻塞式I/O,其過(guò)程是這樣的:
小進(jìn):內(nèi)核內(nèi)核,我要接收一個(gè)輸入,趕緊幫我看看,數(shù)據(jù)到了沒(méi)有,先說(shuō)好,不要阻塞我。
內(nèi)核:查看了一下自己的內(nèi)核空間,沒(méi)有發(fā)現(xiàn)數(shù)據(jù),于是迅速告訴小進(jìn),沒(méi)有呢!并繼續(xù)幫小進(jìn)等著數(shù)據(jù)。
如此這樣,小進(jìn)不斷地問(wèn)內(nèi)核,終于,過(guò)了一段時(shí)間,小進(jìn)再一次詢問(wèn)時(shí),內(nèi)核往自己的空間中一查,呦!數(shù)據(jù)來(lái)了,不勝其煩的內(nèi)核迅速告訴小進(jìn),數(shù)據(jù)好了!
小進(jìn):快給我!
內(nèi)核:biu!一個(gè)阻塞丟給小進(jìn),悲催的小進(jìn)還是石化了!
內(nèi)核趕緊將自己空間的輸入數(shù)據(jù)復(fù)制到小進(jìn)的用戶空間,復(fù)制好后。
內(nèi)核:biu!一個(gè)非阻塞丟給小進(jìn),小進(jìn)立馬復(fù)活
小進(jìn):哇!數(shù)據(jù)來(lái)了,啥也不說(shuō),干活!
我們看到,所謂的非阻塞I/O,其實(shí)在內(nèi)核將數(shù)據(jù)從內(nèi)核空間復(fù)制到小進(jìn)的用戶空間時(shí),小進(jìn)還是被阻塞的。
3、信號(hào)驅(qū)動(dòng)式I/O
非阻塞I/O中,小進(jìn)不停地問(wèn)內(nèi)核,數(shù)據(jù)好了沒(méi)有啊,內(nèi)核感覺(jué)太煩了,于是想出一個(gè)好辦法。
內(nèi)核告訴小進(jìn),本內(nèi)核升級(jí)了,如果想要我替你接收輸入,請(qǐng)先注冊(cè)一個(gè)信號(hào)處理函數(shù),等數(shù)據(jù)準(zhǔn)備好時(shí),我會(huì)發(fā)信號(hào)給你。于是,現(xiàn)在的流程是這樣的:
小進(jìn):注冊(cè)信號(hào)處理函數(shù),告訴內(nèi)核,自己要接收一個(gè)輸入,然后繼續(xù)干活!
內(nèi)核:收到函數(shù),開(kāi)始執(zhí)行數(shù)據(jù)接收
接收完成時(shí),給小進(jìn)發(fā)送信號(hào),信號(hào)處理函數(shù)收到信號(hào),開(kāi)始向內(nèi)核發(fā)送讀數(shù)據(jù)請(qǐng)求
內(nèi)核:biu!阻塞了小進(jìn),并把數(shù)據(jù)從內(nèi)核空間復(fù)制到小進(jìn)的用戶空間。
內(nèi)核:biu!解除了阻塞
小進(jìn):哇!數(shù)據(jù)來(lái)了!啥也不說(shuō),干活去!
4、異步I/O
上面的三種I/O解決方案中,小進(jìn)都被阻塞了,只不過(guò)是阻塞時(shí)間長(zhǎng)短不一樣,第一種方案中小進(jìn)被阻塞的時(shí)間長(zhǎng)一些,在內(nèi)核接收數(shù)據(jù)以及將數(shù)據(jù)復(fù)制到小進(jìn)的用戶空間時(shí),都被阻塞。
第二、第三種方案中,只在內(nèi)核將數(shù)據(jù)從內(nèi)核空間復(fù)制到小進(jìn)的用戶空間時(shí),小進(jìn)才被阻塞。
我們現(xiàn)在說(shuō)的異步I/O,目的就是讓小進(jìn)絕對(duì)不被阻塞。其過(guò)程是這樣的:
小進(jìn):內(nèi)核內(nèi)核,我要接收一個(gè)輸入,弄好了告訴我。同時(shí)將一個(gè)信號(hào)和信號(hào)處理函數(shù)告訴內(nèi)核,然后繼續(xù)干自己的活了。
內(nèi)核:得了您嘞,您先忙。
一直到內(nèi)核接收到數(shù)據(jù)并將數(shù)據(jù)從內(nèi)核空間復(fù)制到小進(jìn)的用戶空間后,內(nèi)核才給小進(jìn)發(fā)送信號(hào)。小進(jìn)在信號(hào)處理函數(shù)中可以直接處理數(shù)據(jù)。
踐
1、阻塞式I/O式
客戶端代碼public class Client {
public static void main(String[] args) {
Socket socket = null;
try {
System.out.println("socket begin " + System.currentTimeMillis());
// 隨機(jī)綁定本地地址與端口
socket = new Socket("localhost", 8888);
System.out.println("socket end " + System.currentTimeMillis());
OutputStream os = socket.getOutputStream();
Random ran = new Random();
for (int n = 0; n < 10; n++) {
System.out.println("send message " + n);
os.write(("hello server form " + socket.getLocalAddress().getHostAddress() + " - " + n).getBytes());
try {
TimeUnit.SECONDS.sleep(ran.nextInt(10));
} catch (InterruptedException e) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null) {
// 自動(dòng)關(guān)閉綁定流
socket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
服務(wù)端代碼public class Server {
public static void main(String[] args) {
ServerSocket serverSocket = null;
Socket socket = null;
ThreadPoolExecutor executor = null;
try {
// 初始化線程池
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
0L, TimeUnit.SECONDS, new LinkedBlockingDeque(), new ThreadBuilder());
// 監(jiān)聽(tīng)通配符地址
serverSocket = new ServerSocket(8888);
System.out.println("accept begin " + System.currentTimeMillis());
while ((socket = serverSocket.accept()) != null) {
executor.execute(new Task(socket));
}
System.out.println("accept end " + System.currentTimeMillis());
} catch (IOException e) {
e.printStackTrace();
} finally {
executor.shutdown();
try {
if (serverSocket != null) {
serverSocket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class ThreadBuilder implements ThreadFactory {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
thread.setUncaughtExceptionHandler((t, e) -> {
if (e instanceof TaskException) {
System.err.println(t.getName() + "|" + e.getCause().getMessage());
} else {
System.err.println(t.getName() + "|" + e.getMessage());
}
});
return thread;
}
}
static class Task implements Runnable {
private byte[] buffer = new byte[10 * 1024];
private Socket socket;
public Task(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream is = socket.getInputStream();
System.out.println("--------------------------------------------------");
System.out.println("read begin " + System.currentTimeMillis());
System.out.println("***");
int len = is.read(buffer);// 呈阻塞效果
while (len != -1) {
String str = new String(buffer, 0, len);
System.out.println(str);
len = is.read(buffer);
}
System.out.println("***");
System.out.println("read end " + System.currentTimeMillis());
System.out.println("--------------------------------------------------");
} catch (IOException e) {
throw new TaskException(e);
} finally {
if (socket != null) {
try {
// 自動(dòng)關(guān)閉綁定流
socket.close();
System.out.println("socket closed");
} catch (IOException e) {
System.err.println("socket close failed");
}
}
}
}
}
static class TaskException extends RuntimeException {
public TaskException(Throwable cause) {
super(cause);
}
}
}
2、非阻塞式I/O
客戶端代碼同上
服務(wù)端代碼public class Server {
public static void main(String[] args) {
ServerSocketChannel serverSocket = null;
SocketChannel socket = null;
ThreadPoolExecutor executor = null;
try {
// 初始化線程池
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
0L, TimeUnit.SECONDS, new LinkedBlockingDeque(), new ThreadBuilder());
serverSocket = ServerSocketChannel.open();
// 設(shè)置阻塞
serverSocket.configureBlocking(true);
// 監(jiān)聽(tīng)通配符地址
serverSocket.bind(new InetSocketAddress(8888));
System.out.println("accept begin " + System.currentTimeMillis());
while ((socket = serverSocket.accept()) != null) {
// 設(shè)置非阻塞
socket.configureBlocking(false);
executor.execute(new Task(socket));
}
System.out.println("accept end " + System.currentTimeMillis());
} catch (IOException e) {
e.printStackTrace();
} finally {
executor.shutdown();
try {
if (serverSocket != null) {
serverSocket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class ThreadBuilder implements ThreadFactory {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
thread.setUncaughtExceptionHandler((t, e) -> {
if (e instanceof TaskException) {
System.err.println(t.getName() + "|" + e.getCause().getMessage());
} else {
System.err.println(t.getName() + "|" + e.getMessage());
}
});
return thread;
}
}
static class Task implements Runnable {
private ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
private SocketChannel socket;
public Task(SocketChannel socket) {
this.socket = socket;
}
@Override
public void run() {
try {
System.out.println("--------------------------------------------------");
System.out.println("read begin " + System.currentTimeMillis());
System.out.println("***");
socket.read(buffer);// 呈阻塞效果
while (true) {
if (buffer.position() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
continue;
}
} else {
buffer.flip();
String str = new String(buffer.array(), 0, buffer.limit());
System.out.println(str);
if ("exit".equals(str)) {
break;
}
buffer.clear();
}
socket.read(buffer);
}
System.out.println("***");
System.out.println("read end " + System.currentTimeMillis());
System.out.println("--------------------------------------------------");
} catch (IOException e) {
throw new TaskException(e);
} finally {
if (socket != null) {
try {
// 自動(dòng)關(guān)閉綁定流
socket.close();
System.out.println("socket closed");
} catch (IOException e) {
System.err.println("socket close failed");
}
}
}
}
}
static class TaskException extends RuntimeException {
public TaskException(Throwable cause) {
super(cause);
}
}
}
3、多路復(fù)用式I/O(基于非阻塞式I/O)
客戶端代碼同上
服務(wù)端代碼public class Server {
public static void main(String[] args) {
Selector selector = null;
ServerSocketChannel serverSocket = null;
SocketChannel socket = null;
ThreadPoolExecutor executor = null;
try {
// 初始化線程池
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
0L, TimeUnit.SECONDS, new LinkedBlockingDeque(), new ThreadBuilder());
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
// 設(shè)置非阻塞
serverSocket.configureBlocking(false);
// 監(jiān)聽(tīng)通配符地址
serverSocket.bind(new InetSocketAddress(8888));
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("accept begin " + System.currentTimeMillis());
while (true) {
selector.select();
Iterator iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
// 設(shè)置非阻塞
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
executor.execute(new Task(socketChannel));
key.cancel();
} else {
// TODO 寫(xiě)事件注冊(cè)
}
iterator.remove();
}
}
// System.out.println("accept end " + System.currentTimeMillis());
} catch (IOException e) {
e.printStackTrace();
} finally {
executor.shutdown();
try {
if (serverSocket != null) {
serverSocket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class ThreadBuilder implements ThreadFactory {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
thread.setUncaughtExceptionHandler((t, e) -> {
if (e instanceof TaskException) {
System.err.println(t.getName() + "|" + e.getCause().getMessage());
} else {
System.err.println(t.getName() + "|" + e.getMessage());
}
});
return thread;
}
}
static class Task implements Runnable {
private ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
private SocketChannel socket;
public Task(SocketChannel socket) {
this.socket = socket;
}
@Override
public void run() {
try {
System.out.println("--------------------------------------------------");
System.out.println("read begin " + System.currentTimeMillis());
System.out.println("***");
socket.read(buffer);// 呈阻塞效果
while (true) {
if (buffer.position() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
continue;
}
} else {
buffer.flip();
String str = new String(buffer.array(), 0, buffer.limit());
System.out.println(str);
if ("exit".equals(str)) {
break;
}
buffer.clear();
}
socket.read(buffer);
}
System.out.println("***");
System.out.println("read end " + System.currentTimeMillis());
System.out.println("--------------------------------------------------");
} catch (IOException e) {
throw new TaskException(e);
} finally {
if (socket != null) {
try {
// 自動(dòng)關(guān)閉綁定流
socket.close();
System.out.println("socket closed");
} catch (IOException e) {
System.err.println("socket close failed");
}
}
}
}
}
static class TaskException extends RuntimeException {
public TaskException(Throwable cause) {
super(cause);
}
}
}
4、信號(hào)驅(qū)動(dòng)式I/O
JAVA沒(méi)有實(shí)現(xiàn)
5、異步I/O
客戶端代碼同上
服務(wù)端代碼public class Server {
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8888));
serverSocketChannel.accept(null, new CompletionHandler() {
public void completed(AsynchronousSocketChannel asc, Void att) {
serverSocketChannel.accept(null, this);
ByteBuffer byteBuffer = ByteBuffer.allocate(10 * 1024);
asc.read(byteBuffer, null, new CompletionHandler() {
@Override
public void completed(Integer result, Void attachment) {
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, byteBuffer.limit()));
byteBuffer.clear();
try {
asc.close();
} catch (IOException e) {
}
}
@Override
public void failed(Throwable exc, Void attachment) {
}
});
}
public void failed(Throwable exc, Void att) {
}
});
for (; ; ) {
}
}
}
結(jié)
牛奶工送牛奶場(chǎng)景阻塞式:每天早上自己去小區(qū)門口等牛奶工
非阻塞式:每天早上在家從窗戶早上隔3分鐘看看牛奶工到了沒(méi),到了的話去拿
多路復(fù)用式:每天早上由小區(qū)門衛(wèi)室接待所有牛奶工,到了會(huì)給住戶發(fā)短信,你馬上去拿
信號(hào)驅(qū)動(dòng)式:每天早上牛奶工到了會(huì)給你發(fā)短信,你馬上去拿
異步式:每天早上牛奶工直接放到小區(qū)住戶牛奶柜并發(fā)短信,不需要現(xiàn)在去拿
它
程序分為CPU計(jì)算型和I/O讀寫(xiě)型,線程尤其是被內(nèi)核調(diào)度的線程是及其珍貴的資源(JAVA計(jì)劃在JDK將來(lái)的版本實(shí)現(xiàn)由JVM”自己“調(diào)度的輕型線程),在有限的線程資源下CPU計(jì)算型程序不但不會(huì)有明顯提升,反而由于頻繁的上下文切換導(dǎo)致性能下降(這也是Redis這種基于內(nèi)存的數(shù)據(jù)庫(kù)采用單工作線程并且速度非常快的原因,另一個(gè)重要的原因是單線程導(dǎo)致了不用為共享資源給線程加/解鎖造成人為阻塞),而在I/O讀寫(xiě)型的程序中,多線程工作在以上五種模式下性能是逐步提升的(最后多說(shuō)一句,還是以Redis舉例,不管是Jedis-Pool這種池化客戶端還是Lettuce這種單連接客戶端,當(dāng)多用戶接入Redis服務(wù)器時(shí)一定是多連接的,這時(shí)候就要用到多路復(fù)用來(lái)處理用戶請(qǐng)求了,至于為什么沒(méi)有用異步,一個(gè)原因是工作線程是單線程,另一個(gè)原因是異步I/O模型在性能提升方面有限并且復(fù)雜度高,以至于Netty在新版本的包中把這種模式刪除了)
總結(jié)
以上是生活随笔為你收集整理的java如何阻塞和同步_同步与异步,阻塞与非阻塞的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: android content item
- 下一篇: ENSP配置 实例八 三层交换机DHC