Wave cue implementation, lots of cleanups

This commit is contained in:
Jamie Hardt
2023-11-06 15:56:15 -08:00
parent c8add89bc2
commit 18eda82ebd
4 changed files with 244 additions and 18 deletions

View File

@@ -1,16 +1,18 @@
import struct
from collections import namedtuple
from . import riff_parser
RF64Context = namedtuple('RF64Context','sample_count bigchunk_table')
def parse_rf64(stream, signature = b'RF64'):
# print("starting parse_rf64")
def parse_rf64(stream, signature = b'RF64') -> RF64Context:
start = stream.tell()
assert( stream.read(4) == b'WAVE' )
ds64_chunk = riff_parser.parse_chunk(stream)
assert type(ds64_chunk) is riff_parser.ChunkDescriptor, \
f"Expected ds64 chunk here, found {type(ds64_chunk)}"
ds64_field_spec = "<QQQI"
ds64_fields_size = struct.calcsize(ds64_field_spec)
@@ -19,22 +21,21 @@ def parse_rf64(stream, signature = b'RF64'):
ds64_data = ds64_chunk.read_data(stream)
assert(len(ds64_data) >= ds64_fields_size )
# print("Read ds64 chunk: len()",len(ds64_data))
riff_size, data_size, sample_count, length_lookup_table = struct.unpack( ds64_field_spec , ds64_data[0:ds64_fields_size] )
riff_size, data_size, sample_count, length_lookup_table = struct.unpack(
ds64_field_spec , ds64_data[0:ds64_fields_size] )
bigchunk_table = {}
chunksize64format = "<4sL"
chunksize64size = struct.calcsize(chunksize64format)
# print("Found chunks64s:", length_lookup_table)
# chunksize64size = struct.calcsize(chunksize64format)
for n in range(length_lookup_table):
bigname, bigsize = struct.unpack_from( chunksize64format , ds64_data, offset= ds64_fields_size )
for _ in range(length_lookup_table):
bigname, bigsize = struct.unpack_from( chunksize64format , ds64_data,
offset= ds64_fields_size )
bigchunk_table[bigname] = bigsize
bigchunk_table[b'data'] = data_size
bigchunk_table[signature] = riff_size
stream.seek(start, 0)
# print("returning from parse_rf64, context: ", RF64Context(sample_count=sample_count, bigchunk_table=bigchunk_table))
return RF64Context( sample_count=sample_count, bigchunk_table=bigchunk_table )

View File

@@ -15,7 +15,7 @@ class ListChunkDescriptor(namedtuple('ListChunkDescriptor', 'signature children'
class ChunkDescriptor(namedtuple('ChunkDescriptor', 'ident start length rf64_context')):
def read_data(self, from_stream):
def read_data(self, from_stream) -> bytes:
from_stream.seek(self.start)
return from_stream.read(self.length)
@@ -48,14 +48,16 @@ def parse_chunk(stream, rf64_context=None):
if rf64_context is None and ident in {b'RF64', b'BW64'}:
rf64_context = parse_rf64(stream=stream, signature=ident)
assert rf64_context is not None
assert rf64_context is not None, \
f"Sentinel data size 0xFFFFFFFF found outside of RF64 context"
data_size = rf64_context.bigchunk_table[ident]
displacement = data_size
if displacement % 2:
displacement += 1
if ident in {b'RIFF', b'LIST', b'RF64', b'BW64'}:
if ident in {b'RIFF', b'LIST', b'RF64', b'BW64', b'list'}:
return parse_list_chunk(stream=stream, length=data_size,
rf64_context=rf64_context)

209
wavinfo/wave_cues_reader.py Normal file
View File

@@ -0,0 +1,209 @@
"""
Cues metadata
For reference on implementation of cues and related metadata see:
August 1991, "Multimedia Programming Interface and Data Specifications 1.0",
IBM Corporation and Microsoft Corporation
https://www.aelius.com/njh/wavemetatools/doc/riffmci.pdf
"""
from dataclasses import dataclass
import encodings
from .riff_parser import ChunkDescriptor
from struct import unpack, unpack_from, calcsize
from typing import Optional, NamedTuple, List
#: Country Codes used in the RIFF standard to resolve locale. These codes
#: appear in CSET and LTXT metadata.
CountryCodes = """000 None Indicated
001,USA
002,Canada
003,Latin America
030,Greece
031,Netherlands
032,Belgium
033,France
034,Spain
039,Italy
041,Switzerland
043,Austria
044,United Kingdom
045,Denmark
046,Sweden
047,Norway
049,West Germany
052,Mexico
055,Brazil
061,Australia
064,New Zealand
081,Japan
082,Korea
086,Peoples Republic of China
088,Taiwan
090,Turkey
351,Portugal
352,Luxembourg
354,Iceland
358,Finland"""
#: Language and Dialect codes used in the RIFF standard to resolve native
#: language of text fields. These codes appear in CSET and LTXT metadata.
LanguageDialectCodes = """0 0 None Indicated
1,1,Arabic
2,1,Bulgarian
3,1,Catalan
4,1,Traditional Chinese
4,2,Simplified Chinese
5,1,Czech
6,1,Danish
7,1,German
7,2,Swiss German
8,1,Greek
9,1,US English
9,2,UK English
10,1,Spanish
10,2,Spanish Mexican
11,1,Finnish
12,1,French
12,2,Belgian French
12,3,Canadian French
12,4,Swiss French
13,1,Hebrew
14,1,Hungarian
15,1,Icelandic
16,1,Italian
16,2,Swiss Italian
17,1,Japanese
18,1,Korean
19,1,Dutch
19,2,Belgian Dutch
20,1,Norwegian - Bokmal
20,2,Norwegian - Nynorsk
21,1,Polish
22,1,Brazilian Portuguese
22,2,Portuguese
23,1,Rhaeto-Romanic
24,1,Romanian
25,1,Russian
26,1,Serbo-Croatian (Latin)
26,2,Serbo-Croatian (Cyrillic)
27,1,Slovak
28,1,Albanian
29,1,Swedish
30,1,Thai
31,1,Turkish
32,1,Urdu
33,1,Bahasa"""
class CueEntry(NamedTuple):
name: int
position: int
chunk_id: bytes
chunk_start: int
block_start: int
sample_offset: int
Format = "<II4sIII"
@classmethod
def format_size(cls) -> int:
return calcsize(cls.Format)
@classmethod
def read(cls, data: bytes) -> 'CueEntry':
assert len(data) == calcsize(cls.Format), \
"cue data size incorrect, expected {calcsize(cls.Format)} found {len(cues_data)}"
parsed = unpack(cls.Format, data)
return cls(name=parsed[0], position=parsed[1], chunk_id=parsed[2],
chunk_start=parsed[3], block_start=parsed[4],
sample_offset=parsed[5])
class LabelEntry(NamedTuple):
name: int
text: str
@classmethod
def read(cls, data: bytes, encoding: str):
return cls(name=unpack("<I", data[0:4])[0],
text=data[4:].decode(encoding))
NoteEntry = LabelEntry
class RangeLabel(NamedTuple):
name: int
length: int
purpose: str
country: int
language: int
dialect: int
codepage: int
text: str
@classmethod
def read(cls, data: bytes, fallback_encoding: str):
leader_struct_fmt = "<II4sHHHH"
parsed = unpack(leader_struct_fmt, data[0:calcsize(leader_struct_fmt)])
text_data = data[calcsize(leader_struct_fmt):]
if data[6] != 0:
fallback_encoding = f"cp{data[6]}"
return cls(name=parsed[0], length=parsed[1], purpose=parsed[2],
country=parsed[3], language=parsed[4],
dialect=parsed[5], codepage=parsed[6],
text=text_data.decode(fallback_encoding))
@dataclass
class WavCuesReader:
cues: List[CueEntry]
labels: List[LabelEntry]
ranges: List[RangeLabel]
@classmethod
def merge(cls, f,
cues: Optional[ChunkDescriptor],
labls: List[ChunkDescriptor],
ltxts: List[ChunkDescriptor],
fallback_encoding: str) -> 'WavCuesReader':
cue_list = []
if cues is not None:
cues_data = cues.read_data(f)
assert len(cues_data) >= 4, "cue metadata too short"
cues_count = unpack("<I", cues_data)
offset = calcsize("<I")
for _ in cues_count:
cue_bytes = cues_data[offset: CueEntry.format_size() ]
cue_list.append(CueEntry.read(cue_bytes))
label_list = []
for labl in labls:
label_list.append(
LabelEntry.read(labl.read_data(f),
encoding=fallback_encoding)
)
range_list = []
for r in ltxts:
range_list.append(
RangeLabel.read(r.read_data(f),
fallback_encoding=fallback_encoding)
)
return WavCuesReader(cues=cue_list, labels=label_list,
ranges=range_list)

View File

@@ -13,6 +13,7 @@ from .wave_bext_reader import WavBextReader
from .wave_info_reader import WavInfoChunkReader
from .wave_adm_reader import WavADMReader
from .wave_dbmd_reader import WavDolbyMetadataReader
from .wave_cues_reader import WavCuesReader
#: Calculated statistics about the audio data.
WavDataDescriptor = namedtuple('WavDataDescriptor', 'byte_count frame_count')
@@ -90,6 +91,7 @@ class WavInfoReader:
def get_wav_info(self, wavfile):
chunks = parse_chunk(wavfile)
assert type(chunks) is ListChunkDescriptor
self.main_list = chunks.children
wavfile.seek(0)
@@ -100,10 +102,10 @@ class WavInfoReader:
self.adm = self._get_adm(wavfile)
self.info = self._get_info(wavfile, encoding=self.info_encoding)
self.dolby = self._get_dbmd(wavfile)
self.cue = self._get_cue(wavfile)
# self.cue = self._get_cue(wavfile)
self.data = self._describe_data()
def _find_chunk_data(self, ident, from_stream, default_none=False):
def _find_chunk_data(self, ident, from_stream, default_none=False) -> Optional[bytes]:
top_chunks = (chunk for chunk in self.main_list \
if type(chunk) is ChunkDescriptor and chunk.ident == ident)
@@ -113,6 +115,13 @@ class WavInfoReader:
return chunk_descriptor.read_data(from_stream) \
if chunk_descriptor else None
def _find_list_chunk(self, signature) -> Optional[ListChunkDescriptor]:
top_chunks = (chunk for chunk in self.main_list \
if type(chunk) is ListChunkDescriptor and \
chunk.signature == signature)
return next(top_chunks, None)
def _describe_data(self):
data_chunk = next(c for c in self.main_list \
if type(c) is ChunkDescriptor and c.ident == b'data')
@@ -179,10 +188,15 @@ class WavInfoReader:
return WavIXMLFormat(ixml_data.rstrip(b'\0')) if ixml_data else None
def _get_cue(self, f):
cue = self._find_chunk_data(b'cue ', f, default_none=True)
labl = self._find_chunk_data(b'labl', f, default_none=True)
ltxt = self._find_chunk_data(b'ltxt', f, default_none=True)
assert False, "cue metadata implementation in progress"
cue = next((cue_chunk for cue_chunk in self.main_list if cue_chunk.ident == b'cue '), None)
adtl = self._find_list_chunk(b'adtl')
labls = []
ltxts = []
if adtl is not None:
labls = [child.read_data(f) for child in adtl.children if child.ident == b'labl']
ltxts = [child.read_data(f) for child in adtl.children if child.ident == b'ltxt']
return WavCuesReader.merge(f, cue, labls, ltxts)
def walk(self) -> Generator[str,str,Any]: #FIXME: this should probably be named "iter()"
"""