Skip to content

Commit

Permalink
Improve xDS health check (#5785)
Browse files Browse the repository at this point in the history
Motivation:

It is recommended to review #5802
prior to this PR

This changeset attempts to solve several problems:

##### Custom filter logic

xDS considers all endpoints when computing whether a `PrioritySet` is in
panic state. For instance, if the percentage of unhealthy endpoints
exceeds a preconfigured panic threshold, the endpoint selection includes
all endpoints regardless of the degraded status. While armeria supports
an `HealthCheckedEndpointGroup` out of the box, it filters out healthy
endpoints automatically.
-
https://github.com/line/armeria/blob/72ebfe12abf7804723a490fe7acfe3a45d46089f/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/DefaultLoadBalancer.java#L99-L105
- ref:
https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/panic_threshold

In order to resolve this, I propose that a
`AbstractHealthCheckedEndpointGroupBuilder#healthCheckedEndpointPredicate`
API is added

##### Per-cluster health check configuration

Per-cluster member health check is difficult with the current API since
a single parameter set is statically defined for an entire health
checked endpoint group.
- ref:
https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/health_checking#per-cluster-member-health-check-config

We already have an abstraction
`AbstractHealthCheckedEndpointGroupBuilder#newCheckerFactory`. I propose
that this API be used for the purpose of xDS. In order to support the
parameters xDS allows configuring, I propose that parameters are passed
to `HttpHealthChecker` via the constructor instead of the
`HealthCheckerContext`. In order to support this change,
`HttpHealthChecker` has been moved to an internal package so the `xds`
module can also access it.

Modifications:

- Added APIs
`AbstractHealthCheckedEndpointGroupBuilder#healthCheckedEndpointPredicate`
and modified `HealthCheckedEndpointGroup` to filter endpoints based on
the predicate.
- Health checked `Endpoint`s now have attributes `HEALTHY` and
`DEGRADED` set.
- `HealthCheckedEndpointGroup#setEndpoints` is now called on every
invocation of `updateHealth`, not just when a health status changes.
- xDS sometimes needs to override the health checked `Endpoint`, so
modified to receive health checked parameters from the
`HttpHealthChecker` constructor instead of `HealthCheckerContext`.
- In the process, `HttpHealthChecker` has been moved to an internal
package
- `HealthCheckerContext#originalEndpoint` has been added to retrieve the
original endpoint. The other APIs haven't been deprecated since it is
potentially useful information that other `HealthChecker`s may be using.
- Introduced a `XdsHealthCheckedEndpointGroupBuilder` which implements
its own `checkerFactory`.
- `EndpointUtil#coarseHealth` has been updated to consider `HEALTHY`,
`DEGRADED` attributes when determining health.

Result:

- The behavior of `XdsEndpointGroup` is more aligned with envoy in terms
of health checking
- Preparation for zone-aware load balancing

<!--
Visit this URL to learn more about how to write a pull request
description:

https://armeria.dev/community/developer-guide#how-to-write-pull-request-description
-->
  • Loading branch information
jrhee17 authored Aug 8, 2024
1 parent d00febf commit c7aca10
Show file tree
Hide file tree
Showing 25 changed files with 1,026 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import static com.linecorp.armeria.client.endpoint.WeightRampingUpStrategyBuilder.DEFAULT_RAMPING_UP_TASK_WINDOW_MILLIS;
import static com.linecorp.armeria.client.endpoint.WeightRampingUpStrategyBuilder.DEFAULT_TOTAL_STEPS;
import static com.linecorp.armeria.client.endpoint.WeightRampingUpStrategyBuilder.defaultTransition;
import static com.linecorp.armeria.internal.client.endpoint.RampingUpKeys.createdAtNanos;
import static com.linecorp.armeria.internal.client.endpoint.RampingUpKeys.hasCreatedAtNanos;
import static com.linecorp.armeria.internal.client.endpoint.EndpointAttributeKeys.createdAtNanos;
import static com.linecorp.armeria.internal.client.endpoint.EndpointAttributeKeys.hasCreatedAtNanos;
import static java.util.Objects.requireNonNull;

import java.util.ArrayDeque;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.linecorp.armeria.internal.client.endpoint.EndpointAttributeKeys.HEALTHY_ATTR;
import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.util.function.Function;
import java.util.function.Predicate;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.math.LongMath;

import com.linecorp.armeria.client.Client;
Expand Down Expand Up @@ -51,6 +54,9 @@ public abstract class AbstractHealthCheckedEndpointGroupBuilder
extends AbstractDynamicEndpointGroupBuilder<SELF> {

static final Backoff DEFAULT_HEALTH_CHECK_RETRY_BACKOFF = Backoff.fixed(3000).withJitter(0.2);
@VisibleForTesting
static final Predicate<Endpoint> DEFAULT_ENDPOINT_PREDICATE =
endpoint -> Boolean.TRUE.equals(endpoint.attr(HEALTHY_ATTR));

private final EndpointGroup delegate;

Expand All @@ -65,6 +71,7 @@ public abstract class AbstractHealthCheckedEndpointGroupBuilder

private long initialSelectionTimeoutMillis = Flags.defaultResponseTimeoutMillis();
private long selectionTimeoutMillis = Flags.defaultConnectTimeoutMillis();
private Predicate<Endpoint> healthCheckedEndpointPredicate = DEFAULT_ENDPOINT_PREDICATE;

/**
* Creates a new {@link AbstractHealthCheckedEndpointGroupBuilder}.
Expand Down Expand Up @@ -368,6 +375,26 @@ public SELF selectionTimeoutMillis(long initialSelectionTimeoutMillis, long sele
return self();
}

/**
* Sets a predicate to filter health checked {@link Endpoint}s. Whenever there is an update
* in endpoints, this predicate is used to filter {@link Endpoint}s that should be considered
* for selection.
*
* <p>For example:<pre>{@code
* // regardless of health check status, all endpoints will be considered for selection
* HealthCheckedEndpointGroup endpointGroup =
* HealthCheckedEndpointGroup.builder(delegate, "/health")
* .healthCheckedEndpointPredicate(endpoint -> true)
* .build();
* }</pre>
*/
@UnstableApi
public SELF healthCheckedEndpointPredicate(Predicate<Endpoint> healthCheckedEndpointPredicate) {
this.healthCheckedEndpointPredicate =
requireNonNull(healthCheckedEndpointPredicate, "healthCheckedEndpointPredicate");
return self();
}

/**
* Returns a newly created {@link HealthCheckedEndpointGroup} based on the properties set so far.
*/
Expand All @@ -392,7 +419,8 @@ public final HealthCheckedEndpointGroup build() {
initialSelectionTimeoutMillis, selectionTimeoutMillis,
protocol, port, retryBackoff,
clientOptionsBuilder.build(),
newCheckerFactory(), healthCheckStrategy);
newCheckerFactory(), healthCheckStrategy,
healthCheckedEndpointPredicate);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.linecorp.armeria.client.endpoint.healthcheck;

import static com.linecorp.armeria.internal.client.endpoint.EndpointAttributeKeys.healthCheckAttributes;

import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -39,6 +41,7 @@
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.InvalidResponseException;
import com.linecorp.armeria.client.retry.Backoff;
import com.linecorp.armeria.common.Attributes;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
Expand Down Expand Up @@ -72,6 +75,7 @@ final class DefaultHealthCheckerContext
private AsyncCloseable handle;
private boolean destroyed;
private int refCnt = 1;
private Attributes endpointAttributes;

DefaultHealthCheckerContext(Endpoint endpoint, int port, SessionProtocol protocol,
ClientOptions clientOptions, Backoff retryBackoff,
Expand All @@ -89,6 +93,7 @@ final class DefaultHealthCheckerContext
this.clientOptions = clientOptions;
this.retryBackoff = retryBackoff;
this.onUpdateHealth = onUpdateHealth;
endpointAttributes = healthCheckAttributes(false, false);
}

void init(AsyncCloseable handle) {
Expand Down Expand Up @@ -126,7 +131,8 @@ private CompletableFuture<Void> destroy() {
lock.unlock();
}

onUpdateHealth.accept(originalEndpoint, false);
endpointAttributes = healthCheckAttributes(false, false);
onUpdateHealth.accept(originalEndpoint.withAttrs(endpointAttributes), false);

return null;
});
Expand All @@ -137,6 +143,15 @@ public Endpoint endpoint() {
return endpoint;
}

@Override
public Endpoint originalEndpoint() {
return originalEndpoint;
}

Attributes endpointAttributes() {
return endpointAttributes;
}

@Override
public SessionProtocol protocol() {
return protocol;
Expand Down Expand Up @@ -174,7 +189,12 @@ public void updateHealth(double health) {
public void updateHealth(double health, ClientRequestContext ctx,
@Nullable ResponseHeaders headers, @Nullable Throwable cause) {
final boolean isHealthy = health > 0;
onUpdateHealth.accept(originalEndpoint, isHealthy);
if (headers != null && headers.contains("x-envoy-degraded")) {
endpointAttributes = healthCheckAttributes(isHealthy, true);
} else {
endpointAttributes = healthCheckAttributes(isHealthy, false);
}
onUpdateHealth.accept(originalEndpoint.withAttrs(endpointAttributes), isHealthy);

if (!initialCheckFuture.isDone()) {
if (isHealthy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ Map<Endpoint, DefaultHealthCheckerContext> contexts() {
}

List<Endpoint> candidates() {
return candidates;
return candidates.stream().map(endpoint -> {
final DefaultHealthCheckerContext context = contexts.get(endpoint);
assert context != null;
return endpoint.withAttrs(context.endpointAttributes());
}).collect(toImmutableList());
}

void initialize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.linecorp.armeria.client.endpoint.healthcheck;

import static com.linecorp.armeria.internal.client.endpoint.EndpointAttributeKeys.equalHealthCheckAttributes;
import static com.linecorp.armeria.internal.common.util.CollectionUtil.truncate;
import static java.util.Objects.requireNonNull;

Expand All @@ -24,12 +25,14 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -110,6 +113,7 @@ public static HealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate,
private final Function<? super HealthCheckerContext, ? extends AsyncCloseable> checkerFactory;
@VisibleForTesting
final HealthCheckStrategy healthCheckStrategy;
private final Predicate<Endpoint> healthCheckedEndpointPredicate;

private final ReentrantLock lock = new ReentrantShortLock();
@GuardedBy("lock")
Expand All @@ -119,7 +123,7 @@ public static HealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate,
// from the internal array. The remaining value is revived if a new value having the same hash code is
// added.
@VisibleForTesting
final Set<Endpoint> healthyEndpoints = ConcurrentHashMap.newKeySet();
final Map<Endpoint, Endpoint> cachedEndpoints = new ConcurrentHashMap<>();
private volatile boolean initialized;

/**
Expand All @@ -131,7 +135,8 @@ public static HealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate,
SessionProtocol protocol, int port,
Backoff retryBackoff, ClientOptions clientOptions,
Function<? super HealthCheckerContext, ? extends AsyncCloseable> checkerFactory,
HealthCheckStrategy healthCheckStrategy) {
HealthCheckStrategy healthCheckStrategy,
Predicate<Endpoint> healthCheckedEndpointPredicate) {

super(requireNonNull(delegate, "delegate").selectionStrategy(), allowEmptyEndpoints);

Expand All @@ -144,6 +149,8 @@ public static HealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate,
this.clientOptions = requireNonNull(clientOptions, "clientOptions");
this.checkerFactory = requireNonNull(checkerFactory, "checkerFactory");
this.healthCheckStrategy = requireNonNull(healthCheckStrategy, "healthCheckStrategy");
this.healthCheckedEndpointPredicate =
requireNonNull(healthCheckedEndpointPredicate, "healthCheckedEndpointPredicate");

clientOptions.factory().whenClosed().thenRun(this::closeAsync);
delegate.addListener(this::setCandidates, true);
Expand Down Expand Up @@ -203,33 +210,37 @@ Queue<HealthCheckContextGroup> contextGroupChain() {

@VisibleForTesting
List<Endpoint> allHealthyEndpoints() {
lock.lock();
try {
return allEndpoints().stream().filter(healthCheckedEndpointPredicate).collect(Collectors.toList());
} finally {
lock.unlock();
}
}

private List<Endpoint> allEndpoints() {
lock.lock();
try {
final HealthCheckContextGroup newGroup = contextGroupChain.peekLast();
if (newGroup == null) {
return ImmutableList.of();
}

final List<Endpoint> allHealthyEndpoints = new ArrayList<>();
for (Endpoint candidate : newGroup.candidates()) {
if (healthyEndpoints.contains(candidate)) {
allHealthyEndpoints.add(candidate);
}
}
final List<Endpoint> allEndpoints = new ArrayList<>(newGroup.candidates());

for (HealthCheckContextGroup oldGroup : contextGroupChain) {
if (oldGroup == newGroup) {
break;
}
for (Endpoint candidate : oldGroup.candidates()) {
if (!allHealthyEndpoints.contains(candidate) && healthyEndpoints.contains(candidate)) {
if (!allEndpoints.contains(candidate)) {
// Add old Endpoints that do not exist in newGroup. When the first check for newGroup is
// completed, the old Endpoints will be removed.
allHealthyEndpoints.add(candidate);
allEndpoints.add(candidate);
}
}
}
return allHealthyEndpoints;
return allEndpoints;
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -283,9 +294,11 @@ private void updateHealth(Endpoint endpoint, boolean health) {
final boolean updated;
// A healthy endpoint should be a valid checker context.
if (health && findContext(endpoint) != null) {
updated = healthyEndpoints.add(endpoint);
final Endpoint cached = cachedEndpoints.put(endpoint, endpoint);
// the previous endpoint didn't exist, or the attributes changed
updated = (cached == null) || !equalHealthCheckAttributes(cached, endpoint);
} else {
updated = healthyEndpoints.remove(endpoint);
updated = cachedEndpoints.remove(endpoint, endpoint);
}

// Each new health status will be updated after initialization of the first context group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.util.AsyncCloseable;
import com.linecorp.armeria.internal.client.endpoint.healthcheck.HttpHealthChecker;

/**
* A builder for creating a new {@link HealthCheckedEndpointGroup} that sends HTTP health check requests.
Expand Down Expand Up @@ -72,7 +73,8 @@ private static class HttpHealthCheckerFactory implements Function<HealthCheckerC

@Override
public AsyncCloseable apply(HealthCheckerContext ctx) {
final HttpHealthChecker checker = new HttpHealthChecker(ctx, path, useGet);
final HttpHealthChecker checker = new HttpHealthChecker(ctx, ctx.endpoint(), path, useGet,
ctx.protocol(), null);
checker.start();
return checker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;

/**
* Provides the properties and operations required for sending health check requests.
Expand All @@ -36,6 +37,12 @@ public interface HealthCheckerContext {
*/
Endpoint endpoint();

/**
* The original {@link Endpoint} which is considered for health checking.
*/
@UnstableApi
Endpoint originalEndpoint();

/**
* Returns the {@link SessionProtocol} to be used when sending health check requests.
*/
Expand Down
Loading

0 comments on commit c7aca10

Please sign in to comment.