第二章 - 管理多个线程 - Executors

Executor框架相比于传统的并发系统基础实现具有很多的优势。传统做法是实现一个Runnable接口的类,然后使用该类的对象来直接创建Thread实例。 这种做法有一些问题,特别是当你启动太多线程的时...

Executor框架相比于传统的并发系统基础实现具有很多的优势。传统做法是实现一个Runnable接口的类,然后使用该类的对象来直接创建Thread实例。

这种做法有一些问题,特别是当你启动太多线程的时候,你可能降低了整个系统的性能。

Executor 框架里的基础组件

  • Executor 接口:这是 executor 框架里的基本组件。它只定义了一个允许程序员给executor发送Runnable对象的方法。
  • ExecutorService 接口:这个接口继承了 Executor 接口,并包含了额外一些方法来增加框架的功能,其中包括:
  1.           运行一些具有返回值的任务:Runnable 接口提供的 run() 方法没有返回值,但是使用 executors,你可以运行一些有返回值的任务
  2.           使用一个方法来执行一系列的任务
  3.           完成 executor 的执行并等待其销毁
  • ThreadPoolExecutor 类:这个类实现了 Executor 和 ExecutorService 两个接口。另外,它提供了一些其它方法来获取 executor 的状态(工作线程数,已经被执行的任务数等等),配置 executor(最小和最大工作线程数)。
  • Executors 类:该类提供了一些实用方法用以创建 Executor 对象以及其它相关的类。

 本章提供了几个例子来解释Executor的使用。例子代码有点长,不过相对还是蛮简单的清晰的。

实例1 - K 邻近算法(K-nearest neighbors algorithm)

K邻近算法是一个用于监督分类的简单机器学习算法。算法包含以下主要部分:

  • 训练数据集:该数据集由一系列样本组成,每个样本具由一个或多个属性,还有一个特殊属性来记录每个样本的标签
  • 一个距离矩阵:这个矩阵用来计算需要分类的样本和训练数据集里样本的距离
  • 测试数据集:该数据集用来衡量算法的行为。

当要对一个样本进行归类时,算法计算该样本和训练数据集里所有样本的距离。然后再取距离最小的的 k 个样本,这 k 个样本中,哪个标签数最多,那么这个标签就赋给要归类的那个样本。根据第一章得出的经验,我们从算法的串行版本开始,然后从串行版本演变到并行版本。

K邻近算法 - 串行版本

public class KnnClassifier {
    private List<? extends Sample> dataSet;
    private int k;

    public KnnClassifier(List<? extends Sample> dataSet, int k) {
        this.dataSet = dataSet;
        this.k = k;
    }

    public String classify(Sample example) {
        Distance[] distances = new Distance[dataSet.size()];
        int index = 0;

        // 计算新样本和训练数据集中各样本之间的距离
        for (Sample localExample : dataSet) {
            distances[index] = new Distance();
            distances[index].setIndex(index);
            distances[index].setDistance
                    (EuclideanDistanceCalculator.calculate(localExample,
                            example));
            index++;
        }

        // 对计算得到的距离排序以便获取K个最近距离的样本
        Arrays.sort(distances);

        Map<String, Integer> results = new HashMap<>();
        for (int i = 0; i < k; i++) {
            Sample localExample = dataSet.get(distances[i].getIndex());
            String tag = localExample.getTag();
            results.merge(tag, 1, (a, b) -> a + b);
        }

        // 返回最近k个样本总数最多的那个标签
        return Collections.max(results.entrySet(),
                Map.Entry.comparingByValue()).getKey();
    }
}
 
// 该类用来计算两个样本的距离
public class EuclideanDistanceCalculator {
    public static double calculate (Sample example1, Sample
            example2) {
        double ret=0.0d;
        double[] data1=example1.getExample();
        double[] data2=example2.getExample();
        if (data1.length!=data2.length) {
            throw new IllegalArgumentException ("Vector doesn't have the
                    same length");
        }
        for (int i=0; i<data1.length; i++) {
            ret+=Math.pow(data1[i]-data2[i], 2);
        }
        return Math.sqrt(ret);
    }
}
 

K邻近算法 - 细颗粒度的并发版本

如果你分析以上的算法的并行版本,你会发现有两点你可以用并行来实现:

  • 距离的计算:计算输入的样本和训练数据集中各样本的距离的循环中,每一个循环都是独立的,他们之间并不互相依赖。
  • 距离的排序:Java 8 提供了 Arrays 类的 parallelSort() 方法来并发实现数据排序

在细颗粒度并发版本中,我们为每一个计算输入样本和训练数据集中样本的距离创建一个任务。由此可见,所谓的细颗粒度就是我们创建了很多的任务。

public class KnnClassifierParallelIndividual {
    private List<? extends Sample> dataSet;
    private int k;
    private ThreadPoolExecutor executor;
    private int numThreads;
    private boolean parallelSort;

    public KnnClassifierParallelIndividual(List<? extends Sample>
                                                   dataSet, int k, 
                                                   int factor, 
                                                   boolean parallelSort) {
        this.dataSet = dataSet;
        this.k = k;

        // 动态获取运行此程序的处理器或核的数量来决定线程池中线程的数量
        numThreads = factor * (Runtime.getRuntime().availableProcessors());
        executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(numThreads);
        this.parallelSort = parallelSort;
    }

    /**
     * 因为我们为每个距离计算创建了一个任务,因此主线程需要等待所有任务完成后才能继续,
     * 我们使用 CountDownLatch 这个类来同步所有任务的完成,
     * 我们用任务总数也就是数据集中样本的总数来初始化 CountDownLatch,
     * 每个任务完成后调用 countDown() 方法
     */
    public String classify(Sample example) throws Exception {
        Distance[] distances = new Distance[dataSet.size()];
        CountDownLatch endController = new CountDownLatch(dataSet.size());
        int index = 0;
        for (Sample localExample : dataSet) {
            IndividualDistanceTask task = new
                    IndividualDistanceTask(distances, index, localExample,
                    example, endController);
            executor.execute(task);
            index++;
        }
        endController.await();

        if (parallelSort) {
            Arrays.parallelSort(distances);
        } else {
            Arrays.sort(distances);
        }

        Map<String, Integer> results = new HashMap<>();
        for (int i = 0; i < k; i++) {
            Sample localExample = dataSet.get(distances[i].getIndex());
            String tag = localExample.getTag();
            results.merge(tag, 1, (a, b) -> a + b);
        }

        // 返回最近k个样本总数最多的那个标签
        return Collections.max(results.entrySet(),
                Map.Entry.comparingByValue()).getKey();
    }

    public void destroy() {
        executor.shutdown();
    }
}
 
public class IndividualDistanceTask implements Runnable {
    private Distance[] distances;
    private int index;
    private Sample localExample;
    private Sample example;
    private CountDownLatch endController;

    public IndividualDistanceTask(Distance[] distances, int index,
                                  Sample localExample,
                                  Sample example, CountDownLatch endController) {
        this.distances = distances;
        this.index = index;
        this.localExample = localExample;
        this.example = example;
        this.endController = endController;
    }

    public void run() {
        distances[index] = new Distance();
        distances[index].setIndex(index);
        distances[index].setDistance
                (EuclideanDistanceCalculator.calculate(localExample,
                        example));

        // 任务完成,调用CountDownLatch的countDown()
        endController.countDown();
    }
}
 
  • 发表于 2019-09-16 16:00
  • 阅读 ( 59 )
  • 分类:网络文章

条评论

请先 登录 后评论
不写代码的码农
小编

篇文章

作家榜 »

  1. 小编 文章
返回顶部
部分文章转自于网络,若有侵权请联系我们删除