diff --git a/.idea/dictionaries/jamie.xml b/.idea/dictionaries/jamie.xml
index 448d4b5..c97c337 100644
--- a/.idea/dictionaries/jamie.xml
+++ b/.idea/dictionaries/jamie.xml
@@ -5,6 +5,7 @@
fmpxml
futura
ptulsconv
+ retval
timecode
timespan
diff --git a/ptulsconv/docparser/doc_entity.py b/ptulsconv/docparser/doc_entity.py
index 71ebb58..fd63a99 100644
--- a/ptulsconv/docparser/doc_entity.py
+++ b/ptulsconv/docparser/doc_entity.py
@@ -22,7 +22,7 @@ class SessionDescriptor:
self.tracks = kwargs['tracks']
self.markers = kwargs['markers']
- def markers_timed(self):
+ def markers_timed(self) -> Generator[tuple['MarkerDescriptor', Fraction]]:
for marker in self.markers:
marker_time = self.header.convert_timecode(marker.location)
yield marker, marker_time
@@ -32,18 +32,18 @@ class SessionDescriptor:
for clip in track.clips:
yield track_idx, track, clip
- def track_clips_timed(self) -> Generator[Tuple[int, "TrackDescriptor", "TrackClipDescriptor",
+ def track_clips_timed(self) -> Generator[Tuple["TrackDescriptor", "TrackClipDescriptor",
Fraction, Fraction, Fraction]]:
"""
:return: A Generator that yields track, clip, start time, finish time, and timestamp
"""
- for track_idx, track, clip in self.tracks_clips():
+ for track, clip in self.tracks_clips():
start_time = self.header.convert_timecode(clip.start_timecode)
finish_time = self.header.convert_timecode(clip.finish_timecode)
timestamp_time = self.header.convert_timecode(clip.timestamp) \
if clip.timestamp is not None else None
- yield track_idx, track, clip, start_time, finish_time, timestamp_time
+ yield track, clip, start_time, finish_time, timestamp_time
class HeaderDescriptor:
diff --git a/ptulsconv/docparser/tag_mapping.py b/ptulsconv/docparser/tag_mapping.py
index 160dc77..e31cb37 100644
--- a/ptulsconv/docparser/tag_mapping.py
+++ b/ptulsconv/docparser/tag_mapping.py
@@ -1,74 +1,124 @@
from enum import Enum
-from typing import Optional, Callable, Any, List
-from .doc_entity import SessionDescriptor, TrackClipDescriptor
+from typing import Optional, Callable, Any, List, Generator, Tuple
+from .doc_entity import SessionDescriptor
from .tagged_string_parser_visitor import parse_tags, TagPreModes
from . import apply_appends
from fractions import Fraction
+from collections import namedtuple
class TagCompiler:
session: SessionDescriptor
-
- def timespan_tags(self, at: Fraction, track_index: int):
- retval = dict()
- for this_track_idx, _, clip, start, finish, _ in self.session.track_clips_timed():
- if this_track_idx > track_index:
- break
-
- clip_parsed = parse_tags(clip)
- if clip_parsed.mode == TagPreModes.TIMESPAN and start <= at < finish:
- retval.update(clip_parsed.tag_dict)
-
- return retval
+ def compile_events(self) -> Generator[Tuple[str, str, str, dict, Fraction, Fraction]]:
+ yield from self.apply_tags(
+ self.collect_time_spans(
+ self.apply_appends(
+ self.parse_data()
+ )
+ )
+ )
def marker_tags(self, at):
retval = dict()
+ for marker, time in [(m, t) for (m, t) in self.session.markers_timed() if t >= at ]:
+ retval.update(parse_tags(marker.comments).tag_dict)
+ retval.update(parse_tags(marker.name).tag_dict)
+
return retval
- def combined_clips(self):
-
- def should_append(_, rhs):
- parsed = parse_tags(rhs[0][2].clip_name)
- return parsed.mode == TagPreModes.APPEND
-
- def do_append(lhs: List, rhs: List):
- return lhs + rhs
-
- source = ([x] for x in self.session.track_clips_timed())
- yield from apply_appends(source, should_append, do_append)
-
- def coalesce_tags(self, clip_tags: dict, track_tags: dict, timespan_tags: dict,
+ @staticmethod
+ def coalesce_tags(clip_tags: dict, track_tags: dict,
+ track_comment_tags: dict,
+ timespan_tags: dict,
marker_tags: dict, session_tags: dict):
effective_tags = session_tags
effective_tags.update(marker_tags)
effective_tags.update(timespan_tags)
+ effective_tags.update(track_comment_tags)
effective_tags.update(track_tags)
effective_tags.update(clip_tags)
return effective_tags
- def compiled_clips(self):
- session_parsed = parse_tags(self.session.header.session_name)
- for track_idx, track, clip, start, finish, _ in self.session.track_clips_timed():
- clip_parsed = parse_tags(clip.clip_name)
+ Intermediate = namedtuple('Intermediate', 'track_content track_tags track_comment_tags '
+ 'clip_content clip_tags clip_tag_mode start finish')
+
+ def parse_data(self) -> Generator[Intermediate]:
+
+ for track, clip, start, finish in self.session.track_clips_timed():
+ if clip.state == 'Muted':
+ continue
+
track_parsed = parse_tags(track.name)
+ track_comments_parsed = parse_tags(track.comments)
+ clip_parsed = parse_tags(clip.clip_name)
- timespan_tags = self.timespan_tags(start, track_idx)
- marker_tags = self.marker_tags(start)
+ yield TagCompiler.Intermediate(track_content=track_parsed.content,
+ track_tags=track_parsed.tag_dict,
+ track_comment_tags=track_comments_parsed.tag_dict,
+ clip_content=clip_parsed.content, clip_tags=clip_parsed.tag_dict,
+ clip_tag_mode=clip_parsed.mode,
+ start=start, finish=finish)
- tags = self.coalesce_tags(clip_parsed.tag_dict, track_parsed.tag_dict,
- timespan_tags, marker_tags, session_parsed.tag_dict)
+ @staticmethod
+ def apply_appends(parsed: Generator[Intermediate]) -> Generator[Intermediate]:
- yield track, clip, tags, start, finish
+ def should_append(a, b):
+ return b.clip_tag_mode == TagPreModes.APPEND and b.start >= a.finish
+ def do_append(a, b):
+ merged_tags = a.clip_tags
+ merged_tags.update(b.clip_tags)
+ return TagCompiler.Intermediate(track_content=a.track_content,
+ track_tags=a.track_tags,
+ track_comment_tags=a.track_comment_tags,
+ clip_content=a.clip_content + ' ' + b.clip_content,
+ clip_tags=merged_tags, clip_tag_mode=a.clip_tag_mode,
+ start=a.start, finish=b.finish)
+ yield from apply_appends(parsed, should_append, do_append)
+ @staticmethod
+ def collect_time_spans(parsed: Generator[Intermediate]) -> \
+ Generator[Tuple[Intermediate, List[dict, Fraction, Fraction]]]:
+
+ time_spans = list()
+
+ for item in parsed:
+ if item.clip_tag_mode == TagPreModes.TIMESPAN:
+ time_spans.append((item.clip_tags, item.start, item.finish))
+ else:
+ yield item, list(time_spans)
+
+ @staticmethod
+ def time_span_tags(at_time: Fraction, applicable_spans) -> dict:
+ retval = dict()
+ for tags in [a[0] for a in applicable_spans if a.start <= at_time <= a.finish]:
+ retval.update(tags)
+
+ return retval
+
+ def apply_tags(self, parsed_with_time_spans) -> Generator[Tuple[str, str, str, dict, Fraction, Fraction]]:
+
+ session_parsed = parse_tags(self.session.header.session_name)
+
+ for event, time_spans in parsed_with_time_spans:
+ event: 'TagCompiler.Intermediate'
+ marker_tags = self.marker_tags(event.start)
+ time_span_tags = self.time_span_tags(event.start, time_spans)
+ tags = self.coalesce_tags(clip_tags=event.clip_tags,
+ track_tags=event.track_tags,
+ track_comment_tags=event.track_comment_tags,
+ timespan_tags=time_span_tags,
+ marker_tags=marker_tags,
+ session_tags=session_parsed.tag_dict)
+
+ yield event.clip_content, event.track_content, session_parsed.content, tags, event.start, event.finish
class TagMapping:
-
class ContentSource(Enum):
Session = 1,
Track = 2,
@@ -124,4 +174,3 @@ class TagMapping:
return True
else:
return False
-