Skip to content

Commit

Permalink
Fixed a deadlock problem in the transcoder when updating the stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Keukhan committed Dec 4, 2024
1 parent 79a62e6 commit 09ca9aa
Showing 1 changed file with 69 additions and 43 deletions.
112 changes: 69 additions & 43 deletions src/projects/transcoder/transcoder_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,56 +271,66 @@ bool TranscoderStream::UpdateInternal(const std::shared_ptr<info::Stream> &strea
void TranscoderStream::RemoveDecoders()
{
std::unique_lock<std::shared_mutex> decoder_lock(_decoder_map_mutex);

for (auto &it : _decoders)

auto decoders = _decoders;
_decoders.clear();

decoder_lock.unlock();

for (auto &[id, object] : decoders)
{
auto object = it.second;
if (object != nullptr)
{
object->Stop();
object.reset();
}
}
_decoders.clear();
}

void TranscoderStream::RemoveFilters()
{
std::unique_lock<std::shared_mutex> filter_lock(_filter_map_mutex);

for (auto &it : _filters)

auto filters = _filters;
_filters.clear();

filter_lock.unlock();

for (auto &[id, object] : filters)
{
auto object = it.second;
if (object != nullptr)
{
object->Stop();
object.reset();
}
}
_filters.clear();
}

void TranscoderStream::RemoveEncoders()
{
std::unique_lock<std::shared_mutex> encoder_lock(_encoder_map_mutex);
for (auto &[encoder_id, encoder_pair] : _encoders)

auto encoders = _encoders;
_encoders.clear();

encoder_lock.unlock();

for (auto &[id, object] : encoders)
{
auto filter = encoder_pair.first;
auto encoder = encoder_pair.second;

if(filter != nullptr)
auto filter = object.first;
if (filter != nullptr)
{
filter->Stop();
filter.reset();
}

if(encoder != nullptr)
auto encoder = object.second;
if (encoder != nullptr)
{
encoder->Stop();
encoder.reset();
}
}
_encoders.clear();
}

std::shared_ptr<MediaTrack> TranscoderStream::GetInputTrack(MediaTrackId track_id)
Expand Down Expand Up @@ -554,7 +564,7 @@ std::shared_ptr<info::Stream> TranscoderStream::CreateOutputStream(const cfg::vh
auto output_track = CreateOutputTrack(input_track, profile);
if (output_track == nullptr)
{
logtw("Failed to create media tracks. Encoding options need to be checked. track_id(%d)", input_track_id);
logtw("Failed to create media tracks. Encoding options need to be checked. InputTrack(%d)", input_track_id);
continue;
}

Expand All @@ -569,7 +579,7 @@ std::shared_ptr<info::Stream> TranscoderStream::CreateOutputStream(const cfg::vh
auto output_track = CreateOutputTrack(input_track, profile);
if (output_track == nullptr)
{
logtw("Failed to create media tracks. Encoding options need to be checked. track_id(%d)", input_track_id);
logtw("Failed to create media tracks. Encoding options need to be checked. InputTrack(%d)", input_track_id);
continue;
}

Expand All @@ -586,7 +596,7 @@ std::shared_ptr<info::Stream> TranscoderStream::CreateOutputStream(const cfg::vh
auto output_track = CreateOutputTrack(input_track, profile);
if (output_track == nullptr)
{
logtw("Failed to create media tracks. Encoding options need to be checked. track_id(%d)", input_track_id);
logtw("Failed to create media tracks. Encoding options need to be checked. InputTrack(%d)", input_track_id);
continue;
}

Expand All @@ -602,7 +612,7 @@ std::shared_ptr<info::Stream> TranscoderStream::CreateOutputStream(const cfg::vh
auto output_track = CreateOutputTrackDataType(input_track);
if (output_track == nullptr)
{
logtw("Failed to create media tracks. Encoding options need to be checked. track_id(%d)", input_track_id);
logtw("Failed to create media tracks. Encoding options need to be checked. InputTrack(%d)", input_track_id);
continue;
}

Expand Down Expand Up @@ -859,6 +869,7 @@ std::shared_ptr<TranscodeDecoder> TranscoderStream::GetDecoder(MediaTrackId deco
void TranscoderStream::SetDecoder(MediaTrackId decoder_id, std::shared_ptr<TranscodeDecoder> decoder)
{
std::unique_lock<std::shared_mutex> decoder_lock(_decoder_map_mutex);

_decoders[decoder_id] = decoder;
}

Expand Down Expand Up @@ -910,7 +921,7 @@ bool TranscoderStream::CreateEncoder(MediaTrackId encoder_id, std::shared_ptr<in
{
if (GetEncoder(encoder_id).has_value())
{
logtd("%s Encoder already exists. Encoder(%d) > OutputTrack(%d)", _log_prefix.CStr(), encoder_id, output_track->GetId());
logtd("%s Encoder already exists. Encoder(%d)", _log_prefix.CStr(), encoder_id);
return true;
}

Expand Down Expand Up @@ -983,6 +994,7 @@ std::optional<std::pair<std::shared_ptr<TranscodeFilter>, std::shared_ptr<Transc
void TranscoderStream::SetEncoder(MediaTrackId encoder_id, std::shared_ptr<TranscodeFilter> filter, std::shared_ptr<TranscodeEncoder> encoder)
{
std::unique_lock<std::shared_mutex> encoder_lock(_encoder_map_mutex);

_encoders[encoder_id] = std::make_pair(filter, encoder);
}

Expand All @@ -1006,16 +1018,34 @@ int32_t TranscoderStream::CreateFilters(std::shared_ptr<MediaFrame> buffer)
for (auto &filter_id : filter_ids)
{
MediaTrackId encoder_id = _link_filter_to_encoder[filter_id];
auto encoder = GetEncoder(encoder_id);
if(encoder.has_value() == false)
auto encoder_set = GetEncoder(encoder_id);
if(encoder_set.has_value() == false)
{
logte("%s Failed to create filter. could not found encoder set. Encoder(%d), Filter(%d)", _log_prefix.CStr(), encoder_id, filter_id);
continue;
}

auto encoder = encoder_set->second;
if(encoder == nullptr)
{
logte("%s Failed to create filter. could not found encoder. EncoderId(%d), FilterId(%d)", _log_prefix.CStr(), encoder_id, filter_id);
logte("%s Failed to create filter. could not found encoder. Encoder(%d), Filter(%d)", _log_prefix.CStr(), encoder_id, filter_id);
continue;
}

auto input_track = GetDecoder(decoder_id)->GetRefTrack();
auto decoder = GetDecoder(decoder_id);
if(decoder == nullptr)
{
logte("%s Failed to create filter. could not found decoder. Decoder(%d), Filter(%d)", _log_prefix.CStr(), decoder_id, filter_id);
continue;
}

auto output_track = encoder->second->GetRefTrack();
auto input_track = decoder->GetRefTrack();
auto output_track = encoder->GetRefTrack();
if(input_track == nullptr || output_track == nullptr)
{
logte("%s Failed to create filter. could not found input or output track. Decoder(%d), Encoder(%d), Filter(%d)", _log_prefix.CStr(), decoder_id, encoder_id, filter_id);
continue;
}

if (CreateFilter(filter_id, input_track, output_track) == false)
{
Expand All @@ -1032,7 +1062,7 @@ bool TranscoderStream::CreateFilter(MediaTrackId filter_id, std::shared_ptr<Medi
{
if(GetFilter(filter_id) != nullptr)
{
logtw("%s Filter already exists. FilterId(%d)", _log_prefix.CStr(), filter_id);
logtw("%s Filter already exists. Filter(%d)", _log_prefix.CStr(), filter_id);
return true;
}

Expand All @@ -1053,13 +1083,13 @@ bool TranscoderStream::CreateFilter(MediaTrackId filter_id, std::shared_ptr<Medi
auto filter = TranscodeFilter::Create(filter_id, input_stream, input_track, output_stream, output_track, bind(&TranscoderStream::OnFilteredFrame, this, std::placeholders::_1, std::placeholders::_2));
if (filter == nullptr)
{
logte("%s Failed to create filter. FilterId(%d)", _log_prefix.CStr(), filter_id);
logte("%s Failed to create filter. Filter(%d)", _log_prefix.CStr(), filter_id);
return false;
}

SetFilter(filter_id, filter);

logtd("%s Created Filter. FilterId(%d)", _log_prefix.CStr(), filter_id);
logtd("%s Created Filter. Filter(%d)", _log_prefix.CStr(), filter_id);

return true;
}
Expand All @@ -1078,6 +1108,7 @@ std::shared_ptr<TranscodeFilter> TranscoderStream::GetFilter(MediaTrackId filter
void TranscoderStream::SetFilter(MediaTrackId filter_id, std::shared_ptr<TranscodeFilter> filter)
{
std::unique_lock<std::shared_mutex> lock(_filter_map_mutex);

_filters[filter_id] = filter;
}

Expand Down Expand Up @@ -1105,7 +1136,7 @@ void TranscoderStream::ChangeOutputFormat(std::shared_ptr<MediaFrame> buffer)
{
logtw("%s No encoders have been created. InputTrack(%u)", _log_prefix.CStr(), buffer->GetTrackId());

SetState(State::ERROR);
// SetState(State::ERROR);
return;
}

Expand All @@ -1114,7 +1145,7 @@ void TranscoderStream::ChangeOutputFormat(std::shared_ptr<MediaFrame> buffer)
{
logtw("%s No filters have been created. InputTrack(%u)", _log_prefix.CStr(), buffer->GetTrackId());

SetState(State::ERROR);
// SetState(State::ERROR);
return;
}

Expand Down Expand Up @@ -1180,7 +1211,7 @@ void TranscoderStream::UpdateOutputTrack(std::shared_ptr<MediaFrame> buffer)
{
UNUSED_VARIABLE(output_stream)

logtd("%s Updated output track. InputTrack(%u), OutputTrack(%u)", _log_prefix.CStr(), input_track_id, output_track->GetId());
logtd("%s Updated output track. OutputTrack(%u)", _log_prefix.CStr(), input_track_id, output_track->GetId());

// Case of Passthrough
if (output_track->IsBypass() == true)
Expand Down Expand Up @@ -1503,7 +1534,7 @@ TranscodeResult TranscoderStream::PreEncodeFilterFrame(std::shared_ptr<MediaFram
{
auto filter_id = frame->GetTrackId();

// Get Encoder ID
// Get Encoder ID form Filter ID
auto filter_to_encoder_it = _link_filter_to_encoder.find(filter_id);
if (filter_to_encoder_it == _link_filter_to_encoder.end())
{
Expand All @@ -1518,28 +1549,23 @@ TranscodeResult TranscoderStream::PreEncodeFilterFrame(std::shared_ptr<MediaFram
return TranscodeResult::NoData;
}

// If the encoder has a filter, it is passed to the filter.
// If the encoder has a pre-encode filter, it is passed to the filter.
auto encode_filter = encoder_set->first;
if (encode_filter != nullptr)
{
encode_filter->SendBuffer(std::move(frame));
return TranscodeResult::DataReady;
}

auto encoder = encoder_set->second;
if (encoder == nullptr)
{
return TranscodeResult::NoData;
return TranscodeResult::DataReady;
}

encoder->SendBuffer(std::move(frame));

OnPreEncodeFilteredFrame(encoder_id, std::move(frame));
return TranscodeResult::DataReady;
}

void TranscoderStream::OnPreEncodeFilteredFrame(MediaTrackId filter_id, std::shared_ptr<MediaFrame> filtered_frame)
void TranscoderStream::OnPreEncodeFilteredFrame(MediaTrackId encoder_id, std::shared_ptr<MediaFrame> filtered_frame)
{
filtered_frame->SetTrackId(filter_id);
filtered_frame->SetTrackId(encoder_id);

EncodeFrame(std::move(filtered_frame));
}
Expand Down

0 comments on commit 09ca9aa

Please sign in to comment.