问题

par­al­lel­Stream是一个并行流操作,使用不当容易产生线程不安全的问题,如何避免这个问题,Java 官方给出两个解决办法:

  1. 使用收集器 col­lect( )方法
  2. 使用re­duce()方法

我们需要注意的是这两个方式保证的只是收集起来的结果的线程安全,如果你像下面这样使用:

List<Integer> list = Stream.iterate(1, n -> n + 2)
                .limit(1000000)
                .collect(Collectors.toList());

        List<Integer> midList = Lists.newArrayList();
        list.parallelStream()
                .map(num -> {
                    midList.add(num);
                    return num;
                })
                .collect(Collectors.toList());

当基数特别大的时候很容易得到一个数组越界的异常

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at ParallelStreamMain.main(ParallelStreamMain.java:26)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1851
    at java.util.ArrayList.add(ArrayList.java:459)
    at ParallelStreamMain.lambda$main$1(ParallelStreamMain.java:23)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
    at java.util.stream.AbstractTask.compute(AbstractTask.java:316)

上述事例中,在一个流中进行了额外的业务处理,即对 midList 这个集合进行add操作。

Ar­rayList 是一种线程不安全的集合,add 是线程不安全的操作,因为并行流只能保证 re­turn 出去的结果收集是线程安全的,那么当前额外操作如果是非线程安全的就很容易出现异常!

如何避免

约定我们的行为,在一个流里只做一件事,特别是在并行流里。

流只能遍历一次,那我们想复用流要怎么办,我们可以采用封装或者 Sup­plier<T> 函数式来解决这个问题:

1.封装

private Stream<String> getStream() {
           return Stream.of("d2", "a2", "b1", "b3", "c")
                   .filter(s -> s.startsWith("a"));
       }

2.Supplier

Supplier<Stream<String>> streamSupplier =
                   () -> Stream.of("d2", "a2", "b1", "b3", "c")
                           .filter(s -> s.startsWith("a"));

           streamSupplier.get().anyMatch(s -> true);
           streamSupplier.get().noneMatch(s -> true);

注意:但是这仅仅只是代码的复用,没有性能的提升。

相关问题

https://stackoverflow.com/questions/22350288/parallel-streams-collectors-and-thread-safety
https://stackoverflow.com/questions/40238099/is-it-safe-to-use-parallelstream-to-populate-a-map-in-java-8?noredirect=1&lq=1