Previously we took a synchronous approach, which was easy to implement but very slow.
This time we'll go asynchronous, using blocking queues and a thread pool.
First, let's take a look at our Reader class:
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;

    public class Reader implements Runnable {

        private final BlockingQueue<String> queue;
        private final CountDownLatch latch;

        public Reader(BlockingQueue<String> queue, CountDownLatch latch) {
            this.queue = queue;
            this.latch = latch;
        }

        void read(String fileName) throws Exception {
            // try-with-resources closes the file even if put() is interrupted
            try (BufferedReader br = new BufferedReader(new FileReader(new File(fileName)))) {
                String line;
                while ((line = br.readLine()) != null) {
                    queue.put(line); // blocks while the queue is full
                }
            } finally {
                // always signal completion, otherwise the processors would never stop
                latch.countDown();
            }
        }

        @Override
        public void run() {
            try {
                read("numbers.txt");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
The LineProcessor class is more complicated:
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class LineProcessor implements Runnable {

        private final PrimeFactorCounter primeFactorCounter = new PrimeFactorCounter();
        private final BlockingQueue<String> inputQueue;
        private final BlockingQueue<String> successQueue;
        private final BlockingQueue<String> exceptionsQueue;
        private final CountDownLatch inputLatch;
        private final CountDownLatch outputLatch;

        public LineProcessor(BlockingQueue<String> inputQueue, BlockingQueue<String> successQueue,
                             BlockingQueue<String> exceptionsQueue, CountDownLatch inputLatch,
                             CountDownLatch outputLatch) {
            this.inputQueue = inputQueue;
            this.successQueue = successQueue;
            this.exceptionsQueue = exceptionsQueue;
            this.inputLatch = inputLatch;
            this.outputLatch = outputLatch;
        }

        @Override
        public void run() {
            process();
        }

        void process() {
            while (true) {
                String number = null;
                try {
                    long latchCount = inputLatch.getCount();
                    int queueSize = inputQueue.size();
                    // the Reader is done and nothing is left to process
                    if (latchCount == 0 && queueSize == 0) {
                        outputLatch.countDown();
                        return;
                    }
                    number = inputQueue.poll(1, TimeUnit.SECONDS);
                    if (number == null) {
                        // poll timed out on an empty queue; re-check the termination condition
                        continue;
                    }
                    long value = Long.parseLong(number);
                    if (value > 0) {
                        long factor = primeFactorCounter.primeFactors(value);
                        successQueue.put(value + "," + factor + "\n");
                    }
                } catch (Exception e) {
                    exceptionsQueue.add(number + ", " + e + "\n");
                }
            }
        }
    }
inputQueue is the queue that the Reader writes to.
successQueue and exceptionsQueue are the queues that get populated depending on the result of processing a line.
inputLatch is the latch that the Reader counts down once it has read the whole file.
outputLatch is counted down by each processor when there are no more lines to process.
The LineProcessor decides whether work remains by checking both inputLatch and inputQueue.
While the latch is still open or the queue is not empty, it takes a line from inputQueue and puts the result on the appropriate queue.
Once the latch has reached zero and the queue is empty, it counts down outputLatch and stops processing.
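The termination check can be tried in isolation. What follows is a minimal sketch, not code from the project: the class LatchDrainSketch and its methods drain and runDemo are hypothetical names, and a single consumer stands in for the pool of processors. It shows why both conditions are needed: an empty queue alone may only mean the producer is slow, and a latch at zero alone may leave lines unprocessed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the project.
class LatchDrainSketch {

    // Consumes until the producer has signalled completion AND the queue is empty.
    static List<String> drain(BlockingQueue<String> queue, CountDownLatch producerDone) {
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                if (producerDone.getCount() == 0 && queue.isEmpty()) {
                    return seen;
                }
                String item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item != null) { // null only means the poll timed out
                    seen.add(item);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return seen;
        }
    }

    // Starts a producer thread that puts `items` lines, then drains them on the caller's thread.
    static List<String> runDemo(int items) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        CountDownLatch producerDone = new CountDownLatch(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put("line-" + i); // blocks while the small queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                producerDone.countDown(); // signal completion even on failure
            }
        });
        producer.start();
        return drain(queue, producerDone);
    }

    public static void main(String[] args) {
        System.out.println(runDemo(10).size());
    }
}
```

The queue capacity of 4 is deliberately smaller than the number of items, so the producer blocks on put() just as the Reader does when the processors fall behind.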
The Writer class is quite simple:
    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class Writer implements Runnable {

        private final String fileName;
        private final BlockingQueue<String> queue;
        private final CountDownLatch latch;

        public Writer(BlockingQueue<String> queue, CountDownLatch latch, String fileName) {
            this.queue = queue;
            this.fileName = fileName;
            this.latch = latch;
        }

        @Override
        public void run() {
            write();
        }

        void write() {
            // the second FileWriter argument opens the file in append mode
            try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(fileName, true))) {
                while (true) {
                    long latchCount = latch.getCount();
                    int queueSize = queue.size();
                    // all processors are done and every queued line has been written
                    if (latchCount == 0 && queueSize == 0) {
                        return;
                    }
                    String line = queue.poll(1, TimeUnit.SECONDS);
                    if (line != null) {
                        bufferedWriter.write(line);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
It terminates once its queue is empty and the latch has reached zero, which means no more messages will arrive.
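The line != null guard in the Writer is there because the timed poll returns null when nothing arrives within the timeout, rather than blocking forever or throwing. Here is a small sketch of that behavior; PollTimeoutSketch and pollOnce are hypothetical names used only for illustration, and the timeout is shortened to keep the demo quick.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the project.
class PollTimeoutSketch {

    // Waits up to timeoutMillis for an element; returns null if none arrived in time.
    static String pollOnce(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        // The queue is empty, so this call times out and yields null.
        System.out.println(pollOnce(queue, 50));
        queue.offer("some line");
        // Now an element is available and is returned immediately.
        System.out.println(pollOnce(queue, 50));
    }
}
```

Any consumer that passes the polled value straight on, for example to a parser, has to handle that null first.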
The only thing left is the Bootstrap class that binds it all together:
    import java.io.File;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    // Assumed third-party imports: Apache Commons IO and SLF4J
    import org.apache.commons.io.FileUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Bootstrap {

        private final Logger logger = LoggerFactory.getLogger(this.getClass());

        private static final int THREAD_POOL_SIZE = 4;
        static final String PRIME_FACTOR_FILE = "primeFactorCounter-threadPool.txt";
        static final String EXCEPTIONS_FILE = "exceptions-threadPool.txt";
        static final String SUMMARY_FILE = "summary-threadPool.txt";

        private BlockingQueue<String> inputQueue = new ArrayBlockingQueue<>(10000);
        private BlockingQueue<String> successQueue = new ArrayBlockingQueue<>(10000);
        private BlockingQueue<String> exceptionsQueue = new ArrayBlockingQueue<>(10000);
        private CountDownLatch inputLatch = new CountDownLatch(1);
        // one count per line processor
        private CountDownLatch outputLatch = new CountDownLatch(THREAD_POOL_SIZE);

        public void start() throws Exception {
            long startTime = System.currentTimeMillis();

            Reader reader = new Reader(inputQueue, inputLatch);
            Writer successWriter = new Writer(successQueue, outputLatch, PRIME_FACTOR_FILE);
            Writer exceptionsWriter = new Writer(exceptionsQueue, outputLatch, EXCEPTIONS_FILE);

            Collection<LineProcessor> lineProcessors = new ArrayList<>();
            for (int i = 0; i < THREAD_POOL_SIZE; i++) {
                lineProcessors.add(new LineProcessor(inputQueue, successQueue, exceptionsQueue, inputLatch, outputLatch));
            }

            ExecutorService lineProcessorExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
            for (LineProcessor lineProcessor : lineProcessors) {
                lineProcessorExecutor.execute(lineProcessor);
            }

            new Thread(reader).start();
            Thread successWriterThread = new Thread(successWriter);
            successWriterThread.start();
            Thread exceptionsWriterThread = new Thread(exceptionsWriter);
            exceptionsWriterThread.start();

            // accept no new tasks; the submitted processors keep running
            lineProcessorExecutor.shutdown();

            // the writers finish only after all processors have counted down outputLatch
            successWriterThread.join();
            exceptionsWriterThread.join();

            String summary = String.format("thread pool processing, time taken %d ms", System.currentTimeMillis() - startTime);
            logger.debug(summary);
            FileUtils.writeStringToFile(new File(SUMMARY_FILE), summary);
        }
    }
The line processors run in a fixed-size thread pool.
The Reader and the two writers are started as plain threads. We then wait until both writers have finished, and finally the summary is logged and written to a file.
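One detail worth spelling out is that ExecutorService.shutdown() does not block: it only stops the pool from accepting new tasks, while tasks that were already submitted keep running. The actual waiting has to happen elsewhere, which in the Bootstrap is the join() on the writer threads, and those in turn depend on outputLatch. The sketch below isolates that pattern under simplified assumptions: PoolShutdownSketch and runWorkers are hypothetical names, the workers only bump a counter instead of processing lines, and the calling thread awaits the latch directly instead of going through writer threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the project.
class PoolShutdownSketch {

    // Runs one task per pool thread and returns how many tasks completed.
    static int runWorkers(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch workersDone = new CountDownLatch(poolSize);
        AtomicInteger processed = new AtomicInteger();

        for (int i = 0; i < poolSize; i++) {
            pool.execute(() -> {
                try {
                    processed.incrementAndGet(); // stand-in for real work
                } finally {
                    workersDone.countDown(); // one count per worker
                }
            });
        }

        // Returns immediately; submitted tasks are still allowed to finish.
        pool.shutdown();

        try {
            // Blocks until every worker has counted down.
            workersDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(4));
    }
}
```

Sizing the latch to the pool size, as the Bootstrap does with THREAD_POOL_SIZE, only works because exactly one long-running task is submitted per thread.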
We can check the behavior with the following test:
    import java.io.File;
    import java.io.IOException;
    import java.util.List;
    // Assumed third-party imports: Apache Commons IO, JUnit 4 and Hamcrest
    import org.apache.commons.io.FileUtils;
    import org.junit.Before;
    import org.junit.Test;
    import static org.hamcrest.CoreMatchers.containsString;
    import static org.hamcrest.CoreMatchers.is;
    import static org.junit.Assert.assertThat;

    public class ThreadPoolSystemTest {

        @Before
        public void setUp() {
            // the writers append, so start every run from a clean state
            new File(Bootstrap.PRIME_FACTOR_FILE).delete();
            new File(Bootstrap.EXCEPTIONS_FILE).delete();
            new File(Bootstrap.SUMMARY_FILE).delete();
        }

        @Test
        public void shouldProcessFile() throws Exception {
            // when
            new Bootstrap().start();

            // then
            assertResultFile();
            assertExceptionFile();
            assertSummaryFile();
        }

        private void assertResultFile() throws IOException {
            List<String> lines = FileUtils.readLines(new File(Bootstrap.PRIME_FACTOR_FILE));
            assertThat(lines.size(), is(9900));
            assertThat(lines.contains("5556634922133,5"), is(true));
        }

        private void assertExceptionFile() throws IOException {
            File exceptionsFile = new File(Bootstrap.EXCEPTIONS_FILE);
            assertThat(exceptionsFile.exists(), is(true));
            String exceptionsFileContent = FileUtils.readFileToString(exceptionsFile);
            assertThat(exceptionsFileContent, containsString("badData1"));
            assertThat(exceptionsFileContent, containsString("badData2"));
        }

        private void assertSummaryFile() {
            assertThat(new File(Bootstrap.SUMMARY_FILE).exists(), is(true));
        }
    }
The whole project can be found on GitHub.