Skip to content

Commit

Permalink
Add redpandadata/redpanda as a compatible image (#7898)
Browse files Browse the repository at this point in the history
Test has been added for all compatible images.
  • Loading branch information
eddumelendez committed Nov 30, 2023
1 parent 7a1bcd2 commit 892b06b
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 80 deletions.
Expand Up @@ -30,7 +30,7 @@
/**
* Testcontainers implementation for Redpanda.
* <p>
* Supported images: {@code docker.redpanda.com/redpandadata/redpanda}, {@code docker.redpanda.com/vectorized/redpanda}
* Supported images: {@code redpandadata/redpanda}, {@code docker.redpanda.com/redpandadata/redpanda}
* <p>
* Exposed ports:
* <ul>
Expand All @@ -43,11 +43,15 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {

private static final String REDPANDA_FULL_IMAGE_NAME = "docker.redpanda.com/redpandadata/redpanda";

private static final String IMAGE_NAME = "redpandadata/redpanda";

@Deprecated
private static final String REDPANDA_OLD_FULL_IMAGE_NAME = "docker.redpanda.com/vectorized/redpanda";

private static final DockerImageName REDPANDA_IMAGE = DockerImageName.parse(REDPANDA_FULL_IMAGE_NAME);

private static final DockerImageName IMAGE = DockerImageName.parse(IMAGE_NAME);

@Deprecated
private static final DockerImageName REDPANDA_OLD_IMAGE = DockerImageName.parse(REDPANDA_OLD_FULL_IMAGE_NAME);

Expand Down Expand Up @@ -75,10 +79,14 @@ public RedpandaContainer(String image) {

public RedpandaContainer(DockerImageName imageName) {
super(imageName);
imageName.assertCompatibleWith(REDPANDA_OLD_IMAGE, REDPANDA_IMAGE);
imageName.assertCompatibleWith(REDPANDA_OLD_IMAGE, REDPANDA_IMAGE, IMAGE);

boolean isLessThanBaseVersion = new ComparableVersion(imageName.getVersionPart()).isLessThan("v22.2.1");
if (REDPANDA_FULL_IMAGE_NAME.equals(imageName.getUnversionedPart()) && isLessThanBaseVersion) {
boolean isPublicCompatibleImage =
REDPANDA_FULL_IMAGE_NAME.equals(imageName.getUnversionedPart()) ||
IMAGE_NAME.equals(imageName.getUnversionedPart()) ||
REDPANDA_OLD_FULL_IMAGE_NAME.equals(imageName.getUnversionedPart());
if (isPublicCompatibleImage && isLessThanBaseVersion) {
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
}

Expand Down
@@ -0,0 +1,92 @@
package org.testcontainers.redpanda;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.rnorth.ducttape.unreliables.Unreliables;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

public class AbstractRedpanda {

protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}

protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
try (
AdminClient adminClient = AdminClient.create(
ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
);
KafkaProducer<String, String> producer = new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString()
),
new StringSerializer(),
new StringSerializer()
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG,
"tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"
),
new StringDeserializer(),
new StringDeserializer()
);
) {
String topicName = "messages-" + UUID.randomUUID();

Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

consumer.subscribe(Collections.singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (records.isEmpty()) {
return false;
}

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

return true;
}
);

consumer.unsubscribe();
}
}
}
@@ -0,0 +1,28 @@
package org.testcontainers.redpanda;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class CompatibleImageTest extends AbstractRedpanda {

private final String image;

public CompatibleImageTest(String image) {
this.image = image;
}

@Parameterized.Parameters(name = "{0}")
public static String[] image() {
return new String[] { "docker.redpanda.com/vectorized/redpanda:v22.2.1", "redpandadata/redpanda:v22.2.1" };
}

@Test
public void shouldProduceAndConsumeMessage() throws Exception {
try (RedpandaContainer container = new RedpandaContainer(this.image)) {
container.start();
testKafkaFunctionality(container.getBootstrapServers());
}
}
}
Expand Up @@ -8,27 +8,16 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -39,9 +28,8 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.tuple;

public class RedpandaContainerTest {
public class RedpandaContainerTest extends AbstractRedpanda {

private static final String REDPANDA_IMAGE = "docker.redpanda.com/redpandadata/redpanda:v22.2.1";

Expand Down Expand Up @@ -78,6 +66,20 @@ public void testNotCompatibleVersion() {
.hasMessageContaining("Redpanda version must be >= v22.2.1");
}

@Test
public void vectorizedRedpandaImageVersion2221ShouldNotBeCompatible() {
assertThatThrownBy(() -> new RedpandaContainer("docker.redpanda.com/vectorized/redpanda:v21.11.19"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Redpanda version must be >= v22.2.1");
}

@Test
public void redpandadataRedpandaImageVersion2221ShouldNotBeCompatible() {
assertThatThrownBy(() -> new RedpandaContainer("redpandadata/redpanda:v21.11.19"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Redpanda version must be >= v22.2.1");
}

@Test
public void testSchemaRegistry() {
try (RedpandaContainer container = new RedpandaContainer(REDPANDA_DOCKER_IMAGE)) {
Expand Down Expand Up @@ -359,70 +361,6 @@ public void testRestProxy() {
}
}

private void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}

private void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
try (
AdminClient adminClient = AdminClient.create(
ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
);
KafkaProducer<String, String> producer = new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString()
),
new StringSerializer(),
new StringSerializer()
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG,
"tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"
),
new StringDeserializer(),
new StringDeserializer()
);
) {
String topicName = "messages-" + UUID.randomUUID();

Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

consumer.subscribe(Collections.singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (records.isEmpty()) {
return false;
}

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

return true;
}
);

consumer.unsubscribe();
}
}

private AdminClient getAdminClient(RedpandaContainer redpanda) {
String bootstrapServer = String.format("%s:%s", redpanda.getHost(), redpanda.getMappedPort(9092));
// createAdminClient {
Expand Down

0 comments on commit 892b06b

Please sign in to comment.