Saturday, 12 February 2022

How to purge a Kafka topic programmatically using AdminClient

The need to purge a Kafka topic (without deleting the topic itself) comes up pretty frequently (usually in test environments).

It can be done by temporarily changing retention.ms on the topic with bash commands (described here) or with a UI like Confluent Control Center (if you're using Confluent).

In this post, we'll check how to do it programmatically using AdminClient. (The sample application is written in Kotlin with Spring Boot and Spring Kafka.)

Let's start with a look at the dependencies in build.gradle.kts (project skeleton was generated by Spring Initializr):
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("org.awaitility:awaitility-kotlin:4.1.1")

We need to define a bean with AdminClient:
@Configuration
class AppConfig {

    @Value(value = "\${kafka.bootstrapAddress}")
    lateinit var bootstrapAddress: String

    @Bean
    fun adminClient(): AdminClient {
        return AdminClient.create(
            mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapAddress)
        )
    }
}
For the sake of simplicity in this example, we're just setting the Kafka broker address (the kafka.bootstrapAddress property) taken from the application.properties file.
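The corresponding entry in application.properties could look like this (localhost:9092 matches the embedded broker used later in the test):

kafka.bootstrapAddress=localhost:9092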


Once we have the AdminClient configured, we can use it to purge the topic (notice that the AdminClient is injected):
@Component
class TopicPurger(private val adminClient: AdminClient) {

    fun purge(topicName: String) {
        val topicDescription = adminClient.describeTopics(listOf(topicName)).all().get()[topicName]
        val partitionsCount = topicDescription?.partitions()?.size
            ?: throw IllegalArgumentException("Topic $topicName not found")
        // partition indexes go from 0 to partitionsCount - 1, hence `until` and not `..`
        val recordsToDelete = (0 until partitionsCount).associate { partitionIndex ->
            TopicPartition(topicName, partitionIndex) to RecordsToDelete.beforeOffset(-1)
        }
        adminClient.deleteRecords(recordsToDelete)
    }
}
We first describe the topic we want to purge to obtain the number of partitions.
Once we have that, we build a map: for each partition (the key) we declare that we want to delete all records (RecordsToDelete.beforeOffset(-1), where -1 is resolved by the broker to the current end offset of the partition).
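If you're not using Kotlin, the same steps translate directly to Java. A rough sketch (not part of the sample project; the class name is made up):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;

class JavaTopicPurger {

    void purge(AdminClient adminClient, String topicName) throws Exception {
        // describe the topic to learn how many partitions it has
        TopicDescription description =
                adminClient.describeTopics(List.of(topicName)).all().get().get(topicName);
        int partitionsCount = description.partitions().size();
        // for every partition, delete everything before the end of the log (-1)
        Map<TopicPartition, RecordsToDelete> recordsToDelete = IntStream.range(0, partitionsCount)
                .boxed()
                .collect(Collectors.toMap(
                        i -> new TopicPartition(topicName, i),
                        i -> RecordsToDelete.beforeOffset(-1)));
        adminClient.deleteRecords(recordsToDelete).all().get();
    }
}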


Let's check how we could write an integration test for such functionality:
@SpringBootTest
@EmbeddedKafka(ports = [9092])
class KafkaExampleApplicationTests(
    @Autowired val embeddedKafkaBroker: EmbeddedKafkaBroker,
    @Autowired val adminClient: AdminClient,
    @Autowired val topicPurger: TopicPurger
) {

    private val topicName = "test.topic"

    @Test
    fun purgesTopic() {
        embeddedKafkaBroker.addTopics(NewTopic(topicName, 2, 1))
        val producer = DefaultKafkaProducerFactory<Int, String>(producerProps(embeddedKafkaBroker)).createProducer()
        val partition1 = 0
        val partition2 = 1
        producer.send(ProducerRecord(topicName, partition1, 0, "value1"))
        producer.send(ProducerRecord(topicName, partition2, 1, "value2"))
        val topicPartition1 = TopicPartition(topicName, partition1)
        val topicPartition2 = TopicPartition(topicName, partition2)
        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.earliest(), 0)
        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.latest(), 1)

        topicPurger.purge(topicName)

        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.earliest(), 1)
        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.latest(), 1)
    }

    private fun checkOffsets(topicPartition1: TopicPartition,
                             topicPartition2: TopicPartition,
                             offsetSpec: OffsetSpec,
                             expectedOffset: Long) {
        await().until {
            val offsets = adminClient.listOffsets(
                mapOf(
                    topicPartition1 to offsetSpec,
                    topicPartition2 to offsetSpec,
                )).all().get()
            offsets[topicPartition1]?.offset() == expectedOffset
                    && offsets[topicPartition2]?.offset() == expectedOffset
        }
    }
}
There's a lot going on here, so let's break it into smaller chunks.
We're starting an embedded Kafka broker on port 9092 with the @EmbeddedKafka(ports = [9092]) annotation. (The port corresponds with the property in application.properties.)
At the beginning of the test we add a topic with 2 partitions and send one message to each of them.
After that, we check that in each partition the earliest offset is 0 and the latest offset is 1. That's it for the test case setup.
Then we invoke the actual purging and check the offsets in both partitions once again: earliest and latest should both be 1, which proves that all the records were removed.

The whole project can be found on GitHub.

Sunday, 1 April 2018

Handling nulls with Optional

Let's imagine the following situation:
We have some 3rd-party code (or legacy code that we can't change at the moment) that fetches objects representing people based on some parameter. It could look as follows:

public interface PersonService {
    Person getByName(String name);
}

public class Person {
    private final String name;
    private final int age;
    private final Address address;
    // ...
}

public class Address {
    private final String street;
    private final String city;
    // ...
}

Our task is to retrieve the city in which a specific person lives, for further processing. We don't have any guarantee of data consistency, so we need to handle missing values on our own.

The task seems to be simple; how can we achieve it?
One solution could look something like this:
public class CityRetriever {

    private final PersonService personService;

    public CityRetriever(PersonService personService) {
        this.personService = personService;
    }

    public String retrieveCity(String name) {
        Person person = personService.getByName(name);
        if (person != null) {
            Address address = person.getAddress();
            if (address != null) {
                if (address.getCity() != null) {
                    return address.getCity();
                }
            }
        }
        return null;
    }
}
As you can see above, it's quite ugly. Multiple nested if statements don't look nice. What's more, returning null is not a good practice.

What can we do to get rid of the ifs and indicate that the city might not be present?

We can use the Optional type:
public class CityRetrieverWithOptional {

    private final PersonService personService;

    public CityRetrieverWithOptional(PersonService personService) {
        this.personService = personService;
    }

    public Optional<String> retrieveCity(String name) {
        return Optional.ofNullable(personService.getByName(name))
                .map(Person::getAddress)
                .map(Address::getCity);
    }
}
The map calls take care of handling nulls, and we return Optional to indicate that the value might not be there.
Instead of returning Optional we could also consider the Null Object pattern.
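On the caller's side the returned Optional makes the absent case explicit. A minimal usage sketch (the CityPrinter class is made up for illustration):

public class CityPrinter {

    private final CityRetrieverWithOptional retriever;

    public CityPrinter(CityRetrieverWithOptional retriever) {
        this.retriever = retriever;
    }

    public void printCity(String name) {
        // fall back to a default value instead of risking a NullPointerException
        System.out.println(retriever.retrieveCity(name).orElse("unknown"));
    }
}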

The whole project (with the tests checking the solution) can be found on GitHub.

Saturday, 6 August 2016

Thread safe lazy initialization using proxy

Sometimes we need to defer object creation until the object is actually needed.
The reasons vary - for example, creation may be time- or resource-consuming (a database connection etc.) and we want to speed up application startup.

This is a use case for the lazy initialization pattern.

We have the following service interface and an implementation that returns the current time:

public interface Service {
    LocalDateTime getTime();
}

public class DefaultService implements Service {

    private static final Logger logger = LoggerFactory.getLogger(DefaultService.class.getName());

    public DefaultService() {
        logger.debug("Initializing service");
    }

    @Override
    public LocalDateTime getTime() {
        return now(); // static import of java.time.LocalDateTime.now
    }
}

Let's imagine that the initialization is very costly.

Simple lazy initialization could look as follows:
public class LazyInitService implements Service {

    private Service service;

    @Override
    public LocalDateTime getTime() {
        initializeIfNecessary();
        return service.getTime();
    }

    private void initializeIfNecessary() {
        if (service == null) {
            service = new DefaultService();
        }
    }
}
We're creating DefaultService only once, when the getTime method is first invoked.
This solution is fine in a single-threaded environment (we will cover the multi-threaded case later).

The problem arises when the service has not just a few methods but a lot of them (which may happen when you're using a legacy or badly designed library).
Such an implementation would look awful in this situation - you would have to implement every method and try to initialize the object in each of them.

In that case we can create a proxy object that performs the lazy initialization on each method invocation.
To create such a proxy we need to implement the java.lang.reflect.InvocationHandler interface.
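With the plain JDK the idea could be sketched like this (illustrative only; note that it naively forwards every method, including equals, hashCode and toString):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class PlainJdkLazyInitServiceFactory {

    public static Service createService() {
        InvocationHandler handler = new InvocationHandler() {
            private Service service;

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // create the real service on first use, then delegate the call to it
                if (service == null) {
                    service = new DefaultService();
                }
                return method.invoke(service, args);
            }
        };
        return (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(), new Class<?>[]{Service.class}, handler);
    }
}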
There are some pitfalls that we need to overcome (more info here), which is why it's easier to use Guava's AbstractInvocationHandler:
public class LazyInitWithProxyServiceFactory {

    public static Service createService() {
        return Reflection.newProxy(Service.class, new LazyInitInvocationHandler());
    }

    private static class LazyInitInvocationHandler extends AbstractInvocationHandler {

        private Service service;

        @Override
        protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
            return method.invoke(service(), args);
        }

        private Service service() {
            if (service == null) {
                service = new DefaultService();
            }
            return service;
        }
    }
}

The only thing left is thread safety. We can implement it on our own, but the safest way is to use ConcurrentInitializer from commons-lang:
public class LazyInitWithProxyThreadSafeServiceFactory {

    public static Service createService() {
        return Reflection.newProxy(Service.class, new ThreadSafeLazyInitInvocationHandler());
    }

    private static class ThreadSafeLazyInitInvocationHandler extends AbstractInvocationHandler {

        private static final LazyInitializer<Service> initializer = new LazyInitializer<Service>() {
            @Override
            protected Service initialize() {
                return new DefaultService();
            }
        };

        @Override
        protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
            return method.invoke(initializer.get(), args);
        }
    }
}

I chose LazyInitializer, which uses the double-checked locking idiom, but there are other implementations that might be more useful depending on the context.
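For instance, swapping in AtomicSafeInitializer (a lock-free alternative from commons-lang that still guarantees initialize() runs at most once) only changes the handler's field; a sketch (the class name is made up):

private static class AtomicSafeLazyInitInvocationHandler extends AbstractInvocationHandler {

    // org.apache.commons.lang3.concurrent.AtomicSafeInitializer instead of LazyInitializer
    private static final AtomicSafeInitializer<Service> initializer = new AtomicSafeInitializer<Service>() {
        @Override
        protected Service initialize() {
            return new DefaultService();
        }
    };

    @Override
    protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
        return method.invoke(initializer.get(), args);
    }
}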

Let's run all the implementations and see how they work (notice when the info about initialization is printed on the console):
public class App {

    private static final Logger logger = LoggerFactory.getLogger(App.class.getName());

    public static void main(String[] args) {
        invoke(new DefaultService());
        logger.debug("----------------------\n");
        invoke(new LazyInitService());
        logger.debug("----------------------\n");
        invoke(LazyInitWithProxyServiceFactory.createService());
        logger.debug("----------------------\n");
        invoke(LazyInitWithProxyThreadSafeServiceFactory.createService());
    }

    private static void invoke(Service service) {
        logger.debug("Invoking method");
        logger.debug("Current time:" + service.getTime().toString());
    }
}

The whole project can be found on GitHub.

Monday, 6 July 2015

Dynamic thread pool

Sometimes we face a problem that is best solved using a dynamic thread pool.

By 'dynamic' I mean that the pool shrinks and grows automatically, but it does have an upper bound (so it can't grow indefinitely).

The API of the pool would be quite simple:

public Future<T> submitTask(Callable<T> task);
public int threadsCount();
We should be able to submit a task for asynchronous execution and monitor the number of threads in the pool.
Control over the maximum number of threads and their longevity would be another feature.

Let's try to document the functional requirements in the form of unit tests.
This is our test setup:
private static final int MAX_THREADS = 2;
private static final int KEEP_ALIVE_TIME_IN_MILLISECONDS = 300;
private static final long DELAY_TIME_IN_MILLISECONDS = 50;

private DynamicThreadPool<String> dynamicThreadPool
        = new DynamicThreadPool<>(MAX_THREADS, KEEP_ALIVE_TIME_IN_MILLISECONDS);

private static final Callable<String> TASK_WITH_DELAY = () -> {
    simulateDelay(DELAY_TIME_IN_MILLISECONDS);
    return randomUUID().toString();
};

private static void simulateDelay(long delayTimeInMilliseconds) throws InterruptedException {
    MILLISECONDS.sleep(delayTimeInMilliseconds);
}

Initial number of threads should be zero:
@Test
public void shouldHaveZeroThreadsAtTheBeginning() {
    int activeThreads = dynamicThreadPool.threadsCount();

    assertThat(activeThreads).isEqualTo(0);
}
The execution of the tasks should be done concurrently:
@Test
public void shouldComputeConcurrently() throws Exception {
    Future<String> firstFuture = dynamicThreadPool.submitTask(TASK_WITH_DELAY);
    Future<String> secondFuture = dynamicThreadPool.submitTask(TASK_WITH_DELAY);

    String firstResult = firstFuture.get();
    String secondResult = secondFuture.get(DELAY_TIME_IN_MILLISECONDS / 2, MILLISECONDS);

    assertThat(firstResult).isNotNull();
    assertThat(secondResult).isNotNull();
}

Number of threads should grow automatically:
@Test
public void shouldDynamicallyIncreaseThreadsNumber() {
    dynamicThreadPool.submitTask(TASK_WITH_DELAY);
    dynamicThreadPool.submitTask(TASK_WITH_DELAY);

    // the constant is expressed in milliseconds, so the time unit must match
    await().atMost(KEEP_ALIVE_TIME_IN_MILLISECONDS, MILLISECONDS)
            .until(() -> dynamicThreadPool.threadsCount() == MAX_THREADS);
}

The thread pool should shrink when there are no active tasks and the keep-alive time has been exceeded:
@Test
public void shouldHaveZeroThreadsWhenNoActiveTasks() {
    dynamicThreadPool.submitTask(() -> randomUUID().toString());
    dynamicThreadPool.submitTask(() -> randomUUID().toString());

    // the constant is expressed in milliseconds, so the time unit must match
    await().atMost(KEEP_ALIVE_TIME_IN_MILLISECONDS * 2, MILLISECONDS)
            .until(() -> dynamicThreadPool.threadsCount() == 0);
}

If the number of submitted tasks is greater than the maximum number of threads, submission should block:
@Test
public void shouldBlockAddingTasksWhenMaxThreadsCountExceeded() throws Exception {
    dynamicThreadPool.submitTask(TASK_WITH_DELAY);
    dynamicThreadPool.submitTask(TASK_WITH_DELAY);
    long startTime = System.currentTimeMillis();

    Future<String> lastTaskFuture = dynamicThreadPool.submitTask(TASK_WITH_DELAY);

    String result = lastTaskFuture.get();
    long timeElapsed = System.currentTimeMillis() - startTime;
    assertThat(timeElapsed).isGreaterThan(DELAY_TIME_IN_MILLISECONDS);
    assertThat(result).isNotNull();
}

The actual implementation is quite simple; we need to use ThreadPoolExecutor:
public class DynamicThreadPool<T> {

    private final ThreadPoolExecutor executor;

    public DynamicThreadPool(int maxThreads, int keepAliveTimeInMillis) {
        executor = new ThreadPoolExecutor(0, maxThreads, keepAliveTimeInMillis, MILLISECONDS, new SynchronousQueue<>());
        executor.setRejectedExecutionHandler((r, theExecutor) -> {
            try {
                theExecutor.getQueue().put(r);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        });
    }

    public Future<T> submitTask(Callable<T> task) {
        return executor.submit(task);
    }

    public int threadsCount() {
        return executor.getPoolSize();
    }
}

There are two important things here.
First, we are required to create our own RejectedExecutionHandler to support blocking when the maximum number of threads is exceeded.
Secondly, we need to use a blocking queue with zero capacity (I chose SynchronousQueue) - a ThreadPoolExecutor only creates threads beyond the core size (in our case zero) when the queue rejects the offered task, so a zero-capacity queue forces a new thread for each task until the maximum is reached.
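A quick usage sketch (the values are arbitrary):

import java.util.concurrent.Future;

public class DynamicThreadPoolDemo {

    public static void main(String[] args) throws Exception {
        // at most 4 threads; idle threads are removed after 500 ms
        DynamicThreadPool<Integer> pool = new DynamicThreadPool<>(4, 500);

        Future<Integer> future = pool.submitTask(() -> 6 * 7);

        System.out.println("result: " + future.get());          // 42
        System.out.println("threads: " + pool.threadsCount());  // 1 while the pool is still warm
    }
}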
The whole project can be found on GitHub.

Sunday, 16 March 2014

Several approaches to processing events (part 2 - asynchronous processing with blocking queues and thread pool)

In the previous post we solved the problem of computing prime factors for numbers read from a large file.
We took a synchronous approach, which was quite easy to implement but very slow.
This time we'll go the asynchronous way, using blocking queues and a thread pool.

First of all, let's take a look at our Reader class:

public class Reader implements Runnable {

    private final BlockingQueue<String> queue;
    private final CountDownLatch latch;

    public Reader(BlockingQueue<String> queue, CountDownLatch latch) {
        this.queue = queue;
        this.latch = latch;
    }

    void read(String fileName) throws Exception {
        File file = new File(fileName);
        BufferedReader br = new BufferedReader(new FileReader(file));
        String line;
        while ((line = br.readLine()) != null) {
            queue.put(line);
        }
        br.close();
        latch.countDown();
    }

    @Override
    public void run() {
        try {
            read("numbers.txt");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
The reader reads each line and puts it into the blocking queue. Once it's finished, it informs the latch.

The LineProcessor class is more complicated:

public class LineProcessor implements Runnable {

    private final PrimeFactorCounter primeFactorCounter = new PrimeFactorCounter();
    private final BlockingQueue<String> inputQueue;
    private final BlockingQueue<String> successQueue;
    private final BlockingQueue<String> exceptionsQueue;
    private final CountDownLatch inputLatch;
    private final CountDownLatch outputLatch;

    public LineProcessor(BlockingQueue<String> inputQueue, BlockingQueue<String> successQueue,
                         BlockingQueue<String> exceptionsQueue, CountDownLatch inputLatch, CountDownLatch outputLatch) {
        this.inputQueue = inputQueue;
        this.successQueue = successQueue;
        this.exceptionsQueue = exceptionsQueue;
        this.inputLatch = inputLatch;
        this.outputLatch = outputLatch;
    }

    @Override
    public void run() {
        process();
    }

    void process() {
        while (true) {
            String number = null;
            try {
                long latchCount = inputLatch.getCount();
                int queueSize = inputQueue.size();
                if (latchCount == 0 && queueSize == 0) {
                    outputLatch.countDown();
                    return;
                }
                number = inputQueue.poll(1, TimeUnit.SECONDS);
                if (number == null) { // poll timed out, re-check the termination condition
                    continue;
                }
                long value = Long.valueOf(number);
                if (value > 0) {
                    long factor = primeFactorCounter.primeFactors(value);
                    successQueue.put(value + "," + factor + "\n");
                }
            } catch (Exception e) {
                exceptionsQueue.add(number + ", " + e + "\n");
            }
        }
    }
}
Let's take a closer look at the collaborators.

inputQueue is the queue that Reader is writing to.
successQueue and exceptionsQueue are the queues that will be populated based on the line processing result.
inputLatch is the latch modified by the Reader.
outputLatch will be informed when there are no more lines to be processed.

The LineProcessor checks whether the Reader has already finished by inspecting inputLatch and inputQueue.
If it hasn't, it takes a line from the inputQueue and populates the appropriate queue with the result.
If it has, the LineProcessor informs outputLatch and terminates processing.

The Writer class is quite simple:

public class Writer implements Runnable {

    private final String fileName;
    private final BlockingQueue<String> queue;
    private final CountDownLatch latch;

    public Writer(BlockingQueue<String> queue, CountDownLatch latch, String fileName) {
        this.queue = queue;
        this.fileName = fileName;
        this.latch = latch;
    }

    @Override
    public void run() {
        write();
    }

    void write() {
        try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(fileName, true))) {
            while (true) {
                long latchCount = latch.getCount();
                int queueSize = queue.size();
                if (latchCount == 0 && queueSize == 0) {
                    return;
                }
                String line = queue.poll(1, TimeUnit.SECONDS);
                if (line != null) {
                    bufferedWriter.write(line);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
It takes messages from the queue and writes them to a file.
It terminates when the queue is empty and the latch signals that there are no more messages to come.

The only thing left is the Bootstrap class that binds it all together:

public class Bootstrap {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final int THREAD_POOL_SIZE = 4;
    static final String PRIME_FACTOR_FILE = "primeFactorCounter-threadPool.txt";
    static final String EXCEPTIONS_FILE = "exceptions-threadPool.txt";
    static final String SUMMARY_FILE = "summary-threadPool.txt";

    private BlockingQueue<String> inputQueue = new ArrayBlockingQueue<>(10000);
    private BlockingQueue<String> successQueue = new ArrayBlockingQueue<>(10000);
    private BlockingQueue<String> exceptionsQueue = new ArrayBlockingQueue<>(10000);
    private CountDownLatch inputLatch = new CountDownLatch(1);
    private CountDownLatch outputLatch = new CountDownLatch(THREAD_POOL_SIZE);

    public void start() throws Exception {
        long startTime = System.currentTimeMillis();
        Reader reader = new Reader(inputQueue, inputLatch);
        Writer successWriter = new Writer(successQueue, outputLatch, PRIME_FACTOR_FILE);
        Writer exceptionsWriter = new Writer(exceptionsQueue, outputLatch, EXCEPTIONS_FILE);
        Collection<LineProcessor> lineProcessors = new ArrayList<>();
        for (int i = 0; i < THREAD_POOL_SIZE; i++) {
            lineProcessors.add(new LineProcessor(inputQueue, successQueue, exceptionsQueue, inputLatch, outputLatch));
        }
        ExecutorService lineProcessorExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        for (LineProcessor lineProcessor : lineProcessors) {
            lineProcessorExecutor.execute(lineProcessor);
        }
        new Thread(reader).start();
        Thread successWriterThread = new Thread(successWriter);
        successWriterThread.start();
        Thread exceptionsWriterThread = new Thread(exceptionsWriter);
        exceptionsWriterThread.start();
        lineProcessorExecutor.shutdown();
        successWriterThread.join();
        exceptionsWriterThread.join();
        String summary = String.format("thread pool processing, time taken %d ms", System.currentTimeMillis() - startTime);
        logger.debug(summary);
        FileUtils.writeStringToFile(new File(SUMMARY_FILE), summary);
    }
}
The latches and queues are initialized and then passed to the processing objects via constructors.
Line processors are wrapped in a thread pool.
The reader and the two writers are started as threads. We then wait until both writers have finished, and the summary is printed.

We can check the behavior with the following test:

public class ThreadPoolSystemTest {

    @Before
    public void setUp() {
        new File(Bootstrap.PRIME_FACTOR_FILE).delete();
        new File(Bootstrap.EXCEPTIONS_FILE).delete();
        new File(Bootstrap.SUMMARY_FILE).delete();
    }

    @Test
    public void shouldProcessFile() throws Exception {
        // when
        new Bootstrap().start();

        // then
        assertResultFile();
        assertExceptionFile();
        assertSummaryFile();
    }

    private void assertResultFile() throws IOException {
        List<String> lines = FileUtils.readLines(new File(Bootstrap.PRIME_FACTOR_FILE));
        assertThat(lines.size(), is(9900));
        assertThat(lines.contains("5556634922133,5"), is(true));
    }

    private void assertExceptionFile() throws IOException {
        File exceptionsFile = new File(Bootstrap.EXCEPTIONS_FILE);
        assertThat(exceptionsFile.exists(), is(true));
        String exceptionsFileContent = FileUtils.readFileToString(exceptionsFile);
        assertThat(exceptionsFileContent, containsString("badData1"));
        assertThat(exceptionsFileContent, containsString("badData2"));
    }

    private void assertSummaryFile() {
        assertThat(new File(Bootstrap.SUMMARY_FILE).exists(), is(true));
    }
}
On a machine with Core2 Duo 2.53GHz it takes ~43 seconds to process 10 000 numbers.
The whole project can be found on GitHub.

Wednesday, 26 February 2014

Scheduling tasks with Spring (updated)

Spring provides an easy way to schedule tasks.

Let's say that we would like to have information about the current time printed on the console periodically.

The class that prints the time may look like this:

public class TimePrinter {

    private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("HH:mm:ss");
    private static final Logger LOGGER = LoggerFactory.getLogger(TimePrinter.class.getName());

    public void reportCurrentTime() {
        LOGGER.info("The time is now " + DATE_FORMAT.format(new Date()));
    }
}

We need to define a class that will encapsulate the printing task:

public class ScheduledTask {

    static final String THREAD_NAME = ScheduledTask.class.getName() + "!";

    @Autowired private TimePrinter timePrinter;

    @Scheduled(fixedRate = 5000)
    public void reportCurrentTime() {
        Thread.currentThread().setName(THREAD_NAME);
        timePrinter.reportCurrentTime();
    }
}
The @Scheduled annotation is the key here: the reportCurrentTime method is annotated with it, therefore it will be invoked every 5 seconds.
You can also specify a cron expression, or use the fixedRateString parameter if you want to read the rate from a properties file, as shown below.
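For example (the cron expression and the property name are illustrative):

@Component
public class OtherScheduledTasks {

    // a cron expression: fires every day at 8:00
    @Scheduled(cron = "0 0 8 * * *")
    public void morningTask() {
        System.out.println("Good morning");
    }

    // rate read from a properties file, e.g. task.fixedRate=5000
    @Scheduled(fixedRateString = "${task.fixedRate}")
    public void configurableRateTask() {
        System.out.println("Tick");
    }
}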
Please note the setting of the thread name - it will be needed for the test.
Adding production code only for tests is generally not a good practice, but in this case it can also be used for monitoring purposes.

The Spring configuration looks as follows:

@Configuration
@EnableScheduling
@ComponentScan("pl.mjedynak")
public class AppConfig {

    @Bean
    public ScheduledTask scheduledTask() {
        return new ScheduledTask();
    }

    @Bean
    public TimePrinter timePrinter() {
        return new TimePrinter();
    }
}
To run it we need to create an invoker class:

public class App {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(AppConfig.class);
    }
}
Unfortunately, there is no trivial way to test it automatically. We can do it in the following way.
Let's create a test class:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = AppConfig.class)
public class ScheduledTaskIntegrationTest {

    @Test
    public void shouldInvokeTask() {
        Callable<Boolean> threadWithNameExists =
                () -> getAllStackTraces().keySet().stream()
                        .anyMatch(t -> t.getName().equals(ScheduledTask.THREAD_NAME));
        await().until(threadWithNameExists, is(true));
    }
}

When we run the test, the Spring context will start and the task will be invoked by the scheduler.
We check whether a thread with our name exists.
We keep checking for some time to avoid a race condition - it may happen that the verification is invoked before the thread starts.
The whole project along with its dependencies can be found on GitHub.

Monday, 13 January 2014

Several approaches to processing events (part 1 - synchronous processing)

Let's consider the following problem that we have to solve:

We have a file with numbers (each line has one number).
Our goal is to process every number and count its prime factors.
We need to write the result along with the processed number in a separate file.
In case of any exception during processing, we need to write the exception to a file together with the number that was being processed when it occurred.
Apart from that, we also need to write a summary of the time spent on processing.

The class that counts prime factors looks as follows:

public class PrimeFactorCounter {

    public long primeFactors(long number) {
        long n = number > 0 ? number : -number;
        long counter = 0;
        for (int i = 2; i <= n / i; i++) {
            while (n % i == 0) {
                counter++;
                n /= i;
            }
        }
        if (n > 1) {
            counter++;
        }
        return counter;
    }
}
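For example, 12 = 2 * 2 * 3, so the factors are counted with multiplicity:

PrimeFactorCounter counter = new PrimeFactorCounter();
counter.primeFactors(12);  // 3 (2, 2 and 3)
counter.primeFactors(13);  // 1 (13 is prime)
counter.primeFactors(-8);  // 3 (the sign is ignored: 2, 2 and 2)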
The simplest, but not the optimal, way to solve this would be to process each line synchronously. It could look like this:

public class Bootstrap {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    static final String PRIME_FACTOR_FILE = "primeFactorCounter-synchronous.txt";
    static final String EXCEPTIONS_FILE = "exceptions-synchronous.txt";
    static final String SUMMARY_FILE = "summary-synchronous.txt";

    private final PrimeFactorCounter primeFactorCounter = new PrimeFactorCounter();
    private final Reader reader = new Reader();
    private final Writer writer = new Writer();

    public void start() throws IOException {
        long startTime = System.currentTimeMillis();
        Iterable<String> numbers = reader.getNumbers("numbers.txt");
        StringBuilder valuesWithPrimeFactorCounter = new StringBuilder();
        StringBuilder exceptions = new StringBuilder();
        for (String number : numbers) {
            try {
                long value = Long.valueOf(number);
                if (value > 0) {
                    long factor = primeFactorCounter.primeFactors(value);
                    valuesWithPrimeFactorCounter.append(value).append(",").append(factor).append("\n");
                }
            } catch (Exception e) {
                exceptions.append(number).append(", ").append(e).append("\n");
            }
        }
        writeResults(startTime, valuesWithPrimeFactorCounter, exceptions);
    }

    private void writeResults(long startTime, StringBuilder valuesWithPrimeFactorCounter, StringBuilder exceptions) throws IOException {
        writer.write(PRIME_FACTOR_FILE, valuesWithPrimeFactorCounter.toString());
        writer.write(EXCEPTIONS_FILE, exceptions.toString());
        String summary = "synchronous processing, time taken " + (System.currentTimeMillis() - startTime) + " ms";
        logger.debug(summary);
        writer.write(SUMMARY_FILE, summary);
    }
}
We're reading lines from the numbers.txt file and then processing each line in a for loop. At the end, we're writing everything to three files.

The Reader and Writer classes are quite simple:

public class Reader {
    public Iterable<String> getNumbers(String fileName) throws IOException {
        String fileContent = FileUtils.readFileToString(new File(fileName));
        return Splitter.on("\n").omitEmptyStrings().split(fileContent);
    }
}

public class Writer {
    public void write(String fileName, String content) throws IOException {
        FileUtils.writeStringToFile(new File(fileName), content);
    }
}
They're using Guava and Apache Commons IO dependencies, which have the following declarations:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>15.0</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.4</version>
</dependency>
We can check the results in the following test:

public class SynchronousProcessingSystemTest {

    @Before
    public void setUp() {
        new File(Bootstrap.PRIME_FACTOR_FILE).delete();
        new File(Bootstrap.EXCEPTIONS_FILE).delete();
        new File(Bootstrap.SUMMARY_FILE).delete();
    }

    @Test
    public void shouldProcessFile() throws Exception {
        // when
        new Bootstrap().start();

        // then
        assertResultFile();
        assertExceptionsFile();
        assertSummaryFile();
    }

    private void assertResultFile() throws IOException {
        List<String> lines = FileUtils.readLines(new File(Bootstrap.PRIME_FACTOR_FILE));
        assertThat(lines.size(), is(9900));
        assertThat(lines.contains("5556634922133,5"), is(true));
    }

    private void assertExceptionsFile() throws IOException {
        File exceptionsFile = new File(Bootstrap.EXCEPTIONS_FILE);
        assertThat(exceptionsFile.exists(), is(true));
        String exceptionsFileContent = FileUtils.readFileToString(exceptionsFile);
        assertThat(exceptionsFileContent, containsString("badData1"));
        assertThat(exceptionsFileContent, containsString("badData2"));
    }

    private void assertSummaryFile() {
        assertThat(new File(Bootstrap.SUMMARY_FILE).exists(), is(true));
    }
}
On a machine with Core2 Duo 2.53GHz it takes ~73 seconds to process 10 000 numbers.

The whole project can be found on GitHub.

In the next posts we'll take a look at other approaches to solving this problem.