Kafka Module
Testcontainers can be used to automatically instantiate and manage Apache Kafka containers.
Currently, two different Kafka images are supported:
org.testcontainers.kafka.ConfluentKafkaContainer
supports confluentinc/cp-kafkaorg.testcontainers.kafka.KafkaContainer
supports apache/kafka and apache/kafka-native
Note
org.testcontainers.containers.KafkaContainer
is deprecated.
Please use org.testcontainers.kafka.ConfluentKafkaContainer
or org.testcontainers.kafka.KafkaContainer
instead, depending on the used image.
Benefits
- Running a single node Kafka installation with just one line of code
- No need to manage external Zookeeper installation, required by Kafka. But see below
Example
Using org.testcontainers.containers.KafkaContainer
Create a KafkaContainer
to use it in your tests:
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
The correspondence between Confluent Platform versions and Kafka versions can be seen in Confluent documentation
Now your tests or any other process running on your machine can get access to running Kafka broker by using the following bootstrap server location:
kafka.getBootstrapServers()
Using org.testcontainers.kafka.ConfluentKafkaContainer
Note
Compatible with confluentinc/cp-kafka
images version 7.4.0
and later.
Create a ConfluentKafkaContainer
to use it in your tests:
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
Using org.testcontainers.kafka.KafkaContainer
Create a KafkaContainer
to use it in your tests:
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
Options
Using external Zookeeper
Note
Only available for org.testcontainers.containers.KafkaContainer
If for some reason you want to use an externally running Zookeeper, then just pass its location during construction:
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
.withNetwork(network)
.withExternalZookeeper("zookeeper:2181");
Using Kraft mode
Note
Only available for org.testcontainers.containers.KafkaContainer
KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft()
See the versions interoperability matrix for more details.
Register listeners
There are scenarios where additional listeners are needed because the consumer/producer can be in another container in the same network or a different process where the port to connect differs from the default exposed port. E.g Toxiproxy.
KafkaContainer kafka = new KafkaContainer(KAFKA_KRAFT_TEST_IMAGE)
.withListener(() -> "kafka:19092")
.withNetwork(network);
Container defined in the same network:
GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.4.1")
.withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
})
.withCopyToContainer(Transferable.of("Message produced by kcat"), "/data/msgs.txt")
.withNetwork(network)
.withCommand("-c", "tail -f /dev/null")
Client using the new registered listener:
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
String stdout = kcat
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
.getStdout();
Adding this module to your project dependencies
Add the following dependency to your pom.xml
/build.gradle
file:
testImplementation "org.testcontainers:kafka:1.20.2"
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.20.2</version>
<scope>test</scope>
</dependency>