From 5577d76576495f6dfccb3a078e0ccd65fec04248 Mon Sep 17 00:00:00 2001 From: Keukhan Date: Tue, 24 Dec 2024 13:23:25 +0900 Subject: [PATCH] Added AMF onTextData, onCuePoint event handling module --- .../streams/stream_actions_controller.cpp | 51 ++++++++++++++++++ .../apps/streams/stream_actions_controller.h | 1 + src/projects/providers/rtmp/rtmp_stream.cpp | 53 +++++++++++++++++-- src/projects/providers/rtmp/rtmp_stream.h | 2 + 4 files changed, 102 insertions(+), 5 deletions(-) diff --git a/src/projects/api_server/controllers/v1/vhosts/apps/streams/stream_actions_controller.cpp b/src/projects/api_server/controllers/v1/vhosts/apps/streams/stream_actions_controller.cpp index 573a0ddcd..cadc3c765 100644 --- a/src/projects/api_server/controllers/v1/vhosts/apps/streams/stream_actions_controller.cpp +++ b/src/projects/api_server/controllers/v1/vhosts/apps/streams/stream_actions_controller.cpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace api { @@ -572,5 +573,55 @@ namespace api return cue_event->Serialize(); } + + std::shared_ptr StreamActionsController::MakeAMFData(const Json::Value &events) + { + if (events.size() == 0) + { + throw http::HttpError(http::StatusCode::BadRequest, "events must have at least one event"); + } + + // only first event is used + auto event = events[0]; + if (event.isMember("amfType") == false || event["amfType"].isString() == false) + { + throw http::HttpError(http::StatusCode::BadRequest, "amfType is required in events"); + } + + ov::String amf_type = event["amfType"].asString().c_str(); + + if (AmfTextDataEvent::IsMatch(amf_type)) + { + if(AmfTextDataEvent::IsValid(event) == false) + { + throw http::HttpError(http::StatusCode::BadRequest, "data is required in events"); + } + + auto amf_event = AmfTextDataEvent::Parse(event); + if (amf_event == nullptr) + { + throw http::HttpError(http::StatusCode::BadRequest, "Could not parse amf data"); + } + + return amf_event->Serialize(); + } + else if (AmfCuePointEvent::IsMatch(amf_type)) + { + if(AmfCuePointEvent::IsValid(event) == false) + { + throw http::HttpError(http::StatusCode::BadRequest, "version, preRollTimeSec is required in events"); + } + + auto amf_event = AmfCuePointEvent::Parse(event); + if (amf_event == nullptr) + { + throw http::HttpError(http::StatusCode::BadRequest, "Could not parse amf data"); + } + + return amf_event->Serialize(); + } + + return nullptr; + } } // namespace v1 } // namespace api \ No newline at end of file diff --git a/src/projects/api_server/controllers/v1/vhosts/apps/streams/stream_actions_controller.h b/src/projects/api_server/controllers/v1/vhosts/apps/streams/stream_actions_controller.h index ff4979b04..8bf1fbf34 100644 --- a/src/projects/api_server/controllers/v1/vhosts/apps/streams/stream_actions_controller.h +++ b/src/projects/api_server/controllers/v1/vhosts/apps/streams/stream_actions_controller.h @@ -100,6 +100,7 @@ namespace api std::shared_ptr MakeID3Data(const Json::Value &events); // ID3v2 std::shared_ptr MakeCueData(const Json::Value &events); // CUE + std::shared_ptr MakeAMFData(const Json::Value &events); // AMF }; } } \ No newline at end of file diff --git a/src/projects/providers/rtmp/rtmp_stream.cpp b/src/projects/providers/rtmp/rtmp_stream.cpp index d108547a8..69b6c34c3 100644 --- a/src/projects/providers/rtmp/rtmp_stream.cpp +++ b/src/projects/providers/rtmp/rtmp_stream.cpp @@ -891,6 +891,48 @@ namespace pvd return true; } + bool RtmpStream::OnAmfTextData(const std::shared_ptr &header, const AmfDocument &document) + { + int64_t pts = 0; + if (_last_video_pts > _last_audio_pts) + { + pts = _last_video_pts + _last_video_pts_clock.Elapsed(); + } + else + { + pts = _last_audio_pts + _last_audio_pts_clock.Elapsed(); + } + + ov::ByteStream byte_stream; + if (document.Encode(byte_stream) == false) + { + return false; + } + + return SendDataFrame(pts, cmn::BitstreamFormat::AMF, cmn::PacketType::EVENT, byte_stream.GetDataPointer(), false); + } + + bool RtmpStream::OnAmfCuePoint(const std::shared_ptr &header, const AmfDocument &document) + { + int64_t pts = 0; + if (_last_video_pts > _last_audio_pts) + { + pts = _last_video_pts + _last_video_pts_clock.Elapsed(); + } + else + { + pts = _last_audio_pts + _last_audio_pts_clock.Elapsed(); + } + + ov::ByteStream byte_stream; + if (document.Encode(byte_stream) == false) + { + return false; + } + + return SendDataFrame(pts, cmn::BitstreamFormat::AMF, cmn::PacketType::EVENT, byte_stream.GetDataPointer(), false); + } + off_t RtmpStream::ReceiveHandshakePacket(const std::shared_ptr &data) { // +-------------+ +-------------+ @@ -1312,13 +1354,14 @@ namespace pvd break; default: - // Find it in Events - if (CheckEventMessage(message->header, document) == false) - { - logtw("Unknown Amf0DataMessage - Message(%s / %s)", message_name.CStr(), data_name.CStr()); - } break; } + + // Find it in Events + if (CheckEventMessage(message->header, document) == false) + { + logtd("There were no triggered events - Message(%s / %s)", message_name.CStr(), data_name.CStr()); + } } bool RtmpStream::CheckEventMessage(const std::shared_ptr &header, AmfDocument &document) diff --git a/src/projects/providers/rtmp/rtmp_stream.h b/src/projects/providers/rtmp/rtmp_stream.h index 4375fb57e..1ea9bbb19 100644 --- a/src/projects/providers/rtmp/rtmp_stream.h +++ b/src/projects/providers/rtmp/rtmp_stream.h @@ -64,6 +64,8 @@ namespace pvd bool OnAmfPublish(const std::shared_ptr &header, AmfDocument &document, double transaction_id); bool OnAmfDeleteStream(const std::shared_ptr &header, AmfDocument &document, double transaction_id); bool OnAmfMetaData(const std::shared_ptr &header, const AmfProperty *property); + bool OnAmfTextData(const std::shared_ptr &header, const AmfDocument &document); + bool OnAmfCuePoint(const std::shared_ptr &header, const AmfDocument &document); // Send messages bool SendData(const std::shared_ptr &data);