Skip to content

Commit

Permalink
feat: Detect bridge ICE failure based on restart requests from endpoi…
Browse files Browse the repository at this point in the history
…nts (#1210)

* ref: Simplify code, order alphabetically.
* feat: Keep track of number of endpoints on each Bridge.
* feat: Add per-bridge metrics for endpoint and restart requests.
* ref: Cleanup comment formatting.
* ref: Rename Bridge.stress to correctedStress.
* feat: Penalize stress when a bridge enters the failing ICE state
* feat: Add metric for failing ICE.
  • Loading branch information
bgrozev authored Feb 21, 2025
1 parent 9369e97 commit b62948d
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 104 deletions.
191 changes: 116 additions & 75 deletions jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/Bridge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import org.jxmpp.jid.Jid
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.math.max
import org.jitsi.jicofo.bridge.BridgeConfig.Companion.config as config

/**
Expand All @@ -48,36 +51,34 @@ class Bridge @JvmOverloads internal constructor(
private val clock: Clock = Clock.systemUTC()
) : Comparable<Bridge> {

/**
* Keep track of the recently added endpoints.
*/
/** Keep track of the recently added endpoints. */
private val newEndpointsRate = RateTracker(
config.participantRampupInterval,
Duration.ofMillis(100),
clock
)

/**
* The last report stress level
*/
private val endpointRestartRequestRate = RateTracker(
config.iceFailureDetection.interval,
Duration.ofSeconds(1),
clock
)

/** Number of endpoints currently allocated on this bridge by this jicofo instance. */
val endpoints = AtomicInteger(0)

/** The last reported stress level. */
var lastReportedStressLevel = 0.0
private set

/**
* Holds bridge version (if known - not all bridge version are capable of
* reporting it).
*/
/** Holds the bridge version (if known - not all bridge versions are capable of reporting it). */
private var version: String? = null

/**
* Whether the last received presence indicated the bridge is healthy.
*/
/** Whether the last received presence indicated the bridge is healthy. */
var isHealthy = true
private set

/**
* Holds bridge release ID, or null if not known.
*/
/** Holds bridge release ID, or null if not known. */
private var releaseId: String? = null

/**
Expand Down Expand Up @@ -108,25 +109,16 @@ class Bridge @JvmOverloads internal constructor(
}
}

/**
* Start out with the configured value, update if the bridge reports a value.
*/
/** Start out with the configured value, update if the bridge reports a value. */
private var averageParticipantStress = config.averageParticipantStress

/**
* Stores a boolean that indicates whether the bridge is in graceful shutdown mode.
*/
/** Stores a boolean that indicates whether the bridge is in graceful shutdown mode. */
var isInGracefulShutdown = false // we assume it is not shutting down

/**
* Whether the bridge is in SHUTTING_DOWN mode.
*/
/** Whether the bridge is in SHUTTING_DOWN mode. */
var isShuttingDown = false
private set

/**
* @return true if the bridge is currently in drain mode
*/
/**
* Stores a boolean that indicates whether the bridge is in drain mode.
*/
Expand All @@ -140,19 +132,27 @@ class Bridge @JvmOverloads internal constructor(
*/
private var failureInstant: Instant? = null

/**
* @return the region of this [Bridge].
*/
/** @return the region of this [Bridge]. */
var region: String? = null
private set

/**
* @return the relay ID advertised by the bridge, or `null` if
* none was advertised.
*/
/** @return the relay ID advertised by the bridge, or `null` if none was advertised. */
var relayId: String? = null
private set

/**
* If this [Bridge] has been removed from the list of bridges. Once removed, the metrics specific to this instance
* are cleared and no longer emitted. If the bridge re-connects, a new [Bridge] instance will be created.
*/
val removed = AtomicBoolean(false)

/**
* The last instant at which we detected, based on restart requests from endpoints, that this bridge is failing ICE.
* Starts at [Instant.MIN], i.e. no failure has been detected yet.
*/
private var lastIceFailed = Instant.MIN
/** Whether an ICE failure was detected less than `config.iceFailureDetection.timeout` ago. */
private val failingIce: Boolean
get() = Duration.between(lastIceFailed, clock.instant()) < config.iceFailureDetection.timeout

private val logger: Logger = LoggerImpl(Bridge::class.java.name)

init {
Expand Down Expand Up @@ -237,37 +237,75 @@ class Bridge @JvmOverloads internal constructor(
return compare(this, other)
}

/**
* Notifies this [Bridge] that it was used for a new endpoint.
*/
/**
 * Notifies this [Bridge] that it was used for a new endpoint.
 *
 * Feeds the new-endpoints rate tracker, bumps the local endpoint counter and, unless this instance has already been
 * marked as removed, publishes the counter to the per-bridge endpoints gauge.
 */
fun endpointAdded() {
    newEndpointsRate.update(1)
    endpoints.incrementAndGet()
    if (removed.get()) {
        // The metrics of a removed bridge have been cleared and are no longer emitted.
        return
    }
    val jvbLabel = listOf(jid.resourceOrEmpty.toString())
    BridgeMetrics.endpoints.set(endpoints.get().toLong(), jvbLabel)
}

/**
* Returns the net number of video channels recently allocated or removed
* from this bridge.
*/
/** Notifies this [Bridge] that a single endpoint was removed from it. */
fun endpointRemoved() {
    endpointsRemoved(1)
}
/**
 * Notifies this [Bridge] that [count] endpoints were removed from it.
 *
 * The local counter never stays negative: if more endpoints are removed than were counted as added, an error is
 * logged and the counter is reset to 0. The per-bridge endpoints gauge is published only after that correction, so
 * it is not left at a negative value while the counter itself has been reset.
 *
 * @param count the number of endpoints that were removed.
 */
fun endpointsRemoved(count: Int) {
    if (endpoints.addAndGet(-count) < 0) {
        // The Throwable is passed along with the message so the caller that over-removed can be identified.
        logger.error("Removed more endpoints than were allocated. Resetting to 0.", Throwable())
        endpoints.set(0)
    }
    if (!removed.get()) {
        BridgeMetrics.endpoints.set(endpoints.get().toLong(), listOf(jid.resourceOrEmpty.toString()))
    }
}
/**
 * Marks this [Bridge] as removed and clears the per-bridge metric series labeled with its JID resource, so that they
 * stop being emitted. Only the first call has any effect.
 */
internal fun markRemoved() {
    if (removed.compareAndSet(false, true)) {
        val labels = listOf(jid.resourceOrEmpty.toString())
        BridgeMetrics.restartRequestsMetric.remove(labels)
        BridgeMetrics.endpoints.remove(labels)
        // updateMetrics() also publishes a labeled failing-ICE series for this bridge; clear it as well so that it
        // does not linger after the bridge is gone.
        // NOTE(review): assumes the boolean metric exposes remove(labels) like the counter and gauge - confirm.
        BridgeMetrics.failingIce.remove(labels)
    }
}
/** Publishes the current failing-ICE state of this [Bridge] to its labeled metric, unless it has been removed. */
internal fun updateMetrics() {
    if (removed.get()) {
        return
    }
    val jvbLabel = listOf(jid.resourceOrEmpty.toString())
    BridgeMetrics.failingIce.set(failingIce, jvbLabel)
}

/**
 * Notifies this [Bridge] that one of its endpoints requested a restart.
 *
 * The request is always counted in the restart-request rate tracker, and in the per-bridge counter metric while the
 * bridge has not been removed. When ICE failure detection is enabled, the bridge is (re-)flagged as failing ICE if
 * it has at least the configured minimum number of endpoints and the accumulated restart requests exceed the
 * endpoint count multiplied by the configured threshold.
 */
fun endpointRequestedRestart() {
    endpointRestartRequestRate.update(1)
    if (!removed.get()) {
        BridgeMetrics.restartRequestsMetric.inc(listOf(jid.resourceOrEmpty.toString()))
    }

    val detection = config.iceFailureDetection
    if (!detection.enabled) {
        return
    }

    val recentRestarts = endpointRestartRequestRate.getAccumulatedCount()
    val endpointCount = endpoints.get()
    val thresholdExceeded =
        endpointCount >= detection.minEndpoints && recentRestarts > endpointCount * detection.threshold
    if (!thresholdExceeded) {
        return
    }

    // Reset the timeout regardless of the previous state, but only log if the state changed.
    if (!failingIce) {
        logger.info("Detected an ICE failing state.")
    }
    lastIceFailed = clock.instant()
}

/**
 * The count accumulated by [newEndpointsRate], which [endpointAdded] increments by one for each endpoint added to
 * this bridge. The removal path ([endpointsRemoved]) does not touch the tracker.
 */
private val recentlyAddedEndpointCount: Long
get() = newEndpointsRate.getAccumulatedCount()

/**
* The version of this bridge (with embedded release ID, if available).
*/
/**
 * The version of this bridge, suffixed with `-<release ID>` when both are known. Falls back to the bare version
 * (which may be `null`) otherwise.
 */
val fullVersion: String?
    get() {
        val knownVersion = version ?: return null
        val knownRelease = releaseId ?: return knownVersion
        return "$knownVersion-$knownRelease"
    }

/**
* {@inheritDoc}
*/
override fun toString(): String {
return String.format(
"Bridge[jid=%s, version=%s, relayId=%s, region=%s, stress=%.2f]",
"Bridge[jid=%s, version=%s, relayId=%s, region=%s, correctedStress=%.2f]",
jid.toString(),
fullVersion,
relayId,
region,
stress
correctedStress
)
}

Expand All @@ -276,34 +314,37 @@ class Bridge @JvmOverloads internal constructor(
* can exceed 1).
* @return this bridge's stress level
*/
val stress: Double
get() =
// While a stress of 1 indicates a bridge is fully loaded, we allow
// larger values to keep sorting correctly.
lastReportedStressLevel +
recentlyAddedEndpointCount.coerceAtLeast(0) * averageParticipantStress
val correctedStress: Double
get() {
// Correct for recently added endpoints.
// While a stress of 1 indicates a bridge is fully loaded, we allow larger values to keep sorting correctly.
val s = lastReportedStressLevel + recentlyAddedEndpointCount.coerceAtLeast(0) * averageParticipantStress

/**
* @return true if the stress of the bridge is greater-than-or-equal to the threshold.
*/
// Correct for failing ICE.
return if (failingIce) max(s, config.stressThreshold + 0.01) else s
}

/** @return true if the stress of the bridge is greater-than-or-equal to the threshold. */
val isOverloaded: Boolean
get() = stress >= config.stressThreshold
get() = correctedStress >= config.stressThreshold

val debugState: OrderedJsonObject
get() {
val o = OrderedJsonObject()
o["version"] = version.toString()
o["release"] = releaseId.toString()
o["stress"] = stress
o["operational"] = isOperational
o["region"] = region.toString()
o["drain"] = isDraining
o["graceful-shutdown"] = isInGracefulShutdown
o["shutting-down"] = isShuttingDown
o["overloaded"] = isOverloaded
o["relay-id"] = relayId.toString()
o["healthy"] = isHealthy
return o
get() = OrderedJsonObject().apply {
this["corrected-stress"] = correctedStress
this["drain"] = isDraining
this["endpoints"] = endpoints.get()
this["endpoint-restart-requests"] = endpointRestartRequestRate.getAccumulatedCount()
this["failing-ice"] = failingIce
this["graceful-shutdown"] = isInGracefulShutdown
this["healthy"] = isHealthy
this["operational"] = isOperational
this["overloaded"] = isOverloaded
this["region"] = region.toString()
this["relay-id"] = relayId.toString()
this["release"] = releaseId.toString()
this["shutting-down"] = isShuttingDown
this["stress"] = lastReportedStressLevel
this["version"] = version.toString()
}

companion object {
Expand All @@ -327,7 +368,7 @@ class Bridge @JvmOverloads internal constructor(
return if (myPriority != otherPriority) {
myPriority - otherPriority
} else {
b1.stress.compareTo(b2.stress)
b1.correctedStress.compareTo(b2.correctedStress)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,34 @@ class BridgeConfig private constructor() {
fun getRegionGroup(region: String?): Set<String> =
if (region == null) emptySet() else regionGroups[region] ?: setOf(region)

val iceFailureDetection = IceFailureDetectionConfig()

companion object {
const val BASE = "jicofo.bridge"

@JvmField
val config = BridgeConfig()
}
}

/** Settings for detecting bridge ICE failure from endpoint restart requests, read from keys under [BASE]. */
class IceFailureDetectionConfig {
/** Whether ICE failure detection is enabled. */
val enabled: Boolean by config {
"$BASE.enabled".from(JitsiConfig.newConfig)
}
/** The interval given to the per-bridge rate tracker that counts endpoint restart requests. */
val interval: Duration by config {
"$BASE.interval".from(JitsiConfig.newConfig)
}
/** The minimum number of endpoints a bridge must have before it can be flagged as failing ICE. */
val minEndpoints: Int by config {
"$BASE.min-endpoints".from(JitsiConfig.newConfig)
}
/** A bridge is flagged when its restart request count exceeds its endpoint count multiplied by this value. */
val threshold: Double by config {
"$BASE.threshold".from(JitsiConfig.newConfig)
}
/** How long after the most recent detection a bridge is still considered to be failing ICE. */
val timeout: Duration by config {
"$BASE.timeout".from(JitsiConfig.newConfig)
}

companion object {
/** Prefix of all configuration keys read by this class. */
const val BASE = "jicofo.bridge.ice-failure-detection"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.jitsi.jicofo.bridge

import org.jitsi.jicofo.metrics.JicofoMetricsContainer.Companion.instance as metricsContainer
/** Holder for the per-bridge metrics. Every metric carries a single `jvb` label identifying the bridge. */
class BridgeMetrics {
companion object {
/** Total number of participants that requested a restart for a specific bridge. */
val restartRequestsMetric = metricsContainer.registerCounter(
"bridge_restart_requests_total",
"Total number of requests to restart a bridge",
labelNames = listOf("jvb")
)
/** The number of endpoints on a specific bridge. */
val endpoints = metricsContainer.registerLongGauge(
"bridge_endpoints",
"The number of endpoints on a bridge.",
labelNames = listOf("jvb")
)
/** Whether a specific bridge is currently in the failing ICE state. */
val failingIce = metricsContainer.registerBooleanMetric(
"bridge_failing_ice",
"Whether a bridge is currently in the failing ICE state.",
labelNames = listOf("jvb")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class BridgeSelector @JvmOverloads constructor(
logger.warn("Lost a bridge: $bridgeJid")
lostBridges.inc()
}
it.markRemoved()
bridgeCount.dec()
eventEmitter.fireEvent { bridgeRemoved(it) }
}
Expand Down Expand Up @@ -214,10 +215,7 @@ class BridgeSelector @JvmOverloads constructor(
conferenceBridges,
participantProperties,
OctoConfig.config.enabled
).also {
// The bridge was selected for an endpoint, increment its counter.
it?.endpointAdded()
}
)
}

val stats: JSONObject
Expand Down Expand Up @@ -245,6 +243,7 @@ class BridgeSelector @JvmOverloads constructor(
inShutdownBridgeCountMetric.set(bridges.values.count { it.isInGracefulShutdown }.toLong())
operationalBridgeCountMetric.set(bridges.values.count { it.isOperational }.toLong())
bridgeVersionCount.set(bridges.values.map { it.fullVersion }.toSet().size.toLong())
bridges.values.forEach { it.updateMetrics() }
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ class VisitorTopologyStrategy : TopologySelectionStrategy() {
cascade.getDistanceFrom(it) { node -> !node.visitor }
}

val sortedNodes = nodesWithDistance.entries.sortedWith(compareBy({ it.value }, { it.key.bridge.stress }))
.map { it.key }
val sortedNodes = nodesWithDistance.entries.sortedWith(
compareBy({ it.value }, { it.key.bridge.correctedStress })
).map { it.key }

/* TODO: this logic looks a lot like bridge selection. Do we want to try to share logic with that code? */
val nonOverloadedInRegion = sortedNodes.filter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ interface ColibriSessionManager {
suppressLocalBridgeUpdate: Boolean = false
)

fun getBridgeSessionId(participantId: String): String?
fun getBridgeSessionId(participantId: String): Pair<Bridge?, String?>

/**
* Stop using [bridge], expiring all endpoints on it (e.g. because it was detected to have failed).
Expand Down
Loading

0 comments on commit b62948d

Please sign in to comment.