Skip to content

Commit

Permalink
MEDIA-1620: fix mixer idle timeout (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
RicardoMDomingues authored Dec 14, 2023
1 parent e7752fb commit 994e060
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 20 deletions.
71 changes: 55 additions & 16 deletions bridge/engine/EngineMixer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ EngineMixer::EngineMixer(const std::string& id,
_mainAllocator(mainAllocator),
_sendAllocator(sendAllocator),
_audioAllocator(audioAllocator),
_lastReceiveTime(utils::Time::getAbsoluteTime()),
_lastReceiveTimeOnRegularTransports(utils::Time::getAbsoluteTime()),
_lastReceiveTimeOnBarbellTransports(utils::Time::getAbsoluteTime()),
_engineStreamDirector(std::make_unique<EngineStreamDirector>(_loggableId.getInstanceId(), config, lastN)),
_activeMediaList(std::make_unique<ActiveMediaList>(_loggableId.getInstanceId(),
audioSsrcs,
Expand Down Expand Up @@ -88,6 +89,21 @@ EngineMixer::EngineMixer(const std::string& id,

EngineMixer::~EngineMixer() {}

bool EngineMixer::isIdle(const uint16_t timestamp) const
{
if (!utils::Time::diffGE(_lastReceiveTimeOnRegularTransports,
timestamp,
_config.mixerInactivityTimeoutMs * utils::Time::ms))
{
return false;
}

return _config.deleteEmptyConferencesWithBarbells ||
utils::Time::diffGE(_lastReceiveTimeOnBarbellTransports,
timestamp,
_config.mixerInactivityTimeoutMs * utils::Time::ms);
}

memory::UniquePacket EngineMixer::createGoodBye(uint32_t ssrc, memory::PacketPoolAllocator& allocator)
{
auto packet = memory::makeUniquePacket(allocator);
Expand Down Expand Up @@ -201,10 +217,7 @@ void EngineMixer::run(const uint64_t engineIterationStartTimestamp)
// 6. Maintain transports.
runTransportTicks(engineIterationStartTimestamp);

const bool isIdle = utils::Time::diffGE(_lastReceiveTime,
engineIterationStartTimestamp,
_config.mixerInactivityTimeoutMs * utils::Time::ms);
if (isIdle && !_hasSentTimeout)
if (!_hasSentTimeout && isIdle(engineIterationStartTimestamp))
{
_hasSentTimeout = _messageListener.asyncMixerTimedOut(*this);
}
Expand Down Expand Up @@ -975,6 +988,7 @@ SsrcInboundContext* EngineMixer::emplaceInboundSsrcContext(const uint32_t ssrc,
void EngineMixer::processIncomingRtpPackets(const uint64_t timestamp)
{
uint32_t numRtpPackets = 0;
uint32_t numBarbellRtpPackets = 0;

if (_numMixedAudioStreams > 0)
{
Expand All @@ -997,6 +1011,10 @@ void EngineMixer::processIncomingRtpPackets(const uint64_t timestamp)
for (IncomingPacketInfo packetInfo; _incomingForwarderAudioRtp.pop(packetInfo);)
{
++numRtpPackets;
if (EngineBarbell::isFromBarbell(packetInfo.transport()->getTag()))
{
++numBarbellRtpPackets;
}

const auto rtpHeader = rtp::RtpHeader::fromPacket(*packetInfo.packet());
if (!rtpHeader)
Expand All @@ -1017,6 +1035,11 @@ void EngineMixer::processIncomingRtpPackets(const uint64_t timestamp)
for (IncomingPacketInfo packetInfo; _incomingForwarderVideoRtp.pop(packetInfo);)
{
++numRtpPackets;
if (EngineBarbell::isFromBarbell(packetInfo.transport()->getTag()))
{
++numBarbellRtpPackets;
}

auto ssrcContext = packetInfo.inboundContext();
if (ssrcContext && !ssrcContext->activeMedia)
{
Expand All @@ -1031,9 +1054,14 @@ void EngineMixer::processIncomingRtpPackets(const uint64_t timestamp)
forwardVideoRtpPacketRecording(packetInfo, timestamp);
}

if (numRtpPackets > 0)
if (numBarbellRtpPackets > 0)
{
_lastReceiveTime = timestamp;
_lastReceiveTimeOnBarbellTransports = timestamp;
}

if (numBarbellRtpPackets != numRtpPackets)
{
_lastReceiveTimeOnRegularTransports = timestamp;
}
}

Expand Down Expand Up @@ -1062,19 +1090,27 @@ void EngineMixer::processIncomingRtcpPackets(const uint64_t timestamp)
}
}

_lastReceiveTime = timestamp;
if (EngineBarbell::isFromBarbell(packetInfo.transport()->getTag()))
{
_lastReceiveTimeOnBarbellTransports = timestamp;
}
else
{
_lastReceiveTimeOnRegularTransports = timestamp;
}
}
}

void EngineMixer::processIceActivity(const uint64_t timestamp)
{
bool needToUpdate = !_iceReceivedOnBarbellTransport.test_and_set() && !_config.deleteEmptyConferencesWithBarbells;
needToUpdate |= !_iceReceivedOnRegularTransport.test_and_set();
if (!_iceReceivedOnBarbellTransport.test_and_set())
{
_lastReceiveTimeOnBarbellTransports = timestamp;
}

if (needToUpdate)
if (!_iceReceivedOnRegularTransport.test_and_set())
{
// if it was cleared by any transport receiving ICE, we will now set the keep alive timestamp
_lastReceiveTime = timestamp;
_lastReceiveTimeOnRegularTransports = timestamp;
}
}

Expand Down Expand Up @@ -1594,10 +1630,13 @@ bool EngineMixer::asyncHandleSctpControl(const size_t endpointIdHash, memory::Un

void EngineMixer::onIceReceived(transport::RtcTransport* transport, uint64_t timestamp)
{
if (EngineBarbell::isFromBarbell(transport->getTag())) {
if (EngineBarbell::isFromBarbell(transport->getTag()))
{
_iceReceivedOnBarbellTransport.clear();
} else {
_iceReceivedOnRegularTransport.clear();
}
else
{
_iceReceivedOnRegularTransport.clear();
}
}
} // namespace bridge
5 changes: 4 additions & 1 deletion bridge/engine/EngineMixer.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ class EngineMixer : public transport::DataReceiver
memory::PacketPoolAllocator& _sendAllocator;
memory::AudioPacketPoolAllocator& _audioAllocator;

uint64_t _lastReceiveTime;
uint64_t _lastReceiveTimeOnRegularTransports;
uint64_t _lastReceiveTimeOnBarbellTransports;
std::atomic_flag _iceReceivedOnRegularTransport = ATOMIC_FLAG_INIT;
std::atomic_flag _iceReceivedOnBarbellTransport = ATOMIC_FLAG_INIT;
uint64_t _lastCounterCheck;
Expand All @@ -408,6 +409,8 @@ class EngineMixer : public transport::DataReceiver
uint64_t _lastRecordingAckProcessed;
bool _slidesPresent;

bool isIdle(uint16_t timestamp) const;

uint32_t getMinRemoteClientDownlinkBandwidth() const;
void reportMinRemoteClientDownlinkBandwidthToBarbells(const uint32_t minUplinkEstimate) const;

Expand Down
2 changes: 0 additions & 2 deletions test/integration/emulator/FakeTcpEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ void FakeTcpEndpoint::dispatchReceivedPacket(fakenet::Protocol protocol,
void FakeTcpEndpoint::process(uint64_t timestamp)
{
const auto start = utils::Time::getAbsoluteTime();
uint32_t packetCounter = 0;

if (_state != Endpoint::CONNECTED && _state != Endpoint::CONNECTING)
{
Expand All @@ -305,7 +304,6 @@ void FakeTcpEndpoint::process(uint64_t timestamp)
size_t byteCount = 0;
for (OutboundPacket packetInfo; _sendQueue.pop(packetInfo);)
{
++packetCounter;
auto& packet = packetInfo.packet;
byteCount += packet->getLength();

Expand Down
2 changes: 1 addition & 1 deletion test/integration/emulator/JitterPacketSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ uint32_t identifyAudioSsrc(logger::PacketLogReader& reader)
{
logger::PacketLogItem item;
std::map<uint32_t, SsrcTrack> ssrcs;
for (int i = 0; reader.getNext(item); ++i)
while (reader.getNext(item))
{
if (item.size >= 300)
{
Expand Down
1 change: 1 addition & 0 deletions test/utils/LogSpamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "logger/PruneSpam.h"
#include "logger/SuspendSpam.h"
#include <gtest/gtest.h>
#include <signal.h>
#include <unordered_map>

TEST(LogSpam, prune)
Expand Down

0 comments on commit 994e060

Please sign in to comment.