-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathframe_encoder.py
168 lines (130 loc) · 5.13 KB
/
frame_encoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import logging
from fractions import Fraction
from typing import Dict, Optional

import numpy as np
from av.codec import CodecContext
from av.error import FFmpegError
from av.packet import Packet
from av.video.codeccontext import VideoCodecContext
from av.video.frame import VideoFrame
class FrameEncoderError(Exception):
    """Error type raised by the frame encoder when encoding fails."""
# TODO: only for testing purposes -- when enabled, the two lines below create a
# local "input" directory at import time.
# from pathlib import Path
# Path("input").mkdir(parents=True, exist_ok=True)

# Module-level logger, registered under a fixed human-readable name.
logger = logging.getLogger("Video frame encoder")
class FrameEncoder:
    """Video frame encoder.

    Wraps a codec context opened in write mode and turns ndarray images into
    encoded packet bytes, one call per frame.
    """

    # Default codec options used when the caller passes ``options=None``.
    _DEFAULT_OPTIONS: Dict[str, Dict[str, str]] = {
        "h264": {"preset": "ultrafast", "tune": "zerolatency"},
        "hevc": {"crf": "10", "preset": "ultrafast", "tune": "zerolatency", "x265-params": "frame-threads=1"},
        "vp9": {"quality": "realtime", "speed": "8", "lag-in-frames": "0"},
    }

    # Upper bound for the denominator when a float framerate is turned into a
    # rational; keeps numerator and denominator small.
    _MAX_FRAMERATE_DENOMINATOR = 1_000_000

    def __init__(
        self, width: int, height: int, fps: float = 30, codec: str = "h264", options: Optional[Dict[str, str]] = None
    ) -> None:
        """Create the codec context and open the encoder.

        Args:
            width (int): Video frame width.
            height (int): Video frame height.
            fps (float): Video framerate (FPS), default: 30.
            codec (str): Video codec name, e.g. h264, hevc, or vp9, default: h264.
            options (Dict[str, str], optional): Codec options, e.g. {"crf": "0", "preset": "ultrafast",
                "tune": "zerolatency", "x264-params": "keyint=5"}. When None, per-codec defaults are used
                for h264, hevc and vp9, and an empty dict for any other codec.
        """
        if options is None:
            options = self._default_options(codec)
        logger.debug("Encoder (%s) options: %s", codec, options)

        self._fps = fps
        self._width = width
        self._height = height
        self._options = options
        self._pix_fmt = "yuv420p"
        self._encoder: VideoCodecContext = CodecContext.create(codec, "w")
        self._init_count = 0
        self.last_timestamp: int = 0
        self._last_frame_is_keyframe = False
        self.encoder_init()

    @classmethod
    def _default_options(cls, codec: str) -> Dict[str, str]:
        """Return a fresh copy of the default options for ``codec`` ({} if the codec has none)."""
        return dict(cls._DEFAULT_OPTIONS.get(codec, {}))

    @staticmethod
    def _framerate_fraction(fps: float) -> Fraction:
        """Convert ``fps`` to a rational with a bounded denominator.

        A float such as 29.97 has an exact binary expansion whose numerator and
        denominator are enormous; bounding the denominator yields 2997/100.
        Integer-valued rates are returned unchanged (e.g. 30 -> 30/1).
        """
        return Fraction(fps).limit_denominator(FrameEncoder._MAX_FRAMERATE_DENOMINATOR)

    def width(self) -> int:
        """Get video frame width.

        Returns:
            Video frame width.
        """
        return self._width

    def height(self) -> int:
        """Get video frame height.

        Returns:
            Video frame height.
        """
        return self._height

    def fps(self) -> float:
        """Get video framerate.

        Returns:
            Video framerate, as passed to the constructor.
        """
        return self._fps

    def encoder_init(self) -> None:
        """(Re)initialize the video encoder.

        Increments the init counter, closes the codec context if it is open,
        re-applies size, framerate, pixel format and options, then opens it.
        """
        self._init_count += 1
        # NOTE(review): the same codec context is reused after close(); confirm
        # that the installed PyAV/FFmpeg supports reopening a closed context.
        if self._encoder.is_open:
            self._encoder.close()
        self._encoder.width = self._width
        self._encoder.height = self._height
        self._encoder.framerate = self._framerate_fraction(self._fps)
        self._encoder.pix_fmt = self._pix_fmt
        self._encoder.options = self._options
        self._encoder.open()

    def get_init_count(self) -> int:
        """Get encoder init attempts count.

        Returns:
            Number of encoder_init() calls so far, including the one made by the constructor.
        """
        return self._init_count

    def last_frame_is_keyframe(self) -> bool:
        """Is last frame a keyframe?

        Returns:
            True if any packet produced by the most recent encode_ndarray() call was a keyframe.
        """
        return self._last_frame_is_keyframe

    def encode_ndarray(self, frame_data: np.ndarray, format: str = "bgr24") -> bytes:
        """Encode ndarray to packets bytes.

        Args:
            frame_data (ndarray): Video frame / image.
            format (str): Image format of ``frame_data``, default: bgr24.

        Returns:
            The bytes of all packets emitted for this frame, concatenated in order;
            empty bytes if the encoder emitted no packet.

        Raises:
            FrameEncoderError: If the underlying codec raises an FFmpegError.
        """
        try:
            frame = VideoFrame.from_ndarray(frame_data, format=format)
            self._last_frame_is_keyframe = False
            packets = []
            packet: Packet
            for packet in self._encoder.encode(frame):
                # Lazy %-formatting: nothing is rendered unless DEBUG is enabled.
                logger.debug("Frame %s encoded to packet: %s", frame, packet)
                logger.debug(
                    "packet.pts: %s, packet.dts: %s, packet.key_frame: %s, packet.is_corrupt: %s",
                    packet.pts,
                    packet.dts,
                    packet.is_keyframe,
                    packet.is_corrupt,
                )
                if packet.is_keyframe:
                    self._last_frame_is_keyframe = True
                packets.append(bytes(packet))
        except FFmpegError as e:
            # Keep the original error attached as the explicit cause.
            raise FrameEncoderError(e) from e

        if len(packets) > 1:
            # Log sizes only; the raw encoded payload does not belong in the log.
            logger.info(
                "Frame %s encoded to %d packets (sizes in bytes: %s)",
                frame,
                len(packets),
                [len(data) for data in packets],
            )
        return b"".join(packets)