Skip to content

Commit

Permalink
Merge branch 'finos:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
topilski authored Nov 22, 2024
2 parents 49b9465 + 83d6acd commit 3db7800
Show file tree
Hide file tree
Showing 37 changed files with 466 additions and 237 deletions.
3 changes: 0 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,6 @@ set(FILES
utils/IdGenerator.h
utils/MersienneRandom.h
utils/Optional.h
utils/Pacer.cpp
utils/Pacer.h
utils/ScopedFileHandle.h
utils/ScopedInvariantChecker.h
Expand Down Expand Up @@ -502,8 +501,6 @@ set(TEST_LIB_FILES
test/integration/FFTanalysis.cpp
test/sctp/SctpEndpoint.h
test/sctp/SctpEndpoint.cpp
test/transport/SrtpProtectJob.cpp
test/transport/SrtpProtectJob.h
test/transport/SrtpUnprotectJob.cpp
test/transport/SrtpUnprotectJob.h
test/transport/FakeNetwork.h
Expand Down
13 changes: 6 additions & 7 deletions bridge/engine/DiscardReceivedVideoPacketJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ void DiscardReceivedVideoPacketJob::run()
return;
}

const bool noPacketsProcessedYet =
(_ssrcContext.packetsProcessed == 0 && _ssrcContext.lastReceivedExtendedSequenceNumber == 0 &&
_ssrcContext.lastUnprotectedExtendedSequenceNumber == 0);

if (noPacketsProcessedYet && (_extendedSequenceNumber >> 16) == 0)
if (!_ssrcContext.hasDecryptedPackets)
{
_sender->unprotect(*_packet); // make sure srtp sees one packet with ROC=0
_ssrcContext.lastUnprotectedExtendedSequenceNumber = _extendedSequenceNumber;
if (_sender->unprotectFirstRtp(*_packet, _ssrcContext.rocOffset)) // make sure srtp sees one packet with ROC=0
{
_ssrcContext.lastUnprotectedExtendedSequenceNumber = _extendedSequenceNumber;
_ssrcContext.hasDecryptedPackets = true;
}
}

_ssrcContext.onRtpPacketReceived(_timestamp);
Expand Down
1 change: 1 addition & 0 deletions bridge/engine/EngineMixer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ EngineMixer::EngineMixer(const std::string& id,
_lastReceiveTimeOnRegularTransports(_lastStartedIterationTimestamp),
_lastReceiveTimeOnBarbellTransports(_lastStartedIterationTimestamp),
_lastSendTimeOfUserMediaMapMessageOverBarbells(_lastStartedIterationTimestamp),
_lastCounterCheck(0),
_engineStreamDirector(std::make_unique<EngineStreamDirector>(_loggableId.getInstanceId(), config, lastN)),
_activeMediaList(std::make_unique<ActiveMediaList>(_loggableId.getInstanceId(),
audioSsrcs,
Expand Down
3 changes: 1 addition & 2 deletions bridge/engine/ProcessMissingVideoPacketsJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ ProcessMissingVideoPacketsJob::ProcessMissingVideoPacketsJob(SsrcInboundContext&
void ProcessMissingVideoPacketsJob::run()
{
auto timestamp = utils::Time::getAbsoluteTime();
auto* videoMissingPacketsTracker = _ssrcContext.videoMissingPacketsTracker.get();
if (!videoMissingPacketsTracker)
if (!_ssrcContext.videoMissingPacketsTracker)
{
return;
}
Expand Down
19 changes: 17 additions & 2 deletions bridge/engine/RtpForwarderReceiveBaseJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,26 @@ RtpForwarderReceiveBaseJob::RtpForwarderReceiveBaseJob(memory::UniquePacket&& pa

bool RtpForwarderReceiveBaseJob::tryUnprotectRtpPacket(const char* logGroup)
{
if (!_ssrcContext.hasDecryptedPackets)
{
if (_sender->unprotectFirstRtp(*_packet, _ssrcContext.rocOffset))
{
_ssrcContext.lastUnprotectedExtendedSequenceNumber = _extendedSequenceNumber;
_ssrcContext.hasDecryptedPackets = true;
return true;
}
else
{
return false;
}
}

if (transport::SrtpClient::shouldSetRolloverCounter(_ssrcContext.lastUnprotectedExtendedSequenceNumber,
_extendedSequenceNumber))
{
const uint32_t oldRolloverCounter = _ssrcContext.lastUnprotectedExtendedSequenceNumber >> 16;
const uint32_t newRolloverCounter = _extendedSequenceNumber >> 16;
const uint32_t oldRolloverCounter =
_ssrcContext.rocOffset + (_ssrcContext.lastUnprotectedExtendedSequenceNumber >> 16);
const uint32_t newRolloverCounter = _ssrcContext.rocOffset + (_extendedSequenceNumber >> 16);

logger::info("Setting rollover counter for ssrc %u, extseqno %u->%u, seqno %u->%u, roc %u->%u, %s",
logGroup,
Expand Down
6 changes: 4 additions & 2 deletions bridge/engine/SsrcInboundContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ class SsrcInboundContext
defaultLevelSsrc(defaultLevelSsrc),
markNextPacket(true),
lastReceivedExtendedSequenceNumber(0),
packetsProcessed(0),
hasDecryptedPackets(false),
lastUnprotectedExtendedSequenceNumber(0),
rocOffset(0),
activeMedia(false),
inactiveTransitionCount(0),
isSsrcUsed(true),
Expand Down Expand Up @@ -109,8 +110,9 @@ class SsrcInboundContext
// transport thread variables ===================================
bool markNextPacket;
uint32_t lastReceivedExtendedSequenceNumber;
uint32_t packetsProcessed;
bool hasDecryptedPackets;
uint32_t lastUnprotectedExtendedSequenceNumber;
uint32_t rocOffset; // srtp packets with roc=0 were lost
std::shared_ptr<VideoMissingPacketsTracker> videoMissingPacketsTracker;
std::unique_ptr<codec::OpusDecoder> opusDecoder; // used for missing audio level
std::unique_ptr<utils::AvgRateTracker> opusPacketRate; // pkt/s
Expand Down
4 changes: 1 addition & 3 deletions bridge/engine/VideoForwarderReceiveJob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ void VideoForwarderReceiveJob::run()
? codec::H264Header::isKeyFrame(payload, payloadSize)
: codec::Vp8Header::isKeyFrame(payload, codec::Vp8Header::getPayloadDescriptorSize(payload, payloadSize));

++_ssrcContext.packetsProcessed;

if (_ssrcContext.packetsProcessed == 1)
if (!_ssrcContext.videoMissingPacketsTracker)
{
_ssrcContext.lastReceivedExtendedSequenceNumber = _extendedSequenceNumber - 1;
_ssrcContext.videoMissingPacketsTracker = std::make_shared<VideoMissingPacketsTracker>();
Expand Down
87 changes: 39 additions & 48 deletions concurrency/LockFreeList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,44 @@ namespace concurrency
{

std::atomic_uint32_t LockFreeList::_versionCounter(1);
LockFreeList::LockFreeList() : _head(&_eol), _tail(&_eol), _count(0)
LockFreeList::LockFreeList() : _count(0)
{
_eol._next = &_eol;
_head.store(VersionedPtr<ListItem>(&_eol, 0));
_tail.store(VersionedPtr<ListItem>(&_eol, 0));
_eol._next = VersionedPtr<ListItem>(&_eol, 0);
_cacheLineSeparator[0] = 0xBA;
}

// Possible to push a list of items too
bool LockFreeList::push(ListItem* item)
{
ListItem* last = nullptr;
const uint32_t version = _versionCounter.fetch_add(1);
int count = 0;

for (auto p = item; p; p = p->_next.load())
auto itemNode = VersionedPtr<ListItem>(item, version);
VersionedPtr<ListItem> lastNode;
for (auto p = itemNode; p; p = p->_next.load())
{
++count;
const auto nextPtr = p->_next.load(std::memory_order::memory_order_relaxed);
if (nextPtr == nullptr)
auto nextPtr = p->_next.load(std::memory_order::memory_order_acquire);
if (!nextPtr)
{
last = p;
p->_next.store(makeVersionedPointer(&_eol, version));
lastNode = VersionedPtr<ListItem>(p.get(), version);
p->_next.store(VersionedPtr<ListItem>(&_eol, version), std::memory_order::memory_order_release);
break;
}
else
{
p->_next.store(makeVersionedPointer(nextPtr, version));
p->_next.store(VersionedPtr<ListItem>(nextPtr.get(), version), std::memory_order::memory_order_release);
}
}

auto itemNode = makeVersionedPointer(item, version);
auto lastNode = makeVersionedPointer(last, version);

// move tail to our new tail, retry until we make it
auto prevTailNode = _tail.load();
auto pPrevTail = getPointer(prevTailNode);
auto prevTailNext = pPrevTail->_next.load();
auto prevTailNext = prevTailNode->_next.load();
while (!_tail.compare_exchange_weak(prevTailNode, lastNode))
{
pPrevTail = getPointer(prevTailNode);
prevTailNext = pPrevTail->_next;
prevTailNext = prevTailNode->_next;
}

// prevTail may be one of: &this->_eol, prev node that may be in list or popped already
Expand All @@ -52,8 +50,8 @@ bool LockFreeList::push(ListItem* item)
// if it was popped and reinserted and next is still _eol, the version will differ, and we attach to head.
// Otherwise we would detach ourselves and the rest. if we set next first, popper must notice and move head

if (pPrevTail != &_eol && getPointer(prevTailNext) == &_eol &&
pPrevTail->_next.compare_exchange_strong(prevTailNext, itemNode))
if (prevTailNode.get() != &_eol && prevTailNext.get() == &_eol &&
prevTailNode->_next.compare_exchange_strong(prevTailNext, itemNode))
{
// if we won this, the popper will have to move head to us
_count.fetch_add(count, std::memory_order::memory_order_relaxed);
Expand All @@ -63,7 +61,7 @@ bool LockFreeList::push(ListItem* item)
{
// prevTailNext is now indicating either nullptr, eol in another list, eol in this list but another version so
// re-inserted at end popped empty, we must attach to head
_head.store(itemNode);
_head.store(itemNode, std::memory_order_release);
_count.fetch_add(count, std::memory_order::memory_order_relaxed);
return true;
}
Expand All @@ -73,66 +71,59 @@ bool LockFreeList::push(ListItem* item)
empty list, head points to _eol
return false
Otherwise, we iterate down the list and try to lok an item. If a next pointer is not pointing to node of proper
version we restart from head. Version always have to be checked after locking.
= Popping the tail item =
if we locked tail item we can try to set tail back to head but it may already have been set to a new tail
We can try to set the head past our item but it may have been moved to earlier item or later item. In case of
earlier, we retry. If head has been moved back to eol we cannot decide unless head ptr is still versioned. eol
item does not have to be versioned.
= Popping first item or mid item
There is no difference. Once item is locked we try to move the pointer to next as long as the version of pointer
is less than our item's version
Otherwise, we load _head, next of head, and try to update _head to point to next. That will fail if _head changed and we
retry.
- Once popped, we cannot tell if we popped the _tail also. We do not know if others are pushing to our next.
- We try to update _tail in case it pointed to our node, but it may have moved on and there is nothing we can do.
It may have moved on before we checked it.
- Now if we fail to update our next from the next pointer we had before pop, it means nodes have been added to our next
because our node was the tail node at some point. But that also means we moved the _head to eol. All we can do is to
have _head point to the new list added to our tail.
*/
bool LockFreeList::pop(ListItem*& item)
{
ListItem* pCurrent = nullptr;
ListItem* nextNode = nullptr;
ListItem* currentNode = _head.load(std::memory_order::memory_order_consume);
VersionedPtr<ListItem> nextNode;
auto nodeToPop = _head.load(std::memory_order::memory_order_acquire);
for (;;)
{
pCurrent = getPointer(currentNode);
if (pCurrent == &_eol)
if (nodeToPop.get() == &_eol)
{
return false;
}

nextNode = pCurrent->_next.load();
if (nextNode == nullptr) // cheap check instead of CAS
nextNode = nodeToPop->_next.load();
if (!nextNode) // cheap check if node is popped, instead of CAS
{
currentNode = _head.load();
nodeToPop = _head.load();
continue;
}

if (_head.compare_exchange_weak(currentNode, nextNode))
if (_head.compare_exchange_weak(nodeToPop, nextNode))
{
break;
}
}
// we won the node for popping and the next pointer is from that time

{
auto tail = _tail.load();
if (tail == currentNode)
if (tail == nodeToPop)
{
_tail.compare_exchange_strong(tail, nextNode);
// else tail moved ahead already
}
}

// neither head nor tail can point to this node. We own it.
// but a pusher may be trying to add to next
if (!pCurrent->_next.compare_exchange_strong(nextNode, nullptr))
// Neither head nor tail can point to this node now. We own it.
// But a pusher may have added to our tail, before we changed _tail
if (!nodeToPop->_next.compare_exchange_strong(nextNode, VersionedPtr<ListItem>()))
{
// pusher won. tail was added to our node. We have to fix head since we moved it to eol
// A tail was added to our node. We have to fix _head since we moved it to eol
_head.store(nextNode);
pCurrent->_next.store(nullptr, std::memory_order::memory_order_relaxed);
nodeToPop->_next.store(VersionedPtr<ListItem>(), std::memory_order::memory_order_release);
}

item = pCurrent;
item = nodeToPop.get();
_count.fetch_sub(1, std::memory_order::memory_order_relaxed);
return true;
}
Expand Down
14 changes: 5 additions & 9 deletions concurrency/LockFreeList.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,10 @@ namespace concurrency
class ListItem
{
public:
ListItem()
{
// _next must be initialied atomically. Cannot use constructor
_next = nullptr;
}
ListItem() { _next = VersionedPtr<ListItem>(); }

private:
std::atomic<ListItem*> _next;
std::atomic<VersionedPtr<ListItem>> _next;
friend class LockFreeList;
};

Expand All @@ -35,13 +31,13 @@ class LockFreeList
bool push(ListItem* item);
bool pop(ListItem*& item);

bool empty() const { return getPointer(_head.load()) == &_eol; }
bool empty() const { return _head.load().get() == &_eol; }
uint32_t size() const { return _count; }

private:
std::atomic<ListItem*> _head;
std::atomic<VersionedPtr<ListItem>> _head;
uint64_t _cacheLineSeparator[7];
std::atomic<ListItem*> _tail;
std::atomic<VersionedPtr<ListItem>> _tail;
ListItem _eol;
std::atomic_uint32_t _count;
static std::atomic_uint32_t _versionCounter;
Expand Down
Loading

0 comments on commit 3db7800

Please sign in to comment.