diff --git a/src/projects/transcoder/filter/filter_base.h b/src/projects/transcoder/filter/filter_base.h index 2335ac925..c931782a2 100644 --- a/src/projects/transcoder/filter/filter_base.h +++ b/src/projects/transcoder/filter/filter_base.h @@ -63,6 +63,13 @@ class FilterBase return _src_height; } + // If the input track and output track are the same, the filter is used for a single track. + // The main goal of this filter is to handle frame drops. + bool IsSingleTrack() const + { + return (_input_track == _output_track)? true : false; + } + void SetCompleteHandler(CompleteHandler complete_handler) { _complete_handler = complete_handler; } diff --git a/src/projects/transcoder/filter/filter_resampler.cpp b/src/projects/transcoder/filter/filter_resampler.cpp index b46f7afe8..7ec7033fc 100644 --- a/src/projects/transcoder/filter/filter_resampler.cpp +++ b/src/projects/transcoder/filter/filter_resampler.cpp @@ -103,11 +103,17 @@ bool FilterResampler::InitializeFilterDescription() { std::vector filters; - filters.push_back(ov::String::FormatString("asettb=%s", _output_track->GetTimeBase().GetStringExpr().CStr())); - filters.push_back(ov::String::FormatString("aresample=async=1000")); - filters.push_back(ov::String::FormatString("aresample=%d", _output_track->GetSampleRate())); - filters.push_back(ov::String::FormatString("aformat=sample_fmts=%s:channel_layouts=%s", _output_track->GetSample().GetName(), _output_track->GetChannel().GetName())); - filters.push_back(ov::String::FormatString("asetnsamples=n=%d", _output_track->GetAudioSamplesPerFrame())); + if(IsSingleTrack()) + { + filters.push_back(ov::String::FormatString("aresample=async=1")); + filters.push_back(ov::String::FormatString("asetnsamples=n=%d", _output_track->GetAudioSamplesPerFrame())); + } + else + { + filters.push_back(ov::String::FormatString("asettb=%s", _output_track->GetTimeBase().GetStringExpr().CStr())); + filters.push_back(ov::String::FormatString("aresample=%d", _output_track->GetSampleRate())); 
+ filters.push_back(ov::String::FormatString("aformat=sample_fmts=%s:channel_layouts=%s", _output_track->GetSample().GetName(), _output_track->GetChannel().GetName())); + } if (filters.size() == 0) { @@ -227,7 +233,8 @@ void FilterResampler::Stop() void FilterResampler::WorkerThread() { - if(_codec_init_event.Submit(Configure(_input_track, _output_track)) == false) + auto result = Configure(_input_track, _output_track); + if (_codec_init_event.Submit(result) == false) { return; } @@ -256,6 +263,8 @@ void FilterResampler::WorkerThread() break; } + // logtw("Resampled in frame. pts: %lld, linesize: %d, samples: %d", av_frame->pts, av_frame->linesize[0], av_frame->nb_samples); + ret = ::av_buffersrc_write_frame(_buffersrc_ctx, av_frame); if (ret < 0) { @@ -291,6 +300,7 @@ void FilterResampler::WorkerThread() } else { + // logti("Resampled out frame. pts: %lld, linesize: %d, samples : %d", _frame->pts, _frame->linesize[0], _frame->nb_samples); auto output_frame = ffmpeg::Conv::ToMediaFrame(cmn::MediaType::Audio, _frame); ::av_frame_unref(_frame); if (output_frame == nullptr) diff --git a/src/projects/transcoder/filter/filter_rescaler.cpp b/src/projects/transcoder/filter/filter_rescaler.cpp index 8e75fb305..68bb36c1e 100644 --- a/src/projects/transcoder/filter/filter_rescaler.cpp +++ b/src/projects/transcoder/filter/filter_rescaler.cpp @@ -101,173 +101,181 @@ bool FilterRescaler::InitializeFilterDescription() { std::vector filters; - // 2. Timebase - filters.push_back(ov::String::FormatString("settb=%s", _output_track->GetTimeBase().GetStringExpr().CStr())); - - // 3. 
Scaler - auto input_module_id = _input_track->GetCodecModuleId(); - auto input_device_id = _input_track->GetCodecDeviceId(); - auto output_module_id = _output_track->GetCodecModuleId(); - auto output_device_id = _output_track->GetCodecDeviceId(); - - // Scaler is performed on the same device as the encoder(output module) - ov::String desc = ""; - - if (output_module_id == cmn::MediaCodecModuleId::DEFAULT || - output_module_id == cmn::MediaCodecModuleId::BEAMR || - output_module_id == cmn::MediaCodecModuleId::OPENH264 || - output_module_id == cmn::MediaCodecModuleId::X264 || - output_module_id == cmn::MediaCodecModuleId::QSV || - output_module_id == cmn::MediaCodecModuleId::LIBVPX || - // Until now, Logan VPU processes in CPU memory like SW-based modules. Performance needs to be improved in the future - output_module_id == cmn::MediaCodecModuleId::NILOGAN - /* || output_module_id == cmn::MediaCodecModuleId::libx26x */) + if (IsSingleTrack()) { - switch (input_module_id) + // No need to rescale if the input and output are the same. + } + else + { + // 2. Timebase + filters.push_back(ov::String::FormatString("settb=%s", _output_track->GetTimeBase().GetStringExpr().CStr())); + + // 3. Scaler + auto input_module_id = _input_track->GetCodecModuleId(); + auto input_device_id = _input_track->GetCodecDeviceId(); + auto output_module_id = _output_track->GetCodecModuleId(); + auto output_device_id = _output_track->GetCodecDeviceId(); + + // Scaler is performed on the same device as the encoder(output module) + ov::String desc = ""; + + if (output_module_id == cmn::MediaCodecModuleId::DEFAULT || + output_module_id == cmn::MediaCodecModuleId::BEAMR || + output_module_id == cmn::MediaCodecModuleId::OPENH264 || + output_module_id == cmn::MediaCodecModuleId::X264 || + output_module_id == cmn::MediaCodecModuleId::QSV || + output_module_id == cmn::MediaCodecModuleId::LIBVPX || + // Until now, Logan VPU processes in CPU memory like SW-based modules. 
Performance needs to be improved in the future + output_module_id == cmn::MediaCodecModuleId::NILOGAN + /* || output_module_id == cmn::MediaCodecModuleId::libx26x */) { - case cmn::MediaCodecModuleId::NVENC: + switch (input_module_id) { - // Use the av_hwframe_transfer_data function to enable copying from GPU memory to CPU memory. - _use_hwframe_transfer = true; + case cmn::MediaCodecModuleId::NVENC: { + // Use the av_hwframe_transfer_data function to enable copying from GPU memory to CPU memory. + _use_hwframe_transfer = true; - // Change the pixel format of the source filter to the SW pixel format supported by the hardware. - auto hw_device_ctx = TranscodeGPU::GetInstance()->GetDeviceContext(input_module_id, input_device_id); - if (hw_device_ctx == nullptr) + // Change the pixel format of the source filter to the SW pixel format supported by the hardware. + auto hw_device_ctx = TranscodeGPU::GetInstance()->GetDeviceContext(input_module_id, input_device_id); + if (hw_device_ctx == nullptr) + { + logte("Could not get hw device context for %s(%d)", cmn::GetStringFromCodecModuleId(input_module_id).CStr(), input_device_id); + return false; + } + auto constraints = av_hwdevice_get_hwframe_constraints(hw_device_ctx, nullptr); + _src_pixfmt = *(constraints->valid_sw_formats); + desc = ov::String::FormatString(""); + } + break; + case cmn::MediaCodecModuleId::XMA: { + // Copy the frames in Xilinx Device memory to the CPU memory using the xvbm_convert filter. 
+ desc = ov::String::FormatString("xvbm_convert,"); + } + break; + default: + logtw("Unsupported input module: %s", cmn::GetStringFromCodecModuleId(input_module_id).CStr()); + case cmn::MediaCodecModuleId::X264: + case cmn::MediaCodecModuleId::QSV: // CPU memory using 'gpu_copy=on' + case cmn::MediaCodecModuleId::NILOGAN: // CPU memory using 'out=sw' + case cmn::MediaCodecModuleId::DEFAULT: // CPU memory { - logte("Could not get hw device context for %s(%d)", cmn::GetStringFromCodecModuleId(input_module_id).CStr(), input_device_id); - return false; + desc = ov::String::FormatString(""); } - auto constraints = av_hwdevice_get_hwframe_constraints(hw_device_ctx, nullptr); - _src_pixfmt = *(constraints->valid_sw_formats); - desc = ov::String::FormatString(""); - } - break; - case cmn::MediaCodecModuleId::XMA: - { - // Copy the frames in Xilinx Device memory to the CPU memory using the xvbm_convert filter. - desc = ov::String::FormatString("xvbm_convert,"); - } - break; - default: - logtw("Unsupported input module: %s", cmn::GetStringFromCodecModuleId(input_module_id).CStr()); - case cmn::MediaCodecModuleId::QSV: // CPU memory using 'gpu_copy=on' - case cmn::MediaCodecModuleId::NILOGAN: // CPU memory using 'out=sw' - case cmn::MediaCodecModuleId::DEFAULT: // CPU memory - { - desc = ov::String::FormatString(""); } + // Scaler description of defulat module + desc += ov::String::FormatString("scale=%dx%d:flags=bilinear", _output_track->GetWidth(), _output_track->GetHeight()); } - // Scaler description of defulat module - desc += ov::String::FormatString("scale=%dx%d:flags=bilinear", _output_track->GetWidth(), _output_track->GetHeight()); - } - else if (output_module_id == cmn::MediaCodecModuleId::NVENC) - { - switch (input_module_id) + else if (output_module_id == cmn::MediaCodecModuleId::NVENC) { - case cmn::MediaCodecModuleId::NVENC: { // Zero Copy - //TODO: Exception handling required if Device ID is different. 
Find out if memory copy between GPUs is possible - desc = ov::String::FormatString(""); - } - break; - case cmn::MediaCodecModuleId::XMA: { - desc = ov::String::FormatString("xvbm_convert,hwupload_cuda=device=%d,",output_device_id); - } - break; - default: - logtw("Unsupported input module: %s", cmn::GetStringFromCodecModuleId(input_module_id).CStr()); - case cmn::MediaCodecModuleId::QSV: // CPU memory using 'gpu_copy=on' - case cmn::MediaCodecModuleId::NILOGAN: // CPU memory using 'out=sw' - case cmn::MediaCodecModuleId::DEFAULT: // CPU memory + switch (input_module_id) { - desc = ov::String::FormatString("hwupload_cuda=device=%d,", output_device_id); + case cmn::MediaCodecModuleId::NVENC: { // Zero Copy + //TODO: Exception handling required if Device ID is different. Find out if memory copy between GPUs is possible + desc = ov::String::FormatString(""); + } + break; + case cmn::MediaCodecModuleId::XMA: { + desc = ov::String::FormatString("xvbm_convert,hwupload_cuda=device=%d,", output_device_id); + } + break; + default: + logtw("Unsupported input module: %s", cmn::GetStringFromCodecModuleId(input_module_id).CStr()); + case cmn::MediaCodecModuleId::X264: + case cmn::MediaCodecModuleId::QSV: // CPU memory using 'gpu_copy=on' + case cmn::MediaCodecModuleId::NILOGAN: // CPU memory using 'out=sw' + case cmn::MediaCodecModuleId::DEFAULT: // CPU memory + { + desc = ov::String::FormatString("hwupload_cuda=device=%d,", output_device_id); + } } + desc += ov::String::FormatString("scale_npp=%d:%d", _output_track->GetWidth(), _output_track->GetHeight()); } - desc += ov::String::FormatString("scale_npp=%d:%d", _output_track->GetWidth(), _output_track->GetHeight()); - } - else if (output_module_id == cmn::MediaCodecModuleId::XMA) - { - // multiscale_xma only supports resolutions multiple of 4. 
- bool need_crop_for_multiple_of_4 = (_input_track->GetHeight() % 4 != 0 || _input_track->GetHeight() % 4 != 0);; - if(need_crop_for_multiple_of_4) + else if (output_module_id == cmn::MediaCodecModuleId::XMA) { - logtw("multiscale_xma only supports resolutions multiple of 4. The resolution will be cropped to a multiple of 4."); - } + // multiscale_xma only supports resolutions multiple of 4. + bool need_crop_for_multiple_of_4 = (_input_track->GetWidth() % 4 != 0 || _input_track->GetHeight() % 4 != 0); + if (need_crop_for_multiple_of_4) + { + logtw("multiscale_xma only supports resolutions multiple of 4. The resolution will be cropped to a multiple of 4."); + } - int32_t desire_width = _input_track->GetWidth() - _input_track->GetWidth() % 4; - int32_t desire_height = _input_track->GetHeight() - _input_track->GetHeight() % 4; + int32_t desire_width = _input_track->GetWidth() - _input_track->GetWidth() % 4; + int32_t desire_height = _input_track->GetHeight() - _input_track->GetHeight() % 4; - switch (input_module_id) - { - case cmn::MediaCodecModuleId::XMA: { // Zero Copy - if (input_device_id != output_device_id) - { - desc = ov::String::FormatString("xvbm_convert,"); + switch (input_module_id) + { + case cmn::MediaCodecModuleId::XMA: { // Zero Copy + if (input_device_id != output_device_id) + { + desc = ov::String::FormatString("xvbm_convert,"); + if (need_crop_for_multiple_of_4) + { + desc += ov::String::FormatString("crop=%d:%d:0:0,", desire_width, desire_height); + } + } + else + { + desc = ov::String::FormatString(""); + if (need_crop_for_multiple_of_4) + { + desc += ov::String::FormatString("xvbm_convert,crop=%d:%d:0:0,", desire_width, desire_height); + } + } + } + break; + case cmn::MediaCodecModuleId::NVENC: { + // Use the av_hwframe_transfer_data function to enable copying from GPU memory to CPU memory. + _use_hwframe_transfer = true; + + // Change the pixel format of the source filter to the SW pixel format supported by the hardware. 
+ auto hw_device_ctx = TranscodeGPU::GetInstance()->GetDeviceContext(input_module_id, input_device_id); + if (hw_device_ctx == nullptr) + { + logte("Could not get hw device context for %s(%d)", cmn::GetStringFromCodecModuleId(input_module_id).CStr(), input_device_id); + return false; + } + auto constraints = av_hwdevice_get_hwframe_constraints(hw_device_ctx, nullptr); + _src_pixfmt = *(constraints->valid_sw_formats); + // desc = ov::String::FormatString("xvbm_convert,"); + desc = ov::String::FormatString(""); if (need_crop_for_multiple_of_4) { - desc += ov::String::FormatString("crop=%d:%d:0:0,", desire_width, desire_height); + desc += ov::String::FormatString("crop=%d:%d:0:0,", desire_width, desire_height); } } - else + break; + default: + logtw("Unsupported input module: %s", cmn::GetStringFromCodecModuleId(input_module_id).CStr()); + case cmn::MediaCodecModuleId::X264: // CPU memory + case cmn::MediaCodecModuleId::QSV: // CPU memory using 'gpu_copy=on' + case cmn::MediaCodecModuleId::NILOGAN: // CPU memory using 'out=sw' + case cmn::MediaCodecModuleId::DEFAULT: // CPU memory { + // xvbm_convert is xvbm frame to av frame converter filter + // desc = ov::String::FormatString("xvbm_convert,"); desc = ov::String::FormatString(""); if (need_crop_for_multiple_of_4) { - desc += ov::String::FormatString("xvbm_convert,crop=%d:%d:0:0,", desire_width, desire_height); - } + desc += ov::String::FormatString("crop=%d:%d:0:0,", desire_width, desire_height); + } } } - break; - case cmn::MediaCodecModuleId::NVENC: { - // Use the av_hwframe_transfer_data function to enable copying from GPU memory to CPU memory. - _use_hwframe_transfer = true; - // Change the pixel format of the source filter to the SW pixel format supported by the hardware. 
- auto hw_device_ctx = TranscodeGPU::GetInstance()->GetDeviceContext(input_module_id, input_device_id); - if (hw_device_ctx == nullptr) - { - logte("Could not get hw device context for %s(%d)", cmn::GetStringFromCodecModuleId(input_module_id).CStr(), input_device_id); - return false; - } - auto constraints = av_hwdevice_get_hwframe_constraints(hw_device_ctx, nullptr); - _src_pixfmt = *(constraints->valid_sw_formats); - // desc = ov::String::FormatString("xvbm_convert,"); - desc = ov::String::FormatString(""); - if (need_crop_for_multiple_of_4) - { - desc += ov::String::FormatString("crop=%d:%d:0:0,", desire_width, desire_height); - } - } - break; - default: - logtw("Unsupported input module: %s", cmn::GetStringFromCodecModuleId(input_module_id).CStr()); - case cmn::MediaCodecModuleId::QSV: // CPU memory using 'gpu_copy=on' - case cmn::MediaCodecModuleId::NILOGAN: // CPU memory using 'out=sw' - case cmn::MediaCodecModuleId::DEFAULT: // CPU memory - { - // xvbm_convert is xvbm frame to av frame converter filter - // desc = ov::String::FormatString("xvbm_convert,"); - desc = ov::String::FormatString(""); - if (need_crop_for_multiple_of_4) - { - desc += ov::String::FormatString("crop=%d:%d:0:0,", desire_width, desire_height); - } - } + desc += ov::String::FormatString("multiscale_xma=lxlnx_hwdev=%d:outputs=1:out_1_width=%d:out_1_height=%d:out_1_rate=full", + _output_track->GetCodecDeviceId(), _output_track->GetWidth(), _output_track->GetHeight()); + } + else + { + logtw("Unsupported output module id: %d", output_module_id); + return false; } - desc += ov::String::FormatString("multiscale_xma=lxlnx_hwdev=%d:outputs=1:out_1_width=%d:out_1_height=%d:out_1_rate=full", - _output_track->GetCodecDeviceId(), _output_track->GetWidth(), _output_track->GetHeight()); - } - else - { - logtw("Unsupported output module id: %d", output_module_id); - return false; - } - - filters.push_back(desc); - - // 4. 
Pixel Format - filters.push_back(ov::String::FormatString("format=%s", ::av_get_pix_fmt_name((AVPixelFormat)_output_track->GetColorspace()))); + filters.push_back(desc); + // 4. Pixel Format + filters.push_back(ov::String::FormatString("format=%s", ::av_get_pix_fmt_name((AVPixelFormat)_output_track->GetColorspace()))); + } + if(filters.size() == 0) { filters.push_back("null"); @@ -294,8 +302,7 @@ bool FilterRescaler::Configure(const std::shared_ptr &input_track, c // If the user is not the set output Framerate, use the measured Framerate _fps_filter.SetOutputFrameRate(_output_track->GetFrameRateByConfig() > 0 ? _output_track->GetFrameRateByConfig() : _output_track->GetEstimateFrameRate()); _fps_filter.SetSkipFrames(_output_track->GetSkipFramesByConfig() >= 0 ? _output_track->GetSkipFramesByConfig() : 0); - logtd("Created FPS filter. %s", _fps_filter.GetInfoString().CStr()); - + // Set the threshold of the input buffer to 2 seconds. _input_buffer.SetThreshold(_input_track->GetFrameRate() * 2); @@ -321,7 +328,7 @@ bool FilterRescaler::Configure(const std::shared_ptr &input_track, c return false; } - logti("Rescaler parameters. track(#%u -> #%u), module(%s:%d -> %s:%d). desc(src:%s -> output:%s)", + logti("Rescaler parameters. track(#%u -> #%u), module(%s:%d -> %s:%d). 
desc(src:%s -> output:%s), fps(%.2f -> %.2f), skipFrames(%d)", _input_track->GetId(), _output_track->GetId(), GetStringFromCodecModuleId(_input_track->GetCodecModuleId()).CStr(), @@ -329,7 +336,10 @@ bool FilterRescaler::Configure(const std::shared_ptr &input_track, c GetStringFromCodecModuleId(_output_track->GetCodecModuleId()).CStr(), _output_track->GetCodecDeviceId(), _src_args.CStr(), - _filter_desc.CStr()); + _filter_desc.CStr(), + _fps_filter.GetInputFrameRate(), + _fps_filter.GetOutputFrameRate(), + _fps_filter.GetSkipFrames()); if ((::avfilter_graph_parse_ptr(_filter_graph, _filter_desc, &_inputs, &_outputs, nullptr)) < 0) { diff --git a/src/projects/transcoder/transcoder_filter.cpp b/src/projects/transcoder/transcoder_filter.cpp index b64189b31..aa2d71ebd 100644 --- a/src/projects/transcoder/transcoder_filter.cpp +++ b/src/projects/transcoder/transcoder_filter.cpp @@ -18,24 +18,51 @@ TranscodeFilter::~TranscodeFilter() { } +std::shared_ptr TranscodeFilter::Create(int32_t id, + const std::shared_ptr& input_stream_info, std::shared_ptr input_track, + const std::shared_ptr& output_stream_info, std::shared_ptr output_track, + CompleteHandler complete_handler) +{ + auto filter = std::make_shared(); + if (filter->Configure(id, input_stream_info, input_track, output_stream_info, output_track) == false) + { + return nullptr; + } + filter->SetCompleteHandler(complete_handler); + return filter; +} + +std::shared_ptr TranscodeFilter::Create(int32_t id, + const std::shared_ptr& output_stream_info, std::shared_ptr output_track, + CompleteHandler complete_handler) +{ + auto filter = std::make_shared(); + if (filter->Configure(id, output_stream_info, output_track, output_stream_info, output_track) == false) + { + return nullptr; + } + filter->SetCompleteHandler(complete_handler); + return filter; +} + bool TranscodeFilter::Configure(int32_t id, const std::shared_ptr& input_stream_info, std::shared_ptr input_track, - const std::shared_ptr& output_stream_info, 
std::shared_ptr output_track, - CompleteHandler complete_handler) + const std::shared_ptr& output_stream_info, std::shared_ptr output_track) { _id = id; _input_stream_info = input_stream_info; _input_track = input_track; + + _output_stream_info = output_stream_info; _output_track = output_track; - _complete_handler = complete_handler; - + _timestamp_jump_threshold = (int64_t)_input_track->GetTimeBase().GetTimescale() * PTS_INCREMENT_LIMIT; - return Create(); + return CreateInternal(); } -bool TranscodeFilter::Create() +bool TranscodeFilter::CreateInternal() { std::lock_guard lock(_mutex); @@ -90,7 +117,7 @@ bool TranscodeFilter::SendBuffer(std::shared_ptr buffer) { if (IsNeedUpdate(buffer) == true) { - if (Create() == false) + if (CreateInternal() == false) { logte("Failed to regenerate filter"); return false; diff --git a/src/projects/transcoder/transcoder_filter.h b/src/projects/transcoder/transcoder_filter.h index a3f8a4744..5361ee949 100644 --- a/src/projects/transcoder/transcoder_filter.h +++ b/src/projects/transcoder/transcoder_filter.h @@ -23,15 +23,25 @@ class TranscodeFilter public: typedef std::function)> CompleteHandler; + static std::shared_ptr Create( + int32_t filter_id, + const std::shared_ptr &input_stream_info, std::shared_ptr input_track, + const std::shared_ptr &output_stream_info, std::shared_ptr output_track, + CompleteHandler complete_handler); + + static std::shared_ptr Create( + int32_t filter_id, + const std::shared_ptr &output_stream_info, std::shared_ptr output_track, + CompleteHandler complete_handler); + public: TranscodeFilter(); ~TranscodeFilter(); - + bool Configure( int32_t filter_id, const std::shared_ptr &input_stream_info, std::shared_ptr input_track, - const std::shared_ptr &output_stream_info, std::shared_ptr output_track, - CompleteHandler complete_handler); + const std::shared_ptr &output_stream_info, std::shared_ptr output_track); bool SendBuffer(std::shared_ptr buffer); void Stop(); @@ -44,7 +54,7 @@ class 
TranscodeFilter void OnComplete(std::shared_ptr frame); private: - bool Create(); + bool CreateInternal(); bool IsNeedUpdate(std::shared_ptr buffer); int32_t _id; diff --git a/src/projects/transcoder/transcoder_stream.cpp b/src/projects/transcoder/transcoder_stream.cpp index 052123b05..332715910 100644 --- a/src/projects/transcoder/transcoder_stream.cpp +++ b/src/projects/transcoder/transcoder_stream.cpp @@ -134,7 +134,9 @@ bool TranscoderStream::Stop() logtd("%s Wait for stream thread to terminated", _log_prefix.CStr()); - RemoveAllComponents(); + RemoveDecoders(); + RemoveFilters(); + RemoveEncoders(); // Delete all composite of components _link_input_to_outputs.clear(); @@ -242,7 +244,7 @@ bool TranscoderStream::UpdateInternal(const std::shared_ptr &strea // - The input track should not change. if (IsAvailableSmoothTransition(stream) == true) { - logti("%s This stream will be a smooth transition", _log_prefix.CStr()); + logtd("%s This stream will be a smooth transition", _log_prefix.CStr()); RemoveDecoders(); RemoveFilters(); @@ -250,9 +252,11 @@ bool TranscoderStream::UpdateInternal(const std::shared_ptr &strea } else { - logti("%s This stream does not support smooth transitions. renew the all components", _log_prefix.CStr()); - RemoveAllComponents(); - + logtw("%s This stream does not support smooth transitions. 
renew the all components", _log_prefix.CStr()); + RemoveDecoders(); + RemoveFilters(); + RemoveEncoders(); + CreateDecoders(); UpdateMsidOfOutputStreams(stream->GetMsid()); @@ -264,21 +268,9 @@ bool TranscoderStream::UpdateInternal(const std::shared_ptr &strea return true; } -void TranscoderStream::RemoveAllComponents() -{ - // Stop all decoder - RemoveDecoders(); - - // Stop all filters - RemoveFilters(); - - // Stop all encoders - RemoveEncoders(); -} - void TranscoderStream::RemoveDecoders() { - std::lock_guard decoder_lock(_decoder_map_mutex); + std::unique_lock decoder_lock(_decoder_map_mutex); for (auto &it : _decoders) { @@ -291,7 +283,7 @@ void TranscoderStream::RemoveDecoders() void TranscoderStream::RemoveFilters() { - std::lock_guard filter_lock(_filter_map_mutex); + std::unique_lock filter_lock(_filter_map_mutex); for (auto &it : _filters) { @@ -304,12 +296,16 @@ void TranscoderStream::RemoveFilters() void TranscoderStream::RemoveEncoders() { - std::lock_guard encoder_lock(_encoder_map_mutex); - for (auto &iter : _encoders) + std::unique_lock encoder_lock(_encoder_map_mutex); + for (auto &[encoder_id, encoder_pair] : _encoders) { - auto object = iter.second; - object->Stop(); - object.reset(); + auto filter = encoder_pair.first; + auto encoder = encoder_pair.second; + + filter->Stop(); + filter.reset(); + encoder->Stop(); + encoder.reset(); } _encoders.clear(); } @@ -787,13 +783,9 @@ int32_t TranscoderStream::CreateDecoders() bool TranscoderStream::CreateDecoder(MediaTrackId decoder_id, std::shared_ptr input_stream, std::shared_ptr input_track) { - std::lock_guard decoder_lock(_decoder_map_mutex); - - // If there is an existing decoder, do not create decoder - if (_decoders.find(decoder_id) != _decoders.end()) + if(GetDecoder(decoder_id) != nullptr) { logtw("%s Decoder already exists. 
InputTrack(%d) > Decoder(%d)", _log_prefix.CStr(), input_track->GetId(), decoder_id); - return true; } @@ -807,12 +799,13 @@ bool TranscoderStream::CreateDecoder(MediaTrackId decoder_id, std::shared_ptrSetThreadCount(GetOutputProfilesCfg()->GetDecodes().GetThreadCount()); + auto hwaccels_enable = GetOutputProfilesCfg()->GetHWAccels().GetDecoder().IsEnable() || + GetOutputProfilesCfg()->IsHardwareAcceleration(); // Deprecated + + auto hwaccels_modules = GetOutputProfilesCfg()->GetHWAccels().GetDecoder().GetModules(); + // Get a list of available decoder candidates. - // TODO: GetOutputProfilesCfg()->IsHardwareAcceleration() is deprecated. It will be deleted soon. - auto candidates = TranscodeDecoder::GetCandidates( - GetOutputProfilesCfg()->GetHWAccels().GetDecoder().IsEnable() || GetOutputProfilesCfg()->IsHardwareAcceleration(), - GetOutputProfilesCfg()->GetHWAccels().GetDecoder().GetModules(), - input_track); + auto candidates = TranscodeDecoder::GetCandidates(hwaccels_enable, hwaccels_modules, input_track); if (candidates == nullptr) { logte("%s Decoder candidates are not found. 
InputTrack(%u)", _log_prefix.CStr(), input_track->GetId()); @@ -832,14 +825,31 @@ bool TranscoderStream::CreateDecoder(MediaTrackId decoder_id, std::shared_ptr Decoder(%u)", _log_prefix.CStr(), input_track->GetId(), decoder_id); return true; } -int32_t TranscoderStream::CreateEncoders(MediaFrame *buffer) +std::shared_ptr TranscoderStream::GetDecoder(MediaTrackId decoder_id) +{ + std::shared_lock decoder_lock(_decoder_map_mutex); + if (_decoders.find(decoder_id) == _decoders.end()) + { + return nullptr; + } + + return _decoders[decoder_id]; +} + +void TranscoderStream::SetDecoder(MediaTrackId decoder_id, std::shared_ptr decoder) +{ + std::unique_lock decoder_lock(_decoder_map_mutex); + _decoders[decoder_id] = decoder; +} + +int32_t TranscoderStream::CreateEncoders(std::shared_ptr buffer) { int32_t created = 0; MediaTrackId track_id = buffer->GetTrackId(); @@ -876,21 +886,6 @@ int32_t TranscoderStream::CreateEncoders(MediaFrame *buffer) continue; } - // Set the sample format and color space supported by the encoder to the output track. - // These values are used in the Resampler/Rescaler filter. - if (output_track->GetMediaType() == cmn::MediaType::Video) - { - auto encoder = _encoders[encoder_id]; - - output_track->SetColorspace(encoder->GetSupportedFormat()); - } - else if (output_track->GetMediaType() == cmn::MediaType::Audio) - { - auto encoder = _encoders[encoder_id]; - - output_track->GetSample().SetFormat(ffmpeg::Conv::ToAudioSampleFormat(encoder->GetSupportedFormat())); - } - created++; } } @@ -900,77 +895,120 @@ int32_t TranscoderStream::CreateEncoders(MediaFrame *buffer) bool TranscoderStream::CreateEncoder(MediaTrackId encoder_id, std::shared_ptr output_stream, std::shared_ptr output_track) { - std::lock_guard encoder_lock(_encoder_map_mutex); - - if (_encoders.find(encoder_id) != _encoders.end()) + if (GetEncoder(encoder_id).has_value()) { - logtd("%s The encoder has already created. 
Encoder(%d) > OutputTrack(%d)", _log_prefix.CStr(), encoder_id, output_track->GetId()); + logtd("%s Encoder already exists. Encoder(%d) > OutputTrack(%d)", _log_prefix.CStr(), encoder_id, output_track->GetId()); return true; } + auto hwaccels_enable = GetOutputProfilesCfg()->GetHWAccels().GetEncoder().IsEnable() || + GetOutputProfilesCfg()->IsHardwareAcceleration(); // Deprecated + + auto hwaccels_modules = GetOutputProfilesCfg()->GetHWAccels().GetEncoder().GetModules(); + // Get a list of available encoder candidates. - // TODO: GetOutputProfilesCfg()->IsHardwareAcceleration() is deprecated. It will be deleted soon. - auto candidates = TranscodeEncoder::GetCandidates( - GetOutputProfilesCfg()->GetHWAccels().GetEncoder().IsEnable() || GetOutputProfilesCfg()->IsHardwareAcceleration(), - GetOutputProfilesCfg()->GetHWAccels().GetEncoder().GetModules(), output_track); - if(candidates == nullptr) + auto candidates = TranscodeEncoder::GetCandidates(hwaccels_enable, hwaccels_modules, output_track); + if (candidates == nullptr) { logte("%s Decoder candidates are not found. InputTrack(%d)", _log_prefix.CStr(), output_track->GetId()); return false; } - auto encoder = TranscodeEncoder::Create( - encoder_id, - output_stream, - output_track, - candidates, bind(&TranscoderStream::OnEncodedPacket, this, std::placeholders::_1, std::placeholders::_2)); + // Create Encoder + auto encoder = TranscodeEncoder::Create(encoder_id, output_stream, output_track, candidates, + bind(&TranscoderStream::OnEncodedPacket, this, std::placeholders::_1, std::placeholders::_2)); if (encoder == nullptr) { return false; } - _encoders[encoder_id] = std::move(encoder); + // Set the sample format and color space supported by the encoder to the output track. + // These values are used in the Resampler/Rescaler filter. 
+ if (output_track->GetMediaType() == cmn::MediaType::Video) + { + output_track->SetColorspace(encoder->GetSupportedFormat()); + } + else if (output_track->GetMediaType() == cmn::MediaType::Audio) + { + output_track->GetSample().SetFormat(ffmpeg::Conv::ToAudioSampleFormat(encoder->GetSupportedFormat())); + } + + // Create Paired Filter + std::shared_ptr filter = nullptr; + if(output_track->GetMediaType() == cmn::MediaType::Audio) + { + filter = TranscodeFilter::Create(encoder_id, output_stream, output_track, + bind(&TranscoderStream::OnPreEncodeFilteredFrame, this, std::placeholders::_1, std::placeholders::_2)); + if (filter == nullptr) + { + // Stop & Release Encoder + encoder->Stop(); + encoder.reset(); + + return false; + } + } + + SetEncoder(encoder_id, filter, encoder); logtd("%s Created encoder. Encoder(%d) > OutputTrack(%d)", _log_prefix.CStr(), encoder_id, output_track->GetId()); + return true; } +std::optional, std::shared_ptr>> TranscoderStream::GetEncoder(MediaTrackId encoder_id) +{ + std::shared_lock encoder_lock(_encoder_map_mutex); + if (_encoders.find(encoder_id) == _encoders.end()) + { + return std::nullopt; + } + + return _encoders[encoder_id]; +} + +void TranscoderStream::SetEncoder(MediaTrackId encoder_id, std::shared_ptr filter, std::shared_ptr encoder) +{ + std::unique_lock encoder_lock(_encoder_map_mutex); + _encoders[encoder_id] = std::make_pair(filter, encoder); +} -int32_t TranscoderStream::CreateFilters(MediaFrame *buffer) +int32_t TranscoderStream::CreateFilters(std::shared_ptr buffer) { int32_t created = 0; MediaTrackId decoder_id = buffer->GetTrackId(); - // 1. Get Input Track of Decoder - auto input_track = _decoders[decoder_id]->GetRefTrack(); - + // 1. 
Get Decoder -> Filter List auto decoder_to_filters_it = _link_decoder_to_filters.find(decoder_id); if (decoder_to_filters_it == _link_decoder_to_filters.end()) - { + { logtw("%s Could not found filter list related to decoder", _log_prefix.CStr()); return created; } - auto filter_ids = decoder_to_filters_it->second; // 2. Get Output Track of Encoders + auto filter_ids = decoder_to_filters_it->second; for (auto &filter_id : filter_ids) { MediaTrackId encoder_id = _link_filter_to_encoder[filter_id]; - if (_encoders.find(encoder_id) == _encoders.end()) + auto encoder = GetEncoder(encoder_id); + if(encoder.has_value() == false) { - logte("%s Filter creation failed because the encoder could not be found. EncoderId(%d), FilterId(%d)", - _log_prefix.CStr(), encoder_id, filter_id); + logte("%s Failed to create filter. could not found encoder. EncoderId(%d), FilterId(%d)", _log_prefix.CStr(), encoder_id, filter_id); continue; } - auto output_track = _encoders[encoder_id]->GetRefTrack(); + auto input_track = GetDecoder(decoder_id)->GetRefTrack(); + + auto output_track = encoder->second->GetRefTrack(); - if(CreateFilter(filter_id, input_track, output_track) == false) + if (CreateFilter(filter_id, input_track, output_track) == false) { continue; } + created++; } @@ -979,12 +1017,9 @@ int32_t TranscoderStream::CreateFilters(MediaFrame *buffer) bool TranscoderStream::CreateFilter(MediaTrackId filter_id, std::shared_ptr input_track, std::shared_ptr output_track) { - std::lock_guard filter_lock(_filter_map_mutex); - - if (_filters.find(filter_id) != _filters.end()) + if(GetFilter(filter_id) != nullptr) { - logtd("%s The filter has already created. FilterId(%d)", _log_prefix.CStr(), filter_id); - + logtw("%s Filter already exists. 
FilterId(%d)", _log_prefix.CStr(), filter_id); return true; } @@ -1002,28 +1037,43 @@ bool TranscoderStream::CreateFilter(MediaTrackId filter_id, std::shared_ptr(); - if (filter->Configure(filter_id, input_stream, input_track, output_stream, output_track, - bind(&TranscoderStream::OnFilteredFrame, this, std::placeholders::_1, std::placeholders::_2)) != true) + auto filter = TranscodeFilter::Create(filter_id, input_stream, input_track, output_stream, output_track, bind(&TranscoderStream::OnFilteredFrame, this, std::placeholders::_1, std::placeholders::_2)); + if (filter == nullptr) { - logte("%s Failed to create filter. Filter(%d)", _log_prefix.CStr(), filter_id); + logte("%s Failed to create filter. FilterId(%d)", _log_prefix.CStr(), filter_id); return false; } - _filters[filter_id] = filter; + SetFilter(filter_id, filter); logtd("%s Created Filter. FilterId(%d)", _log_prefix.CStr(), filter_id); return true; } +std::shared_ptr TranscoderStream::GetFilter(MediaTrackId filter_id) +{ + std::shared_lock lock(_filter_map_mutex); + if (_filters.find(filter_id) == _filters.end()) + { + return nullptr; + } + + return _filters[filter_id]; +} + +void TranscoderStream::SetFilter(MediaTrackId filter_id, std::shared_ptr filter) +{ + std::unique_lock lock(_filter_map_mutex); + _filters[filter_id] = filter; +} // Function called when codec information is extracted or changed from the decoder -void TranscoderStream::ChangeOutputFormat(MediaFrame *buffer) +void TranscoderStream::ChangeOutputFormat(std::shared_ptr buffer) { logtd("%s Changed output format. InputTrack(%u)", _log_prefix.CStr(), buffer->GetTrackId()); - std::lock_guard lock(_format_change_mutex); + std::unique_lock lock(_format_change_mutex); if (buffer == nullptr) { @@ -1037,7 +1087,6 @@ void TranscoderStream::ChangeOutputFormat(MediaFrame *buffer) // Update Track of Output Stream UpdateOutputTrack(buffer); - // Create an encoder. 
If there is an existing encoder, reuse it if(CreateEncoders(buffer) == 0) { @@ -1059,7 +1108,7 @@ void TranscoderStream::ChangeOutputFormat(MediaFrame *buffer) } // Information of the input track is updated by the decoded frame -void TranscoderStream::UpdateInputTrack(MediaFrame *buffer) +void TranscoderStream::UpdateInputTrack(std::shared_ptr buffer) { MediaTrackId track_id = buffer->GetTrackId(); @@ -1097,7 +1146,7 @@ void TranscoderStream::UpdateInputTrack(MediaFrame *buffer) } // Update Output Track -void TranscoderStream::UpdateOutputTrack(MediaFrame *buffer) +void TranscoderStream::UpdateOutputTrack(std::shared_ptr buffer) { MediaTrackId input_track_id = buffer->GetTrackId(); @@ -1139,6 +1188,11 @@ void TranscoderStream::DecodePacket(const std::shared_ptr &packet) { MediaTrackId input_track_id = packet->GetTrackId(); + // if(input_track_id == 1) + // { + // logtd("%s DecodePacket.(%u) %s", _log_prefix.CStr(), input_track_id, packet->GetInfoString().CStr()); + // } + // 1. Packet to Output Stream (bypass) auto output_streams = _link_input_to_outputs.find(input_track_id); if (output_streams != _link_input_to_outputs.end()) @@ -1193,15 +1247,12 @@ void TranscoderStream::DecodePacket(const std::shared_ptr &packet) } auto decoder_id = input_to_decoder_it->second; - std::shared_lock lock(_decoder_map_mutex); - - auto decoder_it = _decoders.find(decoder_id); - if (decoder_it == _decoders.end()) + auto decoder = GetDecoder(decoder_id); + if (decoder == nullptr) { + logte("%s Could not found decoder. Decoder(%d)", _log_prefix.CStr(), decoder_id); return; } - - auto decoder = decoder_it->second; decoder->SendBuffer(std::move(packet)); } @@ -1248,7 +1299,7 @@ void TranscoderStream::OnDecodedFrame(TranscodeResult result, MediaTrackId decod last_frame->SetPts((int64_t)((double)decoded_frame->GetPts() * input_expr / filter_expr)); // Record the timestamp of the last decoded frame. managed by microseconds. 
- _last_decoded_frame_pts[decoder_id] = last_frame->GetPts() * filter_expr * 1000000; + _last_decoded_frame_pts[decoder_id] = last_frame->GetPts() * filter_expr * 1000000.0; logtd("%s Create filler frame because there is no decoding frame. Type(%s), Decoder(%u), FillerFrames(%d)" , _log_prefix.CStr(), cmn::GetMediaTypeString(input_track->GetMediaType()).CStr(), decoder_id, 1); @@ -1263,69 +1314,88 @@ void TranscoderStream::OnDecodedFrame(TranscodeResult result, MediaTrackId decod case TranscodeResult::FormatChanged: { // Re-create filter and encoder using the format of decoded frame - ChangeOutputFormat(decoded_frame.get()); + ChangeOutputFormat(decoded_frame); #if FILLER_ENABLED /////////////////////////////////////////////////////////////////// // Generate a filler frame (Part 2). * Using latest decoded frame /////////////////////////////////////////////////////////////////// - // - It is mainly used in Persistent Stream. + // - It is mainly used in Schedule stream. // - When the input stream is changed, an empty section occurs in sequential frames. There is a problem with the A/V sync in the player. // If there is a section where the frame is empty, may be out of sync in the player. // Therefore, the filler frame is inserted in the hole of the sequential frame. - auto it = _last_decoded_frame_pts.find(decoder_id); - if(it != _last_decoded_frame_pts.end()) + auto input_track = GetInputTrack(decoder_id); + if (!input_track) { - auto input_track = GetInputTrack(decoder_id); - if(!input_track) + logte("Could not found input track. 
Decoder(%d)", decoder_id); + return; + } + + if (_last_decoded_frame_pts.find(decoder_id) != _last_decoded_frame_pts.end()) + { + auto last_decoded_frame_time_us = _last_decoded_frame_pts[decoder_id]; + auto last_decoded_frame_duration_us = _last_decoded_frame_duration[decoder_id]; + + // Decoded frame PTS to microseconds + int64_t curr_decoded_frame_time_us = (int64_t)((double)decoded_frame->GetPts() * input_track->GetTimeBase().GetExpr() * 1000000); + + // Calculate the time difference between the last decoded frame and the current decoded frame. + int64_t hole_time_us = curr_decoded_frame_time_us - (last_decoded_frame_time_us + last_decoded_frame_duration_us); + int64_t hole_time_tb = (int64_t)(floor((double)hole_time_us / input_track->GetTimeBase().GetExpr() / 1000000)); + + int64_t duration_per_frame = 0; + switch (input_track->GetMediaType()) { - logte("Could not found input track. Decoder(%d)", decoder_id); - return; + case cmn::MediaType::Video: + duration_per_frame = input_track->GetTimeBase().GetTimescale() / input_track->GetFrameRate(); + break; + case cmn::MediaType::Audio: + duration_per_frame = decoded_frame->GetNbSamples(); + break; + default: + break; } - int64_t frame_hole_time_us = (int64_t)((double)decoded_frame->GetPts() * input_track->GetTimeBase().GetExpr() * 1000000) - (int64_t)it->second; - if(frame_hole_time_us > 0) + // If the time difference is greater than 0, it means that there is a hole between with the last frame and the current frame. 
+ if (hole_time_tb >= duration_per_frame) { - int32_t number_of_filler_frames_needed = 0; + int64_t start_pts = decoded_frame->GetPts() - hole_time_tb; + int64_t end_pts = decoded_frame->GetPts(); + int32_t needed_frames = hole_time_tb / duration_per_frame; + int64_t reamiain_pts = hole_time_tb - (needed_frames * duration_per_frame); - switch(input_track->GetMediaType()) - { - case cmn::MediaType::Video: - number_of_filler_frames_needed = (int32_t)(((double)frame_hole_time_us/1000000) / (1.0f / input_track->GetFrameRate())); - break; - case cmn::MediaType::Audio: - number_of_filler_frames_needed = (int32_t)(((double)frame_hole_time_us/1000000) / (input_track->GetTimeBase().GetExpr() * (double)decoded_frame->GetNbSamples())); - break; - default: - break; - } + logtd("%s Create filler frame because time diffrence from last frame. Type(%s), needed(%d), last_pts(%lld), curr_pts(%lld), hole_time(%lld), hole_time_tb(%lld), frame_duration(%lld), remain_pts(%lld), start_pts(%lld), end_pts(%lld)", + _log_prefix.CStr(), cmn::GetMediaTypeString(input_track->GetMediaType()).CStr(), needed_frames, last_decoded_frame_time_us, curr_decoded_frame_time_us, hole_time_us, hole_time_tb, duration_per_frame, reamiain_pts, start_pts, end_pts); - if(number_of_filler_frames_needed > 0) + for (int64_t filler_pts = start_pts; filler_pts < end_pts; filler_pts += duration_per_frame) { - logtd("%s Create filler frame because time diffrence from last frame. 
Type(%s), Decoder(%u), FillerFrames(%d)" - ,_log_prefix.CStr(), cmn::GetMediaTypeString(input_track->GetMediaType()).CStr(), decoder_id, number_of_filler_frames_needed); - - int64_t frame_hole_time_tb = (int64_t)((double)frame_hole_time_us / input_track->GetTimeBase().GetExpr() / 1000000); - int64_t frame_duration_avg = frame_hole_time_tb / number_of_filler_frames_needed; - int64_t start_pts = decoded_frame->GetPts() - frame_hole_time_tb + frame_duration_avg; - int64_t end_pts = decoded_frame->GetPts(); - - for (int64_t filler_pts = start_pts; filler_pts < end_pts; filler_pts += frame_duration_avg) + std::shared_ptr clone_frame = decoded_frame->CloneFrame(true); + if (!clone_frame) { - auto clone_frame = decoded_frame->CloneFrame(); - clone_frame->SetPts(filler_pts); + continue; + } + clone_frame->SetPts(filler_pts); + clone_frame->SetDuration(duration_per_frame); - // Fill the silence in the audio frame. - if (input_track->GetMediaType() == cmn::MediaType::Audio) + if (input_track->GetMediaType() == cmn::MediaType::Audio) + { + if (end_pts - filler_pts < duration_per_frame) { - clone_frame->FillZeroData(); - } + int32_t remain_samples = end_pts - filler_pts; + clone_frame->SetDuration(remain_samples); - SpreadToFilters(decoder_id, clone_frame); + // There is no problem making the Samples smaller since the cloned frame is larger than the remaining samples. + // To do this properly, It need to reallocate audio buffers of MediaFrame. + clone_frame->SetNbSamples(remain_samples); + } + clone_frame->FillZeroData(); } + + SpreadToFilters(decoder_id, clone_frame); + // logte("%s Create filler frame. 
Type(%s), %s", _log_prefix.CStr(), cmn::GetMediaTypeString(input_track->GetMediaType()).CStr(), clone_frame->GetInfoString().CStr()); } } - } + } #endif // End of Filler Frame Generation [[fallthrough]]; @@ -1333,11 +1403,6 @@ void TranscoderStream::OnDecodedFrame(TranscodeResult result, MediaTrackId decod case TranscodeResult::DataReady: { - auto input_track = GetInputTrack(decoder_id); - - // Record the timestamp of the last decoded frame. managed by microseconds. - _last_decoded_frame_pts[decoder_id] = decoded_frame->GetPts() * input_track->GetTimeBase().GetExpr() * 1000000; - // The last decoded frame is kept and used as a filling frame in the blank section. SetLastDecodedFrame(decoder_id, decoded_frame); @@ -1354,6 +1419,12 @@ void TranscoderStream::OnDecodedFrame(TranscodeResult result, MediaTrackId decod void TranscoderStream::SetLastDecodedFrame(MediaTrackId decoder_id, std::shared_ptr &decoded_frame) { + auto input_track = GetInputTrack(decoder_id); + + auto scale_factor = input_track->GetTimeBase().GetExpr() * 1000000.0; + _last_decoded_frame_pts[decoder_id] = static_cast(decoded_frame->GetPts() * scale_factor); + _last_decoded_frame_duration[decoder_id] = static_cast(decoded_frame->GetDuration() * scale_factor); + _last_decoded_frames[decoder_id] = decoded_frame->CloneFrame(); } @@ -1383,29 +1454,23 @@ std::shared_ptr TranscoderStream::GetInputTrackOfFilter(MediaTrackId return nullptr; } - std::shared_lock lock(_filter_map_mutex); - - auto it = _filters.find(filter_ids[0]); - if (it == _filters.end()) + auto filter = GetFilter(filter_ids[0]); + if (filter == nullptr) { return nullptr; } - auto filter = it->second; - + return filter->GetInputTrack(); } TranscodeResult TranscoderStream::FilterFrame(MediaTrackId filter_id, std::shared_ptr decoded_frame) { - std::shared_lock lock(_filter_map_mutex); - - auto filter_it = _filters.find(filter_id); - if (filter_it == _filters.end()) + auto filter = GetFilter(filter_id); + if (filter == nullptr) { return 
TranscodeResult::NoData; } - auto filter = filter_it->second.get(); if (filter->SendBuffer(std::move(decoded_frame)) == false) { return TranscodeResult::DataError; @@ -1418,33 +1483,72 @@ void TranscoderStream::OnFilteredFrame(MediaTrackId filter_id, std::shared_ptrSetTrackId(filter_id); - EncodeFrame(std::move(filtered_frame)); + PreEncodeFilterFrame(std::move(filtered_frame)); } -TranscodeResult TranscoderStream::EncodeFrame(std::shared_ptr frame) +TranscodeResult TranscoderStream::PreEncodeFilterFrame(std::shared_ptr frame) { auto filter_id = frame->GetTrackId(); // Get Encoder ID auto filter_to_encoder_it = _link_filter_to_encoder.find(filter_id); - if(filter_to_encoder_it == _link_filter_to_encoder.end()) + if (filter_to_encoder_it == _link_filter_to_encoder.end()) { return TranscodeResult::NoData; } auto encoder_id = filter_to_encoder_it->second; - // Get Encoder - std::shared_lock lock(_encoder_map_mutex); - - auto encoder_map_it = _encoders.find(encoder_id); - if (encoder_map_it == _encoders.end()) + // Get EncoderSet + auto encoder_set = GetEncoder(encoder_id); + if(encoder_set.has_value() == false) + { + return TranscodeResult::NoData; + } + + // If the encoder has a filter, it is passed to the filter. 
+ auto encode_filter = encoder_set->first; + if (encode_filter != nullptr) + { + encode_filter->SendBuffer(std::move(frame)); + return TranscodeResult::DataReady; + } + + auto encoder = encoder_set->second; + if (encoder == nullptr) { return TranscodeResult::NoData; } - auto encoder = encoder_map_it->second.get(); encoder->SendBuffer(std::move(frame)); + + return TranscodeResult::DataReady; +} +void TranscoderStream::OnPreEncodeFilteredFrame(MediaTrackId filter_id, std::shared_ptr filtered_frame) +{ + filtered_frame->SetTrackId(filter_id); + + EncodeFrame(std::move(filtered_frame)); +} + +TranscodeResult TranscoderStream::EncodeFrame(std::shared_ptr frame) +{ + auto encoder_id = frame->GetTrackId(); + + // Get Encoder + auto encoder_set = GetEncoder(encoder_id); + if (encoder_set.has_value() == false) + { + return TranscodeResult::NoData; + } + + auto encoder = encoder_set->second; + if (encoder == nullptr) + { + return TranscodeResult::NoData; + } + + encoder->SendBuffer(std::move(frame)); return TranscodeResult::DataReady; } diff --git a/src/projects/transcoder/transcoder_stream.h b/src/projects/transcoder/transcoder_stream.h index d2677ff9a..20dcdfd88 100644 --- a/src/projects/transcoder/transcoder_stream.h +++ b/src/projects/transcoder/transcoder_stream.h @@ -122,12 +122,14 @@ class TranscoderStream : public ov::EnableSharedFromThis, publ private: ov::String _log_prefix; + + TranscodeApplication *_parent; + std::shared_mutex _format_change_mutex; std::shared_mutex _decoder_map_mutex; std::shared_mutex _filter_map_mutex; std::shared_mutex _encoder_map_mutex; - TranscodeApplication *_parent; const info::Application _application_info; @@ -147,7 +149,7 @@ class TranscoderStream : public ov::EnableSharedFromThis, publ // [OUTPUT_STREAM_NAME, OUTPUT_stream] std::map> _output_streams; - // Map of CompositeContext + // Map of CompositeContext // Purpose of reusing the same encoding profile. 
// // [ @@ -183,14 +185,15 @@ class TranscoderStream : public ov::EnableSharedFromThis, publ std::map> _last_decoded_frames; // [DECODER_ID, Timestamp(microseconds)] std::map _last_decoded_frame_pts; + std::map _last_decoded_frame_duration; - // Filter Component + // Filters // [FILTER_ID, FILTER] std::map> _filters; // Encoder Component - // [ENCODER_ID, ENCODER] - std::map> _encoders; + // [ENCODER_ID, [FILTER, ENCODER]] + std::map, std::shared_ptr>> _encoders; private: std::shared_ptr GetInputTrack(MediaTrackId track_id); @@ -215,14 +218,25 @@ class TranscoderStream : public ov::EnableSharedFromThis, publ int32_t CreateDecoders(); bool CreateDecoder(MediaTrackId decoder_id, std::shared_ptr input_stream, std::shared_ptr input_track); + std::shared_ptr GetDecoder(MediaTrackId decoder_id); + void SetDecoder(MediaTrackId decoder_id, std::shared_ptr decoder); + void RemoveDecoders(); - int32_t CreateFilters(MediaFrame *buffer); + + int32_t CreateFilters(std::shared_ptr buffer); bool CreateFilter(MediaTrackId filter_id, std::shared_ptr input_track, std::shared_ptr output_track); + std::shared_ptr GetFilter(MediaTrackId filter_id); + void SetFilter(MediaTrackId filter_id, std::shared_ptr filter); + void RemoveFilters(); + std::shared_ptr GetInputTrackOfFilter(MediaTrackId decoder_id); - int32_t CreateEncoders(MediaFrame *buffer); + int32_t CreateEncoders(std::shared_ptr buffer); bool CreateEncoder(MediaTrackId encoder_id, std::shared_ptr output_stream, std::shared_ptr output_track); - + std::optional, std::shared_ptr>> GetEncoder(MediaTrackId encoder_id); + void SetEncoder(MediaTrackId encoder_id, std::shared_ptr filter, std::shared_ptr encoder); + void RemoveEncoders(); + // Step 1: Decode (Decode a frame from given packets) void DecodePacket(const std::shared_ptr &packet); void OnDecodedFrame(TranscodeResult result, MediaTrackId decoder_id, std::shared_ptr decoded_frame); @@ -230,16 +244,19 @@ class TranscoderStream : public ov::EnableSharedFromThis, publ 
std::shared_ptr GetLastDecodedFrame(MediaTrackId decoder_id); // Called when formatting of decoded frames is analyzed or changed. - void ChangeOutputFormat(MediaFrame *buffer); - void UpdateInputTrack(MediaFrame *buffer); - void UpdateOutputTrack(MediaFrame *buffer); + void ChangeOutputFormat(std::shared_ptr buffer); + void UpdateInputTrack(std::shared_ptr buffer); + void UpdateOutputTrack(std::shared_ptr buffer); void UpdateMsidOfOutputStreams(uint32_t msid); + bool IsAvailableSmoothTransition(const std::shared_ptr &stream); // Step 2: Filter (resample/rescale the decoded frame) void SpreadToFilters(MediaTrackId decoder_id, std::shared_ptr frame); TranscodeResult FilterFrame(MediaTrackId track_id, std::shared_ptr frame); void OnFilteredFrame(MediaTrackId filter_id, std::shared_ptr decoded_frame); - bool IsAvailableSmoothTransition(const std::shared_ptr &stream); + + TranscodeResult PreEncodeFilterFrame(std::shared_ptr frame); + void OnPreEncodeFilteredFrame(MediaTrackId filter_id, std::shared_ptr decoded_frame); // Step 3: Encode (Encode the filtered frame to packets) TranscodeResult EncodeFrame(std::shared_ptr frame); @@ -248,11 +265,7 @@ class TranscoderStream : public ov::EnableSharedFromThis, publ // Send encoded packet to mediarouter via transcoder application void SendFrame(std::shared_ptr &stream, std::shared_ptr packet); - // Remove all components - void RemoveAllComponents(); - void RemoveDecoders(); - void RemoveFilters(); - void RemoveEncoders(); + private: // Initial buffer for ready to stream diff --git a/src/projects/transcoder/transcoder_stream_internal.cpp b/src/projects/transcoder/transcoder_stream_internal.cpp index 9db9f76f0..b358cecac 100644 --- a/src/projects/transcoder/transcoder_stream_internal.cpp +++ b/src/projects/transcoder/transcoder_stream_internal.cpp @@ -665,7 +665,7 @@ double TranscoderStreamInternal::MeasurementToRecommendFramerate(double framerat return ::floor(recommend_framerate); } -void 
TranscoderStreamInternal::UpdateOutputTrackPassthrough(const std::shared_ptr &output_track, MediaFrame *buffer) +void TranscoderStreamInternal::UpdateOutputTrackPassthrough(const std::shared_ptr &output_track, std::shared_ptr buffer) { if (output_track->GetMediaType() == cmn::MediaType::Video) { @@ -681,7 +681,7 @@ void TranscoderStreamInternal::UpdateOutputTrackPassthrough(const std::shared_pt } } -void TranscoderStreamInternal::UpdateOutputTrackTranscode(const std::shared_ptr &output_track, const std::shared_ptr &input_track, MediaFrame *buffer) +void TranscoderStreamInternal::UpdateOutputTrackTranscode(const std::shared_ptr &output_track, const std::shared_ptr &input_track, std::shared_ptr buffer) { if (output_track->GetMediaType() == cmn::MediaType::Video) { diff --git a/src/projects/transcoder/transcoder_stream_internal.h b/src/projects/transcoder/transcoder_stream_internal.h index 9a12a45d9..102fc96b4 100644 --- a/src/projects/transcoder/transcoder_stream_internal.h +++ b/src/projects/transcoder/transcoder_stream_internal.h @@ -47,8 +47,8 @@ class TranscoderStreamInternal double GetProperFramerate(const std::shared_ptr &ref_track); static double MeasurementToRecommendFramerate(double framerate); - void UpdateOutputTrackPassthrough(const std::shared_ptr &output_track, MediaFrame *buffer); - void UpdateOutputTrackTranscode(const std::shared_ptr &output_track, const std::shared_ptr &input_track, MediaFrame *buffer); + void UpdateOutputTrackPassthrough(const std::shared_ptr &output_track, std::shared_ptr buffer); + void UpdateOutputTrackTranscode(const std::shared_ptr &output_track, const std::shared_ptr &input_track, std::shared_ptr buffer); // This is used to check if only keyframes can be decoded.