Sunday, 16 March 2014

Several approaches to processing events (part 2 - asynchronous processing with blocking queues and thread pool)

In the previous post we tackled the problem of computing prime factors for numbers read from a large file.
We took a synchronous approach, which was quite easy to implement but very slow.
This time we'll go the asynchronous way, using blocking queues and a thread pool.

First of all, let's take a look at our Reader class:

The reader reads each line and puts it into the blocking queue. Once it has read the whole file, it signals completion through the latch.
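
A reader along these lines could be sketched as follows. This is only an illustration, not the class from the project: the name ReaderSketch, the BufferedReader source and the use of CountDownLatch as the latch type are assumptions.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of the reader described above (not the original class).
class ReaderSketch implements Runnable {
    private final BufferedReader source;          // assumed input; the post reads a file
    private final BlockingQueue<String> inputQueue;
    private final CountDownLatch inputLatch;      // assumed latch type

    ReaderSketch(BufferedReader source, BlockingQueue<String> inputQueue, CountDownLatch inputLatch) {
        this.source = source;
        this.inputQueue = inputQueue;
        this.inputLatch = inputLatch;
    }

    @Override
    public void run() {
        try {
            String line;
            while ((line = source.readLine()) != null) {
                // put() blocks if the queue is bounded and currently full
                inputQueue.put(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Signal "no more lines", even if reading failed
            inputLatch.countDown();
        }
    }
}
```

Counting down in a finally block means the consumers are released even when reading fails halfway through the file.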

The LineProcessor class is more complicated:

Let's take a closer look at the collaborators.

inputQueue is the queue that Reader is writing to.
successQueue and exceptionsQueue are the queues that will be populated based on the line processing result.
inputLatch is the latch signalled by the Reader once it has finished reading.
outputLatch is signalled when there are no more lines to be processed.

The LineProcessor checks whether the Reader has already finished by inspecting inputLatch and inputQueue.
If it has not, the processor takes a line from the inputQueue and populates the appropriate queue with the result.
If it has, the processor signals outputLatch and stops processing.
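
The loop described above might look roughly like the sketch below. It is an illustration under several assumptions, not the project's code: the latches are taken to be CountDownLatch instances, the factorization is plain trial division, and poll with a timeout is used instead of take so that a processor cannot block forever when another processor grabs the last line. The class name and the message format are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a line processor (not the original class).
class LineProcessorSketch implements Runnable {
    private final BlockingQueue<String> inputQueue;
    private final BlockingQueue<String> successQueue;
    private final BlockingQueue<String> exceptionsQueue;
    private final CountDownLatch inputLatch;
    private final CountDownLatch outputLatch;

    LineProcessorSketch(BlockingQueue<String> inputQueue, BlockingQueue<String> successQueue,
                        BlockingQueue<String> exceptionsQueue,
                        CountDownLatch inputLatch, CountDownLatch outputLatch) {
        this.inputQueue = inputQueue;
        this.successQueue = successQueue;
        this.exceptionsQueue = exceptionsQueue;
        this.inputLatch = inputLatch;
        this.outputLatch = outputLatch;
    }

    @Override
    public void run() {
        try {
            // Done only when the reader has finished AND nothing is left to take
            while (!(inputLatch.getCount() == 0 && inputQueue.isEmpty())) {
                String line = inputQueue.poll(100, TimeUnit.MILLISECONDS);
                if (line == null) {
                    continue; // nothing arrived in time; re-check the exit condition
                }
                try {
                    long number = Long.parseLong(line.trim());
                    successQueue.put(line + ": " + factorize(number));
                } catch (NumberFormatException e) {
                    exceptionsQueue.put(line + ": " + e.getMessage());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            outputLatch.countDown(); // this processor will produce no more results
        }
    }

    // Trial division (an assumed algorithm); returns an empty list for n < 2
    static List<Long> factorize(long n) {
        List<Long> factors = new ArrayList<>();
        for (long d = 2; d <= n / d; d++) {
            while (n % d == 0) {
                factors.add(d);
                n /= d;
            }
        }
        if (n > 1) {
            factors.add(n);
        }
        return factors;
    }
}
```

With several processors running, outputLatch would be created with a count equal to the number of processors, so it reaches zero only after the last one has stopped.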

The Writer class is quite simple:

It takes messages from the queue and writes them to a file.
It terminates when the queue is empty and the latch signals that there are no more messages.
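
A minimal sketch of such a writer is shown below. The class name, the java.io.Writer target and the polling timeout are assumptions made for illustration; the original class may differ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the writer (not the original class).
class QueueWriterSketch implements Runnable {
    private final BlockingQueue<String> queue;
    private final CountDownLatch outputLatch; // reaches zero when all processors are done
    private final Writer out;                 // assumed target, e.g. a FileWriter

    QueueWriterSketch(BlockingQueue<String> queue, CountDownLatch outputLatch, Writer out) {
        this.queue = queue;
        this.outputLatch = outputLatch;
        this.out = out;
    }

    @Override
    public void run() {
        try {
            // Stop only when no producer is left AND the queue has been drained
            while (!(outputLatch.getCount() == 0 && queue.isEmpty())) {
                String message = queue.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    out.write(message);
                    out.write(System.lineSeparator());
                }
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Checking the latch before the queue matters: once the latch is at zero no new messages can arrive, so an empty queue at that point really does mean the work is done.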

The only thing left is the Bootstrap class that binds it all together:

The latches and queues are initialized and then passed to the processing objects via their constructors.
Line processors are submitted to a thread pool.
The Reader and the two writers are started as separate threads. We then wait until both writers have finished and print the summary.
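
The wiring could be sketched as below. To keep the example self-contained, the reader, processors and writers are reduced to inline lambdas: the "processing" only parses the number, and the "writers" only count messages instead of writing files. All names here are hypothetical, and the real Bootstrap class most likely looks different.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wiring sketch (not the original Bootstrap class).
class BootstrapSketch {

    // Returns {successCount, failureCount} as a stand-in for the printed summary
    static int[] process(List<String> lines, int processorCount) {
        BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> successQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> exceptionsQueue = new LinkedBlockingQueue<>();
        CountDownLatch inputLatch = new CountDownLatch(1);               // one reader
        CountDownLatch outputLatch = new CountDownLatch(processorCount); // one count per processor
        AtomicInteger successes = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();

        // Line processors run in a fixed-size thread pool
        ExecutorService pool = Executors.newFixedThreadPool(processorCount);
        for (int i = 0; i < processorCount; i++) {
            pool.execute(() -> {
                try {
                    while (!(inputLatch.getCount() == 0 && inputQueue.isEmpty())) {
                        String line = inputQueue.poll(50, TimeUnit.MILLISECONDS);
                        if (line == null) {
                            continue;
                        }
                        try {
                            // Stand-in for the real work (prime factorization)
                            successQueue.put(line + " -> " + Long.parseLong(line.trim()));
                        } catch (NumberFormatException e) {
                            exceptionsQueue.put(line);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    outputLatch.countDown();
                }
            });
        }
        pool.shutdown(); // no new tasks; submitted processors keep running

        // Reader and the two writers run as plain threads
        Thread reader = new Thread(() -> {
            try {
                for (String line : lines) {
                    inputQueue.put(line);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inputLatch.countDown();
            }
        });
        Thread successWriter = new Thread(drain(successQueue, outputLatch, successes));
        Thread exceptionsWriter = new Thread(drain(exceptionsQueue, outputLatch, failures));
        reader.start();
        successWriter.start();
        exceptionsWriter.start();

        try {
            // Wait until both writers have finished
            successWriter.join();
            exceptionsWriter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {successes.get(), failures.get()};
    }

    // Stand-in writer: counts messages until the processors are done and the queue is empty
    static Runnable drain(BlockingQueue<String> queue, CountDownLatch outputLatch, AtomicInteger counter) {
        return () -> {
            try {
                while (!(outputLatch.getCount() == 0 && queue.isEmpty())) {
                    if (queue.poll(50, TimeUnit.MILLISECONDS) != null) {
                        counter.incrementAndGet();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }
}
```

Calling BootstrapSketch.process with a list of lines and the number of processors returns the two counters once both writer threads have exited.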

We can check the behavior with the following test:

On a machine with a Core 2 Duo 2.53 GHz processor, it takes ~43 seconds to process 10 000 numbers.
The whole project can be found on GitHub.