diff --git a/ptulsconv/docparser/tagged_string_parser_visitor.py b/ptulsconv/docparser/tagged_string_parser_visitor.py new file mode 100644 index 0000000..fb95f50 --- /dev/null +++ b/ptulsconv/docparser/tagged_string_parser_visitor.py @@ -0,0 +1,92 @@ +from parsimonious import NodeVisitor, Grammar +from typing import Dict, Optional + + +tag_grammar = Grammar( + r""" + document = modifier? line? word_sep? tag_list? + line = word (word_sep word)* + tag_list = tag* + tag = key_tag / short_tag / full_text_tag / tag_junk + key_tag = "[" key "]" word_sep? + short_tag = "$" key "=" word word_sep? + full_text_tag = "{" key "=" value "}" word_sep? + key = ~"[A-Za-z][A-Za-z0-9_]*" + value = ~"[^}]+" + tag_junk = word word_sep? + word = ~"[^ \[\{\$][^ ]*" + word_sep = ~" +" + modifier = ("@" / "&" / "!") word_sep? + """ +) + + +def parse_tags(prompt) -> "TaggedStringResult": + ast = tag_grammar.parse(prompt) + return TagListVisitor().visit(ast) + + +class TaggedStringResult: + content: Optional[str] + tag_dict: Optional[Dict[str, str]] + mode: str + + def __init__(self, content, tag_dict, mode): + self.content = content + self.tag_dict = tag_dict + self.mode = mode + + +class TagListVisitor(NodeVisitor): + + @staticmethod + def visit_document(_, visited_children) -> TaggedStringResult: + modifier_opt, line_opt, _, tag_list_opt = visited_children + + return TaggedStringResult(content=next(iter(line_opt), None), + tag_dict=next(iter(tag_list_opt), None), + mode=next(iter(modifier_opt), 'Normal') + ) + + @staticmethod + def visit_line(node, _): + return str.strip(node.text, " ") + + @staticmethod + def visit_modifier(node, _): + if node.text.startswith('@'): + return 'Timespan' + elif node.text.startswith('&'): + return 'Append' + elif node.text.startswith('!'): + return 'Movie' + else: + return 'Normal' + + @staticmethod + def visit_tag_list(_, visited_children): + retdict = dict() + for child in visited_children: + if child[0] is not None: + k, v = child[0] + retdict[k] = v + return retdict + + @staticmethod + def visit_key_tag(_, children): + return children[1].text, children[1].text + + @staticmethod + def visit_short_tag(_, children): + return children[1].text, children[3].text + + @staticmethod + def visit_full_text_tag(_, children): + return children[1].text, children[3].text + + @staticmethod + def visit_tag_junk(_node, _visited_children): + return None + + def generic_visit(self, node, visited_children) -> object: + return visited_children or node diff --git a/ptulsconv/transformations.py b/ptulsconv/transformations.py index 77268e6..b3098ce 100644 --- a/ptulsconv/transformations.py +++ b/ptulsconv/transformations.py @@ -1,8 +1,10 @@ from . import broadcast_timecode -from parsimonious import Grammar, NodeVisitor +from .docparser.tagged_string_parser_visitor import TaggedStringResult, tag_grammar from parsimonious.exceptions import IncompleteParseError import math import sys + +from .docparser.tagged_string_parser_visitor import TagListVisitor from .reporting import print_advisory_tagging_error, print_section_header_style, print_status_style from tqdm import tqdm @@ -70,71 +72,9 @@ class TimecodeInterpreter(Transformation): class TagInterpreter(Transformation): - tag_grammar = Grammar( - r""" - document = modifier? line? word_sep? tag_list? - line = word (word_sep word)* - tag_list = tag* - tag = key_tag / short_tag / full_text_tag / tag_junk - key_tag = "[" key "]" word_sep? - short_tag = "$" key "=" word word_sep? - full_text_tag = "{" key "=" value "}" word_sep? - key = ~"[A-Za-z][A-Za-z0-9_]*" - value = ~"[^}]+" - tag_junk = word word_sep? - word = ~"[^ \[\{\$][^ ]*" - word_sep = ~" +" - modifier = ("@" / "&" / "!") word_sep? - """ - ) - - class TagListVisitor(NodeVisitor): - def visit_document(self, _, visited_children): - modifier_opt, line_opt, _, tag_list_opt = visited_children - - return dict(line=next(iter(line_opt), None), - tags=next(iter(tag_list_opt), None), - mode=next(iter(modifier_opt), 'Normal') - ) - - def visit_line(self, node, _): - return str.strip(node.text, " ") - - def visit_modifier(self, node, _): - if node.text.startswith('@'): - return 'Timespan' - elif node.text.startswith('&'): - return 'Append' - elif node.text.startswith('!'): - return 'Movie' - else: - return 'Normal' - - def visit_tag_list(self, _, visited_children): - retdict = dict() - for child in visited_children: - if child[0] is not None: - k, v = child[0] - retdict[k] = v - return retdict - - def visit_key_tag(self, _, children): - return children[1].text, children[1].text - - def visit_short_tag(self, _, children): - return children[1].text, children[3].text - - def visit_full_text_tag(self, _, children): - return children[1].text, children[3].text - - def visit_tag_junk(self, node, _): - return None - - def generic_visit(self, node, visited_children): - return visited_children or node def __init__(self, ignore_muted=True, show_progress=False, log_output=sys.stderr): - self.visitor = TagInterpreter.TagListVisitor() + self.visitor = TagListVisitor() self.ignore_muted = ignore_muted self.show_progress = show_progress self.log_output = log_output @@ -142,7 +82,7 @@ class TagInterpreter(Transformation): self.transformed = list() self.timespan_rules = list() self.movie_rules = list() - self.title_tags = {} + self.title_tags = None self.markers = list() def transform(self, input_dict: dict) -> dict: @@ -170,8 +110,8 @@ class TagInterpreter(Transformation): parent_track_name=track['name']) comment_tags = self.parse_tags(track['comments'], parent_track_name=track['name']) - track_context_tags = track_tags['tags'] - track_context_tags.update(comment_tags['tags']) + track_context_tags = track_tags.tag_dict + track_context_tags.update(comment_tags.tag_dict) for clip in track['clips']: if clip['state'] == 'Muted' and self.ignore_muted: @@ -181,32 +121,32 @@ class TagInterpreter(Transformation): parent_track_name=track['name'], clip_time=clip['start_time']) - if clip_tags['mode'] == 'Normal': + if clip_tags.mode == 'Normal': event = self.decorate_event(clip, clip_tags, input_dict['header'], track_context_tags, track_tags) self.transformed.append(event) - elif clip_tags['mode'] == 'Append': + elif clip_tags.mode == 'Append': assert len(self.transformed) > 0, "First clip is in '&'-Append mode, fatal error." - self.transformed[-1].update(clip_tags['tags']) - self.transformed[-1]['PT.Clip.Name'] = self.transformed[-1]['PT.Clip.Name'] + " " + clip_tags[ - 'line'] + self.transformed[-1].update(clip_tags.tag_dict) + self.transformed[-1]['PT.Clip.Name'] = self.transformed[-1]['PT.Clip.Name'] + " " \ + + clip_tags.content self.transformed[-1]['PT.Clip.Finish_Frames'] = clip['end_time_decoded']['frame_count'] self.transformed[-1]['PT.Clip.Finish'] = clip['end_time'] self.transformed[-1]['PT.Clip.Finish_Seconds'] = \ clip['end_time_decoded']['frame_count'] / input_dict['header']['timecode_format'] - elif clip_tags['mode'] == 'Timespan': + elif clip_tags.mode == 'Timespan': rule = {'start_time_literal': clip['start_time'], 'start_time': clip['start_time_decoded']['frame_count'], 'start_time_seconds': clip['start_time_decoded']['frame_count'] / input_dict['header'][ 'timecode_format'], 'end_time': clip['end_time_decoded']['frame_count'], - 'tags': clip_tags['tags']} + 'tags': clip_tags.tag_dict} self.timespan_rules.append(rule) - elif clip_tags['mode'] == 'Movie': - rule = dict(movie_path=clip_tags['tags']['Movie'], + elif clip_tags.mode == 'Movie': + rule = dict(movie_path=clip_tags.tag_dict['Movie'], start_time=clip['start_time_decoded']['frame_count'], end_time=clip['end_time_decoded']['frame_count']) self.movie_rules.append(rule) @@ -217,19 +157,19 @@ class TagInterpreter(Transformation): def decorate_event(self, clip, clip_tags, header_dict, track_context_tags, track_tags): event = dict() start_frame = clip['start_time_decoded']['frame_count'] - event.update(self.title_tags['tags']) + event.update(self.title_tags.tag_dict) event.update(track_context_tags) event.update(self.effective_timespan_tags_at_time(start_frame)) event.update(self.effective_marker_tags_at_time(start_frame)) event.update(self.effective_movie_at_time(start_frame, header_dict['timecode_format'])) - event.update(clip_tags['tags']) - event['PT.Track.Name'] = track_tags['line'] - event['PT.Session.Name'] = self.title_tags['line'] + event.update(clip_tags.tag_dict) + event['PT.Track.Name'] = track_tags.content + event['PT.Session.Name'] = self.title_tags.content event['PT.Session.TimecodeFormat'] = header_dict['timecode_format'] event['PT.Session.Start'] = header_dict['start_timecode'] event['PT.Session.DropFrame'] = header_dict['timecode_drop_frame'] event['PT.Clip.Number'] = clip['event'] - event['PT.Clip.Name'] = clip_tags['line'] + event['PT.Clip.Name'] = clip_tags.content event['PT.Clip.Start'] = clip['start_time'] event['PT.Clip.Finish'] = clip['end_time'] event['PT.Clip.Start_Frames'] = start_frame @@ -245,7 +185,7 @@ class TagInterpreter(Transformation): if rule['start_time'] <= time <= rule['end_time']: retval['Movie.Filename'] = rule['movie_path'] retval['Movie.Start_Offset_Frames'] = time - rule['start_time'] - retval['Movie.Start_Offset_Seconds'] = (time - rule['start_time'] ) / timecode_format + retval['Movie.Start_Offset_Seconds'] = (time - rule['start_time']) / timecode_format break return retval @@ -273,10 +213,10 @@ class TagInterpreter(Transformation): retval = dict() for marker in self.markers: - marker_name_tags = self.parse_tags(marker['name'], marker_index=marker['number']) - marker_comment_tags = self.parse_tags(marker['comments'], marker_index=marker['number']) - effective_tags = marker_name_tags['tags'] - effective_tags.update(marker_comment_tags['tags']) + marker_name_tags = self.parse_tags(marker['name']) + marker_comment_tags = self.parse_tags(marker['comments']) + effective_tags = marker_name_tags.tag_dict + effective_tags.update(marker_comment_tags.tag_dict) if marker['location_decoded']['frame_count'] <= time: retval.update(effective_tags) @@ -284,9 +224,9 @@ class TagInterpreter(Transformation): break return retval - def parse_tags(self, source, parent_track_name=None, clip_time=None, marker_index=None): + def parse_tags(self, source, parent_track_name=None, clip_time=None) -> TaggedStringResult: try: - parse_tree = self.tag_grammar.parse(source) + parse_tree = tag_grammar.parse(source) return self.visitor.visit(parse_tree) except IncompleteParseError as e: print_advisory_tagging_error(failed_string=source, @@ -294,7 +234,7 @@ class TagInterpreter(Transformation): clip_time=clip_time, position=e.pos) trimmed_source = source[:e.pos] - parse_tree = self.tag_grammar.parse(trimmed_source) + parse_tree = tag_grammar.parse(trimmed_source) return self.visitor.visit(parse_tree) diff --git a/ptulsconv/xml/common.py b/ptulsconv/xml/common.py index 767df53..564b558 100644 --- a/ptulsconv/xml/common.py +++ b/ptulsconv/xml/common.py @@ -1,4 +1,3 @@ -import datetime import os import os.path import pathlib diff --git a/tests/test_tag_interpreter.py b/tests/test_tag_interpreter.py index c676cb6..f4f79c9 100644 --- a/tests/test_tag_interpreter.py +++ b/tests/test_tag_interpreter.py @@ -2,40 +2,41 @@ import unittest from ptulsconv.transformations import TagInterpreter + class TestTagInterpreter(unittest.TestCase): def test_line(self): ti = TagInterpreter() s1 = ti.parse_tags("this is a test") - self.assertEqual(s1['line'], "this is a test") - self.assertEqual(s1['mode'], 'Normal') - self.assertEqual(len(s1['tags']), 0) + self.assertEqual(s1.content, "this is a test") + self.assertEqual(s1.mode, 'Normal') + self.assertEqual(len(s1.tag_dict), 0) s2 = ti.parse_tags("this! IS! Me! ** Typing! 123 <> |||") - self.assertEqual(s2['line'], "this! IS! Me! ** Typing! 123 <> |||") - self.assertEqual(s2['mode'], 'Normal') - self.assertEqual(len(s2['tags']), 0) + self.assertEqual(s2.content, "this! IS! Me! ** Typing! 123 <> |||") + self.assertEqual(s2.mode, 'Normal') + self.assertEqual(len(s2.tag_dict), 0) def test_tags(self): ti = TagInterpreter() s1 = ti.parse_tags("{a=100}") - self.assertIn('tags', s1) - self.assertEqual(s1['tags']['a'], "100") + self.assertEqual(s1.tag_dict['a'], "100") s2 = ti.parse_tags("{b=This is a test} [option] $X=9") - self.assertEqual(s2['tags']['b'], 'This is a test') - self.assertEqual(s2['tags']['option'], 'option') - self.assertEqual(s2['tags']['X'], "9") + self.assertEqual(s2.tag_dict['b'], 'This is a test') + self.assertEqual(s2.tag_dict['option'], 'option') + self.assertEqual(s2.tag_dict['X'], "9") def test_modes(self): ti = TagInterpreter() s1 = ti.parse_tags("@ Monday Tuesday {a=1}") - self.assertEqual(s1['mode'], 'Timespan') + self.assertEqual(s1.mode, 'Timespan') s2 = ti.parse_tags("Monday Tuesday {a=1}") - self.assertEqual(s2['mode'], 'Normal') + self.assertEqual(s2.mode, 'Normal') s3 = ti.parse_tags("&Monday Tuesday {a=1}") - self.assertEqual(s3['mode'], 'Append') + self.assertEqual(s3.mode, 'Append') + if __name__ == '__main__': unittest.main() diff --git a/tests/test_tagging.py b/tests/test_tagging.py index 9fdded1..4a864ad 100644 --- a/tests/test_tagging.py +++ b/tests/test_tagging.py @@ -2,8 +2,8 @@ import unittest import ptulsconv import os.path -class TaggingIntegratedTests(unittest.TestCase): +class TaggingIntegratedTests(unittest.TestCase): path = os.path.dirname(__file__) + '/export_cases/Tag Tests/Tag Tests.txt' def test_event_list(self): @@ -22,14 +22,14 @@ class TaggingIntegratedTests(unittest.TestCase): self.assertEqual(9, len(parsed['events'])) self.assertEqual("Clip Name", parsed['events'][0]['PT.Clip.Name']) - self.assertEqual("Lorem ipsum" , parsed['events'][1]['PT.Clip.Name']) - self.assertEqual("Dolor sic amet the rain in spain" , parsed['events'][2]['PT.Clip.Name']) - self.assertEqual("A B C" , parsed['events'][3]['PT.Clip.Name']) - self.assertEqual("Silver Bridge" , parsed['events'][4]['PT.Clip.Name']) - self.assertEqual("Region 02" , parsed['events'][5]['PT.Clip.Name']) - self.assertEqual("Region 12" , parsed['events'][6]['PT.Clip.Name']) - self.assertEqual("Region 22" , parsed['events'][7]['PT.Clip.Name']) - self.assertEqual("Region 04" , parsed['events'][8]['PT.Clip.Name']) + self.assertEqual("Lorem ipsum", parsed['events'][1]['PT.Clip.Name']) + self.assertEqual("Dolor sic amet the rain in spain", parsed['events'][2]['PT.Clip.Name']) + self.assertEqual("A B C", parsed['events'][3]['PT.Clip.Name']) + self.assertEqual("Silver Bridge", parsed['events'][4]['PT.Clip.Name']) + self.assertEqual("Region 02", parsed['events'][5]['PT.Clip.Name']) + self.assertEqual("Region 12", parsed['events'][6]['PT.Clip.Name']) + self.assertEqual("Region 22", parsed['events'][7]['PT.Clip.Name']) + self.assertEqual("Region 04", parsed['events'][8]['PT.Clip.Name']) def test_append(self): with open(self.path, 'r') as f: @@ -87,6 +87,5 @@ class TaggingIntegratedTests(unittest.TestCase): self.assertTrue(1080, parsed['events'][3]['PT.Clip.Finish_Frames']) - if __name__ == '__main__': unittest.main()