Removed old parser code

This commit is contained in:
Jamie Hardt
2021-06-03 20:31:12 -07:00
parent 55324a0f82
commit 784699050a
7 changed files with 58 additions and 490 deletions

View File

@@ -1,6 +1,4 @@
from ptulsconv.docparser.ptuls_grammar import protools_text_export_grammar from ptulsconv.docparser.ptuls_grammar import protools_text_export_grammar
from ptulsconv.old_parser.ptuls_parser_visitor import DictionaryParserVisitor
from ptulsconv.old_parser.transformations import TimecodeInterpreter
__version__ = '0.7.0' __version__ = '0.7.0'
__author__ = 'Jamie Hardt' __author__ = 'Jamie Hardt'

View File

@@ -32,26 +32,6 @@ class MyEncoder(JSONEncoder):
return o.__dict__ return o.__dict__
def dump_csv(events, output=sys.stdout):
keys = set()
for e in events:
keys.update(e.keys())
dump_keyed_csv(events, keys=keys, output=output)
def dump_keyed_csv(events, keys=(), output=sys.stdout):
writer = csv.writer(output, dialect='excel')
writer.writerow(keys)
for event in events:
this_row = list()
for key in keys:
this_row.append(event.get(key, ""))
writer.writerow(this_row)
def dump_field_map(output=sys.stdout): def dump_field_map(output=sys.stdout):
from ptulsconv.docparser.tag_mapping import TagMapping from ptulsconv.docparser.tag_mapping import TagMapping
from ptulsconv.docparser.adr_entity import ADRLine from ptulsconv.docparser.adr_entity import ADRLine
@@ -59,25 +39,36 @@ def dump_field_map(output=sys.stdout):
TagMapping.print_rules(ADRLine, output=output) TagMapping.print_rules(ADRLine, output=output)
def output_adr_csv(lines): def output_adr_csv(lines: List[ADRLine], time_format: TimecodeFormat):
adr_keys = ('Title', 'Cue Number', 'Character Name', 'Reel', 'Version', 'Line', reels = set([ln.reel for ln in lines])
'Start', 'Finish', 'Reason', 'Note', 'TV', 'Version')
reels = set([ln['Reel'] for ln in lines]) for n in [n.character_id for n in lines]:
reels.add(None)
for n in [n['Character Number'] for n in lines]:
for reel in reels: for reel in reels:
these_lines = [ln for ln in lines these_lines = [ln for ln in lines if ln.character_id == n and ln.reel == reel]
if ln['Character Number'] == n and
ln.get('Reel', None) == reel]
if len(these_lines) == 0: if len(these_lines) == 0:
continue continue
outfile_name = "%s_%s_%s_%s.csv" % (these_lines[0]['Title'], outfile_name = "%s_%s_%s_%s.csv" % (these_lines[0].title, n, these_lines[0].character_name, reel,)
n, these_lines[0]['Character Name'], reel,)
with open(outfile_name, mode='w', newline='') as outfile: with open(outfile_name, mode='w', newline='') as outfile:
dump_keyed_csv(these_lines, adr_keys, outfile) writer = csv.writer(outfile, dialect='excel')
writer.writerow(['Title', 'Character Name', 'Cue Number',
'Reel', 'Version',
'Start', 'Finish',
'Start Seconds', 'Finish Seconds',
'Prompt',
'Reason', 'Note', 'TV'])
for event in these_lines:
this_row = [event.title, event.character_name, event.cue_number,
event.reel, event.version,
time_format.seconds_to_smpte(event.start), time_format.seconds_to_smpte(event.finish),
float(event.start), float(event.finish),
event.prompt,
event.reason, event.note, "TV" if event.tv else ""]
writer.writerow(this_row)
def output_avid_markers(lines): def output_avid_markers(lines):
@@ -107,23 +98,23 @@ def create_adr_reports(lines: List[ADRLine], tc_display_format: TimecodeFormat):
os.chdir("Director Logs") os.chdir("Director Logs")
output_summary(lines, tc_display_format=tc_display_format, by_character=True) output_summary(lines, tc_display_format=tc_display_format, by_character=True)
os.chdir("..") os.chdir("..")
#
# print_status_style("Creating CSV outputs") print_status_style("Creating CSV outputs")
# os.makedirs("CSV", exist_ok=True) os.makedirs("CSV", exist_ok=True)
# os.chdir("CSV") os.chdir("CSV")
# output_adr_csv(lines) output_adr_csv(lines, time_format=tc_display_format)
# os.chdir("..") os.chdir("..")
#
# print_status_style("Creating Avid Marker XML files") # print_status_style("Creating Avid Marker XML files")
# os.makedirs("Avid Markers", exist_ok=True) # os.makedirs("Avid Markers", exist_ok=True)
# os.chdir("Avid Markers") # os.chdir("Avid Markers")
# output_avid_markers(lines) # output_avid_markers(lines)
# os.chdir("..") # os.chdir("..")
#
# print_status_style("Creating Scripts directory and reports") print_status_style("Creating Scripts directory and reports")
# os.makedirs("Talent Scripts", exist_ok=True) os.makedirs("Talent Scripts", exist_ok=True)
# os.chdir("Talent Scripts") os.chdir("Talent Scripts")
# output_talent_sides(lines) output_talent_sides(lines, tc_display_format=tc_display_format)
def parse_text_export(file): def parse_text_export(file):

View File

@@ -98,7 +98,11 @@ class ADRLine:
self.optional = False self.optional = False
@classmethod @classmethod
def from_event(cls, event: Event) -> 'ADRLine': def from_event(cls, event: Event) -> Optional['ADRLine']:
if 'QN' not in event.tags:
return None
new = cls() new = cls()
TagMapping.apply_rules(cls.tag_mapping, event.tags, TagMapping.apply_rules(cls.tag_mapping, event.tags,
event.clip_name, event.track_name, event.session_name, new) event.clip_name, event.track_name, event.session_name, new)

View File

@@ -1,152 +0,0 @@
from parsimonious.nodes import NodeVisitor, Node
class DictionaryParserVisitor(NodeVisitor):
def visit_document(self, node: Node, visited_children) -> dict:
files = next(iter(visited_children[1]), None)
clips = next(iter(visited_children[2]), None)
plugins = next(iter(visited_children[3]), None)
tracks = next(iter(visited_children[4]), None)
markers = next(iter(visited_children[5]), None)
return dict(header=visited_children[0],
files=files,
clips=clips,
plugins=plugins,
tracks=tracks,
markers=markers)
@staticmethod
def visit_header(node, visited_children):
tc_drop = False
for _ in visited_children[20]:
tc_drop = True
return dict(session_name=visited_children[2],
sample_rate=visited_children[6],
bit_depth=visited_children[10],
start_timecode=visited_children[15],
timecode_format=visited_children[19],
timecode_drop_frame=tc_drop,
count_audio_tracks=visited_children[25],
count_clips=visited_children[29],
count_files=visited_children[33])
@staticmethod
def visit_files_section(node, visited_children):
return list(map(lambda child: dict(filename=child[0], path=child[2]), visited_children[2]))
@staticmethod
def visit_clips_section(node, visited_children):
channel = next(iter(visited_children[2][3]), 1)
return list(map(lambda child: dict(clip_name=child[0], file=child[2], channel=channel),
visited_children[2]))
@staticmethod
def visit_plugin_listing(node, visited_children):
return list(map(lambda child: dict(manufacturer=child[0],
plugin_name=child[2],
version=child[4],
format=child[6],
stems=child[8],
count_instances=child[10]),
visited_children[2]))
@staticmethod
def visit_track_block(node, visited_children):
track_header, track_clip_list = visited_children
clips = []
for clip in track_clip_list:
if clip[0] != None:
clips.append(clip[0])
plugins = []
for plugin_opt in track_header[16]:
for plugin in plugin_opt[1]:
plugins.append(plugin[1])
return dict(
name=track_header[2],
comments=track_header[6],
user_delay_samples=track_header[10],
state=track_header[14],
plugins=plugins,
clips=clips
)
@staticmethod
def visit_track_listing(node, visited_children):
return visited_children[1]
@staticmethod
def visit_track_clip_entry(node, visited_children):
timestamp = None
if isinstance(visited_children[14], list):
timestamp = visited_children[14][0][0]
return dict(channel=visited_children[0],
event=visited_children[3],
clip_name=visited_children[6],
start_time=visited_children[8],
end_time=visited_children[10],
duration=visited_children[12],
timestamp=timestamp,
state=visited_children[15])
@staticmethod
def visit_track_state_list(node, visited_children):
states = []
for next_state in visited_children:
states.append(next_state[0][0].text)
return states
@staticmethod
def visit_track_clip_state(node, visited_children):
return node.text
@staticmethod
def visit_markers_listing(node, visited_children):
markers = []
for marker in visited_children[2]:
markers.append(marker)
return markers
@staticmethod
def visit_marker_record(node, visited_children):
return dict(number=visited_children[0],
location=visited_children[3],
time_reference=visited_children[5],
units=visited_children[8],
name=visited_children[10],
comments=visited_children[12])
@staticmethod
def visit_formatted_clip_name(_, visited_children):
return visited_children[1].text
@staticmethod
def visit_string_value(node, visited_children):
return node.text.strip(" ")
@staticmethod
def visit_integer_value(node, visited_children):
return int(node.text)
# def visit_timecode_value(self, node, visited_children):
# return node.text.strip(" ")
@staticmethod
def visit_float_value(node, visited_children):
return float(node.text)
def visit_block_ending(self, node, visited_children):
pass
def generic_visit(self, node, visited_children):
""" The generic visit method. """
return visited_children or node

View File

@@ -1,280 +0,0 @@
from ptulsconv import broadcast_timecode
from ptulsconv.docparser.tagged_string_parser_visitor import TaggedStringResult, tag_grammar
from parsimonious.exceptions import IncompleteParseError
import math
import sys
from ptulsconv.docparser.tagged_string_parser_visitor import TagListVisitor
from ptulsconv.reporting import print_advisory_tagging_error, print_section_header_style, print_status_style
from tqdm import tqdm
# fixme this whole file is a mess
class Transformation:
def transform(self, input_dict) -> dict:
return input_dict
class TimecodeInterpreter(Transformation):
def __init__(self):
self.apply_session_start = False
def transform(self, input_dict: dict) -> dict:
print_section_header_style('Converting Timecodes')
retval = super().transform(input_dict)
rate = input_dict['header']['timecode_format']
start_tc = self.convert_time(input_dict['header']['start_timecode'], rate,
drop_frame=input_dict['header']['timecode_drop_frame'])
retval['header']['start_timecode_decoded'] = start_tc
print_status_style('Converted start timecode.')
retval['tracks'] = self.convert_tracks(input_dict['tracks'], timecode_rate=rate,
drop_frame=retval['header']['timecode_drop_frame'])
print_status_style('Converted clip timecodes for %i tracks.' % len(retval['tracks']))
for marker in retval['markers']:
marker['location_decoded'] = self.convert_time(marker['location'], rate,
drop_frame=retval['header']['timecode_drop_frame'])
print_status_style('Converted %i markers.' % len(retval['markers']))
return retval
def convert_tracks(self, tracks, timecode_rate, drop_frame):
for track in tracks:
new_clips = []
for clip in track['clips']:
new_clips.append(self.convert_clip(clip, drop_frame=drop_frame, timecode_rate=timecode_rate))
track['clips'] = new_clips
return tracks
def convert_clip(self, clip, timecode_rate, drop_frame):
time_fields = ['start_time', 'end_time', 'duration', 'timestamp']
for time_field in time_fields:
if clip[time_field] is not None:
clip[time_field + "_decoded"] = self.convert_time(clip[time_field], drop_frame=drop_frame,
frame_rate=timecode_rate)
return clip
def convert_time(self, time_string, frame_rate, drop_frame=False):
lfps = math.ceil(frame_rate)
frame_count = broadcast_timecode.smpte_to_frame_count(time_string, lfps, drop_frame_hint=drop_frame)
return dict(frame_count=frame_count, logical_fps=lfps, drop_frame=drop_frame)
class TagInterpreter(Transformation):
def __init__(self, ignore_muted=True, show_progress=False, log_output=sys.stderr):
self.visitor = TagListVisitor()
self.ignore_muted = ignore_muted
self.show_progress = show_progress
self.log_output = log_output
self.transformed = list()
self.timespan_rules = list()
self.movie_rules = list()
self.title_tags = None
self.markers = list()
def transform(self, input_dict: dict) -> dict:
self.transformed = list()
self.timespan_rules = list()
self.movie_rules = list()
print_section_header_style('Parsing Tags')
self.title_tags = self.parse_tags(input_dict['header']['session_name'])
self.markers = sorted(input_dict['markers'],
key=lambda m: m['location_decoded']['frame_count'])
if self.show_progress:
track_iter = tqdm(input_dict['tracks'],
desc="Reading tracks...", unit='Track')
else:
track_iter = input_dict['tracks']
for track in track_iter:
if 'Muted' in track['state'] and self.ignore_muted:
continue
track_tags = self.parse_tags(track['name'],
parent_track_name=track['name'])
comment_tags = self.parse_tags(track['comments'],
parent_track_name=track['name'])
track_context_tags = track_tags.tag_dict
track_context_tags.update(comment_tags.tag_dict)
for clip in track['clips']:
if clip['state'] == 'Muted' and self.ignore_muted:
continue
clip_tags = self.parse_tags(clip['clip_name'],
parent_track_name=track['name'],
clip_time=clip['start_time'])
if clip_tags.mode == 'Normal':
event = self.decorate_event(clip, clip_tags, input_dict['header'],
track_context_tags, track_tags)
self.transformed.append(event)
elif clip_tags.mode == 'Append':
assert len(self.transformed) > 0, "First clip is in '&'-Append mode, fatal error."
self.transformed[-1].update(clip_tags.tag_dict)
self.transformed[-1]['PT.Clip.Name'] = self.transformed[-1]['PT.Clip.Name'] + " " \
+ clip_tags.content
self.transformed[-1]['PT.Clip.Finish_Frames'] = clip['end_time_decoded']['frame_count']
self.transformed[-1]['PT.Clip.Finish'] = clip['end_time']
self.transformed[-1]['PT.Clip.Finish_Seconds'] = \
clip['end_time_decoded']['frame_count'] / input_dict['header']['timecode_format']
elif clip_tags.mode == 'Timespan':
rule = {'start_time_literal': clip['start_time'],
'start_time': clip['start_time_decoded']['frame_count'],
'start_time_seconds': clip['start_time_decoded']['frame_count'] / input_dict['header'][
'timecode_format'], 'end_time': clip['end_time_decoded']['frame_count'],
'tags': clip_tags.tag_dict}
self.timespan_rules.append(rule)
elif clip_tags.mode == 'Movie':
rule = dict(movie_path=clip_tags.tag_dict['Movie'],
start_time=clip['start_time_decoded']['frame_count'],
end_time=clip['end_time_decoded']['frame_count'])
self.movie_rules.append(rule)
print_status_style('Processed %i clips' % len(self.transformed))
return dict(header=input_dict['header'], events=self.transformed)
def decorate_event(self, clip, clip_tags, header_dict, track_context_tags, track_tags):
event = dict()
start_frame = clip['start_time_decoded']['frame_count']
event.update(self.title_tags.tag_dict)
event.update(track_context_tags)
event.update(self.effective_timespan_tags_at_time(start_frame))
event.update(self.effective_marker_tags_at_time(start_frame))
event.update(self.effective_movie_at_time(start_frame, header_dict['timecode_format']))
event.update(clip_tags.tag_dict)
event['PT.Track.Name'] = track_tags.content
event['PT.Session.Name'] = self.title_tags.content
event['PT.Session.TimecodeFormat'] = header_dict['timecode_format']
event['PT.Session.Start'] = header_dict['start_timecode']
event['PT.Session.DropFrame'] = header_dict['timecode_drop_frame']
event['PT.Clip.Number'] = clip['event']
event['PT.Clip.Name'] = clip_tags.content
event['PT.Clip.Start'] = clip['start_time']
event['PT.Clip.Finish'] = clip['end_time']
event['PT.Clip.Start_Frames'] = start_frame
event['PT.Clip.Finish_Frames'] = clip['end_time_decoded']['frame_count']
event['PT.Clip.Start_Seconds'] = start_frame / header_dict['timecode_format']
event['PT.Clip.Finish_Seconds'] = clip['end_time_decoded']['frame_count'] / header_dict['timecode_format']
return event
def effective_movie_at_time(self, time, timecode_format) -> dict:
retval = dict()
for rule in reversed(self.movie_rules):
if rule['start_time'] <= time <= rule['end_time']:
retval['Movie.Filename'] = rule['movie_path']
retval['Movie.Start_Offset_Frames'] = time - rule['start_time']
retval['Movie.Start_Offset_Seconds'] = (time - rule['start_time']) / timecode_format
break
return retval
def effective_timespan_tags_at_time(self, time) -> dict:
retval = dict()
for rule in self.timespan_rules:
if rule['start_time'] <= time <= rule['end_time']:
tag_keys = list(rule['tags'].keys())
tag_times = dict()
for key in tag_keys:
key: str
time_value = rule['start_time']
tag_times["Timespan." + key + ".Start_Frames"] = time_value
tag_times["Timespan." + key + ".Start"] = rule['start_time_literal']
tag_times["Timespan." + key + ".Start_Seconds"] = rule['start_time_seconds']
retval.update(rule['tags'])
retval.update(tag_times)
return retval
def effective_marker_tags_at_time(self, time):
retval = dict()
for marker in self.markers:
marker_name_tags = self.parse_tags(marker['name'])
marker_comment_tags = self.parse_tags(marker['comments'])
effective_tags = marker_name_tags.tag_dict
effective_tags.update(marker_comment_tags.tag_dict)
if marker['location_decoded']['frame_count'] <= time:
retval.update(effective_tags)
else:
break
return retval
def parse_tags(self, source, parent_track_name=None, clip_time=None) -> TaggedStringResult:
try:
parse_tree = tag_grammar.parse(source)
return self.visitor.visit(parse_tree)
except IncompleteParseError as e:
print_advisory_tagging_error(failed_string=source,
parent_track_name=parent_track_name,
clip_time=clip_time, position=e.pos)
trimmed_source = source[:e.pos]
parse_tree = tag_grammar.parse(trimmed_source)
return self.visitor.visit(parse_tree)
class SelectReel(Transformation):
def __init__(self, reel_num):
self.reel_num = reel_num
def transform(self, input_dict) -> dict:
out_events = []
for event in input_dict['events']:
if event['Reel'] == str(self.reel_num):
offset = event.get('Timespan.Reel.Start_Frames', 0)
offset_sec = event.get('Timespan.Reel.Start_Seconds', 0.)
event['PT.Clip.Start_Frames'] -= offset
event['PT.Clip.Finish_Frames'] -= offset
event['PT.Clip.Start_Seconds'] -= offset_sec
event['PT.Clip.Finish_Seconds'] -= offset_sec
out_events.append(event)
return dict(header=input_dict['header'], events=out_events)
class SubclipOfSequence(Transformation):
def __init__(self, start, end):
self.start = start
self.end = end
def transform(self, input_dict: dict) -> dict:
out_events = []
offset = 0 # self.start
offset_sec = 0. # self.start / input_dict['header']['timecode_format']
for event in input_dict['events']:
if self.start <= event['PT.Clip.Start_Frames'] <= self.end:
e = event
e['PT.Clip.Start_Frames'] = event['PT.Clip.Start_Frames'] - offset
e['PT.Clip.Finish_Frames'] = event['PT.Clip.Finish_Frames'] - offset
e['PT.Clip.Start_Seconds'] = event['PT.Clip.Start_Seconds'] - offset_sec
e['PT.Clip.Finish_Seconds'] = event['PT.Clip.Finish_Seconds'] - offset_sec
out_events.append(e)
return dict(header=input_dict['header'], events=out_events)

View File

@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from typing import List
from .__init__ import make_doc_template from .__init__ import make_doc_template
from reportlab.lib.units import inch from reportlab.lib.units import inch
@@ -11,22 +12,25 @@ from reportlab.lib import colors
from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont from reportlab.pdfbase.ttfonts import TTFont
from ..broadcast_timecode import TimecodeFormat
from ..docparser.adr_entity import ADRLine
def output_report(lines):
character_numbers = set([n['Character Number'] for n in lines]) def output_report(lines: List[ADRLine], tc_display_format: TimecodeFormat):
character_numbers = set([n.character_id for n in lines])
pdfmetrics.registerFont(TTFont('Futura', 'Futura.ttc')) pdfmetrics.registerFont(TTFont('Futura', 'Futura.ttc'))
for n in character_numbers: for n in character_numbers:
char_lines = [line for line in lines char_lines = [line for line in lines if not line.omitted and line.character_id == n]
if 'Omit' not in line.keys() and line['Character Number'] == n] character_name = char_lines[0].character_name
sorted(char_lines, key=lambda line: line['PT.Clip.Start_Seconds']) sorted(char_lines, key=lambda line: line.start)
title = "%s (%s) %s ADR Script" % (char_lines[0]['Title'], char_lines[0]['Character Name'], n) title = "%s (%s) %s ADR Script" % (char_lines[0].title, character_name, n)
filename = "%s_%s_%s_ADR Script.pdf" % (char_lines[0]['Title'], n, char_lines[0]['Character Name']) filename = "%s_%s_%s_ADR Script.pdf" % (char_lines[0].title, n, character_name)
doc = make_doc_template(page_size=letter, filename=filename, document_title=title, doc = make_doc_template(page_size=letter, filename=filename, document_title=title,
record=char_lines[0], document_header=char_lines[0]['Character Name']) record=char_lines[0], document_header=character_name)
story = [] story = []
@@ -47,9 +51,12 @@ def output_report(lines):
number_style.rightIndent = 0. number_style.rightIndent = 0.
for line in char_lines: for line in char_lines:
data_block = [[Paragraph(line['Cue Number'], number_style), start_tc = tc_display_format.seconds_to_smpte(line.start)
Paragraph(line['PT.Clip.Start'] + " - " + line['PT.Clip.Finish'], number_style) finish_tc = tc_display_format.seconds_to_smpte(line.finish)
data_block = [[Paragraph(line.cue_number, number_style),
Paragraph(start_tc + " - " + finish_tc, number_style)
]] ]]
# RIGHTWARDS ARROW → # RIGHTWARDS ARROW →
# Unicode: U+2192, UTF-8: E2 86 92 # Unicode: U+2192, UTF-8: E2 86 92
story.append( story.append(
@@ -57,7 +64,7 @@ def output_report(lines):
[HRFlowable(width='50%', color=colors.black), [HRFlowable(width='50%', color=colors.black),
Table(data=data_block, colWidths=[1.5 * inch, 6. * inch], Table(data=data_block, colWidths=[1.5 * inch, 6. * inch],
style=[('LEFTPADDING', (0, 0), (-1, -1), 0.)]), style=[('LEFTPADDING', (0, 0), (-1, -1), 0.)]),
Paragraph(line['Line'], prompt_style), Paragraph(line.prompt, prompt_style),
Spacer(1., inch * 1.5)] Spacer(1., inch * 1.5)]
) )
) )