By 'dynamic' I mean, that the pool is shrinking and growing automatically but it does have upper bound (so it can't grow indefinitely).
The API of the pool would be quite simple:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public Future<T> submitTask(Callable<T> task); | |
public int threadsCount(); |
Having control over maximum number of threads and their longevity would be another feature.
Let's try to document functional requirements in the form of unit tests.
That would be our test setup:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static final int MAX_THREADS = 2; | |
private static final int KEEP_ALIVE_TIME_IN_MILLISECONDS = 300; | |
private static final long DELAY_TIME_IN_MILLISECONDS = 50; | |
private DynamicThreadPool<String> dynamicThreadPool | |
= new DynamicThreadPool<>(MAX_THREADS, KEEP_ALIVE_TIME_IN_MILLISECONDS); | |
private static final Callable<String> TASK_WITH_DELAY = () -> { | |
simulateDelay(DELAY_TIME_IN_MILLISECONDS); | |
return randomUUID().toString(); | |
}; | |
private static void simulateDelay(long delayTimeInMilliseconds) throws InterruptedException { | |
MILLISECONDS.sleep(delayTimeInMilliseconds); | |
} |
Initial number of threads should be zero:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void shouldHaveZeroThreadsAtTheBeginning() { | |
int activeThreads = dynamicThreadPool.threadsCount(); | |
assertThat(activeThreads).isEqualTo(0); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void shouldComputeConcurrently() throws Exception { | |
Future<String> firstFuture = dynamicThreadPool.submitTask(TASK_WITH_DELAY); | |
Future<String> secondFuture = dynamicThreadPool.submitTask(TASK_WITH_DELAY); | |
String firstResult = firstFuture.get(); | |
String secondResult = secondFuture.get(DELAY_TIME_IN_MILLISECONDS / 2, MILLISECONDS); | |
assertThat(firstResult).isNotNull(); | |
assertThat(secondResult).isNotNull(); | |
} |
Number of threads should grow automatically:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void shouldDynamicallyIncreaseThreadsNumber() { | |
dynamicThreadPool.submitTask(TASK_WITH_DELAY); | |
dynamicThreadPool.submitTask(TASK_WITH_DELAY); | |
await().atMost(KEEP_ALIVE_TIME_IN_MILLISECONDS, SECONDS) | |
.until(() -> dynamicThreadPool.threadsCount() == MAX_THREADS); | |
} |
The size of thread pool should shrink when there is no active tasks and keep alive time was exceeded:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void shouldHaveZeroThreadsWhenNoActiveTasks() { | |
dynamicThreadPool.submitTask(() -> randomUUID().toString()); | |
dynamicThreadPool.submitTask(() -> randomUUID().toString()); | |
await().atMost(KEEP_ALIVE_TIME_IN_MILLISECONDS * 2, SECONDS) | |
.until(() -> dynamicThreadPool.threadsCount() == 0); | |
} |
If the number of submitted task is greater than maximum threads number, we should block:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void shouldBlockAddingTasksWhenMaxThreadsCountExceeded() throws Exception { | |
dynamicThreadPool.submitTask(TASK_WITH_DELAY); | |
dynamicThreadPool.submitTask(TASK_WITH_DELAY); | |
long startTime = System.currentTimeMillis(); | |
Future<String> lastTaskFuture = dynamicThreadPool.submitTask(TASK_WITH_DELAY); | |
String result = lastTaskFuture.get(); | |
long timeElapsed = System.currentTimeMillis() - startTime; | |
assertThat(timeElapsed).isGreaterThan(DELAY_TIME_IN_MILLISECONDS); | |
assertThat(result).isNotNull(); | |
} |
Actual implementation is quite simple, we need to use ThreadPoolExecutor:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class DynamicThreadPool<T> { | |
private final ThreadPoolExecutor executor; | |
public DynamicThreadPool(int maxThreads, int keepAliveTimeInMillis) { | |
executor = new ThreadPoolExecutor(0, maxThreads, keepAliveTimeInMillis, MILLISECONDS, new SynchronousQueue<>()); | |
executor.setRejectedExecutionHandler((r, theExecutor) -> { | |
try { | |
theExecutor.getQueue().put(r); | |
} catch (InterruptedException e) { | |
throw new IllegalStateException(e); | |
} | |
}); | |
} | |
public Future<T> submitTask(Callable<T> task) { | |
return executor.submit(task); | |
} | |
public int threadsCount() { | |
return executor.getPoolSize(); | |
} | |
} |
There are two important things here.
First, we are required to create our own RejectedExecutionHandler to support blocking when the maximum number is exceed.
Secondly, we need to use the blocking queue of size zero (in our case I chose SynchronousQueue) - the queue needs to be filled up before we can create more than initial number of threads (in our case zero).
The whole project can be found at github.
No comments:
Post a Comment