集合的聚合操作
相關(guān)知識
lambda表達(dá)式(Lambda Expression)
方法引用(Method References)
lambda表達(dá)式(Lambda Expression)
也叫匿名函數(shù),lambda表示式的句法:
- 括號包裹的,由逗號分割的參數(shù)列表。參數(shù)類型可以省略,如果只有一個(gè)參數(shù),括號也可以省略
- 箭頭->
- 表達(dá)體,由一個(gè)表達(dá)式或者語句組成。如果是語句的話,必須用花括號包起來{},java運(yùn)行時(shí)會計(jì)算表達(dá)式并返回它的值,或者,你也可以使用return語句
方法引用(Method References)
如果lambda表達(dá)式只是調(diào)用一個(gè)現(xiàn)成的方法,那么可以考慮使用方法引用,它更緊湊易讀。比如,對某個(gè)數(shù)組中的成員根據(jù)年齡排序:
lambda表示是:
Arrays.sort(rosterAsArray,(a, b) -> Person.compareByAge(a, b) );方法飲用:
Arrays.sort(rosterAsArray, Person::compareByAge);這兩種用法在語義上是相同的:
- 參數(shù)列表都是(Person, Person)
- 方法體都調(diào)用了Person.compareByAge
聚合
管道和流
聚合操作通過管道和流完成。管道的組成:
- 源:可以是集合、數(shù)組、生成器函數(shù)、I/O通道,通過調(diào)用源的.stream()方法可以產(chǎn)生一個(gè)流
- 零個(gè)或多個(gè)中間操作:每個(gè)操作產(chǎn)出一個(gè)新的流(stream),比如
- filter
- mapToInt
- map
- 一個(gè)終點(diǎn)操作:產(chǎn)出一個(gè)非流的結(jié)果,可以是基本數(shù)據(jù)類型,集合,或者不產(chǎn)出任何值
- forEach(不產(chǎn)出任何值)
- average,合并流中的元素返回一個(gè)結(jié)果,這類操作也叫做聚合(reduction),其他還有sum, min, max, count 等
- reduce方法,(類似于python中的reduce)
聚合操作和迭代器的區(qū)別
聚合操作,比如forEeach,和迭代器(或增強(qiáng)的for語句)很像,但是有些本質(zhì)不同:
- 聚合操作使用內(nèi)部委托,而迭代器使用外部迭代。聚合操作沒有next方法來指示它們該處理集合的下一個(gè)元素,而是使用內(nèi)部委托。通過這樣方式,你的程序只決定要迭代什么集合,至于怎么迭代,由JDK決定。而外部迭代是,你要決定迭代什么,以及如何迭代,并且只能串行迭代,這樣無法利用并行計(jì)算的優(yōu)勢。
- 聚合操作從流中處理元素
- 聚合操作支持參數(shù),可以將lambda表達(dá)式作為大部分聚合操作的參數(shù),這使你能自定義聚合操作的行為。
reduce方法
reduce方法接收兩個(gè)參數(shù):
- 初始值(或者默認(rèn)值)
- 累計(jì)器,一個(gè)函數(shù),接收兩個(gè)參數(shù),一個(gè)是目前累計(jì)計(jì)算的結(jié)果,一個(gè)是流中的下一個(gè)元素
示例:
Integer totalAgeReduce = roster.stream().map(Person::getAge).reduce(0,(a, b) -> a + b);collect方法
reduce方法每處理一個(gè)元素都會創(chuàng)建一個(gè)新的值,collect方法是修改現(xiàn)有的值。通過自定義一種容器類型來跟蹤值的變化(類似于python中利用字典的可變性來緩存值):
// 定義容器類型 class Averager implements IntConsumer {private int total = 0;private int count = 0;public double average() {return count > 0 ? ((double) total)/count : 0;}public void accept(int i) { total += i; count++; }public void combine(Averager other) {total += other.total;count += other.count;} }// 使用容器來收集結(jié)果 Averager averageCollect = roster.stream().filter(p -> p.getGender() == Person.Sex.MALE).map(Person::getAge).collect(Averager::new, Averager::accept, Averager::combine);System.out.println("Average age of male members: " +averageCollect.average());collect方法包含三個(gè)參數(shù):
- 供應(yīng)器:一個(gè)工廠函數(shù),創(chuàng)建一個(gè)新的結(jié)果容器
- 累計(jì)器:將元素累加進(jìn)
- 合并器:接收兩個(gè)結(jié)果容器,并合并它們的結(jié)果。在這個(gè)例子中,它修改一個(gè)Averager容器,將另一個(gè)Averager容器的count和total值加過來。
盡管已經(jīng)提供了average等操作,但如果你需要從流中的元素計(jì)算多個(gè)值,你可以使用collect操作和自定義類。
collect方法接收Collector類型的參數(shù)
將所有男性成員放入集合中:
List<String> namesOfMaleMembersCollect = roster.stream().filter(p -> p.getGender() == Person.Sex.MALE).map(p -> p.getName()).collect(Collectors.toList());我們知道,collect方法接收3個(gè)參數(shù),而Collectors類封裝了一些函數(shù),可以用作collect方法的參數(shù)。Collectors類的大部分操作返回一個(gè)Collector實(shí)例,而不是集合,toList也不例外。
根據(jù)性別分組:
Map<Person.Sex, List<Person>> byGender =roster.stream().collect(Collectors.groupingBy(Person::getGender));groupingBy操作返回返回一個(gè)映射,健是性別,值是該性別下Person的列表。
這個(gè)操作也可以接受兩個(gè)參數(shù),比如,根據(jù)性別分組,并且只要每組下成員的名字:
Map<Person.Sex, List<String>> namesByGender =roster.stream().collect(Collectors.groupingBy(Person::getGender, Collectors.mapping(Person::getName,Collectors.toList())));這里接收兩個(gè)參數(shù),一個(gè)是分類函數(shù),一個(gè)是Collector實(shí)例,后者被稱作下游收集器。一個(gè)管道如果包含一個(gè)或多個(gè)下游收集器,就稱之為多層聚合。
統(tǒng)計(jì)每組中年齡的總和:
Map<Person.Sex, Integer> totalAgeByGender =roster.stream().collect(Collectors.groupingBy(Person::getGender, Collectors.reducing(0,Person::getAge,Integer::sum)));這里的reducing操作接收三個(gè)參數(shù):
- 初始值,和Stream.reduce操作類似
- 映射函數(shù)
- 聚合操作
并行
集合不是線程安全的,但是聚合操作和并行流允許你使用集合實(shí)現(xiàn)并行計(jì)算,前提是你在操作集合時(shí)不要修改它。注意,并行不一定比串行更快,除非你有足夠的數(shù)據(jù)和處理器核心。盡管聚合操作使你更容易實(shí)現(xiàn)并行,但你還是要評估下你的程序是否適合并行。
并行流
創(chuàng)建的流默認(rèn)是串行的,要?jiǎng)?chuàng)建一個(gè)并行流,調(diào)用Collection.parallelStream方法即可。并行執(zhí)行一個(gè)流時(shí),java運(yùn)行時(shí)將流切分為多個(gè)子流,聚合操作以并行的方式遍歷并處理子流,然后合并結(jié)果。
double average = roster// 創(chuàng)建并行流.parallelStream().filter(p -> p.getGender() == Person.Sex.MALE).mapToInt(Person::getAge).average().getAsDouble();并發(fā)聚合
使用并發(fā)聚合,根據(jù)性別分組:
ConcurrentMap<Person.Sex, List<Person>> byGender =roster.parallelStream().collect(Collectors.groupingByConcurrent(Person::getGender));注意例子中包含的關(guān)鍵字:Concurrent
順序
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8}; List<Integer> listOfIntegers =new ArrayList<>(Arrays.asList(intArray));System.out.println("listOfIntegers: "); listOfIntegers.stream().forEach(e -> System.out.print(e + " ")); System.out.println("");System.out.println("listOfIntegers sorted in reverse order:"); Comparator<Integer> normal = Integer::compare; Comparator<Integer> reversed = normal.reversed(); // 對列表進(jìn)行倒序排列 Collections.sort(listOfIntegers, reversed); listOfIntegers.stream().forEach(e -> System.out.print(e + " ")); System.out.println("");System.out.println("parallel stream"); listOfIntegers.parallelStream().forEach(e -> System.out.print(e + " ")); System.out.println("");System.out.println("Another parallel stream:"); listOfIntegers.parallelStream().forEach(e -> System.out.print(e + " ")); System.out.println("");System.out.println("With forEachOrdered:"); listOfIntegers.parallelStream().forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");執(zhí)行結(jié)果如下:
listOfIntegers: 1 2 3 4 5 6 7 8 listOfIntegers sorted in reverse order: 8 7 6 5 4 3 2 1 parallel stream 3 4 6 2 5 8 7 1 Another parallel stream: 3 1 2 7 6 5 4 8 With forEachOrdered: 8 7 6 5 4 3 2 1第三個(gè)和第四個(gè)管道打印的元素順序是隨機(jī)的。記住,處理流中的元素時(shí),這些操作使用的是內(nèi)部遍歷。結(jié)果就是,除非由流操作另行指定,否則在并行執(zhí)行一個(gè)流時(shí),java編譯器和運(yùn)行時(shí)決定以什么順序來處理流中的元素,以最大程度地發(fā)揮并行計(jì)算的優(yōu)勢。
第五個(gè)管道使用forEachOrdered方法,不論流是串行還是并行執(zhí)行,都以源中指定的順序來處理流中的元素。需要注意的是,在并行流中使用這種操作可能會無法利用并行計(jì)算的優(yōu)勢。
副作用
如果方法或者表達(dá)式除了返回或者產(chǎn)出值之外,還修改了計(jì)算機(jī)的狀態(tài),則它具有副作用。比如可變聚合,或者調(diào)用系統(tǒng)的打印方法。一個(gè)返回空的lambda表達(dá)式,比如它只是調(diào)用System.out.println方法,除了有副作用外什么也做不了。盡管JDK能很好地處管道中某些副作用,像forEach和peek這樣的方法在設(shè)計(jì)時(shí)也考慮了副作用,但是在并行流中使用這些操作也要小心,因?yàn)閖ava運(yùn)行時(shí)會從多個(gè)線程并發(fā)地調(diào)用你指定為參數(shù)的lambda表達(dá)式。切勿將lambda表達(dá)式作為參數(shù)傳遞,它會對filter, map等操作產(chǎn)生副作用。下面討論干擾和有狀態(tài)的lambda表達(dá)式,這二者是副作用的來源,尤其是在并行流中,可能返回不一致或不可預(yù)測的結(jié)果。首先需要了解惰性這個(gè)概念,因?yàn)樗苯佑绊懜蓴_。
惰性
如果一個(gè)表達(dá)式、方法或者算法的值只有在需要時(shí)才求值,那么它們就是惰性的。所有的中間操作都是惰性的,在終點(diǎn)操作開始前,中間操作不會開始處理流中的內(nèi)容。這種惰性允許java編譯器和運(yùn)行時(shí)優(yōu)化對流的處理。比如管道:filter-mapToInt-average,其中filter、mapToInt是中間操作,average是終點(diǎn)操作。average操作可以從mapToInt操作創(chuàng)建的流中獲取前幾個(gè)元素,而mapToInt則從filer操作創(chuàng)建的流中獲取元素。average操作一直重復(fù)這一過程,直到它獲取了流中所有的元素,然后再計(jì)算平均值。(個(gè)人理解:average需要幾個(gè)值,mapToInt就從filter中拿幾個(gè),而filter就從源中拿幾個(gè))
干擾
如果一個(gè)管道正在處理流時(shí),流的源被修改了,就會發(fā)生干擾。
try {List<String> listOfStrings = new ArrayList<>(Arrays.asList("one", "two"));String concatenatedString = listOfStrings.stream().peek(s -> listOfStrings.add("three")).reduce((a, b) -> a + " " + b).get();System.out.println("concatenatedString: " + concatenatedString); } catch (Exception e) {// ConcurrentModificationExceptionSystem.out.println(e); }這個(gè)示例用reduce操作(這是一個(gè)終點(diǎn)操作)拼接列表中的字符串。但是管道調(diào)用了中間操作peek,來嘗試往列表中添加一個(gè)新的元素。所有的中間操作都是惰性的,當(dāng)get被調(diào)用時(shí),管道才開始執(zhí)行。當(dāng)get操作完成后,管道結(jié)束執(zhí)行。peek操作的參數(shù)試圖在管道執(zhí)行期間修改流的源,導(dǎo)致異常拋出。
有狀態(tài)的的lambda表達(dá)式
在流的操作中,避免使用有狀態(tài)的lambda表達(dá)式作為參數(shù)。這種表達(dá)式的結(jié)果依賴于某種狀態(tài),而這種狀態(tài)在管道的執(zhí)行過程中可能會改變。這回導(dǎo)致結(jié)果的不可確定性。
Integer[] arrOfIntegers = new Integer[] {3, 2, 1, 4, 5}; List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(arrOfIntegers)); List<Integer> serialStorage = new ArrayList<>();System.out.println("serial stream: "); listOfIntegers.stream().map( e -> {serialStorage.add(e); return e;}).forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");serialStorage.stream().forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");System.out.println("parallel stream: "); List<Integer> parallelStorage = Collections.synchronizedList(new ArrayList<>()); listOfIntegers.parallelStream()// 有狀態(tài)的lambda表達(dá)式:// ForEachOrdered操作按流指定的順序處理元素,不論流是串行還是并行執(zhí)行。// 但是,當(dāng)并行執(zhí)行一個(gè)流時(shí),map操作處理由java運(yùn)行時(shí)和編譯器指定的流中的元素,// 結(jié)果就是,每次運(yùn)行時(shí),當(dāng)前l(fā)ambda表達(dá)式添加元素的順序都會不一樣.map(e -> {parallelStorage.add(e); return e;}).forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");parallelStorage.stream().forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");運(yùn)行以上示例執(zhí)行結(jié)果如下:
serial stream: 3 2 1 4 5 3 2 1 4 5 parallel stream: 3 2 1 4 5 3 4 2 1 5 // 這里的結(jié)果每次不一樣注意,上面調(diào)用synchronizedList方法是為了確保列表是線程安全的。因?yàn)榧喜皇蔷€程安全的,這意味多個(gè)線程不應(yīng)該同時(shí)訪問一個(gè)集合。假如我們不這么做,比如改為下面這樣子:
List<Integer> parallelStorage = new ArrayList<>();那么執(zhí)行結(jié)果可能是這樣的:
//parallel stream: //3 2 1 4 5 //3 2 4 5總結(jié)
- 上一篇: 安装目录opencv for andro
- 下一篇: DM365 dvsdk_2_10_01_