Thread sleeping in a Thread-Pool

问题: Let's say we have a thread-pool with a limited number of threads. Executor executor = Executors.newFixedThreadPool(3); Now let's say one of the active tasks must sleep...

问题:

Let's say we have a thread-pool with a limited number of threads.

Executor executor = Executors.newFixedThreadPool(3);

Now let's say one of the active tasks must sleep for 3 seconds (for whatever reason).

executor.execute(() -> {
    try {
        Thread.sleep(3000L);
    } catch (InterruptedException ignore) {}
});

How can we implement such a thread-pool in way that, when a task sleeps (or waits on a monitor/condition), the thread1 can be used effectively to run another task?

1 By thread I do not mean the "physical" Java thread, because that would be impossible while the thread is asleep. What I mean is, the thread-pool to have an abstract implementation which virtually seems to allow a thread to run another task during sleeping. The key point is that there are always N simultaneously running (non-sleeping) tasks.

Somewhat similar to the way a monitor handles access to a critical region:

  • If a thread waits on a resource, the resource can be used by another thread.
  • If the thread is notified, it is placed into the waiting set to (re-)gain access to that resource.

回答1:

Using ScheduledExecutorService to run the task after a delay would be preferable to using Thread.sleep() inside a thread.

ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
executor.schedule(() -> { ... }, 3000, TimeUnit.MILLISECONDS);

However in your example the thread pool has only 3 threads. Until you reach dozens or perhaps hundreds of threads there shouldn't be much impact by having sleeping threads on a server class machine.


回答2:

I implemented a minimal working example which basically does what I think you want.

A Task interface (much like the runnable interface, just with a passed Context to perform waiting)

package io.medev.stackoverflow;

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public interface Task {

    /**
     * Wraps the given runnable into a Task with a not guessable execution time (meaning guessExecutionTime always returns Long.MAX_VALUE)
     * @param runnable The runnable to wrap
     * @return a Task wrapping this runnable
     */
    static Task wrap(Runnable runnable) {
        return wrap(runnable, Long.MAX_VALUE);
    }

    /**
     * Wraps the given runnable using the given guessedExecutionTimeMillis
     * @param runnable The runnable to wrap
     * @param guessedExecutionTimeMillis The guessed execution time in millis for this runnable
     * @return a Task wrapping this runnable
     */
    static Task wrap(Runnable runnable, long guessedExecutionTimeMillis) {
        return new Task() {
            @Override
            public long guessExecutionTimeMillis() {
                return guessedExecutionTimeMillis;
            }

            @Override
            public void run(Context context) {
                runnable.run();
            }
        };
    }

    /**
     * Should more or less guess how long this task will run
     * @return The execution time of this Task in milliseconds
     */
    long guessExecutionTimeMillis();

    void run(Context context);

    interface Context {

        /**
         * Block until the condition is met, giving other Tasks time to execute
         * @param condition the condition to check
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(BooleanSupplier condition) throws InterruptedException;

        /**
         * Blocks at least for the given duration, giving other Tasks time to execute
         * @param timeout
         * @param timeUnit
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(long timeout, TimeUnit timeUnit) throws InterruptedException;

        /**
         * Blocks until the condition is met or the timeout expires, giving other Tasks time to execute
         * @param condition the condition to check
         * @param timeout
         * @param timeUnit
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException;
    }
}

And a basic fixed thread-pool Executor - but you have to depend on the concrete implementation here:

package io.medev.stackoverflow;

import java.util.Comparator;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

public class TimeEfficientExecutor implements Executor {

    private final BlockingQueue<Task> taskQueue;
    private final CountDownLatch latch;
    private volatile boolean alive;

    public TimeEfficientExecutor(int threads) {
        this.taskQueue = new PriorityBlockingQueue<>(10, Comparator.comparingLong(Task::guessExecutionTimeMillis));
        this.latch = new CountDownLatch(threads);
        this.alive = true;

        for (int i = 0; i < threads; i++) {
            Thread thread = new Thread(new TimeEfficientExecutorRunnable());
            thread.start();
        }
    }

    @Override
    public void execute(Runnable runnable) {
        execute(Task.wrap(runnable));
    }

    public void execute(Runnable runnable, long guessedExecutionTimeMillis) {
        execute(Task.wrap(runnable, guessedExecutionTimeMillis));
    }

    public void execute(Task task) {
        this.taskQueue.offer(task);
    }

    public void shutdown() {
        this.alive = false;
    }

    public void awaitShutdown() throws InterruptedException {
        this.latch.await();
    }

    public void awaitShutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
        this.latch.await(timeout, timeUnit);
    }

    private class TimeEfficientExecutorRunnable implements Runnable {

        @Override
        public void run() {
            try {
                while (TimeEfficientExecutor.this.alive) {
                    Task task = TimeEfficientExecutor.this.taskQueue.poll();

                    if (task != null) {
                        try {
                            task.run(new IdleTaskContext());
                        } catch (Exception e) {
                            // TODO: logging
                        }
                    }
                }
            } finally {
                TimeEfficientExecutor.this.latch.countDown();
            }
        }
    }

    private class IdleTaskContext implements Task.Context {

        @Override
        public void idle(BooleanSupplier condition) throws InterruptedException {
            idle(condition, Long.MAX_VALUE);
        }

        @Override
        public void idle(long timeout, TimeUnit timeUnit) throws InterruptedException {
            idle(() -> false, timeout, timeUnit);
        }

        @Override
        public void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException {
            idle(condition, System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }

        private void idle(BooleanSupplier condition, long idleUntilTs) throws InterruptedException {
            long leftMillis = idleUntilTs - System.currentTimeMillis();

            while (TimeEfficientExecutor.this.alive && !condition.getAsBoolean() && leftMillis >= 1L) {
                Task task = TimeEfficientExecutor.this.taskQueue.poll(leftMillis, TimeUnit.MILLISECONDS);
                leftMillis = idleUntilTs - System.currentTimeMillis();

                if (task != null) {
                    if (leftMillis >= 1L && task.guessExecutionTimeMillis() < leftMillis) {
                        task.run(new IdleTaskContext());
                    } else {
                        TimeEfficientExecutor.this.taskQueue.offer(task);
                    }
                }
            }
        }
    }
}

Note that you can't just step down the stack - and the stack is bound to the executing thread. That means that it is not possible to jump back into an underlying idleing task if some "Sub"-Task starts idleing. You have to "trust" what each task returns in the guessExecutionTimeMillis-Method.

Thanks to the PriorityQueue used in the Executor, the queue will always return the task with the lowest exeuction time.

  • 发表于 2019-03-23 04:41
  • 阅读 ( 213 )
  • 分类:sof

条评论

请先 登录 后评论
不写代码的码农
小编

篇文章

作家榜 »

  1. 小编 文章
返回顶部
部分文章转自于网络,若有侵权请联系我们删除