Kafka Module
Testcontainers can be used to automatically instantiate and manage Apache Kafka containers.
Currently, two different Kafka images are supported:
org.testcontainers.kafka.ConfluentKafkaContainer
supports confluentinc/cp-kafkaorg.testcontainers.kafka.KafkaContainer
supports apache/kafka and apache/kafka-native
Note
org.testcontainers.containers.KafkaContainer
is deprecated.
Please use org.testcontainers.kafka.ConfluentKafkaContainer
or org.testcontainers.kafka.KafkaContainer
instead, depending on the used image.
Benefits
- Running a single node Kafka installation with just one line of code
- No need to manage external Zookeeper installation, required by Kafka.
Example
Using org.testcontainers.kafka.KafkaContainer
Create a KafkaContainer
to use it in your tests:
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
Now your tests or any other process running on your machine can get access to running Kafka broker by using the following bootstrap server location:
kafka.getBootstrapServers()
Using org.testcontainers.kafka.ConfluentKafkaContainer
Note
Compatible with confluentinc/cp-kafka
images version 7.4.0
and later.
Create a ConfluentKafkaContainer
to use it in your tests:
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
Options
Using Kraft mode
Note
Only available for org.testcontainers.containers.KafkaContainer
KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft()
See the versions interoperability matrix for more details.
Register listeners
There are scenarios where additional listeners are needed because the consumer/producer can be in another container in the same network or a different process where the port to connect differs from the default exposed port. E.g Toxiproxy.
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
.withListener("kafka:19092")
.withNetwork(network);
Container defined in the same network:
GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.9.0")
.withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
})
.withCopyToContainer(Transferable.of("Message produced by kcat"), "/data/msgs.txt")
.withNetwork(network)
.withCommand("-c", "tail -f /dev/null")
Client using the new registered listener:
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
String stdout = kcat
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
.getStdout();
Adding this module to your project dependencies
Add the following dependency to your pom.xml
/build.gradle
file:
testImplementation "org.testcontainers:kafka:1.21.3"
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.21.3</version>
<scope>test</scope>
</dependency>