Skip to content

Commit 21ed18a

Browse files
committed
Add GrpcHealthCheckedEndpointGroupBuilder
Motivation: Add `GrpcHealthCheckedEndpointGroupBuilder` which builds a health checked endpoint group whose health comes from a [standard gRPC health check service result](https://grpc.io/docs/guides/health-checking/). Modifications: * Adds `GrpcHealthCheckedEndpointGroupBuilder` which extends `AbstractHealthCheckedEndpointGroupBuilder` and creates a new health check function * Adds `GrpcHealthChecker` which is the health check function that creates and uses a gRPC `HealthGrpc` stub to check the gRPC health service on the endpoint. If the health check response is `SERVING`, it is healthy. It is unhealthy if the response is not `SERVING` or if there was a request failure. * Adds tests. Result: * A user can create a health checked endpoint group that is backed by a gRPC health check service. * Closes line#5930 Closes
1 parent a2fd1d3 commit 21ed18a

File tree

5 files changed

+422
-0
lines changed

5 files changed

+422
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.client.grpc.endpoint.healthcheck;
17+
18+
import com.linecorp.armeria.client.endpoint.EndpointGroup;
19+
import com.linecorp.armeria.client.endpoint.healthcheck.AbstractHealthCheckedEndpointGroupBuilder;
20+
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext;
21+
import com.linecorp.armeria.common.annotation.Nullable;
22+
import com.linecorp.armeria.common.util.AsyncCloseable;
23+
import com.linecorp.armeria.internal.client.grpc.GrpcHealthChecker;
24+
25+
import java.util.function.Function;
26+
27+
public final class GrpcHealthCheckedEndpointGroupBuilder extends AbstractHealthCheckedEndpointGroupBuilder<GrpcHealthCheckedEndpointGroupBuilder> {
28+
29+
private @Nullable String service;
30+
31+
GrpcHealthCheckedEndpointGroupBuilder(EndpointGroup delegate) {
32+
super(delegate);
33+
}
34+
35+
public static GrpcHealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate) {
36+
return new GrpcHealthCheckedEndpointGroupBuilder(delegate);
37+
}
38+
39+
public GrpcHealthCheckedEndpointGroupBuilder service(@Nullable String service) {
40+
this.service = service;
41+
return this;
42+
}
43+
44+
@Override
45+
protected Function<? super HealthCheckerContext, ? extends AsyncCloseable> newCheckerFactory() {
46+
return new GrpcHealthCheckerFactory(service);
47+
}
48+
49+
private static class GrpcHealthCheckerFactory implements Function<HealthCheckerContext, AsyncCloseable> {
50+
51+
private final @Nullable String service;
52+
53+
private GrpcHealthCheckerFactory(@Nullable String service) {
54+
this.service = service;
55+
}
56+
57+
@Override
58+
public AsyncCloseable apply(HealthCheckerContext ctx) {
59+
GrpcHealthChecker healthChecker = new GrpcHealthChecker(ctx, ctx.endpoint(), ctx.protocol(), service);
60+
healthChecker.start();
61+
return healthChecker;
62+
}
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.internal.client.grpc;
17+
18+
import com.google.common.annotations.VisibleForTesting;
19+
import com.google.common.util.concurrent.FutureCallback;
20+
import com.google.common.util.concurrent.Futures;
21+
import com.google.common.util.concurrent.ListenableFuture;
22+
import com.linecorp.armeria.client.ClientRequestContext;
23+
import com.linecorp.armeria.client.ClientRequestContextCaptor;
24+
import com.linecorp.armeria.client.Clients;
25+
import com.linecorp.armeria.client.Endpoint;
26+
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext;
27+
import com.linecorp.armeria.client.grpc.GrpcClients;
28+
import com.linecorp.armeria.common.SessionProtocol;
29+
import com.linecorp.armeria.common.annotation.Nullable;
30+
import com.linecorp.armeria.common.util.AsyncCloseable;
31+
import com.linecorp.armeria.common.util.AsyncCloseableSupport;
32+
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;
33+
import io.grpc.health.v1.HealthCheckRequest;
34+
import io.grpc.health.v1.HealthCheckResponse;
35+
import io.grpc.health.v1.HealthGrpc;
36+
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.locks.ReentrantLock;
39+
40+
public final class GrpcHealthChecker implements AsyncCloseable {
41+
42+
static final double HEALTHY = 1d;
43+
static final double UNHEALTHY = 0d;
44+
45+
private final HealthCheckerContext ctx;
46+
@Nullable private final String service;
47+
private final HealthGrpc.HealthFutureStub stub;
48+
49+
private final ReentrantLock lock = new ReentrantShortLock();
50+
private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync);
51+
52+
public GrpcHealthChecker(HealthCheckerContext ctx, Endpoint endpoint, SessionProtocol sessionProtocol,
53+
@Nullable String service) {
54+
this.ctx = ctx;
55+
this.service = service;
56+
57+
this.stub = GrpcClients.builder(sessionProtocol, endpoint)
58+
.options(ctx.clientOptions())
59+
.build(HealthGrpc.HealthFutureStub.class);
60+
}
61+
62+
public void start() {
63+
check();
64+
}
65+
66+
@VisibleForTesting
67+
void check() {
68+
lock();
69+
try {
70+
HealthCheckRequest.Builder builder = HealthCheckRequest.newBuilder();
71+
if (this.service != null) {
72+
builder.setService(service);
73+
}
74+
75+
try (ClientRequestContextCaptor reqCtxCaptor = Clients.newContextCaptor()) {
76+
ListenableFuture<HealthCheckResponse> future = stub.check(builder.build());
77+
ClientRequestContext reqCtx = reqCtxCaptor.get();
78+
79+
Futures.addCallback(future, new FutureCallback<HealthCheckResponse>() {
80+
@Override
81+
public void onSuccess(HealthCheckResponse result) {
82+
if (result.getStatus() == HealthCheckResponse.ServingStatus.SERVING) {
83+
ctx.updateHealth(HEALTHY, reqCtx, null, null);
84+
85+
} else {
86+
ctx.updateHealth(UNHEALTHY, reqCtx, null, null);
87+
}
88+
}
89+
90+
@Override
91+
public void onFailure(Throwable t) {
92+
ctx.updateHealth(UNHEALTHY, reqCtx, null, t);
93+
}
94+
95+
}, reqCtx.eventLoop().withoutContext());
96+
}
97+
} finally {
98+
unlock();
99+
}
100+
}
101+
102+
@Override
103+
public CompletableFuture<?> closeAsync() {
104+
return closeable.closeAsync();
105+
}
106+
107+
private synchronized void closeAsync(CompletableFuture<?> future) {
108+
future.complete(null);
109+
}
110+
111+
@Override
112+
public void close() {
113+
closeable.close();
114+
}
115+
116+
private void lock() {
117+
lock.lock();
118+
}
119+
120+
private void unlock() {
121+
lock.unlock();
122+
}
123+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.client.grpc.endpoint.healthcheck;
17+
18+
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup;
19+
import com.linecorp.armeria.common.SessionProtocol;
20+
import com.linecorp.armeria.common.grpc.HealthGrpcServerExtension;
21+
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.api.extension.RegisterExtension;
23+
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
26+
27+
import static org.assertj.core.api.Assertions.*;
28+
29+
class GrpcHealthCheckedEndpointGroupBuilderTest {
30+
31+
@RegisterExtension
32+
private static HealthGrpcServerExtension serverExtension = new HealthGrpcServerExtension();
33+
34+
@Test
35+
public void hasHealthyEndpoint() throws Exception {
36+
serverExtension.setAction(HealthGrpcServerExtension.Action.DO_HEALTHY);
37+
38+
HealthCheckedEndpointGroup endpointGroup = GrpcHealthCheckedEndpointGroupBuilder
39+
.builder(serverExtension.endpoint(SessionProtocol.H2C))
40+
.build();
41+
42+
assertThat(endpointGroup.whenReady().join()).hasSize(1);
43+
}
44+
45+
@Test
46+
public void empty() throws Exception {
47+
serverExtension.setAction(HealthGrpcServerExtension.Action.DO_UNHEALTHY);
48+
49+
HealthCheckedEndpointGroup endpointGroup = GrpcHealthCheckedEndpointGroupBuilder
50+
.builder(serverExtension.endpoint(SessionProtocol.H2C))
51+
.build();
52+
53+
assertThatThrownBy(() -> {
54+
// whenReady() will timeout because there are no healthy endpoints
55+
endpointGroup.whenReady().get(1, TimeUnit.SECONDS);
56+
57+
}).isInstanceOf(TimeoutException.class);
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.common.grpc;
17+
18+
import com.google.protobuf.TextFormat;
19+
import com.linecorp.armeria.server.ServerBuilder;
20+
import com.linecorp.armeria.server.grpc.GrpcService;
21+
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
22+
import io.grpc.health.v1.HealthCheckRequest;
23+
import io.grpc.health.v1.HealthCheckResponse;
24+
import io.grpc.health.v1.HealthGrpc;
25+
import io.grpc.stub.StreamObserver;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
public class HealthGrpcServerExtension extends ServerExtension {
30+
31+
private static final Logger LOGGER = LoggerFactory.getLogger(HealthGrpcServerExtension.class);
32+
33+
private static final HealthCheckResponse HEALTHY_HEALTH_CHECK_RESPONSE = HealthCheckResponse.newBuilder()
34+
.setStatus(HealthCheckResponse.ServingStatus.SERVING)
35+
.build();
36+
37+
private static final HealthCheckResponse UNHEALTHY_HEALTH_CHECK_RESPONSE = HealthCheckResponse.newBuilder()
38+
.setStatus(HealthCheckResponse.ServingStatus.NOT_SERVING)
39+
.build();
40+
41+
public enum Action {
42+
DO_HEALTHY, DO_UNHEALTHY, DO_TIMEOUT
43+
}
44+
45+
private Action action;
46+
47+
@Override
48+
protected void configure(ServerBuilder sb) throws Exception {
49+
GrpcService grpcService = GrpcService.builder()
50+
.addService(new HealthGrpc.HealthImplBase() {
51+
@Override
52+
public void check(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
53+
LOGGER.debug("Received health check response {}", TextFormat.shortDebugString(request));
54+
55+
if (action == Action.DO_HEALTHY) {
56+
responseObserver.onNext(HEALTHY_HEALTH_CHECK_RESPONSE);
57+
responseObserver.onCompleted();
58+
59+
} else if (action == Action.DO_UNHEALTHY) {
60+
responseObserver.onNext(UNHEALTHY_HEALTH_CHECK_RESPONSE);
61+
responseObserver.onCompleted();
62+
63+
} else if (action == Action.DO_TIMEOUT) {
64+
LOGGER.debug("Not sending a response...");
65+
}
66+
67+
LOGGER.debug("Completed health check response");
68+
}
69+
70+
@Override
71+
public void watch(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
72+
throw new UnsupportedOperationException();
73+
}
74+
})
75+
.build();
76+
77+
sb.service(grpcService);
78+
}
79+
80+
public void setAction(Action action) {
81+
this.action = action;
82+
}
83+
}

0 commit comments

Comments
 (0)