Skip to content

Commit 42c3736

Browse files
fix: reset gRPC channel backoff between worker reconnect attempts (#1757) (#1759)
(cherry picked from commit a56c984) Signed-off-by: Javier Aliaga <javier@diagrid.io> Co-authored-by: Javier Aliaga <javier@diagrid.io>
1 parent 50a07fc commit 42c3736

2 files changed

Lines changed: 139 additions & 0 deletions

File tree

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,16 @@ public void startAndBlock() {
230230
Thread.currentThread().interrupt();
231231
break;
232232
}
233+
234+
// After repeated connection failures, gRPC's default exponential backoff
235+
// (1s → up to 120s) can outrun this 5s retry loop: the channel is mid-backoff
236+
// when the next getWorkItems call fires, so it fast-fails with UNAVAILABLE
237+
// instead of attempting reconnection. Short-circuit the backoff timer so the
238+
// retry interval is bounded by this loop, not by the channel's internal state.
239+
Channel ch = this.sidecarClient.getChannel();
240+
if (ch instanceof ManagedChannel) {
241+
((ManagedChannel) ch).resetConnectBackoff();
242+
}
233243
}
234244
}
235245
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright 2026 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
14+
package io.dapr.durabletask;
15+
16+
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
17+
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
18+
import io.grpc.ManagedChannel;
19+
import io.grpc.ManagedChannelBuilder;
20+
import io.grpc.Server;
21+
import io.grpc.netty.NettyServerBuilder;
22+
import io.grpc.stub.StreamObserver;
23+
import org.junit.jupiter.api.AfterEach;
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.net.ServerSocket;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
33+
/**
34+
* Verifies DurableTaskGrpcWorker recovers from an extended sidecar outage even when the
35+
* underlying gRPC channel's exponential backoff has grown past the worker's retry interval.
36+
*
37+
* <p>Without forcing a backoff reset on each retry, gRPC-Java's default reconnect policy
38+
* (1s initial, 1.6x multiplier, 120s max) lets the channel's internal backoff timer outrun
39+
* the worker's 5s retry loop. The worker keeps calling getWorkItems(), but the channel is
40+
* mid-backoff and fails fast with UNAVAILABLE. The bug surfaces in production under chaos
41+
* faults that kill daprd for ~30s — by the time daprd is back, the channel may be 30+s
42+
* into a 60s backoff and miss the recovery window.</p>
43+
*
44+
* <p>Uses a real TCP gRPC server (NettyServerBuilder) so the channel exercises its real
45+
* connect-and-backoff state machine. In-process channels do not have this behavior.</p>
46+
*/
47+
class DurableTaskGrpcWorkerChannelBackoffTest {
48+
49+
// Long enough for the channel's exponential backoff to grow past the worker's 5s
50+
// retry interval (per default policy, ~6 failures get to ~16s backoff).
51+
private static final long EXTENDED_OUTAGE_MILLIS = 30_000L;
52+
53+
// Once the second server is up, a fixed-fix worker should reconnect within one
54+
// retry cycle. Bug-version worker is stuck in a long channel backoff and won't.
55+
private static final long RECONNECT_DEADLINE_MILLIS = 10_000L;
56+
57+
private DurableTaskGrpcWorker worker;
58+
private ManagedChannel channel;
59+
private Server server;
60+
61+
@AfterEach
62+
void tearDown() throws Exception {
63+
if (worker != null) {
64+
worker.close();
65+
}
66+
if (channel != null) {
67+
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
68+
}
69+
if (server != null && !server.isShutdown()) {
70+
server.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
71+
}
72+
}
73+
74+
@Test
75+
void workerReconnectsAfterExtendedSidecarOutage() throws Exception {
76+
int port = pickFreePort();
77+
78+
// Phase 1: server v1 up; worker connects.
79+
CountDownLatch v1Connected = new CountDownLatch(1);
80+
server = startServer(port, request -> v1Connected.countDown());
81+
channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build();
82+
83+
worker = new DurableTaskGrpcWorkerBuilder().grpcChannel(channel).build();
84+
worker.start();
85+
86+
assertTrue(v1Connected.await(10, TimeUnit.SECONDS), "worker did not establish initial stream to server v1");
87+
88+
// Phase 2: kill the sidecar; let the channel exhaust several backoff cycles.
89+
server.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
90+
server = null;
91+
Thread.sleep(EXTENDED_OUTAGE_MILLIS);
92+
93+
// Phase 3: sidecar back; worker should reconnect within one retry cycle.
94+
AtomicInteger v2CallCount = new AtomicInteger();
95+
CountDownLatch v2Connected = new CountDownLatch(1);
96+
server = startServer(port, request -> {
97+
v2CallCount.incrementAndGet();
98+
v2Connected.countDown();
99+
});
100+
101+
boolean reconnected = v2Connected.await(RECONNECT_DEADLINE_MILLIS, TimeUnit.MILLISECONDS);
102+
assertTrue(reconnected,
103+
"worker failed to reconnect within " + RECONNECT_DEADLINE_MILLIS + "ms after sidecar restart"
104+
+ " (channel state=" + channel.getState(false) + ", v2 calls=" + v2CallCount.get() + ")");
105+
}
106+
107+
private static int pickFreePort() throws Exception {
108+
try (ServerSocket s = new ServerSocket(0)) {
109+
s.setReuseAddress(true);
110+
return s.getLocalPort();
111+
}
112+
}
113+
114+
private static Server startServer(int port, java.util.function.Consumer<OrchestratorService.GetWorkItemsRequest> onCall)
115+
throws Exception {
116+
return NettyServerBuilder.forPort(port)
117+
.addService(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() {
118+
@Override
119+
public void getWorkItems(
120+
OrchestratorService.GetWorkItemsRequest request,
121+
StreamObserver<OrchestratorService.WorkItem> responseObserver) {
122+
onCall.accept(request);
123+
// Hold the stream open; the worker only loops on stream termination.
124+
}
125+
})
126+
.build()
127+
.start();
128+
}
129+
}

0 commit comments

Comments
 (0)