@@ -25,18 +25,19 @@ public abstract class GrpcStreamRetrier {
25
25
protected final AtomicBoolean isReconnecting = new AtomicBoolean (false );
26
26
protected final AtomicBoolean isStopped = new AtomicBoolean (false );
27
27
28
+ private final Logger logger ;
28
29
private final ScheduledExecutorService scheduler ;
29
30
private final RetryConfig retryConfig ;
30
31
private volatile int retryCount ;
31
32
private volatile long retryStartedAt ;
32
33
33
- protected GrpcStreamRetrier (RetryConfig retryConfig , ScheduledExecutorService scheduler ) {
34
+ protected GrpcStreamRetrier (Logger logger , RetryConfig retryConfig , ScheduledExecutorService scheduler ) {
35
+ this .logger = logger ;
34
36
this .retryConfig = retryConfig ;
35
37
this .scheduler = scheduler ;
36
38
this .id = generateRandomId (ID_LENGTH );
37
39
}
38
40
39
- protected abstract Logger getLogger ();
40
41
protected abstract String getStreamName ();
41
42
protected abstract void onStreamReconnect ();
42
43
protected abstract void onShutdown (String reason );
@@ -49,20 +50,20 @@ protected static String generateRandomId(int length) {
49
50
.toString ();
50
51
}
51
52
52
- private void tryReconnect (long delay ) {
53
+ private void tryScheduleReconnect (long delay ) {
53
54
if (!isReconnecting .compareAndSet (false , true )) {
54
- getLogger () .info ("[{}] should reconnect {} stream, but reconnect is already in progress" , id ,
55
+ logger .info ("[{}] should reconnect {} stream, but reconnect is already in progress" , id ,
55
56
getStreamName ());
56
57
return ;
57
58
}
58
59
59
- getLogger () .warn ("[{}] Retry #{}. Scheduling {} reconnect in {}ms..." , id , retryCount , getStreamName (), delay );
60
+ logger .warn ("[{}] Retry #{}. Scheduling {} reconnect in {}ms..." , id , retryCount , getStreamName (), delay );
60
61
try {
61
62
scheduler .schedule (this ::reconnect , delay , TimeUnit .MILLISECONDS );
62
63
} catch (RejectedExecutionException exception ) {
63
64
String errorMessage = "[" + id + "] Couldn't schedule reconnect: scheduler is already shut down. " +
64
65
"Shutting down " + getStreamName ();
65
- getLogger () .error (errorMessage );
66
+ logger .error (errorMessage );
66
67
shutdownImpl (errorMessage );
67
68
}
68
69
}
@@ -73,9 +74,9 @@ protected void resetRetries() {
73
74
}
74
75
75
76
void reconnect () {
76
- getLogger () .info ("[{}] {} reconnect #{} started" , id , getStreamName (), retryCount );
77
+ logger .info ("[{}] {} reconnect #{} started" , id , getStreamName (), retryCount );
77
78
if (!isReconnecting .compareAndSet (true , false )) {
78
- getLogger () .warn ("Couldn't reset reconnect flag. Shouldn't happen" );
79
+ logger .warn ("Couldn't reset reconnect flag. Shouldn't happen" );
79
80
}
80
81
onStreamReconnect ();
81
82
}
@@ -85,7 +86,7 @@ protected CompletableFuture<Void> shutdownImpl() {
85
86
}
86
87
87
88
protected CompletableFuture <Void > shutdownImpl (String reason ) {
88
- getLogger () .info ("[{}] Shutting down {}"
89
+ logger .info ("[{}] Shutting down {}"
89
90
+ (reason == null || reason .isEmpty () ? "" : " with reason: " + reason ), id , getStreamName ());
90
91
isStopped .set (true );
91
92
return CompletableFuture .runAsync (() -> {
@@ -94,28 +95,28 @@ protected CompletableFuture<Void> shutdownImpl(String reason) {
94
95
}
95
96
96
97
protected void onSessionClosed (Status status , Throwable th ) {
97
- getLogger () .info ("[{}] onSessionClosed called" , id );
98
+ logger .info ("[{}] onSessionClosed called" , id );
98
99
99
100
RetryPolicy retryPolicy = null ;
100
101
if (th != null ) {
101
- getLogger () .error ("[{}] Exception in {} stream session: " , id , getStreamName (), th );
102
+ logger .error ("[{}] Exception in {} stream session: " , id , getStreamName (), th );
102
103
retryPolicy = retryConfig .isThrowableRetryable (th );
103
104
} else {
104
105
if (status .isSuccess ()) {
105
106
if (isStopped .get ()) {
106
- getLogger () .info ("[{}] {} stream session closed successfully" , id , getStreamName ());
107
+ logger .info ("[{}] {} stream session closed successfully" , id , getStreamName ());
107
108
return ;
108
109
} else {
109
- getLogger () .warn ("[{}] {} stream session was closed on working {}" , id , getStreamName ());
110
+ logger .warn ("[{}] {} stream session was closed on working {}" , id , getStreamName ());
110
111
}
111
112
} else {
112
- getLogger () .warn ("[{}] Error in {} stream session: {}" , id , getStreamName (), status );
113
+ logger .warn ("[{}] Error in {} stream session: {}" , id , getStreamName (), status );
113
114
retryPolicy = retryConfig .isStatusRetryable (status .getCode ());
114
115
}
115
116
}
116
117
117
118
if (isStopped .get ()) {
118
- getLogger () .info ("[{}] {} is already stopped, no need to schedule reconnect" , id , getStreamName ());
119
+ logger .info ("[{}] {} is already stopped, no need to schedule reconnect" , id , getStreamName ());
119
120
return ;
120
121
}
121
122
@@ -126,23 +127,21 @@ protected void onSessionClosed(Status status, Throwable th) {
126
127
long delay = retryPolicy .nextRetryMs (retryCount + 1 , System .currentTimeMillis () - retryStartedAt );
127
128
if (delay >= 0 ) {
128
129
retryCount ++;
129
- tryReconnect (delay );
130
+ tryScheduleReconnect (delay );
130
131
return ;
131
132
}
132
133
}
133
134
134
135
long elapsedMs = retryStartedAt > 0 ? System .currentTimeMillis () - retryStartedAt : 0 ;
135
136
if (!isStopped .compareAndSet (false , true )) {
136
- getLogger () .warn ("[{}] Stopped after {} retries and {} ms elapsed. But {} is already shut down." ,
137
+ logger .warn ("[{}] Stopped after {} retries and {} ms elapsed. But {} is already shut down." ,
137
138
id , retryCount , elapsedMs , getStreamName ());
138
139
return ;
139
140
}
140
141
141
142
String errorMessage = "[" + id + "] Stopped after " + retryCount + " retries and " + elapsedMs +
142
143
" ms elapsed. Shutting down " + getStreamName ();
143
- getLogger () .error (errorMessage );
144
+ logger .error (errorMessage );
144
145
shutdownImpl (errorMessage );
145
146
}
146
-
147
-
148
147
}
0 commit comments