diff --git a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/AppVersionRevision.scala b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/AppVersionRevision.scala index 3ec51d7af..039177636 100644 --- a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/AppVersionRevision.scala +++ b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/AppVersionRevision.scala @@ -34,21 +34,24 @@ final class AppVersionRevision(implicit system: ExtendedActorSystem) extends Ext def getRevision(): Future[Version] = versionPromise.future def start(): Unit = { - if (k8sSettings.podName.isEmpty) { - log.error( - "Not able to read the app version from the revision of the current ReplicaSet. Reason:" + - "No configuration found to extract the pod name from. " + - s"Be sure to provide the pod name with `$configPath.pod-name` " + - "or by setting ENV variable `KUBERNETES_POD_NAME`.") - } else { - if (isInitialized.compareAndSet(false, true)) { + if (isInitialized.compareAndSet(false, true)) { + if (k8sSettings.podName.isEmpty) { + val msg = "Not able to read the app version from the revision of the current ReplicaSet. Reason:" + + "No configuration found to extract the pod name from. " + + s"Be sure to provide the pod name with `$configPath.pod-name` " + + "or by setting ENV variable `KUBERNETES_POD_NAME`." + log.error(msg) + versionPromise.failure(new MissingPodNameException(msg)) + } else { Cluster(system).setAppVersionLater(getRevision()) KubernetesApiImpl(log, k8sSettings).foreach { kubernetesApi => versionPromise.completeWith(kubernetesApi.readRevision().map(Version(_))) } - } else - log.warning("AppVersionRevision extension already initiated, yet start() method was called again. Ignoring.") + } + } else { + log.warning("AppVersionRevision extension already initiated, yet start() method was called again. Ignoring.") } + } // autostart if the extension is loaded through the config extension list diff --git a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApi.scala b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApi.scala index 416173fb6..e2f4a0367 100644 --- a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApi.scala +++ b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApi.scala @@ -49,6 +49,11 @@ private[akka] final case class PodCost(podName: String, cost: Int, address: Stri */ @InternalApi private[akka] sealed class ReadRevisionException(message: String) extends RuntimeException(message) +/** + * INTERNAL API + */ +@InternalApi private[akka] sealed class MissingPodNameException(message: String) extends RuntimeException(message) + /** * INTERNAL API */ diff --git a/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/AppVersionRevisionSpec.scala b/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/AppVersionRevisionSpec.scala index c2856097f..b7b631c9d 100644 --- a/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/AppVersionRevisionSpec.scala +++ b/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/AppVersionRevisionSpec.scala @@ -5,228 +5,40 @@ package akka.rollingupdate.kubernetes import akka.actor.ActorSystem -import akka.testkit.EventFilter -import akka.testkit.ImplicitSender import akka.testkit.TestKit -import com.fasterxml.jackson.databind.ObjectMapper -import com.github.tomakehurst.wiremock.WireMockServer -import com.github.tomakehurst.wiremock.client.WireMock.aResponse -import com.github.tomakehurst.wiremock.client.WireMock.get -import com.github.tomakehurst.wiremock.client.WireMock.stubFor -import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo -import com.github.tomakehurst.wiremock.client.MappingBuilder -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder -import com.github.tomakehurst.wiremock.client.WireMock -import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig -import com.github.tomakehurst.wiremock.matching.EqualToPattern -import com.github.tomakehurst.wiremock.stubbing.Scenario import com.typesafe.config.ConfigFactory -import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers -import org.scalatest.time.Millis -import org.scalatest.time.Seconds -import org.scalatest.time.Span import org.scalatest.wordspec.AnyWordSpecLike -import org.scalatest.BeforeAndAfterAll -import org.scalatest.BeforeAndAfterEach - -import scala.concurrent.duration._ object AppVersionRevisionSpec { val config = ConfigFactory.parseString(""" - akka.loggers = ["akka.testkit.TestEventListener"] akka.actor.provider = cluster akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 - akka.cluster.jmx.multi-mbeans-in-same-jvm = on akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.run-by-actor-system-terminate = off - akka.test.filter-leeway = 10s + akka.rollingupdate.kubernetes.pod-name = "" """) } - class AppVersionRevisionSpec extends TestKit( ActorSystem( "AppVersionRevisionSpec", AppVersionRevisionSpec.config )) - with ImplicitSender with AnyWordSpecLike with Matchers - with BeforeAndAfterAll - with BeforeAndAfterEach - with Eventually with ScalaFutures { - private val wireMockServer = new WireMockServer(wireMockConfig().port(0)) - wireMockServer.start() - WireMock.configureFor(wireMockServer.port()) - - // for wiremock to provide json - val mapper = new ObjectMapper() - - private val namespace = "namespace-test" - private val podName1 = "pod-test-1" - - private def settings(podName: String) = { - new KubernetesSettings( - apiCaPath = "", - apiTokenPath = "", - apiServiceHost = "localhost", - apiServicePort = wireMockServer.port(), - namespace = Some(namespace), - namespacePath = "", - podName = podName, - secure = false, - apiServiceRequestTimeout = 2.seconds, - customResourceSettings = new CustomResourceSettings(enabled = false, crName = None, 60.seconds) - ) - } - - private val kubernetesApi = - new KubernetesApiImpl( - system, - settings(podName1), - namespace, - apiToken = "apiToken", - clientHttpsConnectionContext = None) - - override implicit val patienceConfig: PatienceConfig = - PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis)) - - override protected def afterAll(): Unit = super.shutdown() - - override protected def beforeEach(): Unit = { - wireMockServer.resetAll() - WireMock.resetAllScenarios() - } - - private def podPath(podName: String) = - urlEqualTo(s"/api/v1/namespaces/$namespace/pods/$podName") - - private def replicaPath(replica: String) = - urlEqualTo(s"/apis/apps/v1/namespaces/$namespace/replicasets/$replica") - - private def getPod(podName: String): MappingBuilder = - get(podPath(podName)).withHeader("Content-Type", new EqualToPattern("application/json")) - - private def getReplicaSet(replica: String): MappingBuilder = - get(replicaPath(replica)).withHeader("Content-Type", new EqualToPattern("application/json")) - - private val defaultPodResponseJson = - """{ - | "metadata": { - | "ownerReferences": [ - | {"name": "wrong-replicaset-id", "kind": "SomethingElse"}, - | {"name": "parent-replicaset-id", "kind": "ReplicaSet"} - | ] - | } - |}""".stripMargin - - private val defaultReplicaResponseJson = - """{ - | "metadata": { - | "annotations": { - | "deployment.kubernetes.io/revision": "1" - | } - | } - |}""".stripMargin - - private def stubPodResponse(json: String = defaultPodResponseJson, state: String = Scenario.STARTED) = - stubFor( - getPod(podName1) - .willReturn( - ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json)) - ) - .inScenario("pod") - .whenScenarioStateIs(state)) - - private def stubReplicaResponse(json: String = defaultReplicaResponseJson) = - stubFor( - getReplicaSet("parent-replicaset-id") - .willReturn( - ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json)) - ) - .inScenario("replica") - .whenScenarioStateIs(Scenario.STARTED)) - - "Read revision from Kubernetes" should { - - "parse pod and replica response to get the revision" in { - stubPodResponse() - stubReplicaResponse() - - EventFilter - .info(pattern = "Reading revision from Kubernetes: akka.cluster.app-version was set to 1", occurrences = 1) - .intercept { - kubernetesApi.readRevision().futureValue should be("1") - } - } - - "retry and then fail when pod not found" in { - stubFor(getPod(podName1).willReturn(aResponse().withStatus(404))) - EventFilter - .warning(pattern = ".*Failed to get revision", occurrences = 5) - .intercept({ - assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException]) - }) - } - - "retry and then fail when replicaset not found" in { - stubPodResponse() - stubFor(getReplicaSet("parent-replicaset-id").willReturn(aResponse().withStatus(404))) - EventFilter - .warning(pattern = ".*Failed to get revision", occurrences = 5) - .intercept({ - assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException]) - }) - } - - "log if pod json can not be parsed" in { - stubPodResponse(json = """{ "invalid": "json" }""") - EventFilter - .warning(pattern = ".*Error while parsing Pod*") - .intercept({ - assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException]) - }) - } - - "log if replica json can not be parsed" in { - stubPodResponse() - stubReplicaResponse(json = """{ "invalid": "json" }""") - EventFilter - .warning(pattern = ".*Error while parsing Pod*") - .intercept({ - assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException]) - }) - } - - "break the loop if consecutive request succeeds" in { - stubFor( - getPod(podName1) - .willReturn(aResponse().withStatus(404)) - .inScenario("pod") - .whenScenarioStateIs(Scenario.STARTED) - .willSetStateTo("after first fail") - ) - stubFor( - getPod(podName1) - .willReturn(aResponse().withStatus(404)) - .inScenario("pod") - .whenScenarioStateIs("after first fail") - .willSetStateTo("k8s is happy now") - ) - stubPodResponse(state = "k8s is happy now") - stubReplicaResponse() - EventFilter - .warning(pattern = ".*Try again*", occurrences = 2) - .intercept({ - kubernetesApi.readRevision().futureValue should be("1") - }) + "AppVersionRevision extension" should { + "return failed future if pod-name is not configured" in { + val revisionExtension = AppVersionRevision(system) + revisionExtension.start() + val failure = revisionExtension.getRevision().failed.futureValue + failure.getMessage should include("No configuration found to extract the pod name from") } } } diff --git a/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/KubernetesApiSpec.scala b/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/KubernetesApiSpec.scala new file mode 100644 index 000000000..3cf61e7f0 --- /dev/null +++ b/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/KubernetesApiSpec.scala @@ -0,0 +1,232 @@ +/* + * Copyright (C) 2017-2023 Lightbend Inc. + */ + +package akka.rollingupdate.kubernetes + +import akka.actor.ActorSystem +import akka.testkit.EventFilter +import akka.testkit.ImplicitSender +import akka.testkit.TestKit +import com.fasterxml.jackson.databind.ObjectMapper +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock.aResponse +import com.github.tomakehurst.wiremock.client.WireMock.get +import com.github.tomakehurst.wiremock.client.WireMock.stubFor +import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo +import com.github.tomakehurst.wiremock.client.MappingBuilder +import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder +import com.github.tomakehurst.wiremock.client.WireMock +import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig +import com.github.tomakehurst.wiremock.matching.EqualToPattern +import com.github.tomakehurst.wiremock.stubbing.Scenario +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.Millis +import org.scalatest.time.Seconds +import org.scalatest.time.Span +import org.scalatest.wordspec.AnyWordSpecLike +import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfterEach + +import scala.concurrent.duration._ + +object KubernetesApiSpec { + val config = ConfigFactory.parseString(""" + akka.loggers = ["akka.testkit.TestEventListener"] + akka.actor.provider = cluster + + akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.hostname = 127.0.0.1 + + akka.cluster.jmx.multi-mbeans-in-same-jvm = on + akka.coordinated-shutdown.terminate-actor-system = off + akka.coordinated-shutdown.run-by-actor-system-terminate = off + akka.test.filter-leeway = 10s + """) +} + +class KubernetesApiSpec + extends TestKit( + ActorSystem( + "AppVersionRevisionSpec", + KubernetesApiSpec.config + )) + with ImplicitSender + with AnyWordSpecLike + with Matchers + with BeforeAndAfterAll + with BeforeAndAfterEach + with Eventually + with ScalaFutures { + + private val wireMockServer = new WireMockServer(wireMockConfig().port(0)) + wireMockServer.start() + WireMock.configureFor(wireMockServer.port()) + + // for wiremock to provide json + val mapper = new ObjectMapper() + + private val namespace = "namespace-test" + private val podName1 = "pod-test-1" + + private def settings(podName: String) = { + new KubernetesSettings( + apiCaPath = "", + apiTokenPath = "", + apiServiceHost = "localhost", + apiServicePort = wireMockServer.port(), + namespace = Some(namespace), + namespacePath = "", + podName = podName, + secure = false, + apiServiceRequestTimeout = 2.seconds, + customResourceSettings = new CustomResourceSettings(enabled = false, crName = None, 60.seconds) + ) + } + + private val kubernetesApi = + new KubernetesApiImpl( + system, + settings(podName1), + namespace, + apiToken = "apiToken", + clientHttpsConnectionContext = None) + + override implicit val patienceConfig: PatienceConfig = + PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis)) + + override protected def afterAll(): Unit = super.shutdown() + + override protected def beforeEach(): Unit = { + wireMockServer.resetAll() + WireMock.resetAllScenarios() + } + + private def podPath(podName: String) = + urlEqualTo(s"/api/v1/namespaces/$namespace/pods/$podName") + + private def replicaPath(replica: String) = + urlEqualTo(s"/apis/apps/v1/namespaces/$namespace/replicasets/$replica") + + private def getPod(podName: String): MappingBuilder = + get(podPath(podName)).withHeader("Content-Type", new EqualToPattern("application/json")) + + private def getReplicaSet(replica: String): MappingBuilder = + get(replicaPath(replica)).withHeader("Content-Type", new EqualToPattern("application/json")) + + private val defaultPodResponseJson = + """{ + | "metadata": { + | "ownerReferences": [ + | {"name": "wrong-replicaset-id", "kind": "SomethingElse"}, + | {"name": "parent-replicaset-id", "kind": "ReplicaSet"} + | ] + | } + |}""".stripMargin + + private val defaultReplicaResponseJson = + """{ + | "metadata": { + | "annotations": { + | "deployment.kubernetes.io/revision": "1" + | } + | } + |}""".stripMargin + + private def stubPodResponse(json: String = defaultPodResponseJson, state: String = Scenario.STARTED) = + stubFor( + getPod(podName1) + .willReturn( + ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json)) + ) + .inScenario("pod") + .whenScenarioStateIs(state)) + + private def stubReplicaResponse(json: String = defaultReplicaResponseJson) = + stubFor( + getReplicaSet("parent-replicaset-id") + .willReturn( + ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json)) + ) + .inScenario("replica") + .whenScenarioStateIs(Scenario.STARTED)) + + "Read revision from Kubernetes" should { + + "parse pod and replica response to get the revision" in { + stubPodResponse() + stubReplicaResponse() + + EventFilter + .info(pattern = "Reading revision from Kubernetes: akka.cluster.app-version was set to 1", occurrences = 1) + .intercept { + kubernetesApi.readRevision().futureValue should be("1") + } + } + + "retry and then fail when pod not found" in { + stubFor(getPod(podName1).willReturn(aResponse().withStatus(404))) + EventFilter + .warning(pattern = ".*Failed to get revision", occurrences = 5) + .intercept({ + assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException]) + }) + } + + "retry and then fail when replicaset not found" in { + stubPodResponse() + stubFor(getReplicaSet("parent-replicaset-id").willReturn(aResponse().withStatus(404))) + EventFilter + .warning(pattern = ".*Failed to get revision", occurrences = 5) + .intercept({ + assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException]) + }) + } + + "log if pod json can not be parsed" in { + stubPodResponse(json = """{ "invalid": "json" }""") + EventFilter + .warning(pattern = ".*Error while parsing Pod*") + .intercept({ + assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException]) + }) + } + + "log if replica json can not be parsed" in { + stubPodResponse() + stubReplicaResponse(json = """{ "invalid": "json" }""") + EventFilter + .warning(pattern = ".*Error while parsing Pod*") + .intercept({ + assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException]) + }) + } + + "break the loop if consecutive request succeeds" in { + stubFor( + getPod(podName1) + .willReturn(aResponse().withStatus(404)) + .inScenario("pod") + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo("after first fail") + ) + stubFor( + getPod(podName1) + .willReturn(aResponse().withStatus(404)) + .inScenario("pod") + .whenScenarioStateIs("after first fail") + .willSetStateTo("k8s is happy now") + ) + stubPodResponse(state = "k8s is happy now") + stubReplicaResponse() + EventFilter + .warning(pattern = ".*Try again*", occurrences = 2) + .intercept({ + kubernetesApi.readRevision().futureValue should be("1") + }) + } + } +}