Java 8 stream学习
簡介
? ? Java 8里引入的另外一個重要特性就是stream api。籠統的來說,它這種特性的引入可以方便我們以一種更加聲明式的方式來寫代碼,更加便利了一些函數式編程方法的使用。同時,它也使得我們可以充分利用系統的并行能力而不用自己手工的去做很多底層的工作。當然,里面最讓人印象深刻的也許是一種類似于流式編程的概念。?
?
流水線(pipeline)
? ? 在以前一些linux腳本命令中經常會接觸到的一個概念就是pipeline,它其實體現出來了一個很好的程序設計哲學,就是我們應該設計很多小而且職責單一的模塊。每個模塊只專注于做一件事情,然后它們之間通過一種流水線的方式將它們連接起來。我們看一個典型的命令:?
cat file1 file2 | tr "[A-Z]" "[a-z]" | sort | tail -3? ? 上面這部分的命令表示從file1, file2兩個文件里讀內容,然后將里面的大寫字母轉換成小寫字母,然后再排序。最后取排序后的最后3個字符。
? ? 我們這里其實不是關心這個命令做了什么,而是這些命令它們執行的方式。實際上,在linux里面,上述的幾個命令它們完全是并發執行的,前面的cat命令可能是讀取了一部分文件的內容經由tr命令替換字符后,再由sort命令排序。它們的執行過程如下圖所示:
? ? ?上述的執行過程類似于一個工廠里的生產流水線,在每個生產的步驟里,它不是等前面一個步驟要生產的所有東西都完成才做下一步,而是前面做完一部分就馬上傳遞給后面一個部分。這樣才能實現所有步驟的并發工作。如果熟悉python的同學,也許會聯想到里面的generator的功能,它的功能也是類似的。
? ? 那么,上述的這種流水線式的編程方式有什么好處呢?除了前面提到的它可以使得我們充分利用計算機的并發能力,還能夠處理一些數據量很大的場景。因為它不是所有的數據都要一次性的放到內存里來處理。另外,它的每個步驟如果定義好之后,確實可以結合前面函數式編程的討論得到一個很好的應用。
? ? 現在,java 8里面引入的stream特性,就是給我們帶來了上述的好處。我們來詳細分析一下。
?
示例對比
? ? 假設我們有一個如下類:
import java.util.*;public class Dish {private final String name;private final boolean vegetarian;private final int calories;private final Type type;public Dish(String name, boolean vegetarian, int calories, Type type) {this.name = name;this.vegetarian = vegetarian;this.calories = calories;this.type = type;}public String getName() {return name;}public boolean isVegetarian() {return vegetarian;}public int getCalories() {return calories;}public Type getType() {return type;}public enum Type { MEAT, FISH, OTHER }@Overridepublic String toString() {return name;}public static final List<Dish> menu =Arrays.asList( new Dish("pork", false, 800, Dish.Type.MEAT),new Dish("beef", false, 700, Dish.Type.MEAT),new Dish("chicken", false, 400, Dish.Type.MEAT),new Dish("french fries", true, 530, Dish.Type.OTHER),new Dish("rice", true, 350, Dish.Type.OTHER),new Dish("season fruit", true, 120, Dish.Type.OTHER),new Dish("pizza", true, 550, Dish.Type.OTHER),new Dish("prawns", false, 400, Dish.Type.FISH),new Dish("salmon", false, 450, Dish.Type.FISH)); }? ? 這個示例稍微有點長,主要是定義了一個Dish對象,然后初始化了一個Dish的list。?
? ? 現在假設我們需要做一些如下的操作,首先獲取列表里卡路里小于400的元素,然后再根據卡路里的數值進行排序,最后我們再返回這些排序后的元素的名字。如果按照我們通常的理解,會做一個如下的實現:
List<Dish> lowCaloricDishes = new ArrayList<>(); for(Dish d: menu) {if(d.getCalories() < 400) {lowCaloricDishes.add(d);} } Collections.sort(lowCaloricDishes, new Comparator<Dish>() {public int compare(Dish d1, Dish d2) {return Integer.compare(d1.getCalories(), d2.getCalories()); } }); List<String> lowCaloricDishesName = new ArrayList<>(); for(Dish d: lowCaloricDishes) {lowCaloricDishesName.add(d.getName()); }?? ?上面這部分的代碼看起來很中規中矩,當然,也顯得有點啰嗦。具體它的特點以及與后面的代碼對比會在后面詳細說。如果我們用stream api來實現上述的邏輯該怎么做呢?
?
import static java.util.Comparator.comparing; import static java.til.stream.Collectors.toList;List<String> lowCaloricDishesName = menu.stream().filter(d -> d.getCalories < 400).sorted(comparing(Dish::getCalories)).map(Dish::getName).collect(toList());? ? ?現在我們來詳細比較一下兩種寫法上的差別。在第一種寫法上,我們需要過濾數據元素的時候需要使用一個臨時的list來保存過濾后的結果,然后再將過濾后的元素排序。因為我們最后需要的是一個排序后元素的名字列表,于是沒辦法,又要創建一個list,將里面的元素一個個的獲取出來再填充到這個list里。所以綜合來說,這種方法需要創建大量臨時的列表。這樣不但使得程序變得冗長難懂,而且創建的這些臨時的列表也增加了程序垃圾回收的壓力。
? ? 我們再看stream api的實現方式。上述代碼的實現更加是聲明式的,它的處理流程更加像一個流水線的方式。我們首先利用filter方法來過濾元素,然后調用sorted方法來排序,最后用map方法來轉換提取的元素。這種寫法不僅更加簡潔而且更加高效。關于這些具體方法的意思我們在后續部分詳細討論。
?
stream定義
? ? 從前面使用手法上來看,stream的使用像是一個流水線。在這個流水線里,它好像有一個自動推進的流程,我們使用者只需要指定對它的各種轉換操作就可以了。從更嚴格的意義來說,stream是一組定義的計算序列,這種結構將一系列的操作給串聯起來。所以如果熟悉設計模式的人會覺得這就像是一個chain of responsibility模式。當然,從函數式編程的理論角度來說,它表示的是一個叫monad的結構。
? ? 因此,從定義的角度來說,stream定義的并不是一個普通意義上的數據流,它實際上是一個計算流,表示一組計算的順序。它有一些典型的特性,比如內循環(internal iteration), 惰性計算(laziness)等。我們結合它們和集合類的比較來一起討論。
?
內迭代和外迭代(internal iteration vs external iteration)
? ? 在前面示例代碼里,我們已經比對過兩種實現方法,對于第一種方法來說,它需要顯式的定義一個循環迭代的過程。比如:
for(Dish d: menu) {if(d.getCalories() < 400) {lowCaloricDishes.add(d);} }? ? ?這部分代碼的本質是集合實現了一個iterable的接口,然后在這個循環里調用iterator()方法這樣依次的遍歷集合里的元素。這種方式實現的代碼有如下幾個問題:
1. for循環本身就是串行的過程,所有集合里元素處理的順序必須按照定義好的順序來處理。
2. 因為這種循環是由開發人員來寫的,而不是本身庫內部定義的,這樣系統比較難做一些內在的優化,比如數據的重排序,潛在并行性的利用等。
? ? 尤其是牽涉到大量數據和性能的時候,如果有更加好的方式來優雅的處理程序邏輯將更加受到歡迎。
? ? 與前面對應的是另外一種遍歷方式,稱為內部迭代。和上述代碼對應的一種實現如下:
menu.stream().filter(d -> d.getCalories < 400)? ? ?從語法上看起來它只是一個很小的變化,但是它的實際實現卻是差別很大的。因為這里的代碼并沒有顯式的定義循環處理的過程,真正迭代處理的過程相當于交給類庫來處理了。類庫的實現可以潛在的利用一些優化的手段來使得程序的執行性能更加高效。所以一旦看到stream的時候,對它執行運算時就好像已經在一個生產線的傳送帶上了。所有需要做的事情就是將一些具體的操作傳遞給這個流水線。
? ? 前面這種方式的實現實際上將要做什么和怎么做是混在一起的。比如說我需要過濾出來所有卡路里小于400的菜,這里就需要循環遍歷所有的菜列表。而后面的這種方式更像是一個聲明,只是說我需要過濾某個東西。而這個東西的條件就是一個lambda表達式,至于它的過濾是怎么實現的我們可以不用去關心了。 這樣整個業務邏輯的代碼實現也更加清晰簡練。
?
stream工作方式
不可變性
? ? 基于前面的示例,我們可能有若干個疑問,因為前面按照傳統的方式來實現的功能需要用到臨時的列表,必然要修改一些元素的屬性。那么在stream里面,我們調用的那些處理方法它會不會修改原有stream數據源的值呢?我們看如下的代碼:
?
List<String> myList = new ArrayList<>(); myList.add("a1"); myList.add("a2"); myList.add("b1"); myList.add("c2"); myList.add("c1"); myList.stream().filter(s -> s.startsWith("a")).map(String::toUpperCase).sorted().forEach(System.out::println); System.out.println(myList);? ?它的輸出如下:
A1 A2 [a1, a2, b1, c2, c1]? ? 上述代碼里的filter, map等方法并沒有修改stream里源的內容。它僅僅是根據當前的轉換操作新建一個元素。這種思路恰恰也是和copy on write的數據結構暗合的。而且它對于以后的并發處理也是有巨大的好處。
?
不可重復使用
? ? stream還有一個典型的特征就是它不能被重復使用,比如說我們嘗試如下的代碼:
Stream<String> stream = myList.stream();stream.anyMatch(s -> true); stream.anyMatch(s -> true);? ?在編譯的時候沒有問題,而運行的時候將出現如下的錯誤:
?
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closedat java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)at Sample.main(Sample.java:20)? ? 因此,凡是我們使用的stream它就相當于一次性的用品,用完之后就會被close了。如果我們需要再利用stream進一步的操作需要重新聲明一個新的stream。
?
兩種運算
? ? 在前面的代碼里還要一個需要我們深入了解的地方就是,我們能夠對一個stream做哪些操作呢?像前面的filter, map, forEach, collect等。它們有什么作用呢?
? ? 在stream里,主要有兩種運算,一種叫中間運算(intermediate),還要一種是終止運算(terminal)。比如前面的filter, map運算。filter運算僅僅過濾stream里的元素,但是返回的依然是一個Stream<String>類型。同樣,map操作也僅僅實現一個元素的轉換。如果我們有一些類型轉換的話,實際上也只是將一種類型參數的Stream轉換成另外一種Stream。而終止運算比如前面的collect,它將一個Stream又轉換成了一個List,類似的它還要toSet等方法。這些方法使得stream的處理終止。所以我們稱之為終止運算方法。關于intermediate和terminal方法的詳細介紹可以參考Stream的官方文檔,如下鏈接。
??
惰性計算(laziness)
? ? stream里還要一個比較典型的特性就是惰性計算。像前面stream里的一些典型運算filter, mapping。它們可以通過急性求值的方式來實現。以filter方法為例,也就是說在方法返回前,急性求值就需要完成對所有數據元素的過濾。而惰性計算則是當需要的時候才進行過濾運算。在實際應用中,惰性計算的方式更加有優勢一些。因為我們將它和流水線后續的一些操作結合在一起來運算,而不用多次遍歷數據。在某些場景里,比如說我們需要遍歷一個非常大的集合去尋找第一個匹配某種條件的數據,我們完全可以在找到第一個匹配的元素時就返回,而不用真正去完整遍歷整個集合。這種特性尤其在數據集合有無限個長度的情況下用處比較明顯。
? ? 我們來看一個如下的示例:
Stream.of("d2", "a2", "b1", "b3", "c").filter(s -> {System.out.println("filter: " + s);return true;});? ? 上述的stream操作里只有一個filter操作,相當于只是做了一個stream轉換成另外一個stream的操作,并沒有一個terminal的操作。如果運行上面的代碼的話,則不會有任何輸出。
? ? 總的來說,對于一個stream的操作它會盡量采用惰性計算的方式以實現滿足目標結果。
?
stream執行順序
? ? 還有一個比較值得讓人關心的就是stream處理元素的執行順序。它是按照前面示例里某個運算一次將所有的數據處理完之后再傳遞給下一個呢還是一次處理一個傳遞下去呢?我們再來看如下的代碼:
Stream.of("d2", "a2", "b1", "b3", "c").filter(s -> {System.out.println("filter: " + s);return true;}).forEach(s -> System.out.println("forEach: " + s));? ? ?運行上面這部分程序的輸出如下:
filter: d2 forEach: d2 filter: a2 forEach: a2 filter: b1 forEach: b1 filter: b3 forEach: b3 filter: c forEach: c? ? 可見,在stream里對元素的處理是按照流水線的方式來進行的。因此它不需要額外的利用集合的數據結構來保存中間結果。這種方式在處理海量數據的時候帶來非常遍歷的特性。
?
Optional類型
? ? Stream api帶來的另外一個影響就是引入了optional類型的數據。關于optional類型數據的詳細討論會在后面的文章里描述。這里只是一個簡單的敘述。我們來看如下的示例:
Optional<Shape> firstBlue = shapes.stream().filter(s -> s.getColor() == BLUE).findFirst();? ? 在shapes的stream里通過filter方法來過濾一個符合color == BLUE的元素。實際上返回的結果可能存在有這樣的元素,也可能不存在這樣的元素。于是針對這種可能存在也可能不存在的類型元素,這里引入了Optional類型數據來描述它。通過引入Optional類型可以減少和規避很多容易出現nullpointerexception的情況。也算是對程序的一種改進。
?
stream的潛在并行性
? ? 前面提到過,在stream api里引入了一種使得運用并行開發更加簡便的方式,這就是 parallel stream。在目前多核體系結構比較普遍的情況下,大多數計算機都有多個核,如果只是使用以前的編程方式的話并不能充分發揮機器的性能。于是需要一種更好的方式來使用多核和多線程。在以往的java 多線程開發里,使用好多線程是一個很困難的任務。于是為了簡化對一些多線程情況下的使用,這里就引入了parallel stram。
? ? 需要注意的是,前面用的stream是對數據進行串行處理的,而這里使用并行處理的時候,它的使用方式則稍微有點差別。我們先來看一部分如下的代碼:
Arrays.asList("a1", "a2", "b1", "c2", "c1").parallelStream().filter(s -> {System.out.format("filter: %s [%s]\n",s, Thread.currentThread().getName());return true;}).map(s -> {System.out.format("map: %s [%s]\n",s, Thread.currentThread().getName());return s.toUpperCase();}).forEach(s -> System.out.format("forEach: %s [%s]\n",s, Thread.currentThread().getName()));? ? 這部分代碼看起來比較復雜,實際上和前面代碼的唯一差別就是stream()方法編程了parallelStream()。在每個處理步驟里都加入了打印的消息以方便我們跟蹤程序執行的過程。如果我們運行上述的代碼,會發現如下的輸出:
?
filter: b1 [main] map: b1 [main] filter: c2 [ForkJoinPool.commonPool-worker-4] filter: c1 [ForkJoinPool.commonPool-worker-3] map: c1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-3] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] forEach: A2 [ForkJoinPool.commonPool-worker-1] filter: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-4] forEach: C2 [ForkJoinPool.commonPool-worker-4] forEach: B1 [main] map: a1 [ForkJoinPool.commonPool-worker-2] forEach: A1 [ForkJoinPool.commonPool-worker-2]? ? 實際上,如果我們多次運行程序的話會發現每次的輸出還有點不一樣。當然,從輸出里我們還可以看到一個東西,就是輸出的線程名是屬于一個ForkJoinPool里的線程。也就是說它實際上運用了線程池。這里運用到的線程池就是java 7里引入的forkjoin pool。關于forkjoin pool的討論可以參考我前面一篇相關的分析文章。
?
總結
? ? Java 8 引入的stream api可以說是給前面函數式編程應用到的lambda表達式提供了一個極好的應用場景。它本質上是一個惰性計算流,它不像我們傳統使用的數據結構,需要事先分配內存空間,而是一種按需計算的模式。所以它更像是一個流水線式的計算模型。同時,它在默認的情況下是串行執行的,所以它的執行順序不一樣,但是可以利用很少的內存空間。另外,在stream api里也有很簡單支持并行計算的parallemstream,它本質上是運用了java的Forkjoin thread pool來實現并行的。這種方式大大簡化了我們并發編程的難度。
?
參考材料
http://cr.openjdk.java.net/~briangoetz/lambda/lambda-libraries-final.html
http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/
http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html
http://www.oracle.com/technetwork/articles/java/architect-streams-pt2-2227132.html
http://www.amazon.com/Java-Action-Lambdas-functional-style-programming/dp/1617291994/ref=sr_1_1?s=books&ie=UTF8&qid=1447684950&sr=1-1&keywords=java+8+in+action
總結
以上是生活随笔為你收集整理的Java 8 stream学习的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 开怀一笑
- 下一篇: Java程序员跳槽的经验开怀之谈(现在很