Work on rewrting the parser

This commit is contained in:
Jamie Hardt
2021-05-26 16:50:46 -07:00
parent f4ad4a5b5d
commit 3889f871b8
6 changed files with 275 additions and 6 deletions

View File

@@ -1,4 +1,4 @@
from .ptuls_grammar import protools_text_export_grammar
from ptulsconv.docparser.ptuls_grammar import protools_text_export_grammar
from .ptuls_parser_visitor import DictionaryParserVisitor
from .transformations import TimecodeInterpreter

View File

@@ -1,4 +1,4 @@
from ptulsconv.commands import convert, dump_field_map
from ptulsconv.commands import convert, dump_field_map, raw_output
from ptulsconv import __name__, __version__, __author__
from optparse import OptionParser, OptionGroup
from .xml.common import dump_xform_options
@@ -22,14 +22,15 @@ def main():
filter_opts.add_option('-m', '--include-muted', default=False, action='store_true', dest='include_muted',
help='Include muted clips.')
# filter_opts.add_option('-r', '--reel', dest='select_reel', help="Output only events in reel N, and recalculate "
# " start times relative to that reel's start time.",
# filter_opts.add_option('-r', '--reel', dest='select_reel',
# help="Output only events in reel N, and recalculate "
# " start times relative to that reel's start time.",
# default=None, metavar='N')
parser.add_option_group(filter_opts)
parser.add_option('-f', '--format', dest='output_format', metavar='FMT',
choices=['fmpxml', 'json', 'adr', 'csv'], default='fmpxml',
choices=['fmpxml', 'json', 'adr', 'csv', 'raw'], default='fmpxml',
help='Set output format, `fmpxml`, `json`, `csv`, or `adr`. Default '
'is `fmpxml`.')
@@ -59,6 +60,10 @@ def main():
(options, args) = parser.parse_args(sys.argv)
if options.output_format == 'raw':
raw_output(args[1])
exit(0)
print_banner_style("%s %s (c) 2020 %s. All rights reserved." % (__name__, __version__, __author__))
print_section_header_style("Startup")

View File

@@ -3,7 +3,7 @@ import os
import sys
from itertools import chain
from collections import namedtuple
import csv
import ptulsconv
@@ -176,6 +176,7 @@ def create_adr_reports(parsed):
os.chdir("Talent Scripts")
output_talent_sides(lines)
def parse_text_export(file):
ast = ptulsconv.protools_text_export_grammar.parse(file.read())
dict_parser = ptulsconv.DictionaryParserVisitor()
@@ -187,6 +188,21 @@ def parse_text_export(file):
return parsed
def raw_output(input_file, output=sys.stdout):
from .docparser.doc_parser_visitor import DocParserVisitor
from json import JSONEncoder
class DescriptorJSONEncoder(JSONEncoder):
def default(self, obj):
return obj.__dict__
with open(input_file, 'r') as file:
ast = ptulsconv.protools_text_export_grammar.parse(file.read())
visitor = DocParserVisitor()
parsed = visitor.visit(ast)
json.dump(parsed, output, cls=DescriptorJSONEncoder)
def convert(input_file, output_format='fmpxml',
progress=False, include_muted=False, xsl=None,
output=sys.stdout, log_output=sys.stderr, warnings=True):

View File

View File

@@ -0,0 +1,248 @@
from parsimonious.nodes import NodeVisitor, Node
from collections import namedtuple
# _SessionDescriptor = namedtuple('_SessionDescriptor',
# "header files clips plugins tracks markers")
#
# _HeaderDescriptor = namedtuple('_HeaderDescriptor',
# "session_name sample_rate bit_depth start_timecode "
# "timecode_format timecode_drop_frame "
# "count_audio_tracks count_clips count_files")
#
# _TrackDescriptor = namedtuple("_TrackDescriptor",
# "name comments user_delay_samples state plugins "
# "clips")
#
# _TrackClipDescriptor = namedtuple("_TrackClipDescriptor",
# "channel event clip_name start_time end_time "
# "duration timestamp state")
#
# _PluginDescriptor = namedtuple("_PluginDescriptor",
# "manufacturer plugin_name version format stems "
# "count_instances")
#
# _MarkerDescriptor = namedtuple("_MarkerDescriptor",
# "number location time_reference units name "
# "comments")
#
# _FileDescriptor = namedtuple("_FileDescriptor", "filename path")
class SessionDescriptor:
def __init__(self, **kwargs):
self.header = kwargs['header']
self.files = kwargs['files']
self.clips = kwargs['clips']
self.plugins = kwargs['plugins']
self.tracks = kwargs['tracks']
self.markers = kwargs['markers']
class HeaderDescriptor:
def __init__(self, **kwargs):
self.session_name = kwargs['session_name']
self.sample_rate = kwargs['sample_rate']
self.bit_depth = kwargs['bit_depth']
self.start_timecode = kwargs['start_timecode']
self.timecode_format = kwargs['timecode_format']
self.timecode_drop_frame = kwargs['timecode_drop_frame']
self.count_audio_tracks = kwargs['count_audio_tracks']
self.count_clips = kwargs['count_clips']
self.count_files = kwargs['count_files']
class TrackDescriptor:
def __init__(self, **kwargs):
self.name = kwargs['name']
self.comments = kwargs['comments']
self.user_delay_samples = kwargs['user_delay_samples']
self.state = kwargs['state']
self.plugins = kwargs['plugins']
self.clips = kwargs['clips']
class FileDescriptor(dict):
pass
class TrackClipDescriptor:
def __init__(self, **kwargs):
self.channel = kwargs['channel']
self.event = kwargs['event']
self.clip_name = kwargs['clip_name']
self.start_time = kwargs['start_time']
self.end_time = kwargs['end_time']
self.duration = kwargs['duration']
self.timestamp = kwargs['timestamp']
self.state = kwargs['state']
class ClipDescriptor(dict):
pass
class PluginDescriptor(dict):
pass
class MarkerDescriptor:
def __init__(self, **kwargs):
self.number = kwargs['number']
self.location = kwargs['location']
self.time_reference = kwargs['time_reference']
self.units = kwargs['units']
self.name = kwargs['name']
self.comments = kwargs['comments']
class DocParserVisitor(NodeVisitor):
@staticmethod
def visit_document(_, visited_children) -> SessionDescriptor:
files = next(iter(visited_children[1]), None)
clips = next(iter(visited_children[2]), None)
plugins = next(iter(visited_children[3]), None)
tracks = next(iter(visited_children[4]), None)
markers = next(iter(visited_children[5]), None)
return SessionDescriptor(header=visited_children[0],
files=files,
clips=clips,
plugins=plugins,
tracks=tracks,
markers=markers)
@staticmethod
def visit_header(_, visited_children):
tc_drop = False
for _ in visited_children[20]:
tc_drop = True
return HeaderDescriptor(session_name=visited_children[2],
sample_rate=visited_children[6],
bit_depth=visited_children[10],
start_timecode=visited_children[15],
timecode_format=visited_children[19],
timecode_drop_frame=tc_drop,
count_audio_tracks=visited_children[25],
count_clips=visited_children[29],
count_files=visited_children[33])
@staticmethod
def visit_files_section(_, visited_children):
return list(map(lambda child: FileDescriptor(filename=child[0], path=child[2]), visited_children[2]))
@staticmethod
def visit_clips_section(_, visited_children):
channel = next(iter(visited_children[2][3]), 1)
return list(map(lambda child: ClipDescriptor(clip_name=child[0], file=child[2], channel=channel),
visited_children[2]))
@staticmethod
def visit_plugin_listing(_, visited_children):
return list(map(lambda child: PluginDescriptor(manufacturer=child[0],
plugin_name=child[2],
version=child[4],
format=child[6],
stems=child[8],
count_instances=child[10]),
visited_children[2]))
@staticmethod
def visit_track_block(_, visited_children):
track_header, track_clip_list = visited_children
clips = []
for clip in track_clip_list:
if clip[0] is not None:
clips.append(clip[0])
plugins = []
for plugin_opt in track_header[16]:
for plugin in plugin_opt[1]:
plugins.append(plugin[1])
return TrackDescriptor(
name=track_header[2],
comments=track_header[6],
user_delay_samples=track_header[10],
state=track_header[14],
plugins=plugins,
clips=clips
)
@staticmethod
def visit_track_listing(_, visited_children):
return visited_children[1]
@staticmethod
def visit_track_clip_entry(_, visited_children):
timestamp = None
if isinstance(visited_children[14], list):
timestamp = visited_children[14][0][0]
return TrackClipDescriptor(channel=visited_children[0],
event=visited_children[3],
clip_name=visited_children[6],
start_time=visited_children[8],
end_time=visited_children[10],
duration=visited_children[12],
timestamp=timestamp,
state=visited_children[15])
@staticmethod
def visit_track_state_list(_, visited_children):
states = []
for next_state in visited_children:
states.append(next_state[0][0].text)
return states
@staticmethod
def visit_track_clip_state(node, _):
return node.text
@staticmethod
def visit_markers_listing(_, visited_children):
markers = []
for marker in visited_children[2]:
markers.append(marker)
return markers
@staticmethod
def visit_marker_record(_, visited_children):
return MarkerDescriptor(number=visited_children[0],
location=visited_children[3],
time_reference=visited_children[5],
units=visited_children[8],
name=visited_children[10],
comments=visited_children[12])
@staticmethod
def visit_formatted_clip_name(_, visited_children):
return visited_children[1].text
@staticmethod
def visit_string_value(node, _):
return node.text.strip(" ")
@staticmethod
def visit_integer_value(node, _):
return int(node.text)
# def visit_timecode_value(self, node, visited_children):
# return node.text.strip(" ")
@staticmethod
def visit_float_value(node, _):
return float(node.text)
def visit_block_ending(self, node, visited_children):
pass
def generic_visit(self, node, visited_children):
""" The generic visit method. """
return visited_children or node