From 18eda82ebd1be8fdaaeb78a2e70bdbd4139ffe87 Mon Sep 17 00:00:00 2001 From: Jamie Hardt Date: Mon, 6 Nov 2023 15:56:15 -0800 Subject: [PATCH] Wave cue implementation, lots of cleanups --- wavinfo/rf64_parser.py | 19 ++-- wavinfo/riff_parser.py | 8 +- wavinfo/wave_cues_reader.py | 209 ++++++++++++++++++++++++++++++++++++ wavinfo/wave_reader.py | 26 +++-- 4 files changed, 244 insertions(+), 18 deletions(-) create mode 100644 wavinfo/wave_cues_reader.py diff --git a/wavinfo/rf64_parser.py b/wavinfo/rf64_parser.py index 8f27a4a..3d93351 100644 --- a/wavinfo/rf64_parser.py +++ b/wavinfo/rf64_parser.py @@ -1,16 +1,18 @@ import struct from collections import namedtuple + from . import riff_parser RF64Context = namedtuple('RF64Context','sample_count bigchunk_table') -def parse_rf64(stream, signature = b'RF64'): - # print("starting parse_rf64") +def parse_rf64(stream, signature = b'RF64') -> RF64Context: start = stream.tell() assert( stream.read(4) == b'WAVE' ) ds64_chunk = riff_parser.parse_chunk(stream) + assert type(ds64_chunk) is riff_parser.ChunkDescriptor, \ + f"Expected ds64 chunk here, found {type(ds64_chunk)}" ds64_field_spec = "= ds64_fields_size ) - # print("Read ds64 chunk: len()",len(ds64_data)) - riff_size, data_size, sample_count, length_lookup_table = struct.unpack( ds64_field_spec , ds64_data[0:ds64_fields_size] ) + riff_size, data_size, sample_count, length_lookup_table = struct.unpack( + ds64_field_spec , ds64_data[0:ds64_fields_size] ) bigchunk_table = {} chunksize64format = "<4sL" - chunksize64size = struct.calcsize(chunksize64format) - # print("Found chunks64s:", length_lookup_table) + # chunksize64size = struct.calcsize(chunksize64format) - for n in range(length_lookup_table): - bigname, bigsize = struct.unpack_from( chunksize64format , ds64_data, offset= ds64_fields_size ) + for _ in range(length_lookup_table): + bigname, bigsize = struct.unpack_from( chunksize64format , ds64_data, + offset= ds64_fields_size ) bigchunk_table[bigname] = bigsize bigchunk_table[b'data'] = data_size bigchunk_table[signature] = riff_size stream.seek(start, 0) - # print("returning from parse_rf64, context: ", RF64Context(sample_count=sample_count, bigchunk_table=bigchunk_table)) return RF64Context( sample_count=sample_count, bigchunk_table=bigchunk_table ) diff --git a/wavinfo/riff_parser.py b/wavinfo/riff_parser.py index 476585f..b10489e 100644 --- a/wavinfo/riff_parser.py +++ b/wavinfo/riff_parser.py @@ -15,7 +15,7 @@ class ListChunkDescriptor(namedtuple('ListChunkDescriptor', 'signature children' class ChunkDescriptor(namedtuple('ChunkDescriptor', 'ident start length rf64_context')): - def read_data(self, from_stream): + def read_data(self, from_stream) -> bytes: from_stream.seek(self.start) return from_stream.read(self.length) @@ -48,14 +48,16 @@ def parse_chunk(stream, rf64_context=None): if rf64_context is None and ident in {b'RF64', b'BW64'}: rf64_context = parse_rf64(stream=stream, signature=ident) - assert rf64_context is not None + assert rf64_context is not None, \ + f"Sentinel data size 0xFFFFFFFF found outside of RF64 context" + data_size = rf64_context.bigchunk_table[ident] displacement = data_size if displacement % 2: displacement += 1 - if ident in {b'RIFF', b'LIST', b'RF64', b'BW64'}: + if ident in {b'RIFF', b'LIST', b'RF64', b'BW64', b'list'}: return parse_list_chunk(stream=stream, length=data_size, rf64_context=rf64_context) diff --git a/wavinfo/wave_cues_reader.py b/wavinfo/wave_cues_reader.py new file mode 100644 index 0000000..e546897 --- /dev/null +++ b/wavinfo/wave_cues_reader.py @@ -0,0 +1,209 @@ +""" +Cues metadata + +For reference on implementation of cues and related metadata see: +August 1991, "Multimedia Programming Interface and Data Specifications 1.0", +IBM Corporation and Microsoft Corporation + +https://www.aelius.com/njh/wavemetatools/doc/riffmci.pdf +""" +from dataclasses import dataclass +import encodings +from .riff_parser import ChunkDescriptor + +from struct import unpack, unpack_from, calcsize +from typing import Optional, NamedTuple, List + +#: Country Codes used in the RIFF standard to resolve locale. These codes +#: appear in CSET and LTXT metadata. +CountryCodes = """000 None Indicated +001,USA +002,Canada +003,Latin America +030,Greece +031,Netherlands +032,Belgium +033,France +034,Spain +039,Italy +041,Switzerland +043,Austria +044,United Kingdom +045,Denmark +046,Sweden +047,Norway +049,West Germany +052,Mexico +055,Brazil +061,Australia +064,New Zealand +081,Japan +082,Korea +086,People’s Republic of China +088,Taiwan +090,Turkey +351,Portugal +352,Luxembourg +354,Iceland +358,Finland""" + +#: Language and Dialect codes used in the RIFF standard to resolve native +#: language of text fields. These codes appear in CSET and LTXT metadata. +LanguageDialectCodes = """0 0 None Indicated +1,1,Arabic +2,1,Bulgarian +3,1,Catalan +4,1,Traditional Chinese +4,2,Simplified Chinese +5,1,Czech +6,1,Danish +7,1,German +7,2,Swiss German +8,1,Greek +9,1,US English +9,2,UK English +10,1,Spanish +10,2,Spanish Mexican +11,1,Finnish +12,1,French +12,2,Belgian French +12,3,Canadian French +12,4,Swiss French +13,1,Hebrew +14,1,Hungarian +15,1,Icelandic +16,1,Italian +16,2,Swiss Italian +17,1,Japanese +18,1,Korean +19,1,Dutch +19,2,Belgian Dutch +20,1,Norwegian - Bokmal +20,2,Norwegian - Nynorsk +21,1,Polish +22,1,Brazilian Portuguese +22,2,Portuguese +23,1,Rhaeto-Romanic +24,1,Romanian +25,1,Russian +26,1,Serbo-Croatian (Latin) +26,2,Serbo-Croatian (Cyrillic) +27,1,Slovak +28,1,Albanian +29,1,Swedish +30,1,Thai +31,1,Turkish +32,1,Urdu +33,1,Bahasa""" + + +class CueEntry(NamedTuple): + name: int + position: int + chunk_id: bytes + chunk_start: int + block_start: int + sample_offset: int + + Format = " int: + return calcsize(cls.Format) + + @classmethod + def read(cls, data: bytes) -> 'CueEntry': + assert len(data) == calcsize(cls.Format), \ + "cue data size incorrect, expected {calcsize(cls.Format)} found {len(cues_data)}" + + parsed = unpack(cls.Format, data) + + return cls(name=parsed[0], position=parsed[1], chunk_id=parsed[2], + chunk_start=parsed[3], block_start=parsed[4], + sample_offset=parsed[5]) + + +class LabelEntry(NamedTuple): + name: int + text: str + + @classmethod + def read(cls, data: bytes, encoding: str): + return cls(name=unpack(" 'WavCuesReader': + + cue_list = [] + if cues is not None: + cues_data = cues.read_data(f) + assert len(cues_data) >= 4, "cue metadata too short" + cues_count = unpack(" Optional[bytes]: top_chunks = (chunk for chunk in self.main_list \ if type(chunk) is ChunkDescriptor and chunk.ident == ident) @@ -113,6 +115,13 @@ class WavInfoReader: return chunk_descriptor.read_data(from_stream) \ if chunk_descriptor else None + def _find_list_chunk(self, signature) -> Optional[ListChunkDescriptor]: + top_chunks = (chunk for chunk in self.main_list \ + if type(chunk) is ListChunkDescriptor and \ + chunk.signature == signature) + + return next(top_chunks, None) + def _describe_data(self): data_chunk = next(c for c in self.main_list \ if type(c) is ChunkDescriptor and c.ident == b'data') @@ -179,10 +188,15 @@ class WavInfoReader: return WavIXMLFormat(ixml_data.rstrip(b'\0')) if ixml_data else None def _get_cue(self, f): - cue = self._find_chunk_data(b'cue ', f, default_none=True) - labl = self._find_chunk_data(b'labl', f, default_none=True) - ltxt = self._find_chunk_data(b'ltxt', f, default_none=True) - assert False, "cue metadata implementation in progress" + cue = next((cue_chunk for cue_chunk in self.main_list if cue_chunk.ident == b'cue '), None) + adtl = self._find_list_chunk(b'adtl') + labls = [] + ltxts = [] + if adtl is not None: + labls = [child.read_data(f) for child in adtl.children if child.ident == b'labl'] + ltxts = [child.read_data(f) for child in adtl.children if child.ident == b'ltxt'] + + return WavCuesReader.merge(f, cue, labls, ltxts) def walk(self) -> Generator[str,str,Any]: #FIXME: this should probably be named "iter()" """