Saturday, 12 February 2022

How to purge Kafka topic programatically using AdminClient

The need to purge kafka topic (without deleting the topic itself) comes pretty frequently (usually in test environments).

There are possibilities to do it by temporarily changing retention.ms on the topic with bash commands (described here) or with UI like Confluent Control Center (if you're using Confluent).

In this post, we'll check how to do it programatically using AdminClient. (Sample application will be written in Kotlin with Spring Boot and Spring Kafka)

Let's start with a look at the dependencies in build.gradle.kts (project skeleton was generated by Spring Initializr):

We need to define a bean with AdminClient:
For the sake of simplicity in this example, we're just setting property of kafka broker address (kafka.bootstrapAddress) taken from application.properties file.


Once we have AdminClient configured, we can use it to purge the topic (notice that AdminClient is injected):
We need to describe the topic we want to purge to obtain number of partitions.
Once we have that, we create a map - for each partition (as key) we define that we want to delete all records (RecordsToDelete.beforeOffset(-1)).


Let's check how we could write integration test for such functionality: There's a lot going on here, let's break it into smaller chunks.
We're starting embedded kafka broker on port 9092 with @EmbeddedKafka(ports = [9092]) annotation. (Port corresponds with property in application.properties)
At the beginning of the test we add a topic with 2 partitions (line 12).
Then we send 1 message to each partition (lines 16,17).
After that, we check in each partition that earliest offset is set to 0 (line 21) and latest offset is set to 1 (line 22). That's it for the testcase setup.
Later we invoke the actual purging and check offsets in both partitions once again (earliest and latest should be set to 1). That proves that all the records were removed.

The whole project can be found on github.

No comments:

Post a Comment