Skip to content

Commit 1826de2

Browse files
authored
feat: add provisioning phase for consumer on data plane (#4875)
feat: add preparation phase for consumer on data plane
1 parent 6b4a73f commit 1826de2

File tree

35 files changed

+1014
-42
lines changed

35 files changed

+1014
-42
lines changed

core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/DataFlowManagerImpl.java

+11
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,17 @@ public void register(int priority, DataFlowController controller) {
5858
controllers.add(new PrioritizedDataFlowController(priority, controller));
5959
}
6060

61+
@Override
62+
public @NotNull StatusResult<DataFlowResponse> provision(TransferProcess transferProcess, Policy policy) {
63+
try {
64+
return chooseControllerAndApply(transferProcess, controller -> controller.provision(transferProcess, policy));
65+
} catch (Exception e) {
66+
var message = runtimeException(transferProcess.getId(), e.getMessage());
67+
monitor.severe(message, e);
68+
return StatusResult.failure(FATAL_ERROR, message);
69+
}
70+
}
71+
6172
@WithSpan
6273
@Override
6374
public @NotNull StatusResult<DataFlowResponse> start(TransferProcess transferProcess, Policy policy) {

core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java

+9
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,15 @@ private boolean processInitial(TransferProcess process) {
204204

205205
ResourceManifest manifest;
206206
if (process.getType() == CONSUMER) {
207+
var provisioning = dataFlowManager.provision(process, policy);
208+
if (provisioning.succeeded()) {
209+
var response = provisioning.getContent();
210+
process.setDataPlaneId(response.getDataPlaneId());
211+
process.transitionRequesting();
212+
update(process);
213+
return true;
214+
}
215+
207216
var manifestResult = manifestGenerator.generateConsumerResourceManifest(process, policy);
208217
if (manifestResult.failed()) {
209218
transitionToTerminated(process, format("Resource manifest for process %s cannot be modified to fulfil policy. %s", process.getId(), manifestResult.getFailureMessages()));

core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/flow/DataFlowManagerImplTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,27 @@ private DataFlowController createDataFlowController() {
118118
}
119119
}
120120

121+
@Nested
122+
class Provision {
123+
@Test
124+
void shouldChooseControllerAndProvision() {
125+
var controller = mock(DataFlowController.class);
126+
var dataDestination = DataAddress.Builder.newInstance().type("test-dest-type").build();
127+
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
128+
var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination).contentDataAddress(dataAddress).build();
129+
var policy = Policy.Builder.newInstance().build();
130+
131+
when(controller.canHandle(any())).thenReturn(true);
132+
when(controller.provision(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
133+
manager.register(controller);
134+
135+
var result = manager.provision(transferProcess, policy);
136+
137+
assertThat(result).isSucceeded();
138+
verify(controller).provision(transferProcess, policy);
139+
}
140+
}
141+
121142
@Nested
122143
class Suspend {
123144
@Test

core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplIntegrationTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTING;
7979
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING;
8080
import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY;
81+
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;
8182
import static org.junit.jupiter.params.provider.Arguments.arguments;
8283
import static org.mockito.ArgumentMatchers.any;
8384
import static org.mockito.ArgumentMatchers.anyString;
@@ -136,6 +137,7 @@ void setup() {
136137
@DisplayName("Verify that no process 'starves' during two consecutive runs, when the batch size > number of processes")
137138
void verifyProvision_shouldNotStarve() {
138139
var numProcesses = TRANSFER_MANAGER_BATCH_SIZE * 2;
140+
when(dataFlowManager.provision(any(), any())).thenReturn(StatusResult.failure(FATAL_ERROR));
139141
when(provisionManager.provision(any(), any(Policy.class))).thenAnswer(i -> completedFuture(List.of(
140142
ProvisionResponse.Builder.newInstance()
141143
.resource(new TestProvisionedDataDestinationResource("any", "1"))

core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java

+28
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,7 @@ class InitialConsumer {
635635
@Test
636636
void initial_consumer_shouldTransitionToProvisioning() {
637637
var transferProcess = createTransferProcess(INITIAL);
638+
when(dataFlowManager.provision(any(), any())).thenReturn(StatusResult.failure(FATAL_ERROR));
638639
when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build());
639640
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code())))
640641
.thenReturn(List.of(transferProcess))
@@ -655,6 +656,7 @@ void initial_consumer_shouldTransitionToProvisioning() {
655656
@Test
656657
void initial_consumer_manifestEvaluationFailed_shouldTransitionToTerminated() {
657658
var transferProcess = createTransferProcess(INITIAL);
659+
when(dataFlowManager.provision(any(), any())).thenReturn(StatusResult.failure(FATAL_ERROR));
658660
when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build());
659661
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code())))
660662
.thenReturn(List.of(transferProcess))
@@ -674,6 +676,7 @@ void initial_consumer_manifestEvaluationFailed_shouldTransitionToTerminated() {
674676
@Test
675677
void initial_consumer_shouldTransitionToTerminated_whenNoPolicyFound() {
676678
var transferProcess = createTransferProcess(INITIAL);
679+
when(dataFlowManager.provision(any(), any())).thenReturn(StatusResult.failure(FATAL_ERROR));
677680
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code())))
678681
.thenReturn(List.of(transferProcess))
679682
.thenReturn(emptyList());
@@ -687,6 +690,31 @@ void initial_consumer_shouldTransitionToTerminated_whenNoPolicyFound() {
687690
verify(transferProcessStore).save(argThat(p -> p.getState() == TERMINATED.code()));
688691
});
689692
}
693+
694+
@Test
695+
void shouldTransitionToRequesting_whenProvisionThroughDataplaneSucceeds() {
696+
var dataPlaneId = UUID.randomUUID().toString();
697+
var dataFlowResponse = DataFlowResponse.Builder.newInstance()
698+
.dataPlaneId(dataPlaneId)
699+
.build();
700+
var transferProcess = createTransferProcess(INITIAL);
701+
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code())))
702+
.thenReturn(List.of(transferProcess))
703+
.thenReturn(emptyList());
704+
when(dataFlowManager.provision(any(), any())).thenReturn(StatusResult.success(dataFlowResponse));
705+
706+
manager.start();
707+
708+
await().untilAsserted(() -> {
709+
verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString());
710+
verifyNoInteractions(provisionManager, manifestGenerator);
711+
var captor = ArgumentCaptor.forClass(TransferProcess.class);
712+
verify(transferProcessStore).save(captor.capture());
713+
var storedTransferProcess = captor.getValue();
714+
assertThat(storedTransferProcess.getState()).isEqualTo(REQUESTING.code());
715+
assertThat(storedTransferProcess.getDataPlaneId()).isEqualTo(dataPlaneId);
716+
});
717+
}
690718
}
691719

692720
@Nested

core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/service/EmbeddedDataPlaneSelectorService.java

+16-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.util.List;
2828
import java.util.Optional;
29+
import java.util.function.Predicate;
2930

3031
import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstanceStates.AVAILABLE;
3132

@@ -51,25 +52,34 @@ public ServiceResult<List<DataPlaneInstance>> getAll() {
5152
}
5253

5354
@Override
54-
public ServiceResult<DataPlaneInstance> select(DataAddress source, String transferType, @Nullable String selectionStrategy) {
55+
public ServiceResult<DataPlaneInstance> select(@Nullable String selectionStrategy, Predicate<DataPlaneInstance> filter) {
5556
var sanitizedSelectionStrategy = Optional.ofNullable(selectionStrategy).orElse(DEFAULT_STRATEGY);
5657
var strategy = selectionStrategyRegistry.find(sanitizedSelectionStrategy);
5758
if (strategy == null) {
5859
return ServiceResult.badRequest("Strategy " + sanitizedSelectionStrategy + " was not found");
5960
}
60-
6161
return transactionContext.execute(() -> {
6262
try (var stream = store.getAll()) {
63-
var dataPlanes = stream.filter(it -> it.getState() == AVAILABLE.code()).filter(dataPlane -> dataPlane.canHandle(source, transferType)).toList();
64-
var dataPlane = strategy.apply(dataPlanes);
65-
if (dataPlane == null) {
66-
return ServiceResult.notFound("DataPlane not found");
63+
var dataPlanes = stream
64+
.filter(it -> it.getState() == AVAILABLE.code())
65+
.filter(filter)
66+
.toList();
67+
68+
if (dataPlanes.isEmpty()) {
69+
return ServiceResult.notFound("No dataplane found");
6770
}
71+
72+
var dataPlane = strategy.apply(dataPlanes);
6873
return ServiceResult.success(dataPlane);
6974
}
7075
});
7176
}
7277

78+
@Override
79+
public ServiceResult<DataPlaneInstance> select(DataAddress source, String transferType, @Nullable String selectionStrategy) {
80+
return select(selectionStrategy, dataPlane -> dataPlane.canHandle(source, transferType));
81+
}
82+
7383
@Override
7484
public ServiceResult<Void> addInstance(DataPlaneInstance instance) {
7585
return transactionContext.execute(() -> {

core/data-plane-selector/data-plane-selector-core/src/test/java/org/eclipse/edc/connector/dataplane/selector/service/EmbeddedDataPlaneSelectorServiceTest.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,18 @@ class Select {
5858
@Test
5959
void select_shouldUseChosenSelector() {
6060
var instances = range(0, 10)
61-
.mapToObj(i -> createInstanceBuilder("instance" + i).build())
61+
.mapToObj(i -> createInstanceBuilder("instance" + i)
62+
.allowedSourceType("srcTestType")
63+
.allowedTransferType("transferType")
64+
.state(AVAILABLE.code())
65+
.build())
6266
.toList();
63-
when(store.getAll()).thenReturn(instances.stream());
67+
when(store.getAll()).thenAnswer(i -> instances.stream());
6468
SelectionStrategy selectionStrategy = mock();
6569
when(selectionStrategy.apply(any())).thenAnswer(it -> instances.get(0));
6670
when(selectionStrategyRegistry.find(any())).thenReturn(selectionStrategy);
6771

68-
var result = service.select(createAddress("srcTestType"), "transferType", "strategy");
72+
var result = service.select("strategy", dataPlane -> dataPlane.canHandle(createAddress("srcTestType"), "transferType"));
6973

7074
assertThat(result).isSucceeded().extracting(DataPlaneInstance::getId).isEqualTo("instance0");
7175
verify(selectionStrategyRegistry).find("strategy");
@@ -82,7 +86,7 @@ void select_shouldExcludeInstancesNotAvailable() {
8286
when(selectionStrategy.apply(any())).thenAnswer(it -> availableInstance);
8387
when(selectionStrategyRegistry.find(any())).thenReturn(selectionStrategy);
8488

85-
service.select(createAddress("srcTestType"), "transferType", "strategy");
89+
service.select("strategy", dataPlane -> dataPlane.canHandle(createAddress("srcTestType"), "transferType"));
8690

8791
verify(selectionStrategy).apply(List.of(availableInstance));
8892
}
@@ -95,7 +99,7 @@ void select_shouldReturnBadRequest_whenStrategyNotFound() {
9599
when(store.getAll()).thenReturn(instances.stream());
96100
when(selectionStrategyRegistry.find(any())).thenReturn(null);
97101

98-
var result = service.select(createAddress("srcTestType"), "transferType", "strategy");
102+
var result = service.select("strategy", dataPlane -> dataPlane.canHandle(createAddress("srcTestType"), "transferType"));
99103

100104
assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(BAD_REQUEST);
101105
}
@@ -105,7 +109,7 @@ void select_shouldReturnNotFound_whenInstanceNotFound() {
105109
when(store.getAll()).thenReturn(Stream.empty());
106110
when(selectionStrategyRegistry.find(any())).thenReturn(mock());
107111

108-
var result = service.select(createAddress("srcTestType"), "transferType", "strategy");
112+
var result = service.select("strategy", dataPlane -> dataPlane.canHandle(createAddress("srcTestType"), "transferType"));
109113

110114
assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND);
111115
}

core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneDefaultServicesExtension.java

+7
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
package org.eclipse.edc.connector.dataplane.framework;
1616

1717
import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl;
18+
import org.eclipse.edc.connector.dataplane.framework.provision.ResourceDefinitionGeneratorManagerImpl;
1819
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
1920
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryAccessTokenDataStore;
2021
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryDataPlaneStore;
2122
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
2223
import org.eclipse.edc.connector.dataplane.spi.iam.NoOpDataPlaneAuthorizationService;
2324
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
2425
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
26+
import org.eclipse.edc.connector.dataplane.spi.provision.ResourceDefinitionGeneratorManager;
2527
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
2628
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
2729
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
@@ -78,4 +80,9 @@ public DataPlaneAuthorizationService dataPlaneAuthorizationService(ServiceExtens
7880
context.getMonitor().info("No proper DataPlaneAuthorizationService provided. The data-plane won't support PULL transfer types.");
7981
return new NoOpDataPlaneAuthorizationService();
8082
}
83+
84+
@Provider
85+
public ResourceDefinitionGeneratorManager resourceDefinitionGeneratorManager() {
86+
return new ResourceDefinitionGeneratorManagerImpl();
87+
}
8188
}

core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
2323
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer;
2424
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
25+
import org.eclipse.edc.connector.dataplane.spi.provision.ResourceDefinitionGeneratorManager;
2526
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
2627
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
2728
import org.eclipse.edc.runtime.metamodel.annotation.Configuration;
@@ -114,6 +115,8 @@ public class DataPlaneFrameworkExtension implements ServiceExtension {
114115
private PipelineService pipelineService;
115116
@Inject
116117
private DataPlaneAuthorizationService authorizationService;
118+
@Inject
119+
private ResourceDefinitionGeneratorManager resourceDefinitionGeneratorManager;
117120

118121
@Override
119122
public String name() {
@@ -144,6 +147,7 @@ public void initialize(ServiceExtensionContext context) {
144147
.telemetry(telemetry)
145148
.runtimeId(context.getRuntimeId())
146149
.flowLeaseConfiguration(flowLeaseConfiguration)
150+
.resourceDefinitionGeneratorManager(resourceDefinitionGeneratorManager)
147151
.build();
148152

149153
context.registerService(DataPlaneManager.class, dataPlaneManager);
@@ -170,11 +174,6 @@ public DataTransferExecutorServiceContainer dataTransferExecutorServiceContainer
170174
executorInstrumentation.instrument(executorService, "Data plane transfers"));
171175
}
172176

173-
@NotNull
174-
private EntityRetryProcessConfiguration getEntityRetryProcessConfiguration() {
175-
return new EntityRetryProcessConfiguration(sendRetryLimit, () -> new ExponentialWaitStrategy(sendRetryBaseDelay));
176-
}
177-
178177
@Settings
179178
public record FlowLeaseConfiguration(
180179
@Setting(
@@ -201,5 +200,11 @@ public FlowLeaseConfiguration() {
201200
public long abandonTime() {
202201
return time * factor;
203202
}
203+
204+
}
205+
206+
@NotNull
207+
private EntityRetryProcessConfiguration getEntityRetryProcessConfiguration() {
208+
return new EntityRetryProcessConfiguration(sendRetryLimit, () -> new ExponentialWaitStrategy(sendRetryBaseDelay));
204209
}
205210
}

core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java

+33
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
2323
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
2424
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
25+
import org.eclipse.edc.connector.dataplane.spi.provision.ProvisionResourceDefinition;
26+
import org.eclipse.edc.connector.dataplane.spi.provision.ResourceDefinitionGeneratorManager;
2527
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
2628
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
2729
import org.eclipse.edc.spi.entity.StatefulEntity;
2830
import org.eclipse.edc.spi.query.Criterion;
2931
import org.eclipse.edc.spi.response.StatusResult;
3032
import org.eclipse.edc.spi.result.Result;
33+
import org.eclipse.edc.spi.types.domain.transfer.DataFlowProvisionMessage;
3134
import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage;
3235
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
3336
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
@@ -63,6 +66,7 @@
6366
*/
6467
public class DataPlaneManagerImpl extends AbstractStateEntityManager<DataFlow, DataPlaneStore> implements DataPlaneManager {
6568

69+
private ResourceDefinitionGeneratorManager resourceDefinitionGeneratorManager;
6670
private DataPlaneAuthorizationService authorizationService;
6771
private TransferServiceRegistry transferServiceRegistry;
6872
private TransferProcessApiClient transferProcessClient;
@@ -88,6 +92,30 @@ public Result<Boolean> validate(DataFlowStartMessage dataRequest) {
8892
}
8993
}
9094

95+
@Override
96+
public Result<DataFlowResponseMessage> provision(DataFlowProvisionMessage message) {
97+
var dataFlow = DataFlow.Builder.newInstance()
98+
.id(message.getProcessId())
99+
.destination(message.getDestination())
100+
.callbackAddress(message.getCallbackAddress())
101+
.traceContext(telemetry.getCurrentTraceContext())
102+
.properties(message.getProperties())
103+
.transferType(message.getTransferType())
104+
.runtimeId(runtimeId)
105+
.build();
106+
107+
var resources = resourceDefinitionGeneratorManager.generateConsumerResourceDefinition(dataFlow);
108+
dataFlow.transitionToProvisioning(resources);
109+
110+
store.save(dataFlow);
111+
112+
var newDestination = resources.stream().findFirst()
113+
.map(ProvisionResourceDefinition::getDataAddress)
114+
.orElse(null);
115+
116+
return Result.success(DataFlowResponseMessage.Builder.newInstance().dataAddress(newDestination).build());
117+
}
118+
91119
@Override
92120
public Result<DataFlowResponseMessage> start(DataFlowStartMessage startMessage) {
93121
var dataFlowBuilder = DataFlow.Builder.newInstance()
@@ -368,6 +396,11 @@ public Builder flowLeaseConfiguration(FlowLeaseConfiguration flowLeaseConfigurat
368396
manager.flowLeaseConfiguration = flowLeaseConfiguration;
369397
return this;
370398
}
399+
400+
public Builder resourceDefinitionGeneratorManager(ResourceDefinitionGeneratorManager resourceDefinitionGeneratorManager) {
401+
manager.resourceDefinitionGeneratorManager = resourceDefinitionGeneratorManager;
402+
return this;
403+
}
371404
}
372405

373406
}

0 commit comments

Comments
 (0)