什么是流(Stream )
流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。你可以把它们看成遍历数据集的高级迭代器。它是单向的,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。
Stream 还可以进行并行操作。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。
流到底是什么呢?简短的定义就是“从支持数据处理操作的源生成的元素序列”。
- 元素序列——就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如ArrayList 与 LinkedList)。但集合讲的是数据,流讲的是计算。
- 源——流会使用一个提供数据的源,如集合、数组或输入/输出资源。 请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。这是一个很重要的特性!
- 数据处理操作——流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如filter、map、reduce、find、match、sort等。流操作可以顺序执行,也可并行执行。
- 流水线——很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线。那么我们就可以写出一个链式的代码,能够帮助我们实现很多的代码优化。流水线的操作可以看作对数据源进行数据库式查询。
- 内部迭代——与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。
流与集合
集合与流之间的差异就在于什么时候进行计算。集合是一个内存中的数据结构, 它包含数据结构中目前所有的值——集合中的每个元素都得先算出来才能添加到集合中。(你可以往集合里加东西或者删东西,但是不管什么时候,集合中的每个元素都是放在内存里的,元素都得先算出来才能成为集合的一部分。)
相比之下,流则是在概念上固定的数据结构(你不能添加或删除元素),其元素则是按需计算的。 这个思想就是用户仅仅从流中提取需要的值,而这些值——在用户看不见的地方——只会按需生成。这是一种生产者-消费者的关系。从另一个角度来说,流就像是一个延迟创建的集合:只有在消费者要求的时候才会计算值。所以流是惰性的,必须要有终端操作才能驱动整个流链路的执行!这也使得流可以从无限流到有限集合。
流的构成
一个数据源(如集合)来执行一个查询;
一个中间操作链,形成一条流的流水线;
一个终端操作,执行流水线,并能生成结果。
获取一个数据源(source)→ 数据转换(中间操作链)→执行操作获取想要的结果(终端操作),每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道,,如下图所示。
图 1. 流管道 (Stream Pipeline) 的构成
流创建
Collections
,List
,Set
诸如此类的集合可以使用stream()
方法直接创建一个流。由值创建流。使用静态方法
Stream.of()
,通过显式值创建一个流方法创建一个stream对象。它可以接受任意数量的参数。例如:Stream<String> stream = Stream.of("Java 8 ", "Lambdas ", "In ", "Action"); stream.map(String::toUpperCase).forEach(System.out::println);
- 由数组创建流。使用静态方法
Arrays.stream()
从数组创建一个流。它接受一个数组作为参数。例如:
int[] numbers = {2, 3, 5, 7, 11, 13};
int sum = Arrays.stream(numbers).sum();
4.由函数生成流:创建无限流。Stream API提供了两个静态方法来从函数生成流:Stream.iterate()
和Stream.generate()
。这两个操作可以创建所谓的无限流:不像从固定集合创建的流那样有固定大小的流。由iterate
和generate
产生的流会用给定的函数按需创建值,因此可以无穷无尽地计算下去!一般来说,
应该使用limit(n)
来对这种流加以限制,以避免打印无穷多个值。
(1).迭代iterate
Stream<T> iterate(final T seed, final UnaryOperator<T> f)
,接收一个初始值也叫种子值,还有一个依次应用在每个产生的新值上的UnaryOperator<T>类型函数式接口。此操作将生成一个无限流!
例如:
Stream.iterate(0, n -> n + 2)
.limit(10)
.forEach(System.out::println);
再例如:得到一个斐波那契数列
Stream.iterate(new int[] { 0, 1 }, t -> new int[] { t[1], t[0] + t[1] })
.limit(20)
.forEach(t -> System.out.println("(" + t[0] + "," + t[1] + ")"));
//iterate需要一个Lambda来确定后续的元素。对于元组(3, 5),其后续元素是(5, 3+5) = (5, 8)。下一个是(8, 5+8)。
//看到这个模式了吗?给定一个元组,其后续的元素是(t[1], t[0] + t[1])。这可以用这个Lambda来计算:t->new int[]{t[1], t[0]+t[1]}。
//运行这段代码,你就得到了序列(0, 1), (1, 1), (1, 2), (2, 3), (3, 5), (5, 8), (8, 13), (13, 21)…请注意,
//如果你只想打印正常的斐波纳契数列,可以使用map提取每个元组中的第一个元素
Stream.iterate(new int[] { 0, 1 }, t -> new int[] { t[1], t[0] + t[1] })
.limit(10)
.map(t -> t[0])
.forEach(System.out::println);
(2).生成generate
与iterate方法类似,generate方法也可让你按需生成一个无限流。但generate不是依次对每个新生成的值应用函数的。它接受一个Supplier<T>
类型的Lambda提供新的值。例如:
Stream.generate(Math::random)
.limit(5)
.forEach(System.out::println);
我们使用的供应源(指向Math.random的方法引用)是无状态的:它不会在任何地方记录任何值,以备以后计算使用。但供应源不一定是无状态的。你可以创建存储状态的供应源,它可以修改状态,并在为流生成下一个值时使用。例如使用generate
生成斐波那契数列:
IntSupplier fib = new IntSupplier() {
private int previous = 0;
private int current = 1;
public int getAsInt() {
int oldPrevious = this.previous;
int nextValue = this.previous + this.current;
this.previous = this.current;
this.current = nextValue;
return oldPrevious;
}
};
IntStream.generate(fib)
.limit(10)
.forEach(System.out::println);
这里使用的匿名类和Lambda的区别在于,匿名类可以通过字段定义状态,而状态又可以用getAsInt
方法来修改。这是一个副作用的例子。你迄今见过的所有Lambda都是没有副作用的; 它们没有改变任何状态。此对象有可变的状态:它在两个实例变量中记录了前一个斐波纳契项和当前的斐波纳契项。getAsInt
在调用时会改变对象的状态,由此在每次调用时产生新的值。相比之下,使用iterate的方法则是纯粹不变的:它没有修改现有状态, 但在每次迭代时会创建新的元组。为了方便并行处理流,并保持结果正确。我们应该应该始终采用不变的方法!
5.由文件生成流
Java中用于处理文件等I/O操作的NIO API(非阻塞 I/O)已更新,以便利用Stream API。java.nio.file.Files
中的很多静态方法都会返回一个流。例如,一个很有用的方法是Files.lines
。目前还基本用不到,故此不做详述。
流操作
中间操作(intermediate operations):
一个流可以后面跟随零个或多个中间操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),仅仅调用到这类方法,并没有真正开始流的遍历。
终端操作(terminal operation):
一个流只能有一个终端操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。终端操作的执行,才会真正开始流的遍历,并且会生成一个结果是非Stream的值,或者一个 side effect。
操作 | 类型 | 方法 |
---|---|---|
中间操作(Intermediate operations) | 无状态(Stateless) | unordered、filter、 map、 mapToInt、mapToLong、mapToDouble、flatMap、 flatMapToInt、flatMapToLong、flatMapToDouble、peek |
有状态(Stateful) | distinct、sorted、sorted、limit、 skip | |
终端操作(Terminal operations) | 非短路操作 | forEach、forEachOrdered、toArray、reduce、collect、max、min、count |
短路操作(short-circuiting) | anyMatch、 allMatch、noneMatch、findFirst、findAny |
//没有终端操作 没有输出
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("No Terminal . filter: " + s);
return true;
});
//有终端操作,,,,,,流的执行顺序
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));
输出:
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c
//有状态和无状态
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
输出:
sort: a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2
API:中间操作
filter(Predicate<T>):
过滤操作只会保留那些与传递进去的过滤器函数计算结果为 true 元素。
例如:
Stream.of(0,1,2,3,4,5,6)
.filter(i->i>4)
.forEach(System.out::println);//输出5,6
distinct():
用于去除流中的重复元素。相比创建一个 Set 集合,该方法的工作量要少得多
例如:
Stream.of(0,1,2,3,4,5,6,5,6)
.filter(i->i>4)
.distinct()
.forEach(System.out::println);//输出5,6
但是distinct()
有一个弊端,去重不能做到指定某个key作为去重条件,进阶去重:
List<User> users=Lists.newArrayList();
users.add(new User("Jim","man",12));
users.add(new User("Jim","man",14));
users.add(new User("Tom","man",15));
users.add(new User("Tom","man",15));
users.add(new User("Jane","woman",13));
users.stream()
.distinct()
.peek(user -> System.out.println(user.getName()+" "+user.getGender()+" "+user.getAge()))
.collect(Collectors.toList());
List<User> distinctUsers = users.stream()
.filter(distinctByKey(User::getName))
.peek(user -> System.out.println(user.getName()+" "+user.getGender()+" "+user.getAge()))
.collect(Collectors.toList());
public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
Map<Object, Boolean> seen = new ConcurrentHashMap<>();
return object -> seen.putIfAbsent(keyExtractor.apply(object), Boolean.TRUE) == null;
}
输出:
上面的
Jim man 12
Jim man 14
Tom man 15
Jane woman 13
下面的
Jim man 12
Tom man 15
Jane woman 13
我们可以注意到使用distinct
的话必须对象完全相等才能被去掉所以Tom
被去掉了,但是下面的方式根据name
去掉了重复的Jim
和Tom
。
limit(long maxSize):
对一个Stream进行截断操作,获取其前maxSize个元素,如果原Stream中包含的元素个数小于maxSize,那就获取其所有的元素。limit也可以用在无序流上,比如源是一个Set。这种情况下,limit的结果不会以任何顺序排列。l例子:
Stream.iterate(0, n -> n + 1)
.limit(10)
.forEach(System.out::println);
skip(long n):
返回一个丢弃原Stream的前n个元素后剩下元素组成的新Stream,如果原Stream中包含的元素个数小于n,那么返回空Stream。limit(n)和skip(n)是互补的。
例如:
Stream.of(0, 1, 2, 3, 4, 5, 6, 5, 6)
.skip(3)
.forEach(System.out::println);//输出3 4 5 6 5 6
map(Function<T, U>):
将 Function 操作应用在输入流的每一个元素中,并将返回值传递到输出流中。即把一个元素按照规则映射成另一个元素。
例如:
users.stream()
.map(User::getName)
.collect(Collectors.toList());
mapToInt(ToIntFunction)
:操作同上,但结果是 IntStream。mapToLong(ToLongFunction)
:操作同上,但结果是 LongStream。mapToDouble(ToDoubleFunction)
:操作同上,但结果是 DoubleStream。
flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)
flatmap方法让你把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流。它可以做到打扁一个流的目的,例如List<List<E>>
这种嵌套的集合,他可以轻松打扁将内层List全部展平,扁平化成一个流。
List<User> users = Lists.newArrayList();
users.add(new User("Jim", "man", 12));
users.add(new User("Jim", "man", 14));
users.add(new User("Tom", "man", 15));
users.add(new User("Tom", "man", 15));
users.add(new User("Jane", "woman", 13));
users.stream()
.map(User::getName)
.map(name->Splitter.on("m").splitToList(name))//guava包的Splitter
.flatMap(Stream::of)
.collect(Collectors.toList());
flatMapToInt(Function)
:当Function
产生IntStream
时使用。flatMapToLong(Function)
:当Function
产生LongStream
时使用。flatMapToDouble(Function)
:当Function
产生DoubleStream
时使用。
sorted()/sorted(Comparator<? super T> comparator)
比较器,无参比较器默认使用自然序排序【从小到大】和传入一个 Comparator 参数的比较器,如Comparator.reverseOrder()【反序】。例如
Stream.of(199, 22, 73, 15, 67, 77, 28)
.sorted(Comparator.reverseOrder())
.forEach(System.out::println);
Stream.of(199, 22, 73, 15, 67, 77, 28)
.sorted()
.forEach(System.out::println);
更复杂的用法:
List<User> users = Lists.newArrayList();
users.add(new User("Jim", "man", 12));
users.add(new User("Jim", "man", 14));
users.add(new User("Tom", "man", 15));
users.add(new User("Tom", "man", 23));
users.add(new User("Jane", "woman", 13));
users.add(new User("Jane", "woman", null));
List<User> newList = users.stream()
.sorted(Comparator.comparing(User::getAge, Comparator.nullsFirst(Comparator.reverseOrder())))
.peek(user -> System.out.println(user.getAge()))
.collect(Collectors.toList());//null 23 15 14 13 12
peek(Consumer<? super T> action)
目的是帮助调试。它允许你无修改地查看流中的元素。
注:如果使用了peek,sonar会建议不要使用。例如上面的例子
forEach(Consumer<? super T> action)与forEachOrdered(Consumer<? super T> action)
第一种形式是显式地设计为以任何顺序操作元素,这只在引入 parallel()
(并行) 操作时才有意义。第二种形式保证了 forEach 的操作顺序是原始流顺序。当然在串行流中这二者并无差别。
//foreach
IntStream rands = Arrays.stream(new Random(47).ints(0, 1000)
.limit(100)
.toArray());
int sz = 14;
rands().limit(sz)
.forEach(n -> System.out.format("%d ", n));
//foreach
System.out.println();
rands().limit(sz)
.parallel()
.forEach(n -> System.out.format("%d ", n));
//forEachOrdered
System.out.println();
rands().limit(sz)
.parallel()
.forEachOrdered(n -> System.out.format("%d ", n));
System.out.println();
输出:
258 555 693 861 961 429 868 200 522 207 288 128 551 589
551 589 555 868 258 961 288 207 429 200 128 522 861 693
258 555 693 861 961 429 868 200 522 207 288 128 551 589
当然了,foreach也可以用于map的遍历!例如
Map<String,String> map= Maps.newHashMap();
map.put("xiaoming","12");
map.put("xiaohua","13");
map.forEach((key,value)->{
System.out.println(key+" "+value);
});
这时候的foreach不再是上述的形式了。而是forEach(BiConsumer<? super K, ? super V> action)
,BiConsumer
是接收两个参数的消费型函数式。
查找:findFirst()与findAny()
findFirst()
:返回一个包含第一个元素的 Optional 对象,如果流为空则返回 Optional.empty
findAny()
:返回包含任意元素的 Optional 对象,如果流为空则返回 Optional.empty。
findFirst()
无论流是否为并行化的,总是会选择流中的第一个元素。对于非并行流,findAny()
会选择流中的第一个元素(即使从定义上来看是选择任意元素)。所以一般来说在并行流里会使用findAny()
更多。
Stream.of(199, 22, 73, 15, 67, 77, 28)
.sorted()
.findFirst()
.ifPresent(System.out::println);//15
System.out.println();
//findAny
Stream.of(199, 22, 73, 15, 67, 77, 28)
.sorted()
.parallel()
.findAny()
.ifPresent(System.out::println);//73
比较大小:min()与max():
min(Comparator<? super T> comparator
:接收一个 Comparator 参数的比较器,返回最小值的 Optional 对象,如果流为空则返回 Optional.empty
max(Comparator<? super T> comparator)
:接收一个 Comparator 参数的比较器,返回最大值的 Optional 对象,如果流为空则返回 Optional.empty
Stream.of(199, 22, 73, 15, 67, 77, 28)
.max(Integer::compareTo)
.ifPresent(System.out::println);//199
Stream.of(199, 22, 73, 15, 67, 77, 28)
.min(Integer::compareTo)
.ifPresent(System.out::println);//15
匹配:anyMatch、allMatch、noneMatch
anyMatch(Predicate<? super T> predicate)
:如果流中的一个元素根据提供的 Predicate 返回 true 时,结果返回为 true
allMatch(Predicate<? super T> predicate)
:如果流的每个元素根据提供的 Predicate 都返回 true 时,结果返回为 true
noneMatch(Predicate<? super T> predicate)
:如果流的每个元素根据提供的 Predicate 都返回 false 时,结果返回为 true
List<String> strs = Arrays.asList("a", "a", "a", "a", "b");
boolean aa = strs.stream()
.anyMatch(str -> str.equals("a"));
boolean bb = strs.stream()
.allMatch(str -> str.equals("a"));
boolean cc = strs.stream()
.noneMatch(str -> str.equals("a"));
System.out.println(aa);// TRUE
System.out.println(bb);// FALSE
System.out.println(cc);// FALSE
归约:reduce()
Optional<T> reduce(BinaryOperator<T> accumulator)
:使用 BinaryOperator 来组合所有流中的元素。其返回值为 Optional类型。
T reduce(T identity, BinaryOperator<T> accumulator)
:功能同上,但是使用 identity 作为其组合的初始值。因此如果流为空,identity 就是结果。
<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner)
:为并行流而生的。
Stream.of(199, 22, 73, 15, 67, 77, 28)
.reduce(Integer::sum)
.ifPresent(System.out::println);
String sss = Stream.of(19, 21, 73, 15, 67, 77, 28)
.limit(2)
.reduce(10, Integer::sum)
.toString();
System.out.println(sss);
一般来说前两个就能满足我们的需求了。
reduce的第二个第三个重载方法都可以用于并行,为什么第二个可以替代第三个方法,为什么还要有第三个方法呢?
T reduce(T identity, BinaryOperator<T> accumulator);
<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner);
仔细看这两个接口的定义,发现了什么?第一个入参和出参必须都是T类型,但是第二个就不一样了,他的入参是T类型,但是它的出参可以是U!另外需要注意的是,在非并行的情况下,使用三个参数的重载方法,第三个参数BinaryOperator是不会执行的!
例如:
List<User> newUserList = Lists.newArrayList();
newUserList.add(new User("Jim", "man", 12));
newUserList.add(new User("Jim", "man", 14));
newUserList.add(new User("Tom", "man", 15));
List<User> otherUserList = Lists.newArrayList();
otherUserList.add(new User("Tom", "man", 23));
otherUserList.add(new User("Jane", "woman", 13));
otherUserList.add(new User("Jane", "woman", null));
List<List<User>> containList = Lists.newArrayList(otherUserList, newUserList);
containList.stream()
.reduce(Lists.newArrayList(), (left, right) -> {
left.addAll(right);
return left;
});
containList.parallelStream()
.reduce(Maps.newHashMap(), (left, right) -> {
left.put(right, right);
return left;
}, (left, right) -> {
left.put(right, right);
return left;
});
这是一个比较刻意为之的例子,但是很好区分了两个的重载方法。
我们可以从reduce三个参数的的方法注释里看到:
Performs a reduction on the elements of this stream, using the provided identity, accumulation and combining functions. This is equivalent to:
U result = identity; for (T element : this stream) result = accumulator.apply(result, element) return result;
but is not constrained to execute sequentially.
The identity value must be an identity for the combiner function. This means that for all u, combiner(identity, u) is equal to u. Additionally, the combiner function must be compatible with the accumulator function; for all u and t, the following must hold:combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
reduce三个参数的方法的使用一定要特别注意满足几个条件!
第一个:初始值identity
要和combiner(identity, u)
累加器计算之后相等。例如
Integer identity = 10;
BinaryOperator<Integer> combiner = (x, y) -> x + y;
boolean identityRespected = combiner.apply(identity, 1) == 1;
System.out.println(identityRespected); // prints false
这就不满足条件,所以下面的例子在使用并行流计算求和就会出现不等的情况!
int result = Stream.of(1, 2, 3, 4, 5, 6)
.parallel()
.reduce(10, (a, b) -> a + b);
System.out.println(result); // 81 on my run
每一次运行都可能导致结果不同,这取决与你的电脑核心数。因为比如是六核心的话就是:(10+1)+(10+2)+。。。+(10+6)=81,但实际上他们求和应该是31。很明显这种并行求和就是错误的用法!如果初始值identity换成0那么,条件就能成立,所以并行的结果也就是正确的!
第二个:累加器和组合器能够相互替换, combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
List<String> left = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee");
List<String> right = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee", "");
System.out.println(howMany(left)); // 38 on my run
System.out.println(howMany(right)); // 50 on my run
/**
* count letters, adding a bit more all the time
*/
private static int howMany(List<String> tokens) {
return tokens.stream()
.parallel()
.reduce(0, // identity
(i, s) -> { // accumulator
return s.length() + i;
}, (left, right) -> { // combiner
return left + right + left; // notice the extra left here
});
}
这就是不满足条件,所以上面的例子在使用并行流计算就会出现不等的情况!我们将上面的操作拆解
Integer identity = 0;
String t = "aa";
Integer u = 3; // "bbb"
BiFunction<Integer, String, Integer> accumulator = (Integer i, String s) -> i + s.length();
BinaryOperator<Integer> combiner = (left, right) -> left + right + left;
int first = accumulator.apply(identity, t); // 2
int second = combiner.apply(u, first); // 3 + 2 + 3 = 8
Integer shouldBe8 = accumulator.apply(u, t);
System.out.println(shouldBe8 == second); // false
我们可以发现两个是无法互相替换的,所以这种用法也必然在并行流里会导致错误的结果!
在之前的博文我们也探讨了关于reduce的一些踩坑记录参见:并行流下的reduce归约List问题
收集器:collect()
<R> R collect(Supplier<R> supplier,BiConsumer<R, ? super T> accumulator,BiConsumer<R, R> combiner);
:同上,但是 Supplier 创建了一个新的结果容器,第一个 BiConsumer 是将下一个元素包含在结果容器,而第二个 BiConsumer 是用于将两个容器聚合起来,作用于并行的时候,如果非并行情况,第三个参数不会被执行。有点类似reduce。最后的结果是折叠进左边的容器,例如:
Map<Integer,Integer> integerMap = Lists.newArrayList(1, 2, 3, 4, 5)
.stream()
.collect(Maps::newHashMap, (map, listValue) -> {
System.out.println("执行 accumulator");
map.put(listValue,listValue);
}, (mapLeft, mapRight) -> {
System.out.println("执行 combiner");
mapLeft.putAll(mapRight);
});
//执行 accumulator执行 accumulator执行 accumulator执行 accumulator 执行 accumulator
<R, A> R collect(Collector<? super T, A, R> collector)
:Java8针对上面的用法进行高度抽象,更加灵活易用,官方也封装了一系列的常用的操作,放在Collectors
类里。使用 Collector
来累计流元素到结果集合中。
一般来说使用官方提供的Collectors
就能满足我们的需求,你也可以自定义一个Collector
,这里我们也主要关注Collector
的用法。
public interface Collector<T, A, R> {
Supplier<A> supplier(); //用来创建并且返回一个可变结果容器
BiConsumer<A, T> accumulator(); //将一个值叠进一个可变结果容器
//接受两个部分结果并将它们合并。可能是把一个参数叠进另一个参数并且返回另一个参数,
//也有可能返回一个新的结果容器,并行处理时会用到
BinaryOperator<A> combiner();
//将中间类型执行最终的转换,转换成最终结果类型,如果属性 IDENTITY_TRANSFORM 被设
//置,该方法会假定中间结果类型可以强制转成最终结果类型
Function<A, R> finisher();
//收集器的属性集合定义了归约器的一些其他行为,比如流是否可以并行归约,是否可以直接把累加器类型作为
//最终返回值返回等
Set<Characteristics> characteristics();
}
T:表示流中每个元素的类型。
A:表示中间结果容器的类型。
R:表示最终返回的结果类型。
Collector
中定义了一个枚举类Characteristics
,有三个枚举值。
Characteristics.CONCURRENT
:表示中间结果只有一个,即使在并行流的情况下。所以只有在并行流且收集器不具备CONCURRENT特性时,combiner
方法返回的lambda表达式才会执行(中间结果容器只有一个就无需合并)。Characteristics.UNORDER
:表示流中的元素无序。Characteristics.IDENTITY_FINISH
:表示中间结果容器类型与最终结果类型一致,此时finiser
方法不会被调用。
参考toList
源码:
public static <T>
Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}
static final Set<Collector.Characteristics> CH_ID
= Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
CollectorImpl(Supplier<A> supplier,
BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner,
Set<Characteristics> characteristics) {
this(supplier, accumulator, combiner, castingIdentity(), characteristics);
}
@SuppressWarnings("unchecked")
private static <I, R> Function<I, R> castingIdentity() {
return i -> (R) i;
}
我们观察Collectors
类可以发现这个类里很多操作都是实现了内部类CollectorImpl
完成的。ArrayList::new
创建了一个中间容器,List::add
将流中的每个数据都折叠进中间容器,(left, right) -> { left.addAll(right); return left; }
将并行执行生成的中间容器合并,castingIdentity()
因为我们的中间容器和最终的结果容器类型一致,所以这里直接做了类型强制转换,将ArrayList
转成List
,CH_ID
告诉我们中间结果容器类型与最终结果类型一致,所以其实finiser
方法不会被调用。与此同理可以查看源码的其他操作。
Collectors.toList()/Collectors.toSet()/Collectors.toCollection(Supplier<C> collectionFactory)
遍历流得到一个集合结果,toList
、toSet
应该都很好理解,如果说你想返回一个不是List,set的集合类型那么你可以用toCollection指定返回还说呢么类型的集合。例如:
numbers.stream()
.filter(i -> i > 0)
.collect(Collectors.toList());
LinkedList<Integer> integerMap1 = Lists.newArrayList(1, 2, 3, 4, 5)
.stream()
.collect(Collectors.toCollection(Lists::newLinkedList));
Collectors.toMap()
它有好几个重载方法:
public static <T, K, U>
Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper) {
return toMap(keyMapper, valueMapper, throwingMerger(), HashMap::new);
}
public static <T, K, U>
Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction) {
return toMap(keyMapper, valueMapper, mergeFunction, HashMap::new);
}
public static <T, K, U, M extends Map<K, U>>
Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction,
Supplier<M> mapSupplier) {
BiConsumer<M, T> accumulator
= (map, element) -> map.merge(keyMapper.apply(element),
valueMapper.apply(element), mergeFunction);
return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
}
第一个就是很普通的将流的数据转成Map,第二种看源码是起源于第三种,我们看第三种,分析一下他的参数,第一个参数就是key的生成,第二个参数就是value的生成,第三个就是针对于存在key,它的value要做什么处理,调用Map
新加的方法merge()
default V merge(K key, V value,
BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
Objects.requireNonNull(remappingFunction);
Objects.requireNonNull(value);
V oldValue = get(key);
V newValue = (oldValue == null) ? value :
remappingFunction.apply(oldValue, value);
if(newValue == null) {
remove(key);
} else {
put(key, newValue);
}
return newValue;
}
我们看merge()
方法会发现它限制了value
不能为null
,所以这里要特别注意使用toMap()
第二、三个重载方法需要保证value
不能为null
。当旧值为null则返回value,否则执行remappingFunction
,如果执行之后的新值newValue
为null则移除key否则就是put
操作
Collectors.joining()
它也有三个重载方法
joining()
joining(CharSequence delimiter)
joining(CharSequence delimiter,CharSequence prefix,CharSequence suffix)
第一个无参的拼接就是单纯将字符串拼接,第二个方法则可以指定拼接起来的分隔符,第三个可以指定分隔符之外还可以指定他的前缀和后缀。例如
String str1 = Lists.newArrayList("a", "b", "c", "d", "e")
.stream()
.collect(Collectors.joining());
System.out.println(str1);
String str2 = Lists.newArrayList("a", "b", "c", "d", "e")
.stream()
.collect(Collectors.joining(";"));
System.out.println(str2);
String str3 = Lists.newArrayList("a", "b", "c", "d", "e")
.stream()
.collect(Collectors.joining(";","pre "," suf"));
System.out.println(str3);
输出:
abcde
a;b;c;d;e
pre a;b;c;d;e suf
Collectors.counting()
计算流结果数量,例如
Long str1 = Lists.newArrayList("a", "b", "c", "d", "e")
.stream()
.collect(Collectors.counting());
System.out.println(str1);//5
Collectors.maxBy()/minBy()
maxBy(Comparator<? super T> comparator)
minBy(Comparator<? super T> comparator)
根据入参的比较条件取最大最小值,需要注意的是他返回的是一个Optional
包装的数据
Collectors.reducing()
reducing
方法是很多归约处理的一般化情况,上述的很多处理都可以用reducing
替代,但是可读性可能没有上面那些更加通俗易懂。
public static <T> Collector<T, ?, Optional<T>>
reducing(BinaryOperator<T> op)
public static <T> Collector<T, ?, T>
reducing(T identity, BinaryOperator<T> op)
public static <T, U>
Collector<T, ?, U> reducing(U identity,
Function<? super T, ? extends U> mapper,
BinaryOperator<U> op)
它有三个重载方法:
第一个重载方法,接受一个归约操作,例如你想得到年龄最大的用户,返回一个Optional
包装的结果
List<User> users = Arrays.asList(new User("Jones", "man", 12),
new User("Cat", "man", 15),
new User("Jerry", "man", 16),
new User("Jane", "women", 18),
new User("Black", "man", 19));
users.stream()
.collect(Collectors.reducing((left, right) -> {
if (left.getAge() > right.getAge()) {
return left;
}
return right;
})).orElse(null);
第二个重载方法,接受一个初始值和一个归约操作。我们从上面得到结果会是Optional类型,有时候我们会很烦这种再包装一层的,但是Java为了不产生空指针不得不这样做,那么我们可以自定义一个初始值,这样就担心会返回空指针了。例如:
users.stream()
.collect(Collectors.reducing(new User(),(left, right) -> {
if (left.getAge() > right.getAge()) {
return left;
}
return right;
}));
第三个重载方法,接收一个初始值,映射函数,归约操作。比如上面我只想取返回年龄最大值,但是上述的方法要求我必须返回User
类型的值。那么这个方法可以满足你的要求。例如
users.stream()
.collect(Collectors.reducing(0,User::getAge,(left, right) -> {
if (left.intValue()>right.intValue()) {
return left;
}
return right;
}));
那么到这里可能会发现reduce
和reducing
有什么区别,因为这两个方法通常都可以得到相同的结果。
如下的例子我们 可以使用reduce完成toListCollectors所做的工作。
Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5, 6)
.stream();
List<Integer> numbers = stream.reduce(new ArrayList<Integer>(), (List<Integer> l, Integer e) -> {
l.add(e);
return l;
}, (List<Integer> l1, List<Integer> l2) -> {
l1.addAll(l2);
return l1;
});
这个解决方案有两个问题:一个语义问题和一个实际问题。语义问题在于,reduce方法旨在把两个值结合起来生成一个新值,它是一个不可变的归约。与此相反,collect方法的设计就是要改变容器,从而累积要输出的结果。这意味着,上面的代码片段是在滥用reduce方法,因为它在原地改变了作为累加器的List。你在下一章中会更详细地看到,以错误的语义使用reduce方法还会造成一个实际问题:这个归约过程不能并行工作,因为由多个线程并发修改同一个数据结构可能会破坏List本身。在这种情况下,如果你想要线程安全,就需要每次分配一个新的List,而对象分配又会影响性能。这就是collect方法特别适合表达可变容器上的归约的原因,更关键的是它适合并行操作,本章后面会谈到这一点。
Collectors.mapping()
mapping(Function<? super T, ? extends U> mapper,
Collector<? super U, A, R> downstream)
通过在收集之前对每一个元素进行映射,将接收类型U的元素的收集器调整为一个接受类型T的元素。第二个参数则是接受一个下游收集器,也意味你可以继续嵌套其他操作。例如
List<User> users = Arrays.asList(new User("Jones", "man", 12), new User("Cat", "man", 15),
new User("Jerry", "man", 16), new User("Jane", "women", 18), new User("Black", "man", 19));
Map<String, List<String>> mappingName = users.stream()
.collect(Collectors.groupingBy(User::getGender, Collectors.mapping(User::getName,Collectors.toList())));
Collectors.groupingBy()
public static <T, K> Collector<T, ?, Map<K, List<T>>>
groupingBy(Function<? super T, ? extends K> classifier)
public static <T, K, A, D>
Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
Collector<? super T, A, D> downstream)
public static <T, K, D, A, M extends Map<K, D>>
Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream)
执行流操作很多时候都避免不了一些分组操作,Collectors
也为我们提供了这样的api。groupingBy()
是按某种条件分组,我们这里称Function为分组函数。分组也可以支持多级分组。
第一个重载方法,接收一个分组函数,返回一个map,key就是分组函数的返回值,value就是分组得到的流数据聚合成List。例如:
List<User> users = Arrays.asList(new User("Jones", "man", 12),
new User("Cat", "man", 15),
new User("Jerry", "man", 16),
new User("Jane", "women", 18),
new User("Black", "man", 19));
Map<String, List<User>> commonGroupingMap = users.stream()
.collect(Collectors.groupingBy(User::getGender));//按性别分组
第二个重载方法,我们根据第一种得到的结果我们可能不满意,比如我只想得到这些User
的name
集合,而你却把整个对象都给我了,那就是很多冗余数据,所以也就有了第二个重载方法,你可以对分组的下行结果再进行自定义处理。例如上面mapping举的例子:
Map<String, List<String>> mappingName = users.stream()
.collect(
Collectors.groupingBy(User::getGender, Collectors.mapping(User::getName, Collectors.toList())));
第三个重载方法,我们根据第二个重载方法得到的结果可能也不满足我们的需求,比如我想把这个map的数据塞到我已经得到的一个Map里,我不想在外面再调用putAll
放进去,那么第三个方法可以帮助你实现愿望,mapFactory
可以让你指定一个初始Map
,如果翻阅源码会发现第二个重载方法就是运用第三个方法实现的,只是初始值是一个空map。例如:
Map<String,List<String>> alreadyExistsMap=Maps.newHashMap();
alreadyExistsMap.put("man",Lists.newArrayList("JackChan"));
Map<String, List<String>> newMap = users.stream()
.collect(
Collectors.groupingBy(User::getGender,()-> alreadyExistsMap, Collectors.mapping(User::getName, Collectors.toList())));
System.out.println("**** "+newMap);
//**** {man=[JackChan, Jones, Cat, Jerry, Black], women=[Jane]}
我们可以发现它很智能地为我们合并了,如果key值相同的话。
要实现多级分组,我们可以使用一个由双参数版本的Collectors.groupingBy
工厂方法创建的收集器,它除了普通的分类函数之外,还可以接受collector类型的第二个参数。那么要进行二级分组的话,我们可以把一个内层groupingBy
传递给外层groupingBy
,并定义一个为流中项目分类的二级标准例如,按性别分组之后再按年龄分组。一般来说,把groupingBy看作“桶”比较容易明白。第一个groupingBy给每个键建立了一个桶。然后再用下游的收集器去收集每个桶中的元素,以此得到n级分组。
Map<String, Map<Integer, List<User>>> multiGroupingMap = users.stream()
.collect(Collectors.groupingBy(User::getGender, Collectors.groupingBy(User::getAge)));
Collectors.partitioningBy()
public static <T>
Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
return partitioningBy(predicate, toList());
}
public static <T, D, A>
Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
Collector<? super T, A, D> downstream)
partitioningBy
是以true
,false
作为分区的维度,分区是一个特殊的分组,true一组,false一组。我们称这个predicate
为分区函数。他有两个重载方法,第一个方法,直接分区之后收集成List,第二个方法接收一个下游收集器进行其他操作。例如:
Map<Boolean, List<User>> partitioningMap = users.stream()
.collect(Collectors.partitioningBy(user -> user.getAge()>15));
Map<Boolean, List<String>> partitioningMap = users.stream()
.collect(Collectors.partitioningBy(user -> user.getAge()>15, Collectors.mapping(User::getName,Collectors.toList())));
Collectors.collectingAndThen()
接收一个Collector
,再执行最终操作。字面意思就是收集器的结果转换为另一种类型。
方法接受两个参数——要转换的收集器以及转换函数,并返回另一个收集器。这个收集器相当于旧收集器的一个包装,collect操作的最后一步就是将返回值用转换函数做一个映射。
我们把上面几个api一起综合一下:
//collectingAndThen 以及多层嵌套的收集器
List<User> users = Arrays.asList(new User("Jones", "man", 12), new User("Cat", "man", 15),
new User("Jerry", "man", 16), new User("Jane", "women", 18), new User("Black", "man", 19));
String name = users.stream()
.collect(Collectors.collectingAndThen(Collectors.maxBy(Comparator.comparing(User::getAge)),
(Optional<User> user) -> user.isPresent() ? user.get()
.getName() : "none"));
Map<String, List<String>> collectUser = users.stream()
.collect(Collectors.groupingBy(User::getGender, Collectors.collectingAndThen(Collectors.toList(),
users1 -> users1.stream()
.map(User::getName)
.collect(Collectors.toList()))));
//多层嵌套
//根据性别分组,过滤出名字里含有 a 字母的名字
Map<String, List<String>> collectFilterUser = users.stream()
.collect(Collectors.groupingBy(User::getGender, Collectors.collectingAndThen(
Collectors.collectingAndThen(Collectors.toList(), user -> user.stream()
.map(User::getName)
.collect(Collectors.toList())), users1 -> users1.stream()
.filter(userName -> userName.contains("a"))
.collect(Collectors.toList()))));
System.out.println("Max salaried employee's name: " + name);
上面的例子有些是十分刻意为之,但是是为了体现收集器的强大之处,可以层层嵌套处理,但是我们十分建议把复杂的操作提取出方法,能够一目了然知道这个收集器是做什么操作!
当然api还不止这些,我只是列举了部分常用API作为讲解。
并行流
我们可以通过parallelStream()
或者parallel()
实现并行流。像forEachOrdered()、groupingByConcurrent() findAny()
这几个方法就是为并行流设计的。groupingByConcurrent
是groupingBy
并行流版本。
线程安全:
1.如果没有使用收集器(collector
)可能导致线程不安全,如foreach
并行流下会丢失部分数据引起异常!
2.并行流会导致拿不到本地线程变量
一些常见问题
- 出现这种报错
Variable used in lambda expression should be final or effectively final
实例变量在堆上分配的,而局部变量在栈上进行分配,lambda表达式运行在一个独立的线程中,我们知道栈内存是线程私有的,所以局部变量也属于线程私有,如果肆意的允许lambda表达式引用局部变量,可能会存在局部变量以及所属的线程被回收,而lambda表达式所在的线程却无从知晓,这个时候去访问就会出现错误,而final类型的局部变量在Lambda表达式(匿名类) 中其实是局部变量的一个拷贝。 - 类型推断
有时候可能会疑惑我们的调用代码会具体匹配哪个函数式接口,实际上编译器会根据参数、返回类型、异常类型(如果存在)等因素做正确的判定。我们写代码常常会遇到比如方法不是静态的,或者某个get,set方法飘红,这些大多数都是因为你的入参类型或者返回的类型没对上,导致类型推断的时候出现错误。