Skip to content

Commit 0bb6925

Browse files
Expose Redpanda schema registry (testcontainers#5994)
The schema registry port `8081` was added to the list of exposed ports and a supporting method `getSchemaRegistryAddress` was added to easily use for configuring `schema.registry.url` serializer configuration. A minor change on the internal advertised address was made as the `kafka` alias was not available within the container and the panda proxy was getting lost with the following message: ``` INFO 2022-10-17 19:08:37,164 [shard 0] kafka/client - broker.cc:41 - connected to broker:-1 - 0.0.0.0:29092 WARN 2022-10-17 19:08:37,176 [shard 0] kafka/client - broker.cc:52 - std::system_error: kafka: Not found ERROR 2022-10-17 19:08:37,177 [shard 0] pandaproxy - service.cc:137 - Schema registry failed to initialize internal topic: kafka::client::broker_error ({ node: -1 }, { error_code: broker_not_available [8] }) ```
1 parent b7925be commit 0bb6925

File tree

4 files changed

+48
-2
lines changed

4 files changed

+48
-2
lines changed

docs/modules/redpanda.md

+6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ Now your tests or any other process running on your machine can get access to ru
1919
[Bootstrap Servers](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getBootstrapServers
2020
<!--/codeinclude-->
2121

22+
Redpanda also provides a schema registry implementation. Like the Redpanda broker, you can access by using the following schema registry location:
23+
24+
<!--codeinclude-->
25+
[Schema Registry](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getSchemaRegistryAddress
26+
<!--/codeinclude-->
27+
2228
## Adding this module to your project dependencies
2329

2430
Add the following dependency to your `pom.xml`/`build.gradle` file:

modules/redpanda/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ dependencies {
55

66
testImplementation 'org.apache.kafka:kafka-clients:3.3.0'
77
testImplementation 'org.assertj:assertj-core:3.23.1'
8+
testImplementation 'io.rest-assured:rest-assured:5.2.0'
89
}

modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {
1818

1919
private static final int REDPANDA_PORT = 9092;
2020

21+
private static final int SCHEMA_REGISTRY_PORT = 8081;
22+
2123
private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
2224

2325
public RedpandaContainer(String image) {
@@ -33,7 +35,7 @@ public RedpandaContainer(DockerImageName imageName) {
3335
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
3436
}
3537

36-
withExposedPorts(REDPANDA_PORT);
38+
withExposedPorts(REDPANDA_PORT, SCHEMA_REGISTRY_PORT);
3739
withCreateContainerCmdModifier(cmd -> {
3840
cmd.withEntrypoint("sh");
3941
});
@@ -49,12 +51,17 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
4951

5052
command += "/usr/bin/rpk redpanda start --mode dev-container ";
5153
command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
52-
command += "--advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);
54+
command +=
55+
"--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);
5356

5457
copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
5558
}
5659

5760
public String getBootstrapServers() {
5861
return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(REDPANDA_PORT));
5962
}
63+
64+
public String getSchemaRegistryAddress() {
65+
return String.format("http://%s:%s", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT));
66+
}
6067
}

modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java

+32
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.testcontainers.redpanda;
22

33
import com.google.common.collect.ImmutableMap;
4+
import io.restassured.RestAssured;
5+
import io.restassured.response.Response;
46
import org.apache.kafka.clients.admin.AdminClient;
57
import org.apache.kafka.clients.admin.AdminClientConfig;
68
import org.apache.kafka.clients.admin.NewTopic;
@@ -64,6 +66,36 @@ public void testNotCompatibleVersion() {
6466
.hasMessageContaining("Redpanda version must be >= v22.2.1");
6567
}
6668

69+
@Test
70+
public void testSchemaRegistry() {
71+
try (RedpandaContainer container = new RedpandaContainer(REDPANDA_DOCKER_IMAGE)) {
72+
container.start();
73+
74+
String subjectsEndpoint = String.format(
75+
"%s/subjects",
76+
// getSchemaRegistryAddress {
77+
container.getSchemaRegistryAddress()
78+
// }
79+
);
80+
81+
String subjectName = String.format("test-%s-value", UUID.randomUUID());
82+
83+
Response createSubject = RestAssured
84+
.given()
85+
.contentType("application/vnd.schemaregistry.v1+json")
86+
.pathParam("subject", subjectName)
87+
.body("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}")
88+
.when()
89+
.post(subjectsEndpoint + "/{subject}/versions")
90+
.thenReturn();
91+
assertThat(createSubject.getStatusCode()).isEqualTo(200);
92+
93+
Response allSubjects = RestAssured.given().when().get(subjectsEndpoint).thenReturn();
94+
assertThat(allSubjects.getStatusCode()).isEqualTo(200);
95+
assertThat(allSubjects.jsonPath().getList("$")).contains(subjectName);
96+
}
97+
}
98+
6799
private void testKafkaFunctionality(String bootstrapServers) throws Exception {
68100
testKafkaFunctionality(bootstrapServers, 1, 1);
69101
}

0 commit comments

Comments
 (0)