Flake8 cleanups

This commit is contained in:
Jamie Hardt
2023-07-21 13:21:01 -07:00
parent 4318946596
commit f825b92586
4 changed files with 133 additions and 101 deletions

View File

@@ -3,6 +3,7 @@ Parse and convert Pro Tools text exports
"""
__version__ = '2.0.0'
__author__ = 'Jamie Hardt'
__author__ = 'Jamie Hardt'
__license__ = 'MIT'
__copyright__ = "%s %s (c) 2023 %s. All rights reserved." % (__name__, __version__, __author__)
__copyright__ = "%s %s (c) 2023 %s. All rights reserved." \
% (__name__, __version__, __author__)

View File

@@ -2,9 +2,11 @@ from optparse import OptionParser, OptionGroup
import datetime
import sys
from ptulsconv import __name__, __version__, __author__, __copyright__
from ptulsconv import __name__, __copyright__
from ptulsconv.commands import convert
from ptulsconv.reporting import print_status_style, print_banner_style, print_section_header_style, print_fatal_error
from ptulsconv.reporting import print_status_style, \
print_banner_style, print_section_header_style, \
print_fatal_error
def dump_field_map(output=sys.stdout):
@@ -19,12 +21,12 @@ def dump_formats():
print_section_header_style("`raw` format:")
sys.stderr.write("A JSON document of the parsed Pro Tools export.\n")
print_section_header_style("`tagged` Format:")
sys.stderr.write("A JSON document containing one record for each clip, with\n"
"all tags parsed and all tagging rules applied. \n")
sys.stderr.write(
"A JSON document containing one record for each clip, with\n"
"all tags parsed and all tagging rules applied. \n")
print_section_header_style("`doc` format:")
sys.stderr.write("Creates a directory with folders for different types\n"
"of ADR reports.\n\n")
"of ADR reports.\n\n")
def main():
@@ -45,28 +47,33 @@ def main():
warn_options.add_option('-W', action='store_false',
dest='warnings',
default=True,
help='Suppress warnings for common errors (missing code numbers etc.)')
help='Suppress warnings for common '
'errors (missing code numbers etc.)')
parser.add_option_group(warn_options)
informational_options = OptionGroup(title="Informational Options",
parser=parser,
description='Print useful information and exit without processing '
'input files.')
description='Print useful '
'information '
'and exit without processing '
'input files.')
informational_options.add_option('--show-formats',
dest='show_formats',
action='store_true',
default=False,
help='Display helpful information about the '
'available output formats.')
informational_options.add_option(
'--show-formats',
dest='show_formats',
action='store_true',
default=False,
help='Display helpful information about the available '
'output formats.')
informational_options.add_option('--show-available-tags',
dest='show_tags',
action='store_true',
default=False,
help='Display tag mappings for the FMP XML '
'output style and exit.')
informational_options.add_option(
'--show-available-tags',
dest='show_tags',
action='store_true',
default=False,
help='Display tag mappings for the FMP XML output style '
'and exit.')
parser.add_option_group(informational_options)
@@ -74,9 +81,9 @@ def main():
(options, args) = parser.parse_args(sys.argv)
print_section_header_style("Startup")
print_status_style("This run started %s" % (datetime.datetime.now().isoformat()))
print_status_style("This run started %s" %
(datetime.datetime.now().isoformat()))
if options.show_tags:
dump_field_map()
@@ -89,7 +96,9 @@ def main():
major_mode = options.output_format
if len(args) < 2:
print_status_style("No input file provided, will connect to Pro Tools with PTSL...")
print_status_style(
"No input file provided, will connect to Pro Tools "
"with PTSL...")
convert(major_mode=major_mode,
warnings=options.warnings)
else:

View File

@@ -9,13 +9,15 @@ from fractions import Fraction
from typing import Optional, SupportsFloat
class TimecodeFormat(namedtuple("_TimecodeFormat", "frame_duration logical_fps drop_frame")):
class TimecodeFormat(namedtuple("_TimecodeFormat",
"frame_duration logical_fps drop_frame")):
"""
A struct reperesenting a timecode datum.
"""
def smpte_to_seconds(self, smpte: str) -> Optional[Fraction]:
frame_count = smpte_to_frame_count(smpte, self.logical_fps, drop_frame_hint=self.drop_frame)
frame_count = smpte_to_frame_count(
smpte, self.logical_fps, drop_frame_hint=self.drop_frame)
if frame_count is None:
return None
else:
@@ -23,29 +25,34 @@ class TimecodeFormat(namedtuple("_TimecodeFormat", "frame_duration logical_fps d
def seconds_to_smpte(self, seconds: SupportsFloat) -> str:
frame_count = int(seconds / self.frame_duration)
return frame_count_to_smpte(frame_count, self.logical_fps, self.drop_frame)
return frame_count_to_smpte(frame_count, self.logical_fps,
self.drop_frame)
def smpte_to_frame_count(smpte_rep_string: str, frames_per_logical_second: int, drop_frame_hint=False) -> Optional[int]:
def smpte_to_frame_count(smpte_rep_string: str, frames_per_logical_second: int,
drop_frame_hint=False) -> Optional[int]:
"""
Convert a string with a SMPTE timecode representation into a frame count.
:param smpte_rep_string: The timecode string
:param frames_per_logical_second: Num of frames in a logical second. This is asserted to be
in one of `[24,25,30,48,50,60]`
:param drop_frame_hint: `True` if the timecode rep is drop frame. This is ignored (and implied `True`) if
the last separator in the timecode string is a semicolon. This is ignored (and implied `False`) if
`frames_per_logical_second` is not 30 or 60.
:param frames_per_logical_second: Num of frames in a logical second. This
is asserted to be in one of `[24,25,30,48,50,60]`
:param drop_frame_hint: `True` if the timecode rep is drop frame. This is
ignored (and implied `True`) if the last separator in the timecode
string is a semicolon. This is ignored (and implied `False`) if
`frames_per_logical_second` is not 30 or 60.
"""
assert frames_per_logical_second in [24, 25, 30, 48, 50, 60]
m = re.search(r'(\d?\d)[:;](\d\d)[:;](\d\d)([:;])(\d\d)(\.\d+)?', smpte_rep_string)
m = re.search(
r'(\d?\d)[:;](\d\d)[:;](\d\d)([:;])(\d\d)(\.\d+)?', smpte_rep_string)
if m is None:
return None
hh, mm, ss, sep, ff, frac = m.groups()
hh, mm, ss, ff, frac = int(hh), int(mm), int(ss), int(ff), float(frac or 0.0)
hh, mm, ss, ff, frac = int(hh), int(
mm), int(ss), int(ff), float(frac or 0.0)
drop_frame = drop_frame_hint
if sep == ";":
@@ -54,8 +61,8 @@ def smpte_to_frame_count(smpte_rep_string: str, frames_per_logical_second: int,
if frames_per_logical_second not in [30, 60]:
drop_frame = False
raw_frames = hh * 3600 * frames_per_logical_second + mm * 60 * frames_per_logical_second + \
ss * frames_per_logical_second + ff
raw_frames = hh * 3600 * frames_per_logical_second + mm * 60 * \
frames_per_logical_second + ss * frames_per_logical_second + ff
frames = raw_frames
if drop_frame is True:
@@ -68,7 +75,8 @@ def smpte_to_frame_count(smpte_rep_string: str, frames_per_logical_second: int,
return frames
def frame_count_to_smpte(frame_count: int, frames_per_logical_second: int, drop_frame: bool = False,
def frame_count_to_smpte(frame_count: int, frames_per_logical_second: int,
drop_frame: bool = False,
fractional_frame: Optional[float] = None) -> str:
assert frames_per_logical_second in [24, 25, 30, 48, 50, 60]
assert fractional_frame is None or fractional_frame < 1.0
@@ -90,7 +98,8 @@ def frame_count_to_smpte(frame_count: int, frames_per_logical_second: int, drop_
hh = hh % 24
if fractional_frame is not None and fractional_frame > 0:
return "%02i:%02i:%02i%s%02i%s" % (hh, mm, ss, separator, ff, ("%.3f" % fractional_frame)[1:])
return "%02i:%02i:%02i%s%02i%s" % (hh, mm, ss, separator, ff,
("%.3f" % fractional_frame)[1:])
else:
return "%02i:%02i:%02i%s%02i" % (hh, mm, ss, separator, ff)

View File

@@ -8,19 +8,20 @@ import os
import sys
from itertools import chain
import csv
from typing import List
from typing import List, Optional, Iterator
from fractions import Fraction
import ptsl
from .docparser.adr_entity import make_entities
from .reporting import print_section_header_style, print_status_style, print_warning
from .validations import *
from .docparser.adr_entity import make_entities, ADRLine
from .reporting import print_section_header_style, print_status_style,\
print_warning
from .validations import validate_unique_field, validate_non_empty_field,\
validate_dependent_value
from ptulsconv.docparser import parse_document
from ptulsconv.docparser.tag_compiler import TagCompiler
from ptulsconv.broadcast_timecode import TimecodeFormat
from fractions import Fraction
from ptulsconv.pdf.supervisor_1pg import output_report as output_supervisor_1pg
from ptulsconv.pdf.line_count import output_report as output_line_count
@@ -50,9 +51,9 @@ class MyEncoder(JSONEncoder):
def output_adr_csv(lines: List[ADRLine], time_format: TimecodeFormat):
"""
Writes ADR lines as CSV to the current working directory. Creates directories
for each character number and name pair, and within that directory, creates
a CSV file for each reel.
Writes ADR lines as CSV to the current working directory. Creates
directories for each character number and name pair, and within that
directory, creates a CSV file for each reel.
"""
reels = set([ln.reel for ln in lines])
@@ -61,12 +62,15 @@ def output_adr_csv(lines: List[ADRLine], time_format: TimecodeFormat):
os.makedirs(dir_name, exist_ok=True)
os.chdir(dir_name)
for reel in reels:
these_lines = [ln for ln in lines if ln.character_id == n and ln.reel == reel]
these_lines = [ln for ln in lines
if ln.character_id == n and ln.reel == reel]
if len(these_lines) == 0:
continue
outfile_name = "%s_%s_%s_%s.csv" % (these_lines[0].title, n, these_lines[0].character_name, reel,)
outfile_name = "%s_%s_%s_%s.csv" % (these_lines[0].title, n,
these_lines[0].character_name,
reel,)
with open(outfile_name, mode='w', newline='') as outfile:
writer = csv.writer(outfile, dialect='excel')
@@ -80,18 +84,21 @@ def output_adr_csv(lines: List[ADRLine], time_format: TimecodeFormat):
for event in these_lines:
this_start = event.start or 0
this_finish = event.finish or 0
this_row = [event.title, event.character_name, event.cue_number,
event.reel, event.version,
time_format.seconds_to_smpte(this_start), time_format.seconds_to_smpte(this_finish),
this_row = [event.title, event.character_name,
event.cue_number, event.reel, event.version,
time_format.seconds_to_smpte(this_start),
time_format.seconds_to_smpte(this_finish),
float(this_start), float(this_finish),
event.prompt,
event.reason, event.note, "TV" if event.tv else ""]
event.reason, event.note, "TV"
if event.tv else ""]
writer.writerow(this_row)
os.chdir("..")
def generate_documents(session_tc_format, scenes, adr_lines: Iterator[ADRLine], title):
def generate_documents(session_tc_format, scenes, adr_lines: Iterator[ADRLine],
title):
"""
Create PDF output.
"""
@@ -105,22 +112,22 @@ def generate_documents(session_tc_format, scenes, adr_lines: Iterator[ADRLine],
supervisor = next((x.supervisor for x in adr_lines), "")
output_continuity(scenes=scenes, tc_display_format=session_tc_format,
title=title, client=client, supervisor=supervisor)
title=title, client=client,
supervisor=supervisor)
# reels = sorted([r for r in compiler.compile_all_time_spans() if r[0] == 'Reel'],
# key=lambda x: x[2])
reels = ['R1', 'R2', 'R3', 'R4', 'R5', 'R6']
if len(adr_lines) == 0:
print_status_style("No ADR lines were found in the "
"input document. ADR reports will not be generated.")
print_status_style("No ADR lines were found in the input document. "
"ADR reports will not be generated.")
else:
create_adr_reports(adr_lines, tc_display_format=session_tc_format,
reel_list=sorted(reels))
reel_list=sorted(reels))
def create_adr_reports(lines: List[ADRLine], tc_display_format: TimecodeFormat, reel_list: List[str]):
def create_adr_reports(lines: List[ADRLine], tc_display_format: TimecodeFormat,
reel_list: List[str]):
"""
Creates a directory heirarchy and a respective set of ADR reports,
given a list of lines.
@@ -141,7 +148,8 @@ def create_adr_reports(lines: List[ADRLine], tc_display_format: TimecodeFormat,
print_status_style("Creating Director's Logs director and reports")
os.makedirs("Director Logs", exist_ok=True)
os.chdir("Director Logs")
output_summary(lines, tc_display_format=tc_display_format, by_character=True)
output_summary(lines, tc_display_format=tc_display_format,
by_character=True)
os.chdir("..")
print_status_style("Creating CSV outputs")
@@ -156,7 +164,7 @@ def create_adr_reports(lines: List[ADRLine], tc_display_format: TimecodeFormat,
output_talent_sides(lines, tc_display_format=tc_display_format)
def convert(major_mode, input_file = None, output=sys.stdout, warnings=True):
def convert(major_mode, input_file=None, output=sys.stdout, warnings=True):
"""
Primary worker function, accepts the input file and decides
what to do with it based on the `major_mode`.
@@ -179,7 +187,7 @@ def convert(major_mode, input_file = None, output=sys.stdout, warnings=True):
req.time_type("tc")
req.dont_show_crossfades()
req.selected_tracks_only()
session_text = req.export_string()
session_text = req.export_string
session = parse_document(session_text)
session_tc_format = session.header.timecode_format
@@ -198,19 +206,22 @@ def convert(major_mode, input_file = None, output=sys.stdout, warnings=True):
elif major_mode == 'doc':
generic_events, adr_lines = make_entities(compiled_events)
scenes = sorted([s for s in compiler.compile_all_time_spans() if s[0] == 'Sc'],
scenes = sorted([s for s in compiler.compile_all_time_spans()
if s[0] == 'Sc'],
key=lambda x: x[2])
# TODO: Breakdown by titles
titles = set([x.title for x in (generic_events + adr_lines)])
if len(titles) != 1:
print_warning("Multiple titles per export is not supported, "
"found multiple titles: %s Exiting." % titles)
"found multiple titles: %s Exiting." % titles)
exit(-1)
title = list(titles)[0]
print_status_style("%i generic events found." % len(generic_events))
print_status_style(
"%i generic events found." % len(generic_events)
)
print_status_style("%i ADR events found." % len(adr_lines))
if warnings:
@@ -219,23 +230,25 @@ def convert(major_mode, input_file = None, output=sys.stdout, warnings=True):
generate_documents(session_tc_format, scenes, adr_lines, title)
def perform_adr_validations(lines : Iterator[ADRLine]):
def perform_adr_validations(lines: Iterator[ADRLine]):
"""
Performs validations on the input.
"""
for warning in chain(validate_unique_field(lines,
field='cue_number',
scope='title'),
validate_non_empty_field(lines,
field='cue_number'),
validate_non_empty_field(lines,
field='character_id'),
validate_non_empty_field(lines,
field='title'),
validate_dependent_value(lines,
key_field='character_id',
dependent_field='character_name'),
validate_dependent_value(lines,
key_field='character_id',
dependent_field='actor_name')):
for warning in chain(
validate_unique_field(lines,
field='cue_number',
scope='title'),
validate_non_empty_field(lines,
field='cue_number'),
validate_non_empty_field(lines,
field='character_id'),
validate_non_empty_field(lines,
field='title'),
validate_dependent_value(lines,
key_field='character_id',
dependent_field='character_name'),
validate_dependent_value(lines,
key_field='character_id',
dependent_field='actor_name')):
print_warning(warning.report_message())