diff --git a/ptulsconv/__init__.py b/ptulsconv/__init__.py index f03fed9..2924f35 100644 --- a/ptulsconv/__init__.py +++ b/ptulsconv/__init__.py @@ -1,6 +1,4 @@ from ptulsconv.docparser.ptuls_grammar import protools_text_export_grammar -from ptulsconv.old_parser.ptuls_parser_visitor import DictionaryParserVisitor -from ptulsconv.old_parser.transformations import TimecodeInterpreter __version__ = '0.7.0' __author__ = 'Jamie Hardt' diff --git a/ptulsconv/commands.py b/ptulsconv/commands.py index 2cf8032..92b4585 100644 --- a/ptulsconv/commands.py +++ b/ptulsconv/commands.py @@ -32,26 +32,6 @@ class MyEncoder(JSONEncoder): return o.__dict__ -def dump_csv(events, output=sys.stdout): - keys = set() - for e in events: - keys.update(e.keys()) - - dump_keyed_csv(events, keys=keys, output=output) - - -def dump_keyed_csv(events, keys=(), output=sys.stdout): - writer = csv.writer(output, dialect='excel') - writer.writerow(keys) - - for event in events: - this_row = list() - for key in keys: - this_row.append(event.get(key, "")) - - writer.writerow(this_row) - - def dump_field_map(output=sys.stdout): from ptulsconv.docparser.tag_mapping import TagMapping from ptulsconv.docparser.adr_entity import ADRLine @@ -59,25 +39,36 @@ def dump_field_map(output=sys.stdout): TagMapping.print_rules(ADRLine, output=output) -def output_adr_csv(lines): - adr_keys = ('Title', 'Cue Number', 'Character Name', 'Reel', 'Version', 'Line', - 'Start', 'Finish', 'Reason', 'Note', 'TV', 'Version') - reels = set([ln['Reel'] for ln in lines]) - reels.add(None) - for n in [n['Character Number'] for n in lines]: +def output_adr_csv(lines: List[ADRLine], time_format: TimecodeFormat): + reels = set([ln.reel for ln in lines]) + + for n in [n.character_id for n in lines]: for reel in reels: - these_lines = [ln for ln in lines - if ln['Character Number'] == n and - ln.get('Reel', None) == reel] + these_lines = [ln for ln in lines if ln.character_id == n and ln.reel == reel] if len(these_lines) == 0: continue - outfile_name = "%s_%s_%s_%s.csv" % (these_lines[0]['Title'], - n, these_lines[0]['Character Name'], reel,) + outfile_name = "%s_%s_%s_%s.csv" % (these_lines[0].title, n, these_lines[0].character_name, reel,) with open(outfile_name, mode='w', newline='') as outfile: - dump_keyed_csv(these_lines, adr_keys, outfile) + writer = csv.writer(outfile, dialect='excel') + writer.writerow(['Title', 'Character Name', 'Cue Number', + 'Reel', 'Version', + 'Start', 'Finish', + 'Start Seconds', 'Finish Seconds', + 'Prompt', + 'Reason', 'Note', 'TV']) + + for event in these_lines: + this_row = [event.title, event.character_name, event.cue_number, + event.reel, event.version, + time_format.seconds_to_smpte(event.start), time_format.seconds_to_smpte(event.finish), + float(event.start), float(event.finish), + event.prompt, + event.reason, event.note, "TV" if event.tv else ""] + + writer.writerow(this_row) def output_avid_markers(lines): @@ -107,23 +98,23 @@ def create_adr_reports(lines: List[ADRLine], tc_display_format: TimecodeFormat): os.chdir("Director Logs") output_summary(lines, tc_display_format=tc_display_format, by_character=True) os.chdir("..") - # - # print_status_style("Creating CSV outputs") - # os.makedirs("CSV", exist_ok=True) - # os.chdir("CSV") - # output_adr_csv(lines) - # os.chdir("..") - # + + print_status_style("Creating CSV outputs") + os.makedirs("CSV", exist_ok=True) + os.chdir("CSV") + output_adr_csv(lines, time_format=tc_display_format) + os.chdir("..") + # print_status_style("Creating Avid Marker XML files") # os.makedirs("Avid Markers", exist_ok=True) # os.chdir("Avid Markers") # output_avid_markers(lines) # os.chdir("..") - # - # print_status_style("Creating Scripts directory and reports") - # os.makedirs("Talent Scripts", exist_ok=True) - # os.chdir("Talent Scripts") - # output_talent_sides(lines) + + print_status_style("Creating Scripts directory and reports") + os.makedirs("Talent Scripts", exist_ok=True) + os.chdir("Talent Scripts") + output_talent_sides(lines, tc_display_format=tc_display_format) def parse_text_export(file): diff --git a/ptulsconv/docparser/adr_entity.py b/ptulsconv/docparser/adr_entity.py index 2d41c7d..41704be 100644 --- a/ptulsconv/docparser/adr_entity.py +++ b/ptulsconv/docparser/adr_entity.py @@ -98,7 +98,11 @@ class ADRLine: self.optional = False @classmethod - def from_event(cls, event: Event) -> 'ADRLine': + def from_event(cls, event: Event) -> Optional['ADRLine']: + + if 'QN' not in event.tags: + return None + new = cls() TagMapping.apply_rules(cls.tag_mapping, event.tags, event.clip_name, event.track_name, event.session_name, new) diff --git a/ptulsconv/old_parser/__init__.py b/ptulsconv/old_parser/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/ptulsconv/old_parser/ptuls_parser_visitor.py b/ptulsconv/old_parser/ptuls_parser_visitor.py deleted file mode 100644 index 23c53c0..0000000 --- a/ptulsconv/old_parser/ptuls_parser_visitor.py +++ /dev/null @@ -1,152 +0,0 @@ -from parsimonious.nodes import NodeVisitor, Node - - -class DictionaryParserVisitor(NodeVisitor): - - def visit_document(self, node: Node, visited_children) -> dict: - files = next(iter(visited_children[1]), None) - clips = next(iter(visited_children[2]), None) - plugins = next(iter(visited_children[3]), None) - tracks = next(iter(visited_children[4]), None) - markers = next(iter(visited_children[5]), None) - - return dict(header=visited_children[0], - files=files, - clips=clips, - plugins=plugins, - tracks=tracks, - markers=markers) - - @staticmethod - def visit_header(node, visited_children): - - tc_drop = False - for _ in visited_children[20]: - tc_drop = True - - return dict(session_name=visited_children[2], - sample_rate=visited_children[6], - bit_depth=visited_children[10], - start_timecode=visited_children[15], - timecode_format=visited_children[19], - timecode_drop_frame=tc_drop, - count_audio_tracks=visited_children[25], - count_clips=visited_children[29], - count_files=visited_children[33]) - - @staticmethod - def visit_files_section(node, visited_children): - return list(map(lambda child: dict(filename=child[0], path=child[2]), visited_children[2])) - - @staticmethod - def visit_clips_section(node, visited_children): - channel = next(iter(visited_children[2][3]), 1) - - return list(map(lambda child: dict(clip_name=child[0], file=child[2], channel=channel), - visited_children[2])) - - @staticmethod - def visit_plugin_listing(node, visited_children): - return list(map(lambda child: dict(manufacturer=child[0], - plugin_name=child[2], - version=child[4], - format=child[6], - stems=child[8], - count_instances=child[10]), - visited_children[2])) - - @staticmethod - def visit_track_block(node, visited_children): - track_header, track_clip_list = visited_children - clips = [] - for clip in track_clip_list: - if clip[0] != None: - clips.append(clip[0]) - - plugins = [] - for plugin_opt in track_header[16]: - for plugin in plugin_opt[1]: - plugins.append(plugin[1]) - - return dict( - name=track_header[2], - comments=track_header[6], - user_delay_samples=track_header[10], - state=track_header[14], - plugins=plugins, - clips=clips - ) - - @staticmethod - def visit_track_listing(node, visited_children): - return visited_children[1] - - @staticmethod - def visit_track_clip_entry(node, visited_children): - timestamp = None - if isinstance(visited_children[14], list): - timestamp = visited_children[14][0][0] - - return dict(channel=visited_children[0], - event=visited_children[3], - clip_name=visited_children[6], - start_time=visited_children[8], - end_time=visited_children[10], - duration=visited_children[12], - timestamp=timestamp, - state=visited_children[15]) - - @staticmethod - def visit_track_state_list(node, visited_children): - states = [] - for next_state in visited_children: - states.append(next_state[0][0].text) - return states - - @staticmethod - def visit_track_clip_state(node, visited_children): - return node.text - - @staticmethod - def visit_markers_listing(node, visited_children): - markers = [] - - for marker in visited_children[2]: - markers.append(marker) - - return markers - - @staticmethod - def visit_marker_record(node, visited_children): - return dict(number=visited_children[0], - location=visited_children[3], - time_reference=visited_children[5], - units=visited_children[8], - name=visited_children[10], - comments=visited_children[12]) - - @staticmethod - def visit_formatted_clip_name(_, visited_children): - return visited_children[1].text - - @staticmethod - def visit_string_value(node, visited_children): - return node.text.strip(" ") - - @staticmethod - def visit_integer_value(node, visited_children): - return int(node.text) - - # def visit_timecode_value(self, node, visited_children): - # return node.text.strip(" ") - - @staticmethod - def visit_float_value(node, visited_children): - return float(node.text) - - def visit_block_ending(self, node, visited_children): - pass - - def generic_visit(self, node, visited_children): - """ The generic visit method. """ - return visited_children or node diff --git a/ptulsconv/old_parser/transformations.py b/ptulsconv/old_parser/transformations.py deleted file mode 100644 index 96ff0e3..0000000 --- a/ptulsconv/old_parser/transformations.py +++ /dev/null @@ -1,280 +0,0 @@ -from ptulsconv import broadcast_timecode -from ptulsconv.docparser.tagged_string_parser_visitor import TaggedStringResult, tag_grammar -from parsimonious.exceptions import IncompleteParseError -import math -import sys - -from ptulsconv.docparser.tagged_string_parser_visitor import TagListVisitor -from ptulsconv.reporting import print_advisory_tagging_error, print_section_header_style, print_status_style - -from tqdm import tqdm - -# fixme this whole file is a mess -class Transformation: - def transform(self, input_dict) -> dict: - return input_dict - - -class TimecodeInterpreter(Transformation): - - def __init__(self): - self.apply_session_start = False - - def transform(self, input_dict: dict) -> dict: - print_section_header_style('Converting Timecodes') - - retval = super().transform(input_dict) - rate = input_dict['header']['timecode_format'] - start_tc = self.convert_time(input_dict['header']['start_timecode'], rate, - drop_frame=input_dict['header']['timecode_drop_frame']) - - retval['header']['start_timecode_decoded'] = start_tc - print_status_style('Converted start timecode.') - - retval['tracks'] = self.convert_tracks(input_dict['tracks'], timecode_rate=rate, - drop_frame=retval['header']['timecode_drop_frame']) - - print_status_style('Converted clip timecodes for %i tracks.' % len(retval['tracks'])) - - for marker in retval['markers']: - marker['location_decoded'] = self.convert_time(marker['location'], rate, - drop_frame=retval['header']['timecode_drop_frame']) - - print_status_style('Converted %i markers.' % len(retval['markers'])) - - return retval - - def convert_tracks(self, tracks, timecode_rate, drop_frame): - for track in tracks: - new_clips = [] - for clip in track['clips']: - new_clips.append(self.convert_clip(clip, drop_frame=drop_frame, timecode_rate=timecode_rate)) - - track['clips'] = new_clips - - return tracks - - def convert_clip(self, clip, timecode_rate, drop_frame): - time_fields = ['start_time', 'end_time', 'duration', 'timestamp'] - - for time_field in time_fields: - if clip[time_field] is not None: - clip[time_field + "_decoded"] = self.convert_time(clip[time_field], drop_frame=drop_frame, - frame_rate=timecode_rate) - return clip - - def convert_time(self, time_string, frame_rate, drop_frame=False): - lfps = math.ceil(frame_rate) - - frame_count = broadcast_timecode.smpte_to_frame_count(time_string, lfps, drop_frame_hint=drop_frame) - - return dict(frame_count=frame_count, logical_fps=lfps, drop_frame=drop_frame) - - -class TagInterpreter(Transformation): - - def __init__(self, ignore_muted=True, show_progress=False, log_output=sys.stderr): - self.visitor = TagListVisitor() - self.ignore_muted = ignore_muted - self.show_progress = show_progress - self.log_output = log_output - - self.transformed = list() - self.timespan_rules = list() - self.movie_rules = list() - self.title_tags = None - self.markers = list() - - def transform(self, input_dict: dict) -> dict: - self.transformed = list() - self.timespan_rules = list() - self.movie_rules = list() - - print_section_header_style('Parsing Tags') - - self.title_tags = self.parse_tags(input_dict['header']['session_name']) - self.markers = sorted(input_dict['markers'], - key=lambda m: m['location_decoded']['frame_count']) - - if self.show_progress: - track_iter = tqdm(input_dict['tracks'], - desc="Reading tracks...", unit='Track') - else: - track_iter = input_dict['tracks'] - - for track in track_iter: - if 'Muted' in track['state'] and self.ignore_muted: - continue - - track_tags = self.parse_tags(track['name'], - parent_track_name=track['name']) - comment_tags = self.parse_tags(track['comments'], - parent_track_name=track['name']) - track_context_tags = track_tags.tag_dict - track_context_tags.update(comment_tags.tag_dict) - - for clip in track['clips']: - if clip['state'] == 'Muted' and self.ignore_muted: - continue - - clip_tags = self.parse_tags(clip['clip_name'], - parent_track_name=track['name'], - clip_time=clip['start_time']) - - if clip_tags.mode == 'Normal': - event = self.decorate_event(clip, clip_tags, input_dict['header'], - track_context_tags, track_tags) - self.transformed.append(event) - - elif clip_tags.mode == 'Append': - assert len(self.transformed) > 0, "First clip is in '&'-Append mode, fatal error." - - self.transformed[-1].update(clip_tags.tag_dict) - self.transformed[-1]['PT.Clip.Name'] = self.transformed[-1]['PT.Clip.Name'] + " " \ - + clip_tags.content - self.transformed[-1]['PT.Clip.Finish_Frames'] = clip['end_time_decoded']['frame_count'] - self.transformed[-1]['PT.Clip.Finish'] = clip['end_time'] - self.transformed[-1]['PT.Clip.Finish_Seconds'] = \ - clip['end_time_decoded']['frame_count'] / input_dict['header']['timecode_format'] - - elif clip_tags.mode == 'Timespan': - rule = {'start_time_literal': clip['start_time'], - 'start_time': clip['start_time_decoded']['frame_count'], - 'start_time_seconds': clip['start_time_decoded']['frame_count'] / input_dict['header'][ - 'timecode_format'], 'end_time': clip['end_time_decoded']['frame_count'], - 'tags': clip_tags.tag_dict} - self.timespan_rules.append(rule) - - elif clip_tags.mode == 'Movie': - rule = dict(movie_path=clip_tags.tag_dict['Movie'], - start_time=clip['start_time_decoded']['frame_count'], - end_time=clip['end_time_decoded']['frame_count']) - self.movie_rules.append(rule) - - print_status_style('Processed %i clips' % len(self.transformed)) - return dict(header=input_dict['header'], events=self.transformed) - - def decorate_event(self, clip, clip_tags, header_dict, track_context_tags, track_tags): - event = dict() - start_frame = clip['start_time_decoded']['frame_count'] - event.update(self.title_tags.tag_dict) - event.update(track_context_tags) - event.update(self.effective_timespan_tags_at_time(start_frame)) - event.update(self.effective_marker_tags_at_time(start_frame)) - event.update(self.effective_movie_at_time(start_frame, header_dict['timecode_format'])) - event.update(clip_tags.tag_dict) - event['PT.Track.Name'] = track_tags.content - event['PT.Session.Name'] = self.title_tags.content - event['PT.Session.TimecodeFormat'] = header_dict['timecode_format'] - event['PT.Session.Start'] = header_dict['start_timecode'] - event['PT.Session.DropFrame'] = header_dict['timecode_drop_frame'] - event['PT.Clip.Number'] = clip['event'] - event['PT.Clip.Name'] = clip_tags.content - event['PT.Clip.Start'] = clip['start_time'] - event['PT.Clip.Finish'] = clip['end_time'] - event['PT.Clip.Start_Frames'] = start_frame - event['PT.Clip.Finish_Frames'] = clip['end_time_decoded']['frame_count'] - event['PT.Clip.Start_Seconds'] = start_frame / header_dict['timecode_format'] - event['PT.Clip.Finish_Seconds'] = clip['end_time_decoded']['frame_count'] / header_dict['timecode_format'] - return event - - def effective_movie_at_time(self, time, timecode_format) -> dict: - retval = dict() - - for rule in reversed(self.movie_rules): - if rule['start_time'] <= time <= rule['end_time']: - retval['Movie.Filename'] = rule['movie_path'] - retval['Movie.Start_Offset_Frames'] = time - rule['start_time'] - retval['Movie.Start_Offset_Seconds'] = (time - rule['start_time']) / timecode_format - break - - return retval - - def effective_timespan_tags_at_time(self, time) -> dict: - retval = dict() - - for rule in self.timespan_rules: - if rule['start_time'] <= time <= rule['end_time']: - tag_keys = list(rule['tags'].keys()) - tag_times = dict() - for key in tag_keys: - key: str - time_value = rule['start_time'] - tag_times["Timespan." + key + ".Start_Frames"] = time_value - tag_times["Timespan." + key + ".Start"] = rule['start_time_literal'] - tag_times["Timespan." + key + ".Start_Seconds"] = rule['start_time_seconds'] - - retval.update(rule['tags']) - retval.update(tag_times) - - return retval - - def effective_marker_tags_at_time(self, time): - retval = dict() - - for marker in self.markers: - marker_name_tags = self.parse_tags(marker['name']) - marker_comment_tags = self.parse_tags(marker['comments']) - effective_tags = marker_name_tags.tag_dict - effective_tags.update(marker_comment_tags.tag_dict) - - if marker['location_decoded']['frame_count'] <= time: - retval.update(effective_tags) - else: - break - return retval - - def parse_tags(self, source, parent_track_name=None, clip_time=None) -> TaggedStringResult: - try: - parse_tree = tag_grammar.parse(source) - return self.visitor.visit(parse_tree) - except IncompleteParseError as e: - print_advisory_tagging_error(failed_string=source, - parent_track_name=parent_track_name, - clip_time=clip_time, position=e.pos) - - trimmed_source = source[:e.pos] - parse_tree = tag_grammar.parse(trimmed_source) - return self.visitor.visit(parse_tree) - - -class SelectReel(Transformation): - - def __init__(self, reel_num): - self.reel_num = reel_num - - def transform(self, input_dict) -> dict: - out_events = [] - for event in input_dict['events']: - if event['Reel'] == str(self.reel_num): - offset = event.get('Timespan.Reel.Start_Frames', 0) - offset_sec = event.get('Timespan.Reel.Start_Seconds', 0.) - event['PT.Clip.Start_Frames'] -= offset - event['PT.Clip.Finish_Frames'] -= offset - event['PT.Clip.Start_Seconds'] -= offset_sec - event['PT.Clip.Finish_Seconds'] -= offset_sec - out_events.append(event) - - return dict(header=input_dict['header'], events=out_events) - - -class SubclipOfSequence(Transformation): - - def __init__(self, start, end): - self.start = start - self.end = end - - def transform(self, input_dict: dict) -> dict: - out_events = [] - offset = 0 # self.start - offset_sec = 0. # self.start / input_dict['header']['timecode_format'] - for event in input_dict['events']: - if self.start <= event['PT.Clip.Start_Frames'] <= self.end: - e = event - e['PT.Clip.Start_Frames'] = event['PT.Clip.Start_Frames'] - offset - e['PT.Clip.Finish_Frames'] = event['PT.Clip.Finish_Frames'] - offset - e['PT.Clip.Start_Seconds'] = event['PT.Clip.Start_Seconds'] - offset_sec - e['PT.Clip.Finish_Seconds'] = event['PT.Clip.Finish_Seconds'] - offset_sec - out_events.append(e) - - return dict(header=input_dict['header'], events=out_events) diff --git a/ptulsconv/pdf/talent_sides.py b/ptulsconv/pdf/talent_sides.py index 23fbe2c..44884a6 100644 --- a/ptulsconv/pdf/talent_sides.py +++ b/ptulsconv/pdf/talent_sides.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from typing import List from .__init__ import make_doc_template from reportlab.lib.units import inch @@ -11,22 +12,25 @@ from reportlab.lib import colors from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.ttfonts import TTFont +from ..broadcast_timecode import TimecodeFormat +from ..docparser.adr_entity import ADRLine -def output_report(lines): - character_numbers = set([n['Character Number'] for n in lines]) + +def output_report(lines: List[ADRLine], tc_display_format: TimecodeFormat): + character_numbers = set([n.character_id for n in lines]) pdfmetrics.registerFont(TTFont('Futura', 'Futura.ttc')) for n in character_numbers: - char_lines = [line for line in lines - if 'Omit' not in line.keys() and line['Character Number'] == n] + char_lines = [line for line in lines if not line.omitted and line.character_id == n] + character_name = char_lines[0].character_name - sorted(char_lines, key=lambda line: line['PT.Clip.Start_Seconds']) + sorted(char_lines, key=lambda line: line.start) - title = "%s (%s) %s ADR Script" % (char_lines[0]['Title'], char_lines[0]['Character Name'], n) - filename = "%s_%s_%s_ADR Script.pdf" % (char_lines[0]['Title'], n, char_lines[0]['Character Name']) + title = "%s (%s) %s ADR Script" % (char_lines[0].title, character_name, n) + filename = "%s_%s_%s_ADR Script.pdf" % (char_lines[0].title, n, character_name) doc = make_doc_template(page_size=letter, filename=filename, document_title=title, - record=char_lines[0], document_header=char_lines[0]['Character Name']) + record=char_lines[0], document_header=character_name) story = [] @@ -47,9 +51,12 @@ def output_report(lines): number_style.rightIndent = 0. for line in char_lines: - data_block = [[Paragraph(line['Cue Number'], number_style), - Paragraph(line['PT.Clip.Start'] + " - " + line['PT.Clip.Finish'], number_style) + start_tc = tc_display_format.seconds_to_smpte(line.start) + finish_tc = tc_display_format.seconds_to_smpte(line.finish) + data_block = [[Paragraph(line.cue_number, number_style), + Paragraph(start_tc + " - " + finish_tc, number_style) ]] + # RIGHTWARDS ARROW → # Unicode: U+2192, UTF-8: E2 86 92 story.append( @@ -57,7 +64,7 @@ def output_report(lines): [HRFlowable(width='50%', color=colors.black), Table(data=data_block, colWidths=[1.5 * inch, 6. * inch], style=[('LEFTPADDING', (0, 0), (-1, -1), 0.)]), - Paragraph(line['Line'], prompt_style), + Paragraph(line.prompt, prompt_style), Spacer(1., inch * 1.5)] ) )