-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathframe_decoder.py
139 lines (105 loc) · 4.03 KB
/
frame_decoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import logging
import numpy as np
from av.codec import CodecContext
from av.error import FFmpegError, tag_to_code
from av.packet import Packet
from av.video.codeccontext import VideoCodecContext
from av.video.frame import VideoFrame
class FrameDecoderError(Exception):
"""FrameDecoderError Exception."""
pass
# TODO: only for testing purpose
# from pathlib import Path
# Path("output").mkdir(parents=True, exist_ok=True)
logger = logging.getLogger("Video frame decoder")
class FrameDecoder:
"""Video frame Decoder."""
def __init__(self, width: int, height: int, fps: float = 30, codec: str = "h264") -> None:
"""Constructor.
Args:
width (int): Video frame width.
height (int): Video frame height.
fps (float): Video framerate (FPS), default: 30.
codec (str): Video codec name, e.g. h264, hevc, or vp9, default: h264.
"""
self._fps = fps
self._width = width
self._height = height
self._pix_fmt = "yuv420p"
self._decoder: VideoCodecContext = CodecContext.create(codec, "r")
self._init_count = 0
self.last_timestamp: int = 0
self._last_frame_is_keyframe = False
self.decoder_init()
def width(self) -> int:
"""Get video frame width.
Returns:
Video frame width.
"""
return self._width
def height(self) -> int:
"""Get video frame height.
Returns:
Video frame height.
"""
return self._height
def fps(self) -> float:
"""Get video framerate.
Returns:
Video framerate.
"""
return self._fps
def decoder_init(self) -> None:
"""Init video decoder."""
self._init_count += 1
if self._decoder.is_open:
self._decoder.close()
self._decoder.width = self._width
self._decoder.height = self._height
self._decoder.framerate = self._fps
self._decoder.pix_fmt = self._pix_fmt
self._decoder.open()
def get_init_count(self) -> int:
"""Get decoder init attempts count.
Returns:
Decoder init attempts count.
"""
return self._init_count
def last_frame_is_keyframe(self) -> bool:
"""Is last frame a keyframe?
Returns:
True if last frame is keyframe.
"""
return self._last_frame_is_keyframe
def decode_packet_data(self, packet_data: bytes, format: str = "bgr24") -> np.ndarray:
"""Decode packets bytes to ndarray.
Args:
packet_data (bytes): Packet data.
format (str): Image format.
Returns:
Video frame / image.
"""
try:
packet = Packet(packet_data)
# Multiple frames? - This should not happen because on the encoders side one frame is always encoded and sent
frame: VideoFrame
for frame in self._decoder.decode(packet):
# TODO: only for testing purpose
logger.debug(f"Frame {frame} decoded from packet: {packet}")
logger.debug(
f"frame.pts: {frame.pts}, "
f"frame.dts: {frame.dts}, "
f"frame.key_frame: {frame.key_frame}, "
f"frame.is_corrupt: {frame.is_corrupt}, "
f"frame.time: {frame.time}"
)
# TODO: only for testing purpose
# frame.to_image().save("output/frame-%04d.jpg" % frame.index)
self._last_frame_is_keyframe = frame.key_frame
frame_ndarray: np.ndarray = frame.to_ndarray(format=format)
return frame_ndarray
# Sometimes, with dropping some TCP packets, no frame is decoded from av.Packet while no FFmpegError is
# raised. This is resolved by throwing this exception.
raise FFmpegError(tag_to_code(b"INDA"), f"No frame decoded from packet {packet}")
except FFmpegError as e:
raise FrameDecoderError(e)