55
55
import java .util .Set ;
56
56
import java .util .concurrent .CompletableFuture ;
57
57
import java .util .concurrent .CompletionStage ;
58
+ import java .util .concurrent .CopyOnWriteArraySet ;
59
+ import jnr .ffi .annotations .Synchronized ;
58
60
import net .jcip .annotations .ThreadSafe ;
59
61
import org .slf4j .Logger ;
60
62
import org .slf4j .LoggerFactory ;
@@ -80,7 +82,8 @@ public class MetadataManager implements AsyncAutoCloseable {
80
82
private volatile KeyspaceFilter keyspaceFilter ;
81
83
private volatile Boolean schemaEnabledProgrammatically ;
82
84
private volatile boolean tokenMapEnabled ;
83
- private volatile Set <DefaultNode > contactPoints ;
85
+ private volatile Set <EndPoint > contactPoints ;
86
+ private volatile Set <DefaultNode > resolvedContactPoints ;
84
87
private volatile boolean wasImplicitContactPoint ;
85
88
private volatile TypeCodec <TupleValue > tabletPayloadCodec = null ;
86
89
@@ -102,7 +105,7 @@ protected MetadataManager(InternalDriverContext context, DefaultMetadata initial
102
105
DefaultDriverOption .METADATA_SCHEMA_REFRESHED_KEYSPACES , Collections .emptyList ());
103
106
this .keyspaceFilter = KeyspaceFilter .newInstance (logPrefix , refreshedKeyspaces );
104
107
this .tokenMapEnabled = config .getBoolean (DefaultDriverOption .METADATA_TOKEN_MAP_ENABLED );
105
-
108
+ this . resolvedContactPoints = new CopyOnWriteArraySet <>();
106
109
context .getEventBus ().register (ConfigChangeEvent .class , this ::onConfigChanged );
107
110
}
108
111
@@ -145,18 +148,19 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
145
148
// Convert the EndPoints to Nodes, but we can't put them into the Metadata yet, because we
146
149
// don't know their host_id. So store them in a volatile field instead, they will get copied
147
150
// during the first node refresh.
148
- ImmutableSet .Builder <DefaultNode > contactPointsBuilder = ImmutableSet .builder ();
151
+ ImmutableSet .Builder <EndPoint > contactPointsBuilder = ImmutableSet .builder ();
149
152
if (providedContactPoints == null || providedContactPoints .isEmpty ()) {
150
153
LOG .info (
151
154
"[{}] No contact points provided, defaulting to {}" , logPrefix , DEFAULT_CONTACT_POINT );
152
155
this .wasImplicitContactPoint = true ;
153
- contactPointsBuilder .add (new DefaultNode ( DEFAULT_CONTACT_POINT , context ) );
156
+ contactPointsBuilder .add (DEFAULT_CONTACT_POINT );
154
157
} else {
155
158
for (EndPoint endPoint : providedContactPoints ) {
156
- contactPointsBuilder .add (new DefaultNode ( endPoint , context ) );
159
+ contactPointsBuilder .add (endPoint );
157
160
}
158
161
}
159
162
this .contactPoints = contactPointsBuilder .build ();
163
+ this .resolveContactPoints ();
160
164
LOG .debug ("[{}] Adding initial contact points {}" , logPrefix , contactPoints );
161
165
}
162
166
@@ -167,7 +171,31 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
167
171
* @see #wasImplicitContactPoint()
168
172
*/
169
173
public Set <DefaultNode > getContactPoints () {
170
- return contactPoints ;
174
+ return resolvedContactPoints ;
175
+ }
176
+
177
+ @ Synchronized
178
+ public void resolveContactPoints () {
179
+ ImmutableSet .Builder <EndPoint > resultBuilder = ImmutableSet .builder ();
180
+ for (EndPoint endPoint : contactPoints ) {
181
+ List <EndPoint > resolveEndpoints = endPoint .resolveAll ();
182
+ if (!resolveEndpoints .isEmpty ()) {
183
+ LOG .error ("failed to resolve contact endpoint {}" , endPoint );
184
+ } else {
185
+ resultBuilder .addAll (resolveEndpoints );
186
+ }
187
+ }
188
+
189
+ Set <EndPoint > result = resultBuilder .build ();
190
+ for (EndPoint endPoint : result ) {
191
+ if (resolvedContactPoints .stream ()
192
+ .anyMatch (resolved -> resolved .getEndPoint ().equals (endPoint ))) {
193
+ continue ;
194
+ }
195
+ this .resolvedContactPoints .add (new DefaultNode (endPoint , context ));
196
+ }
197
+
198
+ this .resolvedContactPoints .removeIf (endPoint -> !result .contains (endPoint .getEndPoint ()));
171
199
}
172
200
173
201
/** Whether the default contact point was used (because none were provided explicitly). */
@@ -337,10 +365,13 @@ private SingleThreaded(InternalDriverContext context, DriverExecutionProfile con
337
365
}
338
366
339
367
private Void refreshNodes (Iterable <NodeInfo > nodeInfos ) {
368
+ if (!didFirstNodeListRefresh ) {
369
+ resolveContactPoints ();
370
+ }
340
371
MetadataRefresh refresh =
341
372
didFirstNodeListRefresh
342
373
? new FullNodeListRefresh (nodeInfos )
343
- : new InitialNodeListRefresh (nodeInfos , contactPoints );
374
+ : new InitialNodeListRefresh (nodeInfos , resolvedContactPoints );
344
375
didFirstNodeListRefresh = true ;
345
376
return apply (refresh );
346
377
}
0 commit comments