Skip to content

Commit a489147

Browse files
authored
Aggregate and config discovery (#218)
* Config based discovery * Aggregate service discovery
1 parent 7747711 commit a489147

File tree

13 files changed

+574
-33
lines changed

13 files changed

+574
-33
lines changed

Diff for: build.sbt

+23
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,29 @@ lazy val `akka-discovery-dns` = project
4848
)
4949
.dependsOn(`akka-discovery`)
5050

51+
lazy val `akka-discovery-config` = project
52+
.in(file("discovery-config"))
53+
.enablePlugins(AutomateHeaderPlugin)
54+
.settings(unidocSettings)
55+
.settings(
56+
name := "akka-discovery-config",
57+
organization := "com.lightbend.akka.discovery",
58+
Dependencies.DiscoveryConfig
59+
)
60+
.dependsOn(`akka-discovery`)
61+
62+
lazy val `akka-discovery-aggregrate` = project
63+
.in(file("discovery-aggregate"))
64+
.enablePlugins(AutomateHeaderPlugin)
65+
.settings(unidocSettings)
66+
.settings(
67+
name := "akka-discovery-aggregate",
68+
organization := "com.lightbend.akka.discovery",
69+
Dependencies.DiscoveryAggregate
70+
)
71+
.dependsOn(`akka-discovery`)
72+
.dependsOn(`akka-discovery-config` % "test")
73+
5174
// K8s API implementation of discovery, allows formation to work even when readiness/health checks are failing
5275
lazy val `akka-discovery-kubernetes-api` = project
5376
.in(file("discovery-kubernetes-api"))
+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
akka {
2+
discovery {
3+
aggregate {
4+
class = akka.discovery.aggregate.AggregateSimpleServiceDiscovery
5+
6+
# List of service discovery mechanisms to try in order. E.g DNS then fall back to config
7+
# ["akka-dns" , "akka-confg" ]
8+
discovery-mechanisms = []
9+
10+
}
11+
}
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
3+
*/
4+
package akka.discovery.aggregate
5+
6+
import akka.actor.ExtendedActorSystem
7+
import akka.discovery.SimpleServiceDiscovery.Resolved
8+
import akka.discovery.aggregate.AggregateSimpleServiceDiscovery.Mechanisms
9+
import akka.discovery.{ ServiceDiscovery, SimpleServiceDiscovery }
10+
import akka.event.Logging
11+
import akka.util.Helpers.Requiring
12+
import com.typesafe.config.Config
13+
14+
import scala.collection.JavaConverters._
15+
import scala.concurrent.Future
16+
import scala.concurrent.duration.FiniteDuration
17+
18+
final class AggregateSimpleServiceDiscoverySettings(config: Config) {
19+
20+
val discoveryMechanisms = config
21+
.getStringList("discovery-mechanisms")
22+
.asScala
23+
.toList
24+
.requiring(_.nonEmpty, "At least one discovery mechanism should be specified")
25+
26+
}
27+
28+
object AggregateSimpleServiceDiscovery {
29+
type Mechanisms = List[(String, SimpleServiceDiscovery)]
30+
}
31+
32+
final class AggregateSimpleServiceDiscovery(system: ExtendedActorSystem) extends SimpleServiceDiscovery {
33+
34+
private val log = Logging(system, getClass)
35+
36+
private val settings =
37+
new AggregateSimpleServiceDiscoverySettings(system.settings.config.getConfig("akka.discovery.aggregate"))
38+
39+
private val mechanisms =
40+
settings.discoveryMechanisms.map(mech => (mech, ServiceDiscovery.loadServiceDiscovery(mech, system)))
41+
private implicit val ec = system.dispatcher
42+
43+
/**
44+
* Each discovery mechanism is given the resolveTimeout rather than reducing it each time between mechanisms.
45+
*/
46+
override def lookup(name: String, resolveTimeout: FiniteDuration): Future[SimpleServiceDiscovery.Resolved] = {
47+
resolve(mechanisms, name, resolveTimeout)
48+
}
49+
50+
private def resolve(sds: Mechanisms, name: String, resolveTimeout: FiniteDuration): Future[Resolved] = {
51+
sds match {
52+
case (mechanism, next) :: Nil =>
53+
log.debug("Looking up [{}] with [{}]", name, mechanism)
54+
next.lookup(name, resolveTimeout)
55+
case (mechanism, next) :: tail =>
56+
log.debug("Looking up [{}] with [{}]", name, mechanism)
57+
// If nothing comes back then try the next one
58+
next
59+
.lookup(name, resolveTimeout)
60+
.flatMap { resolved =>
61+
if (resolved.addresses.isEmpty) {
62+
log.debug("Mechanism [{}] returned no ResolvedTargets, trying next", name)
63+
resolve(tail, name, resolveTimeout)
64+
} else
65+
Future.successful(resolved)
66+
}
67+
.recoverWith {
68+
case t: Throwable =>
69+
log.error(t, "[{}] Service discovery failed. Trying next discovery mechanism", mechanism)
70+
resolve(tail, name, resolveTimeout)
71+
}
72+
}
73+
}
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
3+
*/
4+
package akka.discovery.aggregate
5+
6+
import akka.actor.{ ActorSystem, ExtendedActorSystem }
7+
import akka.discovery.SimpleServiceDiscovery.{ Resolved, ResolvedTarget }
8+
import akka.discovery.{ ServiceDiscovery, SimpleServiceDiscovery }
9+
import akka.event.Logging
10+
import akka.testkit.TestKit
11+
import com.typesafe.config.{ Config, ConfigFactory }
12+
import org.scalatest.concurrent.ScalaFutures
13+
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
14+
15+
import scala.concurrent.Future
16+
import scala.concurrent.duration._
17+
import scala.collection.immutable
18+
19+
class StubbedSimpleServiceDiscovery(system: ExtendedActorSystem) extends SimpleServiceDiscovery {
20+
21+
override def lookup(name: String, resolveTimeout: FiniteDuration): Future[SimpleServiceDiscovery.Resolved] = {
22+
if (name == "stubbed") {
23+
Future.successful(Resolved(name, immutable.Seq(ResolvedTarget("stubbed1", Some(1234)))))
24+
} else if (name == "fail") {
25+
Future.failed(new RuntimeException("No resolving for you!"))
26+
} else {
27+
Future.successful(Resolved(name, immutable.Seq.empty))
28+
}
29+
}
30+
}
31+
32+
object AggregateSimpleServiceDiscoverySpec {
33+
val config: Config = ConfigFactory.parseString("""
34+
akka {
35+
loglevel = DEBUG
36+
discovery {
37+
method = aggregate
38+
39+
aggregate {
40+
discovery-mechanisms = ["stubbed1", "config"]
41+
42+
}
43+
}
44+
}
45+
46+
akka.discovery.stubbed1 {
47+
class = akka.discovery.aggregate.StubbedSimpleServiceDiscovery
48+
}
49+
50+
akka.discovery.config.services = {
51+
config1 = {
52+
endpoints = [
53+
{
54+
host = "cat"
55+
port = 1233
56+
},
57+
{
58+
host = "dog"
59+
port = 1234
60+
}
61+
]
62+
},
63+
fail = {
64+
endpoints = [
65+
{
66+
host = "from-config"
67+
}
68+
]
69+
}
70+
}
71+
""")
72+
}
73+
74+
class AggregateSimpleServiceDiscoverySpec
75+
extends TestKit(ActorSystem("AggregateSimpleDiscoverySpec", AggregateSimpleServiceDiscoverySpec.config))
76+
with WordSpecLike
77+
with Matchers
78+
with BeforeAndAfterAll
79+
with ScalaFutures {
80+
81+
override protected def afterAll(): Unit = {
82+
TestKit.shutdownActorSystem(system)
83+
}
84+
85+
val discovery: SimpleServiceDiscovery = ServiceDiscovery(system).discovery
86+
87+
"Aggregate service discovery" must {
88+
89+
"only call first one if returns results" in {
90+
val results = discovery.lookup("stubbed", 100.millis).futureValue
91+
results shouldEqual Resolved("stubbed", immutable.Seq(ResolvedTarget("stubbed1", Some(1234))))
92+
}
93+
94+
"move onto the next if no resolved targets" in {
95+
val results = discovery.lookup("config1", 100.millis).futureValue
96+
results shouldEqual Resolved("config1",
97+
immutable.Seq(ResolvedTarget("cat", Some(1233)), ResolvedTarget("dog", Some(1234))))
98+
}
99+
100+
"move onto next if fails" in {
101+
val results = discovery.lookup("fail", 100.millis).futureValue
102+
// Stub fails then result comes from config
103+
results shouldEqual Resolved("fail", immutable.Seq(ResolvedTarget("from-config", None)))
104+
}
105+
}
106+
107+
}

Diff for: discovery-config/src/main/resources/reference.conf

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
akka {
2+
discovery {
3+
config {
4+
class = akka.discovery.config.ConfigSimpleServiceDiscovery
5+
6+
# Location of the services
7+
services-path = "akka.discovery.config.services"
8+
9+
# A map of services to resolve from configuration.
10+
# See docs for more examples.
11+
# A list of endpoints with host/port where port is optional e.g.
12+
# services {
13+
# service1 {
14+
# endpoints = [
15+
# {
16+
# host = "cat.com"
17+
# port = 1233
18+
# },
19+
# {
20+
# host = "dog.com"
21+
# }
22+
# ]
23+
# },
24+
# service2 {
25+
# endpoints = [
26+
# {
27+
# host = "fish.com"
28+
# port = 1233
29+
# }
30+
# ]
31+
# }
32+
# }
33+
services = {
34+
35+
}
36+
}
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
3+
*/
4+
package akka.discovery.config
5+
6+
import akka.actor.ExtendedActorSystem
7+
import akka.discovery.SimpleServiceDiscovery
8+
import akka.discovery.SimpleServiceDiscovery.{ Resolved, ResolvedTarget }
9+
import akka.event.Logging
10+
import com.typesafe.config.Config
11+
12+
import scala.collection.JavaConverters._
13+
import scala.collection.{ breakOut, immutable }
14+
import scala.concurrent.Future
15+
import scala.concurrent.duration.FiniteDuration
16+
17+
object ConfigServicesParser {
18+
def parse(config: Config): Map[String, Resolved] = {
19+
val byService = config
20+
.root()
21+
.entrySet()
22+
.asScala
23+
.map { en =>
24+
(en.getKey, config.getConfig(en.getKey))
25+
}
26+
.toMap
27+
28+
byService.map {
29+
case (serviceName, full) =>
30+
val endpoints = full.getConfigList("endpoints").asScala
31+
val resolvedTargets: immutable.Seq[ResolvedTarget] = endpoints.map { c =>
32+
val host = c.getString("host")
33+
val port = if (c.hasPath("port")) Some(c.getInt("port")) else None
34+
ResolvedTarget(host, port)
35+
}(breakOut)
36+
(serviceName, Resolved(serviceName, resolvedTargets))
37+
}
38+
}
39+
}
40+
41+
class ConfigSimpleServiceDiscovery(system: ExtendedActorSystem) extends SimpleServiceDiscovery {
42+
43+
private val log = Logging(system, getClass)
44+
45+
private val resolvedServices = ConfigServicesParser.parse(
46+
system.settings.config.getConfig(system.settings.config.getString("akka.discovery.config.services-path"))
47+
)
48+
49+
log.debug("Config discovery serving: {}", resolvedServices)
50+
51+
override def lookup(name: String, resolveTimeout: FiniteDuration): Future[SimpleServiceDiscovery.Resolved] = {
52+
// TODO or fail or change the Resolved type to an ADT?
53+
Future.successful(resolvedServices.getOrElse(name, Resolved(name, immutable.Seq.empty)))
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
3+
*/
4+
package akka.discovery.config
5+
6+
import akka.discovery.SimpleServiceDiscovery.{ Resolved, ResolvedTarget }
7+
import akka.discovery.config.ConfigServicesParserSpec._
8+
import com.typesafe.config.{ Config, ConfigFactory }
9+
import org.scalatest.{ Matchers, WordSpec }
10+
11+
import scala.collection.immutable
12+
13+
object ConfigServicesParserSpec {
14+
val exampleConfig: Config = ConfigFactory.parseString("""
15+
services {
16+
service1 {
17+
endpoints = [
18+
{
19+
host = "cat"
20+
port = 1233
21+
},
22+
{
23+
host = "dog"
24+
}
25+
]
26+
},
27+
service2 {
28+
endpoints = []
29+
}
30+
}
31+
""".stripMargin)
32+
}
33+
34+
class ConfigServicesParserSpec extends WordSpec with Matchers {
35+
36+
"Config parsing" must {
37+
"parse" in {
38+
val config = exampleConfig.getConfig("services")
39+
40+
val result = ConfigServicesParser.parse(config)
41+
42+
result("service1") shouldEqual Resolved("service1",
43+
immutable.Seq(ResolvedTarget("cat", Some(1233)), ResolvedTarget("dog", None)))
44+
result("service2") shouldEqual Resolved("service2", immutable.Seq())
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)