Skip to content

Commit 4dd8c84

Browse files
authored
Migrate XdsEndpointGroup to client.endpoint (xDS-endpoint pt 2) (#5502)
Motivation: More classes will be added related to client endpoint, so I propose that we move `XdsEndpointGroup` from the package `com.linecorp.armeria.xds` to `com.linecorp.armeria.xds.client.endpoint`. Although this PR is mostly refactoring, a new API `EndpointGroup#of(ClusterSnapshot)` is added since `ConfigSourceClient` uses `GrpcService` to decide on the endpoint which will be used. POC: #5450 Modifications: - Move `XdsEndpointGroup`, `XdsConstants` to `com.linecorp.armeria.xds.client.endpoint` - Introduce a new API `EndpointGroup#of(ClusterSnapshot)` - Introduce `XdsEndpointUtil` and move `client.endpoint` related utility methods from `XdsConverterUtil` Result: - Preparation for adding new features to `XdsEndpointGroup` is done <!-- Visit this URL to learn more about how to write a pull request description: https://armeria.dev/community/developer-guide#how-to-write-pull-request-description -->
1 parent 3963f1a commit 4dd8c84

15 files changed

+267
-127
lines changed

Diff for: xds/src/main/java/com/linecorp/armeria/xds/ClusterSnapshot.java

+21
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.linecorp.armeria.xds;
1818

1919
import com.google.common.base.MoreObjects;
20+
import com.google.common.base.Objects;
2021

2122
import com.linecorp.armeria.common.annotation.Nullable;
2223
import com.linecorp.armeria.common.annotation.UnstableApi;
@@ -89,6 +90,26 @@ int index() {
8990
return index;
9091
}
9192

93+
@Override
94+
public boolean equals(Object object) {
95+
if (this == object) {
96+
return true;
97+
}
98+
if (object == null || getClass() != object.getClass()) {
99+
return false;
100+
}
101+
final ClusterSnapshot that = (ClusterSnapshot) object;
102+
return index == that.index && Objects.equal(clusterXdsResource, that.clusterXdsResource) &&
103+
Objects.equal(endpointSnapshot, that.endpointSnapshot) &&
104+
Objects.equal(virtualHost, that.virtualHost) &&
105+
Objects.equal(route, that.route);
106+
}
107+
108+
@Override
109+
public int hashCode() {
110+
return Objects.hashCode(clusterXdsResource, endpointSnapshot, virtualHost, route, index);
111+
}
112+
92113
@Override
93114
public String toString() {
94115
return MoreObjects.toStringHelper(this)

Diff for: xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceClient.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.linecorp.armeria.client.retry.Backoff;
3131
import com.linecorp.armeria.common.SessionProtocol;
3232
import com.linecorp.armeria.common.util.SafeCloseable;
33+
import com.linecorp.armeria.xds.client.endpoint.XdsEndpointGroup;
3334

3435
import io.envoyproxy.envoy.config.core.v3.ApiConfigSource;
3536
import io.envoyproxy.envoy.config.core.v3.ApiConfigSource.ApiType;
@@ -65,7 +66,7 @@ final class ConfigSourceClient implements SafeCloseable {
6566
final ClusterSnapshot clusterSnapshot = bootstrapClusters.clusterSnapshot(clusterName);
6667
checkArgument(clusterSnapshot != null, "Unable to find static cluster '%s'", clusterName);
6768

68-
endpointGroup = new XdsEndpointGroup(clusterSnapshot);
69+
endpointGroup = XdsEndpointGroup.of(clusterSnapshot);
6970
final boolean ads = apiConfigSource.getApiType() == ApiType.AGGREGATED_GRPC;
7071
final UpstreamTlsContext tlsContext = clusterSnapshot.xdsResource().upstreamTlsContext();
7172
final SessionProtocol sessionProtocol =

Diff for: xds/src/main/java/com/linecorp/armeria/xds/EndpointSnapshot.java

+18
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.linecorp.armeria.xds;
1818

1919
import com.google.common.base.MoreObjects;
20+
import com.google.common.base.Objects;
2021

2122
import com.linecorp.armeria.common.annotation.UnstableApi;
2223

@@ -38,6 +39,23 @@ public EndpointXdsResource xdsResource() {
3839
return endpoint;
3940
}
4041

42+
@Override
43+
public boolean equals(Object object) {
44+
if (this == object) {
45+
return true;
46+
}
47+
if (object == null || getClass() != object.getClass()) {
48+
return false;
49+
}
50+
final EndpointSnapshot that = (EndpointSnapshot) object;
51+
return Objects.equal(endpoint, that.endpoint);
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hashCode(endpoint);
57+
}
58+
4159
@Override
4260
public String toString() {
4361
return MoreObjects.toStringHelper(this)

Diff for: xds/src/main/java/com/linecorp/armeria/xds/ListenerSnapshot.java

+19
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.linecorp.armeria.xds;
1818

1919
import com.google.common.base.MoreObjects;
20+
import com.google.common.base.Objects;
2021

2122
import com.linecorp.armeria.common.annotation.Nullable;
2223
import com.linecorp.armeria.common.annotation.UnstableApi;
@@ -51,6 +52,24 @@ public RouteSnapshot routeSnapshot() {
5152
return routeSnapshot;
5253
}
5354

55+
@Override
56+
public boolean equals(Object object) {
57+
if (this == object) {
58+
return true;
59+
}
60+
if (object == null || getClass() != object.getClass()) {
61+
return false;
62+
}
63+
final ListenerSnapshot that = (ListenerSnapshot) object;
64+
return Objects.equal(listenerXdsResource, that.listenerXdsResource) &&
65+
Objects.equal(routeSnapshot, that.routeSnapshot);
66+
}
67+
68+
@Override
69+
public int hashCode() {
70+
return Objects.hashCode(listenerXdsResource, routeSnapshot);
71+
}
72+
5473
@Override
5574
public String toString() {
5675
return MoreObjects.toStringHelper(this)

Diff for: xds/src/main/java/com/linecorp/armeria/xds/RouteSnapshot.java

+19
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424

2525
import com.google.common.base.MoreObjects;
26+
import com.google.common.base.Objects;
2627

2728
import com.linecorp.armeria.common.annotation.UnstableApi;
2829

@@ -74,6 +75,24 @@ public Map<VirtualHost, List<ClusterSnapshot>> virtualHostMap() {
7475
return virtualHostMap;
7576
}
7677

78+
@Override
79+
public boolean equals(Object object) {
80+
if (this == object) {
81+
return true;
82+
}
83+
if (object == null || getClass() != object.getClass()) {
84+
return false;
85+
}
86+
final RouteSnapshot that = (RouteSnapshot) object;
87+
return Objects.equal(routeXdsResource, that.routeXdsResource) &&
88+
Objects.equal(clusterSnapshots, that.clusterSnapshots);
89+
}
90+
91+
@Override
92+
public int hashCode() {
93+
return Objects.hashCode(routeXdsResource, clusterSnapshots);
94+
}
95+
7796
@Override
7897
public String toString() {
7998
return MoreObjects.toStringHelper(this)

Diff for: xds/src/main/java/com/linecorp/armeria/xds/XdsConverterUtil.java

-64
Original file line numberDiff line numberDiff line change
@@ -17,81 +17,17 @@
1717
package com.linecorp.armeria.xds;
1818

1919
import static com.google.common.base.Preconditions.checkArgument;
20-
import static com.google.common.collect.ImmutableList.toImmutableList;
21-
import static com.linecorp.armeria.xds.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME;
2220

23-
import java.util.List;
24-
import java.util.Map;
25-
import java.util.Map.Entry;
26-
import java.util.function.Predicate;
27-
28-
import com.google.common.base.Strings;
29-
import com.google.protobuf.Struct;
30-
import com.google.protobuf.Value;
31-
32-
import com.linecorp.armeria.client.Endpoint;
3321
import com.linecorp.armeria.common.annotation.Nullable;
3422

3523
import io.envoyproxy.envoy.config.core.v3.ApiConfigSource;
3624
import io.envoyproxy.envoy.config.core.v3.ApiConfigSource.ApiType;
3725
import io.envoyproxy.envoy.config.core.v3.ConfigSource;
38-
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
39-
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
40-
import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
4126

4227
final class XdsConverterUtil {
4328

4429
private XdsConverterUtil() {}
4530

46-
static List<Endpoint> convertEndpoints(ClusterLoadAssignment clusterLoadAssignment) {
47-
return convertEndpoints(clusterLoadAssignment, lbEndpoint -> true);
48-
}
49-
50-
static List<Endpoint> convertEndpoints(ClusterLoadAssignment clusterLoadAssignment, Struct filterMetadata) {
51-
checkArgument(filterMetadata.getFieldsCount() > 0,
52-
"filterMetadata.getFieldsCount(): %s (expected: > 0)", filterMetadata.getFieldsCount());
53-
final Predicate<LbEndpoint> lbEndpointPredicate = lbEndpoint -> {
54-
final Struct endpointMetadata = lbEndpoint.getMetadata().getFilterMetadataOrDefault(
55-
SUBSET_LOAD_BALANCING_FILTER_NAME, Struct.getDefaultInstance());
56-
if (endpointMetadata.getFieldsCount() == 0) {
57-
return false;
58-
}
59-
return containsFilterMetadata(filterMetadata, endpointMetadata);
60-
};
61-
return convertEndpoints(clusterLoadAssignment, lbEndpointPredicate);
62-
}
63-
64-
private static List<Endpoint> convertEndpoints(ClusterLoadAssignment clusterLoadAssignment,
65-
Predicate<LbEndpoint> lbEndpointPredicate) {
66-
return clusterLoadAssignment.getEndpointsList().stream().flatMap(
67-
localityLbEndpoints -> localityLbEndpoints
68-
.getLbEndpointsList()
69-
.stream()
70-
.filter(lbEndpointPredicate)
71-
.map(lbEndpoint -> {
72-
final SocketAddress socketAddress =
73-
lbEndpoint.getEndpoint().getAddress().getSocketAddress();
74-
final String hostname = lbEndpoint.getEndpoint().getHostname();
75-
if (!Strings.isNullOrEmpty(hostname)) {
76-
return Endpoint.of(hostname, socketAddress.getPortValue())
77-
.withIpAddr(socketAddress.getAddress());
78-
} else {
79-
return Endpoint.of(socketAddress.getAddress(), socketAddress.getPortValue());
80-
}
81-
})).collect(toImmutableList());
82-
}
83-
84-
private static boolean containsFilterMetadata(Struct filterMetadata, Struct endpointMetadata) {
85-
final Map<String, Value> endpointMetadataMap = endpointMetadata.getFieldsMap();
86-
for (Entry<String, Value> entry : filterMetadata.getFieldsMap().entrySet()) {
87-
final Value value = endpointMetadataMap.get(entry.getKey());
88-
if (value == null || !value.equals(entry.getValue())) {
89-
return false;
90-
}
91-
}
92-
return true;
93-
}
94-
9531
static void validateConfigSource(@Nullable ConfigSource configSource) {
9632
if (configSource == null || configSource.equals(ConfigSource.getDefaultInstance())) {
9733
return;

Diff for: xds/src/main/java/com/linecorp/armeria/xds/XdsConstants.java renamed to xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsConstants.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* under the License.
1515
*/
1616

17-
package com.linecorp.armeria.xds;
17+
package com.linecorp.armeria.xds.client.endpoint;
1818

1919
final class XdsConstants {
2020

Diff for: xds/src/main/java/com/linecorp/armeria/xds/XdsEndpointGroup.java renamed to xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointGroup.java

+23-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 LINE Corporation
2+
* Copyright 2024 LINE Corporation
33
*
44
* LINE Corporation licenses this file to you under the Apache License,
55
* version 2.0 (the "License"); you may not use this file except in compliance
@@ -14,11 +14,11 @@
1414
* under the License.
1515
*/
1616

17-
package com.linecorp.armeria.xds;
17+
package com.linecorp.armeria.xds.client.endpoint;
1818

1919
import static com.google.common.base.Preconditions.checkArgument;
20-
import static com.linecorp.armeria.xds.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME;
21-
import static com.linecorp.armeria.xds.XdsConverterUtil.convertEndpoints;
20+
import static com.linecorp.armeria.xds.client.endpoint.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME;
21+
import static com.linecorp.armeria.xds.client.endpoint.XdsEndpointUtil.convertEndpoints;
2222
import static java.util.Objects.requireNonNull;
2323

2424
import java.util.List;
@@ -37,11 +37,19 @@
3737
import com.linecorp.armeria.client.endpoint.EndpointGroup;
3838
import com.linecorp.armeria.common.annotation.UnstableApi;
3939
import com.linecorp.armeria.common.util.SafeCloseable;
40+
import com.linecorp.armeria.xds.ClusterSnapshot;
41+
import com.linecorp.armeria.xds.EndpointSnapshot;
42+
import com.linecorp.armeria.xds.ListenerRoot;
43+
import com.linecorp.armeria.xds.ListenerSnapshot;
44+
import com.linecorp.armeria.xds.RouteSnapshot;
45+
import com.linecorp.armeria.xds.SnapshotWatcher;
46+
import com.linecorp.armeria.xds.XdsBootstrap;
4047

4148
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
4249
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig;
4350
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetFallbackPolicy;
4451
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetSelector;
52+
import io.envoyproxy.envoy.config.core.v3.GrpcService;
4553
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
4654
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
4755
import io.envoyproxy.envoy.config.route.v3.Route;
@@ -76,6 +84,17 @@ public static EndpointGroup of(ListenerRoot listenerRoot) {
7684
return new XdsEndpointGroup(listenerRoot);
7785
}
7886

87+
/**
88+
* Creates a {@link XdsEndpointGroup} based on the specified {@link ClusterSnapshot}.
89+
* This may be useful if one would like to create an {@link EndpointGroup} based on
90+
* a {@link GrpcService}.
91+
*/
92+
@UnstableApi
93+
public static EndpointGroup of(ClusterSnapshot clusterSnapshot) {
94+
requireNonNull(clusterSnapshot, "clusterSnapshot");
95+
return new XdsEndpointGroup(clusterSnapshot);
96+
}
97+
7998
XdsEndpointGroup(ListenerRoot listenerRoot) {
8099
final SnapshotWatcher<ListenerSnapshot> watcher = update -> {
81100
final RouteSnapshot routeSnapshot = update.routeSnapshot();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2024 LINE Corporation
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.linecorp.armeria.xds.client.endpoint;
18+
19+
import static com.google.common.base.Preconditions.checkArgument;
20+
import static com.google.common.collect.ImmutableList.toImmutableList;
21+
import static com.linecorp.armeria.xds.client.endpoint.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME;
22+
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.Map.Entry;
26+
import java.util.function.Predicate;
27+
28+
import com.google.common.base.Strings;
29+
import com.google.protobuf.Struct;
30+
import com.google.protobuf.Value;
31+
32+
import com.linecorp.armeria.client.Endpoint;
33+
34+
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
35+
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
36+
import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
37+
38+
final class XdsEndpointUtil {
39+
40+
static List<Endpoint> convertEndpoints(ClusterLoadAssignment clusterLoadAssignment) {
41+
return convertEndpoints(clusterLoadAssignment, lbEndpoint -> true);
42+
}
43+
44+
static List<Endpoint> convertEndpoints(ClusterLoadAssignment clusterLoadAssignment, Struct filterMetadata) {
45+
checkArgument(filterMetadata.getFieldsCount() > 0,
46+
"filterMetadata.getFieldsCount(): %s (expected: > 0)", filterMetadata.getFieldsCount());
47+
final Predicate<LbEndpoint> lbEndpointPredicate = lbEndpoint -> {
48+
final Struct endpointMetadata = lbEndpoint.getMetadata().getFilterMetadataOrDefault(
49+
SUBSET_LOAD_BALANCING_FILTER_NAME, Struct.getDefaultInstance());
50+
if (endpointMetadata.getFieldsCount() == 0) {
51+
return false;
52+
}
53+
return containsFilterMetadata(filterMetadata, endpointMetadata);
54+
};
55+
return convertEndpoints(clusterLoadAssignment, lbEndpointPredicate);
56+
}
57+
58+
private static List<Endpoint> convertEndpoints(ClusterLoadAssignment clusterLoadAssignment,
59+
Predicate<LbEndpoint> lbEndpointPredicate) {
60+
return clusterLoadAssignment.getEndpointsList().stream().flatMap(
61+
localityLbEndpoints -> localityLbEndpoints
62+
.getLbEndpointsList()
63+
.stream()
64+
.filter(lbEndpointPredicate)
65+
.map(lbEndpoint -> {
66+
final SocketAddress socketAddress =
67+
lbEndpoint.getEndpoint().getAddress().getSocketAddress();
68+
final String hostname = lbEndpoint.getEndpoint().getHostname();
69+
if (!Strings.isNullOrEmpty(hostname)) {
70+
return Endpoint.of(hostname, socketAddress.getPortValue())
71+
.withIpAddr(socketAddress.getAddress());
72+
} else {
73+
return Endpoint.of(socketAddress.getAddress(), socketAddress.getPortValue());
74+
}
75+
})).collect(toImmutableList());
76+
}
77+
78+
private static boolean containsFilterMetadata(Struct filterMetadata, Struct endpointMetadata) {
79+
final Map<String, Value> endpointMetadataMap = endpointMetadata.getFieldsMap();
80+
for (Entry<String, Value> entry : filterMetadata.getFieldsMap().entrySet()) {
81+
final Value value = endpointMetadataMap.get(entry.getKey());
82+
if (value == null || !value.equals(entry.getValue())) {
83+
return false;
84+
}
85+
}
86+
return true;
87+
}
88+
89+
private XdsEndpointUtil() {}
90+
}

0 commit comments

Comments
 (0)