From c6fd47f30495947b5e8e1627f08971b4191fe964 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 2 Jan 2023 15:19:34 +0200 Subject: [PATCH] Use kubernetes-client to fetch EndpointSlices for the known seeds if Kubernetes version is 1.21 or newer --- management-api-agent-4.1.x/pom.xml | 5 ++ .../cassandra/locator/K8SeedProvider41x.java | 74 ++++++++++++------- pom.xml | 1 + 3 files changed, 52 insertions(+), 28 deletions(-) diff --git a/management-api-agent-4.1.x/pom.xml b/management-api-agent-4.1.x/pom.xml index fa4b414e..ed29564a 100644 --- a/management-api-agent-4.1.x/pom.xml +++ b/management-api-agent-4.1.x/pom.xml @@ -36,6 +36,11 @@ byte-buddy-agent ${bytebuddy.version} + + io.kubernetes + client-java + ${kubernetes-client.version} + junit junit diff --git a/management-api-agent-4.1.x/src/main/java/org/apache/cassandra/locator/K8SeedProvider41x.java b/management-api-agent-4.1.x/src/main/java/org/apache/cassandra/locator/K8SeedProvider41x.java index d72d633f..e2b11f0d 100644 --- a/management-api-agent-4.1.x/src/main/java/org/apache/cassandra/locator/K8SeedProvider41x.java +++ b/management-api-agent-4.1.x/src/main/java/org/apache/cassandra/locator/K8SeedProvider41x.java @@ -13,6 +13,15 @@ import java.util.List; import java.util.stream.Collectors; +import com.datastax.mgmtapi.ShimLoader; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.Configuration; +import io.kubernetes.client.openapi.apis.DiscoveryV1Api; +import io.kubernetes.client.openapi.models.V1Endpoint; +import io.kubernetes.client.openapi.models.V1EndpointConditions; +import io.kubernetes.client.openapi.models.V1EndpointSlice; +import io.kubernetes.client.util.Namespaces; +import io.kubernetes.client.util.version.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,41 +32,50 @@ public class K8SeedProvider41x implements SeedProvider { private static final Logger logger = LoggerFactory.getLogger(K8SeedProvider41x.class); + private static final int MINIMUM_ENDPOINTSLICE_VERSION = 21; + public K8SeedProvider41x() { } public List getSeeds() { - Config conf; - try - { - conf = DatabaseDescriptor.loadConfig(); - } - catch (Exception e) - { - throw new AssertionError(e); - } - String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1); - List seeds = new ArrayList<>(hosts.length); - for (String host : hosts) - { - try - { - // A name may resolve to multiple seed node IPs, as would be - // the case in Kubernetes when a headless service is used to - // represent the seed nodes in a cluster, which is why we use - // `getAllByName` here instead of `getByName`. - seeds.addAll(Arrays.asList(InetAddress.getAllByName(host.trim())) - .stream() - .map(n -> InetAddressAndPort.getByAddress(n)) - .collect(Collectors.toList())); + try { + org.apache.cassandra.config.Config conf = DatabaseDescriptor.loadConfig(); + ApiClient client = io.kubernetes.client.util.Config.defaultClient(); + Version version = new Version(client); + int kubernetesVersion = Integer.parseInt(version.getVersion().getMinor()); + if(kubernetesVersion < MINIMUM_ENDPOINTSLICE_VERSION) { + logger.info("Kubernetes server version is too old, using legacy method to get the seeds"); + return ShimLoader.instance.get().getK8SeedProvider().getSeeds(); } - catch (UnknownHostException ex) - { - // not fatal... DD will bark if there end up being zero seeds. - logger.warn("Seed provider couldn't lookup host {}", host); + + Configuration.setDefaultApiClient(client); + + String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1); + DiscoveryV1Api discoveryApi = new DiscoveryV1Api(client); + + List seeds = new ArrayList<>(); + for (String host : hosts) { + V1EndpointSlice v1EndpointSlice = discoveryApi.readNamespacedEndpointSlice(host, Namespaces.getPodNamespace(), null); + for (V1Endpoint endpoint : v1EndpointSlice.getEndpoints()) { + V1EndpointConditions conditions = endpoint.getConditions(); + if (Boolean.FALSE.equals(conditions.getReady())) { + continue; + } + for (String address : endpoint.getAddresses()) { + try { + InetAddressAndPort inet = InetAddressAndPort.getByName(address); + seeds.add(inet); + } catch (UnknownHostException e) { + // This address simply isn't added + } + } + } } + + return Collections.unmodifiableList(seeds); + } catch (Exception e) { + throw new AssertionError(e); } - return Collections.unmodifiableList(seeds); } } diff --git a/pom.xml b/pom.xml index 011d0887..8af65f11 100644 --- a/pom.xml +++ b/pom.xml @@ -16,6 +16,7 @@ 4.0.7 3.2.13 4.13.2 + 15.0.1 1.10.10 build_version.sh 1.7.25