Stream是 Java 8新增加的類,用來補充集合類。
Stream代表數據流,流中的數據元素的數量可能是有限的,也可能是無限的。
Stream和其它集合類的區別在於:其它集合類主要關注與有限數量的數據的訪問和有效管理(增刪改),而Stream並沒有提供訪問和管理元素的方式,而是通過聲明數據源的方式,利用可計算的操作在數據源上執行,當然BaseStream.iterator()和BaseStream.spliterator()操作提供了遍歷元素的方法。
Java Stream提供了提供了串行和並行兩種類型的流,保持一致的接口,提供函數式編程方式,以管道方式提供中間操作和最終執行操作,為Java語言的集合提供了現代語言提供的類似的高階函數操作,簡化和提高了Java集合的功能。
本文首先介紹Java Stream的特點,然後按照功能分類逐個介紹流的中間操作和終點操作,最後會介紹第三方為Java Stream做的擴展。
前年年底的時候我寫了一些關於Java 8 Lambda和Stream的文章,本文應該在那個時候完成。後來忙於項目和寫《Scala集合技術手冊》(Scala Collections Cookbook)這本書,一直沒來得及寫Java Stream的文章,現在這篇文章算是對 Java Stream的一個總結吧。
本節翻譯整理自 Javadoc ,並對流的這些特性做了進一步的解釋。
Stream接口還包含幾個基本類型的子接口如IntStream, LongStream 和 DoubleStream。
關於流和其它集合具體的區別,可以參照下面的列表:
流的操作是以管道的方式串起來的。流管道包含一個數據源,接著包含零到N個中間操作,最後以一個終點操作結束。
所有的流操作都可以串行執行或者並行執行。除非顯示地創建並行流,否則Java庫中創建的都是串行流。Collection.stream()為集合創建串行流而Collection.parallelStream()為集合創建並行流。IntStream.range(int, int)創建的是串行流。通過parallel()方法可以將串行流轉換成並行流,sequential()方法將流轉換成並行流。
除非方法的Javadoc中指明了方法在並行執行的時候結果是不確定(比如findAny、forEach),否則串行和並行執行的結果應該是一樣的。
流可以從非線程安全的集合中創建,當流的管道執行的時候,非concurrent數據源不應該被改變。下面的代碼會拋出java.util.ConcurrentModificationException異常:
List<String> l = new ArrayList(Arrays.asList("one", "two"));Stream<String> sl = l.stream();sl.forEach(s -> l.add("three"));
在設置中間操作的時候,可以更改數據源,只有在執行終點操作的時候,才有可能出現並發問題(拋出異常,或者不期望的結果),比如下面的代碼不會拋出異常:
List<String> l = new ArrayList(Arrays.asList("one", "two"));Stream<String> sl = l.stream();l.add("three");sl.forEach(System.out::println);
對於concurrent數據源,不會有這樣的問題,比如下面的代碼很正常:
List<String> l = new CopyOnWriteArrayList<>(Arrays.asList("one", "two"));Stream<String> sl = l.stream();sl.forEach(s -> l.add("three"));
雖然我們上面例子是在終點操作中對非並發數據源進行修改,但是非並發數據源也可能在其它線程中修改,同樣會有並發問題。
大部分流的操作的參數都是函數式接口,可以使用Lambda表達式實現。它們用來描述用戶的行為,稱之為行為參數(behavioral parameters)。
如果這些行為參數有狀態,則流的操作的結果可能是不確定的,比如下面的代碼:
List<String> l = new ArrayList(Arrays.asList("one", "two", ……));class State { boolean s;}final State state = new State();Stream<String> sl = l.stream().map(e -> { if (state.s) return "OK"; else { state.s = true; return e; } });sl.forEach(System.out::println);
上面的代碼在並行執行時多次的執行結果可能是不同的。這是因為這個lambda表達式是有狀態的。
有副作用的行為參數是被鼓勵使用的。
副作用指的是行為參數在執行的時候有輸入輸入,比如網絡輸入輸出等。
這是因為Java不保證這些副作用對其它線程可見,也不保證相同流管道上的同樣的元素的不同的操作運行在同一個線程中。
很多有副作用的行為參數可以被轉換成無副作用的實現。一般來說println()這樣的副作用代碼不會有害。
ArrayList<String> results = new ArrayList<>();stream.filter(s -> pattern.matcher(s).matches()) .forEach(s -> results.add(s)); // 副作用代碼
上面的代碼可以改成無副作用的。
List<String>results = stream.filter(s -> pattern.matcher(s).matches()) .collect(Collectors.toList()); // No side-effects!
某些流的返回的元素是有確定順序的,我們稱之為encounter order。這個順序是流提供它的元素的順序,比如數組的encounter order是它的元素的排序順序,List是它的迭代順序(iteration order),對於HashSet,它本身就沒有encounter order。
一個流是否是encounter order主要依賴數據源和它的中間操作,比如數據源List和Array上創建的流是有序的(ordered),但是在HashSet創建的流不是有序的。
sorted()方法可以將流轉換成有序的,unordered可以將流轉換成無序的。除此之外,一個操作可能會影響流的有序,比如map方法,它會用不同的值甚至類型替換流中的元素,所以輸入元素的有序性已經變得沒有意義了,但是對於filter方法來說,它只是丟棄掉一些值而已,輸入元素的有序性還是保障的。
對於串行流,流有序與否不會影響其性能,只是會影響確定性(determinism),無序流在多次執行的時候結果可能是不一樣的。
對於並行流,去掉有序這個約束可能會提供性能,比如distinct、groupingBy這些聚合操作。
一個操作或者函數op滿足結合性意味著它滿足下面的條件:
(a op b) op c == a op (b op c)
對於並發流來說,如果操作滿足結合性,我們就可以並行計算:
a op b op c op d == (a op b) op (c op d)
比如min、max以及字符串連接都是滿足結合性的。
可以通過多種方式創建流:
1、通過集合的stream()方法或者parallelStream(),比如Arrays.asList(1,2,3).stream()。
2、通過Arrays.stream(Object[])方法, 比如Arrays.stream(new int[]{1,2,3})。
3、使用流的靜態方法,比如Stream.of(Object[]),IntStream.range(int, int)或者Stream.iterate(Object, UnaryOperator),如Stream.iterate(0, n -> n * 2),或者generate(Supplier<T> s)如Stream.generate(Math::random)。
4、BufferedReader.lines()從文件中獲得行的流。
5、Files類的操作路徑的方法,如list、find、walk等。
6、隨機數流Random.ints()。
7、其它一些類提供了創建流的方法,如BitSet.stream(),Pattern.splitAsStream(java.lang.CharSequence), 和JarFile.stream()。
8、更底層的使用StreamSupport,它提供了將Spliterator轉換成流的方法。
中間操作會返回一個新的流,並且操作是延遲執行的(lazy),它不會修改原始的數據源,而且是由在終點操作開始的時候才真正開始執行。這個Scala集合的轉換操作不同,Scala集合轉換操作會生成一個新的中間集合,顯而易見Java的這種設計會減少中間對象的生成。
下面介紹流的這些中間操作:
distinct保證輸出的流中包含唯一的元素,它是通過Object.equals(Object)來檢查是否包含相同的元素。
List<String> l = Stream.of("a","b","c","b") .distinct() .collect(Collectors.toList());System.out.println(l); //[a, b, c]
filter返回的流中只包含滿足斷言(predicate)的數據。
下面的代碼返回流中的偶數集合。
List<Integer> l = IntStream.range(1,10) .filter( i -> i % 2 == 0) .boxed() .collect(Collectors.toList());System.out.println(l); //[2, 4, 6, 8]
map方法將流中的元素映射成另外的值,新的值類型可以和原來的元素的類型不同。
下面的代碼中將字符元素映射成它的哈希碼(ASCII值)。
List<Integer> l = Stream.of('a','b','c') .map( c -> c.hashCode()) .collect(Collectors.toList());System.out.println(l); //[97, 98, 99]
flatmap方法混合了map+flattern的功能,它將映射後的流的元素全部放入到一個新的流中。它的方法定義如下:
<R> Stream<R> flatMap(Function<? super T,? extends Stream<? extends R>> mapper)
可以看到mapper函數會將每一個元素轉換成一個流對象,而flatMap方法返回的流包含的元素為mapper生成的流中的元素。
下面這個例子中將一首唐詩生成一個按行分割的流,然後在這個流上調用flatmap得到單詞的小寫形式的集合,去掉重復的單詞然後打印出來。
String poetry = "Where, before me, are the ages that have gone?/n" + "And where, behind me, are the coming generations?/n" + "I think of heaven and earth, without limit, without end,/n" + "And I am all alone and my tears fall down.";Stream<String> lines = Arrays.stream(poetry.split("/n"));Stream<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")));List<String> l = words.map( w -> { if (w.endsWith(",") || w.endsWith(".") || w.endsWith("?")) return w.substring(0,w.length() -1).trim().toLowerCase(); else return w.trim().toLowerCase();}).distinct().sorted().collect(Collectors.toList());System.out.println(l); //[ages, all, alone, am, and, are, before, behind, coming, down, earth, end, fall, generations, gone, have, heaven, i, limit, me, my, of, tears, that, the, think, where, without]
flatMapToDouble、flatMapToInt、flatMapToLong提供了轉換成特定流的方法。
limit方法指定數量的元素的流。對於串行流,這個方法是有效的,這是因為它只需返回前n個元素即可,但是對於有序的並行流,它可能花費相對較長的時間,如果你不在意有序,可以將有序並行流轉換為無序的,可以提高性能。
List<Integer> l = IntStream.range(1,100).limit(5) .boxed() .collect(Collectors.toList());System.out.println(l);//[1, 2, 3, 4, 5]
peek方法方法會使用一個Consumer消費流中的元素,但是返回的流還是包含原來的流中的元素。
String[] arr = new String[]{"a","b","c","d"};Arrays.stream(arr) .peek(System.out::println) //a,b,c,d .count();
sorted()將流中的元素按照自然排序方式進行排序,如果元素沒有實現Comparable,則終點操作執行時會拋出java.lang.ClassCastException異常。sorted(Comparator<? super T> comparator)可以指定排序的方式。
對於有序流,排序是穩定的。對於非有序流,不保證排序穩定。
String[] arr = new String[]{"b_123","c+342","b#632","d_123"};List<String> l = Arrays.stream(arr) .sorted((s1,s2) -> { if (s1.charAt(0) == s2.charAt(0)) return s1.substring(2).compareTo(s2.substring(2)); else return s1.charAt(0) - s2.charAt(0); }) .collect(Collectors.toList());System.out.println(l); //[b_123, b#632, c+342, d_123]
skip返回丟棄了前n個元素的流,如果流中的元素小於或者等於n,則返回空的流。
public boolean allMatch(Predicate<? super T> predicate)public boolean anyMatch(Predicate<? super T> predicate)public boolean noneMatch(Predicate<? super T> predicate)
這一組方法用來檢查流中的元素是否滿足斷言。
allMatch只有在所有的元素都滿足斷言時才返回true,否則flase,流為空時總是返回true
anyMatch只有在任意一個元素滿足斷言時就返回true,否則flase,
noneMatch只有在所有的元素都不滿足斷言時才返回true,否則flase,
System.out.println(Stream.of(1,2,3,4,5).allMatch( i -> i > 0)); //true System.out.println(Stream.of(1,2,3,4,5).anyMatch( i -> i > 0)); //true System.out.println(Stream.of(1,2,3,4,5).noneMatch( i -> i > 0)); //falseSystem.out.println(Stream.<Integer>empty().allMatch( i -> i > 0)); //true System.out.println(Stream.<Integer>empty().anyMatch( i -> i > 0)); //false System.out.println(Stream.<Integer>empty().noneMatch( i -> i > 0)); //true
count方法返回流中的元素的數量。它實現為:
mapToLong(e -> 1L).sum();
<R,A> R collect(Collector<? super T,A,R> collector)<R> R collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner)
使用一個collector執行mutable reduction操作。輔助類 Collectors 提供了很多的collector,可以滿足我們日常的需求,你也可以創建新的collector實現特定的需求。它是一個值得關注的類,你需要熟悉這些特定的收集器,如聚合類averagingInt、最大最小值maxByminBy、計數counting、分組groupingBy、字符串連接joining、分區partitioningBy、匯總summarizingInt、化簡reducing、轉換toXXX等。
第二個提供了更底層的功能,它的邏輯類似下面的偽代碼:
R result = supplier.get();for (T element : this stream) accumulator.accept(result, element);return result;
例子:
List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);String concat = stringStream.collect(StringBuilder::new, StringBuilder::append, StringBuilder::append) .toString();
findAny()返回任意一個元素,如果流為空,返回空的Optional,對於並行流來說,它只需要返回任意一個元素即可,所以性能可能要好於findFirst(),但是有可能多次執行的時候返回的結果不一樣。findFirst()返回第一個元素,如果流為空,返回空的Optional。
forEach遍歷流的每一個元素,執行指定的action。它是一個終點操作,和peek方法不同。這個方法不擔保按照流的encounter order順序執行,如果對於有序流按照它的encounter order順序執行,你可以使用forEachOrdered方法。
Stream.of(1,2,3,4,5).forEach(System.out::println);
max返回流中的最大值,min返回流中的最小值。
reduce是常用的一個方法,事實上很多操作都是基於它實現的。它有幾個重載方法:
pubic Optional<T> reduce(BinaryOperator<T> accumulator)pubic T reduce(T identity, BinaryOperator<T> accumulator)pubic <U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)
第一個方法使用流中的第一個值作為初始值,後面兩個方法則使用一個提供的初始值。
Optional<Integer> total = Stream.of(1,2,3,4,5).reduce( (x, y) -> x +y);Integer total2 = Stream.of(1,2,3,4,5).reduce(0, (x, y) -> x +y);
值得注意的是accumulator應該滿足結合性(associative)。
將流中的元素放入到一個數組中。
concat用來連接類型一樣的兩個流。
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)
“
toArray方法將一個流轉換成數組,而如果想轉換成其它集合類型,西需要調用collect方法,利用Collectors.toXXX方法進行轉換:
public static <T,C extends Collection<T>> Collector<T,?,C> toCollection(Supplier<C> collectionFactory)public static …… toConcurrentMap(……)public static <T> Collector<T,?,List<T>> toList()public static …… toMap(……)public static <T> Collector<T,?,Set<T>> toSet()
雖然Stream提供了很多的操作,但是相對於Scala等語言,似乎還少了一些。一些開源項目提供了額外的一些操作,比如 protonpack 項目提供了下列方法:
java8-utils 也提供了一些有益的輔助方法。