第六章 - 使用Fork/Join 框架

Java 7 引入了一个特殊的 executor 使用 Fork/Join 框架。Fork/Join 框架用来解决那些能使用分治算法解决的问题 Fork/Join 框架介绍 为了使用分治算法,你必须把一个问题分解成小问题。使用...

Java 7 引入了一个特殊的 executor 使用 Fork/Join 框架。Fork/Join 框架用来解决那些能使用分治算法解决的问题

Fork/Join 框架介绍

为了使用分治算法,你必须把一个问题分解成小问题。使用递归方法来重复相同的操作直到问题被细分为能直接被解决的足够小的问题。这些小问题可以用 executor 来解决,但是为了更有效地解决这些小问题,Java 7 引入了 Fork/Join 框架。

这个框架基于 ForkJoinPool 类,它是一个特殊的 executor,它包含有两个操作 fork() 和 join()  (以及它们的变种形式),以及一个名为工作窃取算法的内部算法。

Fork/Join 框架的基本特征

使用此框架时,你的主方法代码类似于:

if ( problem.size() > DEFAULT_SIZE) {
       divideTasks();
       executeTask();
       taskResults=joinTasksResult();
       return taskResults;
   } else {
       taskResults=solveBasicProblem();
       return taskResults;
}

 代码中最重要的部分是允许你有效地分解并执行子任务并从这些子任务中获得运行结果用来计算父任务的结果。这个功能由 ForkJoinTask 的以下两个方法提供:

  • fork() 方法:这个方法允许你发送一个子任务给 Fork/Join executor
  • join() 方法:该方法允许你等待一个子任务的完成并返回其运行结果

这两个方法还有其它不同的变形。Fork/Join 框架还有一个关键的特性:决定哪些任务被执行的工作窃取算法。当一个任务使用 join() 方法等待一个子任务完成时,执行此任务的线程会从任务池中取不同的任务运行。通过此方法,Fork/Join executor 的线程总是在执行任务,从而提高了应用的性能。

Java 8 提供了 Fork/Join 框架的一个新特性。现在每个 Java 应用程序都有一个默认的名为 ForkJoinPool 的通用池。你可以通过调用静态方法 ForkJoinPool.commonPool() 来获得。默认情况下这个默认的 Fork/Join executor 会被使用,executor 里线程数由运行计算机的可用处理器数来决定。你可以通过改变操作系统值 java.util.concurrent.ForkJoinPool.common.parallelism 来改变其默认行为。

一些 Java API 本身也使用 Fork/Join 框架来实现并行操作。例如用并发方式来对数组排序的 Arrays 类的 parallelSort() 方法,以及 Java 8 提供的 parallel streams。

Fork/Join 框架的局限性

Fork/Join 框架具有以下局限性:

  • 你不会再细分的基本问题不能太小也不能太大。根据 Java API 文档,它必须具有 100 到 10,000 个基本运算步骤
  • 你不能使用能阻塞的 I/O 操作,例如用户输入或从网络中获取数据。这些操作会造成CPU处理器空闲,从而降低并发处理速度
  • 你不能在任务里抛出checked exceptions。你必须在代码中获取这些 checked exceptions (例如把它们封装成 unchecked RuntimeException),Unchecked exceptions会被特殊处理。

Fork/Join 框架的组件

Fork/Join 框架有以下五个基础类:

  • ForkJoinPool类:该类实现了 Executor 和 ExecutorService 接口,它用来执行 Fork/Join 任务。Java 提供了一个默认的 ForkJoinPool 对象 (通用池),但你可以使用一些构造函数来创建你自己的对象。你必须制定并发级别 (最大的运行并发线程数)。默认情况下,它使用可用的处理器数作为并发级别。
  • ForkJoinTask类:该类是所有的 Fork/Join 任务的基础抽象类。它提供了 fork() 和 join() 方法以及一些其他的变种。同时它也实现了 Future 接口并提供了一些方法来判断一个任务是否正常结束,被取消或是否抛出了 unchecked 异常。RecursiveTask, RecursiveAction 以及 CountedCompleter 类提供了 compute() 这个抽象犯法,你必须在子类中实现它来做一些实际的计算操作。
  • RecursiveTask类:该类继承了 ForkJoinTask 类。它也是一个抽象类,你可以使用它如果你要实现有返回值的 Fork/Join 任务。
  • RecursiveAction类:该类继承了 ForkJoinTask类。它也是一个抽象类,你可以使用它如果你要实现没有返回值的 Fork/Join 任务。
  • CountedCompleter类:该类继承了 ForkJoinTask 类。它是 Java 8 API 的一个新特性。如果任务结束后需要触发其它任务,你可以使用它。
public class ForkJoinDemo {
    public static void main(String[] args) {
        int[] ints = IntStream.range(1, 5).toArray();

        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        SumTask sumTask = new SumTask(ints, 0, ints.length, 1);
        long startTime = System.currentTimeMillis();
        int sum = forkJoinPool.invoke(sumTask);
        long endTime = System.currentTimeMillis();
        System.out.println("Fork/join sum: " + sum + " in " + (endTime - startTime) + " ms.");
        forkJoinPool.shutdown();
        try {
            forkJoinPool.awaitTermination(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class SumTask extends RecursiveTask<Integer> {

    private int[] dataList;
    private int from;
    private int end;
    private int divideFactor;

    public SumTask(int[] dataList, int from, int end, int divideFactor) {
        this.dataList = dataList;
        this.from = from;
        this.end = end;
        this.divideFactor = divideFactor;
    }

    @Override
    protected Integer compute() {

        if ((end - from) <= divideFactor) {
            int sum = 0;
            for (int i = from; i < end; i++) {
                sum += dataList[i];
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(String.format("%s : compute %d~%d = %d", Thread.currentThread().getName(), from, end, sum));

            return sum;
        } else {
            int middle = (end + from) / 2;
            System.out.println(String.format("%s : split %d~%d ==> %d~%d, %d~%d", Thread.currentThread().getName(), from, end, from, middle, middle, end));
            SumTask task1 = new SumTask(dataList, from, middle, divideFactor);
            SumTask task2 = new SumTask(dataList, middle, end, divideFactor);

            // 这里我们使用 invokeAll() 方法,如果使用 task1.fork() 和 task2.fork() 方法
            // 会导致当前线程变成监工,不参与子任务的执行,从而降低性能
            invokeAll(task1, task2);

            int sum1 = task1.join();
            int sum2 = task2.join();

            int sum = sum1 + sum2;
            System.out.println(Thread.currentThread().getName() + " : " + "result = " + sum1 + " + " + sum2 + " ==> " + sum);

            return sum1 + sum2;
        }
    }
}

Fork/Join 框架

ForkJoinPool 类提供了 execute(),invoke(),submit() 方法来向线程池中提交任务。它们之间的区别是:

  • execute():发送任务给 ForkJoinPool 并立刻返回一个 void 值 (即提交无返回值的任务)
  • invoke():发送任务给 ForkJoinPool,当任务运行结束才返回
  • submit():发送任务给 ForkJoinPool,立刻返回一个 Future 对象用来控制任务的状态并获取结果

我们知道 ForkJoinPool 处理基于 ForkJoinTask 类的任务,但是也能执行基于 Runnable 和 Callable 接口的任务。你可以使用 submit() 方法,该方法可以接收一个 Runnable 对象,一个有返回值的 Runnable 对象,以及一个 Callable 对象。

ForkJoinTask:

  • get (long timeout, TimeUnit unit) 来获取任务返回的结果。这个方法在指定时间内等待任务执行完并返回结果。如果指定时间之内任务无法完成,则抛出 TimeoutException 异常。
  • invoke() 方法从语法上类似于 fork(); join() 但它总是尝试在当前线程执行。
  • 名字以 “quiet" 开始的方法没有返回运行结果也不报告异常。这些方法当有多个任务在执行时,结果或异常在运行结束后才被处理是很有用。
  • 一对 fork-join 就像并行递归方法里的 call (fork) 和 return (join)。因此 join 必须从最里面开始,例如, a.fork();b.form();b.join();a.join() 比 a 在 b 之前 join 更有效率。
  • 发表于 2019-09-21 22:20
  • 阅读 ( 105 )
  • 分类:网络文章

条评论

请先 登录 后评论
不写代码的码农
小编

篇文章

作家榜 »

  1. 小编 文章
返回顶部
部分文章转自于网络,若有侵权请联系我们删除