flake8 fixes

This commit is contained in:
Jamie Hardt
2023-07-21 13:38:24 -07:00
parent f825b92586
commit 2021159666
5 changed files with 151 additions and 103 deletions

View File

@@ -1,6 +1,6 @@
"""
This module defines classes and methods for converting :class:`Event` objects into
:class:`ADRLine` objects.
This module defines classes and methods for converting :class:`Event` objects
into :class:`ADRLine` objects.
"""
from ptulsconv.docparser.tag_compiler import Event
@@ -11,15 +11,16 @@ from fractions import Fraction
from ptulsconv.docparser.tag_mapping import TagMapping
def make_entities(from_events: List[Event]) -> Tuple[List['GenericEvent'], List['ADRLine']]:
def make_entities(from_events: List[Event]) -> Tuple[List['GenericEvent'],
List['ADRLine']]:
"""
Accepts a list of Events and converts them into either ADRLine events or
GenricEvents by calling :func:`make_entity` on each member.
:param from_events: A list of `Event` objects.
:returns: A tuple of two lists, the first containing :class:`GenericEvent` and the
second containing :class:`ADRLine`.
:returns: A tuple of two lists, the first containing :class:`GenericEvent`
and the second containing :class:`ADRLine`.
"""
generic_events = list()
adr_lines = list()
@@ -67,14 +68,15 @@ class GenericEvent:
scene: Optional[str] = None
version: Optional[str] = None
reel: Optional[str] = None
start: Fraction = Fraction(0,1)
finish: Fraction = Fraction(0,1)
start: Fraction = Fraction(0, 1)
finish: Fraction = Fraction(0, 1)
omitted: bool = False
note: Optional[str] = None
requested_by: Optional[str] = None
tag_mapping = [
TagMapping(source='Title', target="title", alt=TagMapping.ContentSource.Session),
TagMapping(source='Title', target="title",
alt=TagMapping.ContentSource.Session),
TagMapping(source="Supv", target="supervisor"),
TagMapping(source="Client", target="client"),
TagMapping(source="Sc", target="scene"),
@@ -111,9 +113,11 @@ class ADRLine(GenericEvent):
TagMapping(source="P", target="priority"),
TagMapping(source="QN", target="cue_number"),
TagMapping(source="CN", target="character_id"),
TagMapping(source="Char", target="character_name", alt=TagMapping.ContentSource.Track),
TagMapping(source="Char", target="character_name",
alt=TagMapping.ContentSource.Track),
TagMapping(source="Actor", target="actor_name"),
TagMapping(source="Line", target="prompt", alt=TagMapping.ContentSource.Clip),
TagMapping(source="Line", target="prompt",
alt=TagMapping.ContentSource.Clip),
TagMapping(source="R", target="reason"),
TagMapping(source="Mins", target="time_budget_mins",
formatter=(lambda n: float(n))),
@@ -131,5 +135,3 @@ class ADRLine(GenericEvent):
TagMapping(source="OPT", target="optional",
formatter=(lambda x: len(x) > 0))
]

View File

@@ -21,19 +21,24 @@ class SessionDescriptor:
def markers_timed(self) -> Iterator[Tuple['MarkerDescriptor', Fraction]]:
for marker in self.markers:
marker_time = Fraction(marker.time_reference, int(self.header.sample_rate))
#marker_time = self.header.convert_timecode(marker.location)
marker_time = Fraction(marker.time_reference,
int(self.header.sample_rate))
# marker_time = self.header.convert_timecode(marker.location)
yield marker, marker_time
def tracks_clips(self) -> Iterator[Tuple['TrackDescriptor', 'TrackClipDescriptor']]:
def tracks_clips(self) -> Iterator[Tuple['TrackDescriptor',
'TrackClipDescriptor']]:
for track in self.tracks:
for clip in track.clips:
yield track, clip
def track_clips_timed(self) -> Iterator[Tuple["TrackDescriptor", "TrackClipDescriptor",
Fraction, Fraction, Fraction]]:
def track_clips_timed(self) -> Iterator[Tuple["TrackDescriptor",
"TrackClipDescriptor",
Fraction, Fraction, Fraction]
]:
"""
:return: A Generator that yields track, clip, start time, finish time, and timestamp
:return: A Generator that yields track, clip, start time, finish time,
and timestamp
"""
for track, clip in self.tracks_clips():
start_time = self.header.convert_timecode(clip.start_timecode)
@@ -105,7 +110,8 @@ class HeaderDescriptor:
if self.timecode_fps in frame_rates.keys():
return frame_rates[self.timecode_fps]
else:
raise ValueError("Unrecognized TC rate (%s)" % self.timecode_format)
raise ValueError("Unrecognized TC rate (%s)" %
self.timecode_format)
class TrackDescriptor:

View File

@@ -1,13 +1,15 @@
from parsimonious.nodes import NodeVisitor
from parsimonious.grammar import Grammar
from .doc_entity import SessionDescriptor, HeaderDescriptor, TrackDescriptor, FileDescriptor, \
TrackClipDescriptor, ClipDescriptor, PluginDescriptor, MarkerDescriptor
from .doc_entity import SessionDescriptor, HeaderDescriptor, TrackDescriptor,\
FileDescriptor, TrackClipDescriptor, ClipDescriptor, PluginDescriptor,\
MarkerDescriptor
protools_text_export_grammar = Grammar(
r"""
document = header files_section? clips_section? plugin_listing? track_listing? markers_listing?
document = header files_section? clips_section? plugin_listing?
track_listing? markers_listing?
header = "SESSION NAME:" fs string_value rs
"SAMPLE RATE:" fs float_value rs
"BIT DEPTH:" fs integer_value "-bit" rs
@@ -17,21 +19,29 @@ protools_text_export_grammar = Grammar(
"# OF AUDIO CLIPS:" fs integer_value rs
"# OF AUDIO FILES:" fs integer_value rs block_ending
frame_rate = ("60" / "59.94" / "30" / "29.97" / "25" / "24" / "23.976")
files_section = files_header files_column_header file_record* block_ending
frame_rate = ("60" / "59.94" / "30" / "29.97" / "25" / "24" /
"23.976")
files_section = files_header files_column_header file_record*
block_ending
files_header = "F I L E S I N S E S S I O N" rs
files_column_header = "Filename" isp fs "Location" rs
file_record = string_value fs string_value rs
clips_section = clips_header clips_column_header clip_record* block_ending
clips_section = clips_header clips_column_header clip_record*
block_ending
clips_header = "O N L I N E C L I P S I N S E S S I O N" rs
clips_column_header = string_value fs string_value rs
clip_record = string_value fs string_value (fs "[" integer_value "]")? rs
clip_record = string_value fs string_value
(fs "[" integer_value "]")? rs
plugin_listing = plugin_header plugin_column_header plugin_record* block_ending
plugin_listing = plugin_header plugin_column_header plugin_record*
block_ending
plugin_header = "P L U G - I N S L I S T I N G" rs
plugin_column_header = "MANUFACTURER " fs "PLUG-IN NAME " fs
"VERSION " fs "FORMAT " fs "STEMS " fs
plugin_column_header = "MANUFACTURER " fs
"PLUG-IN NAME " fs
"VERSION " fs
"FORMAT " fs
"STEMS " fs
"NUMBER OF INSTANCES" rs
plugin_record = string_value fs string_value fs string_value fs
string_value fs string_value fs string_value rs
@@ -45,8 +55,10 @@ protools_text_export_grammar = Grammar(
"USER DELAY:" fs integer_value " Samples" rs
"STATE: " track_state_list rs
("PLUG-INS: " ( fs string_value )* rs)?
"CHANNEL " fs "EVENT " fs "CLIP NAME " fs
"START TIME " fs "END TIME " fs "DURATION " fs
"CHANNEL " fs "EVENT " fs
"CLIP NAME " fs
"START TIME " fs "END TIME " fs
"DURATION " fs
("TIMESTAMP " fs)? "STATE" rs
track_state_list = (track_state " ")*
@@ -56,15 +68,20 @@ protools_text_export_grammar = Grammar(
track_clip_entry = integer_value isp fs
integer_value isp fs
string_value fs
string_value fs string_value fs string_value fs (string_value fs)?
string_value fs string_value fs string_value fs
(string_value fs)?
track_clip_state rs
track_clip_state = ("Muted" / "Unmuted")
markers_listing = markers_listing_header markers_column_header marker_record*
markers_listing = markers_listing_header markers_column_header
marker_record*
markers_listing_header = "M A R K E R S L I S T I N G" rs
markers_column_header = "# " fs "LOCATION " fs "TIME REFERENCE " fs
"UNITS " fs "NAME " fs "COMMENTS" rs
markers_column_header = "# " fs "LOCATION " fs
"TIME REFERENCE " fs
"UNITS " fs
"NAME " fs
"COMMENTS" rs
marker_record = integer_value isp fs string_value fs integer_value isp fs
string_value fs string_value fs string_value rs
@@ -125,23 +142,28 @@ class DocParserVisitor(NodeVisitor):
@staticmethod
def visit_files_section(_, visited_children):
return list(map(lambda child: FileDescriptor(filename=child[0], path=child[2]), visited_children[2]))
return list(map(
lambda child: FileDescriptor(filename=child[0], path=child[2]),
visited_children[2]))
@staticmethod
def visit_clips_section(_, visited_children):
channel = next(iter(visited_children[2][3]), 1)
return list(map(lambda child: ClipDescriptor(clip_name=child[0], file=child[2], channel=channel),
visited_children[2]))
return list(map(
lambda child: ClipDescriptor(clip_name=child[0], file=child[2],
channel=channel),
visited_children[2]))
@staticmethod
def visit_plugin_listing(_, visited_children):
return list(map(lambda child: PluginDescriptor(manufacturer=child[0],
plugin_name=child[2],
version=child[4],
format=child[6],
stems=child[8],
count_instances=child[10]),
return list(map(lambda child:
PluginDescriptor(manufacturer=child[0],
plugin_name=child[2],
version=child[4],
format=child[6],
stems=child[8],
count_instances=child[10]),
visited_children[2]))
@staticmethod

View File

@@ -24,20 +24,25 @@ class TagCompiler:
items.
"""
Intermediate = namedtuple('Intermediate', 'track_content track_tags track_comment_tags '
'clip_content clip_tags clip_tag_mode start finish')
Intermediate = namedtuple('Intermediate',
'track_content track_tags track_comment_tags '
'clip_content clip_tags clip_tag_mode start '
'finish')
session: doc_entity.SessionDescriptor
def compile_all_time_spans(self) -> List[Tuple[str, str, Fraction, Fraction]]:
def compile_all_time_spans(self) -> List[Tuple[str, str, Fraction,
Fraction]]:
"""
:returns: A `List` of (key: str, value: str, start: Fraction, finish: Fraction)
:returns: A `List` of (key: str, value: str, start: Fraction,
finish: Fraction)
"""
ret_list = list()
for element in self.parse_data():
if element.clip_tag_mode == TagPreModes.TIMESPAN:
for k in element.clip_tags.keys():
ret_list.append((k, element.clip_tags[k], element.start, element.finish))
ret_list.append((k, element.clip_tags[k], element.start,
element.finish))
return ret_list
@@ -73,26 +78,31 @@ class TagCompiler:
step3 = self.collect_time_spans(step2)
step4 = self.apply_tags(step3)
for datum in step4:
yield Event(clip_name=datum[0], track_name=datum[1], session_name=datum[2],
tags=datum[3], start=datum[4], finish=datum[5])
yield Event(clip_name=datum[0], track_name=datum[1],
session_name=datum[2], tags=datum[3], start=datum[4],
finish=datum[5])
def _marker_tags(self, at):
retval = dict()
applicable = [(m, t) for (m, t) in self.session.markers_timed() if t <= at]
applicable = [(m, t) for (m, t) in
self.session.markers_timed() if t <= at]
for marker, _ in sorted(applicable, key=lambda x: x[1]):
retval.update(parse_tags(marker.comments or "").tag_dict)
retval.update(parse_tags(marker.name or "").tag_dict)
return retval
def filter_out_directives(self, clips : Iterator[Intermediate]) -> Iterator[Intermediate]:
def filter_out_directives(self,
clips: Iterator[Intermediate]) \
-> Iterator[Intermediate]:
for clip in clips:
if clip.clip_tag_mode == 'Directive':
continue
else:
yield clip
@staticmethod
def _coalesce_tags(clip_tags: dict, track_tags: dict,
track_comment_tags: dict,
@@ -117,29 +127,33 @@ class TagCompiler:
track_comments_parsed = parse_tags(track.comments)
clip_parsed = parse_tags(clip.clip_name)
yield TagCompiler.Intermediate(track_content=track_parsed.content,
track_tags=track_parsed.tag_dict,
track_comment_tags=track_comments_parsed.tag_dict,
clip_content=clip_parsed.content,
clip_tags=clip_parsed.tag_dict,
clip_tag_mode=clip_parsed.mode,
start=start, finish=finish)
yield TagCompiler.Intermediate(
track_content=track_parsed.content,
track_tags=track_parsed.tag_dict,
track_comment_tags=track_comments_parsed.tag_dict,
clip_content=clip_parsed.content,
clip_tags=clip_parsed.tag_dict,
clip_tag_mode=clip_parsed.mode,
start=start, finish=finish)
@staticmethod
def apply_appends(parsed: Iterator[Intermediate]) -> Iterator[Intermediate]:
def apply_appends(parsed: Iterator[Intermediate]) -> \
Iterator[Intermediate]:
def should_append(a, b):
return b.clip_tag_mode == TagPreModes.APPEND and b.start >= a.finish
return b.clip_tag_mode == TagPreModes.APPEND and \
b.start >= a.finish
def do_append(a, b):
merged_tags = dict(a.clip_tags)
merged_tags.update(b.clip_tags)
return TagCompiler.Intermediate(track_content=a.track_content,
track_tags=a.track_tags,
track_comment_tags=a.track_comment_tags,
clip_content=a.clip_content + ' ' + b.clip_content,
clip_tags=merged_tags, clip_tag_mode=a.clip_tag_mode,
start=a.start, finish=b.finish)
return TagCompiler.Intermediate(
track_content=a.track_content,
track_tags=a.track_tags,
track_comment_tags=a.track_comment_tags,
clip_content=a.clip_content + ' ' + b.clip_content,
clip_tags=merged_tags, clip_tag_mode=a.clip_tag_mode,
start=a.start, finish=b.finish)
yield from apply_appends(parsed, should_append, do_append)
@@ -158,12 +172,14 @@ class TagCompiler:
@staticmethod
def _time_span_tags(at_time: Fraction, applicable_spans) -> dict:
retval = dict()
for tags in reversed([a[0] for a in applicable_spans if a[1] <= at_time <= a[2]]):
for tags in reversed([a[0] for a in applicable_spans
if a[1] <= at_time <= a[2]]):
retval.update(tags)
return retval
def apply_tags(self, parsed_with_time_spans) -> Iterator[Tuple[str, str, str, dict, Fraction, Fraction]]:
def apply_tags(self, parsed_with_time_spans) ->\
Iterator[Tuple[str, str, str, dict, Fraction, Fraction]]:
session_parsed = parse_tags(self.session.header.session_name)
@@ -171,14 +187,16 @@ class TagCompiler:
event: 'TagCompiler.Intermediate'
marker_tags = self._marker_tags(event.start)
time_span_tags = self._time_span_tags(event.start, time_spans)
tags = self._coalesce_tags(clip_tags=event.clip_tags,
track_tags=event.track_tags,
track_comment_tags=event.track_comment_tags,
timespan_tags=time_span_tags,
marker_tags=marker_tags,
session_tags=session_parsed.tag_dict)
tags = self._coalesce_tags(
clip_tags=event.clip_tags,
track_tags=event.track_tags,
track_comment_tags=event.track_comment_tags,
timespan_tags=time_span_tags,
marker_tags=marker_tags,
session_tags=session_parsed.tag_dict)
yield event.clip_content, event.track_content, session_parsed.content, tags, event.start, event.finish
yield (event.clip_content, event.track_content,
session_parsed.content, tags, event.start, event.finish)
def apply_appends(source: Iterator,