diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 6b4b6ebd..00f67883 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -193,9 +193,7 @@ func (m *Monitor) CanAccept() bool { return false } - available := m.cpuStats.GetCPUIdle() - m.pendingCPUs.Load() - - return available > m.maxCost + return m.getAvailable() > m.maxCost } func (m *Monitor) canAcceptIngress(info *livekit.IngressInfo) (bool, float64, float64) { @@ -205,7 +203,7 @@ func (m *Monitor) canAcceptIngress(info *livekit.IngressInfo) (bool, float64, fl var cpuHold float64 var accept bool - available := m.cpuStats.GetCPUIdle() - m.pendingCPUs.Load() - minIdle*m.cpuStats.NumCPU() + available := m.getAvailable() switch info.InputType { case livekit.IngressInput_RTMP_INPUT: @@ -227,6 +225,8 @@ func (m *Monitor) canAcceptIngress(info *livekit.IngressInfo) (bool, float64, fl logger.Errorw("unsupported request type", errors.New("invalid parameter")) } + logger.Debugw("checking if request can be handled", "inputType", info.InputType, "accept", accept, "available", available, "cpuHold", cpuHold, "idle", m.cpuStats.GetCPUIdle(), "pending", m.pendingCPUs.Load()) + return accept, cpuHold, available } @@ -267,3 +267,7 @@ func (m *Monitor) IngressEnded(info *livekit.IngressInfo) { } } + +func (m *Monitor) getAvailable() float64 { + return m.cpuStats.GetCPUIdle() - m.pendingCPUs.Load() - minIdle*m.cpuStats.NumCPU() +}