Skip to content

Commit ba803e2

Browse files
authored
Introduce DefaultXdsLoadBalancerLifecycleObserver (#6490)
Motivation: When using xDS based clients, users will often want to know how the xDS resources translate to the load balancer which routes requests upstream. This changeset attempts introduce a `DefaultXdsLoadBalancerLifecycleObserver` which collects metrics of each load balancer for each cluster. This observer has the same lifecycle as a `ClusterResourceNode`, and will deregister each metric upon close. Note that thanks to `XdsClusterManager`, each cluster has a unique name. Modifications: - A `DefaultXdsLoadBalancerLifecycleObserver` is introduced which records metrics for a `XdsLoadBalancer` - `MeterRegistry`, `MeterIdPrefix` can now be specified in `XdsBootstrapBuilder` - `MeterRegistry`, `MeterIdPrefix` is also added to `SubscriptionContext` so that they can be accessed when dynamically querying xDS resources - An xDS-dedicated `EventExecutor` is used for the default event loop. Result: - `DefaultXdsLoadBalancerLifecycleObserver` now records metrics for `XdsLoadBalancer`s by default <!-- Visit this URL to learn more about how to write a pull request description: https://armeria.dev/community/developer-guide#how-to-write-pull-request-description -->
1 parent 1b2d426 commit ba803e2

File tree

8 files changed

+1013
-15
lines changed

8 files changed

+1013
-15
lines changed

it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsLoadBalancerLifecycleObserverTest.java

Lines changed: 634 additions & 0 deletions
Large diffs are not rendered by default.

xds/src/main/java/com/linecorp/armeria/xds/BootstrapClusters.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
import com.google.common.collect.ImmutableList;
2525

2626
import com.linecorp.armeria.common.annotation.Nullable;
27+
import com.linecorp.armeria.common.metric.MeterIdPrefix;
2728
import com.linecorp.armeria.xds.client.endpoint.XdsLoadBalancer;
2829

2930
import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap;
3031
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
32+
import io.micrometer.core.instrument.MeterRegistry;
3133
import io.netty.util.concurrent.EventExecutor;
3234

3335
final class BootstrapClusters implements SnapshotWatcher<ClusterSnapshot> {
@@ -39,16 +41,19 @@ final class BootstrapClusters implements SnapshotWatcher<ClusterSnapshot> {
3941
private final List<SnapshotWatcher<? super ClusterSnapshot>> watchers;
4042

4143
BootstrapClusters(Bootstrap bootstrap, EventExecutor eventLoop, XdsClusterManager clusterManager,
42-
SnapshotWatcher<Object> defaultSnapshotWatcher) {
44+
SnapshotWatcher<Object> defaultSnapshotWatcher,
45+
MeterIdPrefix meterIdPrefix, MeterRegistry meterRegistry) {
4346
this.bootstrap = bootstrap;
4447
this.eventLoop = eventLoop;
4548
this.clusterManager = clusterManager;
4649
watchers = ImmutableList.of(defaultSnapshotWatcher, this);
47-
initializePrimary(bootstrap);
50+
initializePrimary(bootstrap, meterIdPrefix, meterRegistry);
4851
}
4952

50-
private void initializePrimary(Bootstrap bootstrap) {
51-
final StaticSubscriptionContext context = new StaticSubscriptionContext(eventLoop);
53+
private void initializePrimary(Bootstrap bootstrap, MeterIdPrefix meterIdPrefix,
54+
MeterRegistry meterRegistry) {
55+
final StaticSubscriptionContext context =
56+
new StaticSubscriptionContext(eventLoop, meterIdPrefix, meterRegistry);
5257
for (Cluster cluster: bootstrap.getStaticResources().getClustersList()) {
5358
if (!cluster.hasLoadAssignment()) {
5459
continue;

xds/src/main/java/com/linecorp/armeria/xds/DefaultSubscriptionContext.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package com.linecorp.armeria.xds;
1818

19+
import com.linecorp.armeria.common.metric.MeterIdPrefix;
20+
21+
import io.micrometer.core.instrument.MeterRegistry;
1922
import io.netty.util.concurrent.EventExecutor;
2023

2124
final class DefaultSubscriptionContext implements SubscriptionContext {
@@ -24,21 +27,36 @@ final class DefaultSubscriptionContext implements SubscriptionContext {
2427
private final XdsClusterManager clusterManager;
2528
private final ConfigSourceMapper configSourceMapper;
2629
private final ControlPlaneClientManager controlPlaneClientManager;
30+
private final MeterRegistry meterRegistry;
31+
private final MeterIdPrefix meterIdPrefix;
2732

2833
DefaultSubscriptionContext(EventExecutor eventLoop, XdsClusterManager clusterManager,
2934
ConfigSourceMapper configSourceMapper,
30-
ControlPlaneClientManager controlPlaneClientManager) {
35+
ControlPlaneClientManager controlPlaneClientManager,
36+
MeterRegistry meterRegistry, MeterIdPrefix meterIdPrefix) {
3137
this.eventLoop = eventLoop;
3238
this.clusterManager = clusterManager;
3339
this.configSourceMapper = configSourceMapper;
3440
this.controlPlaneClientManager = controlPlaneClientManager;
41+
this.meterRegistry = meterRegistry;
42+
this.meterIdPrefix = meterIdPrefix;
3543
}
3644

3745
@Override
3846
public EventExecutor eventLoop() {
3947
return eventLoop;
4048
}
4149

50+
@Override
51+
public MeterRegistry meterRegistry() {
52+
return meterRegistry;
53+
}
54+
55+
@Override
56+
public MeterIdPrefix meterIdPrefix() {
57+
return meterIdPrefix;
58+
}
59+
4260
@Override
4361
public void subscribe(ResourceNode<?> node) {
4462
controlPlaneClientManager.subscribe(node);
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.linecorp.armeria.xds;
18+
19+
import java.util.HashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Map.Entry;
23+
24+
import com.google.common.collect.ImmutableList;
25+
import com.google.common.collect.ImmutableMap;
26+
27+
import com.linecorp.armeria.client.Endpoint;
28+
import com.linecorp.armeria.client.endpoint.EndpointGroup;
29+
import com.linecorp.armeria.common.metric.MeterIdPrefix;
30+
import com.linecorp.armeria.common.util.SafeCloseable;
31+
import com.linecorp.armeria.xds.client.endpoint.HostSet;
32+
import com.linecorp.armeria.xds.client.endpoint.LoadBalancerState;
33+
import com.linecorp.armeria.xds.client.endpoint.XdsLoadBalancerLifecycleObserver;
34+
35+
import io.envoyproxy.envoy.config.core.v3.Locality;
36+
import io.micrometer.core.instrument.Counter;
37+
import io.micrometer.core.instrument.Gauge;
38+
import io.micrometer.core.instrument.Meter;
39+
import io.micrometer.core.instrument.MeterRegistry;
40+
41+
final class DefaultXdsLoadBalancerLifecycleObserver implements XdsLoadBalancerLifecycleObserver {
42+
43+
private final SnapshotUpdateRecorder resourceUpdatedRecorder;
44+
private final SnapshotUpdateRecorder endpointsUpdatedRecorder;
45+
private final SnapshotUpdateRecorder stateUpdatedRecorder;
46+
private final SnapshotUpdateRecorder stateRejectedRecorder;
47+
private final LoadBalancerRecorder loadBalancerRecorder;
48+
49+
DefaultXdsLoadBalancerLifecycleObserver(MeterIdPrefix prefix,
50+
MeterRegistry meterRegistry, String clusterName) {
51+
prefix = prefix.withTags("cluster", clusterName);
52+
resourceUpdatedRecorder =
53+
new SnapshotUpdateRecorder(meterRegistry, prefix.append("lb.resource.updated"));
54+
endpointsUpdatedRecorder =
55+
new SnapshotUpdateRecorder(meterRegistry, prefix.append("lb.endpoints.updated"));
56+
stateUpdatedRecorder =
57+
new SnapshotUpdateRecorder(meterRegistry, prefix.append("lb.state.updated"));
58+
stateRejectedRecorder =
59+
new SnapshotUpdateRecorder(meterRegistry, prefix.append("lb.state.rejected"));
60+
loadBalancerRecorder = new LoadBalancerRecorder(meterRegistry, prefix);
61+
}
62+
63+
@Override
64+
public void resourceUpdated(ClusterSnapshot snapshot) {
65+
resourceUpdatedRecorder.snapshotUpdated(snapshot);
66+
}
67+
68+
@Override
69+
public void endpointsUpdated(ClusterSnapshot snapshot, List<Endpoint> endpoints) {
70+
endpointsUpdatedRecorder.snapshotUpdated(snapshot);
71+
}
72+
73+
@Override
74+
public void stateUpdated(ClusterSnapshot snapshot, LoadBalancerState state) {
75+
stateUpdatedRecorder.snapshotUpdated(snapshot);
76+
loadBalancerRecorder.stateUpdated(state);
77+
}
78+
79+
@Override
80+
public void stateRejected(ClusterSnapshot snapshot, List<Endpoint> endpoints, Throwable cause) {
81+
stateRejectedRecorder.snapshotUpdated(snapshot);
82+
}
83+
84+
@Override
85+
public void close() {
86+
resourceUpdatedRecorder.close();
87+
endpointsUpdatedRecorder.close();
88+
stateUpdatedRecorder.close();
89+
stateRejectedRecorder.close();
90+
loadBalancerRecorder.close();
91+
}
92+
93+
private static class LoadBalancerRecorder implements SafeCloseable {
94+
95+
private final Gauge subsetsGauge;
96+
private Map<Integer, PriorityRecorder> priorityMap = new HashMap<>();
97+
private final MeterRegistry meterRegistry;
98+
private final MeterIdPrefix prefix;
99+
100+
private volatile int numSubsets;
101+
102+
LoadBalancerRecorder(MeterRegistry meterRegistry, MeterIdPrefix prefix) {
103+
this.meterRegistry = meterRegistry;
104+
this.prefix = prefix;
105+
subsetsGauge = Gauge.builder(prefix.name("lb.state.subsets"), () -> numSubsets)
106+
.tags(prefix.tags())
107+
.description("the number of subsets")
108+
.register(meterRegistry);
109+
}
110+
111+
void stateUpdated(LoadBalancerState state) {
112+
113+
numSubsets = state.subsetStates().size();
114+
115+
final Map<Integer, PriorityRecorder> prevPriorityMap = priorityMap;
116+
final Map<Integer, PriorityRecorder> priorityMap = new HashMap<>();
117+
118+
// handle new HostSets
119+
for (Entry<Integer, HostSet> e: state.hostSets().entrySet()) {
120+
final int priority = e.getKey();
121+
PriorityRecorder recorder = prevPriorityMap.get(e.getKey());
122+
if (recorder == null) {
123+
recorder = new PriorityRecorder(meterRegistry, prefix, priority);
124+
}
125+
recorder.update(priority, e.getValue(), state);
126+
priorityMap.put(priority, recorder);
127+
}
128+
129+
// nullify missing priorities
130+
for (Entry<Integer, PriorityRecorder> e: prevPriorityMap.entrySet()) {
131+
if (!priorityMap.containsKey(e.getKey())) {
132+
e.getValue().close();
133+
}
134+
}
135+
this.priorityMap = ImmutableMap.copyOf(priorityMap);
136+
}
137+
138+
@Override
139+
public void close() {
140+
meterRegistry.remove(subsetsGauge);
141+
priorityMap.values().forEach(PriorityRecorder::close);
142+
}
143+
}
144+
145+
private static class PriorityRecorder implements SafeCloseable {
146+
147+
private Map<Locality, LocalityRecorder> localityMap = ImmutableMap.of();
148+
private final MeterRegistry meterRegistry;
149+
private final MeterIdPrefix prefix;
150+
private final int priority;
151+
private final List<Meter> meters;
152+
153+
private volatile int healthyLoad;
154+
private volatile int degradedLoad;
155+
private volatile boolean panicState;
156+
private volatile double zarLocalPercentage;
157+
158+
PriorityRecorder(MeterRegistry meterRegistry, MeterIdPrefix prefix, int priority) {
159+
this.meterRegistry = meterRegistry;
160+
this.priority = priority;
161+
prefix = prefix.withTags("priority", Integer.toString(priority));
162+
this.prefix = prefix;
163+
164+
final ImmutableList.Builder<Meter> metersBuilder = ImmutableList.builder();
165+
metersBuilder.add(Gauge.builder(prefix.name("lb.state.load.healthy"), () -> healthyLoad)
166+
.tags(prefix.tags())
167+
.register(meterRegistry));
168+
metersBuilder.add(Gauge.builder(prefix.name("lb.state.load.degraded"), () -> degradedLoad)
169+
.tags(prefix.tags())
170+
.register(meterRegistry));
171+
metersBuilder.add(Gauge.builder(prefix.name("lb.state.panic"), () -> panicState ? 1 : 0)
172+
.tags(prefix.tags())
173+
.description("1: panic, 0: normal")
174+
.register(meterRegistry));
175+
if (priority == 0) {
176+
metersBuilder.add(Gauge.builder(prefix.name("lb.zar.local.percentage"),
177+
() -> zarLocalPercentage)
178+
.tags(prefix.tags())
179+
.register(meterRegistry));
180+
}
181+
meters = metersBuilder.build();
182+
}
183+
184+
void update(int priority, HostSet hostSet, LoadBalancerState state) {
185+
assert this.priority == priority;
186+
187+
healthyLoad = state.healthyPriorityLoad().getOrDefault(priority, 0);
188+
degradedLoad = state.degradedPriorityLoad().getOrDefault(priority, 0);
189+
panicState = state.perPriorityPanic().getOrDefault(priority, false);
190+
zarLocalPercentage = state.zarLocalPercentage();
191+
192+
// update locality-specific metrics
193+
final Map<Locality, LocalityRecorder> prevLocalityMap = localityMap;
194+
final Map<Locality, LocalityRecorder> localityMap = new HashMap<>();
195+
for (Locality locality: hostSet.endpointGroupPerLocality().keySet()) {
196+
LocalityRecorder recorder = prevLocalityMap.get(locality);
197+
if (recorder == null) {
198+
recorder = new LocalityRecorder(meterRegistry, prefix, locality, priority);
199+
}
200+
recorder.update(hostSet, state.zarResidualPercentages().getOrDefault(locality, 0d));
201+
localityMap.put(locality, recorder);
202+
}
203+
for (Entry<Locality, LocalityRecorder> e: prevLocalityMap.entrySet()) {
204+
if (!localityMap.containsKey(e.getKey())) {
205+
e.getValue().close();
206+
}
207+
}
208+
this.localityMap = ImmutableMap.copyOf(localityMap);
209+
}
210+
211+
@Override
212+
public void close() {
213+
meters.forEach(meterRegistry::remove);
214+
localityMap.values().forEach(LocalityRecorder::close);
215+
}
216+
}
217+
218+
private static class LocalityRecorder implements SafeCloseable {
219+
220+
private final MeterRegistry meterRegistry;
221+
private final Locality locality;
222+
private final List<Meter> meters;
223+
224+
private volatile int total;
225+
private volatile int healthy;
226+
private volatile int degraded;
227+
private volatile int localityWeight;
228+
private volatile double zarPercentage;
229+
230+
LocalityRecorder(MeterRegistry meterRegistry, MeterIdPrefix prefix, Locality locality,
231+
int priority) {
232+
this.meterRegistry = meterRegistry;
233+
this.locality = locality;
234+
prefix = prefix.withTags("region", locality.getRegion(),
235+
"zone", locality.getZone(),
236+
"sub_zone", locality.getSubZone());
237+
final ImmutableList.Builder<Meter> metersBuilder = ImmutableList.builder();
238+
metersBuilder.add(Gauge.builder(prefix.name("lb.membership.total"), () -> total)
239+
.tags(prefix.tags())
240+
.register(meterRegistry));
241+
metersBuilder.add(Gauge.builder(prefix.name("lb.membership.healthy"), () -> healthy)
242+
.tags(prefix.tags())
243+
.register(meterRegistry));
244+
metersBuilder.add(Gauge.builder(prefix.name("lb.membership.degraded"), () -> degraded)
245+
.tags(prefix.tags())
246+
.register(meterRegistry));
247+
metersBuilder.add(Gauge.builder(prefix.name("lb.locality.weight"), () -> localityWeight)
248+
.tags(prefix.tags())
249+
.register(meterRegistry));
250+
if (priority == 0) {
251+
metersBuilder.add(Gauge.builder(prefix.name("lb.zar.residual.percentage"), () -> zarPercentage)
252+
.tags(prefix.tags())
253+
.register(meterRegistry));
254+
}
255+
meters = metersBuilder.build();
256+
}
257+
258+
void update(HostSet hostSet, double zarPercentage) {
259+
final EndpointGroup allHosts = hostSet.endpointGroupPerLocality()
260+
.getOrDefault(locality, EndpointGroup.of());
261+
total = allHosts.endpoints().size();
262+
final EndpointGroup healthyHosts = hostSet.healthyEndpointGroupPerLocality()
263+
.getOrDefault(locality, EndpointGroup.of());
264+
healthy = healthyHosts.endpoints().size();
265+
final EndpointGroup degradedHosts = hostSet.degradedEndpointGroupPerLocality()
266+
.getOrDefault(locality, EndpointGroup.of());
267+
degraded = degradedHosts.endpoints().size();
268+
localityWeight = hostSet.localityWeights().getOrDefault(locality, 0);
269+
this.zarPercentage = zarPercentage;
270+
}
271+
272+
@Override
273+
public void close() {
274+
meters.forEach(meterRegistry::remove);
275+
}
276+
}
277+
278+
private static class SnapshotUpdateRecorder implements SafeCloseable {
279+
280+
private final MeterRegistry meterRegistry;
281+
private volatile long revision;
282+
private final Gauge revisionGauge;
283+
private final Counter updatedCounter;
284+
285+
SnapshotUpdateRecorder(MeterRegistry meterRegistry, MeterIdPrefix prefix) {
286+
this.meterRegistry = meterRegistry;
287+
revisionGauge = Gauge.builder(prefix.name("revision"), () -> revision)
288+
.tags(prefix.tags())
289+
.register(meterRegistry);
290+
updatedCounter = Counter.builder(prefix.name("count"))
291+
.tags(prefix.tags())
292+
.register(meterRegistry);
293+
}
294+
295+
void snapshotUpdated(ClusterSnapshot snapshot) {
296+
revision = snapshot.xdsResource().revision();
297+
updatedCounter.increment();
298+
}
299+
300+
@Override
301+
public void close() {
302+
meterRegistry.remove(revisionGauge);
303+
meterRegistry.remove(updatedCounter);
304+
}
305+
}
306+
}

0 commit comments

Comments
 (0)