Skip to content

Commit a159ef6

Browse files
Dugong42Haarolean
andauthored
KC: Stop Connectors and Reset Connector Offsets (#573)
Co-authored-by: NOZAIS Julien <[email protected]> Co-authored-by: Roman Zabaluev <[email protected]>
1 parent aaac4d7 commit a159ef6

File tree

19 files changed

+517
-39
lines changed

19 files changed

+517
-39
lines changed

.mvn/jvm.config

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-Djava.net.useSystemProxies=true

api/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@
492492
</goals>
493493
<configuration>
494494
<arguments>build</arguments>
495+
<pnpmInheritsProxyConfigFromMaven>false</pnpmInheritsProxyConfigFromMaven>
495496
</configuration>
496497
</execution>
497498
</executions>

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

+22
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,16 @@ public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorNam
238238
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
239239
}
240240

241+
@Override
242+
public Mono<Void> stopConnector(String connectorName) throws WebClientResponseException {
243+
return withRetryOnConflictOrRebalance(super.stopConnector(connectorName));
244+
}
245+
246+
@Override
247+
public Mono<ResponseEntity<Void>> stopConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
248+
return withRetryOnConflictOrRebalance(super.stopConnectorWithHttpInfo(connectorName));
249+
}
250+
241251
@Override
242252
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
243253
throws WebClientResponseException {
@@ -261,6 +271,18 @@ public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connec
261271
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
262272
}
263273

274+
@Override
275+
public Mono<Void> resetConnectorOffsets(String connectorName)
276+
throws WebClientResponseException {
277+
return withRetryOnConflictOrRebalance(super.resetConnectorOffsets(connectorName));
278+
}
279+
280+
@Override
281+
public Mono<ResponseEntity<Void>> resetConnectorOffsetsWithHttpInfo(String connectorName)
282+
throws WebClientResponseException {
283+
return withRetryOnConflictOrRebalance(super.resetConnectorOffsetsWithHttpInfo(connectorName));
284+
}
285+
264286
@Override
265287
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
266288
return withRetryOnRebalance(super.resumeConnector(connectorName));

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

+21
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
44
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
55
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
6+
import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
7+
import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;
68

79
import io.kafbat.ui.api.KafkaConnectApi;
810
import io.kafbat.ui.model.ConnectDTO;
@@ -285,4 +287,23 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
285287
default -> defaultComparator;
286288
};
287289
}
290+
291+
@Override
292+
public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, String connectName,
293+
String connectorName,
294+
ServerWebExchange exchange) {
295+
296+
var context = AccessContext.builder()
297+
.cluster(clusterName)
298+
.connectActions(connectName, VIEW, RESET_OFFSETS)
299+
.operationName("resetConnectorOffsets")
300+
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
301+
.build();
302+
303+
return validateAccess(context).then(
304+
kafkaConnectService
305+
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
306+
.map(ResponseEntity::ok))
307+
.doOnEach(sig -> audit(context, sig));
308+
}
288309
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.kafbat.ui.exception;
2+
3+
public class ConnectorOffsetsResetException extends CustomBaseException {
4+
5+
public ConnectorOffsetsResetException(String message) {
6+
super(message);
7+
}
8+
9+
@Override
10+
public ErrorCode getErrorCode() {
11+
return ErrorCode.CONNECTOR_OFFSETS_RESET_ERROR;
12+
}
13+
}

api/src/main/java/io/kafbat/ui/exception/ErrorCode.java

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public enum ErrorCode {
3232
TOPIC_ANALYSIS_ERROR(4018, HttpStatus.BAD_REQUEST),
3333
FILE_UPLOAD_EXCEPTION(4019, HttpStatus.INTERNAL_SERVER_ERROR),
3434
CEL_ERROR(4020, HttpStatus.BAD_REQUEST),
35+
CONNECTOR_OFFSETS_RESET_ERROR(4021, HttpStatus.BAD_REQUEST),
3536
;
3637

3738
static {

api/src/main/java/io/kafbat/ui/model/rbac/permission/ConnectAction.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ public enum ConnectAction implements PermissibleAction {
1010
EDIT(VIEW),
1111
CREATE(VIEW),
1212
RESTART(VIEW),
13-
DELETE(VIEW)
13+
DELETE(VIEW),
14+
RESET_OFFSETS(VIEW)
1415

1516
;
1617

@@ -20,7 +21,7 @@ public enum ConnectAction implements PermissibleAction {
2021
this.dependantActions = dependantActions;
2122
}
2223

23-
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART);
24+
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART, RESET_OFFSETS);
2425

2526
@Nullable
2627
public static ConnectAction fromString(String name) {

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

+18
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
77
import io.kafbat.ui.connect.model.ConnectorTopics;
88
import io.kafbat.ui.connect.model.TaskStatus;
9+
import io.kafbat.ui.exception.ConnectorOffsetsResetException;
910
import io.kafbat.ui.exception.NotFoundException;
1011
import io.kafbat.ui.exception.ValidationException;
1112
import io.kafbat.ui.mapper.ClusterMapper;
@@ -213,6 +214,7 @@ public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName,
213214
case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName,
214215
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
215216
case PAUSE -> client.pauseConnector(connectorName);
217+
case STOP -> client.stopConnector(connectorName);
216218
case RESUME -> client.resumeConnector(connectorName);
217219
});
218220
}
@@ -272,4 +274,20 @@ private ReactiveFailover<KafkaConnectClientApi> api(KafkaCluster cluster, String
272274
}
273275
return client;
274276
}
277+
278+
public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName,
279+
String connectorName) {
280+
return api(cluster, connectName)
281+
.mono(client -> client.resetConnectorOffsets(connectorName))
282+
.onErrorResume(WebClientResponseException.NotFound.class,
283+
e -> {
284+
throw new NotFoundException("Connector %s not found in %s".formatted(connectorName, connectName));
285+
})
286+
.onErrorResume(WebClientResponseException.BadRequest.class,
287+
e -> {
288+
throw new ConnectorOffsetsResetException(
289+
"Failed to reset offsets of connector %s of %s. Make sure it is STOPPED first."
290+
.formatted(connectorName, connectName));
291+
});
292+
}
275293
}

api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java

+57-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.junit.jupiter.api.Test;
2525
import org.springframework.beans.factory.annotation.Autowired;
2626
import org.springframework.core.ParameterizedTypeReference;
27+
import org.springframework.http.HttpStatus;
28+
import org.springframework.test.web.reactive.server.ExchangeResult;
2729
import org.springframework.test.web.reactive.server.WebTestClient;
2830

2931
@Slf4j
@@ -45,6 +47,7 @@ public class KafkaConnectServiceTests extends AbstractIntegrationTest {
4547

4648
@BeforeEach
4749
public void setUp() {
50+
4851
webTestClient.post()
4952
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
5053
.bodyValue(new NewConnectorDTO()
@@ -54,11 +57,10 @@ public void setUp() {
5457
"tasks.max", "1",
5558
"topics", "output-topic",
5659
"file", "/tmp/test",
57-
"test.password", "test-credentials"
58-
))
59-
)
60+
"test.password", "test-credentials")))
6061
.exchange()
6162
.expectStatus().isOk();
63+
6264
}
6365

6466
@AfterEach
@@ -418,4 +420,56 @@ public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() {
418420
.expectStatus()
419421
.isBadRequest();
420422
}
423+
424+
@Test
425+
public void shouldResetConnectorWhenInStoppedState() {
426+
427+
webTestClient.get()
428+
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
429+
LOCAL, connectName, connectorName)
430+
.exchange()
431+
.expectStatus().isOk()
432+
.expectBody(ConnectorDTO.class)
433+
.value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.RUNNING));
434+
435+
webTestClient.post()
436+
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/action/STOP",
437+
LOCAL, connectName, connectorName)
438+
.exchange()
439+
.expectStatus().isOk();
440+
441+
webTestClient.get()
442+
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
443+
LOCAL, connectName, connectorName)
444+
.exchange()
445+
.expectStatus().isOk()
446+
.expectBody(ConnectorDTO.class)
447+
.value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.STOPPED));
448+
449+
webTestClient.delete()
450+
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets",
451+
LOCAL, connectName, connectorName)
452+
.exchange()
453+
.expectStatus().isOk();
454+
455+
}
456+
457+
@Test
458+
public void shouldReturn400WhenResettingConnectorInRunningState() {
459+
460+
webTestClient.get()
461+
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
462+
LOCAL, connectName, connectorName)
463+
.exchange()
464+
.expectStatus().isOk()
465+
.expectBody(ConnectorDTO.class)
466+
.value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.RUNNING));
467+
468+
webTestClient.delete()
469+
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets", LOCAL,
470+
connectName, connectorName)
471+
.exchange()
472+
.expectStatus().isBadRequest();
473+
474+
}
421475
}

contract/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@
201201
</goals>
202202
<configuration>
203203
<arguments>gen:sources</arguments>
204+
<pnpmInheritsProxyConfigFromMaven>false</pnpmInheritsProxyConfigFromMaven>
204205
</configuration>
205206
</execution>
206207
</executions>

contract/src/main/resources/swagger/kafbat-ui-api.yaml

+37-2
Original file line numberDiff line numberDiff line change
@@ -1565,7 +1565,7 @@ paths:
15651565
post:
15661566
tags:
15671567
- Kafka Connect
1568-
summary: update connector state (restart, pause or resume)
1568+
summary: update connector state (restart, pause, stop or resume)
15691569
operationId: updateConnectorState
15701570
parameters:
15711571
- name: clusterName
@@ -1722,6 +1722,31 @@ paths:
17221722
200:
17231723
description: OK
17241724

1725+
/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets:
1726+
delete:
1727+
tags:
1728+
- Kafka Connect
1729+
summary: reset the offsets for the specified connector
1730+
operationId: resetConnectorOffsets
1731+
parameters:
1732+
- name: clusterName
1733+
in: path
1734+
required: true
1735+
schema:
1736+
type: string
1737+
- name: connectName
1738+
in: path
1739+
required: true
1740+
schema:
1741+
type: string
1742+
- name: connectorName
1743+
in: path
1744+
required: true
1745+
schema:
1746+
type: string
1747+
responses:
1748+
200:
1749+
description: OK
17251750

17261751
/api/clusters/{clusterName}/ksql/v2:
17271752
post:
@@ -3567,6 +3592,7 @@ components:
35673592
- RESTART_FAILED_TASKS
35683593
- PAUSE
35693594
- RESUME
3595+
- STOP
35703596

35713597
TaskAction:
35723598
type: string
@@ -3953,7 +3979,16 @@ components:
39533979

39543980
KafkaAcl:
39553981
type: object
3956-
required: [resourceType, resourceName, namePatternType, principal, host, operation, permission]
3982+
required:
3983+
[
3984+
resourceType,
3985+
resourceName,
3986+
namePatternType,
3987+
principal,
3988+
host,
3989+
operation,
3990+
permission,
3991+
]
39573992
properties:
39583993
resourceType:
39593994
$ref: '#/components/schemas/KafkaAclResourceType'

0 commit comments

Comments
 (0)