Skip to content

Commit e658204

Browse files
authored
By default assume akka-management.http.port for bootstrap (#503)
Refs #464. Instead of discovering contact points by the port name 'management', bootstrap now avoids a 'named' port lookup altogether. This arguably makes discovery on Kubernetes a bit easier, since the akka-management port does not need to be named in any specific way in the deployment .yml. On the other hand, it makes the actual bootstrap/discovery process a bit more complicated, and for our 'config discovery' example we would even have to add the 'port name' feature to that integration test to be able to make it work at all.
1 parent 49bbcad commit e658204

File tree

11 files changed

+123
-24
lines changed

11 files changed

+123
-24
lines changed

Diff for: cluster-bootstrap/src/main/resources/reference.conf

+5-3
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ akka.management {
3131
service-name = ${?AKKA_CLUSTER_BOOTSTRAP_SERVICE_NAME}
3232

3333
# The portName passed to discovery. This should be set to the name of the port for Akka Management
34-
# If set to "" None is passed
35-
port-name = "management"
34+
# If set to "" None is passed and ${akka.management.http.port} is assumed.
35+
port-name = ""
3636

3737
# The protocol passed to discovery.
3838
# If set to "" None is passed.
@@ -100,7 +100,9 @@ akka.management {
100100
contact-point {
101101

102102
# If no port is discovered along with the host/ip of a contact point this port will be used as fallback
103-
fallback-port = 8558 # port pun, it "complements" 2552 which is often used for Akka remoting
103+
# Also, when no port-name is used and multiple results are returned for a given service, this port is
104+
# used to disambiguate. When set to <fallback-port>, defaults to the value of akka.management.http.port
105+
fallback-port = "<fallback-port>" # port pun, it "complements" 2552 which is often used for Akka remoting
104106

105107
# If some discovered seed node will keep failing to connect for specified period of time,
106108
# it will initiate rediscovery again instead of keep trying.

Diff for: cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettings.scala

+5-2
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,11 @@ final class ClusterBootstrapSettings(config: Config, log: LoggingAdapter) {
122122
object contactPoint {
123123
private val contactPointConfig = bootConfig.getConfig("contact-point")
124124

125-
// FIXME this has to be the same as the management one, we currently override this value when starting management, any better way?
126-
val fallbackPort: Int = contactPointConfig.getInt("fallback-port")
125+
val fallbackPort: Int =
126+
contactPointConfig
127+
.optDefinedValue("fallback-port")
128+
.map(_.toInt)
129+
.getOrElse(config.getInt("akka.management.http.port"))
127130

128131
val probingFailureTimeout: FiniteDuration =
129132
contactPointConfig.getDuration("probing-failure-timeout", TimeUnit.MILLISECONDS).millis

Diff for: cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/internal/BootstrapCoordinator.scala

+19-7
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ private[akka] object BootstrapCoordinator {
102102
// also known as the "Baron von Bootstrappen"
103103
/** INTERNAL API */
104104
@InternalApi
105-
private[akka] final class BootstrapCoordinator(discovery: ServiceDiscovery,
106-
joinDecider: JoinDecider,
107-
settings: ClusterBootstrapSettings)
105+
private[akka] class BootstrapCoordinator(discovery: ServiceDiscovery,
106+
joinDecider: JoinDecider,
107+
settings: ClusterBootstrapSettings)
108108
extends Actor
109109
with ActorLogging
110110
with Timers {
@@ -174,8 +174,20 @@ private[akka] final class BootstrapCoordinator(discovery: ServiceDiscovery,
174174
discoverContactPoints()
175175

176176
case ServiceDiscovery.Resolved(_, contactPoints)
177-
log.info("Located service members based on: [{}]: [{}]", lookup, contactPoints.mkString(", "))
178-
onContactPointsResolved(contactPoints)
177+
val filteredContactPoints: Iterable[ResolvedTarget] =
178+
if (lookup.portName.isDefined)
179+
contactPoints
180+
else
181+
contactPoints.groupBy(_.host).flatMap {
182+
case (host, immutable.Seq(singleResult)) =>
183+
immutable.Seq(singleResult)
184+
case (host, multipleResults) =>
185+
multipleResults.filter(_.port.contains(settings.contactPoint.fallbackPort))
186+
}
187+
188+
log.info("Located service members based on: [{}]: [{}], filtered to [{}]", lookup, contactPoints.mkString(", "),
189+
filteredContactPoints.mkString(", "))
190+
onContactPointsResolved(filteredContactPoints)
179191
resetDiscoveryInterval() // in case we were backed-off, we reset back to healthy intervals
180192
startSingleDiscoveryTimer() // keep looking in case other nodes join the discovery
181193

@@ -253,7 +265,7 @@ private[akka] final class BootstrapCoordinator(discovery: ServiceDiscovery,
253265
discovery.lookup(lookup, settings.contactPointDiscovery.resolveTimeout).pipeTo(self)
254266
}
255267

256-
private def onContactPointsResolved(contactPoints: immutable.Seq[ResolvedTarget]): Unit = {
268+
private def onContactPointsResolved(contactPoints: Iterable[ResolvedTarget]): Unit = {
257269
val newObservation = ServiceContactsObservation(timeNow(), contactPoints.toSet)
258270
lastContactsObservation match {
259271
case Some(contacts) => lastContactsObservation = Some(contacts.sameOrChanged(newObservation))
@@ -270,7 +282,7 @@ private[akka] final class BootstrapCoordinator(discovery: ServiceDiscovery,
270282
newObservation.observedContactPoints.foreach(ensureProbing)
271283
}
272284

273-
private def ensureProbing(contactPoint: ResolvedTarget): Option[ActorRef] = {
285+
private[internal] def ensureProbing(contactPoint: ResolvedTarget): Option[ActorRef] = {
274286
val targetPort = contactPoint.port.getOrElse(settings.contactPoint.fallbackPort)
275287
val rawBaseUri = Uri("http", Uri.Authority(Uri.Host(contactPoint.host), targetPort))
276288
val baseUri = settings.managementBasePath.fold(rawBaseUri)(prefix => rawBaseUri.withPath(Uri.Path(s"/$prefix")))

Diff for: cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapBasePathIntegrationSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class ClusterBootstrapBasePathIntegrationSpec extends WordSpecLike with Matchers
7070

7171
// prepare the "mock DNS"
7272
val name = "basepathsystem.svc.cluster.local"
73-
MockDiscovery.set(Lookup(name).withProtocol("tcp").withPortName("management"),
73+
MockDiscovery.set(Lookup(name).withProtocol("tcp"),
7474
() =>
7575
Future.successful(
7676
Resolved(name,

Diff for: cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapDiscoveryBackoffIntegrationSpec.scala

+2
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ class ClusterBootstrapDiscoveryBackoffIntegrationSpec
7070
stable-margin = 4 seconds
7171

7272
interval = 500 ms
73+
74+
port-name = "management"
7375
}
7476
}
7577
}

Diff for: cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/contactpoint/ClusterBootstrapRetryUnreachableContactPointIntegrationSpec.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ class ClusterBootstrapRetryUnreachableContactPointIntegrationSpec extends WordSp
6262
service-namespace = "svc.cluster.local"
6363

6464
stable-margin = 4 seconds
65+
66+
port-name = "management"
6567
}
6668
}
6769
}
@@ -86,7 +88,7 @@ class ClusterBootstrapRetryUnreachableContactPointIntegrationSpec extends WordSp
8688

8789
val name = "systemunreachablenodes.svc.cluster.local"
8890

89-
MockDiscovery.set(Lookup(name).withPortName("management").withProtocol("tcp"), { () =>
91+
MockDiscovery.set(Lookup(name).withProtocol("tcp").withPortName("management"), { () =>
9092
called += 1
9193

9294
Future.successful(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
/*
 * Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com>
 */

package akka.management.cluster.bootstrap.internal

import java.util.concurrent.atomic.AtomicReference

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.discovery.{ Lookup, MockDiscovery }
import akka.management.cluster.bootstrap.internal.BootstrapCoordinator.Protocol.InitiateBootstrapping
import akka.management.cluster.bootstrap.{ ClusterBootstrapSettings, LowestAddressJoinDecider }
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

/**
 * Verifies that when no port name is configured for contact-point discovery,
 * the [[BootstrapCoordinator]] filters multi-port discovery results down to the
 * Akka Management (fallback) port before probing.
 */
class BootstrapCoordinatorSpec extends WordSpec with Matchers with BeforeAndAfterAll with Eventually {
  val serviceName = "bootstrap-coordinator-test-service"

  val system = ActorSystem("test", ConfigFactory.parseString(s"""
    |akka.management.cluster.bootstrap {
    |  contact-point-discovery.service-name = $serviceName
    |}
    """.stripMargin).withFallback(ConfigFactory.load()))
  val settings = ClusterBootstrapSettings(system.settings.config, system.log)
  val joinDecider = new LowestAddressJoinDecider(system, settings)

  val discovery = new MockDiscovery(system)

  // Each host exposes two ports: 2552 (remoting) and 8558 (management). Since the
  // lookup carries no port name, the coordinator must disambiguate per host using
  // the fallback (management) port.
  MockDiscovery.set(
    Lookup(serviceName, portName = None, protocol = Some("tcp")),
    () =>
      Future.successful(Resolved(serviceName,
          List(
            ResolvedTarget("host1", Some(2552), None),
            ResolvedTarget("host1", Some(8558), None),
            ResolvedTarget("host2", Some(2552), None),
            ResolvedTarget("host2", Some(8558), None)
          )))
  )

  "The bootstrap coordinator, when avoiding named port lookups" should {

    "probe only on the Akka Management port" in {
      val probedTargets = new AtomicReference[List[ResolvedTarget]](Nil)
      // Subclass the coordinator to intercept probing: record each contact point
      // instead of spawning real probe actors.
      val coordinator = system.actorOf(Props(new BootstrapCoordinator(discovery, joinDecider, settings) {
        override def ensureProbing(contactPoint: ResolvedTarget): Option[ActorRef] = {
          // getAndUpdate retries on contention; a single compareAndSet would
          // silently drop a concurrently recorded contact point.
          probedTargets.getAndUpdate(contactPoint +: _)
          None
        }
      }))
      coordinator ! InitiateBootstrapping
      eventually {
        val probed = probedTargets.get
        probed.length should be >= 2
        probed.map(_.host) should contain("host1")
        probed.map(_.host) should contain("host2")
        // Only the management port must ever be probed, never the remoting port.
        probed.flatMap(_.port).toSet should be(Set(8558))
      }
    }
  }

  override def afterAll(): Unit = {
    Await.result(system.terminate(), 10.seconds)
    super.afterAll()
  }
}

Diff for: integration-test/kubernetes-api-java/kubernetes/akka-cluster.yml

+5-5
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ spec:
3333
- name: remoting
3434
containerPort: 2552
3535
protocol: TCP
36-
# akka-management bootstrap
37-
# must match up with contact-point-discovery.port-name for bootstrap
38-
- name: management
39-
containerPort: 8558
36+
# akka-management
37+
- containerPort: 8558
4038
protocol: TCP
39+
# when contact-point-discovery.port-name is set for cluster bootstrap,
40+
# the management port must be named accordingly:
41+
# name: management
4142
env:
4243
# The Kubernetes API discovery will use this service name to look for
4344
# nodes with this value in the 'app' label
@@ -47,7 +48,6 @@ spec:
4748
fieldRef:
4849
apiVersion: v1
4950
fieldPath: "metadata.labels['app']"
50-
5151
---
5252
kind: Role
5353
apiVersion: rbac.authorization.k8s.io/v1

Diff for: integration-test/kubernetes-api/kubernetes/akka-cluster.yml

+4-3
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@ spec:
4343
containerPort: 2552
4444
protocol: TCP
4545
# akka-management bootstrap
46-
# must match up with contact-point-discovery.port-name for bootstrap
47-
- name: management
48-
containerPort: 8558
46+
- containerPort: 8558
4947
protocol: TCP
48+
# when contact-point-discovery.port-name is set for cluster bootstrap,
49+
# the management port must be named accordingly:
50+
# name: management
5051
env:
5152
# The Kubernetes API discovery will use this service name to look for
5253
# nodes with this value in the 'app' label.

Diff for: integration-test/kubernetes-dns/kubernetes/akka-cluster.yml

+3-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ spec:
4040
#health
4141
ports:
4242
- containerPort: 8558
43-
name: management
43+
# when contact-point-discovery.port-name is set for cluster bootstrap,
44+
# the management port must be named accordingly:
45+
# name: management
4446
- containerPort: 2552
4547
name: remoting
4648
env:

Diff for: integration-test/local/src/test/scala/akka/management/LocalBootstrapTest.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ object LocalBootstrapTest {
3030
port = 0
3131
}
3232
}
33-
akka.management.http.hostname = "localhost"
33+
akka.management {
34+
http.hostname = "localhost"
35+
cluster.bootstrap.contact-point-discovery.port-name = "management"
36+
}
3437
akka.discovery {
3538
config.services = {
3639
local-cluster = {

0 commit comments

Comments (0)