Skip to content

Commit

Permalink
[strimzi#434] feat: added create topic endpoint - path changed
Browse files Browse the repository at this point in the history
Signed-off-by: ilkerkocatepe <[email protected]>
  • Loading branch information
ilkerkocatepe committed Sep 28, 2024
1 parent dbbe858 commit 8df435a
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 9 deletions.
6 changes: 3 additions & 3 deletions src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public CompletionStage<Set<String>> listTopics() {
* @param topicName topic name to create
* @return a CompletionStage Void
*/
public CompletionStage<Void> createTopic(String topicName) {
public CompletionStage<Void> createTopic(String topicName, int partitions, short replicationFactor) {
LOGGER.trace("Create topic thread {}", Thread.currentThread());
LOGGER.info("Create topic {}", topicName);
LOGGER.info("Create topic {}, partitions {}, replicationFactor {}", topicName, partitions, replicationFactor);
CompletableFuture<Void> promise = new CompletableFuture<>();
this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, 2, (short) 1)))
this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, partitions, replicationFactor)))
.all()
.whenComplete((topic, exception) -> {
LOGGER.trace("Create topic callback thread {}", Thread.currentThread());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,10 @@ public void doGetTopic(RoutingContext routingContext) {
*/
public void doCreateTopic(RoutingContext routingContext) {
String topicName = routingContext.pathParam("topicname");
int partitions = Integer.parseInt(routingContext.queryParams().get("partitions"));
short replicationFactor = Short.parseShort(routingContext.queryParams().get("replication_factor"));

this.kafkaBridgeAdmin.createTopic(topicName)
this.kafkaBridgeAdmin.createTopic(topicName, partitions, replicationFactor)
.whenComplete(((topic, exception) -> {
LOGGER.trace("Create topic handler thread {}", Thread.currentThread());
if (exception == null) {
Expand Down
20 changes: 19 additions & 1 deletion src/main/resources/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@
}
]
},
"/create-topic/{topicname}": {
"/admin/topics/{topicname}": {
"post": {
"tags": [
"Topics"
Expand Down Expand Up @@ -800,6 +800,24 @@
"schema": {
"type": "string"
}
},
{
"name": "partitions",
"in": "query",
"description": "Number of partitions for the topic.",
"required": true,
"schema": {
"type": "integer"
}
},
{
"name": "replication_factor",
"in": "query",
"description": "Replication factor for the topic.",
"required": true,
"schema": {
"type": "integer"
}
}
]
},
Expand Down
20 changes: 19 additions & 1 deletion src/main/resources/openapiv2.json
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@
}
]
},
"/create-topic/{topicname}": {
"/admin/topics/{topicname}": {
"post": {
"tags": [
"Topics"
Expand Down Expand Up @@ -725,6 +725,24 @@
"schema": {
"type": "string"
}
},
{
"name": "partitions",
"in": "query",
"description": "Number of partitions for the topic.",
"required": true,
"schema": {
"type": "integer"
}
},
{
"name": "replication_factor",
"in": "query",
"description": "Replication factor for the topic.",
"required": true,
"schema": {
"type": "integer"
}
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ void setupTopic(VertxTestContext context, String topic, int partitions, int coun
@Test
void createTopicTest(VertxTestContext context) {
baseService()
.postRequest("/create-topic/" + topic)
.postRequest("/admin/topics/" + topic)
.addQueryParam("partitions", "1")
.addQueryParam("replication_factor", "1")
.as(BodyCodec.jsonArray())
.send(ar -> {
context.verify(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ void openapiTest(VertxTestContext context) {
assertThat(paths.containsKey("/topics/{topicname}/partitions/{partitionid}/offsets"), is(true));
assertThat(paths.containsKey("/topics/{topicname}/partitions"), is(true));
assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/topics/{topicname}/partitions/{partitionid}").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.SEND_TO_PARTITION.toString()));
assertThat(paths.containsKey("/create-topic/{topicname}"), is(true));
assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/create-topic/{topicname}").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.CREATE_TOPIC.toString()));
assertThat(paths.containsKey("/admin/topics/{topicname}"), is(true));
assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/admin/topics/{topicname}").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.CREATE_TOPIC.toString()));
assertThat(paths.containsKey("/healthy"), is(true));
assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/healthy").getJsonObject("get").getString("operationId"), is(HttpOpenApiOperations.HEALTHY.toString()));
assertThat(paths.containsKey("/ready"), is(true));
Expand Down

0 comments on commit 8df435a

Please sign in to comment.