目录

chen 的个人博客

VX:ZzzChChen
Phone:13403656751
Email:zxydczzs@gmail.com

X

ParallelStream 并行流

一、什么是流

Stream 是 Java8 中新增加的一个特性,统称为流,它不是数据结构也不存放任何数据,其主要用于集合的逻辑处理。

二、Stream 和 Iterator 的区别

Iterator 作为迭代器,其按照一定的顺序迭代遍历集合中的每一个元素,并且对每个元素进行指定的操作。而 Stream 在此基础上还可以将这种操作并行化,利用多核处理器的优势快速处理集合(集合的数据会分成多个段,有多个线程处理)。

三、Stream 的使用

Stream 完全依赖于接口化编程方式,以下举例了解下 Stream 的使用。

1        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
2        numbers.stream().forEach(System.out::print);
3	// 输出12345678

由以上的例子可以看出,Stream 的遍历方式和结果与 Iterator 没什么差别,这是因为 Stream 的默认遍历和迭代器是相同的,保证以往使用迭代器的地方可以方便的改写为 Stream。

Stream 的强大之处在于其通过简单的链式编程,使得它可以方便的遍历处理后的数据进行再处理。我们以对集合中的数字加 1,并转换成字符串为例进行演示;

1        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
2        System.out.println(numbers.stream().map(num -> Integer.toString(++num)).collect(Collectors.toList()));
3	// 输出 [2, 3, 4, 5, 6, 7, 8, 9]

其中 map()方法遍历处理每一个元素,并且返回一个新的 Stream,随后 collect 方法将操作后的 Stream 解析为 List。

Stream 还提供了非常多的操作,如 filter()过滤、skip()偏移等等。

四、并行流 ParallelStream

ParallelStream 提供了流的并行处理,它是 Stream 的另一重要特性,其底层使用 Fork/Join 框架实现。简单理解就是多线程异步任务的一种实现。

以下实例演示一下 ParallelStream 的使用

1        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
2        numbers.parallelStream().forEach(System.out::print);
3	// 输出 65718423

实例输出发现,使用 ParallelStream 后,结果并不按照集合原有顺序输出,我们打印出线程信息来证明该操作是并行的。

 1        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
 2        numbers.parallelStream().forEach(num -> {
 3            System.out.println(Thread.currentThread().getName() + ":" + num);
 4        });
 5        /* 输出
 6            main:6
 7            ForkJoinPool.commonPool-worker-2:8
 8            ForkJoinPool.commonPool-worker-4:5
 9            ForkJoinPool.commonPool-worker-6:1
10            ForkJoinPool.commonPool-worker-3:7
11            ForkJoinPool.commonPool-worker-1:3
12            ForkJoinPool.commonPool-worker-2:4
13            ForkJoinPool.commonPool-worker-4:2
14         */

通过以上实例可以确信 ParallelStream 是利用多线程进行的,这可以很大程度简化我们使用并发操作。

五、并行流的陷阱

5.1、线程安全

由于并行流使用多线程,则一切线程安全问题都应该是需要考虑的问题,如:资源竞争、死锁、事务、可见性等等。

5.2、线程消费

在虚拟机启动时,我们制定了 worker 线程的数量,整个程序的生命周期都将使用这些工作线程;这必然存在任务生产和消费的问题,如果某个生产者生产了许多重量级的任务(耗时很长),那么其他任务毫无疑问将会没有工作线程可用;更可怕的事情是这些工作线程正在进行 IO 阻塞;

本应该利用并行加速处理的业务,因为工作者不够反而会额外增加处理时间,使得系统性能在某一时刻大打折扣。而且这一类问题往往是很难排查的。我们并不知道一个重量级项目中的哪个框架、哪一个模块在使用并行流。

接下来我们对这个问题进行演示

1    public static void main(String[] args) throws InterruptedException {
2        Thread thread1 = new Thread(ParallelStream::streamTest1);
3        Thread thread2 = new Thread(ParallelStream::streamTest2);
4        thread1.start();
5        thread2.start();
6        thread1.join();
7        thread2.join();
8    }
 1    public static void streamTest1() {
 2        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
 3        numbers.parallelStream().forEach(num -> {
 4            System.out.println("streamTest1请求并行:" + Thread.currentThread().getName() + "---" + num);
 5            try {
 6                Thread.sleep(5000L);
 7            } catch (InterruptedException e) {
 8                throw new RuntimeException(e);
 9            }
10        });
11    }
12    public static void streamTest2() {
13        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
14        numbers.parallelStream().forEach(num -> {
15            System.out.println("streamTest2请求并行:" + Thread.currentThread().getName() + "---" + num);
16            try {
17                Thread.sleep(5000L);
18            } catch (InterruptedException e) {
19                throw new RuntimeException(e);
20            }
21        });
22    }

执行结果

 1streamTest1请求并行:ForkJoinPool.commonPool-worker-3---2
 2streamTest1请求并行:ForkJoinPool.commonPool-worker-7---4
 3streamTest1请求并行:ForkJoinPool.commonPool-worker-1---3
 4streamTest1请求并行:ForkJoinPool.commonPool-worker-5---5
 5streamTest1请求并行:ForkJoinPool.commonPool-worker-6---1
 6streamTest1请求并行:Thread-0---6
 7streamTest1请求并行:ForkJoinPool.commonPool-worker-2---8
 8streamTest2请求并行:Thread-1---6
 9streamTest1请求并行:ForkJoinPool.commonPool-worker-4---7
10streamTest1请求并行:ForkJoinPool.commonPool-worker-1---9
11streamTest2请求并行:ForkJoinPool.commonPool-worker-3---3
12streamTest2请求并行:ForkJoinPool.commonPool-worker-2---2
13streamTest2请求并行:Thread-1---5
14streamTest2请求并行:ForkJoinPool.commonPool-worker-4---1
15streamTest2请求并行:ForkJoinPool.commonPool-worker-6---4
16streamTest2请求并行:ForkJoinPool.commonPool-worker-7---8
17streamTest2请求并行:ForkJoinPool.commonPool-worker-5---7
18streamTest2请求并行:ForkJoinPool.commonPool-worker-2---9
19

通过示例我们会发现,第一个并行流率先获得 worker 线程的使用权,第二个并行流变为串行;直到第一个并行流处理完毕,第二个并行流才可以拿到 worker 线程开始进行并行处理;

六、Stream Or ParallelStream

在上面我们也讲述到了 ParallelStream 所带来的隐患和好处,那么再从 Stream 和 ParallelStream 方法中进行选择时,我们可以考虑一下几个问题:

  1. 是否需要并行?
  2. 任务之间是否是独立的?是否会引起任务竞态条件?
  3. 结果是否取决于任务的调用顺序?

对于问题 1,在回答这个问题之前,你需要弄清楚你要解决的问题是什么,数据量有多大,计算的特点是什么?并不是所有的问题都适合使用并发程序来求解,比如当数据量不大时,顺序执行往往比并行执行更快。毕竟准备线程池和其他相关资源也是需要时间的。但是,当任务涉及到 I/O 操作并且任务之间不互相依赖时,那么并行化就是一个不错的选择。通常而言,将这类程序并行化之后,执行速度会提升好几个等级。

对于问题 2,如果任务之间是独立的,并且代码中不涉及到同一个对象的某个状态或者某个变量的更新操作,那么就表明代码是可以被并行化的。

对于问题 3,由于在并行环境中任务的执行顺序是不确定的,因此对于依赖顺序的任务而言,并行化也许不能给出正确的结果。

七、小结

串行流:适合存在线程安全问题、阻塞任务、重量级任务、顺序依赖性强以及需要使用同一事物等等的逻辑;

并行流:适合没有线程安全问题、任务之间不互相依赖、较单纯的数据处理任务等;


标题:ParallelStream 并行流
作者:zzzzchen
地址:https://dczzs.com/articles/2022/11/25/1669390522190.html