48
48
import edu .umd .cs .findbugs .annotations .NonNull ;
49
49
import io .netty .util .concurrent .EventExecutor ;
50
50
import java .net .InetSocketAddress ;
51
+ import java .net .UnknownHostException ;
51
52
import java .nio .ByteBuffer ;
52
53
import java .util .Collections ;
53
54
import java .util .List ;
54
55
import java .util .Map ;
55
56
import java .util .Set ;
56
57
import java .util .concurrent .CompletableFuture ;
57
58
import java .util .concurrent .CompletionStage ;
59
+ import java .util .concurrent .CopyOnWriteArraySet ;
60
+ import jnr .ffi .annotations .Synchronized ;
58
61
import net .jcip .annotations .ThreadSafe ;
59
62
import org .slf4j .Logger ;
60
63
import org .slf4j .LoggerFactory ;
@@ -80,7 +83,8 @@ public class MetadataManager implements AsyncAutoCloseable {
80
83
private volatile KeyspaceFilter keyspaceFilter ;
81
84
private volatile Boolean schemaEnabledProgrammatically ;
82
85
private volatile boolean tokenMapEnabled ;
83
- private volatile Set <DefaultNode > contactPoints ;
86
+ private volatile Set <EndPoint > contactPoints ;
87
+ private volatile Set <DefaultNode > resolvedContactPoints ;
84
88
private volatile boolean wasImplicitContactPoint ;
85
89
private volatile TypeCodec <TupleValue > tabletPayloadCodec = null ;
86
90
@@ -102,7 +106,7 @@ protected MetadataManager(InternalDriverContext context, DefaultMetadata initial
102
106
DefaultDriverOption .METADATA_SCHEMA_REFRESHED_KEYSPACES , Collections .emptyList ());
103
107
this .keyspaceFilter = KeyspaceFilter .newInstance (logPrefix , refreshedKeyspaces );
104
108
this .tokenMapEnabled = config .getBoolean (DefaultDriverOption .METADATA_TOKEN_MAP_ENABLED );
105
-
109
+ this . resolvedContactPoints = new CopyOnWriteArraySet <>();
106
110
context .getEventBus ().register (ConfigChangeEvent .class , this ::onConfigChanged );
107
111
}
108
112
@@ -145,18 +149,19 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
145
149
// Convert the EndPoints to Nodes, but we can't put them into the Metadata yet, because we
146
150
// don't know their host_id. So store them in a volatile field instead, they will get copied
147
151
// during the first node refresh.
148
- ImmutableSet .Builder <DefaultNode > contactPointsBuilder = ImmutableSet .builder ();
152
+ ImmutableSet .Builder <EndPoint > contactPointsBuilder = ImmutableSet .builder ();
149
153
if (providedContactPoints == null || providedContactPoints .isEmpty ()) {
150
154
LOG .info (
151
155
"[{}] No contact points provided, defaulting to {}" , logPrefix , DEFAULT_CONTACT_POINT );
152
156
this .wasImplicitContactPoint = true ;
153
- contactPointsBuilder .add (new DefaultNode ( DEFAULT_CONTACT_POINT , context ) );
157
+ contactPointsBuilder .add (DEFAULT_CONTACT_POINT );
154
158
} else {
155
159
for (EndPoint endPoint : providedContactPoints ) {
156
- contactPointsBuilder .add (new DefaultNode ( endPoint , context ) );
160
+ contactPointsBuilder .add (endPoint );
157
161
}
158
162
}
159
163
this .contactPoints = contactPointsBuilder .build ();
164
+ this .resolveContactPoints ();
160
165
LOG .debug ("[{}] Adding initial contact points {}" , logPrefix , contactPoints );
161
166
}
162
167
@@ -167,7 +172,30 @@ public void addContactPoints(Set<EndPoint> providedContactPoints) {
167
172
* @see #wasImplicitContactPoint()
168
173
*/
169
174
public Set <DefaultNode > getContactPoints () {
170
- return contactPoints ;
175
+ return resolvedContactPoints ;
176
+ }
177
+
178
+ @ Synchronized
179
+ public void resolveContactPoints () {
180
+ ImmutableSet .Builder <EndPoint > resultBuilder = ImmutableSet .builder ();
181
+ for (EndPoint endPoint : contactPoints ) {
182
+ try {
183
+ resultBuilder .addAll (endPoint .resolveAll ());
184
+ } catch (UnknownHostException e ) {
185
+ LOG .error ("failed to resolve contact endpoint {}" , endPoint , e );
186
+ }
187
+ }
188
+
189
+ Set <EndPoint > result = resultBuilder .build ();
190
+ for (EndPoint endPoint : result ) {
191
+ if (resolvedContactPoints .stream ()
192
+ .anyMatch (resolved -> resolved .getEndPoint ().equals (endPoint ))) {
193
+ continue ;
194
+ }
195
+ this .resolvedContactPoints .add (new DefaultNode (endPoint , context ));
196
+ }
197
+
198
+ this .resolvedContactPoints .removeIf (endPoint -> !result .contains (endPoint .getEndPoint ()));
171
199
}
172
200
173
201
/** Whether the default contact point was used (because none were provided explicitly). */
@@ -337,10 +365,13 @@ private SingleThreaded(InternalDriverContext context, DriverExecutionProfile con
337
365
}
338
366
339
367
private Void refreshNodes (Iterable <NodeInfo > nodeInfos ) {
368
+ if (!didFirstNodeListRefresh ) {
369
+ resolveContactPoints ();
370
+ }
340
371
MetadataRefresh refresh =
341
372
didFirstNodeListRefresh
342
373
? new FullNodeListRefresh (nodeInfos )
343
- : new InitialNodeListRefresh (nodeInfos , contactPoints );
374
+ : new InitialNodeListRefresh (nodeInfos , resolvedContactPoints );
344
375
didFirstNodeListRefresh = true ;
345
376
return apply (refresh );
346
377
}
0 commit comments