55
55
import java .util .Set ;
56
56
import java .util .concurrent .CompletableFuture ;
57
57
import java .util .concurrent .CompletionStage ;
58
+ import java .util .concurrent .CopyOnWriteArraySet ;
58
59
import net .jcip .annotations .ThreadSafe ;
59
60
import org .slf4j .Logger ;
60
61
import org .slf4j .LoggerFactory ;
@@ -80,7 +81,8 @@ public class MetadataManager implements AsyncAutoCloseable {
80
81
private volatile KeyspaceFilter keyspaceFilter ;
81
82
private volatile Boolean schemaEnabledProgrammatically ;
82
83
private volatile boolean tokenMapEnabled ;
83
- private volatile Set <DefaultNode > contactPoints ;
84
+ private volatile Set <EndPoint > contactPoints ;
85
+ private volatile Set <DefaultNode > resolvedContactPoints ;
84
86
private volatile boolean wasImplicitContactPoint ;
85
87
private volatile TypeCodec <TupleValue > tabletPayloadCodec = null ;
86
88
@@ -102,7 +104,7 @@ protected MetadataManager(InternalDriverContext context, DefaultMetadata initial
102
104
DefaultDriverOption .METADATA_SCHEMA_REFRESHED_KEYSPACES , Collections .emptyList ());
103
105
this .keyspaceFilter = KeyspaceFilter .newInstance (logPrefix , refreshedKeyspaces );
104
106
this .tokenMapEnabled = config .getBoolean (DefaultDriverOption .METADATA_TOKEN_MAP_ENABLED );
105
-
107
+ this . resolvedContactPoints = new CopyOnWriteArraySet <>();
106
108
context .getEventBus ().register (ConfigChangeEvent .class , this ::onConfigChanged );
107
109
}
108
110
@@ -145,18 +147,19 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
145
147
// Convert the EndPoints to Nodes, but we can't put them into the Metadata yet, because we
146
148
// don't know their host_id. So store them in a volatile field instead, they will get copied
147
149
// during the first node refresh.
148
- ImmutableSet .Builder <DefaultNode > contactPointsBuilder = ImmutableSet .builder ();
150
+ ImmutableSet .Builder <EndPoint > contactPointsBuilder = ImmutableSet .builder ();
149
151
if (providedContactPoints == null || providedContactPoints .isEmpty ()) {
150
152
LOG .info (
151
153
"[{}] No contact points provided, defaulting to {}" , logPrefix , DEFAULT_CONTACT_POINT );
152
154
this .wasImplicitContactPoint = true ;
153
- contactPointsBuilder .add (new DefaultNode ( DEFAULT_CONTACT_POINT , context ) );
155
+ contactPointsBuilder .add (DEFAULT_CONTACT_POINT );
154
156
} else {
155
157
for (EndPoint endPoint : providedContactPoints ) {
156
- contactPointsBuilder .add (new DefaultNode ( endPoint , context ) );
158
+ contactPointsBuilder .add (endPoint );
157
159
}
158
160
}
159
161
this .contactPoints = contactPointsBuilder .build ();
162
+ this .resolveContactPoints ();
160
163
LOG .debug ("[{}] Adding initial contact points {}" , logPrefix , contactPoints );
161
164
}
162
165
@@ -167,7 +170,30 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
167
170
* @see #wasImplicitContactPoint()
168
171
*/
169
172
public Set <DefaultNode > getContactPoints () {
170
- return contactPoints ;
173
+ return resolvedContactPoints ;
174
+ }
175
+
176
+ public synchronized void resolveContactPoints () {
177
+ ImmutableSet .Builder <EndPoint > resultBuilder = ImmutableSet .builder ();
178
+ for (EndPoint endPoint : contactPoints ) {
179
+ List <EndPoint > resolveEndpoints = endPoint .resolveAll ();
180
+ if (resolveEndpoints .isEmpty ()) {
181
+ LOG .error ("failed to resolve contact endpoint {}" , endPoint );
182
+ } else {
183
+ resultBuilder .addAll (resolveEndpoints );
184
+ }
185
+ }
186
+
187
+ Set <EndPoint > result = resultBuilder .build ();
188
+ for (EndPoint endPoint : result ) {
189
+ if (resolvedContactPoints .stream ()
190
+ .anyMatch (resolved -> resolved .getEndPoint ().equals (endPoint ))) {
191
+ continue ;
192
+ }
193
+ this .resolvedContactPoints .add (new DefaultNode (endPoint , context ));
194
+ }
195
+
196
+ this .resolvedContactPoints .removeIf (endPoint -> !result .contains (endPoint .getEndPoint ()));
171
197
}
172
198
173
199
/** Whether the default contact point was used (because none were provided explicitly). */
@@ -337,10 +363,13 @@ private SingleThreaded(InternalDriverContext context, DriverExecutionProfile con
337
363
}
338
364
339
365
private Void refreshNodes (Iterable <NodeInfo > nodeInfos ) {
366
+ if (!didFirstNodeListRefresh ) {
367
+ resolveContactPoints ();
368
+ }
340
369
MetadataRefresh refresh =
341
370
didFirstNodeListRefresh
342
371
? new FullNodeListRefresh (nodeInfos )
343
- : new InitialNodeListRefresh (nodeInfos , contactPoints );
372
+ : new InitialNodeListRefresh (nodeInfos , resolvedContactPoints );
344
373
didFirstNodeListRefresh = true ;
345
374
return apply (refresh );
346
375
}
0 commit comments