TagMapping implementation

This commit is contained in:
Jamie Hardt
2021-06-01 14:02:40 -07:00
parent c6be2ba404
commit 32e3cfc594
3 changed files with 92 additions and 42 deletions

View File

@@ -5,6 +5,7 @@
<w>fmpxml</w> <w>fmpxml</w>
<w>futura</w> <w>futura</w>
<w>ptulsconv</w> <w>ptulsconv</w>
<w>retval</w>
<w>timecode</w> <w>timecode</w>
<w>timespan</w> <w>timespan</w>
</words> </words>

View File

@@ -22,7 +22,7 @@ class SessionDescriptor:
self.tracks = kwargs['tracks'] self.tracks = kwargs['tracks']
self.markers = kwargs['markers'] self.markers = kwargs['markers']
def markers_timed(self): def markers_timed(self) -> Generator[tuple['MarkerDescriptor', Fraction]]:
for marker in self.markers: for marker in self.markers:
marker_time = self.header.convert_timecode(marker.location) marker_time = self.header.convert_timecode(marker.location)
yield marker, marker_time yield marker, marker_time
@@ -32,18 +32,18 @@ class SessionDescriptor:
for clip in track.clips: for clip in track.clips:
yield track_idx, track, clip yield track_idx, track, clip
def track_clips_timed(self) -> Generator[Tuple[int, "TrackDescriptor", "TrackClipDescriptor", def track_clips_timed(self) -> Generator[Tuple["TrackDescriptor", "TrackClipDescriptor",
Fraction, Fraction, Fraction]]: Fraction, Fraction, Fraction]]:
""" """
:return: A Generator that yields track, clip, start time, finish time, and timestamp :return: A Generator that yields track, clip, start time, finish time, and timestamp
""" """
for track_idx, track, clip in self.tracks_clips(): for track, clip in self.tracks_clips():
start_time = self.header.convert_timecode(clip.start_timecode) start_time = self.header.convert_timecode(clip.start_timecode)
finish_time = self.header.convert_timecode(clip.finish_timecode) finish_time = self.header.convert_timecode(clip.finish_timecode)
timestamp_time = self.header.convert_timecode(clip.timestamp) \ timestamp_time = self.header.convert_timecode(clip.timestamp) \
if clip.timestamp is not None else None if clip.timestamp is not None else None
yield track_idx, track, clip, start_time, finish_time, timestamp_time yield track, clip, start_time, finish_time, timestamp_time
class HeaderDescriptor: class HeaderDescriptor:

View File

@@ -1,74 +1,124 @@
from enum import Enum from enum import Enum
from typing import Optional, Callable, Any, List from typing import Optional, Callable, Any, List, Generator, Tuple
from .doc_entity import SessionDescriptor, TrackClipDescriptor from .doc_entity import SessionDescriptor
from .tagged_string_parser_visitor import parse_tags, TagPreModes from .tagged_string_parser_visitor import parse_tags, TagPreModes
from . import apply_appends from . import apply_appends
from fractions import Fraction from fractions import Fraction
from collections import namedtuple
class TagCompiler: class TagCompiler:
session: SessionDescriptor session: SessionDescriptor
def compile_events(self) -> Generator[Tuple[str, str, str, dict, Fraction, Fraction]]:
def timespan_tags(self, at: Fraction, track_index: int): yield from self.apply_tags(
retval = dict() self.collect_time_spans(
for this_track_idx, _, clip, start, finish, _ in self.session.track_clips_timed(): self.apply_appends(
if this_track_idx > track_index: self.parse_data()
break )
)
clip_parsed = parse_tags(clip) )
if clip_parsed.mode == TagPreModes.TIMESPAN and start <= at < finish:
retval.update(clip_parsed.tag_dict)
return retval
def marker_tags(self, at): def marker_tags(self, at):
retval = dict() retval = dict()
for marker, time in [(m, t) for (m, t) in self.session.markers_timed() if t >= at ]:
retval.update(parse_tags(marker.comments).tag_dict)
retval.update(parse_tags(marker.name).tag_dict)
return retval return retval
def combined_clips(self): @staticmethod
def coalesce_tags(clip_tags: dict, track_tags: dict,
def should_append(_, rhs): track_comment_tags: dict,
parsed = parse_tags(rhs[0][2].clip_name) timespan_tags: dict,
return parsed.mode == TagPreModes.APPEND
def do_append(lhs: List, rhs: List):
return lhs + rhs
source = ([x] for x in self.session.track_clips_timed())
yield from apply_appends(source, should_append, do_append)
def coalesce_tags(self, clip_tags: dict, track_tags: dict, timespan_tags: dict,
marker_tags: dict, session_tags: dict): marker_tags: dict, session_tags: dict):
effective_tags = session_tags effective_tags = session_tags
effective_tags.update(marker_tags) effective_tags.update(marker_tags)
effective_tags.update(timespan_tags) effective_tags.update(timespan_tags)
effective_tags.update(track_comment_tags)
effective_tags.update(track_tags) effective_tags.update(track_tags)
effective_tags.update(clip_tags) effective_tags.update(clip_tags)
return effective_tags return effective_tags
def compiled_clips(self): Intermediate = namedtuple('Intermediate', 'track_content track_tags track_comment_tags '
session_parsed = parse_tags(self.session.header.session_name) 'clip_content clip_tags clip_tag_mode start finish')
for track_idx, track, clip, start, finish, _ in self.session.track_clips_timed():
clip_parsed = parse_tags(clip.clip_name) def parse_data(self) -> Generator[Intermediate]:
for track, clip, start, finish in self.session.track_clips_timed():
if clip.state == 'Muted':
continue
track_parsed = parse_tags(track.name) track_parsed = parse_tags(track.name)
track_comments_parsed = parse_tags(track.comments)
clip_parsed = parse_tags(clip.clip_name)
timespan_tags = self.timespan_tags(start, track_idx) yield TagCompiler.Intermediate(track_content=track_parsed.content,
marker_tags = self.marker_tags(start) track_tags=track_parsed.tag_dict,
track_comment_tags=track_comments_parsed.tag_dict,
clip_content=clip_parsed.content, clip_tags=clip_parsed.tag_dict,
clip_tag_mode=clip_parsed.mode,
start=start, finish=finish)
tags = self.coalesce_tags(clip_parsed.tag_dict, track_parsed.tag_dict, @staticmethod
timespan_tags, marker_tags, session_parsed.tag_dict) def apply_appends(parsed: Generator[Intermediate]) -> Generator[Intermediate]:
yield track, clip, tags, start, finish def should_append(a, b):
return b.clip_tag_mode == TagPreModes.APPEND and b.start >= a.finish
def do_append(a, b):
merged_tags = a.clip_tags
merged_tags.update(b.clip_tags)
return TagCompiler.Intermediate(track_content=a.track_content,
track_tags=a.track_tags,
track_comment_tags=a.track_comment_tags,
clip_content=a.clip_content + ' ' + b.clip_content,
clip_tags=merged_tags, clip_tag_mode=a.clip_tag_mode,
start=a.start, finish=b.finish)
yield from apply_appends(parsed, should_append, do_append)
@staticmethod
def collect_time_spans(parsed: Generator[Intermediate]) -> \
Generator[Tuple[Intermediate, List[dict, Fraction, Fraction]]]:
time_spans = list()
for item in parsed:
if item.clip_tag_mode == TagPreModes.TIMESPAN:
time_spans.append((item.clip_tags, item.start, item.finish))
else:
yield item, list(time_spans)
@staticmethod
def time_span_tags(at_time: Fraction, applicable_spans) -> dict:
retval = dict()
for tags in [a[0] for a in applicable_spans if a.start <= at_time <= a.finish]:
retval.update(tags)
return retval
def apply_tags(self, parsed_with_time_spans) -> Generator[Tuple[str, str, str, dict, Fraction, Fraction]]:
session_parsed = parse_tags(self.session.header.session_name)
for event, time_spans in parsed_with_time_spans:
event: 'TagCompiler.Intermediate'
marker_tags = self.marker_tags(event.start)
time_span_tags = self.time_span_tags(event.start, time_spans)
tags = self.coalesce_tags(clip_tags=event.clip_tags,
track_tags=event.track_tags,
track_comment_tags=event.track_comment_tags,
timespan_tags=time_span_tags,
marker_tags=marker_tags,
session_tags=session_parsed.tag_dict)
yield event.clip_content, event.track_content, session_parsed.content, tags, event.start, event.finish
class TagMapping: class TagMapping:
class ContentSource(Enum): class ContentSource(Enum):
Session = 1, Session = 1,
Track = 2, Track = 2,
@@ -124,4 +174,3 @@ class TagMapping:
return True return True
else: else:
return False return False