-
Notifications
You must be signed in to change notification settings - Fork 136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding various fps support to SCC read and SCC write #264
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,7 +93,7 @@ | |
MICROSECONDS_PER_CODEWORD, CHARACTER_TO_CODE, | ||
SPECIAL_OR_EXTENDED_CHAR_TO_CODE, PAC_BYTES_TO_POSITIONING_MAP, | ||
PAC_HIGH_BYTE_BY_ROW, PAC_LOW_BYTE_BY_ROW_RESTRICTED, | ||
PAC_TAB_OFFSET_COMMANDS, | ||
PAC_TAB_OFFSET_COMMANDS, MICROSECONDS_PER_CODEWORD_fps, | ||
) | ||
from .specialized_collections import ( # noqa: F401 | ||
TimingCorrectingCaptionList, NotifyingDict, CaptionCreator, | ||
|
@@ -195,7 +195,7 @@ def detect(self, content): | |
else: | ||
return False | ||
|
||
def read(self, content, lang='en-US', simulate_roll_up=False, offset=0): | ||
def read(self, content, lang='en-US', simulate_roll_up=False, offset=0, src_fps=30.0): | ||
"""Converts the unicode string into a CaptionSet | ||
|
||
:type content: str | ||
|
@@ -212,8 +212,12 @@ def read(self, content, lang='en-US', simulate_roll_up=False, offset=0): | |
:type offset: int | ||
:param offset: | ||
|
||
:type src_fps: float | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please update the user documentation as well with a short explanation and a usage example for both reader and writer class. You can find the documentation in |
||
:param src_fps: source frame rate default value 30.0 | ||
|
||
:rtype: CaptionSet | ||
""" | ||
|
||
if not isinstance(content, str): | ||
raise InvalidInputError('The content is not a unicode string.') | ||
|
||
|
@@ -224,7 +228,7 @@ def read(self, content, lang='en-US', simulate_roll_up=False, offset=0): | |
|
||
# loop through each line except the first | ||
for line in lines[1:]: | ||
self._translate_line(line) | ||
self._translate_line(line, src_fps) | ||
|
||
self._flush_implicit_buffers(self.buffer_dict.active_key) | ||
|
||
|
@@ -246,6 +250,28 @@ def read(self, content, lang='en-US', simulate_roll_up=False, offset=0): | |
|
||
return captions | ||
|
||
def _fix_last_timing(self, timing, src_fps=30.0): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function is no longer used and it needs to be removed. |
||
"""HACK HACK: Certain Paint-On captions don't specify the 942f [EOC] | ||
(End Of Caption) command on the same line. | ||
If this is a 942f line, also simulate a 942c (Erase Displayed Memory) | ||
to properly set the timing on the last caption. | ||
|
||
This method needs some serious attention, because it proves the timing | ||
calculation is not done well for Pop-On captions | ||
""" | ||
# Calculate the end time from the current line | ||
time_translator = _SccTimeTranslator() | ||
time_translator.start_at(timing) | ||
time_translator.offset = self.time_translator.offset | ||
|
||
# But use the current time translator for the start time | ||
self.caption_stash.create_and_store( | ||
self.buffer, self.time_translator.get_time(src_fps)) | ||
|
||
self.caption_stash.correct_last_timing(time_translator.get_time(src_fps)) | ||
self.buffer = self.node_creator_factory.node_creator() | ||
|
||
|
||
def _flush_implicit_buffers(self, old_key=None, *args): | ||
"""Convert to Captions those buffers whose behavior is implicit. | ||
|
||
|
@@ -257,6 +283,7 @@ def _flush_implicit_buffers(self, old_key=None, *args): | |
If they're on the last row however, or if the caption type is changing, | ||
we make sure to convert the buffers to text, so we don't lose any info. | ||
""" | ||
|
||
if old_key == 'pop': | ||
if self.pop_ons_queue: | ||
self._pop_on() | ||
|
@@ -270,7 +297,7 @@ def _flush_implicit_buffers(self, old_key=None, *args): | |
self.caption_stash.create_and_store(self.buffer, self.time) | ||
self.buffer = self.node_creator_factory.new_creator() | ||
|
||
def _translate_line(self, line): | ||
def _translate_line(self, line, src_fps=30.0): | ||
# ignore blank lines | ||
if line.strip() == '': | ||
return | ||
|
@@ -286,9 +313,9 @@ def _translate_line(self, line): | |
# ignore empty results or invalid commands | ||
word = word.strip() | ||
if len(word) == 4: | ||
self._translate_word(word) | ||
self._translate_word(word, src_fps) | ||
|
||
def _translate_word(self, word): | ||
def _translate_word(self, word, src_fps=30.0): | ||
# count frames for timing | ||
self.time_translator.increment_frames() | ||
|
||
|
@@ -298,7 +325,7 @@ def _translate_word(self, word): | |
# TODO - check that all the positioning commands are here, or use | ||
# some other strategy to determine if the word is a command. | ||
if word in COMMANDS or _is_pac_command(word): | ||
self._translate_command(word) | ||
self._translate_command(word, src_fps) | ||
|
||
# second, check if word is a special character | ||
elif word in SPECIAL_CHARS: | ||
|
@@ -344,7 +371,7 @@ def _translate_extended_char(self, word): | |
# add to buffer | ||
self.buffer.add_chars(EXTENDED_CHARS[word]) | ||
|
||
def _translate_command(self, word): | ||
def _translate_command(self, word, src_fps=30.0): | ||
# if command is pop_up | ||
if word == '9420': | ||
self.buffer_dict.set_active('pop') | ||
|
@@ -360,7 +387,7 @@ def _translate_command(self, word): | |
) | ||
self.buffer = self.node_creator_factory.new_creator() | ||
|
||
self.time = self.time_translator.get_time() | ||
self.time = self.time_translator.get_time(src_fps) | ||
|
||
# if command is roll_up 2, 3 or 4 rows | ||
elif word in ('9425', '9426', '94a7'): | ||
|
@@ -382,34 +409,35 @@ def _translate_command(self, word): | |
|
||
# set rows to empty, configure start time for caption | ||
self.roll_rows = [] | ||
self.time = self.time_translator.get_time() | ||
self.time = self.time_translator.get_time(src_fps) | ||
|
||
# clear pop_on buffer | ||
elif word == '94ae': | ||
self.buffer = self.node_creator_factory.new_creator() | ||
|
||
# display pop_on buffer [End Of Caption] | ||
elif word == '942f': | ||
self.time = self.time_translator.get_time() | ||
self.time = self.time_translator.get_time(src_fps) | ||
if self.pop_ons_queue: | ||
# there's a pop-on cue not ended by the 942c command | ||
self._pop_on(end=self.time) | ||
if self.buffer.is_empty(): | ||
return | ||
cue = PopOnCue(buffer=deepcopy(self.buffer), start=self.time, end=0) | ||
self.pop_ons_queue.appendleft(cue) | ||
|
||
self.buffer = self.node_creator_factory.new_creator() | ||
|
||
# roll up captions [Carriage Return] | ||
elif word == '94ad': | ||
# display roll-up buffer | ||
if not self.buffer.is_empty(): | ||
self._roll_up() | ||
self._roll_up(src_fps) | ||
|
||
# 942c - Erase Displayed Memory - Clear the current screen of any | ||
# displayed captions or text. | ||
elif word == '942c' and self.pop_ons_queue: | ||
self._pop_on(end=self.time_translator.get_time()) | ||
self._pop_on(end=self.time_translator.get_time(src_fps)) | ||
|
||
# If command is not one of the aforementioned, add it to buffer | ||
else: | ||
|
@@ -443,7 +471,8 @@ def buffer(self, value): | |
except TypeError: | ||
pass | ||
|
||
def _roll_up(self): | ||
|
||
def _roll_up(self, src_fps=30.0): | ||
# We expect the active buffer to be the roll buffer | ||
if self.simulate_roll_up: | ||
if self.roll_rows_expected > 1: | ||
|
@@ -459,7 +488,7 @@ def _roll_up(self): | |
self.buffer = self.node_creator_factory.new_creator() | ||
|
||
# configure time | ||
self.time = self.time_translator.get_time() | ||
self.time = self.time_translator.get_time(src_fps) | ||
|
||
# try to insert the proper ending time for the previous caption | ||
self.caption_stash.correct_last_timing(self.time, force=True) | ||
|
@@ -474,7 +503,7 @@ class SCCWriter(BaseWriter): | |
def __init__(self, *args, **kw): | ||
super().__init__(*args, **kw) | ||
|
||
def write(self, caption_set): | ||
def write(self, caption_set, src_fps=30.0): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think the name src_fps is a good fit, as this is no longer the fps dedicated to the source file, but to the output file. Maybe calling it simply fps would be easier? |
||
output = HEADER + '\n\n' | ||
|
||
if caption_set.is_empty(): | ||
|
@@ -495,24 +524,24 @@ def write(self, caption_set): | |
# buffer; possibly remove the previous clear-screen command | ||
for index, (code, start, end) in enumerate(codes): | ||
code_words = len(code) / 5 + 8 | ||
code_time_microseconds = code_words * MICROSECONDS_PER_CODEWORD | ||
code_time_microseconds = code_words * MICROSECONDS_PER_CODEWORD_fps(src_fps) | ||
code_start = start - code_time_microseconds | ||
if index == 0: | ||
continue | ||
previous_code, previous_start, previous_end = codes[index - 1] | ||
if previous_end + 3 * MICROSECONDS_PER_CODEWORD >= code_start: | ||
if previous_end + 3 * MICROSECONDS_PER_CODEWORD_fps(src_fps) >= code_start: | ||
codes[index - 1] = (previous_code, previous_start, None) | ||
codes[index] = (code, code_start, end) | ||
|
||
# PASS 3: | ||
# Write captions. | ||
for (code, start, end) in codes: | ||
output += f'{self._format_timestamp(start)}\t' | ||
output += f'{self._format_timestamp(start, src_fps)}\t' | ||
output += '94ae 94ae 9420 9420 ' | ||
output += code | ||
output += '942c 942c 942f 942f\n\n' | ||
if end is not None: | ||
output += f'{self._format_timestamp(end)}\t942c 942c\n\n' | ||
output += f'{self._format_timestamp(end, src_fps)}\t942c 942c\n\n' | ||
|
||
return output | ||
|
||
|
@@ -577,7 +606,7 @@ def _text_to_code(self, s): | |
return code | ||
|
||
@staticmethod | ||
def _format_timestamp(microseconds): | ||
def _format_timestamp(microseconds, src_fps=30.0): | ||
seconds_float = microseconds / 1000.0 / 1000.0 | ||
# Convert to non-drop-frame timecode | ||
seconds_float *= 1000.0 / 1001.0 | ||
|
@@ -587,7 +616,7 @@ def _format_timestamp(microseconds): | |
seconds_float -= minutes * 60 | ||
seconds = math.floor(seconds_float) | ||
seconds_float -= seconds | ||
frames = math.floor(seconds_float * 30) | ||
frames = math.floor(seconds_float * src_fps) | ||
return f'{hours:02}:{minutes:02}:{seconds:02}:{frames:02}' | ||
|
||
|
||
|
@@ -601,19 +630,20 @@ def __init__(self): | |
self.offset = 0 | ||
self._frames = 0 | ||
|
||
def get_time(self): | ||
def get_time(self, src_fps=30.0): | ||
"""Returns the time, in microseconds. Takes into account the number of | ||
frames passed, and the offset | ||
|
||
:rtype: int | ||
""" | ||
return self._translate_time( | ||
self._time[:-2] + str(int(self._time[-2:]) + self._frames), | ||
self.offset | ||
self.offset, | ||
src_fps | ||
) | ||
|
||
@staticmethod | ||
def _translate_time(stamp, offset): | ||
def _translate_time(stamp, offset, src_fps=30.0): | ||
""" | ||
:param stamp: | ||
:type offset: int | ||
|
@@ -634,7 +664,7 @@ def _translate_time(stamp, offset): | |
timestamp_seconds = (int(time_split[0]) * 3600 | ||
+ int(time_split[1]) * 60 | ||
+ int(time_split[2]) | ||
+ int(time_split[3]) / 30.0) | ||
+ int(time_split[3]) / src_fps) | ||
|
||
seconds = timestamp_seconds * seconds_per_timestamp_second | ||
microseconds = seconds * 1000 * 1000 - offset | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -983,5 +983,7 @@ def _restructure_bytes_to_position_map(byte_to_pos_map): | |
# Time to transmit a single codeword = 1 second / 29.97 | ||
MICROSECONDS_PER_CODEWORD = 1000.0 * 1000.0 / (30.0 * 1000.0 / 1001.0) | ||
|
||
def MICROSECONDS_PER_CODEWORD_fps(src_fps=30.0): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a function, not a constant, so it shouldn't be declared with capital letters. Please use only lowercase for function names as per PEP 8 style guide |
||
return 1000.0 * 1000.0 / (src_fps * 1000.0 / 1001.0) | ||
|
||
HEADER = 'Scenarist_SCC V1.0' |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,7 +50,7 @@ def test_empty_file(self, sample_scc_empty): | |
SCCReader().read(sample_scc_empty) | ||
|
||
def test_positioning(self, sample_scc_multiple_positioning): | ||
captions = SCCReader().read(sample_scc_multiple_positioning) | ||
captions = SCCReader().read(sample_scc_multiple_positioning, src_fps=25.0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test doesn't check the timestamps and it's only focused on the SCCReader, please write dedicated tests for both SCCReader and SCCWriter and check if the resulted timestamps are correct. |
||
|
||
# SCC generates only origin, and we always expect it. | ||
expected_positioning = [ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that the src_fps parameter gets passed a lot between methods without being used, so maybe it would be a good idea to declare it as a class attribute and assign its value in init. That way you can use it directly anywhere you need as
self.fps
. Please do the same for the writer class to have consistency.