Skip to content

Commit

Permalink
[RSDK-9935] - Iframe only decode (#90)
Browse files Browse the repository at this point in the history
* wip

* wip

* wip

* wip

* add warnings

* wip

* wip

* wip

* wip

* wip

* Update rtsp.go

Co-authored-by: randhid <[email protected]>

* Update rtsp.go

Co-authored-by: randhid <[email protected]>

* Update rtsp.go

Co-authored-by: randhid <[email protected]>

* Update rtsp.go

Co-authored-by: randhid <[email protected]>

---------

Co-authored-by: randhid <[email protected]>
  • Loading branch information
nicksanford and randhid authored Feb 11, 2025
1 parent d7074a7 commit b6d15ab
Showing 1 changed file with 60 additions and 16 deletions.
76 changes: 60 additions & 16 deletions rtsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ func init() {

// Config are the config attributes for an RTSP camera model.
type Config struct {
Address string `json:"rtsp_address"`
RTPPassthrough *bool `json:"rtp_passthrough"`
LazyDecode bool `json:"lazy_decode,omitempty"`
Query viamupnp.DeviceQuery `json:"query"`
Address string `json:"rtsp_address"`
RTPPassthrough *bool `json:"rtp_passthrough"`
LazyDecode bool `json:"lazy_decode,omitempty"`
IframeOnlyDecode bool `json:"i_frame_only_decode,omitempty"`
Query viamupnp.DeviceQuery `json:"query,omitempty"`
}

// CodecFormat contains a pointer to a format and the corresponding FFmpeg codec.
Expand Down Expand Up @@ -158,13 +159,20 @@ type (
}
)

type cache struct {
mimeType string
bytes []byte
imageMetadata camera.ImageMetadata
}

// rtspCamera contains the rtsp client, and the reader function that fulfills the camera interface.
type rtspCamera struct {
resource.AlwaysRebuild
model resource.Model
gostream.VideoReader
u *base.URL
lazyDecode bool
u *base.URL
lazyDecode bool
iframeOnlyDecode bool

closeMu sync.RWMutex

Expand All @@ -178,11 +186,12 @@ type rtspCamera struct {

activeBackgroundWorkers sync.WaitGroup

// latestFrameMu protects critical sections where frame state changes (e.g. ref counting) need to be atomic
// with swapping out the latest frame.
latestFrameMu sync.Mutex
latestMJPEGBytes atomic.Pointer[[]byte]
latestFrame *avFrameWrapper
// frameSwapMu protects critical sections where frame state changes (e.g. ref counting) need to be atomic
// with swapping out the latest frame.
frameSwapMu sync.Mutex
latestFrameCache cache
// We use a pool data structure to amortize the malloc cost of AVFrames and reduce pressure on memory
// management. We create one pool for the entire lifetime of the RTSP camera. Additionally, frames
// from the pool may be for a resolution that does not match the current image. The user of the pool
Expand Down Expand Up @@ -486,6 +495,10 @@ func (rc *rtspCamera) initH264(session *description.Session) (err error) {
return
}

if rc.iframeOnlyDecode && !h264.IDRPresent(au) {
return
}

if rc.lazyDecode {
if h264.IDRPresent(au) {
rc.resetLazyAU(au)
Expand Down Expand Up @@ -613,6 +626,10 @@ func (rc *rtspCamera) initH265(session *description.Session) (err error) {
return
}

if rc.iframeOnlyDecode && !h265.IsRandomAccess(au) {
return
}

packedAU := packH265AUIntoNALU(au, rc.logger)
if rc.lazyDecode {
if h265.IsRandomAccess(au) {
Expand Down Expand Up @@ -671,9 +688,16 @@ func (rc *rtspCamera) initMJPEG(session *description.Session) error {
if rc.rtpPassthrough {
rc.logger.Warn("rtp_passthrough is only supported for H264 codec. rtp_passthrough features disabled due to MJPEG RTSP track")
}

if rc.lazyDecode {
rc.logger.Warn("lazy_decode is currently only supported for H264 codec. lazy_decode features disabled due to MJPEG RTSP track")
rc.logger.Warn("lazy_decode is currently only supported for H264 and H265 codecs. lazy_decode features disabled due to MJPEG RTSP track")
}

if rc.iframeOnlyDecode {
rc.logger.Warn("i_frame_only_decode is currently only supported for H264 and H265 codecs. " +
"lazy_decode features disabled due to MJPEG RTSP track")
}

var f *format.MJPEG
media := session.FindFormat(&f)
if media == nil {
Expand Down Expand Up @@ -713,7 +737,12 @@ func (rc *rtspCamera) initMPEG4(session *description.Session) error {
}

if rc.lazyDecode {
rc.logger.Warn("lazy_decode is currently only supported for H264 codec. lazy_decode features disabled due to MPEG4 RTSP track")
rc.logger.Warn("lazy_decode is currently only supported for H264 and H265 codecs. lazy_decode features disabled due to MPEG4 RTSP track")
}

if rc.iframeOnlyDecode {
rc.logger.Warn("i_frame_only_decode is currently only supported for H264 and H265 codecs. " +
"lazy_decode features disabled due to MPEG4 RTSP track")
}

var f *format.MPEG4Video
Expand Down Expand Up @@ -925,6 +954,7 @@ func NewRTSPCamera(ctx context.Context, _ resource.Dependencies, conf resource.C
rc := &rtspCamera{
model: conf.Model,
lazyDecode: newConf.LazyDecode,
iframeOnlyDecode: newConf.IframeOnlyDecode,
u: u,
name: conf.ResourceName(),
rtpPassthrough: rtpPassthrough,
Expand Down Expand Up @@ -1097,8 +1127,8 @@ func (rc *rtspCamera) decodeAndStore(nalu []byte) error {
// the previous frame by trying to put it back in the pool. It might not make
// it back into the pool immediately or at all depending on its state.
func (rc *rtspCamera) handleLatestFrame(newFrame *avFrameWrapper) {
rc.frameSwapMu.Lock()
defer rc.frameSwapMu.Unlock()
rc.latestFrameMu.Lock()
defer rc.latestFrameMu.Unlock()

prevFrame := rc.latestFrame
if prevFrame != nil {
Expand All @@ -1108,6 +1138,7 @@ func (rc *rtspCamera) handleLatestFrame(newFrame *avFrameWrapper) {
}
newFrame.incrementRefs()
rc.latestFrame = newFrame
rc.latestFrameCache = cache{}
}

func naluType(nalu []byte) h264.NALUType {
Expand All @@ -1125,7 +1156,8 @@ func (rc *rtspCamera) Image(_ context.Context, mimeType string, _ map[string]int
defer rc.closeMu.RUnlock()
start := time.Now()
defer func() {
rc.logger.Infof("codec: %s, lazy: %t, time: %s", videoCodec(rc.currentCodec.Load()), rc.lazyDecode, time.Since(start))
rc.logger.Debugf("codec: %s, lazy: %t, iframe_only: %t, time: %s",
videoCodec(rc.currentCodec.Load()), rc.lazyDecode, rc.iframeOnlyDecode, time.Since(start))
}()
if err := rc.cancelCtx.Err(); err != nil {
return nil, camera.ImageMetadata{}, err
Expand All @@ -1144,8 +1176,8 @@ func (rc *rtspCamera) Image(_ context.Context, mimeType string, _ map[string]int

func (rc *rtspCamera) getAndConvertFrame(mimeType string) ([]byte, camera.ImageMetadata, error) {
rc.consumeLazyAU()
rc.frameSwapMu.Lock()
defer rc.frameSwapMu.Unlock()
rc.latestFrameMu.Lock()
defer rc.latestFrameMu.Unlock()
if rc.latestFrame == nil {
return nil, camera.ImageMetadata{}, errors.New("no frame yet")
}
Expand All @@ -1154,6 +1186,13 @@ func (rc *rtspCamera) getAndConvertFrame(mimeType string) ([]byte, camera.ImageM
var bytes []byte
var metadata camera.ImageMetadata
var err error
if rc.latestFrameCache.bytes != nil && rc.latestFrameCache.mimeType == mimeType {
if refCount := currentFrame.decrementRefs(); refCount == 0 {
rc.avFramePool.put(currentFrame)
}
return rc.latestFrameCache.bytes, rc.latestFrameCache.imageMetadata, nil
}

switch mimeType {
case rutils.MimeTypeJPEG, rutils.MimeTypeJPEG + "+" + rutils.MimeTypeSuffixLazy:
bytes, metadata, err = rc.mimeHandler.convertJPEG(currentFrame.frame)
Expand All @@ -1171,6 +1210,11 @@ func (rc *rtspCamera) getAndConvertFrame(mimeType string) ([]byte, camera.ImageM
if err != nil {
return nil, camera.ImageMetadata{}, err
}
rc.latestFrameCache = cache{
mimeType: mimeType,
bytes: bytes,
imageMetadata: metadata,
}
return bytes, metadata, err
}

Expand Down

0 comments on commit b6d15ab

Please sign in to comment.