Monday, 6 July 2015

Dynamic thread pool

Sometimes we're facing the problem, that we need to solve using dynamic thread pool.

By 'dynamic' I mean, that the pool is shrinking and growing automatically but it does have upper bound (so it can't grow indefinitely).

The API of the pool would be quite simple:

public Future<T> submitTask(Callable<T> task);
public int threadsCount();
We should be able to submit the task to be executed asynchronously and monitor number of threads in the pool.
Having control over maximum number of threads and their longevity would be another feature.

Let's try to document functional requirements in the form of unit tests.
That would be our test setup:
private static final int MAX_THREADS = 2;
private static final int KEEP_ALIVE_TIME_IN_MILLISECONDS = 300;
private static final long DELAY_TIME_IN_MILLISECONDS = 50;
private DynamicThreadPool<String> dynamicThreadPool
= new DynamicThreadPool<>(MAX_THREADS, KEEP_ALIVE_TIME_IN_MILLISECONDS);
private static final Callable<String> TASK_WITH_DELAY = () -> {
simulateDelay(DELAY_TIME_IN_MILLISECONDS);
return randomUUID().toString();
};
private static void simulateDelay(long delayTimeInMilliseconds) throws InterruptedException {
MILLISECONDS.sleep(delayTimeInMilliseconds);
}
view raw testConfig.java hosted with ❤ by GitHub

Initial number of threads should be zero:
@Test
public void shouldHaveZeroThreadsAtTheBeginning() {
int activeThreads = dynamicThreadPool.threadsCount();
assertThat(activeThreads).isEqualTo(0);
}
view raw test1.java hosted with ❤ by GitHub
The execution of the tasks should be done concurrently:
@Test
public void shouldComputeConcurrently() throws Exception {
Future<String> firstFuture = dynamicThreadPool.submitTask(TASK_WITH_DELAY);
Future<String> secondFuture = dynamicThreadPool.submitTask(TASK_WITH_DELAY);
String firstResult = firstFuture.get();
String secondResult = secondFuture.get(DELAY_TIME_IN_MILLISECONDS / 2, MILLISECONDS);
assertThat(firstResult).isNotNull();
assertThat(secondResult).isNotNull();
}
view raw test2.java hosted with ❤ by GitHub

Number of threads should grow automatically:
@Test
public void shouldDynamicallyIncreaseThreadsNumber() {
dynamicThreadPool.submitTask(TASK_WITH_DELAY);
dynamicThreadPool.submitTask(TASK_WITH_DELAY);
await().atMost(KEEP_ALIVE_TIME_IN_MILLISECONDS, SECONDS)
.until(() -> dynamicThreadPool.threadsCount() == MAX_THREADS);
}
view raw test3.java hosted with ❤ by GitHub

The size of thread pool should shrink when there is no active tasks and keep alive time was exceeded:
@Test
public void shouldHaveZeroThreadsWhenNoActiveTasks() {
dynamicThreadPool.submitTask(() -> randomUUID().toString());
dynamicThreadPool.submitTask(() -> randomUUID().toString());
await().atMost(KEEP_ALIVE_TIME_IN_MILLISECONDS * 2, SECONDS)
.until(() -> dynamicThreadPool.threadsCount() == 0);
}
view raw test4.java hosted with ❤ by GitHub

If the number of submitted task is greater than maximum threads number, we should block:
@Test
public void shouldBlockAddingTasksWhenMaxThreadsCountExceeded() throws Exception {
dynamicThreadPool.submitTask(TASK_WITH_DELAY);
dynamicThreadPool.submitTask(TASK_WITH_DELAY);
long startTime = System.currentTimeMillis();
Future<String> lastTaskFuture = dynamicThreadPool.submitTask(TASK_WITH_DELAY);
String result = lastTaskFuture.get();
long timeElapsed = System.currentTimeMillis() - startTime;
assertThat(timeElapsed).isGreaterThan(DELAY_TIME_IN_MILLISECONDS);
assertThat(result).isNotNull();
}
view raw test5.java hosted with ❤ by GitHub

Actual implementation is quite simple, we need to use ThreadPoolExecutor:
public class DynamicThreadPool<T> {
private final ThreadPoolExecutor executor;
public DynamicThreadPool(int maxThreads, int keepAliveTimeInMillis) {
executor = new ThreadPoolExecutor(0, maxThreads, keepAliveTimeInMillis, MILLISECONDS, new SynchronousQueue<>());
executor.setRejectedExecutionHandler((r, theExecutor) -> {
try {
theExecutor.getQueue().put(r);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
}
public Future<T> submitTask(Callable<T> task) {
return executor.submit(task);
}
public int threadsCount() {
return executor.getPoolSize();
}
}

There are two important things here.
First, we are required to create our own RejectedExecutionHandler to support blocking when the maximum number is exceed.
Secondly, we need to use the blocking queue of size zero (in our case I chose SynchronousQueue) - the queue needs to be filled up before we can create more than initial number of threads (in our case zero).
The whole project can be found at github.

No comments:

Post a Comment