more refactoring for new docparser

This commit is contained in:
Jamie Hardt
2021-06-02 15:40:06 -07:00
parent 24c5a87358
commit caf4317b76
10 changed files with 138 additions and 193 deletions

View File

@@ -1,6 +1,6 @@
from ptulsconv.docparser.ptuls_grammar import protools_text_export_grammar
from .ptuls_parser_visitor import DictionaryParserVisitor
from .transformations import TimecodeInterpreter
from ptulsconv.old_parser.ptuls_parser_visitor import DictionaryParserVisitor
from ptulsconv.old_parser.transformations import TimecodeInterpreter
__version__ = '0.7.0'
__author__ = 'Jamie Hardt'

View File

@@ -8,15 +8,19 @@ import csv
import ptulsconv
from .reporting import print_section_header_style, print_status_style, print_warning
from .validations import *
from .xml.common import dump_fmpxml, fmp_transformed_dump
from ptulsconv.docparser import parse_document
from ptulsconv.docparser.tag_compiler import TagCompiler
from ptulsconv.pdf.supervisor_1pg import output_report as output_supervisor_1pg
from ptulsconv.pdf.line_count import output_report as output_line_count
from ptulsconv.pdf.talent_sides import output_report as output_talent_sides
from ptulsconv.pdf.summary_log import output_report as output_summary
from .docparser.adr_entity import adr_field_map
from json import JSONEncoder
class MyEncoder(JSONEncoder):
    """JSON encoder that falls back to an object's ``__dict__``.

    Lets plain attribute-bag objects (e.g. dataclass instances such as
    ADRLine) be serialized without writing per-type encoders.
    """

    def default(self, o):
        # Any object json itself cannot serialize is represented by its
        # instance attribute dictionary.
        state = o.__dict__
        return state
def dump_csv(events, output=sys.stdout):
keys = set()
@@ -38,39 +42,11 @@ def dump_keyed_csv(events, keys=(), output=sys.stdout):
writer.writerow(this_row)
def dump_field_map(field_map_name, output=sys.stdout):
output.write("# Map of Tag fields to XML output columns\n")
output.write("# (in order of precedence)\n")
output.write("# \n")
field_map = []
if field_map_name == 'ADR':
field_map = adr_field_map
output.write("# ADR Table Fields\n")
def dump_field_map(output=sys.stdout):
from ptulsconv.docparser.tag_mapping import TagMapping
from ptulsconv.docparser.adr_entity import ADRLine
output.write("# \n")
output.write("# Tag Name | FMPXMLRESULT Column | Type | Column \n")
output.write("# ----------------------------+----------------------+---------+--------\n")
for n, field in enumerate(field_map):
for tag in field[0]:
output.write("# %-27s-> %-20s | %-8s| %-7i\n" % (tag[:27], field[1][:20], field[2].__name__, n + 1))
def normalize_record_keys_for_adr(records):
    """Copy spotting-tag values onto their canonical ADR output keys.

    Only events that carry an ``'ADR'`` key are touched.  For each entry in
    ``adr_field_map`` the first source tag present on the event wins, and its
    value is converted with the entry's declared constructor before being
    stored under the output key.  Mutates ``records`` in place and returns
    the same structure for chaining.
    """
    for event in records['events']:
        if 'ADR' not in event:
            continue
        for source_keys, target_key, value_type in adr_field_map:
            # First source key present on the event takes precedence.
            for candidate in source_keys:
                if candidate in event:
                    event[target_key] = value_type(event[candidate])
                    break
    return records
TagMapping.print_rules(ADRLine, output=output)
def output_adr_csv(lines):
@@ -170,66 +146,38 @@ def convert(input_file, output_format='fmpxml',
progress=False, include_muted=False, xsl=None,
output=sys.stdout, log_output=sys.stderr, warnings=True):
with open(input_file, 'r') as file:
print_section_header_style('Parsing')
parsed = parse_text_export(file)
session = parse_document(input_file)
compiler = TagCompiler()
compiler.session = session
compiled_events = compiler.compile_events()
tcxform = ptulsconv.transformations.TimecodeInterpreter()
tagxform = ptulsconv.transformations.TagInterpreter(show_progress=progress,
ignore_muted=(not include_muted),
log_output=log_output)
parsed = tcxform.transform(parsed)
parsed = tagxform.transform(parsed)
# start=None, end=None, select_reel=None
#
# if start is not None and end is not None:
# start_fs = tcxform.convert_time(start,
# frame_rate=parsed['header']['timecode_format'],
# drop_frame=parsed['header']['timecode_drop_frame'])['frame_count']
#
# end_fs = tcxform.convert_time(end,
# frame_rate=parsed['header']['timecode_format'],
# drop_frame=parsed['header']['timecode_drop_frame'])['frame_count']
#
# subclipxform = ptulsconv.transformations.SubclipOfSequence(start=start_fs, end=end_fs)
# parsed = subclipxform.transform(parsed)
#
# if select_reel is not None:
# reel_xform = ptulsconv.transformations.SelectReel(reel_num=select_reel)
# parsed = reel_xform.transform(parsed)
parsed = normalize_record_keys_for_adr(parsed)
lines = list(map(ADRLine.from_event, compiled_events))
if warnings:
for warning in chain(validate_unique_field(parsed, field='QN'),
validate_non_empty_field(parsed, field='QN'),
validate_non_empty_field(parsed, field='CN'),
validate_non_empty_field(parsed, field='Title'),
validate_dependent_value(parsed, key_field='CN',
dependent_field='Char'),
validate_dependent_value(parsed, key_field='CN',
dependent_field='Actor'),
validate_unique_count(parsed, field='Title', count=1),
validate_unique_count(parsed, field='Spotting', count=1),
validate_unique_count(parsed, field='Supervisor', count=1)):
for warning in chain(validate_unique_field(lines, field='cue_number'),
validate_non_empty_field(lines, field='cue_number'),
validate_non_empty_field(lines, field='character_id'),
validate_non_empty_field(lines, field='title'),
validate_dependent_value(lines, key_field='character_id',
dependent_field='character_name'),
validate_dependent_value(lines, key_field='character_id',
dependent_field='actor_name')):
print_warning(warning.report_message())
if output_format == 'json':
json.dump(parsed, output)
print(MyEncoder().encode(lines))
elif output_format == 'csv':
dump_csv(parsed['events'])
# elif output_format == 'csv':
# dump_csv(parsed['events'])
#
# elif output_format == 'adr':
# create_adr_reports(parsed)
elif output_format == 'adr':
create_adr_reports(parsed)
elif output_format == 'fmpxml':
if xsl is None:
dump_fmpxml(parsed, input_file, output, adr_field_map)
else:
print_section_header_style("Performing XSL Translation")
print_status_style("Using builtin translation: %s" % xsl)
fmp_transformed_dump(parsed, input_file, xsl, output)
# elif output_format == 'fmpxml':
# if xsl is None:
# dump_fmpxml(parsed, input_file, output, adr_field_map)
# else:
# print_section_header_style("Performing XSL Translation")
# print_status_style("Using builtin translation: %s" % xsl)
# fmp_transformed_dump(parsed, input_file, xsl, output)

View File

@@ -1,70 +1,32 @@
from .doc_entity import SessionDescriptor, TrackDescriptor, TrackClipDescriptor
from .tag_compiler import Event
from typing import Optional, Generator
from ptulsconv.docparser.tag_compiler import Event
from typing import Optional
from dataclasses import dataclass
# field_map maps tags in the text export to fields in FMPXMLRESULT
# - tuple field 0 is a list of tags, the first tag with contents will be used as source
# - tuple field 1 is the field in FMPXMLRESULT
# - tuple field 2 the constructor/type of the field
from .tag_mapping import TagMapping
# Tag-to-column map for the FMPXMLRESULT export.  Each entry is a triple:
#   0: source tags, in precedence order (first one found on the event wins)
#   1: the destination FMPXMLRESULT column name
#   2: the constructor used to coerce the raw tag text
adr_field_map = (
    (['Title', 'PT.Session.Name'], 'Title', str),
    (['Supv'], 'Supervisor', str),
    (['Client'], 'Client', str),
    (['Sc'], 'Scene', str),
    (['Ver'], 'Version', str),
    (['Reel'], 'Reel', str),
    (['PT.Clip.Start'], 'Start', str),
    (['PT.Clip.Finish'], 'Finish', str),
    (['PT.Clip.Start_Seconds'], 'Start Seconds', float),
    (['PT.Clip.Finish_Seconds'], 'Finish Seconds', float),
    (['PT.Clip.Start_Frames'], 'Start Frames', int),
    (['PT.Clip.Finish_Frames'], 'Finish Frames', int),
    (['P'], 'Priority', int),
    (['QN'], 'Cue Number', str),
    (['Char', 'PT.Track.Name'], 'Character Name', str),
    (['Actor'], 'Actor Name', str),
    (['CN'], 'Character Number', str),
    (['R'], 'Reason', str),
    (['Rq'], 'Requested by', str),
    (['Spot'], 'Spot', str),
    (['PT.Clip.Name', 'Line'], 'Line', str),
    (['Shot'], 'Shot', str),
    (['Note'], 'Note', str),
    (['Mins'], 'Time Budget Mins', float),
    (['EFF'], 'Effort', str),
    (['TV'], 'TV', str),
    (['TBW'], 'To Be Written', str),
    (['OMIT'], 'Omit', str),
    (['ADLIB'], 'Adlib', str),
    (['OPT'], 'Optional', str),
    (['DONE'], 'Done', str),
    (['Movie.Filename'], 'Movie', str),
    (['Movie.Start_Offset_Seconds'], 'Movie Seconds', float),
)
from ptulsconv.docparser.tag_mapping import TagMapping
@dataclass
class ADRLine:
title: str
supervisor: str
client: str
scene: str
version: str
reel: str
start: str
finish: str
priority: int
cue_number: str
character_id: str
character_name: str
actor_name: str
prompt: str
reason: str
requested_by: str
time_budget_mins: float
note: str
spot: str
shot: str
title: Optional[str]
supervisor: Optional[str]
client: Optional[str]
scene: Optional[str]
version: Optional[str]
reel: Optional[str]
start: Optional[str]
finish: Optional[str]
priority: Optional[int]
cue_number: Optional[str]
character_id: Optional[str]
character_name: Optional[str]
actor_name: Optional[str]
prompt: Optional[str]
reason: Optional[str]
requested_by: Optional[str]
time_budget_mins: Optional[float]
note: Optional[str]
spot: Optional[str]
shot: Optional[str]
effort: bool
tv: bool
tbw: bool
@@ -72,7 +34,7 @@ class ADRLine:
adlib: bool
optional: bool
adr_tag_to_line_map = [
tag_mapping = [
TagMapping(source='Title', target="title", alt=TagMapping.ContentSource.Session),
TagMapping(source="Supv", target="supervisor"),
TagMapping(source="Client", target="client"),
@@ -135,9 +97,10 @@ class ADRLine:
self.optional = False
@classmethod
def from_event(cls, event: Event) -> Optional['ADRLine']:
def from_event(cls, event: Event) -> 'ADRLine':
new = cls()
TagMapping.apply_rules(cls.adr_tag_to_line_map, event.tags,
TagMapping.apply_rules(cls.tag_mapping, event.tags,
event.clip_name, event.track_name, event.session_name, new)
return new

View File

@@ -92,6 +92,7 @@ class HeaderDescriptor:
def _get_tc_format_params(self) -> Tuple[int, Fraction]:
frame_rates = {"23.976": (24, Fraction(1001, 24_000)),
"24": (24, Fraction(1, 24)),
"25": (25, Fraction(1, 25)),
"29.97": (30, Fraction(1001, 30_000)),
"30": (30, Fraction(1, 30)),
"59.94": (60, Fraction(1001, 60_000)),

View File

@@ -12,7 +12,7 @@ protools_text_export_grammar = Grammar(
"# OF AUDIO CLIPS:" fs integer_value rs
"# OF AUDIO FILES:" fs integer_value rs block_ending
frame_rate = ("60" / "59.94" / "30" / "29.97" / "24" / "23.976")
frame_rate = ("60" / "59.94" / "30" / "29.97" / "25" / "24" / "23.976")
files_section = files_header files_column_header file_record* block_ending
files_header = "F I L E S I N S E S S I O N" rs
files_column_header = "Filename" isp fs "Location" rs

View File

@@ -1,3 +1,4 @@
import sys
from enum import Enum
from typing import Optional, Callable, Any, List
@@ -12,6 +13,29 @@ class TagMapping:
alternate_source: Optional[ContentSource]
formatter: Callable[[str], Any]
@staticmethod
def print_rules(for_type: object, output=sys.stdout):
    """Print a human-readable table of a type's TagMapping rules.

    Args:
        for_type: a class exposing a ``tag_mapping`` list of TagMapping
            rules and per-field type annotations in ``__annotations__``.
        output: writable text stream the table is written to
            (defaults to stdout).
    """
    format_str = "%-20s | %-20s | %-25s"
    hr = "%s+%s+%s" % ("-" * 21, "-" * 23, "-" * 26)
    # BUG FIX: the title line and both horizontal rules were printed with
    # bare print() and ignored `output`; every line now honors the stream.
    print("Tag mapping for %s" % for_type.__name__, file=output)
    print(hr, file=output)
    print(format_str % ("Tag Source", "Target", "Type"),
          file=output)
    print(hr, file=output)
    for rule in for_type.tag_mapping:
        t = for_type.__annotations__[rule.target]
        print(format_str % (rule.source, rule.target, t),
              file=output)
        # A rule may also draw its content from the session, track, or
        # clip name; note that beneath the primary tag row.
        if rule.alternate_source is TagMapping.ContentSource.Session:
            print(format_str % (" - (Session Name)", rule.target, t),
                  file=output)
        elif rule.alternate_source is TagMapping.ContentSource.Track:
            print(format_str % (" - (Track Name)", rule.target, t),
                  file=output)
        elif rule.alternate_source is TagMapping.ContentSource.Clip:
            print(format_str % (" - (Clip Name)", rule.target, t),
                  file=output)
@staticmethod
def apply_rules(rules: List['TagMapping'],
tags: dict,
@@ -30,11 +54,11 @@ class TagMapping:
def __init__(self, source: str,
target: str,
alt: Optional[ContentSource] = None,
formatter=(lambda x: x)):
formatter=None):
self.source = source
self.target = target
self.alternate_source = alt
self.formatter = formatter
self.formatter = formatter or (lambda x: x)
def apply(self, tags: dict,
clip_content: str,

View File

View File

@@ -1,11 +1,11 @@
from . import broadcast_timecode
from .docparser.tagged_string_parser_visitor import TaggedStringResult, tag_grammar
from ptulsconv import broadcast_timecode
from ptulsconv.docparser.tagged_string_parser_visitor import TaggedStringResult, tag_grammar
from parsimonious.exceptions import IncompleteParseError
import math
import sys
from .docparser.tagged_string_parser_visitor import TagListVisitor
from .reporting import print_advisory_tagging_error, print_section_header_style, print_status_style
from ptulsconv.docparser.tagged_string_parser_visitor import TagListVisitor
from ptulsconv.reporting import print_advisory_tagging_error, print_section_header_style, print_status_style
from tqdm import tqdm

View File

@@ -1,58 +1,67 @@
from dataclasses import dataclass
from sys import stderr
from ptulsconv.docparser.adr_entity import ADRLine
from typing import List, Iterator, Optional
@dataclass
class ValidationError:
message: str
event: dict
event: Optional[ADRLine]
def report_message(self):
return f"{self.message}: event at {self.event['PT.Clip.Start']} on track {self.event['PT.Track.Name']}"
if self.event is not None:
return f"{self.message}: event at {self.event.start} with number {self.event.cue_number}"
else:
return self.message
def validate_unique_count(input_dict, field='Title', count=1):
values = set(list(map(lambda e: e.get(field, None), input_dict['events'])))
def validate_unique_count(input_lines: Iterator[ADRLine], field='title', count=1):
values = set(list(map(lambda e: getattr(e, field), input_lines)))
if len(values) > count:
yield ValidationError(message="Field {} has too many values (max={}): {}".format(field, count, values))
def validate_value(input_dict, key_field, predicate):
for event in input_dict['events']:
val = event[key_field]
def validate_value(input_lines: Iterator[ADRLine], key_field, predicate):
for event in input_lines:
val = getattr(event, key_field)
if not predicate(val):
yield ValidationError(message='Field {} not in range'.format(val),
event=event)
def validate_unique_field(input_dict, field='QN'):
def validate_unique_field(input_lines: Iterator[ADRLine], field='cue_number'):
values = set()
for event in input_dict['events']:
if event[field] in values:
for event in input_lines:
this = getattr(event, field)
if this in values:
yield ValidationError(message='Re-used {}'.format(field), event=event)
else:
values.update(this)
def validate_non_empty_field(input_dict, field='QN'):
for event in input_dict['events']:
if field not in event.keys() or len(event[field]) == 0:
def validate_non_empty_field(input_lines: Iterator[ADRLine], field='cue_number'):
for event in input_lines:
if getattr(event, field, None) is None:
yield ValidationError(message='Empty field {}'.format(field), event=event)
def validate_dependent_value(input_dict, key_field, dependent_field):
def validate_dependent_value(input_lines: Iterator[ADRLine], key_field, dependent_field):
"""
Validates that two events with the same value in `key_field` always have the
same value in `dependent_field`
"""
value_map = dict()
for event in input_dict['events']:
if key_field not in event.keys():
continue
key_values = set((getattr(x, key_field) for x in input_lines))
if event[key_field] not in value_map.keys():
value_map[event[key_field]] = event.get(dependent_field, None)
else:
if value_map[event[key_field]] != event.get(dependent_field, None):
yield ValidationError(message='Field {} depends on key field {} (value={}), expected {}, was {}'
.format(dependent_field, key_field, event[key_field], value_map[key_field],
event.get(dependent_field, None)), event=event)
for key_value in key_values:
rows = [(getattr(x, key_field), getattr(x, dependent_field)) for x in input_lines
if getattr(x, key_field) == key_value]
unique_rows = set(rows)
if len(unique_rows) > 1:
message = "Non-unique values for key {} = ".format(key_field)
for u in unique_rows:
message = message + "\n - {} -> {}".format(u[0], u[1])
yield ValidationError(message=message, event=None)