55
55
import java .util .Set ;
56
56
import java .util .concurrent .CompletableFuture ;
57
57
import java .util .concurrent .CompletionStage ;
58
+ import java .util .concurrent .CopyOnWriteArraySet ;
59
+ import jnr .ffi .annotations .Synchronized ;
58
60
import net .jcip .annotations .ThreadSafe ;
59
61
import org .slf4j .Logger ;
60
62
import org .slf4j .LoggerFactory ;
@@ -80,7 +82,8 @@ public class MetadataManager implements AsyncAutoCloseable {
80
82
private volatile KeyspaceFilter keyspaceFilter ;
81
83
private volatile Boolean schemaEnabledProgrammatically ;
82
84
private volatile boolean tokenMapEnabled ;
83
- private volatile Set <DefaultNode > contactPoints ;
85
+ private volatile Set <EndPoint > contactPoints ;
86
+ private volatile Set <DefaultNode > resolvedContactPoints ;
84
87
private volatile boolean wasImplicitContactPoint ;
85
88
private volatile TypeCodec <TupleValue > tabletPayloadCodec = null ;
86
89
@@ -102,7 +105,7 @@ protected MetadataManager(InternalDriverContext context, DefaultMetadata initial
102
105
DefaultDriverOption .METADATA_SCHEMA_REFRESHED_KEYSPACES , Collections .emptyList ());
103
106
this .keyspaceFilter = KeyspaceFilter .newInstance (logPrefix , refreshedKeyspaces );
104
107
this .tokenMapEnabled = config .getBoolean (DefaultDriverOption .METADATA_TOKEN_MAP_ENABLED );
105
-
108
+ this . resolvedContactPoints = new CopyOnWriteArraySet <>();
106
109
context .getEventBus ().register (ConfigChangeEvent .class , this ::onConfigChanged );
107
110
}
108
111
@@ -145,18 +148,19 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
145
148
// Convert the EndPoints to Nodes, but we can't put them into the Metadata yet, because we
146
149
// don't know their host_id. So store them in a volatile field instead, they will get copied
147
150
// during the first node refresh.
148
- ImmutableSet .Builder <DefaultNode > contactPointsBuilder = ImmutableSet .builder ();
151
+ ImmutableSet .Builder <EndPoint > contactPointsBuilder = ImmutableSet .builder ();
149
152
if (providedContactPoints == null || providedContactPoints .isEmpty ()) {
150
153
LOG .info (
151
154
"[{}] No contact points provided, defaulting to {}" , logPrefix , DEFAULT_CONTACT_POINT );
152
155
this .wasImplicitContactPoint = true ;
153
- contactPointsBuilder .add (new DefaultNode ( DEFAULT_CONTACT_POINT , context ) );
156
+ contactPointsBuilder .add (DEFAULT_CONTACT_POINT );
154
157
} else {
155
158
for (EndPoint endPoint : providedContactPoints ) {
156
- contactPointsBuilder .add (new DefaultNode ( endPoint , context ) );
159
+ contactPointsBuilder .add (endPoint );
157
160
}
158
161
}
159
162
this .contactPoints = contactPointsBuilder .build ();
163
+ this .resolveContactPoints ();
160
164
LOG .debug ("[{}] Adding initial contact points {}" , logPrefix , contactPoints );
161
165
}
162
166
@@ -167,7 +171,30 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
167
171
* @see #wasImplicitContactPoint()
168
172
*/
169
173
public Set <DefaultNode > getContactPoints () {
170
- return contactPoints ;
174
+ return resolvedContactPoints ;
175
+ }
176
+
177
+ public synchronized void resolveContactPoints () {
178
+ ImmutableSet .Builder <EndPoint > resultBuilder = ImmutableSet .builder ();
179
+ for (EndPoint endPoint : contactPoints ) {
180
+ List <EndPoint > resolveEndpoints = endPoint .resolveAll ();
181
+ if (resolveEndpoints .isEmpty ()) {
182
+ LOG .error ("failed to resolve contact endpoint {}" , endPoint );
183
+ } else {
184
+ resultBuilder .addAll (resolveEndpoints );
185
+ }
186
+ }
187
+
188
+ Set <EndPoint > result = resultBuilder .build ();
189
+ for (EndPoint endPoint : result ) {
190
+ if (resolvedContactPoints .stream ()
191
+ .anyMatch (resolved -> resolved .getEndPoint ().equals (endPoint ))) {
192
+ continue ;
193
+ }
194
+ this .resolvedContactPoints .add (new DefaultNode (endPoint , context ));
195
+ }
196
+
197
+ this .resolvedContactPoints .removeIf (endPoint -> !result .contains (endPoint .getEndPoint ()));
171
198
}
172
199
173
200
/** Whether the default contact point was used (because none were provided explicitly). */
@@ -337,10 +364,13 @@ private SingleThreaded(InternalDriverContext context, DriverExecutionProfile con
337
364
}
338
365
339
366
private Void refreshNodes (Iterable <NodeInfo > nodeInfos ) {
367
+ if (!didFirstNodeListRefresh ) {
368
+ resolveContactPoints ();
369
+ }
340
370
MetadataRefresh refresh =
341
371
didFirstNodeListRefresh
342
372
? new FullNodeListRefresh (nodeInfos )
343
- : new InitialNodeListRefresh (nodeInfos , contactPoints );
373
+ : new InitialNodeListRefresh (nodeInfos , resolvedContactPoints );
344
374
didFirstNodeListRefresh = true ;
345
375
return apply (refresh );
346
376
}
0 commit comments