future 线程报错后_线程池运用实例——一次错误的多线程程序设计以及修复过程...
寫在前面的話
寫下這篇文章只為了回顧之前在實際工作中犯的一個極其二逼的錯誤,用我的經歷來提示后來者,諸位程序大神,大牛,小牛們看到此文笑笑即可,輕拍輕拍。。。
1 背景
有這么一個需求,我們的系統(后面簡稱:A系統)需要在后臺執行一個報表導出任務,在這個任務的執行過程中需要通過CORBA調用其他系統(后面簡稱:B系統)的一個(也有可能是多個)接口去查詢報表,待結果返回后,將這些結果寫入Excel。這個需求是不是很簡單?套用網上一些FutureTask或者線程池的例子一兩小時就能搞定這個需求。當時我也是這樣認為的,可誰想,這是一個巨大的坑….
2 初始設計
用過CORBA的同學會知道,如同數據庫連接一樣,CORBA的連接數也是是有限的,如果一個接口調用的時間過長,就會長時間占用CORBA有限的連接數,當這種長時間的同步調用過多時就會造成整個系統CORBA調用的阻塞,進而造成系統停止響應。由于查詢操作很耗時,為了避免這種情況的發生,這個接口被設計成了一個異步接口。任務的執行流程就會是這樣:任務開始執行,接著調用這個接口并且通過CORBA向B系統訂閱一個事件,然后任務進入等待狀態,當B系統執行完成后,會向A系統發送一個事件告知執行的結果,任務收到事件后重新開始執行直到結束,如圖:
既然說到了事件,那么很自然而然的就想到了使用回調的方式去響應事件,并且為了避免事件超時(也就是長時間沒有接收到事件)導致任務長時間等待,我還使用了一個定時的任務去檢查任務的狀態。所以我的程序看起來就像這樣:
IEventFuture.java
public interface IEventFuture {
void onEventReceived(Event event);
}
ExportRptTask.java
public class ExportRptTask implements Callable, IEventFuture {
private static final int INITIALIZED = 0;
private static final int RUNNING = 1;
private static final int COMPLETED = 2;
private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
private Date lastUpdate = new Date();
private volatile int state = INITIALIZED;
private Timer timer = new Timer();
private SystemBSer systemBSer = new SystemBSer();
private int eventId = -1;
@Override
public Void call() throws Exception {
this.state = RUNNING;
try {
systemBSer.doQuery();
subscribeEvent();
startTaskTimeoutMonitorTask();
Future future = createEventFuture();
future.get();
} catch (Throwable t) {
onTaskError(t);
} finally {
EventManager.unsubscribe(this.eventId);
timer.cancel();
}
return null;
}
@Override
public void onEventReceived(Event event) {
this.lastUpdate = new Date();
// start to write excel
// .....
// end to write excel
this.state = COMPLETED;
}
private void subscribeEvent() {
this.eventId = EventManager.subscribe(this);
}
private Future createEventFuture() {
FutureTask listenFuture = new FutureTask(new Callable() {
@Override
public Void call() throws Exception {
while (state != COMPLETED) {
}
return null;
}
});
new Thread(listenFuture).start();
return listenFuture;
}
private void startTaskTimeoutMonitorTask() {
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (state != COMPLETED || new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
onTaskTimeout();
}
}
}, 0, 15 * 60 * 1000);
}
private void onTaskTimeout() {
// do something on task timeout.
// ? ....
// end
// set task to completed to end task.
this.state = COMPLETED;
}
private void onTaskError(Throwable t) {
// do something to handle error.
}
}
3 升級改進
由于做這個需求的關系,我開始閱讀一些關于JAVA多線程編程的一下教程,在閱讀到關于閉鎖的內容時,我突然靈光一現,這玩意不正好可以代替我那個丑陋的使用循環來讓任務進入等待狀態的實現么?然后我的程序就變成了這樣:
ExportRptTask.java
public class ExportRptTask implements Callable, IEventFuture {
private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
private Date lastUpdate = new Date();
private CountDownLatch endGate = new CountDownLatch(1);
private Timer timer = new Timer();
private SystemBSer systemBSer = new SystemBSer();
private int eventId = -1;
@Override
public Void call() throws Exception {
try {
systemBSer.doQuery();
subscribeEvent();
endGate.await();
startTaskTimeoutMonitorTask();
} catch (Throwable t) {
onTaskError(t);
} finally {
EventManager.unsubscribe(this.eventId);
timer.cancel();
}
return null;
}
@Override
public void onEventReceived(Event event) {
this.lastUpdate = new Date();
// start to write excel
// .....
// end to write excel
this.endGate.countDown();
}
private void subscribeEvent() {
this.eventId = EventManager.subscribe(this);
}
private void startTaskTimeoutMonitorTask() {
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
onTaskTimeout();
}
}
}, 0, 15 * 60 * 1000);
}
private void onTaskTimeout() {
// do something on task timeout.
// ? ....
// end
// set task to completed to end task.
this.endGate.countDown();
}
private void onTaskError(Throwable t) {
// do something to handle error.
}
}
4 問題浮現
正在我為我使用高大上的閉鎖代替循環沾沾自喜的時候,測試大爺告訴我,任務經常莫名其妙的失敗,并且日志中沒有任何異常。開始,這讓我覺得很不可思議,因為我已經在call()方法處處理了所有的異常,任務失敗時至少也應該有個日志啥的吧。這個問題一直困擾著我,直到有一天分析日志我突然發現任務執行的工作線程(也就是call()方法所在的線程)和接收到事件后的回調并不是同一個線程。這就意味著在查詢到報表結果后,所有寫Excel,分發結果等等的操作都是在事件回調的線程中執行的,那么一旦這里發生異常原來call()中的catch塊自然無法捕獲,然后異常就被莫名其妙的吞掉了。好吧,我承認我之前對線程池也就了解點皮毛,對多線程也僅僅是有個概念,想當然的認為在線程池中可以Hold住任務的一切,包括響應這個任務在執行過程中創建的其他線程運行時發生的異常。而且更嚴重的是按照原來的實現,只有當整個任務執行完成(包括寫完Excel)后,才會釋放那個閉鎖,所以一旦事件回調發生異常,那么整個任務都無法終止。在線程池中發生一個任務永遠無法終止的后果,你懂的。
5 重新設計
痛定思痛,我決定重新梳理這個任務的流程。這個需求的難點就是在如何監聽并響應B系統給我們發送的事件,實際上,這是一個很經典的生產者–消費者問題,而阻塞隊列正好是解決這類問題的利器。重新設計的事件響應流程就變成:當B系統發送事件的時候,事件回調線程會往阻塞隊列里面填充一個事件。在另一方面,任務調用完B系統的查詢接口后,就開始從阻塞隊列中取事件,當事件隊列為空的時候,取事件的線程(也就是線程池執行任務的工作線程)會被阻塞。并且,阻塞隊列的取操作可以設置超時時間,所以當取到的事件對象為空時,就意味著事件超時了,這樣就省去了使用定時任務定時檢查任務狀態的工作。重新設計的程序是這樣的:
EventProxy.java
public class EventProxy implements IEventFuture {
private static final BlockingQueue eventQueue = new ArrayBlockingQueue(10);
private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
@Override
public void onEventReceived(Event event) {
eventQueue.offer(event);
}
public Event getEvent() throws InterruptedException {
return eventQueue.poll(TASK_TIME_OUT_TIME, TimeUnit.MILLISECONDS);
}
}
ExportRptTask.java
public class ExportRptTask3 implements Callable {
private SystemBSer systemBSer = new SystemBSer();
private EventProxy eventProxy = new EventProxy();
private int eventId = -1;
@Override
public Void call() throws Exception {
try {
systemBSer.doQuery();
subscribeEvent();
Event event = eventProxy.getEvent();
if (event != null) {
processEvent(event);
} else {
onTaskTimeout();
}
} catch (Throwable t) {
onTaskError(t);
} finally {
EventManager.unsubscribe(this.eventId);
}
return null;
}
private void subscribeEvent() {
this.eventId = EventManager.subscribe(eventProxy);
}
private void processEvent(Event event) {
// do something on receive event.
}
private void onTaskTimeout() {
// do something on task timeout.
// ? ....
// end
}
private void onTaskError(Throwable t) {
// do something to handle error.
}
}
6 總結
相信各位并發編程的大牛們能在一瞬間就可以把我的程序(包括改進后的)批得體無完膚,不過我還是想分享下我在這個過程中的收獲。
在動手寫程序前,請先理解你的需求,特別是要注意用已有的模型去識別問題,在本例中,我就是沒有識別響應事件的流程其實是個生產者–消費者問題導致了后面的錯誤
請充分的了解你需要使用的技術和工具。比如,使用線程池你就要了解線程池的工作原理,這樣你才能正確的使用這些技術。做技術切忌想當然。
在使用線程池時,重要的操作盡量放在任務的主線程中執行(也就是call()/run()方法所在的線程),否則線程池本身難以對任務進行控制。
如果一定要在任務中再創建新的線程,請確保任務主線程是任務最后退出的線程。切忌不要使用外部線程直接調用任務類的方法,在本例中我就犯了這樣的錯誤。
總結
以上是生活随笔為你收集整理的future 线程报错后_线程池运用实例——一次错误的多线程程序设计以及修复过程...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何戒上头电子烟
- 下一篇: idea社区版和企业版区别_Intell