system流怎么判断为空_并行流ParallelStream中隐藏的陷阱
點擊上方藍字 ↑↑ Throwable文摘
關注公眾號設置星標,不定時推送高質量原創文章
關注
前提
這篇文章介紹一下日常開發中并行流ParallelStream中隱藏的陷阱,這個問題其實離我們很近,特別是喜歡使用JDK1.8+的流式編程的伙伴,應該會深有感觸。
一個故意而為的例子
下面舉一個故意而為的例子,實際上應該不會有類似的業務代碼:
public?class?ParallelStreamMain?{????public?static?void?main(String[]?args)?throws?Exception?{
????????List>?array?=?new?ArrayList<>();
????????List?item1?=?new?ArrayList<>();
????????List?item2?=?new?ArrayList<>();
????????List?target?=?new?ArrayList<>(100);
????????array.add(item1);
????????array.add(item2);
????????array.parallelStream().forEach(x?->?{for?(int?i?=?0;?i?100000;?i++)?{
????????????????target.add(i);
????????????}
????????});
????????System.out.println(target.size());
????}
}
某一次執行結果為:163913。如果不停地執行這個main方法,最終都會得到一個非200000的結果,這里的問題就在于使用了并行流parallelStream()方法。ParallelStream底層使用了Fork/Join框架實現,也就是應用了線程池ForkJoinPool把并行流中的節點抽象為ForkJoinTask進行計算,背后用到的"任務竊取"等原理這里就不進行展開,只需要明確:
- ForkJoinPool一般使用Runtime.getRuntime().availableProcessors()(此值一般認為是物理機器的邏輯核心數量)作為并行度(parallelism),簡單認為是可并發執行的任務數,并不是工作線程數。
- 多核機器中,使用ParallelStream在流的節點中的所有操作都相當于在「一個多線程環境中」進行操作,里面的所有操作都會產生不可預期的結果,例如可能會數組越界、添加元素丟失、部分下標index的引用為NULL等等。
一個仿真例子
寫這篇文章不是有意為之,其實很早之前筆者曾經遇到一個比較隱蔽的生產故障,其中有一段訪問量比較低的代碼大致如下:
@Dataprivate?static?class?OrderDTO?{
????private?String?orderId;
????private?OrderStatus?orderStatus;
????private?BigDecimal?amount;
????private?Long?customerId;
}
@Data
private?static?class?Order?{
????private?Long?id;
????private?String?orderId;
????private?Integer?orderStatus;
????private?BigDecimal?amount;
????private?Long?customerId;
????private?OffsetDateTime?createTime;
????private?OffsetDateTime?editTime;
}
public?void?groupByOrderStatus(Long?customerId)?{
????List?orders?=?orderDao.selectByCustomerId(customerId);
????List?orderDTOList?=?new?ArrayList<>();
????orders.parallelStream().forEach(order?->?{
????????OrderDTO?dto?=?new?OrderDTO();
????????......
????????orderDTOList.add(dto);
????});
????Map>?collect?
????????????=?orderDTOList.stream().collect(Collectors.groupingBy(item?->?item.getOrderStatus().getCode()));
????......
}
該方法的功能是通過客戶ID查詢訂單列表,然后把訂單列表轉化為OrderDTO列表,然后再按照訂單狀態字段進行分組。通過生產日志和測試回歸發現,上面的代碼段中groupByOrderStatus()方法會偶發空指針異常。
初次出現問題的時候,由于開發者通過Lambda表達式把多處代碼壓縮為1行,所以從異常棧比較難排查具體發生問題的代碼,后面把Lambda表達式以句點起點拆分為多行上線后觀察一段時間,最終定位到發生空指針異常的代碼段為Collectors.groupingBy(item -> item.getOrderStatus().getCode()),也就是OrderDTO實例中的orderStatus為空對象。這里顯然,groupByOrderStatus()方法其實是被封閉在線程棧中調用,本不應該有多個線程去并發修改其中的內容,這里只剩下一個疑點:使用了parallelStream()。后來直接把parallelStream()修改為stream()重新上線,該空指針問題不再復現。
Lambda/Stream其實并不是天然線程安全的,線程安全的前提是它們本身被線程封閉調用,并且不引入多線程環境,像使用了并行流,本質就是引入了多線程環境。所以,在開發功能的時候,需要仔細思考一下:
筆者有代碼潔癖,當時還發現了上面的代碼存在映射操作,正確來說應該使用map()函數,而不是forEach()去遍歷元素重新裝進去另一個列表,方法中的邏輯體現了原開發者其實對Lambda一知半解。
?小結
回到最初那個問題,其實使用并行流也可以保證執行結果和預期一致,不過一定需要引入額外的同步機制,例如這里使用「監視器」進行同步:
public?class?ParallelStreamMain?{????public?static?void?main(String[]?args)?throws?Exception?{
????????List>?array?=?new?ArrayList<>();
????????List?item1?=?new?ArrayList<>();
????????List?item2?=?new?ArrayList<>();
????????List?target?=?new?ArrayList<>(100);
????????array.add(item1);
????????array.add(item2);final?Object?monitor?=?new?Object();
????????array.parallelStream().forEach(x?->?{synchronized?(monitor)?{for?(int?i?=?0;?i?100000;?i++)?{
????????????????????target.add(i);
????????????????}
????????????}
????????});
????????System.out.println(target.size());
????}
}
上面的方法無論執行多少次,最終都只會輸出:200000。
(本文完 c-1-d e-a-20200710)
總結
以上是生活随笔為你收集整理的system流怎么判断为空_并行流ParallelStream中隐藏的陷阱的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: leetcode 142 --- lin
- 下一篇: python垃圾回收 采用方式_pyth