There are ways to do it by temporarily lowering retention.ms on the topic with bash commands (described here) or through a UI like Confluent Control Center (if you're using Confluent).
In this post, we'll look at how to do it programmatically using AdminClient. (The sample application is written in Kotlin with Spring Boot and Spring Kafka.)
Let's start with a look at the dependencies in build.gradle.kts (the project skeleton was generated by Spring Initializr):
implementation("org.springframework.boot:spring-boot-starter") | |
implementation("org.jetbrains.kotlin:kotlin-reflect") | |
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") | |
implementation("org.springframework.kafka:spring-kafka") | |
testImplementation("org.springframework.boot:spring-boot-starter-test") | |
testImplementation("org.springframework.kafka:spring-kafka-test") | |
testImplementation("org.awaitility:awaitility-kotlin:4.1.1") |
We need to define an AdminClient bean (the AdminClient class comes from kafka-clients, which spring-kafka pulls in transitively):
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class AppConfig {

    @Value(value = "\${kafka.bootstrapAddress}")
    lateinit var bootstrapAddress: String

    @Bean
    fun adminClient(): AdminClient {
        return AdminClient.create(mapOf(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapAddress
        ))
    }
}
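The bootstrap address is read from application.properties. A minimal entry could look like the line below; the exact value is an assumption here and just has to point at your broker (the embedded broker in the test later in this post listens on port 9092):

kafka.bootstrapAddress=localhost:9092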
Once we have the AdminClient configured, we can use it to purge the topic (notice that the AdminClient is injected):
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.RecordsToDelete
import org.apache.kafka.common.TopicPartition
import org.springframework.stereotype.Component

@Component
class TopicPurger(private val adminClient: AdminClient) {

    fun purge(topicName: String) {
        val topicDescription = adminClient.describeTopics(listOf(topicName)).all().get().getValue(topicName)
        // map every partition of the topic to "delete all records"
        // (beforeOffset(-1) means: everything up to the current end of the partition)
        val recordsToDelete = topicDescription.partitions().indices.associate { partitionIndex ->
            TopicPartition(topicName, partitionIndex) to RecordsToDelete.beforeOffset(-1)
        }
        adminClient.deleteRecords(recordsToDelete)
    }
}
First we describe the topic to find out how many partitions it has. Then we build a map: each partition (as the key) is mapped to RecordsToDelete.beforeOffset(-1), which tells Kafka to delete all records in that partition (-1 stands for the partition's current end offset).
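Note that deleteRecords is asynchronous and returns a DeleteRecordsResult right away. If you need to block until the brokers confirm the deletion (and see the new low watermark of each partition), you could wait on the returned futures. A minimal sketch, reusing the recordsToDelete map from above:

val result = adminClient.deleteRecords(recordsToDelete)
result.lowWatermarks().forEach { (partition, future) ->
    // get() blocks until the broker has advanced the partition's log start offset
    println("$partition purged up to offset ${future.get().lowWatermark()}")
}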
Let's check how we could write an integration test for this functionality:
@SpringBootTest
@EmbeddedKafka(ports = [9092])
class KafkaExampleApplicationTests(
    @Autowired val embeddedKafkaBroker: EmbeddedKafkaBroker,
    @Autowired val adminClient: AdminClient,
    @Autowired val topicPurger: TopicPurger
) {

    private val topicName = "test.topic"

    @Test
    fun purgesTopic() {
        embeddedKafkaBroker.addTopics(NewTopic(topicName, 2, 1))
        // producerProps comes from KafkaTestUtils (statically imported)
        val producer = DefaultKafkaProducerFactory<Int, String>(producerProps(embeddedKafkaBroker)).createProducer()
        val partition1 = 0
        val partition2 = 1
        producer.send(ProducerRecord(topicName, partition1, 0, "value1"))
        producer.send(ProducerRecord(topicName, partition2, 1, "value2"))
        val topicPartition1 = TopicPartition(topicName, partition1)
        val topicPartition2 = TopicPartition(topicName, partition2)
        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.earliest(), 0)
        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.latest(), 1)

        topicPurger.purge(topicName)

        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.earliest(), 1)
        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.latest(), 1)
    }

    private fun checkOffsets(topicPartition1: TopicPartition,
                             topicPartition2: TopicPartition,
                             offsetSpec: OffsetSpec,
                             expectedOffset: Long) {
        await().until {
            val offsets = adminClient.listOffsets(
                mapOf(
                    topicPartition1 to offsetSpec,
                    topicPartition2 to offsetSpec,
                )).all().get()
            offsets[topicPartition1]?.offset() == expectedOffset
                && offsets[topicPartition2]?.offset() == expectedOffset
        }
    }
}
We're starting an embedded Kafka broker on port 9092 with the @EmbeddedKafka(ports = [9092]) annotation. (The port corresponds with the kafka.bootstrapAddress property in application.properties.)
At the beginning of the test we add a topic with 2 partitions (the embeddedKafkaBroker.addTopics call).
Then we send one message to each partition.
After that, we check that in each partition the earliest offset is 0 and the latest offset is 1 (the two checkOffsets calls before the purge). That's it for the test case setup.
Then we invoke the actual purging and check the offsets in both partitions once again: this time both earliest and latest should equal 1, which proves that all the records were removed.
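As an extra sanity check (not part of the test above), we could also verify the purge from a consumer's perspective: polling the topic from the beginning should return no records. A sketch, assuming the broker address localhost:9092 and the two partitions from the test:

import java.time.Duration
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

fun assertTopicEmpty(topicName: String) {
    val consumer = KafkaConsumer<Int, String>(mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.GROUP_ID_CONFIG to "purge-check"
    ))
    consumer.use {
        // read both partitions from their (new) earliest offsets
        it.assign(listOf(TopicPartition(topicName, 0), TopicPartition(topicName, 1)))
        it.seekToBeginning(it.assignment())
        check(it.poll(Duration.ofSeconds(5)).isEmpty) { "expected no records after purge" }
    }
}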
The whole project can be found on GitHub.