diff --git a/ptulsconv/__init__.py b/ptulsconv/__init__.py index 36aa627..82d3f3b 100644 --- a/ptulsconv/__init__.py +++ b/ptulsconv/__init__.py @@ -3,6 +3,7 @@ Parse and convert Pro Tools text exports """ __version__ = '2.0.0' -__author__ = 'Jamie Hardt' +__author__ = 'Jamie Hardt' __license__ = 'MIT' -__copyright__ = "%s %s (c) 2023 %s. All rights reserved." % (__name__, __version__, __author__) +__copyright__ = "%s %s (c) 2023 %s. All rights reserved." \ + % (__name__, __version__, __author__) diff --git a/ptulsconv/__main__.py b/ptulsconv/__main__.py index 6c9657f..1158abf 100644 --- a/ptulsconv/__main__.py +++ b/ptulsconv/__main__.py @@ -2,15 +2,17 @@ from optparse import OptionParser, OptionGroup import datetime import sys -from ptulsconv import __name__, __version__, __author__, __copyright__ +from ptulsconv import __name__, __copyright__ from ptulsconv.commands import convert -from ptulsconv.reporting import print_status_style, print_banner_style, print_section_header_style, print_fatal_error +from ptulsconv.reporting import print_status_style, \ + print_banner_style, print_section_header_style, \ + print_fatal_error def dump_field_map(output=sys.stdout): from ptulsconv.docparser.tag_mapping import TagMapping from ptulsconv.docparser.adr_entity import ADRLine, GenericEvent - + TagMapping.print_rules(GenericEvent, output=output) TagMapping.print_rules(ADRLine, output=output) @@ -19,12 +21,12 @@ def dump_formats(): print_section_header_style("`raw` format:") sys.stderr.write("A JSON document of the parsed Pro Tools export.\n") print_section_header_style("`tagged` Format:") - sys.stderr.write("A JSON document containing one record for each clip, with\n" - "all tags parsed and all tagging rules applied. \n") + sys.stderr.write( + "A JSON document containing one record for each clip, with\n" + "all tags parsed and all tagging rules applied. \n") print_section_header_style("`doc` format:") sys.stderr.write("Creates a directory with folders for different types\n" - "of ADR reports.\n\n") - + "of ADR reports.\n\n") def main(): @@ -45,38 +47,43 @@ def main(): warn_options.add_option('-W', action='store_false', dest='warnings', default=True, - help='Suppress warnings for common errors (missing code numbers etc.)') + help='Suppress warnings for common ' + 'errors (missing code numbers etc.)') parser.add_option_group(warn_options) informational_options = OptionGroup(title="Informational Options", parser=parser, - description='Print useful information and exit without processing ' - 'input files.') + description='Print useful ' + 'information ' + 'and exit without processing ' + 'input files.') - informational_options.add_option('--show-formats', - dest='show_formats', - action='store_true', - default=False, - help='Display helpful information about the ' - 'available output formats.') + informational_options.add_option( + '--show-formats', + dest='show_formats', + action='store_true', + default=False, + help='Display helpful information about the available ' + 'output formats.') - informational_options.add_option('--show-available-tags', - dest='show_tags', - action='store_true', - default=False, - help='Display tag mappings for the FMP XML ' - 'output style and exit.') + informational_options.add_option( + '--show-available-tags', + dest='show_tags', + action='store_true', + default=False, + help='Display tag mappings for the FMP XML output style ' + 'and exit.') parser.add_option_group(informational_options) print_banner_style(__copyright__) - + (options, args) = parser.parse_args(sys.argv) - print_section_header_style("Startup") - print_status_style("This run started %s" % (datetime.datetime.now().isoformat())) + print_status_style("This run started %s" % + (datetime.datetime.now().isoformat())) if options.show_tags: dump_field_map() @@ -87,14 +94,16 @@ def main(): sys.exit(0) try: major_mode = options.output_format - + if len(args) < 2: - print_status_style("No input file provided, will connect to Pro Tools with PTSL...") - convert(major_mode=major_mode, + print_status_style( + "No input file provided, will connect to Pro Tools " + "with PTSL...") + convert(major_mode=major_mode, warnings=options.warnings) else: - convert(input_file=args[1], - major_mode=major_mode, + convert(input_file=args[1], + major_mode=major_mode, warnings=options.warnings) except FileNotFoundError as e: diff --git a/ptulsconv/broadcast_timecode.py b/ptulsconv/broadcast_timecode.py index b548a62..0c423eb 100644 --- a/ptulsconv/broadcast_timecode.py +++ b/ptulsconv/broadcast_timecode.py @@ -9,13 +9,15 @@ from fractions import Fraction from typing import Optional, SupportsFloat -class TimecodeFormat(namedtuple("_TimecodeFormat", "frame_duration logical_fps drop_frame")): +class TimecodeFormat(namedtuple("_TimecodeFormat", + "frame_duration logical_fps drop_frame")): """ A struct reperesenting a timecode datum. """ - + def smpte_to_seconds(self, smpte: str) -> Optional[Fraction]: - frame_count = smpte_to_frame_count(smpte, self.logical_fps, drop_frame_hint=self.drop_frame) + frame_count = smpte_to_frame_count( + smpte, self.logical_fps, drop_frame_hint=self.drop_frame) if frame_count is None: return None else: @@ -23,29 +25,34 @@ class TimecodeFormat(namedtuple("_TimecodeFormat", "frame_duration logical_fps d def seconds_to_smpte(self, seconds: SupportsFloat) -> str: frame_count = int(seconds / self.frame_duration) - return frame_count_to_smpte(frame_count, self.logical_fps, self.drop_frame) + return frame_count_to_smpte(frame_count, self.logical_fps, + self.drop_frame) -def smpte_to_frame_count(smpte_rep_string: str, frames_per_logical_second: int, drop_frame_hint=False) -> Optional[int]: +def smpte_to_frame_count(smpte_rep_string: str, frames_per_logical_second: int, + drop_frame_hint=False) -> Optional[int]: """ Convert a string with a SMPTE timecode representation into a frame count. :param smpte_rep_string: The timecode string - :param frames_per_logical_second: Num of frames in a logical second. This is asserted to be - in one of `[24,25,30,48,50,60]` - :param drop_frame_hint: `True` if the timecode rep is drop frame. This is ignored (and implied `True`) if - the last separator in the timecode string is a semicolon. This is ignored (and implied `False`) if - `frames_per_logical_second` is not 30 or 60. + :param frames_per_logical_second: Num of frames in a logical second. This + is asserted to be in one of `[24,25,30,48,50,60]` + :param drop_frame_hint: `True` if the timecode rep is drop frame. This is + ignored (and implied `True`) if the last separator in the timecode + string is a semicolon. This is ignored (and implied `False`) if + `frames_per_logical_second` is not 30 or 60. """ assert frames_per_logical_second in [24, 25, 30, 48, 50, 60] - m = re.search(r'(\d?\d)[:;](\d\d)[:;](\d\d)([:;])(\d\d)(\.\d+)?', smpte_rep_string) - + m = re.search( + r'(\d?\d)[:;](\d\d)[:;](\d\d)([:;])(\d\d)(\.\d+)?', smpte_rep_string) + if m is None: return None - + hh, mm, ss, sep, ff, frac = m.groups() - hh, mm, ss, ff, frac = int(hh), int(mm), int(ss), int(ff), float(frac or 0.0) + hh, mm, ss, ff, frac = int(hh), int( + mm), int(ss), int(ff), float(frac or 0.0) drop_frame = drop_frame_hint if sep == ";": @@ -54,8 +61,8 @@ def smpte_to_frame_count(smpte_rep_string: str, frames_per_logical_second: int, if frames_per_logical_second not in [30, 60]: drop_frame = False - raw_frames = hh * 3600 * frames_per_logical_second + mm * 60 * frames_per_logical_second + \ - ss * frames_per_logical_second + ff + raw_frames = hh * 3600 * frames_per_logical_second + mm * 60 * \ + frames_per_logical_second + ss * frames_per_logical_second + ff frames = raw_frames if drop_frame is True: @@ -68,7 +75,8 @@ def smpte_to_frame_count(smpte_rep_string: str, frames_per_logical_second: int, return frames -def frame_count_to_smpte(frame_count: int, frames_per_logical_second: int, drop_frame: bool = False, +def frame_count_to_smpte(frame_count: int, frames_per_logical_second: int, + drop_frame: bool = False, fractional_frame: Optional[float] = None) -> str: assert frames_per_logical_second in [24, 25, 30, 48, 50, 60] assert fractional_frame is None or fractional_frame < 1.0 @@ -90,7 +98,8 @@ def frame_count_to_smpte(frame_count: int, frames_per_logical_second: int, drop_ hh = hh % 24 if fractional_frame is not None and fractional_frame > 0: - return "%02i:%02i:%02i%s%02i%s" % (hh, mm, ss, separator, ff, ("%.3f" % fractional_frame)[1:]) + return "%02i:%02i:%02i%s%02i%s" % (hh, mm, ss, separator, ff, + ("%.3f" % fractional_frame)[1:]) else: return "%02i:%02i:%02i%s%02i" % (hh, mm, ss, separator, ff) diff --git a/ptulsconv/commands.py b/ptulsconv/commands.py index 50cdb2a..1bb7dc3 100644 --- a/ptulsconv/commands.py +++ b/ptulsconv/commands.py @@ -8,19 +8,20 @@ import os import sys from itertools import chain import csv -from typing import List +from typing import List, Optional, Iterator from fractions import Fraction import ptsl -from .docparser.adr_entity import make_entities -from .reporting import print_section_header_style, print_status_style, print_warning -from .validations import * +from .docparser.adr_entity import make_entities, ADRLine +from .reporting import print_section_header_style, print_status_style,\ + print_warning +from .validations import validate_unique_field, validate_non_empty_field,\ + validate_dependent_value from ptulsconv.docparser import parse_document from ptulsconv.docparser.tag_compiler import TagCompiler from ptulsconv.broadcast_timecode import TimecodeFormat -from fractions import Fraction from ptulsconv.pdf.supervisor_1pg import output_report as output_supervisor_1pg from ptulsconv.pdf.line_count import output_report as output_line_count @@ -40,7 +41,7 @@ class MyEncoder(JSONEncoder): def default(self, o): """ - + """ if isinstance(o, Fraction): return dict(numerator=o.numerator, denominator=o.denominator) @@ -50,9 +51,9 @@ class MyEncoder(JSONEncoder): def output_adr_csv(lines: List[ADRLine], time_format: TimecodeFormat): """ - Writes ADR lines as CSV to the current working directory. Creates directories - for each character number and name pair, and within that directory, creates - a CSV file for each reel. + Writes ADR lines as CSV to the current working directory. Creates + directories for each character number and name pair, and within that + directory, creates a CSV file for each reel. """ reels = set([ln.reel for ln in lines]) @@ -61,12 +62,15 @@ def output_adr_csv(lines: List[ADRLine], time_format: TimecodeFormat): os.makedirs(dir_name, exist_ok=True) os.chdir(dir_name) for reel in reels: - these_lines = [ln for ln in lines if ln.character_id == n and ln.reel == reel] + these_lines = [ln for ln in lines + if ln.character_id == n and ln.reel == reel] if len(these_lines) == 0: continue - outfile_name = "%s_%s_%s_%s.csv" % (these_lines[0].title, n, these_lines[0].character_name, reel,) + outfile_name = "%s_%s_%s_%s.csv" % (these_lines[0].title, n, + these_lines[0].character_name, + reel,) with open(outfile_name, mode='w', newline='') as outfile: writer = csv.writer(outfile, dialect='excel') @@ -80,18 +84,21 @@ def output_adr_csv(lines: List[ADRLine], time_format: TimecodeFormat): for event in these_lines: this_start = event.start or 0 this_finish = event.finish or 0 - this_row = [event.title, event.character_name, event.cue_number, - event.reel, event.version, - time_format.seconds_to_smpte(this_start), time_format.seconds_to_smpte(this_finish), + this_row = [event.title, event.character_name, + event.cue_number, event.reel, event.version, + time_format.seconds_to_smpte(this_start), + time_format.seconds_to_smpte(this_finish), float(this_start), float(this_finish), event.prompt, - event.reason, event.note, "TV" if event.tv else ""] + event.reason, event.note, "TV" + if event.tv else ""] writer.writerow(this_row) os.chdir("..") -def generate_documents(session_tc_format, scenes, adr_lines: Iterator[ADRLine], title): +def generate_documents(session_tc_format, scenes, adr_lines: Iterator[ADRLine], + title): """ Create PDF output. """ @@ -105,27 +112,27 @@ def generate_documents(session_tc_format, scenes, adr_lines: Iterator[ADRLine], supervisor = next((x.supervisor for x in adr_lines), "") output_continuity(scenes=scenes, tc_display_format=session_tc_format, - title=title, client=client, supervisor=supervisor) + title=title, client=client, + supervisor=supervisor) - # reels = sorted([r for r in compiler.compile_all_time_spans() if r[0] == 'Reel'], - # key=lambda x: x[2]) reels = ['R1', 'R2', 'R3', 'R4', 'R5', 'R6'] if len(adr_lines) == 0: - print_status_style("No ADR lines were found in the " - "input document. ADR reports will not be generated.") + print_status_style("No ADR lines were found in the input document. " + "ADR reports will not be generated.") else: - create_adr_reports(adr_lines, tc_display_format=session_tc_format, - reel_list=sorted(reels)) + create_adr_reports(adr_lines, tc_display_format=session_tc_format, + reel_list=sorted(reels)) -def create_adr_reports(lines: List[ADRLine], tc_display_format: TimecodeFormat, reel_list: List[str]): +def create_adr_reports(lines: List[ADRLine], tc_display_format: TimecodeFormat, + reel_list: List[str]): """ - Creates a directory heirarchy and a respective set of ADR reports, + Creates a directory heirarchy and a respective set of ADR reports, given a list of lines. """ - + print_status_style("Creating ADR Report") output_summary(lines, tc_display_format=tc_display_format) @@ -141,7 +148,8 @@ def create_adr_reports(lines: List[ADRLine], tc_display_format: TimecodeFormat, print_status_style("Creating Director's Logs director and reports") os.makedirs("Director Logs", exist_ok=True) os.chdir("Director Logs") - output_summary(lines, tc_display_format=tc_display_format, by_character=True) + output_summary(lines, tc_display_format=tc_display_format, + by_character=True) os.chdir("..") print_status_style("Creating CSV outputs") @@ -156,7 +164,7 @@ def create_adr_reports(lines: List[ADRLine], tc_display_format: TimecodeFormat, output_talent_sides(lines, tc_display_format=tc_display_format) -def convert(major_mode, input_file = None, output=sys.stdout, warnings=True): +def convert(major_mode, input_file=None, output=sys.stdout, warnings=True): """ Primary worker function, accepts the input file and decides what to do with it based on the `major_mode`. @@ -170,7 +178,7 @@ def convert(major_mode, input_file = None, output=sys.stdout, warnings=True): session_text = file.read() else: with ptsl.open_engine( - company_name="The ptulsconv developers", + company_name="The ptulsconv developers", application_name="ptulsconv") as engine: req = engine.export_session_as_text() req.utf8_encoding() @@ -179,8 +187,8 @@ def convert(major_mode, input_file = None, output=sys.stdout, warnings=True): req.time_type("tc") req.dont_show_crossfades() req.selected_tracks_only() - session_text = req.export_string() - + session_text = req.export_string + session = parse_document(session_text) session_tc_format = session.header.timecode_format @@ -198,44 +206,49 @@ def convert(major_mode, input_file = None, output=sys.stdout, warnings=True): elif major_mode == 'doc': generic_events, adr_lines = make_entities(compiled_events) - scenes = sorted([s for s in compiler.compile_all_time_spans() if s[0] == 'Sc'], + scenes = sorted([s for s in compiler.compile_all_time_spans() + if s[0] == 'Sc'], key=lambda x: x[2]) # TODO: Breakdown by titles titles = set([x.title for x in (generic_events + adr_lines)]) if len(titles) != 1: print_warning("Multiple titles per export is not supported, " - "found multiple titles: %s Exiting." % titles) + "found multiple titles: %s Exiting." % titles) exit(-1) title = list(titles)[0] - print_status_style("%i generic events found." % len(generic_events)) + print_status_style( + "%i generic events found." % len(generic_events) + ) print_status_style("%i ADR events found." % len(adr_lines)) if warnings: perform_adr_validations(adr_lines) generate_documents(session_tc_format, scenes, adr_lines, title) - -def perform_adr_validations(lines : Iterator[ADRLine]): + +def perform_adr_validations(lines: Iterator[ADRLine]): """ Performs validations on the input. """ - for warning in chain(validate_unique_field(lines, - field='cue_number', - scope='title'), - validate_non_empty_field(lines, - field='cue_number'), - validate_non_empty_field(lines, - field='character_id'), - validate_non_empty_field(lines, - field='title'), - validate_dependent_value(lines, - key_field='character_id', - dependent_field='character_name'), - validate_dependent_value(lines, - key_field='character_id', - dependent_field='actor_name')): + for warning in chain( + validate_unique_field(lines, + field='cue_number', + scope='title'), + validate_non_empty_field(lines, + field='cue_number'), + validate_non_empty_field(lines, + field='character_id'), + validate_non_empty_field(lines, + field='title'), + validate_dependent_value(lines, + key_field='character_id', + dependent_field='character_name'), + validate_dependent_value(lines, + key_field='character_id', + dependent_field='actor_name')): + print_warning(warning.report_message())