Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support region groups for jigasi selection. #1186

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/Bridge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.jxmpp.jid.Jid
import java.time.Clock
import java.time.Duration
import java.time.Instant
import org.jitsi.jicofo.bridge.BridgeConfig.Companion.config as config

/**
* Represents a jitsi-videobridge instance, reachable at a certain JID, which
Expand All @@ -51,7 +52,7 @@ class Bridge @JvmOverloads internal constructor(
* Keep track of the recently added endpoints.
*/
private val newEndpointsRate = RateTracker(
BridgeConfig.config.participantRampupInterval(),
config.participantRampupInterval,
Duration.ofMillis(100),
clock
)
Expand Down Expand Up @@ -93,7 +94,7 @@ class Bridge @JvmOverloads internal constructor(
// To filter out intermittent failures, do not return operational
// until past the reset threshold since the last failure.
if (failureInstant != null &&
Duration.between(failureInstant, clock.instant()).compareTo(failureResetThreshold) < 0
Duration.between(failureInstant, clock.instant()).compareTo(config.failureResetThreshold) < 0
) {
false
} else {
Expand All @@ -110,7 +111,7 @@ class Bridge @JvmOverloads internal constructor(
/**
* Start out with the configured value, update if the bridge reports a value.
*/
private var averageParticipantStress = BridgeConfig.config.averageParticipantStress()
private var averageParticipantStress = config.averageParticipantStress

/**
* Stores a boolean that indicates whether the bridge is in graceful shutdown mode.
Expand Down Expand Up @@ -216,7 +217,7 @@ class Bridge @JvmOverloads internal constructor(
val healthy = stats.getValueAsString("healthy")
if (healthy != null) {
isHealthy = java.lang.Boolean.parseBoolean(healthy)
} else if (BridgeConfig.config.usePresenceForHealth) {
} else if (config.usePresenceForHealth) {
logger.warn(
"Presence-based health checks are enabled, but presence did not include health status. Health " +
"checks for this bridge are effectively disabled."
Expand Down Expand Up @@ -286,7 +287,7 @@ class Bridge @JvmOverloads internal constructor(
* @return true if the stress of the bridge is greater-than-or-equal to the threshold.
*/
val isOverloaded: Boolean
get() = stress >= BridgeConfig.config.stressThreshold()
get() = stress >= config.stressThreshold

val debugState: OrderedJsonObject
get() {
Expand All @@ -306,12 +307,6 @@ class Bridge @JvmOverloads internal constructor(
}

companion object {
/**
* How long the "failed" state should be sticky for. Once a [Bridge] goes in a non-operational state (via
* [.setIsOperational]) it will be considered non-operational for at least this amount of time.
* See the tests for example behavior.
*/
private val failureResetThreshold = BridgeConfig.config.failureResetThreshold()

/**
* Returns a negative number if b1 is more able to serve conferences than b2. The computation is based on the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,40 +36,38 @@ class BridgeConfig private constructor() {
"org.jitsi.jicofo.BridgeSelector.MAX_PARTICIPANTS_PER_BRIDGE".from(JitsiConfig.legacyConfig)
"$BASE.max-bridge-participants".from(JitsiConfig.newConfig)
}
fun maxBridgeParticipants() = maxBridgeParticipants

val maxBridgePacketRatePps: Int by config {
"org.jitsi.jicofo.BridgeSelector.MAX_BRIDGE_PACKET_RATE".from(JitsiConfig.legacyConfig)
"$BASE.max-bridge-packet-rate".from(JitsiConfig.newConfig)
}
fun maxBridgePacketRatePps() = maxBridgePacketRatePps

val averageParticipantPacketRatePps: Int by config {
"org.jitsi.jicofo.BridgeSelector.AVG_PARTICIPANT_PACKET_RATE".from(JitsiConfig.legacyConfig)
"$BASE.average-participant-packet-rate-pps"
.from(JitsiConfig.newConfig).softDeprecated("use $BASE.average-participant-stress")
}
fun averageParticipantPacketRatePps() = averageParticipantPacketRatePps

val averageParticipantStress: Double by config {
"$BASE.average-participant-stress".from(JitsiConfig.newConfig)
}
fun averageParticipantStress() = averageParticipantStress

val stressThreshold: Double by config { "$BASE.stress-threshold".from(JitsiConfig.newConfig) }
fun stressThreshold() = stressThreshold

/**
* How long the "failed" state should be sticky for. Once a [Bridge] goes in a non-operational state (via
* [.setIsOperational]) it will be considered non-operational for at least this amount of time.
* See the tests for example behavior.
*/
val failureResetThreshold: Duration by config {
"org.jitsi.focus.BRIDGE_FAILURE_RESET_THRESHOLD".from(JitsiConfig.legacyConfig)
.convertFrom<Long> { Duration.ofMillis(it) }
"$BASE.failure-reset-threshold".from(JitsiConfig.newConfig)
}
fun failureResetThreshold() = failureResetThreshold

val participantRampupInterval: Duration by config {
"$BASE.participant-rampup-interval".from(JitsiConfig.newConfig)
}
fun participantRampupInterval() = participantRampupInterval

val selectionStrategy: BridgeSelectionStrategy by config {
"org.jitsi.jicofo.BridgeSelector.BRIDGE_SELECTION_STRATEGY".from(JitsiConfig.legacyConfig)
Expand Down Expand Up @@ -148,17 +146,27 @@ class BridgeConfig private constructor() {
"jicofo.bridge.xmpp-connection-name".from(JitsiConfig.newConfig)
}

val regionGroups: Set<Set<String>> by config {
val regionGroups: Map<String, Set<String>> by config {
"jicofo.bridge".from(JitsiConfig.newConfig).convertFrom<ConfigObject> {
val regionGroupsConfigList = it["region-groups"] as? ConfigList ?: emptyList<ConfigValue>()
regionGroupsConfigList.map { regionsConfigList ->
val regionGroups = regionGroupsConfigList.map { regionsConfigList ->
(regionsConfigList as? ConfigList ?: emptyList<ConfigValue>()).map { region ->
region.unwrapped().toString()
}.toSet()
}.toSet()
mutableMapOf<String, Set<String>>().apply {
regionGroups.forEach { regionGroup ->
regionGroup.forEach { region ->
this[region] = regionGroup
}
}
}
}
}

fun getRegionGroup(region: String?): Set<String> =
if (region == null) emptySet() else regionGroups[region] ?: setOf(region)

companion object {
const val BASE = "jicofo.bridge"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.jitsi.jicofo.bridge
import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.LoggerImpl
import org.json.simple.JSONObject
import org.jitsi.jicofo.bridge.BridgeConfig.Companion.config as config

/**
* Represents an algorithm for bridge selection.
Expand Down Expand Up @@ -85,12 +86,6 @@ abstract class BridgeSelectionStrategy {
*/
private var totalLeastLoaded = 0

/**
* Maximum participants per bridge in one conference, or `-1` for no maximum.
*/
private val maxParticipantsPerBridge =
BridgeConfig.config.maxBridgeParticipants()

/**
* Selects a bridge to be used for a new participant in a conference.
*
Expand Down Expand Up @@ -176,7 +171,7 @@ abstract class BridgeSelectionStrategy {
participantProperties: ParticipantProperties,
desiredRegion: String?
): Bridge? {
val regionGroup = getRegionGroup(desiredRegion)
val regionGroup = config.getRegionGroup(desiredRegion)
val result = bridges
.filterNot { isOverloaded(it, conferenceBridges) }
.intersect(conferenceBridges.keys)
Expand Down Expand Up @@ -234,7 +229,7 @@ abstract class BridgeSelectionStrategy {
participantProperties: ParticipantProperties,
desiredRegion: String?
): Bridge? {
val regionGroup = getRegionGroup(desiredRegion)
val regionGroup = config.getRegionGroup(desiredRegion)
val result = bridges
.filterNot { isOverloaded(it, conferenceBridges) }
.firstOrNull { regionGroup.contains(it.region) }
Expand Down Expand Up @@ -279,7 +274,7 @@ abstract class BridgeSelectionStrategy {
participantProperties: ParticipantProperties,
desiredRegion: String?
): Bridge? {
val regionGroup = getRegionGroup(desiredRegion)
val regionGroup = config.getRegionGroup(desiredRegion)
val result = bridges
.intersect(conferenceBridges.keys)
.firstOrNull { regionGroup.contains(it.region) }
Expand Down Expand Up @@ -320,7 +315,7 @@ abstract class BridgeSelectionStrategy {
participantProperties: ParticipantProperties,
desiredRegion: String?
): Bridge? {
val regionGroup = getRegionGroup(desiredRegion)
val regionGroup = config.getRegionGroup(desiredRegion)
val result = bridges
.firstOrNull { regionGroup.contains(it.region) }
if (result != null) {
Expand Down Expand Up @@ -401,28 +396,12 @@ abstract class BridgeSelectionStrategy {
*/
private fun isOverloaded(bridge: Bridge, conferenceBridges: Map<Bridge, ConferenceBridgeProperties>): Boolean {
return bridge.isOverloaded || (
maxParticipantsPerBridge > 0 &&
config.maxBridgeParticipants > 0 &&
conferenceBridges.containsKey(bridge) &&
conferenceBridges[bridge]!!.participantCount >= maxParticipantsPerBridge
conferenceBridges[bridge]!!.participantCount >= config.maxBridgeParticipants
)
}

private val regionGroups: MutableMap<String, Set<String>> = HashMap()

init {
BridgeConfig.config.regionGroups.forEach { regionGroup ->
regionGroup.forEach { region ->
regionGroups[region] = regionGroup
}
}
}

protected fun getRegionGroup(region: String?): Set<String> {
if (region == null) return setOf()
val regionGroup = regionGroups[region]
return regionGroup ?: setOf(region)
}

val stats: JSONObject
get() {
val json = JSONObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class RegionBasedBridgeSelectionStrategy : BridgeSelectionStrategy() {
val participantRegion = participantProperties.region
var region = participantRegion ?: localRegion
if (localRegion != null) {
val regionGroup = getRegionGroup(region)
val regionGroup = BridgeConfig.config.getRegionGroup(region)
if (conferenceBridges.isEmpty() && region != localRegion) {
// Selecting an initial bridge for a participant not in the local region. This is most likely because
// exactly one of the first two participants in the conference is not in the local region, and we're
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class BridgeConfigTest : ShouldSpec() {
}
}
context("Region groups") {
config.regionGroups shouldBe emptySet()
config.regionGroups shouldBe emptyMap()

withNewConfig(
"""
Expand All @@ -92,7 +92,15 @@ class BridgeConfigTest : ShouldSpec() {
]
""".trimIndent()
) {
config.regionGroups shouldBe setOf(setOf("us-east", "us-west"), setOf("eu-central", "eu-west"))
config.regionGroups shouldBe mapOf(
"us-east" to setOf("us-east", "us-west"),
"us-west" to setOf("us-east", "us-west"),
"eu-central" to setOf("eu-central", "eu-west"),
"eu-west" to setOf("eu-central", "eu-west")
)
config.getRegionGroup(null) shouldBe emptySet()
config.getRegionGroup("abc") shouldBe setOf("abc")
config.getRegionGroup("us-east") shouldBe setOf("us-east", "us-west")
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion jicofo/src/main/kotlin/org/jitsi/jicofo/jigasi/JigasiDetector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.jitsi.jicofo.jigasi

import org.jitsi.jicofo.JicofoConfig
import org.jitsi.jicofo.bridge.BridgeConfig
import org.jitsi.jicofo.metrics.JicofoMetricsContainer
import org.jitsi.jicofo.xmpp.BaseBrewery
import org.jitsi.jicofo.xmpp.XmppProvider
Expand Down Expand Up @@ -86,6 +87,7 @@ open class JigasiDetector(
this["supports_transcription"] = instance.supportsTranscription()
this["is_in_graceful_shutdown"] = instance.isInGracefulShutdown()
this["participants"] = instance.getParticipantCount()
this["region"] = instance.getRegion() ?: "null"
}
debugState[instance.jid.resourceOrEmpty.toString()] = instanceJson
}
Expand Down Expand Up @@ -118,6 +120,14 @@ open class JigasiDetector(
availableInstances.filter { it.isInRegion(*preferredRegions.toTypedArray()) }.let {
if (it.isNotEmpty()) return it.leastLoaded()?.jid
}
// Try to match the preferred region groups.
val extendedPreferredRegions = preferredRegions.flatMap { region ->
BridgeConfig.config.getRegionGroup(region)
}
availableInstances.filter { it.isInRegion(*extendedPreferredRegions.toTypedArray()) }.let {
if (it.isNotEmpty()) return it.leastLoaded()?.jid
}

// Otherwise try to match the local region.
availableInstances.filter { it.isInRegion(localRegion) }.let {
if (it.isNotEmpty()) return it.leastLoaded()?.jid
Expand Down Expand Up @@ -148,8 +158,10 @@ private fun BaseBrewery<ColibriStatsExtension>.BrewInstance.supportsTranscriptio
private fun BaseBrewery<ColibriStatsExtension>.BrewInstance.supportsSip(): Boolean =
java.lang.Boolean.parseBoolean(status.getValueAsString(ColibriStatsExtension.SUPPORTS_SIP))
private fun BaseBrewery<ColibriStatsExtension>.BrewInstance.isInRegion(vararg regions: String?): Boolean =
regions.contains(status.getValueAsString(ColibriStatsExtension.REGION))
regions.contains(this.getRegion())
private fun BaseBrewery<ColibriStatsExtension>.BrewInstance.getParticipantCount(): Int =
status.getValueAsInt(ColibriStatsExtension.PARTICIPANTS) ?: 0
private fun BaseBrewery<ColibriStatsExtension>.BrewInstance.getRegion(): String? =
status.getValueAsString(ColibriStatsExtension.REGION)
private fun List<BaseBrewery<ColibriStatsExtension>.BrewInstance>.leastLoaded() =
minByOrNull { it.getParticipantCount() }
Loading