Refactoring tag parser

This commit is contained in:
Jamie Hardt
2021-05-27 21:34:43 -07:00
parent d1bb5990b2
commit 3dd36a9901
5 changed files with 145 additions and 114 deletions

View File

@@ -0,0 +1,92 @@
from parsimonious import NodeVisitor, Grammar
from typing import Dict, Optional
tag_grammar = Grammar(
r"""
document = modifier? line? word_sep? tag_list?
line = word (word_sep word)*
tag_list = tag*
tag = key_tag / short_tag / full_text_tag / tag_junk
key_tag = "[" key "]" word_sep?
short_tag = "$" key "=" word word_sep?
full_text_tag = "{" key "=" value "}" word_sep?
key = ~"[A-Za-z][A-Za-z0-9_]*"
value = ~"[^}]+"
tag_junk = word word_sep?
word = ~"[^ \[\{\$][^ ]*"
word_sep = ~" +"
modifier = ("@" / "&" / "!") word_sep?
"""
)
def parse_tags(prompt) -> "TaggedStringResult":
ast = tag_grammar.parse(prompt)
return TagListVisitor().visit(ast)
class TaggedStringResult:
content: Optional[str]
tag_dict: Optional[Dict[str, str]]
mode: str
def __init__(self, content, tag_dict, mode):
self.content = content
self.tag_dict = tag_dict
self.mode = mode
class TagListVisitor(NodeVisitor):
@staticmethod
def visit_document(_, visited_children) -> TaggedStringResult:
modifier_opt, line_opt, _, tag_list_opt = visited_children
return TaggedStringResult(content=next(iter(line_opt), None),
tag_dict=next(iter(tag_list_opt), None),
mode=next(iter(modifier_opt), 'Normal')
)
@staticmethod
def visit_line(node, _):
return str.strip(node.text, " ")
@staticmethod
def visit_modifier(node, _):
if node.text.startswith('@'):
return 'Timespan'
elif node.text.startswith('&'):
return 'Append'
elif node.text.startswith('!'):
return 'Movie'
else:
return 'Normal'
@staticmethod
def visit_tag_list(_, visited_children):
retdict = dict()
for child in visited_children:
if child[0] is not None:
k, v = child[0]
retdict[k] = v
return retdict
@staticmethod
def visit_key_tag(_, children):
return children[1].text, children[1].text
@staticmethod
def visit_short_tag(_, children):
return children[1].text, children[3].text
@staticmethod
def visit_full_text_tag(_, children):
return children[1].text, children[3].text
@staticmethod
def visit_tag_junk(_node, _visited_children):
return None
def generic_visit(self, node, visited_children) -> object:
return visited_children or node

View File

@@ -1,8 +1,10 @@
from . import broadcast_timecode
from parsimonious import Grammar, NodeVisitor
from .docparser.tagged_string_parser_visitor import TaggedStringResult, tag_grammar
from parsimonious.exceptions import IncompleteParseError
import math
import sys
from .docparser.tagged_string_parser_visitor import TagListVisitor
from .reporting import print_advisory_tagging_error, print_section_header_style, print_status_style
from tqdm import tqdm
@@ -70,71 +72,9 @@ class TimecodeInterpreter(Transformation):
class TagInterpreter(Transformation):
tag_grammar = Grammar(
r"""
document = modifier? line? word_sep? tag_list?
line = word (word_sep word)*
tag_list = tag*
tag = key_tag / short_tag / full_text_tag / tag_junk
key_tag = "[" key "]" word_sep?
short_tag = "$" key "=" word word_sep?
full_text_tag = "{" key "=" value "}" word_sep?
key = ~"[A-Za-z][A-Za-z0-9_]*"
value = ~"[^}]+"
tag_junk = word word_sep?
word = ~"[^ \[\{\$][^ ]*"
word_sep = ~" +"
modifier = ("@" / "&" / "!") word_sep?
"""
)
class TagListVisitor(NodeVisitor):
def visit_document(self, _, visited_children):
modifier_opt, line_opt, _, tag_list_opt = visited_children
return dict(line=next(iter(line_opt), None),
tags=next(iter(tag_list_opt), None),
mode=next(iter(modifier_opt), 'Normal')
)
def visit_line(self, node, _):
return str.strip(node.text, " ")
def visit_modifier(self, node, _):
if node.text.startswith('@'):
return 'Timespan'
elif node.text.startswith('&'):
return 'Append'
elif node.text.startswith('!'):
return 'Movie'
else:
return 'Normal'
def visit_tag_list(self, _, visited_children):
retdict = dict()
for child in visited_children:
if child[0] is not None:
k, v = child[0]
retdict[k] = v
return retdict
def visit_key_tag(self, _, children):
return children[1].text, children[1].text
def visit_short_tag(self, _, children):
return children[1].text, children[3].text
def visit_full_text_tag(self, _, children):
return children[1].text, children[3].text
def visit_tag_junk(self, node, _):
return None
def generic_visit(self, node, visited_children):
return visited_children or node
def __init__(self, ignore_muted=True, show_progress=False, log_output=sys.stderr):
self.visitor = TagInterpreter.TagListVisitor()
self.visitor = TagListVisitor()
self.ignore_muted = ignore_muted
self.show_progress = show_progress
self.log_output = log_output
@@ -142,7 +82,7 @@ class TagInterpreter(Transformation):
self.transformed = list()
self.timespan_rules = list()
self.movie_rules = list()
self.title_tags = {}
self.title_tags = None
self.markers = list()
def transform(self, input_dict: dict) -> dict:
@@ -170,8 +110,8 @@ class TagInterpreter(Transformation):
parent_track_name=track['name'])
comment_tags = self.parse_tags(track['comments'],
parent_track_name=track['name'])
track_context_tags = track_tags['tags']
track_context_tags.update(comment_tags['tags'])
track_context_tags = track_tags.tag_dict
track_context_tags.update(comment_tags.tag_dict)
for clip in track['clips']:
if clip['state'] == 'Muted' and self.ignore_muted:
@@ -181,32 +121,32 @@ class TagInterpreter(Transformation):
parent_track_name=track['name'],
clip_time=clip['start_time'])
if clip_tags['mode'] == 'Normal':
if clip_tags.mode == 'Normal':
event = self.decorate_event(clip, clip_tags, input_dict['header'],
track_context_tags, track_tags)
self.transformed.append(event)
elif clip_tags['mode'] == 'Append':
elif clip_tags.mode == 'Append':
assert len(self.transformed) > 0, "First clip is in '&'-Append mode, fatal error."
self.transformed[-1].update(clip_tags['tags'])
self.transformed[-1]['PT.Clip.Name'] = self.transformed[-1]['PT.Clip.Name'] + " " + clip_tags[
'line']
self.transformed[-1].update(clip_tags.tag_dict)
self.transformed[-1]['PT.Clip.Name'] = self.transformed[-1]['PT.Clip.Name'] + " " \
+ clip_tags.content
self.transformed[-1]['PT.Clip.Finish_Frames'] = clip['end_time_decoded']['frame_count']
self.transformed[-1]['PT.Clip.Finish'] = clip['end_time']
self.transformed[-1]['PT.Clip.Finish_Seconds'] = \
clip['end_time_decoded']['frame_count'] / input_dict['header']['timecode_format']
elif clip_tags['mode'] == 'Timespan':
elif clip_tags.mode == 'Timespan':
rule = {'start_time_literal': clip['start_time'],
'start_time': clip['start_time_decoded']['frame_count'],
'start_time_seconds': clip['start_time_decoded']['frame_count'] / input_dict['header'][
'timecode_format'], 'end_time': clip['end_time_decoded']['frame_count'],
'tags': clip_tags['tags']}
'tags': clip_tags.tag_dict}
self.timespan_rules.append(rule)
elif clip_tags['mode'] == 'Movie':
rule = dict(movie_path=clip_tags['tags']['Movie'],
elif clip_tags.mode == 'Movie':
rule = dict(movie_path=clip_tags.tag_dict['Movie'],
start_time=clip['start_time_decoded']['frame_count'],
end_time=clip['end_time_decoded']['frame_count'])
self.movie_rules.append(rule)
@@ -217,19 +157,19 @@ class TagInterpreter(Transformation):
def decorate_event(self, clip, clip_tags, header_dict, track_context_tags, track_tags):
event = dict()
start_frame = clip['start_time_decoded']['frame_count']
event.update(self.title_tags['tags'])
event.update(self.title_tags.tag_dict)
event.update(track_context_tags)
event.update(self.effective_timespan_tags_at_time(start_frame))
event.update(self.effective_marker_tags_at_time(start_frame))
event.update(self.effective_movie_at_time(start_frame, header_dict['timecode_format']))
event.update(clip_tags['tags'])
event['PT.Track.Name'] = track_tags['line']
event['PT.Session.Name'] = self.title_tags['line']
event.update(clip_tags.tag_dict)
event['PT.Track.Name'] = track_tags.content
event['PT.Session.Name'] = self.title_tags.content
event['PT.Session.TimecodeFormat'] = header_dict['timecode_format']
event['PT.Session.Start'] = header_dict['start_timecode']
event['PT.Session.DropFrame'] = header_dict['timecode_drop_frame']
event['PT.Clip.Number'] = clip['event']
event['PT.Clip.Name'] = clip_tags['line']
event['PT.Clip.Name'] = clip_tags.content
event['PT.Clip.Start'] = clip['start_time']
event['PT.Clip.Finish'] = clip['end_time']
event['PT.Clip.Start_Frames'] = start_frame
@@ -273,10 +213,10 @@ class TagInterpreter(Transformation):
retval = dict()
for marker in self.markers:
marker_name_tags = self.parse_tags(marker['name'], marker_index=marker['number'])
marker_comment_tags = self.parse_tags(marker['comments'], marker_index=marker['number'])
effective_tags = marker_name_tags['tags']
effective_tags.update(marker_comment_tags['tags'])
marker_name_tags = self.parse_tags(marker['name'])
marker_comment_tags = self.parse_tags(marker['comments'])
effective_tags = marker_name_tags.tag_dict
effective_tags.update(marker_comment_tags.tag_dict)
if marker['location_decoded']['frame_count'] <= time:
retval.update(effective_tags)
@@ -284,9 +224,9 @@ class TagInterpreter(Transformation):
break
return retval
def parse_tags(self, source, parent_track_name=None, clip_time=None, marker_index=None):
def parse_tags(self, source, parent_track_name=None, clip_time=None) -> TaggedStringResult:
try:
parse_tree = self.tag_grammar.parse(source)
parse_tree = tag_grammar.parse(source)
return self.visitor.visit(parse_tree)
except IncompleteParseError as e:
print_advisory_tagging_error(failed_string=source,
@@ -294,7 +234,7 @@ class TagInterpreter(Transformation):
clip_time=clip_time, position=e.pos)
trimmed_source = source[:e.pos]
parse_tree = self.tag_grammar.parse(trimmed_source)
parse_tree = tag_grammar.parse(trimmed_source)
return self.visitor.visit(parse_tree)

View File

@@ -1,4 +1,3 @@
import datetime
import os
import os.path
import pathlib

View File

@@ -2,40 +2,41 @@ import unittest
from ptulsconv.transformations import TagInterpreter
class TestTagInterpreter(unittest.TestCase):
def test_line(self):
ti = TagInterpreter()
s1 = ti.parse_tags("this is a test")
self.assertEqual(s1['line'], "this is a test")
self.assertEqual(s1['mode'], 'Normal')
self.assertEqual(len(s1['tags']), 0)
self.assertEqual(s1.content, "this is a test")
self.assertEqual(s1.mode, 'Normal')
self.assertEqual(len(s1.tag_dict), 0)
s2 = ti.parse_tags("this! IS! Me! ** Typing! 123 <> |||")
self.assertEqual(s2['line'], "this! IS! Me! ** Typing! 123 <> |||")
self.assertEqual(s2['mode'], 'Normal')
self.assertEqual(len(s2['tags']), 0)
self.assertEqual(s2.content, "this! IS! Me! ** Typing! 123 <> |||")
self.assertEqual(s2.mode, 'Normal')
self.assertEqual(len(s2.tag_dict), 0)
def test_tags(self):
ti = TagInterpreter()
s1 = ti.parse_tags("{a=100}")
self.assertIn('tags', s1)
self.assertEqual(s1['tags']['a'], "100")
self.assertEqual(s1.tag_dict['a'], "100")
s2 = ti.parse_tags("{b=This is a test} [option] $X=9")
self.assertEqual(s2['tags']['b'], 'This is a test')
self.assertEqual(s2['tags']['option'], 'option')
self.assertEqual(s2['tags']['X'], "9")
self.assertEqual(s2.tag_dict['b'], 'This is a test')
self.assertEqual(s2.tag_dict['option'], 'option')
self.assertEqual(s2.tag_dict['X'], "9")
def test_modes(self):
ti = TagInterpreter()
s1 = ti.parse_tags("@ Monday Tuesday {a=1}")
self.assertEqual(s1['mode'], 'Timespan')
self.assertEqual(s1.mode, 'Timespan')
s2 = ti.parse_tags("Monday Tuesday {a=1}")
self.assertEqual(s2['mode'], 'Normal')
self.assertEqual(s2.mode, 'Normal')
s3 = ti.parse_tags("&Monday Tuesday {a=1}")
self.assertEqual(s3['mode'], 'Append')
self.assertEqual(s3.mode, 'Append')
if __name__ == '__main__':
unittest.main()

View File

@@ -2,8 +2,8 @@ import unittest
import ptulsconv
import os.path
class TaggingIntegratedTests(unittest.TestCase):
class TaggingIntegratedTests(unittest.TestCase):
path = os.path.dirname(__file__) + '/export_cases/Tag Tests/Tag Tests.txt'
def test_event_list(self):
@@ -87,6 +87,5 @@ class TaggingIntegratedTests(unittest.TestCase):
self.assertTrue(1080, parsed['events'][3]['PT.Clip.Finish_Frames'])
if __name__ == '__main__':
unittest.main()