From ebdc73198c2ffaf998a9c17f139f793c44b1dc56 Mon Sep 17 00:00:00 2001 From: Jamie Hardt Date: Wed, 17 Dec 2025 19:38:53 -0800 Subject: [PATCH] implementation of tolerant parsing --- pycmx/edit_list.py | 3 +- pycmx/event.py | 16 +++++---- pycmx/parse_cmx_events.py | 6 ++-- pycmx/parse_cmx_statements.py | 65 +++++++++++++++++++++++++++-------- tests/test_issue_19.py | 9 ++++- 5 files changed, 74 insertions(+), 25 deletions(-) diff --git a/pycmx/edit_list.py b/pycmx/edit_list.py index 2c4dacb..27778ba 100644 --- a/pycmx/edit_list.py +++ b/pycmx/edit_list.py @@ -78,17 +78,16 @@ class EditList: @property def events(self) -> Generator[Event, None, None]: 'A generator for all the events in the edit list' - # breakpoint() current_event_num = None event_statements = [] for stmt in self.event_statements: - breakpoint() if type(stmt) is StmtEvent: if current_event_num is None: current_event_num = stmt.event event_statements.append(stmt) else: if current_event_num != stmt.event: + # breakpoint() yield Event(statements=event_statements) event_statements = [stmt] current_event_num = stmt.event diff --git a/pycmx/event.py b/pycmx/event.py index 35082b8..6aee310 100644 --- a/pycmx/event.py +++ b/pycmx/event.py @@ -70,7 +70,6 @@ class Event: the_zip.append(trans_names) except IndexError: the_zip.append([None] * len(edits_audio)) - return [Edit(edit_statement=e1[0], audio_ext_statement=e1[1], clip_name_statement=n1, @@ -105,11 +104,16 @@ class Event: def _statements_with_audio_ext(self) -> Generator[ Tuple[StmtEvent, Optional[StmtAudioExt]], None, None]: - for (s1, s2) in zip(self.statements, self.statements[1:]): - if type(s1) is StmtEvent and type(s2) is StmtAudioExt: - yield (s1, s2) - elif type(s1) is StmtEvent: - yield (s1, None) + + if len(self.statements) == 1 and type(self.statements[0]) is StmtEvent: + yield (self.statements[0], None) + + else: + for (s1, s2) in zip(self.statements, self.statements[1:]): + if type(s1) is StmtEvent and type(s2) is StmtAudioExt: + yield (s1, s2) + elif type(s1) is StmtEvent: + yield (s1, None) def _asc_sop_statement(self) -> Optional[StmtCdlSop]: return next((s for s in self.statements if type(s) is StmtCdlSop), diff --git a/pycmx/parse_cmx_events.py b/pycmx/parse_cmx_events.py index 65f5630..87a5b03 100644 --- a/pycmx/parse_cmx_events.py +++ b/pycmx/parse_cmx_events.py @@ -7,12 +7,14 @@ from .parse_cmx_statements import (parse_cmx3600_statements) from .edit_list import EditList -def parse_cmx3600(f: TextIO) -> EditList: +def parse_cmx3600(f: TextIO, tolerant: bool = False) -> EditList: """ Parse a CMX 3600 EDL. :param TextIO f: a file-like object, an opened CMX 3600 .EDL file. + :param bool tolerant: If `True`, a relaxed event line recognition method + will be used in the case the stricter default method fails. :returns: An :class:`pycmx.edit_list.EditList`. """ - statements = parse_cmx3600_statements(f) + statements = parse_cmx3600_statements(f, tolerant) return EditList(statements) diff --git a/pycmx/parse_cmx_statements.py b/pycmx/parse_cmx_statements.py index 55a9cc5..dacceaa 100644 --- a/pycmx/parse_cmx_statements.py +++ b/pycmx/parse_cmx_statements.py @@ -13,12 +13,12 @@ from .statements import (StmtCdlSat, StmtCdlSop, StmtCorruptRemark, StmtFrmc, from .util import collimate -def parse_cmx3600_statements(file: TextIO) -> List[object]: +def parse_cmx3600_statements(file: TextIO, tolerant: bool = False) -> List[object]: """ Return a list of every statement in the file argument. """ lines = file.readlines() - return [_parse_cmx3600_line(line.strip(), line_number) + return [_parse_cmx3600_line(line.strip(), line_number, tolerant) for (line_number, line) in enumerate(lines)] @@ -38,7 +38,7 @@ def _edl_column_widths(event_field_length, source_field_length) -> List[int]: # 8,8,1,4,2,1,4,13,3,1,1] -def _parse_cmx3600_line(line: str, line_number: int) -> object: +def _parse_cmx3600_line(line: str, line_number: int, tolerant: bool = False) -> object: """ Parses a single CMX EDL line. @@ -54,14 +54,21 @@ def _parse_cmx3600_line(line: str, line_number: int) -> object: return _parse_fcm(line, line_number) if line_matcher is not None: event_field_len = len(line_matcher.group(1)) - source_in_match = re.search(r'\d\d\:\d\d\:\d\d\:\d\d', line) - if not source_in_match: - return _parse_unrecognized(line, line_number) - - source_field_len = source_in_match.start() - (event_field_len + 18) - # breakpoint() - return _parse_columns_for_standard_form(line, event_field_len, - source_field_len, line_number) + + source_field_len = len(line) - (event_field_len + 65) + + try: + return _parse_columns_for_standard_form(line, event_field_len, + source_field_len, + line_number) + + except EventFormError: + # breakpoint() + if tolerant: + return _parse_columns_tolerant(line, line_number) + else: + return StmtUnrecognized(line, line_number) + if line.startswith("AUD"): return _parse_extended_audio_channels(line, line_number) if line.startswith("*"): @@ -191,6 +198,9 @@ def _parse_split(line: str, line_number): # return StmtMotionMemory(source="", fps="") # +class EventFormError(RuntimeError): + pass + def _parse_unrecognized(line, line_number): return StmtUnrecognized(content=line, line_number=line_number) @@ -202,14 +212,20 @@ def _parse_columns_for_standard_form(line: str, event_field_length: int, col_widths = _edl_column_widths(event_field_length, source_field_length) if sum(col_widths) > len(line): - return StmtUnrecognized(content=line, line_number=line_number) + raise EventFormError() column_strings = collimate(line, col_widths) + channels = column_strings[4].strip() + trans = column_strings[6].strip() + + if len(channels) == 0 or len(trans) == 0: + raise EventFormError() + return StmtEvent(event=column_strings[0], source=column_strings[2].strip(), - channels=column_strings[4].strip(), - trans=column_strings[6].strip(), + channels=channels, + trans=trans, trans_op=column_strings[8].strip(), source_in=column_strings[10].strip(), source_out=column_strings[12].strip(), @@ -219,6 +235,27 @@ def _parse_columns_for_standard_form(line: str, event_field_length: int, source_field_size=source_field_length) +def _parse_columns_tolerant(line: str, line_number: int): + # breakpoint() + pattern = re.compile(r'^\s*(\d+)\s+(.{8,128}?)\s+' + r'(V|A|A2|AA|NONE|AA/V|A2/V|B)\s+' + r'(C|D|W|KB|K|KO)\s+(\d*)\s+(\d\d.\d\d.\d\d.\d\d)\s' + r'(\d\d.\d\d.\d\d.\d\d)\s(\d\d.\d\d.\d\d.\d\d)\s' + r'(\d\d.\d\d.\d\d.\d\d)' + ) + + match = pattern.match(line) + if match: + return StmtEvent(event=int(match.group(1)), source=match.group(2), + channels=match.group(3), trans=match.group(4), + trans_op=match.group(5), source_in=match.group(6), + source_out=match.group(7), record_in=match.group(8), + record_out=match.group(9), line_number=line_number, + source_field_size=len(match.group(2))) + else: + return StmtUnrecognized(line, line_number) + + def _parse_source_umid_statement(line, line_number): # trimmed = line[3:].strip() # return StmtSourceUMID(name=None, umid=None, line_number=line_number) diff --git a/tests/test_issue_19.py b/tests/test_issue_19.py index 7eca0e8..efef8e4 100644 --- a/tests/test_issue_19.py +++ b/tests/test_issue_19.py @@ -8,11 +8,18 @@ class Issue19Test(TestCase): def test_parse(self): - edl = parse_cmx3600(self.f) + edl = parse_cmx3600(self.f, tolerant=True) for event in edl.events: self.assertIsNotNone(event.edits) if event.number == 1: self.assertEqual(len(event.edits), 1) + self.assertEqual(event.edits[0].source, "Z125C001_220217_ROLX") + self.assertEqual(event.edits[0].channels.v, True) + self.assertEqual(event.edits[0].transition.kind, "C") + self.assertEqual(event.edits[0].transition.operand, "") + self.assertEqual(event.edits[0].source_in, "15:51:58:10") + self.assertEqual(event.edits[0].record_out, "00:00:04:06") + break def tearDown(self):