BlockingQueue and ExecutorService
This is a quick and dirty post, but I promised I would publish everything I research at Playtomic.
Last week, we were having a discussion about how to limit how many tasks an ExecutorService can enqueue. We were trying to control how much memory a service can handle before it throws out of memory exceptions. This service accepts messages from both a Kafka topic and from an API. Those operations end in the same internal logic, which is threaded.
There is a kind of Queue, BlockingQueue, that can wait until a spot in the queue is free. It would seem that using an ExecutionService with a BlockingQueue would wait when submitting a task until that queue is not full. But it is not, the ExecutionService rejects the task.
You know that hours of trial and error can save you hours of reading the manual 😉. I’m proud to say that I have read the manual first this time.
This test shows what happens:
public class BlockingQueueExecutorServiceTest {
@Test
public void submitTest() {
// Worst case scenario: accept only 1 thread in the queue.
int nThreads = 1;
ExecutorService exService = new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(nThreads));
// Full this with tasks
for (int i = 0; i < 10000; ++i) {
WaitingTask t = new WaitingTask(i);
exService.submit(t);
}
}
private static class WaitingTask implements Runnable {
int index;
public WaitingTask(int index) {
this.index = index;
}
@Override
public void run() {
try {
log.info("Running task {}", index);
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@550dbc7a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4dbb42b7[Wrapped task = com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest$WaitingTask@66f57048]] rejected from java.util.concurrent.ThreadPoolExecutor@21282ed8[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
at com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest.submitTest(BlockingQueueExecutorServiceTest.java:28)
... more boring stacktrace
If you want to wait until queue is not full, you have to provide a RejectedExecutionHandler which does that. For example, Spring’s CallerBlocksPolicy.
public class BlockingQueueExecutorServiceTest {
@Test
public void submitTest() {
// Worst case scenario: accept only 1 thread in the queue.
int nThreads = 1;
CallerBlocksPolicy policy = new CallerBlocksPolicy(10000); // 10secs
ExecutorService exService = new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(nThreads),
policy);
// Full this with tasks
for (int i = 0; i < 10000; ++i) {
WaitingTask t = new WaitingTask(i);
exService.submit(t);
}
}
private static class WaitingTask implements Runnable {
int index;
public WaitingTask(int index) {
this.index = index;
}
@Override
public void run() {
try {
log.info("Running task {}", index);
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
And this time we get:
12:21:32.409 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 0
12:21:32.422 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Attempting to queue task execution for 10000 milliseconds
12:21:33.420 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 1
12:21:33.420 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Task execution queued
12:21:33.421 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Attempting to queue task execution for 10000 milliseconds
12:21:34.423 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 2