Java并发 正确终止与恢复线程
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??
前面提到了stop()、suspend()等方法在終止與恢復(fù)線程的弊端,那么問題來了,應(yīng)該如何正確終止與恢復(fù)線程呢?這里可以使用兩種方法:interrupt()方法和使用boolean變量進(jìn)行控制。
在使用interrupt方法之前,有必要介紹一下中斷以及與interrupt相關(guān)的方法。中斷可以理解為線程的一個(gè)標(biāo)志位屬性,表示一個(gè)運(yùn)行中的線程是否被其他線程進(jìn)行了中斷操作。這里提到了其他線程,所以可以認(rèn)為中斷是線程之間進(jìn)行通信的一種方式,簡單來說就是由其他線程通過執(zhí)行interrupt方法對該線程打個(gè)招呼,讓起中斷標(biāo)志位為true,從而實(shí)現(xiàn)中斷線程執(zhí)行的目的。
其他線程調(diào)用了interrupt方法后,該線程通過檢查自身是否被中斷進(jìn)行響應(yīng),具體就是該線程需要調(diào)用isInterrupted方法進(jìn)行判斷是否被中斷或者調(diào)用Thread類的靜態(tài)方法interrupted對當(dāng)前線程的中斷標(biāo)志位進(jìn)行復(fù)位(變?yōu)閒alse)。需要注意的是,如果該線程已經(jīng)處于終結(jié)狀態(tài),即使該線程被中斷過,那么調(diào)用isInterrupted方法返回仍然是false,表示沒有被中斷。
那么是不是線程調(diào)用了interrupt方法對該線程進(jìn)行中斷,該線程就會(huì)被中斷呢?答案是否定的。因?yàn)镴ava虛擬機(jī)對會(huì)拋出InterruptedException異常的方法進(jìn)行了特別處理:Java虛擬機(jī)會(huì)將該線程的中斷標(biāo)志位清除,然后跑出InterruptedException,這個(gè)時(shí)候調(diào)用isInterrupted方法返回的也是false。
下面的代碼首先創(chuàng)建了兩個(gè)線程,一個(gè)線程內(nèi)部不停睡眠,另一個(gè)則不斷執(zhí)行,然后對這兩個(gè)線程執(zhí)行中斷操作。
package com.rhwayfun.concurrency;/*** Created by rhwayfun on 16-4-2.*/ public class Interrupted {public static void main(String[] args){//創(chuàng)建一個(gè)休眠線程Thread sleepThread = new Thread(new SleepThread(),"SleepThread");//設(shè)為守護(hù)線程sleepThread.setDaemon(true);//創(chuàng)建一個(gè)忙線程Thread busyThread = new Thread(new BusyThread(),"BusyThread");//把該線程設(shè)為守護(hù)線程//守護(hù)線程只有當(dāng)其他前臺(tái)線程全部退出之后才會(huì)結(jié)束busyThread.setDaemon(true);//啟動(dòng)休眠線程sleepThread.start();//啟動(dòng)忙線程busyThread.start();//休眠5秒,讓兩個(gè)線程充分運(yùn)行SleepUtil.second(5);//嘗試中斷線程//只需要調(diào)用interrupt方法sleepThread.interrupt();busyThread.interrupt();//查看這兩個(gè)線程是否被中斷了System.out.println("SleepThread interrupted is " + sleepThread.isInterrupted());System.out.println("BusyThread interrupted is " + busyThread.isInterrupted());//防止sleepThread和busyThread立刻退出SleepUtil.second(2);}/*** 不斷休眠*/static class SleepThread implements Runnable{public void run() {while (true){try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 不斷等待*/static class BusyThread implements Runnable{public void run() {while (true){//忙等待}}} }執(zhí)行結(jié)果:
可以發(fā)現(xiàn)內(nèi)部不停睡眠的方法執(zhí)行執(zhí)行中斷后,其中斷標(biāo)志位返回的是false,而一直運(yùn)行的線程的中斷標(biāo)志位則為true。這里主要由于Sleep方法會(huì)拋出InterruptedException異常,所以Java虛擬機(jī)把SleepThread的中斷標(biāo)志位復(fù)位了,所以才會(huì)顯示false。
那么使用interrupt方法正確終止線程已經(jīng)很明顯了,代碼如下:
package com.rhwayfun.concurrency;import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit;/*** Created by rhwayfun on 16-4-2.*/ public class SafeShutdownThread {public static void main(String[] args) throws InterruptedException {DateFormat format = new SimpleDateFormat("HH:mm:ss");Runner one = new Runner();//創(chuàng)建第一個(gè)計(jì)數(shù)線程,該線程使用jdk自帶的中斷方法執(zhí)行中斷Thread threadOne = new Thread(one,"ThreadOne");//執(zhí)行第一個(gè)線程threadOne.start();//threadOne休眠一秒,然后由main thread執(zhí)行中斷TimeUnit.SECONDS.sleep(1);threadOne.interrupt();System.out.println("ThreadOne is interrupted ? " + threadOne.isInterrupted());System.out.println("main thread interrupt ThreadOne at " + format.format(new Date()));//創(chuàng)建第二個(gè)線程,該線程使用cancel方法執(zhí)行中斷Runner two = new Runner();Thread threadTwo = new Thread(two,"ThreadTwo");threadTwo.start();//休眠一秒,然后調(diào)用cancel方法中斷線程TimeUnit.SECONDS.sleep(1);two.cancel();System.out.println("ThreadTwo is interrupted ? " + threadTwo.isInterrupted());System.out.println("main thread interrupt ThreadTwo at " + format.format(new Date()));}/*** 該線程是一個(gè)計(jì)數(shù)線程*/private static class Runner implements Runnable{//變量iprivate long i;//是否繼續(xù)運(yùn)行的標(biāo)志//這里使用volatile關(guān)鍵字可以保證多線程并發(fā)訪問該變量的時(shí)候//其他線程都可以感知到該變量值的變化。這樣所有線程都會(huì)從共享//內(nèi)存中取值private volatile boolean on = true;public void run() {while (on && !Thread.currentThread().isInterrupted()){i++;}System.out.println("Count i = " + i);}//讓線程終止的方法public void cancel(){on = false;}} }在計(jì)數(shù)線程中通過使用一個(gè)boolean變量成功終止了線程。這種通過標(biāo)志位或者中斷操作的方式能夠使得線程在終止的時(shí)候有機(jī)會(huì)去清理資源,而不是武斷地將線程終止,因此這種終止線程的做法更優(yōu)雅和安全。
上面的程序只是正確地終止了線程,卻沒有給出正確恢復(fù)的方法。可能有人會(huì)想到:再寫一個(gè)方法讓on變量為true不就行了。事實(shí)并如此,因?yàn)樵贑ountThread中,由于已經(jīng)調(diào)用cancel方法,這時(shí)on變量已經(jīng)是false了,線程按照順序執(zhí)行原則繼續(xù)執(zhí)行,所以即使改變on為true也是沒用的,因?yàn)镃ountThread已經(jīng)終止了。具體的解決方法將在下一篇關(guān)于等待通知機(jī)制的文章給出詳細(xì)的解決措施。
?
重新認(rèn)識(shí)中斷
之前在正確終止與恢復(fù)線程一文中介紹了使用Thread類的interrupt方法和使用標(biāo)志位實(shí)現(xiàn)線程的終止。由于之前只是簡單介紹了jdk默認(rèn)中斷方法的問題,對線程的中斷機(jī)制沒有深入介紹。為了正確終止線程,深刻理解線程中斷的本質(zhì)是很有必要的。Java沒有提供可搶占的安全的中斷機(jī)制,但是Java提供了線程協(xié)作機(jī)制(之前說的interrupt方法和標(biāo)志位本質(zhì)上都屬于線程之間協(xié)作的手段),但是提供了中斷機(jī)制,中斷機(jī)制允許一個(gè)線程終止另一個(gè)線程的當(dāng)前工作,所以需要在程序設(shè)計(jì)的時(shí)候考慮到中斷的位置和時(shí)機(jī)。
回到之前使用volatile類型的標(biāo)志位來終止線程的例子,在代碼中調(diào)用cancel方法來取消i的自增請求,如果Runner線程在下次執(zhí)行,或者正要執(zhí)行下一次自增請求時(shí)判斷on的時(shí)是否變?yōu)榱薴alse,如果是則終止執(zhí)行。
根據(jù)運(yùn)行結(jié)果,Runner的計(jì)數(shù)任務(wù)最終會(huì)被取消,然后退出。在Runner線程最終取消執(zhí)行之前,會(huì)有一定的時(shí)間,如果在在這個(gè)時(shí)間內(nèi),調(diào)用此方法的任務(wù)調(diào)用了一個(gè)會(huì)阻塞的方法,比如BlockingQueue的put方法,那么可能該任務(wù)一直違法檢測到on的值變?yōu)閒alse,因而Runner線程不會(huì)終止。
一個(gè)例子
比如下面的代碼就說明了這一點(diǎn):
package com.rhwayfun.patchwork.concurrency.r0411;import java.math.BigInteger; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit;/*** Created by rhwayfun on 16-4-11.*/ public class BrokenShutdownThread extends Thread {//是否繼續(xù)運(yùn)行的標(biāo)志private static volatile boolean on = true;//阻塞隊(duì)列private final BlockingQueue<BigInteger> queue;public BrokenShutdownThread(BlockingQueue<BigInteger> queue) {this.queue = queue;}public void run() {try {BigInteger p = BigInteger.ONE;while (on) {//生產(chǎn)者一次可以放40個(gè)數(shù)for (int i = 0; i < 40; i++){queue.put(p = p.nextProbablePrime());System.out.println(Thread.currentThread().getName() + ": put value " + p);}}} catch (InterruptedException e) {}}public void cancel() {on = false;}/*** 消費(fèi)者線程*/static class Consumer extends Thread{//阻塞隊(duì)列private final BlockingQueue<BigInteger> queue;public Consumer(BlockingQueue<BigInteger> queue) {this.queue = queue;}@Overridepublic void run() {try {while (on) {//消費(fèi)者一次只能消費(fèi)1個(gè)數(shù)System.out.println(Thread.currentThread().getName() + ": get value " + queue.take());}System.out.println("work done!");} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) throws InterruptedException {BlockingQueue<BigInteger> queue = new LinkedBlockingQueue<>(5);BrokenShutdownThread producer = new BrokenShutdownThread(queue);//啟動(dòng)計(jì)數(shù)線程producer.start();TimeUnit.SECONDS.sleep(1);new Consumer(queue).start();TimeUnit.SECONDS.sleep(1);producer.cancel();} }運(yùn)行上面的程序,發(fā)現(xiàn)雖然控制臺(tái)輸出了work done!的信息,但是程序仍然沒有停止,仔細(xì)分析就會(huì)發(fā)現(xiàn)生產(chǎn)者的速度(40個(gè)數(shù)/次)遠(yuǎn)大于消費(fèi)者的速度(1個(gè)數(shù)/次),造成隊(duì)列被填滿,put方法被阻塞。雖然在運(yùn)行一秒后調(diào)用cancel方法將volatile變量on設(shè)為了false,但是由于生產(chǎn)者線程的put方法被阻塞,所以無法從阻塞的put方法中恢復(fù),自然程序就無法終止了。
重新認(rèn)識(shí)中斷
每個(gè)線程都有一個(gè)boolean類型的中斷狀態(tài)。當(dāng)中斷線程時(shí),中斷狀態(tài)被設(shè)為true。通過Thread的三個(gè)方法可以進(jìn)行不同的中斷操作:
public void interrupt() {...} public static boolean interrupted() {...} public boolean isInterrupted() {...}執(zhí)行interrupt方法能夠中斷線程,interrupted可以清除線程的中斷狀態(tài),isInterrupted方法可以返回當(dāng)前線程的中斷狀態(tài)。
當(dāng)線程調(diào)用會(huì)阻塞的方法,比如wait()、sleep()等方法時(shí),線程會(huì)檢查自己的中斷狀態(tài),并且在發(fā)生中斷時(shí)提前返回。這些阻塞的方法響應(yīng)中斷的操作是清除中斷狀態(tài),拋出InterruptedException。拋出InterruptedException的作用是表示線程由于中斷需要提前結(jié)束。調(diào)用interrupt方法執(zhí)行中斷的本質(zhì)是調(diào)用interrupt方法并不會(huì)立即停止目標(biāo)線程正在執(zhí)行的工作,只是傳遞了請求中斷的消息。然后線程會(huì)在下一個(gè)時(shí)刻中斷自己。當(dāng)收到中斷請求時(shí)拋出InterruptedException,讓線程有選擇中斷策略的自由。一般而言,調(diào)用代碼需要對拋出的InterruptedException進(jìn)行額外的處理,直接屏蔽該異常是不正確的(也就是直接調(diào)用printStackTrace()方法)。屏蔽中斷異常的后果是調(diào)用棧的上層無法對中斷請求做出響應(yīng)。
對上面代碼的修正
根據(jù)以上的分析只需要對代碼做如下的修改就能正確終止線程:
public void run() {try {BigInteger p = BigInteger.ONE;while (on && !Thread.currentThread().isInterrupted()) {//生產(chǎn)者一次可以放40個(gè)數(shù)for (int i = 0; i < 40; i++){queue.put(p = p.nextProbablePrime());System.out.println(Thread.currentThread().getName() + ": put value " + p);}}} catch (InterruptedException e) {//讓線程退出return;}}public void cancel() {on = false;interrupt();} static class Consumer extends Thread{//阻塞隊(duì)列private final BlockingQueue<BigInteger> queue;public Consumer(BlockingQueue<BigInteger> queue) {this.queue = queue;}@Overridepublic void run() {try {while (on && !Thread.currentThread().isInterrupted()) {//消費(fèi)者一次只能消費(fèi)1個(gè)數(shù)System.out.println(Thread.currentThread().getName() + ": get value " + queue.take());}System.out.println("work done!");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}而其他代碼保持不變,再次運(yùn)行以上的程序,發(fā)現(xiàn)能夠正確終止了。主要就是使用中斷機(jī)制完成了線程之間的協(xié)作,從而達(dá)到正確終止線程的目的。
實(shí)際上在調(diào)用可阻塞的方法時(shí)拋出的InterruptedException是為了讓調(diào)用者能夠注意到中斷信息,使得調(diào)用者可以就中斷做出自己的操作。往往在將中斷信息傳給調(diào)用者之前需要執(zhí)行其他操作,如果在線程中使用中斷機(jī)制完成線程之間的協(xié)作,那么就應(yīng)該調(diào)用Thread.currentThread().intrrupt()恢復(fù)當(dāng)前線程的中斷狀態(tài),這樣當(dāng)前線程就能夠繼續(xù)其他操作了。正常情況下,都需要對中斷進(jìn)行響應(yīng),除非自己實(shí)現(xiàn)了中斷所應(yīng)該進(jìn)行的操作。
為了取消線程的執(zhí)行,除了之前的方法,還可以使用Future.get(Long time,TimeUnit unit)的帶超時(shí)限制的方法取消線程的執(zhí)行,如果沒有在指定的時(shí)間內(nèi)完成任務(wù),那么可以在代碼中直接調(diào)用Future.cancel()方法取消任務(wù)的執(zhí)行。取消任務(wù)的時(shí)候有兩種情況:一是任務(wù)在指定的時(shí)間完成了,這個(gè)時(shí)候調(diào)用取消操作沒有什么影響;二是任務(wù)沒有在指定的時(shí)間完成,那么調(diào)用cancel方法后任務(wù)將被中斷。
偽代碼如下:
Future task = threadPool.submit(runnable); try{}catch(TimeOutException e){//會(huì)取消任務(wù)的執(zhí)行 }catch(ExecutionException e){//如果在任務(wù)中拋出了執(zhí)行異常,則重新拋出該異常throw(new Throwable(e.getCause())); }finally{//true表示正在執(zhí)行的任務(wù)能夠接收中斷,如果在執(zhí)行則線程能被中斷//如果為false,則表示若任務(wù)還沒有啟動(dòng)則不要啟動(dòng)該任務(wù)task.cancel(true); }實(shí)現(xiàn)線程取消的完整例子
這里以日志服務(wù)作為例子,業(yè)務(wù)場景是這樣的:前臺(tái)會(huì)有多個(gè)生產(chǎn)者調(diào)用日志服務(wù)輸出程序的日志,生產(chǎn)者將需要輸出的日志信息放入一個(gè)隊(duì)列中,后臺(tái)服務(wù)器有一個(gè)消費(fèi)者線程,負(fù)責(zé)從隊(duì)列中取出日志信息并輸出(目的地可能不同)。顯然這是一個(gè)典型的生產(chǎn)者-消費(fèi)者問題,不過這里出現(xiàn)了多個(gè)生產(chǎn)者,但是只有一個(gè)消費(fèi)者。顯然如果生產(chǎn)者的速度遠(yuǎn)遠(yuǎn)大于消費(fèi)者的處理速度的話,很可能造成阻塞,不過這點(diǎn)已經(jīng)再上面的分析中得到了解決。現(xiàn)在需要實(shí)現(xiàn)的是,提供可靠的關(guān)閉日志服務(wù)的方法,在前臺(tái)調(diào)用服務(wù)接口可以正確停止日志服務(wù),而不會(huì)出現(xiàn)任何問題。
實(shí)現(xiàn)代碼如下:
package com.rhwayfun.patchwork.concurrency.r0411;import java.io.PrintWriter; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit;/*** Created by rhwayfun on 16-4-11.*/ public class LoggerService {// 存放日志消息的阻塞隊(duì)列private final BlockingQueue<String> logQueue;// 打印日志的消費(fèi)者線程private final LoggerThread loggerThread;// 打印日志的打印器private PrintWriter writer;// 日志服務(wù)是否關(guān)閉的標(biāo)志private boolean isShutdown;// 執(zhí)行l(wèi)og方法的調(diào)用者的計(jì)數(shù)器private int reservations;public LoggerService(PrintWriter writer) {this.logQueue = new LinkedBlockingQueue<>(5);this.loggerThread = new LoggerThread(writer);}/*** 啟動(dòng)日志服務(wù)*/public void start() {loggerThread.start();}/*** 記錄日志** @param msg* @throws InterruptedException*/public void recordLog(String msg) throws InterruptedException {// 有條件保持對日志的添加// 并且在接收到關(guān)閉請求時(shí)停止往隊(duì)列中填入日志synchronized (this) {if (isShutdown) throw new IllegalStateException("LoggerService is shutdown!");++reservations;}// 由生產(chǎn)者將消息放入隊(duì)列// 這里不放入synchronized塊是因?yàn)閜ut方法有阻塞的作用logQueue.put(msg);}/*** 停止日志服務(wù)*/public void stop() {// 以原子方式檢查關(guān)閉請求synchronized (this) {isShutdown = true;}// 讓消費(fèi)者線程停止從隊(duì)列取日志loggerThread.interrupt();}/*** 消費(fèi)者線程*/private class LoggerThread extends Thread {private PrintWriter writer;public LoggerThread(PrintWriter writer) {this.writer = writer;}@Overridepublic void run() {try {while (true) {try {// 持有的鎖與之前的相同// 如果接收到應(yīng)用程序的關(guān)閉請求并且沒有生產(chǎn)者線程繼續(xù)往隊(duì)列填入日志// 那么就結(jié)束循環(huán),消費(fèi)者線程終止synchronized (LoggerService.this) {if (isShutdown && reservations == 0) break;}// 從隊(duì)列獲取生產(chǎn)者的日志String msg = logQueue.take();// 每輸出一條日志就減少一個(gè)線程synchronized (LoggerService.this) {--reservations;}writer.println("Read: " + msg);} catch (InterruptedException e) {//恢復(fù)中斷狀態(tài)Thread.currentThread().interrupt();}}} finally {writer.close();}}}/*** 生產(chǎn)者線程*/private static class LoggerWriter implements Runnable {private LoggerService service;private final DateFormat format = new SimpleDateFormat("HH:mm:ss");public LoggerWriter(LoggerService service) {this.service = service;}@Overridepublic void run() {try {String msg = "time is " + format.format(new Date());System.out.println("Write: " + msg);service.recordLog(msg);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}public static void main(String[] args) throws InterruptedException {LoggerService service = new LoggerService(new PrintWriter(System.out));//創(chuàng)建多個(gè)生產(chǎn)者線程負(fù)責(zé)創(chuàng)建日志for (int i = 0; i < 5; i++) {new Thread(new LoggerWriter(service)).start();TimeUnit.SECONDS.sleep(1);}//啟動(dòng)日志服務(wù)service.start();//休眠10秒TimeUnit.SECONDS.sleep(10);//關(guān)閉日志服務(wù)service.stop();} }?
小結(jié)
轉(zhuǎn)載于:https://my.oschina.net/oosc/blog/1622660
總結(jié)
以上是生活随笔為你收集整理的Java并发 正确终止与恢复线程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 删除username的索引
- 下一篇: Grpc+Grpc Gateway实践一