Text twiddles

This commit is contained in:
Jamie Hardt
2023-11-08 19:15:42 -08:00
parent d75e55e870
commit 86a4edc983
4 changed files with 67 additions and 38 deletions

View File

@@ -60,7 +60,8 @@ def main():
json.dump(ret_dict, cls=MyJSONEncoder, fp=sys.stdout, indent=2) json.dump(ret_dict, cls=MyJSONEncoder, fp=sys.stdout, indent=2)
except MissingDataError as e: except MissingDataError as e:
print("MissingDataError: Missing metadata (%s) in file %s" % (e, arg), file=sys.stderr) print("MissingDataError: Missing metadata (%s) in file %s" % \
(e, arg), file=sys.stderr)
continue continue
except Exception as e: except Exception as e:
raise e raise e

View File

@@ -22,7 +22,8 @@ def parse_rf64(stream, signature = b'RF64') -> RF64Context:
assert(len(ds64_data) >= ds64_fields_size) assert(len(ds64_data) >= ds64_fields_size)
riff_size, data_size, sample_count, length_lookup_table = struct.unpack( riff_size, data_size, sample_count, length_lookup_table = struct.unpack(
ds64_field_spec, ds64_data[0:ds64_fields_size]) ds64_field_spec, ds64_data[0:ds64_fields_size]
)
bigchunk_table = {} bigchunk_table = {}
chunksize64format = "<4sL" chunksize64format = "<4sL"

View File

@@ -1,7 +1,8 @@
from optparse import Option
import struct import struct
from collections import namedtuple from .rf64_parser import parse_rf64, RF64Context
from .rf64_parser import parse_rf64 from typing import NamedTuple, Union, List, Optional
class WavInfoEOFError(EOFError): class WavInfoEOFError(EOFError):
@@ -10,11 +11,17 @@ class WavInfoEOFError(EOFError):
self.chunk_start = chunk_start self.chunk_start = chunk_start
class ListChunkDescriptor(namedtuple('ListChunkDescriptor', 'signature children')): class ListChunkDescriptor(NamedTuple):
pass signature: bytes
children: List[Union['ChunkDescriptor', 'ListChunkDescriptor']]
class ChunkDescriptor(namedtuple('ChunkDescriptor', 'ident start length rf64_context')): class ChunkDescriptor(NamedTuple):
ident: bytes
start: int
length: int
rf64_context: Optional[RF64Context]
def read_data(self, from_stream) -> bytes: def read_data(self, from_stream) -> bytes:
from_stream.seek(self.start) from_stream.seek(self.start)
return from_stream.read(self.length) return from_stream.read(self.length)
@@ -64,5 +71,7 @@ def parse_chunk(stream, rf64_context=None):
else: else:
data_start = stream.tell() data_start = stream.tell()
stream.seek(displacement, 1) stream.seek(displacement, 1)
return ChunkDescriptor(ident=ident, start=data_start, length=data_size, return ChunkDescriptor(ident=ident,
start=data_start,
length=data_size,
rf64_context=rf64_context) rf64_context=rf64_context)

View File

@@ -5,7 +5,7 @@ ADM Reader
from struct import unpack, unpack_from, calcsize from struct import unpack, unpack_from, calcsize
from io import BytesIO from io import BytesIO
from collections import namedtuple from collections import namedtuple
from typing import Iterable, Tuple from typing import Optional
from lxml import etree as ET from lxml import etree as ET
@@ -26,24 +26,20 @@ class WavADMReader:
_, uid_count = unpack(header_fmt, chna_data[0:4]) _, uid_count = unpack(header_fmt, chna_data[0:4])
#: A list of :class:`ChannelEntry` objects parsed from the
#: `chna` metadata chunk.
#:
#: .. note::
#: In-file, the `chna` track indexes start at 1. However, this interface
#: numbers the first track 0, in order to maintain consistency with other
#: libraries.
self.channel_uids = [] self.channel_uids = []
offset = calcsize(header_fmt) offset = calcsize(header_fmt)
for _ in range(uid_count): for _ in range(uid_count):
track_index, uid, track_ref, pack_ref = unpack_from(uid_fmt, chna_data, offset) track_index, uid, track_ref, pack_ref = unpack_from(uid_fmt,
chna_data,
offset)
# these values are either ascii or all null # these values are either ascii or all null
self.channel_uids.append(ChannelEntry(track_index - 1, self.channel_uids.append(ChannelEntry(track_index - 1,
uid.decode('ascii') , track_ref.decode('ascii'), pack_ref.decode('ascii'))) uid.decode('ascii') , track_ref.decode('ascii'),
pack_ref.decode('ascii')))
offset += calcsize(uid_fmt) offset += calcsize(uid_fmt)
@@ -53,7 +49,8 @@ class WavADMReader:
def programme(self) -> dict: def programme(self) -> dict:
""" """
Read the ADM `audioProgramme` data structure and some of its reference properties. Read the ADM `audioProgramme` data structure and some of its reference
properties.
""" """
ret_dict = dict() ret_dict = dict()
@@ -68,17 +65,21 @@ class WavADMReader:
ret_dict['programme_end'] = program.get("end") ret_dict['programme_end'] = program.get("end")
ret_dict['contents'] = [] ret_dict['contents'] = []
for content_ref in program.findall("audioContentIDRef", namespaces=nsmap): for content_ref in program.findall("audioContentIDRef",
namespaces=nsmap):
content_dict = dict() content_dict = dict()
content_dict['content_id'] = cid = content_ref.text content_dict['content_id'] = cid = content_ref.text
content = afext.find("audioContent[@audioContentID='%s']" % cid, namespaces=nsmap) content = afext.find("audioContent[@audioContentID='%s']" % cid,
namespaces=nsmap)
content_dict['content_name'] = content.get("audioContentName") content_dict['content_name'] = content.get("audioContentName")
content_dict['objects'] = [] content_dict['objects'] = []
for object_ref in content.findall("audioObjectIDRef", namespaces=nsmap): for object_ref in content.findall("audioObjectIDRef",
namespaces=nsmap):
object_dict = dict() object_dict = dict()
object_dict['object_id'] = oid = object_ref.text object_dict['object_id'] = oid = object_ref.text
object = afext.find("audioObject[@audioObjectID='%s']" % oid, namespaces=nsmap) object = afext.find("audioObject[@audioObjectID='%s']" % oid,
namespaces=nsmap)
pack = object.find("audioPackFormatIDRef", namespaces=nsmap) pack = object.find("audioPackFormatIDRef", namespaces=nsmap)
object_dict['object_name'] = object.get("audioObjectName") object_dict['object_name'] = object.get("audioObjectName")
object_dict['object_start'] = object.get("start") object_dict['object_start'] = object.get("start")
@@ -95,15 +96,17 @@ class WavADMReader:
return ret_dict return ret_dict
def track_info(self, index) -> dict: def track_info(self, index) -> Optional[dict]:
""" """
Information about a track in the WAV file. Information about a track in the WAV file.
:param index: index of audio track (indexed from zero) :param index: index of audio track (indexed from zero)
:returns: a dictionary with *content_name*, *content_id*, *object_name*, *object_id*, :returns: a dictionary with *content_name*, *content_id*,
*object_name*, *object_id*,
*pack_format_name*, *pack_type*, *channel_format_name* *pack_format_name*, *pack_type*, *channel_format_name*
""" """
channel_info = next((x for x in self.channel_uids if x.track_index == index), None) channel_info = next((x for x in self.channel_uids \
if x.track_index == index), None)
if channel_info is None: if channel_info is None:
return None return None
@@ -114,36 +117,50 @@ class WavADMReader:
afext = self.axml.find(".//audioFormatExtended", namespaces=nsmap) afext = self.axml.find(".//audioFormatExtended", namespaces=nsmap)
trackformat_elem = afext.find("audioTrackFormat[@audioTrackFormatID='%s']" % channel_info.track_ref, trackformat_elem = afext.find(
"audioTrackFormat[@audioTrackFormatID='%s']" \
% channel_info.track_ref,
namespaces=nsmap) namespaces=nsmap)
stream_id = trackformat_elem[0].text stream_id = trackformat_elem[0].text
channelformatref_elem = afext.find("audioStreamFormat[@audioStreamFormatID='%s']/audioChannelFormatIDRef" % stream_id, channelformatref_elem = afext.find(
("audioStreamFormat[@audioStreamFormatID='%s']"
"/audioChannelFormatIDRef") % stream_id,
namespaces=nsmap) namespaces=nsmap)
channelformat_id = channelformatref_elem.text channelformat_id = channelformatref_elem.text
packformatref_elem = afext.find("audioStreamFormat[@audioStreamFormatID='%s']/audioPackFormatIDRef" % stream_id, packformatref_elem = afext\
.find(("audioStreamFormat[@audioStreamFormatID='%s']"
"/audioPackFormatIDRef") % stream_id,
namespaces=nsmap) namespaces=nsmap)
packformat_id = packformatref_elem.text packformat_id = packformatref_elem.text
channelformat_elem = afext.find("audioChannelFormat[@audioChannelFormatID='%s']" % channelformat_id, channelformat_elem = afext\
.find("audioChannelFormat[@audioChannelFormatID='%s']" \
% channelformat_id,
namespaces=nsmap) namespaces=nsmap)
ret_dict['channel_format_name'] = channelformat_elem.get("audioChannelFormatName") ret_dict['channel_format_name'] = channelformat_elem.get(
"audioChannelFormatName")
packformat_elem = afext.find("audioPackFormat[@audioPackFormatID='%s']" % packformat_id, packformat_elem = afext.find(
"audioPackFormat[@audioPackFormatID='%s']" % packformat_id,
namespaces=nsmap) namespaces=nsmap)
ret_dict['pack_type'] = packformat_elem.get("typeDefinition") ret_dict['pack_type'] = packformat_elem.get(
ret_dict['pack_format_name'] = packformat_elem.get("audioPackFormatName") "typeDefinition")
ret_dict['pack_format_name'] = packformat_elem.get(
"audioPackFormatName")
object_elem = afext.find("audioObject[audioPackFormatIDRef = '%s']" % packformat_id, object_elem = afext.find("audioObject[audioPackFormatIDRef = '%s']" \
% packformat_id,
namespaces=nsmap) namespaces=nsmap)
ret_dict['audio_object_name'] = object_elem.get("audioObjectName") ret_dict['audio_object_name'] = object_elem.get("audioObjectName")
object_id = object_elem.get("audioObjectID") object_id = object_elem.get("audioObjectID")
ret_dict['object_id'] = object_id ret_dict['object_id'] = object_id
content_elem = afext.find("audioContent/[audioObjectIDRef = '%s']" % object_id, content_elem = afext.find("audioContent/[audioObjectIDRef = '%s']" \
% object_id,
namespaces=nsmap) namespaces=nsmap)
ret_dict['content_name'] = content_elem.get("audioContentName") ret_dict['content_name'] = content_elem.get("audioContentName")
@@ -161,5 +178,6 @@ class WavADMReader:
rd.update(self.track_info(channel_uid_rec.track_index)) rd.update(self.track_info(channel_uid_rec.track_index))
return rd return rd
return dict(channel_entries=list(map(lambda z: make_entry(z), self.channel_uids)), return dict(channel_entries=list(map(lambda z: make_entry(z),
self.channel_uids)),
programme=self.programme()) programme=self.programme())