Skip to content

Commit 4724b11

Browse files
authored
feat: prioritized transfer services (#4876)
1 parent 1826de2 commit 4724b11

File tree

4 files changed

+113
-43
lines changed

4 files changed

+113
-43
lines changed

core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/registry/TransferServiceRegistryImpl.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*
1010
* Contributors:
1111
* Microsoft Corporation - initial API and implementation
12+
* Cofinity-X - prioritized transfer services
1213
*
1314
*/
1415

@@ -20,14 +21,15 @@
2021
import org.jetbrains.annotations.Nullable;
2122

2223
import java.util.Collection;
24+
import java.util.Comparator;
2325
import java.util.LinkedHashSet;
2426

2527
/**
2628
* Default {@link TransferServiceRegistry} implementation.
2729
*/
2830
public class TransferServiceRegistryImpl implements TransferServiceRegistry {
2931

30-
private final Collection<TransferService> transferServices = new LinkedHashSet<>();
32+
private final Collection<PrioritizedTransferService> transferServices = new LinkedHashSet<>();
3133
private final TransferServiceSelectionStrategy transferServiceSelectionStrategy;
3234

3335
public TransferServiceRegistryImpl(TransferServiceSelectionStrategy transferServiceSelectionStrategy) {
@@ -36,13 +38,34 @@ public TransferServiceRegistryImpl(TransferServiceSelectionStrategy transferServ
3638

3739
@Override
3840
public void registerTransferService(TransferService transferService) {
39-
transferServices.add(transferService);
41+
transferServices.add(new PrioritizedTransferService(0, transferService));
4042
}
41-
43+
44+
@Override
45+
public void registerTransferService(int priority, TransferService transferService) {
46+
transferServices.add(new PrioritizedTransferService(priority, transferService));
47+
}
48+
4249
@Override
4350
@Nullable
4451
public TransferService resolveTransferService(DataFlowStartMessage request) {
45-
var possibleServices = transferServices.stream().filter(s -> s.canHandle(request));
52+
var prioritizedServicesPresent = transferServices.stream()
53+
.map(PrioritizedTransferService::priority)
54+
.anyMatch(priority -> priority > 0);
55+
56+
if (prioritizedServicesPresent) {
57+
return transferServices.stream()
58+
.filter(pts -> pts.service.canHandle(request))
59+
.sorted(Comparator.comparingInt(pts -> -pts.priority))
60+
.map(PrioritizedTransferService::service)
61+
.findFirst().orElse(null);
62+
}
63+
64+
var possibleServices = transferServices.stream()
65+
.map(PrioritizedTransferService::service)
66+
.filter(ts -> ts.canHandle(request));
4667
return transferServiceSelectionStrategy.chooseTransferService(request, possibleServices);
4768
}
69+
70+
record PrioritizedTransferService(int priority, TransferService service) { }
4871
}

core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/registry/TransferServiceSelectionStrategy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*
1010
* Contributors:
1111
* Microsoft Corporation - Initial implementation
12+
* Cofinity-X - prioritized transfer services
1213
*
1314
*/
1415

@@ -23,7 +24,11 @@
2324
/**
2425
* Functional interface for selecting which of (potentially) multiple {@link TransferService}s to use
2526
* for serving a particular {@link DataFlowStartMessage}.
27+
*
28+
* @deprecated use transfer service prioritization, see
29+
* {@link org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry#registerTransferService(int, TransferService)}.
2630
*/
31+
@Deprecated(since = "0.12.0", forRemoval = true)
2732
public interface TransferServiceSelectionStrategy {
2833
/**
2934
* Selects which of (potentially) multiple {@link TransferService}s to use

core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/registry/TransferServiceRegistryImplTest.java

Lines changed: 69 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*
1010
* Contributors:
1111
* Microsoft Corporation - initial API and implementation
12+
* Cofinity-X - prioritized transfer services
1213
*
1314
*/
1415

@@ -23,70 +24,100 @@
2324
import java.util.stream.Stream;
2425

2526
import static org.assertj.core.api.Assertions.assertThat;
26-
import static org.mockito.ArgumentMatchers.any;
2727
import static org.mockito.Mockito.eq;
2828
import static org.mockito.Mockito.mock;
2929
import static org.mockito.Mockito.verify;
3030
import static org.mockito.Mockito.when;
3131

3232
class TransferServiceRegistryImplTest {
33-
TransferService transferService = mock(TransferService.class);
34-
TransferService transferService2 = mock(TransferService.class);
35-
36-
DataFlowStartMessage request = createRequest().build();
37-
TransferServiceSelectionStrategy transferServiceSelectionStrategy = mock(TransferServiceSelectionStrategy.class);
33+
private TransferService transferService = mock(TransferService.class);
34+
private TransferService transferService2 = mock(TransferService.class);
35+
36+
private DataFlowStartMessage request = createRequest().build();
37+
private TransferServiceSelectionStrategy transferServiceSelectionStrategy = mock(TransferServiceSelectionStrategy.class);
3838
@SuppressWarnings("unchecked")
39-
ArgumentCaptor<Stream<TransferService>> streamCaptor = ArgumentCaptor.forClass(Stream.class);
40-
39+
private ArgumentCaptor<Stream<TransferService>> streamCaptor = ArgumentCaptor.forClass(Stream.class);
40+
41+
private TransferServiceRegistryImpl registry = new TransferServiceRegistryImpl(transferServiceSelectionStrategy);
42+
4143
@Test
42-
void resolveTransferService_filters_matches() {
44+
void resolveTransferService_noServicesRegistered_shouldReturnNull() {
45+
var service = registry.resolveTransferService(request);
46+
47+
assertThat(service).isNull();
48+
}
49+
50+
@Test
51+
void resolveTransferService_noServiceCanHandle_shouldReturnNull() {
52+
registry.registerTransferService(transferService);
4353
when(transferService.canHandle(request)).thenReturn(false);
54+
55+
var service = registry.resolveTransferService(request);
56+
57+
assertThat(service).isNull();
58+
}
59+
60+
@Test
61+
void resolveTransferService_withPriorities_shouldReturnHighestPriorityService() {
62+
registry.registerTransferService(transferService);
63+
registry.registerTransferService(1, transferService2);
64+
when(transferService.canHandle(request)).thenReturn(true);
4465
when(transferService2.canHandle(request)).thenReturn(true);
45-
46-
createRegistryAndResolveForRequest();
47-
48-
assertThat(streamCaptor.getValue()).containsExactly(transferService2);
66+
67+
var service = registry.resolveTransferService(request);
68+
69+
assertThat(service).isEqualTo(transferService2);
4970
}
50-
71+
5172
@Test
52-
void resolveTransferService_handles_multipleMatches() {
73+
void resolveTransferService_withSamePriority_shouldReturnFirstWithHighestPriority() {
74+
registry.registerTransferService(1, transferService);
75+
registry.registerTransferService(1, transferService2);
5376
when(transferService.canHandle(request)).thenReturn(true);
5477
when(transferService2.canHandle(request)).thenReturn(true);
55-
56-
createRegistryAndResolveForRequest();
57-
58-
assertThat(streamCaptor.getValue()).containsExactly(transferService, transferService2);
78+
79+
var service = registry.resolveTransferService(request);
80+
81+
assertThat(service).isEqualTo(transferService);
5982
}
60-
83+
6184
@Test
62-
void resolveTransferService_handles_noMatch() {
63-
var registry = new TransferServiceRegistryImpl(transferServiceSelectionStrategy);
64-
85+
void resolveTransferService_noPriorityAndNoneCanHandle_shouldApplyStrategyWithEmptyStream() {
86+
registry.registerTransferService(transferService);
87+
registry.registerTransferService(transferService2);
88+
when(transferService.canHandle(request)).thenReturn(false);
89+
when(transferService2.canHandle(request)).thenReturn(false);
90+
6591
registry.resolveTransferService(request);
66-
92+
6793
verify(transferServiceSelectionStrategy).chooseTransferService(eq(request), streamCaptor.capture());
6894
assertThat(streamCaptor.getValue()).isEmpty();
6995
}
70-
96+
7197
@Test
72-
void resolveTransferService_returns_strategyResult() {
73-
var registry = new TransferServiceRegistryImpl(transferServiceSelectionStrategy);
74-
when(transferServiceSelectionStrategy.chooseTransferService(eq(request), any()))
75-
.thenReturn(transferService);
76-
77-
var resolved = registry.resolveTransferService(request);
78-
79-
assertThat(resolved).isSameAs(transferService);
98+
void resolveTransferService_noPriorityAndOneCanHandle_shouldApplyStrategyWithOneService() {
99+
registry.registerTransferService(transferService);
100+
registry.registerTransferService(transferService2);
101+
when(transferService.canHandle(request)).thenReturn(true);
102+
when(transferService2.canHandle(request)).thenReturn(false);
103+
104+
registry.resolveTransferService(request);
105+
106+
verify(transferServiceSelectionStrategy).chooseTransferService(eq(request), streamCaptor.capture());
107+
assertThat(streamCaptor.getValue()).containsExactly(transferService);
80108
}
81-
82-
private void createRegistryAndResolveForRequest() {
83-
var registry = new TransferServiceRegistryImpl(transferServiceSelectionStrategy);
109+
110+
@Test
111+
void resolveTransferService_noPriorityAndAllCanHandle_shouldApplyStrategyWithAllServices() {
84112
registry.registerTransferService(transferService);
85113
registry.registerTransferService(transferService2);
86-
114+
when(transferService.canHandle(request)).thenReturn(true);
115+
when(transferService2.canHandle(request)).thenReturn(true);
116+
87117
registry.resolveTransferService(request);
88-
118+
89119
verify(transferServiceSelectionStrategy).chooseTransferService(eq(request), streamCaptor.capture());
120+
assertThat(streamCaptor.getValue()).containsExactly(transferService, transferService2);
90121
}
91122

92123
private DataFlowStartMessage.Builder createRequest() {

spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/registry/TransferServiceRegistry.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*
1010
* Contributors:
1111
* Microsoft Corporation - initial API and implementation
12+
* Cofinity-X - prioritized transfer services
1213
*
1314
*/
1415

@@ -24,13 +25,23 @@
2425
*/
2526
@ExtensionPoint
2627
public interface TransferServiceRegistry {
28+
2729
/**
2830
* Adds a {@link TransferService} to the collection of services that can perform data transfers.
31+
* The priority is set to 0.
2932
*
3033
* @param transferService the service to add.
3134
*/
3235
void registerTransferService(TransferService transferService);
33-
36+
37+
/**
38+
* Adds a {@link TransferService} with given priority to the collection of services that can
39+
* perform data transfers. Higher priorities will be preferred during selection.
40+
*
41+
* @param priority the priority
42+
* @param transferService the service to add.
43+
*/
44+
void registerTransferService(int priority, TransferService transferService);
3445

3546
/**
3647
* Resolves a {@link TransferService}s to use for serving a particular {@link DataFlowStartMessage}.

0 commit comments

Comments
 (0)