implementation of tolerant parsing

This commit is contained in:
2025-12-17 19:38:53 -08:00
parent cf1b3fb42c
commit ebdc73198c
5 changed files with 74 additions and 25 deletions

View File

@@ -78,17 +78,16 @@ class EditList:
@property
def events(self) -> Generator[Event, None, None]:
'A generator for all the events in the edit list'
# breakpoint()
current_event_num = None
event_statements = []
for stmt in self.event_statements:
breakpoint()
if type(stmt) is StmtEvent:
if current_event_num is None:
current_event_num = stmt.event
event_statements.append(stmt)
else:
if current_event_num != stmt.event:
# breakpoint()
yield Event(statements=event_statements)
event_statements = [stmt]
current_event_num = stmt.event

View File

@@ -70,7 +70,6 @@ class Event:
the_zip.append(trans_names)
except IndexError:
the_zip.append([None] * len(edits_audio))
return [Edit(edit_statement=e1[0],
audio_ext_statement=e1[1],
clip_name_statement=n1,
@@ -105,6 +104,11 @@ class Event:
def _statements_with_audio_ext(self) -> Generator[
Tuple[StmtEvent, Optional[StmtAudioExt]], None, None]:
if len(self.statements) == 1 and type(self.statements[0]) is StmtEvent:
yield (self.statements[0], None)
else:
for (s1, s2) in zip(self.statements, self.statements[1:]):
if type(s1) is StmtEvent and type(s2) is StmtAudioExt:
yield (s1, s2)

View File

@@ -7,12 +7,14 @@ from .parse_cmx_statements import (parse_cmx3600_statements)
from .edit_list import EditList
def parse_cmx3600(f: TextIO, tolerant: bool = False) -> EditList:
    """
    Parse a CMX 3600 EDL.

    :param TextIO f: a file-like object, an opened CMX 3600 .EDL file.
    :param bool tolerant: If `True`, a relaxed event line recognition method
        will be used in the case the stricter default method fails.
        Defaults to `False`, so existing one-argument callers are unaffected.
    :returns: An :class:`pycmx.edit_list.EditList`.
    """
    # The stream is parsed exactly once; a second read of the same,
    # already-consumed file object would yield no statements.
    statements = parse_cmx3600_statements(f, tolerant)
    return EditList(statements)

View File

@@ -13,12 +13,12 @@ from .statements import (StmtCdlSat, StmtCdlSop, StmtCorruptRemark, StmtFrmc,
from .util import collimate
def parse_cmx3600_statements(file: TextIO,
                             tolerant: bool = False) -> List[object]:
    """
    Return a list of every statement in the file argument.

    :param TextIO file: an opened EDL; all of its remaining lines are read.
    :param bool tolerant: passed through unchanged to the per-line parser
        for every line. Defaults to `False`.
    :returns: one parsed statement per line, in file order.
    """
    lines = file.readlines()
    # Every line is stripped of surrounding whitespace before it is parsed
    # and is tagged with its zero-based position in the file.
    return [_parse_cmx3600_line(line.strip(), line_number, tolerant)
            for (line_number, line) in enumerate(lines)]
@@ -38,7 +38,7 @@ def _edl_column_widths(event_field_length, source_field_length) -> List[int]:
# 8,8,1,4,2,1,4,13,3,1,1]
def _parse_cmx3600_line(line: str, line_number: int) -> object:
def _parse_cmx3600_line(line: str, line_number: int, tolerant: bool = False) -> object:
"""
Parses a single CMX EDL line.
@@ -54,14 +54,21 @@ def _parse_cmx3600_line(line: str, line_number: int) -> object:
return _parse_fcm(line, line_number)
if line_matcher is not None:
event_field_len = len(line_matcher.group(1))
source_in_match = re.search(r'\d\d\:\d\d\:\d\d\:\d\d', line)
if not source_in_match:
return _parse_unrecognized(line, line_number)
source_field_len = source_in_match.start() - (event_field_len + 18)
# breakpoint()
source_field_len = len(line) - (event_field_len + 65)
try:
return _parse_columns_for_standard_form(line, event_field_len,
source_field_len, line_number)
source_field_len,
line_number)
except EventFormError:
# breakpoint()
if tolerant:
return _parse_columns_tolerant(line, line_number)
else:
return StmtUnrecognized(line, line_number)
if line.startswith("AUD"):
return _parse_extended_audio_channels(line, line_number)
if line.startswith("*"):
@@ -191,6 +198,9 @@ def _parse_split(line: str, line_number):
# return StmtMotionMemory(source="", fps="")
#
class EventFormError(RuntimeError):
    """
    Signals that an event line does not fit the standard column form.

    `_parse_columns_for_standard_form` raises it when the line is shorter
    than the computed column widths, or when the channels or transition
    column is empty. `_parse_cmx3600_line` catches it to choose between the
    tolerant parser and an unrecognized statement.
    """
def _parse_unrecognized(line, line_number):
    """
    Wrap a line that could not be parsed in a `StmtUnrecognized`.

    :param line: the text of the line, stored as the statement's content.
    :param line_number: the line's position, stored on the statement.
    :returns: a `StmtUnrecognized` carrying both values.
    """
    fallback = StmtUnrecognized(content=line, line_number=line_number)
    return fallback
@@ -202,14 +212,20 @@ def _parse_columns_for_standard_form(line: str, event_field_length: int,
col_widths = _edl_column_widths(event_field_length, source_field_length)
if sum(col_widths) > len(line):
return StmtUnrecognized(content=line, line_number=line_number)
raise EventFormError()
column_strings = collimate(line, col_widths)
channels = column_strings[4].strip()
trans = column_strings[6].strip()
if len(channels) == 0 or len(trans) == 0:
raise EventFormError()
return StmtEvent(event=column_strings[0],
source=column_strings[2].strip(),
channels=column_strings[4].strip(),
trans=column_strings[6].strip(),
channels=channels,
trans=trans,
trans_op=column_strings[8].strip(),
source_in=column_strings[10].strip(),
source_out=column_strings[12].strip(),
@@ -219,6 +235,27 @@ def _parse_columns_for_standard_form(line: str, event_field_length: int,
source_field_size=source_field_length)
# Relaxed event-line pattern: fields are located by whitespace separation
# instead of fixed column offsets. Capture groups, in order: event number,
# source name (8 to 128 characters, non-greedy), channels, transition,
# transition operand (may be empty), then the four timecodes. The `.` between
# timecode digit pairs accepts any separator character.
# Compiled once here because the parser runs per line.
_TOLERANT_EVENT_PATTERN = re.compile(
    r'^\s*(\d+)\s+(.{8,128}?)\s+'
    r'(V|A|A2|AA|NONE|AA/V|A2/V|B)\s+'
    r'(C|D|W|KB|K|KO)\s+(\d*)\s+(\d\d.\d\d.\d\d.\d\d)\s'
    r'(\d\d.\d\d.\d\d.\d\d)\s(\d\d.\d\d.\d\d.\d\d)\s'
    r'(\d\d.\d\d.\d\d.\d\d)'
)


def _parse_columns_tolerant(line: str, line_number: int):
    """
    Parse an event line with a whitespace-delimited regular expression.

    This is the fallback used when the fixed-column parser rejects a line
    and tolerant parsing was requested.

    :param str line: the event line to parse.
    :param int line_number: the line's position, stored on the result.
    :returns: a `StmtEvent` when the line matches the relaxed pattern,
        otherwise a `StmtUnrecognized` wrapping the line.
    """
    match = _TOLERANT_EVENT_PATTERN.match(line)
    if match is None:
        return StmtUnrecognized(line, line_number)

    (event, source, channels, trans, trans_op,
     source_in, source_out, record_in, record_out) = match.groups()

    # The event number is kept as the matched text, the same type the
    # standard-form parser stores. EditList.events groups statements by
    # comparing this field, and a str never equals an int, so converting it
    # here would split an event whose lines were parsed by different paths.
    return StmtEvent(event=event, source=source,
                     channels=channels, trans=trans,
                     trans_op=trans_op, source_in=source_in,
                     source_out=source_out, record_in=record_in,
                     record_out=record_out, line_number=line_number,
                     source_field_size=len(source))
def _parse_source_umid_statement(line, line_number):
# trimmed = line[3:].strip()
# return StmtSourceUMID(name=None, umid=None, line_number=line_number)

View File

@@ -8,11 +8,18 @@ class Issue19Test(TestCase):
def test_parse(self):
    # The fixture is parsed once, in tolerant mode. Parsing it a second
    # time from the same file object would read an exhausted stream.
    edl = parse_cmx3600(self.f, tolerant=True)

    for event in edl.events:
        self.assertIsNotNone(event.edits)
        if event.number == 1:
            # Check the count before indexing so an empty edit list fails
            # as an assertion rather than an IndexError.
            self.assertEqual(len(event.edits), 1)
            edit = event.edits[0]
            self.assertEqual(edit.source, "Z125C001_220217_ROLX")
            self.assertEqual(edit.channels.v, True)
            self.assertEqual(edit.transition.kind, "C")
            self.assertEqual(edit.transition.operand, "")
            self.assertEqual(edit.source_in, "15:51:58:10")
            self.assertEqual(edit.record_out, "00:00:04:06")
            break
    else:
        # Without this, a parse that yields no event 1 (or no events at
        # all) would skip every assertion above and pass silently.
        self.fail("event 1 was not found in the parsed EDL")
def tearDown(self):