Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MEDIA-2252: implement reconfigure audio stream neighbours #349

Merged
merged 3 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions bridge/Mixer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,19 @@ bool Mixer::reconfigureAudioStream(const std::string& endpointId, const utils::O
audioStream->remoteSsrc.isSet() ? audioStream->remoteSsrc.get() : 0u);
}

bool Mixer::reconfigureAudioStreamNeighbours(const std::string& endpointId, const std::vector<uint32_t>& neighbours)
{
std::lock_guard<std::mutex> locker(_configurationLock);
auto audioStreamItr = _audioStreams.find(endpointId);
if (audioStreamItr == _audioStreams.end())
{
return false;
}
auto audioStream = audioStreamItr->second.get();
audioStream->neighbours = neighbours;
return _engineMixer->asyncReconfigureNeighbours(*audioStream->transport, neighbours);
}

bool Mixer::configureVideoStream(const std::string& endpointId,
const RtpMap& rtpMap,
const RtpMap& feedbackRtpMap,
Expand Down
1 change: 1 addition & 0 deletions bridge/Mixer.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class Mixer
const std::vector<uint32_t>& neighbours);

bool reconfigureAudioStream(const std::string& endpointId, const utils::Optional<uint32_t>& remoteSsrc);
bool reconfigureAudioStreamNeighbours(const std::string& endpointId, const std::vector<uint32_t>& neighbours);

bool configureVideoStream(const std::string& endpointId,
const RtpMap& rtpMap,
Expand Down
17 changes: 17 additions & 0 deletions bridge/endpointActions/ConferenceActions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -836,13 +836,20 @@ httpd::Response reconfigureEndpoint(ActionContext* context,

const bool isAudioSet = endpointDescription.audio.isSet();
const bool isVideoSet = endpointDescription.video.isSet();
const bool isNeighboursSet = !endpointDescription.neighbours.empty();

if (isAudioSet && !mixer->isAudioStreamConfigured(endpointId))
{
throw httpd::RequestErrorException(httpd::StatusCode::BAD_REQUEST,
"Can't reconfigure audio because it was not configured in first place");
}

if (isNeighboursSet && !mixer->isAudioStreamConfigured(endpointId))
{
throw httpd::RequestErrorException(httpd::StatusCode::BAD_REQUEST,
"Can't reconfigure neighbours because audio stream was not configured in first place");
}

if (isVideoSet && !mixer->isVideoStreamConfigured(endpointId))
{
throw httpd::RequestErrorException(httpd::StatusCode::BAD_REQUEST,
Expand All @@ -865,6 +872,16 @@ httpd::Response reconfigureEndpoint(ActionContext* context,
}
}

if (isNeighboursSet)
{
auto neighbours = convertGroupIds(endpointDescription.neighbours);
if (!mixer->reconfigureAudioStreamNeighbours(endpointId, neighbours))
{
throw httpd::RequestErrorException(httpd::StatusCode::INTERNAL_SERVER_ERROR,
"Fail to reconfigure audio stream's neighbours setting");
}
}

if (isVideoSet)
{
const auto& video = endpointDescription.video.get();
Expand Down
2 changes: 2 additions & 0 deletions bridge/engine/EngineMixer.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class EngineMixer : public transport::DataReceiver
bool asyncRemoveStream(const EngineDataStream* stream);
bool asyncAddVideoPacketCache(const uint32_t ssrc, const size_t endpointIdHash, PacketCache* videoPacketCache);
bool asyncReconfigureAudioStream(const transport::RtcTransport& transport, const uint32_t remoteSsrc);
bool asyncReconfigureNeighbours(const transport::RtcTransport& transport, const std::vector<uint32_t>& neighbours);
bool asyncStartTransport(transport::RtcTransport& transport);
bool asyncAddAudioStream(EngineAudioStream* engineAudioStream);
bool asyncAddVideoStream(EngineVideoStream* engineVideoStream);
Expand Down Expand Up @@ -206,6 +207,7 @@ class EngineMixer : public transport::DataReceiver
void startTransport(transport::RtcTransport& transport);
void startRecordingTransport(transport::RecordingTransport& transport);
void reconfigureAudioStream(const transport::RtcTransport& transport, const uint32_t remoteSsrc);
void reconfigureNeighbours(const transport::RtcTransport& transport, const std::vector<uint32_t>& neighbours);
void addVideoPacketCache(const uint32_t ssrc, const size_t endpointIdHash, PacketCache* videoPacketCache);
void pinEndpoint(const size_t endpointIdHash, const size_t targetEndpointIdHash);
void sendEndpointMessage(const size_t toEndpointIdHash,
Expand Down
38 changes: 38 additions & 0 deletions bridge/engine/EngineMixerAudio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,38 @@ void EngineMixer::reconfigureAudioStream(const transport::RtcTransport& transpor
updateBandwidthFloor();
}

void EngineMixer::reconfigureNeighbours(const transport::RtcTransport& transport,
const std::vector<uint32_t>& neighbourList)
{
auto* engineAudioStream = _engineAudioStreams.getItem(transport.getEndpointIdHash());
if (!engineAudioStream)
{
return;
}

engineAudioStream->neighbours.clear();

for (auto& neighbour : neighbourList)
{
engineAudioStream->neighbours.add(neighbour, true);
}

const auto endpointIdHash = engineAudioStream->endpointIdHash;
auto neighbourIt = _neighbourMemberships.find(endpointIdHash);
if (neighbourIt != _neighbourMemberships.end())
{
auto& neighbourList = neighbourIt->second.memberships;
for (auto& it : engineAudioStream->neighbours)
{
neighbourList.push_back(it.first);
}
}
else
{
logger::error("Failed to update neighbour list for audio stream %zu", _loggableId.c_str(), endpointIdHash);
}
}

void EngineMixer::onAudioRtpPacketReceived(SsrcInboundContext& ssrcContext,
transport::RtcTransport* sender,
memory::UniquePacket packet,
Expand Down Expand Up @@ -477,6 +509,12 @@ bool EngineMixer::asyncReconfigureAudioStream(const transport::RtcTransport& tra
return post(utils::bind(&EngineMixer::reconfigureAudioStream, this, std::cref(transport), remoteSsrc));
}

bool EngineMixer::asyncReconfigureNeighbours(const transport::RtcTransport& transport,
const std::vector<uint32_t>& neighbours)
{
return post(utils::bind(&EngineMixer::reconfigureNeighbours, this, std::cref(transport), neighbours));
}

bool EngineMixer::asyncAddAudioStream(EngineAudioStream* engineAudioStream)
{
return post(utils::bind(&EngineMixer::addAudioStream, this, engineAudioStream));
Expand Down
108 changes: 108 additions & 0 deletions test/integration/ConfIntegrationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,114 @@ TEST_F(IntegrationTest, neighbours)
});
}

TEST_F(IntegrationTest, dynamicNeighbours)
{
runTestInThread(expectedTestThreadCount(1), [this]() {
_config.readFromString(_defaultSmbConfig);

initBridge(_config);
const auto baseUrl = "http://127.0.0.1:8080";

GroupCall<SfuClient<Channel>> group(_httpd,
_instanceCounter,
*_mainPoolAllocator,
_audioAllocator,
*_clientTransportFactory,
*_publicTransportFactory,
*_sslDtls,
4);

Conference conf(_httpd);

ScopedFinalize finalize(std::bind(&IntegrationTest::finalizeSimulation, this));
startSimulation();

group.startConference(conf, baseUrl);

std::string neighbourGroupName = "gid1";
auto neighbourGroups = nlohmann::json::array();
neighbourGroups.push_back(neighbourGroupName);

CallConfigBuilder cfgBuilder(conf.getId());
cfgBuilder.url(baseUrl).withOpus().withVideo();

group.clients[0]->initiateCall(cfgBuilder.build());
group.clients[1]->joinCall(cfgBuilder.build());
group.clients[2]->joinCall(cfgBuilder.build());
group.clients[3]->joinCall(cfgBuilder.mixed().build());

// 600, 1300, 2100, 3200
ASSERT_TRUE(group.connectAll(utils::Time::sec * _clientsConnectionTimeout));

nlohmann::json responseBody;
const auto conferenceId = conf.getId();

for (int i = 1; i < 4; i++)
{
auto endpointId = group.clients[i]->getEndpointId();

nlohmann::json body = {{"action", "reconfigure"}};
body["neighbours"] = {{"groups", neighbourGroups}};

auto neighboursSet = emulator::awaitResponse<HttpPutRequest>(_httpd,
std::string(baseUrl) + "/conferences/" + conferenceId + "/" + endpointId,
body.dump(),
1.5 * utils::Time::sec,
responseBody);
EXPECT_TRUE(neighboursSet);
}

make5secCallWithDefaultAudioProfile(group);

auto statsSuccess = emulator::awaitResponse<HttpGetRequest>(_httpd,
std::string(baseUrl) + "/stats",
1500 * utils::Time::ms,
responseBody);
EXPECT_TRUE(statsSuccess);

auto confRequest = emulator::awaitResponse<HttpGetRequest>(_httpd,
std::string(baseUrl) + "/conferences",
1500 * utils::Time::ms,
responseBody);
EXPECT_TRUE(confRequest);

group.stopTransports();

group.awaitPendingJobs(utils::Time::sec * 4);
finalizeSimulation();

const size_t chMixed[] = {0, 0, 0, 1};
AudioAnalysisData results[4];
for (size_t id = 0; id < 4; ++id)
{
results[id] = analyzeRecording<SfuClient<Channel>>(group.clients[id].get(), 5, true, chMixed[id]);

std::unordered_map<uint32_t, transport::ReportSummary> transportSummary;
std::string clientName = "client_" + std::to_string(id);
group.clients[id]->getReportSummary(transportSummary);
logTransportSummary(clientName.c_str(), transportSummary);

logVideoSent(clientName.c_str(), *group.clients[id]);
logVideoReceive(clientName.c_str(), *group.clients[id]);
}

EXPECT_EQ(results[0].audioSsrcCount, 3u);
EXPECT_EQ(results[1].audioSsrcCount, 1u);
EXPECT_EQ(results[2].audioSsrcCount, 1u);
EXPECT_EQ(results[3].audioSsrcCount, 1u);

EXPECT_EQ(results[0].dominantFrequencies.size(), 3);
EXPECT_EQ(results[1].dominantFrequencies.size(), 1);
EXPECT_EQ(results[2].dominantFrequencies.size(), 1);
EXPECT_EQ(results[3].dominantFrequencies.size(), 1);

if (results[3].dominantFrequencies.size() > 0)
{
EXPECT_NEAR(results[3].dominantFrequencies[0], 600.0, 50.0);
}
});
}

class WebRtcListenerMock : public webrtc::WebRtcDataStream::Listener
{
public:
Expand Down
Loading