Dabai的个人博客

Java 8新特性:流式处理

声明:本篇博客很多内容引用自Java 8 中的 Streams API 详解Java 8函数式编程,同时文中也有很多自己的理解和学习体会,希望通过这篇文章深入的介绍和总结Java 8的流式处理。

1. 什么是流

Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。

Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:

  • 1.0-1.4 中的 java.lang.Thread
  • 5.0 中的 java.util.concurrent
  • 6.0 中的 Phasers 等
  • 7.0 中的 Fork/Join 框架
  • 8.0 中的 Lambda

Stream 的另外一大特点是,数据源本身可以是无限的。

总的来说,Stream是用函数编程的方式在集合类上进行复杂操作的工具。

2. 流的构成

当我们使用一个流的时候,通常包括三个基本步骤:

获取一个数据源(source)→ 数据转换→执行操作获取想要的结果,每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道,如下图所示。

流的操作类型分为两种:

  • Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。

  • Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。

在对于一个 Stream 进行多次转换操作 (Intermediate 操作),每次都对 Stream 的每个元素进行转换,而且是执行多次,这样时间复杂度就是 N(转换次数)个 for 循环里把所有操作都做掉的总和吗?其实不是这样的,转换操作都是 lazy 的,多个转换操作只会在 Terminal 操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream 里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在 Terminal 操作的时候循环 Stream 对应的集合,然后对每个元素执行所有的函数。

例如,下面的Stream没有Terminal操作,不会输出任何信息!

1
2
3
4
5
6
List<Integer> nums = Arrays.asList(1,null,3,4,null,5);
nums.stream()
.filter(num -> {
System.out.println(num);
return num != null;
});

使用Terminal操作后,所有的中间惰性操作才会执行:

1
2
3
4
5
6
7
List<Integer> nums = Arrays.asList(1,null,3,4,null,5);
nums.stream()
.filter(num -> {
System.out.println(num);
return num != null;
})
.count();

上面代码打印的信息如下:

1
2
3
4
5
6
1
null
3
4
null
5

还有一种操作被称为 short-circuiting。指的是:

  • 对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个有限的新 Stream。
  • 对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。
  • 当操作一个无限大的 Stream,而又希望在有限时间内完成操作,则在管道内拥有一个 short-circuiting 操作是必要非充分条件。

3. 流的使用

简单说,对 Stream 的使用就是实现一个 filter-map-reduce 过程,产生一个最终结果,或者导致一个副作用(side effect)。

3.1 流的构造与转换

  • 流的构造过程就是对原来的集合进行包装,产生一个Stream对象,常见的构造流的方法如下:
1
2
3
4
5
6
7
8
9
// 1. Individual values
Stream stream = Stream.of("a", "b", "c");
// 2. Arrays
String [] strArray = new String[] {"a", "b", "c"};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray);
// 3. Collections
List<String> list = Arrays.asList(strArray);
stream = list.stream();
  • 流转换为其它数据结构
1
2
3
4
5
6
7
8
9
// 1. Array
String[] strArray1 = stream.toArray(String[]::new);
// 2. Collection
List<String> list1 = stream.collect(Collectors.toList());
List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new));
Set set1 = stream.collect(Collectors.toSet());
Stack stack1 = stream.collect(Collectors.toCollection(Stack::new));
// 3. String
String str = stream.collect(Collectors.joining()).toString();

一个 Stream 只可以使用一次,上面的代码为了简洁而重复使用了数次。

3.2 流的操作

接下来,当把一个数据结构包装成 Stream 后,就要开始对里面的元素进行各类操作了。常见的操作可以归类如下。

  • Intermediate:

    map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered

  • Terminal:

    forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator

  • Short-circuiting:

    anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limi

需要注意的是所有的操作方法可接受的参数都是lambda表达式(没有参数的操作方法除外)。

3.2.1 collect(toList())

该操作是由Stream生成一个列表,是一个及时操作。

3.2.2 map

map将一个流转化为一个新的流,是惰性操作。操作过程如下图所示:

下面是一个转换大小写的例子:

1
2
3
List<String> collected = Stream.of("a", "b", "hello")
.map(string -> string.toUpperCase()) //lambda
.collect(Collectors.toList());

注意:传给map的参数是一个lambda表达式,该lambda表达式必须是Function结构的实例,即lambda表达式只接受一个参数,并且返回一个值,接受参数类型和返回参数的类型可以不一样。Function接口如下图所示:

3.2.3 flatMap

Java 8中,除了一些基本的对象类型,如String,Object等,流也可以是集合或数组类型,如:

1
2
3
4
Stream<String[]>
Stream<Set<String>>
Stream<List<String>>
Stream<List<Object>>

然而,流的很多操作,如filter,sum,distinct等以及收集器collectors的处理都不支持这些流类型。因此需要使用flatmap对这些类型的流做转换:

1
2
3
4
Stream<String[]>	-> flatMap ->	Stream<String>
Stream<Set<String>> -> flatMap -> Stream<String>
Stream<List<String>> -> flatMap -> Stream<String>
Stream<List<Object>> -> flatMap -> Stream<Object>

flapMap将这样的input Stream连接成一个新的Stream,实现了底层数据的扁平化处理,其操作过程如下图所示:

1
2
3
4
5
6
7
8
Stream<List<Integer>> inputStream = Stream.of(
Arrays.asList(1),
Arrays.asList(2, 3),
Arrays.asList(4, 5, 6)
);
Stream<Integer> outputStream = inputStream.
flatMap((childList) -> childList.stream());
List<Integer> collected = outputStream.collect(Collectors.toList());

上面的代码使用Strem的工厂方法,将每一个列表转换为Stream对象,然后使用flatMap方法将多个Stream转换为一个新的Stream。注意flapMap方法的相关函数接口和map方法一样,都是Function接口类型,只是方法的返回值限制为Stream类型罢了。

3.2.4 filter

filter 对原始 Stream 进行某项测试,通过测试的元素被留下来生成一个新 Stream。filter接受的参数是一个Predicate类型的lambda表达式。

下面是用filter保留偶数的操作:

1
2
3
4
Integer[] sixNums = {1, 2, 3, 4, 5, 6};
Integer[] evens = Stream.of(sixNums)
.filter(n -> n%2 == 0)
.toArray(Integer[]::new);

3.2.5 findFirst

这是一个 termimal 兼 short-circuiting 操作,它总是返回 Stream 的第一个元素,或者空。需要注意的是findFirst返回值的类型是Optional,Optional表示一个值的容器,它可能含有某值,可能不包含,使用它的目的是尽可能避免NullPointException。

Stream 中的 findAny、max/min、reduce 等方法等返回 Optional 值。还有例如 IntStream.average() 返回 OptionalDouble 等等。

3.3 使用收集器

收集器是将流生成对应集合元素类型的方法,常用的收集器可从java.util.stream.Collectors中导入。例如Collectors.toList()就是一种从流生成对应列表的收集器,通过将收集器传递给collect方法,所有的流就可以使用它了。下面是使用收集器将流转化为其它类型集合的例子:

  • 使用toArray方法将String的流转化为对应的数组
1
2
3
String[] strArray = Stream.of("a", "b", "c")
.filter(x -> x != String.valueOf("b"))
.toArray(String[]::new);

注意:toArray接收的函数接口类型或lambda表达式类型是IntFunction,IntFunction如下图所示:

因此,上面的代码和下面的代码是等价的:

1
2
3
String[] strArray = Stream.of("a", "b", "c")
.filter(x -> x != String.valueOf("b"))
.toArray(x -> new String[x]);
  • 使用toCollection,用定制的集合收集元素,下面是将Stream收集成TreeSet集合的例子:
1
2
3
TreeSet<String> strSet = Stream.of("a", "b", "c", "c")
.filter(x -> x != String.valueOf("b"))
.collect(Collectors.toCollection(TreeSet::new));

这里,toCollection接收的函授接口类型是Supplier,Supplier表示lambda表达式不接收参数,但返回一个值,由于lambda表达式返回的类型可以根据上下文自动识别,所以这里当然返回的是一个TreeSet的对象了。

参考:

Java 8 中的 Streams API 详解

Java8初体验(二)Stream语法详解

Java 8函数式编程