Skip to content

Commit

Permalink
Added AMF onTextData, onCuePoint event handling module
Browse files Browse the repository at this point in the history
  • Loading branch information
Keukhan committed Dec 24, 2024
1 parent f0b7587 commit 5577d76
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <modules/data_format/id3v2/id3v2.h>
#include <modules/data_format/id3v2/frames/id3v2_frames.h>
#include <modules/data_format/cue_event/cue_event.h>
#include <modules/data_format/amf_event/amf_event.h>

namespace api
{
Expand Down Expand Up @@ -572,5 +573,55 @@ namespace api

return cue_event->Serialize();
}

std::shared_ptr<ov::Data> StreamActionsController::MakeAMFData(const Json::Value &events)
{
if (events.size() == 0)
{
throw http::HttpError(http::StatusCode::BadRequest, "events must have at least one event");
}

// only first event is used
auto event = events[0];
if (event.isMember("amfType") == false || event["amfType"].isString() == false)
{
throw http::HttpError(http::StatusCode::BadRequest, "amfType is required in events");
}

ov::String amf_type = event["amfType"].asString().c_str();

if (AmfTextDataEvent::IsMatch(amf_type))
{
if(AmfTextDataEvent::IsValid(event) == false)
{
throw http::HttpError(http::StatusCode::BadRequest, "data is required in events");
}

auto amf_event = AmfTextDataEvent::Parse(event);
if (amf_event == nullptr)
{
throw http::HttpError(http::StatusCode::BadRequest, "Could not parse amf data");
}

return amf_event->Serialize();
}
else if (AmfCuePointEvent::IsMatch(amf_type))
{
if(AmfCuePointEvent::IsValid(event) == false)
{
throw http::HttpError(http::StatusCode::BadRequest, "version, preRollTimeSec is required in events");
}

auto amf_event = AmfCuePointEvent::Parse(event);
if (amf_event == nullptr)
{
throw http::HttpError(http::StatusCode::BadRequest, "Could not parse amf data");
}

return amf_event->Serialize();
}

return nullptr;
}
} // namespace v1
} // namespace api
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ namespace api

std::shared_ptr<ov::Data> MakeID3Data(const Json::Value &events); // ID3v2
std::shared_ptr<ov::Data> MakeCueData(const Json::Value &events); // CUE
std::shared_ptr<ov::Data> MakeAMFData(const Json::Value &events); // AMF
};
}
}
53 changes: 48 additions & 5 deletions src/projects/providers/rtmp/rtmp_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,48 @@ namespace pvd
return true;
}

bool RtmpStream::OnAmfTextData(const std::shared_ptr<const RtmpChunkHeader> &header, const AmfDocument &document)
{
int64_t pts = 0;
if (_last_video_pts > _last_audio_pts)
{
pts = _last_video_pts + _last_video_pts_clock.Elapsed();
}
else
{
pts = _last_audio_pts + _last_audio_pts_clock.Elapsed();
}

ov::ByteStream byte_stream;
if (document.Encode(byte_stream) == false)
{
return false;
}

return SendDataFrame(pts, cmn::BitstreamFormat::AMF, cmn::PacketType::EVENT, byte_stream.GetDataPointer(), false);
}

bool RtmpStream::OnAmfCuePoint(const std::shared_ptr<const RtmpChunkHeader> &header, const AmfDocument &document)
{
int64_t pts = 0;
if (_last_video_pts > _last_audio_pts)
{
pts = _last_video_pts + _last_video_pts_clock.Elapsed();
}
else
{
pts = _last_audio_pts + _last_audio_pts_clock.Elapsed();
}

ov::ByteStream byte_stream;
if (document.Encode(byte_stream) == false)
{
return false;
}

return SendDataFrame(pts, cmn::BitstreamFormat::AMF, cmn::PacketType::EVENT, byte_stream.GetDataPointer(), false);
}

off_t RtmpStream::ReceiveHandshakePacket(const std::shared_ptr<const ov::Data> &data)
{
// +-------------+ +-------------+
Expand Down Expand Up @@ -1312,13 +1354,14 @@ namespace pvd
break;

default:
// Find it in Events
if (CheckEventMessage(message->header, document) == false)
{
logtw("Unknown Amf0DataMessage - Message(%s / %s)", message_name.CStr(), data_name.CStr());
}
break;
}

// Find it in Events
if (CheckEventMessage(message->header, document) == false)
{
logtd("There were no triggered events - Message(%s / %s)", message_name.CStr(), data_name.CStr());
}
}

bool RtmpStream::CheckEventMessage(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document)
Expand Down
2 changes: 2 additions & 0 deletions src/projects/providers/rtmp/rtmp_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ namespace pvd
bool OnAmfPublish(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
bool OnAmfDeleteStream(const std::shared_ptr<const RtmpChunkHeader> &header, AmfDocument &document, double transaction_id);
bool OnAmfMetaData(const std::shared_ptr<const RtmpChunkHeader> &header, const AmfProperty *property);
bool OnAmfTextData(const std::shared_ptr<const RtmpChunkHeader> &header, const AmfDocument &document);
bool OnAmfCuePoint(const std::shared_ptr<const RtmpChunkHeader> &header, const AmfDocument &document);

// Send messages
bool SendData(const std::shared_ptr<const ov::Data> &data);
Expand Down

0 comments on commit 5577d76

Please sign in to comment.