In-progress flake8 fixes

This commit is contained in:
Jamie Hardt
2023-11-08 20:43:56 -08:00
parent f978927648
commit 5b1e4ab631
12 changed files with 485 additions and 483 deletions

View File

@@ -1,11 +1,13 @@
from optparse import OptionParser, OptionGroup
import datetime
from . import WavInfoReader
from . import __version__
from optparse import OptionParser
import sys
import json
from enum import Enum
class MyJSONEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, Enum):
@@ -13,23 +15,25 @@ class MyJSONEncoder(json.JSONEncoder):
else:
return super().default(o)
class MissingDataError(RuntimeError):
pass
def main():
parser = OptionParser()
parser.usage = 'wavinfo (--adm | --ixml) <FILE> +'
# parser.add_option('-f', dest='output_format', help='Set the output format',
# default='json',
# metavar='FORMAT')
parser.add_option('--adm', dest='adm',
help='Output ADM XML',
default=False,
action='store_true')
parser.add_option('--adm', dest='adm', help='Output ADM XML',
default=False, action='store_true')
parser.add_option('--ixml', dest='ixml', help='Output iXML',
default=False, action='store_true')
parser.add_option('--ixml', dest='ixml',
help='Output iXML',
default=False,
action='store_true')
(options, args) = parser.parse_args(sys.argv)
for arg in args[1:]:
@@ -47,9 +51,9 @@ def main():
raise MissingDataError("ixml")
else:
ret_dict = {
'filename': arg,
'run_date': datetime.datetime.now().isoformat() ,
'application': "wavinfo " + __version__,
'filename': arg,
'run_date': datetime.datetime.now().isoformat(),
'application': "wavinfo " + __version__,
'scopes': {}
}
for scope, name, value in this_file.walk():
@@ -60,8 +64,8 @@ def main():
json.dump(ret_dict, cls=MyJSONEncoder, fp=sys.stdout, indent=2)
except MissingDataError as e:
print("MissingDataError: Missing metadata (%s) in file %s" % \
(e, arg), file=sys.stderr)
print("MissingDataError: Missing metadata (%s) in file %s" %
(e, arg), file=sys.stderr)
continue
except Exception as e:
raise e

View File

@@ -1,14 +1,18 @@
import struct
from collections import namedtuple
# from collections import namedtuple
from typing import NamedTuple, Dict
from . import riff_parser
RF64Context = namedtuple('RF64Context','sample_count bigchunk_table')
class RF64Context(NamedTuple):
sample_count: int
bigchunk_table: Dict[str, int]
def parse_rf64(stream, signature = b'RF64') -> RF64Context:
def parse_rf64(stream, signature=b'RF64') -> RF64Context:
start = stream.tell()
assert( stream.read(4) == b'WAVE' )
assert stream.read(4) == b'WAVE'
ds64_chunk = riff_parser.parse_chunk(stream)
assert type(ds64_chunk) is riff_parser.ChunkDescriptor, \
@@ -16,10 +20,10 @@ def parse_rf64(stream, signature = b'RF64') -> RF64Context:
ds64_field_spec = "<QQQI"
ds64_fields_size = struct.calcsize(ds64_field_spec)
assert(ds64_chunk.ident == b'ds64')
assert ds64_chunk.ident == b'ds64'
ds64_data = ds64_chunk.read_data(stream)
assert(len(ds64_data) >= ds64_fields_size)
assert len(ds64_data) >= ds64_fields_size
riff_size, data_size, sample_count, length_lookup_table = struct.unpack(
ds64_field_spec, ds64_data[0:ds64_fields_size]
@@ -30,14 +34,14 @@ def parse_rf64(stream, signature = b'RF64') -> RF64Context:
# chunksize64size = struct.calcsize(chunksize64format)
for _ in range(length_lookup_table):
bigname, bigsize = struct.unpack_from(chunksize64format, ds64_data,
offset= ds64_fields_size)
bigname, bigsize = struct.unpack_from(chunksize64format,
ds64_data,
offset=ds64_fields_size)
bigchunk_table[bigname] = bigsize
bigchunk_table[b'data'] = data_size
bigchunk_table[signature] = riff_size
stream.seek(start, 0)
return RF64Context( sample_count=sample_count,
return RF64Context(sample_count=sample_count,
bigchunk_table=bigchunk_table)

View File

@@ -1,5 +1,4 @@
from optparse import Option
# from optparse import Option
import struct
from .rf64_parser import parse_rf64, RF64Context
from typing import NamedTuple, Union, List, Optional
@@ -12,14 +11,14 @@ class WavInfoEOFError(EOFError):
class ListChunkDescriptor(NamedTuple):
signature: bytes
signature: bytes
children: List[Union['ChunkDescriptor', 'ListChunkDescriptor']]
class ChunkDescriptor(NamedTuple):
ident: bytes
start: int
length: int
start: int
length: int
rf64_context: Optional[RF64Context]
def read_data(self, from_stream) -> bytes:
@@ -56,8 +55,8 @@ def parse_chunk(stream, rf64_context=None):
rf64_context = parse_rf64(stream=stream, signature=ident)
assert rf64_context is not None, \
f"Sentinel data size 0xFFFFFFFF found outside of RF64 context"
"Sentinel data size 0xFFFFFFFF found outside of RF64 context"
data_size = rf64_context.bigchunk_table[ident]
displacement = data_size
@@ -71,7 +70,7 @@ def parse_chunk(stream, rf64_context=None):
else:
data_start = stream.tell()
stream.seek(displacement, 1)
return ChunkDescriptor(ident=ident,
start=data_start,
return ChunkDescriptor(ident=ident,
start=data_start,
length=data_size,
rf64_context=rf64_context)

View File

@@ -2,8 +2,8 @@
# def binary_to_string(binary_value):
# return reduce(lambda val, el: val + "{:02x}".format(el), binary_value, '')
# return reduce(lambda val, el: val + "{:02x}".format(el),
# binary_value, '')
# class UMIDParser:
# """
@@ -13,109 +13,109 @@
# """
# def __init__(self, raw_umid: bytes):
# self.raw_umid = raw_umid
#
# @property
# def universal_label(self) -> bytearray:
# return self.raw_umid[0:12]
#
# @property
# def basic_umid(self):
# return self.raw_umid[0:32]
#
# @property
# def universal_label(self) -> bytearray:
# return self.raw_umid[0:12]
#
# @property
# def basic_umid(self):
# return self.raw_umid[0:32]
# def basic_umid_to_str(self):
# return binary_to_string(self.raw_umid[0:32])
#
# @property
# def universal_label_is_valid(self) -> bool:
# valid_preamble = b'\x06\x0a\x2b\x34\x01\x01\x01\x05\x01\x01'
# return self.universal_label[0:len(valid_preamble)] == valid_preamble
#
# @property
# def material_type(self) -> str:
# material_byte = self.raw_umid[10]
# if material_byte == 0x1:
# return 'picture'
# elif material_byte == 0x2:
# return 'audio'
# elif material_byte == 0x3:
# return 'data'
# elif material_byte == 0x4:
# return 'other'
# elif material_byte == 0x5:
# return 'picture_single_component'
# elif material_byte == 0x6:
# return 'picture_multiple_component'
# elif material_byte == 0x7:
# return 'audio_single_component'
# elif material_byte == 0x9:
# return 'audio_multiple_component'
# elif material_byte == 0xb:
# return 'auxiliary_single_component'
# elif material_byte == 0xc:
# return 'auxiliary_multiple_component'
# elif material_byte == 0xd:
# return 'mixed_components'
# elif material_byte == 0xf:
# return 'not_identified'
# else:
# return 'not_recognized'
#
# @property
# def material_number_creation_method(self) -> str:
# method_byte = self.raw_umid[11]
# method_byte = (method_byte << 4) & 0xf
# if method_byte == 0x0:
# return 'undefined'
# elif method_byte == 0x1:
# return 'smpte'
# elif method_byte == 0x2:
# return 'uuid'
# elif method_byte == 0x3:
# return 'masked'
# elif method_byte == 0x4:
# return 'ieee1394'
# elif 0x5 <= method_byte <= 0x7:
# return 'reserved_undefined'
# else:
# return 'unrecognized'
#
# @property
# def instance_number_creation_method(self) -> str:
# method_byte = self.raw_umid[11]
# method_byte = method_byte & 0xf
# if method_byte == 0x0:
# return 'undefined'
# elif method_byte == 0x01:
# return 'local_registration'
# elif method_byte == 0x02:
# return '24_bit_prs'
# elif method_byte == 0x03:
# return 'copy_number_and_16_bit_prs'
# elif 0x04 <= method_byte <= 0x0e:
# return 'reserved_undefined'
# elif method_byte == 0x0f:
# return 'live_stream'
# else:
# return 'unrecognized'
#
# @property
# def indicated_length(self) -> str:
# if self.raw_umid[12] == 0x13:
# return 'basic'
# elif self.raw_umid[12] == 0x33:
# return 'extended'
#
# @property
# def instance_number(self) -> bytearray:
# return self.raw_umid[13:3]
#
# @property
# def material_number(self) -> bytearray:
# return self.raw_umid[16:16]
#
# @property
# def source_pack(self) -> Union[bytearray, None]:
# if self.indicated_length == 'extended':
# return self.raw_umid[32:32]
# else:
# return None
# def basic_umid_to_str(self):
# return binary_to_string(self.raw_umid[0:32])
#
# @property
# def universal_label_is_valid(self) -> bool:
# valid_preamble = b'\x06\x0a\x2b\x34\x01\x01\x01\x05\x01\x01'
# return self.universal_label[0:len(valid_preamble)] == valid_preamble
#
# @property
# def material_type(self) -> str:
# material_byte = self.raw_umid[10]
# if material_byte == 0x1:
# return 'picture'
# elif material_byte == 0x2:
# return 'audio'
# elif material_byte == 0x3:
# return 'data'
# elif material_byte == 0x4:
# return 'other'
# elif material_byte == 0x5:
# return 'picture_single_component'
# elif material_byte == 0x6:
# return 'picture_multiple_component'
# elif material_byte == 0x7:
# return 'audio_single_component'
# elif material_byte == 0x9:
# return 'audio_multiple_component'
# elif material_byte == 0xb:
# return 'auxiliary_single_component'
# elif material_byte == 0xc:
# return 'auxiliary_multiple_component'
# elif material_byte == 0xd:
# return 'mixed_components'
# elif material_byte == 0xf:
# return 'not_identified'
# else:
# return 'not_recognized'
#
# @property
# def material_number_creation_method(self) -> str:
# method_byte = self.raw_umid[11]
# method_byte = (method_byte << 4) & 0xf
# if method_byte == 0x0:
# return 'undefined'
# elif method_byte == 0x1:
# return 'smpte'
# elif method_byte == 0x2:
# return 'uuid'
# elif method_byte == 0x3:
# return 'masked'
# elif method_byte == 0x4:
# return 'ieee1394'
# elif 0x5 <= method_byte <= 0x7:
# return 'reserved_undefined'
# else:
# return 'unrecognized'
#
# @property
# def instance_number_creation_method(self) -> str:
# method_byte = self.raw_umid[11]
# method_byte = method_byte & 0xf
# if method_byte == 0x0:
# return 'undefined'
# elif method_byte == 0x01:
# return 'local_registration'
# elif method_byte == 0x02:
# return '24_bit_prs'
# elif method_byte == 0x03:
# return 'copy_number_and_16_bit_prs'
# elif 0x04 <= method_byte <= 0x0e:
# return 'reserved_undefined'
# elif method_byte == 0x0f:
# return 'live_stream'
# else:
# return 'unrecognized'
#
# @property
# def indicated_length(self) -> str:
# if self.raw_umid[12] == 0x13:
# return 'basic'
# elif self.raw_umid[12] == 0x33:
# return 'extended'
#
# @property
# def instance_number(self) -> bytearray:
# return self.raw_umid[13:3]
#
# @property
# def material_number(self) -> bytearray:
# return self.raw_umid[16:16]
#
# @property
# def source_pack(self) -> Union[bytearray, None]:
# if self.indicated_length == 'extended':
# return self.raw_umid[32:32]
# else:
# return None

View File

@@ -9,8 +9,10 @@ from typing import Optional
from lxml import etree as ET
ChannelEntry = namedtuple('ChannelEntry', "track_index uid track_ref pack_ref")
class WavADMReader:
"""
Reads XML data from an EBU ADM (Audio Definiton Model) WAV File.
@@ -31,15 +33,19 @@ class WavADMReader:
offset = calcsize(header_fmt)
for _ in range(uid_count):
track_index, uid, track_ref, pack_ref = unpack_from(uid_fmt,
chna_data,
track_index, uid, track_ref, pack_ref = unpack_from(uid_fmt,
chna_data,
offset)
# these values are either ascii or all null
self.channel_uids.append(ChannelEntry(track_index - 1,
uid.decode('ascii') , track_ref.decode('ascii'),
pack_ref.decode('ascii')))
self.channel_uids.append(
ChannelEntry(track_index - 1,
uid.decode('ascii'),
track_ref.decode('ascii'),
pack_ref.decode('ascii')
)
)
offset += calcsize(uid_fmt)
@@ -49,13 +55,13 @@ class WavADMReader:
def programme(self) -> dict:
"""
Read the ADM `audioProgramme` data structure and some of its reference
properties.
Read the ADM `audioProgramme` data structure and some of its reference
properties.
"""
ret_dict = dict()
nsmap = self.axml.getroot().nsmap
nsmap = self.axml.getroot().nsmap
afext = self.axml.find(".//audioFormatExtended", namespaces=nsmap)
program = afext.find("audioProgramme", namespaces=nsmap)
@@ -65,20 +71,20 @@ class WavADMReader:
ret_dict['programme_end'] = program.get("end")
ret_dict['contents'] = []
for content_ref in program.findall("audioContentIDRef",
for content_ref in program.findall("audioContentIDRef",
namespaces=nsmap):
content_dict = dict()
content_dict['content_id'] = cid = content_ref.text
content = afext.find("audioContent[@audioContentID='%s']" % cid,
content = afext.find("audioContent[@audioContentID='%s']" % cid,
namespaces=nsmap)
content_dict['content_name'] = content.get("audioContentName")
content_dict['objects'] = []
for object_ref in content.findall("audioObjectIDRef",
for object_ref in content.findall("audioObjectIDRef",
namespaces=nsmap):
object_dict = dict()
object_dict['object_id'] = oid = object_ref.text
object = afext.find("audioObject[@audioObjectID='%s']" % oid,
object = afext.find("audioObject[@audioObjectID='%s']" % oid,
namespaces=nsmap)
pack = object.find("audioPackFormatIDRef", namespaces=nsmap)
object_dict['object_name'] = object.get("audioObjectName")
@@ -100,14 +106,14 @@ class WavADMReader:
"""
Information about a track in the WAV file.
:param index: index of audio track (indexed from zero)
:returns: a dictionary with *content_name*, *content_id*,
*object_name*, *object_id*,
:param index: index of audio track (indexed from zero)
:returns: a dictionary with *content_name*, *content_id*,
*object_name*, *object_id*,
*pack_format_name*, *pack_type*, *channel_format_name*
"""
channel_info = next((x for x in self.channel_uids \
if x.track_index == index), None)
channel_info = next((x for x in self.channel_uids
if x.track_index == index), None)
if channel_info is None:
return None
@@ -115,60 +121,60 @@ class WavADMReader:
nsmap = self.axml.getroot().nsmap
afext = self.axml.find(".//audioFormatExtended", namespaces=nsmap)
afext = self.axml.find(".//audioFormatExtended",
namespaces=nsmap)
trackformat_elem = afext.find(
"audioTrackFormat[@audioTrackFormatID='%s']" \
% channel_info.track_ref,
namespaces=nsmap)
"audioTrackFormat[@audioTrackFormatID='%s']"
% channel_info.track_ref, namespaces=nsmap)
stream_id = trackformat_elem[0].text
channelformatref_elem = afext.find(
("audioStreamFormat[@audioStreamFormatID='%s']"
"/audioChannelFormatIDRef") % stream_id,
("audioStreamFormat[@audioStreamFormatID='%s']"
"/audioChannelFormatIDRef") % stream_id,
namespaces=nsmap)
channelformat_id = channelformatref_elem.text
packformatref_elem = afext\
.find(("audioStreamFormat[@audioStreamFormatID='%s']"
"/audioPackFormatIDRef") % stream_id,
packformatref_elem = afext.find(
("audioStreamFormat[@audioStreamFormatID='%s']"
"/audioPackFormatIDRef") % stream_id,
namespaces=nsmap)
packformat_id = packformatref_elem.text
channelformat_elem = afext\
.find("audioChannelFormat[@audioChannelFormatID='%s']" \
% channelformat_id,
namespaces=nsmap)
.find("audioChannelFormat[@audioChannelFormatID='%s']"
% channelformat_id,
namespaces=nsmap)
ret_dict['channel_format_name'] = channelformat_elem.get(
"audioChannelFormatName")
"audioChannelFormatName")
packformat_elem = afext.find(
"audioPackFormat[@audioPackFormatID='%s']" % packformat_id,
"audioPackFormat[@audioPackFormatID='%s']" % packformat_id,
namespaces=nsmap)
ret_dict['pack_type'] = packformat_elem.get(
"typeDefinition")
"typeDefinition")
ret_dict['pack_format_name'] = packformat_elem.get(
"audioPackFormatName")
"audioPackFormatName")
object_elem = afext.find("audioObject[audioPackFormatIDRef = '%s']" \
% packformat_id,
namespaces=nsmap)
object_elem = afext.find("audioObject[audioPackFormatIDRef = '%s']"
% packformat_id,
namespaces=nsmap)
ret_dict['audio_object_name'] = object_elem.get("audioObjectName")
object_id = object_elem.get("audioObjectID")
ret_dict['object_id'] = object_id
content_elem = afext.find("audioContent/[audioObjectIDRef = '%s']" \
% object_id,
namespaces=nsmap)
content_elem = afext.find("audioContent/[audioObjectIDRef = '%s']"
% object_id,
namespaces=nsmap)
ret_dict['content_name'] = content_elem.get("audioContentName")
ret_dict['content_id'] = content_elem.get("audioContentID")
return ret_dict
def to_dict(self) -> dict: #FIXME should be "asdict"
def to_dict(self) -> dict: # FIXME should be "asdict"
"""
Get ADM metadata as a dictionary.
"""
@@ -178,6 +184,6 @@ class WavADMReader:
rd.update(self.track_info(channel_uid_rec.track_index))
return rd
return dict(channel_entries=list(map(lambda z: make_entry(z),
return dict(channel_entries=list(map(lambda z: make_entry(z),
self.channel_uids)),
programme=self.programme())
programme=self.programme())

View File

@@ -3,74 +3,75 @@ import struct
from typing import Optional
class WavBextReader:
def __init__(self, bext_data, encoding):
"""
Read Broadcast-WAV extended metadata.
:param bext_data: The bytes-like data.
:param encoding: The encoding to use when decoding the text fields of
the BEXT metadata scope. According to EBU Rec 3285 this shall be
:param encoding: The encoding to use when decoding the text fields of
the BEXT metadata scope. According to EBU Rec 3285 this shall be
ASCII.
"""
packstring = "<256s" + "32s" + "32s" + "10s" + "8s" + "QH" + "64s" + \
"hhhhh" + "180s"
"hhhhh" + "180s"
rest_starts = struct.calcsize(packstring)
unpacked = struct.unpack(packstring, bext_data[:rest_starts])
def sanitize_bytes(b : bytes) -> str:
def sanitize_bytes(b: bytes) -> str:
# honestly can't remember why I'm stripping nulls this way
first_null = next((index for index, byte in enumerate(b) \
if byte == 0), None)
first_null = next((index for index, byte in enumerate(b)
if byte == 0), None)
trimmed = b if first_null is None else b[:first_null]
decoded = trimmed.decode(encoding)
return decoded
#: Description. A free-text field up to 256 characters long.
self.description : str = sanitize_bytes(unpacked[0])
self.description: str = sanitize_bytes(unpacked[0])
#: Originator. Usually the name of the encoding application, sometimes
#: an artist name.
self.originator : str = sanitize_bytes(unpacked[1])
self.originator: str = sanitize_bytes(unpacked[1])
#: A unique identifier for the file, a serial number.
self.originator_ref : str = sanitize_bytes(unpacked[2])
self.originator_ref: str = sanitize_bytes(unpacked[2])
#: Date of the recording, in the format YYYY-MM-DD.
self.originator_date : str = sanitize_bytes(unpacked[3])
self.originator_date: str = sanitize_bytes(unpacked[3])
#: Time of the recording, in the format HH:MM:SS.
self.originator_time : str = sanitize_bytes(unpacked[4])
#: The sample offset of the start, usually relative
#: to midnight.
self.time_reference : int = unpacked[5]
self.originator_time: str = sanitize_bytes(unpacked[4])
#: The sample offset of the start, usually relative
#: to midnight.
self.time_reference: int = unpacked[5]
#: A variable-length text field containing a list of processes and
#: and conversions performed on the file.
self.coding_history : str = sanitize_bytes(bext_data[rest_starts:])
#: BEXT version.
self.version : int = unpacked[6]
#: SMPTE 330M UMID of this audio file, 64 bytes are allocated though
self.coding_history: str = sanitize_bytes(bext_data[rest_starts:])
#: BEXT version.
self.version: int = unpacked[6]
#: SMPTE 330M UMID of this audio file, 64 bytes are allocated though
#: the UMID may only be 32 bytes long.
self.umid : Optional[bytes] = None
self.umid: Optional[bytes] = None
#: EBU R128 Integrated loudness, in LUFS.
self.loudness_value : Optional[float] = None
self.loudness_value: Optional[float] = None
#: EBU R128 Loudness range, in LUFS.
self.loudness_range : Optional[float] = None
self.loudness_range: Optional[float] = None
#: True peak level, in dBFS TP
self.max_true_peak : Optional[float] = None
self.max_true_peak: Optional[float] = None
#: EBU R128 Maximum momentary loudness, in LUFS
self.max_momentary_loudness : Optional[float] = None
self.max_momentary_loudness: Optional[float] = None
#: EBU R128 Maximum short-term loudness, in LUFS.
self.max_shortterm_loudness : Optional[float] = None
self.max_shortterm_loudness: Optional[float] = None
if self.version > 0:
self.umid = unpacked[7]
@@ -87,7 +88,7 @@ class WavBextReader:
# umid_parsed = UMIDParser(self.umid)
# umid_str = umid_parsed.basic_umid_to_str()
# else:
umid_str = None
return {'description': self.description,

View File

@@ -1,26 +1,25 @@
"""
Cues metadata
For reference on implementation of cues and related metadata see:
For reference on implementation of cues and related metadata see:
August 1991, "Multimedia Programming Interface and Data Specifications 1.0",
IBM Corporation and Microsoft Corporation
https://www.aelius.com/njh/wavemetatools/doc/riffmci.pdf
"""
from dataclasses import dataclass
import encodings
from .riff_parser import ChunkDescriptor
from struct import unpack, calcsize
from typing import Optional, Tuple, NamedTuple, List, Dict, Any, Generator
#: Country Codes used in the RIFF standard to resolve locale. These codes
#: Country Codes used in the RIFF standard to resolve locale. These codes
#: appear in CSET and LTXT metadata.
CountryCodes = """000 None Indicated
001,USA
002,Canada
003,Latin America
030,Greece
030,Greece
031,Netherlands
032,Belgium
033,France
@@ -44,50 +43,50 @@ CountryCodes = """000 None Indicated
090,Turkey
351,Portugal
352,Luxembourg
354,Iceland
354,Iceland
358,Finland"""
#: Language and Dialect codes used in the RIFF standard to resolve native
#: Language and Dialect codes used in the RIFF standard to resolve native
#: language of text fields. These codes appear in CSET and LTXT metadata.
LanguageDialectCodes = """0 0 None Indicated
1,1,Arabic
2,1,Bulgarian
3,1,Catalan
4,1,Traditional Chinese
4,2,Simplified Chinese
4,1,Traditional Chinese
4,2,Simplified Chinese
5,1,Czech
6,1,Danish
7,1,German
7,2,Swiss German
7,2,Swiss German
8,1,Greek
9,1,US English
9,2,UK English
9,2,UK English
10,1,Spanish
10,2,Spanish Mexican
10,2,Spanish Mexican
11,1,Finnish
12,1,French
12,2,Belgian French
12,3,Canadian French
12,4,Swiss French
13,1,Hebrew
12,2,Belgian French
12,3,Canadian French
12,4,Swiss French
13,1,Hebrew
14,1,Hungarian
15,1,Icelandic
16,1,Italian
16,2,Swiss Italian
16,2,Swiss Italian
17,1,Japanese
18,1,Korean
19,1,Dutch
19,2,Belgian Dutch
20,1,Norwegian - Bokmal
20,2,Norwegian - Nynorsk
19,2,Belgian Dutch
20,1,Norwegian - Bokmal
20,2,Norwegian - Nynorsk
21,1,Polish
22,1,Brazilian Portuguese
22,2,Portuguese
23,1,Rhaeto-Romanic
22,1,Brazilian Portuguese
22,2,Portuguese
23,1,Rhaeto-Romanic
24,1,Romanian
25,1,Russian
26,1,Serbo-Croatian (Latin)
26,2,Serbo-Croatian (Cyrillic)
26,1,Serbo-Croatian (Latin)
26,2,Serbo-Croatian (Cyrillic)
27,1,Slovak
28,1,Albanian
29,1,Swedish
@@ -101,10 +100,10 @@ class CueEntry(NamedTuple):
"""
A ``cue`` element structure.
"""
#: Cue "name" or id number
#: Cue "name" or id number
name: int
#: Cue position, as a frame count in the play order of the WAVE file. In
#: principle this can be affected by playlists and ``wavl`` chunk
#: Cue position, as a frame count in the play order of the WAVE file. In
#: principle this can be affected by playlists and ``wavl`` chunk
#: placement.
position: int
chunk_id: bytes
@@ -113,7 +112,7 @@ class CueEntry(NamedTuple):
sample_offset: int
Format = "<II4sIII"
@classmethod
def format_size(cls) -> int:
return calcsize(cls.Format)
@@ -121,13 +120,13 @@ class CueEntry(NamedTuple):
@classmethod
def read(cls, data: bytes) -> 'CueEntry':
assert len(data) == cls.format_size(), \
(f"cue data size incorrect, expected {calcsize(cls.Format)} "
"found {len(data)}")
(f"cue data size incorrect, expected {calcsize(cls.Format)} "
"found {len(data)}")
parsed = unpack(cls.Format, data)
return cls(name=parsed[0], position=parsed[1], chunk_id=parsed[2],
chunk_start=parsed[3], block_start=parsed[4],
chunk_start=parsed[3], block_start=parsed[4],
sample_offset=parsed[5])
@@ -170,8 +169,8 @@ class RangeLabel(NamedTuple):
fallback_encoding = f"cp{data[6]}"
return cls(name=parsed[0], length=parsed[1], purpose=parsed[2],
country=parsed[3], language=parsed[4],
dialect=parsed[5], codepage=parsed[6],
country=parsed[3], language=parsed[4],
dialect=parsed[5], codepage=parsed[6],
text=text_data.decode(fallback_encoding))
@@ -192,19 +191,19 @@ class WavCuesReader:
@classmethod
def read_all(cls, f,
cues: Optional[ChunkDescriptor],
labls: List[ChunkDescriptor],
ltxts: List[ChunkDescriptor],
notes: List[ChunkDescriptor],
fallback_encoding: str) -> 'WavCuesReader':
cues: Optional[ChunkDescriptor],
labls: List[ChunkDescriptor],
ltxts: List[ChunkDescriptor],
notes: List[ChunkDescriptor],
fallback_encoding: str) -> 'WavCuesReader':
cue_list = []
if cues is not None:
cues_data = cues.read_data(f)
assert len(cues_data) >= 4, "cue metadata too short"
offset = calcsize("<I")
cues_count = unpack("<I", cues_data[0:offset])
for _ in range(cues_count[0]):
cue_bytes = cues_data[offset: offset + CueEntry.format_size()]
cue_list.append(CueEntry.read(cue_bytes))
@@ -213,14 +212,14 @@ class WavCuesReader:
label_list = []
for labl in labls:
label_list.append(
LabelEntry.read(labl.read_data(f),
LabelEntry.read(labl.read_data(f),
encoding=fallback_encoding)
)
range_list = []
for r in ltxts:
range_list.append(
RangeLabel.read(r.read_data(f),
RangeLabel.read(r.read_data(f),
fallback_encoding=fallback_encoding)
)
@@ -236,7 +235,7 @@ class WavCuesReader:
def each_cue(self) -> Generator[Tuple[int, int], None, None]:
"""
Iterate through each cue.
Iterate through each cue.
:yields: the cue's ``name`` and ``sample_offset``
"""
@@ -244,17 +243,17 @@ class WavCuesReader:
yield (cue.name, cue.sample_offset)
def label_and_note(self, cue_ident: int) -> Tuple[Optional[str],
Optional[str]]:
Optional[str]]:
"""
Get the label and note (extended comment) for a cue.
:param cue_ident: the cue's name, its unique identifying number
:returns: a tuple of the the cue's label (if present) and note (if
present)
present)
"""
label = next((l.text for l in self.labels
if l.name == cue_ident), None)
note = next((n.text for n in self.notes
label = next((label.text for label in self.labels
if label.name == cue_ident), None)
note = next((n.text for n in self.notes
if n.name == cue_ident), None)
return (label, note)
@@ -265,7 +264,7 @@ class WavCuesReader:
:param cue_ident: the cue's name, its unique identifying number
:returns: the length of the marker's range, or `None`
"""
return next((r.length for r in self.ranges
return next((r.length for r in self.ranges
if r.name == cue_ident), None)
def to_dict(self) -> Dict[str, Any]:
@@ -280,9 +279,8 @@ class WavCuesReader:
if label is not None:
retval[n]['label'] = label
if note is not None:
retval[n]['note'] = note
retval[n]['note'] = note
if r is not None:
retval[n]['length'] = r
return retval
retval[n]['length'] = r
return retval

View File

@@ -1,7 +1,7 @@
"""
Reading Dolby Bitstream Metadata
Unless otherwise stated, all § references here are to
Unless otherwise stated, all § references here are to
`EBU Tech 3285 Supplement 6`_.
.. _EBU Tech 3285 Supplement 6: https://tech.ebu.ch/docs/tech/tech3285s6.pdf
@@ -14,6 +14,7 @@ from typing import List, Tuple, Any, Union
from io import BytesIO
class SegmentType(IntEnum):
"""
Metadata segment type.
@@ -31,7 +32,7 @@ class SegmentType(IntEnum):
DolbyAtmosSupplemental = 0xa
@classmethod
def _missing_(cls,val):
def _missing_(cls, val):
return val
@@ -39,11 +40,11 @@ class SegmentType(IntEnum):
class DolbyDigitalPlusMetadata:
"""
*Dolby Digital Plus* is Dolby's brand for multichannel surround
on discrete formats that aren't AC-3 (Dolby Digital) or Dolby E. This
metadata segment is present in ADM wave files created with a Dolby Atmos
on discrete formats that aren't AC-3 (Dolby Digital) or Dolby E. This
metadata segment is present in ADM wave files created with a Dolby Atmos
Production Suite.
Where an AC-3 bitstream can contain multiple programs, a Dolby Digital
Where an AC-3 bitstream can contain multiple programs, a Dolby Digital
Plus bitstream will only contain one program.
"""
@@ -77,7 +78,6 @@ class DolbyDigitalPlusMetadata:
MUTE = 0b111
"-∞ dB"
class DolbySurroundEncodingMode(Enum):
"""
Dolby surround endcoding mode.
@@ -87,7 +87,6 @@ class DolbyDigitalPlusMetadata:
NOT_IN_USE = 0b01
NOT_INDICATED = 0b00
class BitStreamMode(Enum):
"""
Dolby Digital Plus `bsmod` field
@@ -122,7 +121,6 @@ class DolbyDigitalPlusMetadata:
should be interpreted as karaoke.
"""
class AudioCodingMode(Enum):
"""
Dolby Digital Plus `acmod` field
@@ -144,7 +142,6 @@ class DolbyDigitalPlusMetadata:
CH_ORD_3_2 = 0b111
"LCR + LR surround"
class CenterDownMixLevel(Enum):
"""
§ 4.3.3.1
@@ -152,16 +149,15 @@ class DolbyDigitalPlusMetadata:
DOWN_3DB = 0b00
"Attenuate 3 dB"
DOWN_45DB = 0b01
"Attenuate 4.5 dB"
DOWN_6DB = 0b10
"Attenuate 6 dB"
RESERVED = 0b11
class SurroundDownMixLevel(Enum):
"""
Dolby Digital Plus `surmixlev` field
@@ -172,7 +168,6 @@ class DolbyDigitalPlusMetadata:
MUTE = 0b10
RESERVED = 0b11
class LanguageCode(int):
"""
§ 4.3.4.1
@@ -181,21 +176,18 @@ class DolbyDigitalPlusMetadata:
"""
pass
class MixLevel(int):
"""
§ 4.3.6.2
"""
pass
class DialnormLevel(int):
"""
§ 4.3.4.4
"""
pass
class RoomType(Enum):
"""
`roomtyp` 4.3.6.3
@@ -205,10 +197,9 @@ class DolbyDigitalPlusMetadata:
SMALL_ROOM_FLAT_CURVE = 0b10
RESERVED = 0b11
class PreferredDownMixMode(Enum):
"""
Indicates the creating engineer's preference of what the receiver
Indicates the creating engineer's preference of what the receiver
should downmix.
§ 4.3.8.1
"""
@@ -217,7 +208,6 @@ class DolbyDigitalPlusMetadata:
STEREO = 0b10
PRO_LOGIC_2 = 0b11
class SurroundEXMode(IntEnum):
"""
Dolby Surround-EX mode.
@@ -228,7 +218,6 @@ class DolbyDigitalPlusMetadata:
SEX = 0b10
PRO_LOGIC_2 = 0b11
class HeadphoneMode(IntEnum):
"""
`dheadphonmod` § 4.3.9.2
@@ -238,12 +227,10 @@ class DolbyDigitalPlusMetadata:
DOLBY_HEADPHONE = 0b10
RESERVED = 0b11
class ADConverterType(Enum):
STANDARD = 0
HDCD = 1
class StreamDependency(Enum):
"""
Encodes `ddplus_info1.stream_type` field § 4.3.12.1
@@ -254,7 +241,6 @@ class DolbyDigitalPlusMetadata:
INDEPENDENT_FROM_DOLBY_DIGITAL = 2
RESERVED = 3
class RFCompressionProfile(Enum):
"""
`compr1` RF compression profile
@@ -267,7 +253,7 @@ class DolbyDigitalPlusMetadata:
MUSIC_LIGHT = 4
SPEECH = 5
#: Program ID number, this identifies the program in a multi-program
#: Program ID number, this identifies the program in a multi-program
#: element. § 4.3.1
program_id: int
@@ -317,13 +303,13 @@ class DolbyDigitalPlusMetadata:
#: LoRo preferred center downmix level
loro_center_downmix_level: DownMixLevelToken
#: LoRo preferred surround downmix level
loro_surround_downmix_level: DownMixLevelToken
#: Preferred downmix mode
downmix_mode: PreferredDownMixMode
#: LtRt preferred center downmix level
ltrt_center_downmix_level: DownMixLevelToken
@@ -332,20 +318,20 @@ class DolbyDigitalPlusMetadata:
#: Surround-EX mode
surround_ex_mode: SurroundEXMode
#: Dolby Headphone mode
dolby_headphone_encoded: HeadphoneMode
ad_converter_type: ADConverterType
compression_profile: RFCompressionProfile
dynamic_range: RFCompressionProfile
#: Indicates if this stream can be decoded independently or not
stream_dependency: StreamDependency
#: Data rate of this bitstream in kilobits per second
datarate_kbps: int
@staticmethod
def load(buffer: bytes):
assert len(buffer) == 96, "Dolby Digital Plus segment incorrect size, "
@@ -363,12 +349,15 @@ class DolbyDigitalPlusMetadata:
pass
def surround_config(b):
return DolbyDigitalPlusMetadata.CenterDownMixLevel(b & 0x30 >> 4), \
DolbyDigitalPlusMetadata.SurroundDownMixLevel(b & 0xc >> 2), \
DolbyDigitalPlusMetadata.DolbySurroundEncodingMode(b & 0x3)
return DolbyDigitalPlusMetadata\
.CenterDownMixLevel(b & 0x30 >> 4),\
DolbyDigitalPlusMetadata\
.SurroundDownMixLevel(b & 0xc >> 2), \
DolbyDigitalPlusMetadata\
.DolbySurroundEncodingMode(b & 0x3)
def dialnorm_info(b):
return (b & 0x80) > 0 , b & 0x40 > 0, b & 0x20 > 0, \
return (b & 0x80) > 0, b & 0x40 > 0, b & 0x20 > 0, \
DolbyDigitalPlusMetadata.DialnormLevel(b & 0x1f)
def langcod(b) -> int:
@@ -379,7 +368,7 @@ class DolbyDigitalPlusMetadata:
DolbyDigitalPlusMetadata.MixLevel(b & 0x7c >> 2), \
DolbyDigitalPlusMetadata.RoomType(b & 0x3)
# loro_center_downmix_level, loro_surround_downmix_level
# loro_center_downmix_level, loro_surround_downmix_level
def ext_bsi1_word1(b):
return DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x38 >> 3), \
DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x7)
@@ -387,15 +376,15 @@ class DolbyDigitalPlusMetadata:
# downmix_mode, ltrt_center_downmix_level, ltrt_surround_downmix_level
def ext_bsi1_word2(b):
return DolbyDigitalPlusMetadata\
.PreferredDownMixMode(b & 0xC0 >> 6), \
.PreferredDownMixMode(b & 0xC0 >> 6), \
DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x38 >> 3), \
DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x7)
#surround_ex_mode, dolby_headphone_encoded, ad_converter_type
# surround_ex_mode, dolby_headphone_encoded, ad_converter_type
def ext_bsi2_word1(b):
return DolbyDigitalPlusMetadata.SurroundEXMode(b & 0x60 >> 5), \
DolbyDigitalPlusMetadata.HeadphoneMode(b & 0x18 >> 3), \
DolbyDigitalPlusMetadata.ADConverterType( b & 0x4 >> 2)
DolbyDigitalPlusMetadata.ADConverterType(b & 0x4 >> 2)
def ddplus_reserved2(_):
pass
@@ -404,7 +393,7 @@ class DolbyDigitalPlusMetadata:
return DolbyDigitalPlusMetadata.RFCompressionProfile(b)
def dynrng1(b):
DolbyDigitalPlusMetadata.RFCompressionProfile(b)
DolbyDigitalPlusMetadata.RFCompressionProfile(b)
def ddplus_reserved3(_):
pass
@@ -425,18 +414,18 @@ class DolbyDigitalPlusMetadata:
lfe_on, bitstream_mode, audio_coding_mode = program_info(buffer[1])
ddplus_reserved1(buffer[2:2])
center_downmix_level, surround_downmix_level, \
dolby_surround_encoded = surround_config(buffer[4])
dolby_surround_encoded = surround_config(buffer[4])
langcode_present, copyright_bitstream, original_bitstream, \
dialnorm = dialnorm_info(buffer[5])
dialnorm = dialnorm_info(buffer[5])
langcode = langcod(buffer[6])
prod_info_exists, mixlevel, roomtype = audio_prod_info(buffer[7])
loro_center_downmix_level, \
loro_surround_downmix_level = ext_bsi1_word1(buffer[8])
loro_surround_downmix_level = ext_bsi1_word1(buffer[8])
downmix_mode, ltrt_center_downmix_level, \
ltrt_surround_downmix_level = ext_bsi1_word2(buffer[9])
ltrt_surround_downmix_level = ext_bsi1_word2(buffer[9])
surround_ex_mode, dolby_headphone_encoded, \
ad_converter_type = ext_bsi2_word1(buffer[10])
ad_converter_type = ext_bsi2_word1(buffer[10])
ddplus_reserved2(buffer[11:14])
compression = compr1(buffer[14])
@@ -448,32 +437,32 @@ class DolbyDigitalPlusMetadata:
reserved(buffer[27:69])
return DolbyDigitalPlusMetadata(program_id=pid,
lfe_on=lfe_on,
bitstream_mode=bitstream_mode,
audio_coding_mode=audio_coding_mode,
center_downmix_level=center_downmix_level,
surround_downmix_level=surround_downmix_level,
dolby_surround_encoded=dolby_surround_encoded,
langcode_present=langcode_present,
copyright_bitstream=copyright_bitstream,
original_bitstream=original_bitstream,
dialnorm=dialnorm,
langcode=langcode,
prod_info_exists=prod_info_exists,
mixlevel=mixlevel,
roomtype=roomtype,
loro_center_downmix_level=loro_center_downmix_level,
loro_surround_downmix_level=loro_surround_downmix_level,
downmix_mode=downmix_mode,
ltrt_center_downmix_level=ltrt_center_downmix_level,
ltrt_surround_downmix_level=ltrt_surround_downmix_level,
surround_ex_mode=surround_ex_mode,
dolby_headphone_encoded=dolby_headphone_encoded,
ad_converter_type=ad_converter_type,
compression_profile=compression,
dynamic_range=dynamic_range,
stream_dependency=stream_info,
datarate_kbps=data_rate)
lfe_on=lfe_on,
bitstream_mode=bitstream_mode,
audio_coding_mode=audio_coding_mode,
center_downmix_level=center_downmix_level,
surround_downmix_level=surround_downmix_level,
dolby_surround_encoded=dolby_surround_encoded,
langcode_present=langcode_present,
copyright_bitstream=copyright_bitstream,
original_bitstream=original_bitstream,
dialnorm=dialnorm,
langcode=langcode,
prod_info_exists=prod_info_exists,
mixlevel=mixlevel,
roomtype=roomtype,
loro_center_downmix_level=loro_center_downmix_level,
loro_surround_downmix_level=loro_surround_downmix_level,
downmix_mode=downmix_mode,
ltrt_center_downmix_level=ltrt_center_downmix_level,
ltrt_surround_downmix_level=ltrt_surround_downmix_level,
surround_ex_mode=surround_ex_mode,
dolby_headphone_encoded=dolby_headphone_encoded,
ad_converter_type=ad_converter_type,
compression_profile=compression,
dynamic_range=dynamic_range,
stream_dependency=stream_info,
datarate_kbps=data_rate)
@dataclass
@@ -492,7 +481,7 @@ class DolbyAtmosMetadata:
NOT_INDICATED = 0x04
tool_name: str
tool_version: Tuple[int,int,int]
tool_version: Tuple[int, int, int]
warp_mode: WarpMode
SEGMENT_LENGTH = 248
@@ -501,8 +490,8 @@ class DolbyAtmosMetadata:
@classmethod
def load(cls, data: bytes):
assert len(data) == cls.SEGMENT_LENGTH, ("DolbyAtmosMetadata segment "
"is incorrect length, "
"expected %i actual was %i") % (cls.SEGMENT_LENGTH, len(data))
"is incorrect length, "
"expected %i actual was %i") % (cls.SEGMENT_LENGTH, len(data))
h = BytesIO(data)
@@ -519,12 +508,12 @@ class DolbyAtmosMetadata:
a_val = unpack("B", h.read(1))[0]
warp_mode = a_val & 0x7
return DolbyAtmosMetadata(tool_name=toolname,
tool_version=(major, minor, fix),
warp_mode=DolbyAtmosMetadata\
.WarpMode(warp_mode))
return DolbyAtmosMetadata(tool_name=toolname,
tool_version=(major, minor, fix),
warp_mode=DolbyAtmosMetadata
.WarpMode(warp_mode))
@dataclass
class DolbyAtmosSupplementalMetadata:
"""
@@ -532,7 +521,7 @@ class DolbyAtmosSupplementalMetadata:
https://github.com/DolbyLaboratories/dbmd-atmos-parser/blob/
master/dbmd_atmos_parse/src/dbmd_atmos_parse.c
"""
"""
class BinauralRenderMode(Enum):
BYPASS = 0x00
@@ -541,12 +530,10 @@ class DolbyAtmosSupplementalMetadata:
MID = 0x03
NOT_INDICATED = 0x04
object_count: int
render_modes: List['DolbyAtmosSupplementalMetadata.BinauralRenderMode']
trim_modes: List[int]
MAGIC = 0xf8726fbd
TRIM_CONFIG_COUNT = 9
@@ -562,15 +549,15 @@ class DolbyAtmosSupplementalMetadata:
object_count = unpack("<H", h.read(2))[0]
h.read(1) #skip 1
h.read(1) # skip 1
for _ in range(cls.TRIM_CONFIG_COUNT):
auto_trim = unpack("B", h.read(1))
trim_modes.append(auto_trim)
h.read(14) #skip 14
h.read(object_count) # skip object_count bytes
h.read(14) # skip 14
h.read(object_count) # skip object_count bytes
for _ in range(object_count):
binaural_mode = unpack("B", h.read(1))[0]
@@ -578,7 +565,7 @@ class DolbyAtmosSupplementalMetadata:
render_modes.append(binaural_mode)
return DolbyAtmosSupplementalMetadata(object_count=object_count,
render_modes=render_modes,trim_modes=trim_modes)
render_modes=render_modes, trim_modes=trim_modes)
class WavDolbyMetadataReader:
@@ -590,11 +577,11 @@ class WavDolbyMetadataReader:
#:
#: Each list entry is a tuple of `SegmentType`, a `bool`
#: indicating if the segment's checksum was valid, and the
#: segment's parsed dataclass (or a `bytes` array if it was
#: segment's parsed dataclass (or a `bytes` array if it was
#: not recognized).
segment_list: List[Tuple[Union[SegmentType, int], bool, Any]]
version: Tuple[int,int,int,int]
version: Tuple[int, int, int, int]
@staticmethod
def segment_checksum(bs: bytes, size: int):
@@ -607,7 +594,6 @@ class WavDolbyMetadataReader:
return retval
def __init__(self, dbmd_data):
self.segment_list = []
@@ -616,19 +602,19 @@ class WavDolbyMetadataReader:
v_vec = []
for _ in range(4):
b = h.read(1)
v_vec.insert(0, unpack("B",b)[0])
v_vec.insert(0, unpack("B", b)[0])
self.version = tuple(v_vec)
while True:
stype= SegmentType(unpack("B", h.read(1))[0])
stype = SegmentType(unpack("B", h.read(1))[0])
if stype == SegmentType.EndMarker:
break
else:
seg_size = unpack("<H", h.read(2))[0]
seg_payload = h.read(seg_size)
expected_checksum = WavDolbyMetadataReader\
.segment_checksum(seg_payload, seg_size)
.segment_checksum(seg_payload, seg_size)
checksum = unpack("B", h.read(1))[0]
segment = seg_payload
@@ -638,36 +624,36 @@ class WavDolbyMetadataReader:
segment = DolbyAtmosMetadata.load(segment)
elif stype == SegmentType.DolbyAtmosSupplemental:
segment = DolbyAtmosSupplementalMetadata.load(segment)
self.segment_list\
.append((stype, checksum == expected_checksum, segment))
.append((stype, checksum == expected_checksum, segment))
def dolby_digital_plus(self) -> List[DolbyDigitalPlusMetadata]:
"""
Every valid Dolby Digital Plus metadata segment in the file.
"""
return [x[2] for x in self.segment_list \
if x[0] == SegmentType.DolbyDigitalPlus and x[1]]
return [x[2] for x in self.segment_list
if x[0] == SegmentType.DolbyDigitalPlus and x[1]]
def dolby_atmos(self) -> List[DolbyAtmosMetadata]:
"""
Every valid Dolby Atmos metadata segment in the file.
"""
return [x[2] for x in self.segment_list \
if x[0] == SegmentType.DolbyAtmos and x[1]]
return [x[2] for x in self.segment_list
if x[0] == SegmentType.DolbyAtmos and x[1]]
def dolby_atmos_supplemental(self) -> List[DolbyAtmosSupplementalMetadata]:
"""
Every valid Dolby Atmos Supplemental metadata segment in the file.
"""
return [x[2] for x in self.segment_list \
if x[0] == SegmentType.DolbyAtmosSupplemental and x[1]]
return [x[2] for x in self.segment_list
if x[0] == SegmentType.DolbyAtmosSupplemental and x[1]]
def to_dict(self) -> dict:
ddp = map(lambda x: asdict(x), self.dolby_digital_plus())
atmos = map(lambda x: asdict(x), self.dolby_atmos())
#atmos_sup = map(lambda x: asdict(x), self.dolby_atmos_supplemental())
# atmos_sup = map(lambda x: asdict(x), self.dolby_atmos_supplemental())
return dict(dolby_digital_plus=list(ddp),
dolby_atmos=list(atmos))
dolby_atmos=list(atmos))

View File

@@ -2,6 +2,7 @@ from .riff_parser import parse_chunk, ListChunkDescriptor
from typing import Optional
class WavInfoChunkReader:
def __init__(self, f, encoding):
@@ -11,50 +12,50 @@ class WavInfoChunkReader:
parsed_chunks = parse_chunk(f)
assert type(parsed_chunks) == ListChunkDescriptor
list_chunks = [chunk for chunk in parsed_chunks.children
if type(chunk) is ListChunkDescriptor]
list_chunks = [chunk for chunk in parsed_chunks.children
if type(chunk) is ListChunkDescriptor]
self.info_chunk = next((chunk for chunk in list_chunks
self.info_chunk = next((chunk for chunk in list_chunks
if chunk.signature == b'INFO'), None)
#: 'ICOP' Copyright
self.copyright : Optional[str] = self._get_field(f, b'ICOP')
self.copyright: Optional[str] = self._get_field(f, b'ICOP')
#: 'IPRD' Product
self.product : Optional[str]= self._get_field(f, b'IPRD')
self.album : Optional[str] = self.product
self.product: Optional[str] = self._get_field(f, b'IPRD')
self.album: Optional[str] = self.product
#: 'IGNR' Genre
self.genre : Optional[str] = self._get_field(f, b'IGNR')
self.genre: Optional[str] = self._get_field(f, b'IGNR')
#: 'ISBJ' Subject
self.subject : Optional[str] = self._get_field(f, b'ISBJ')
self.subject: Optional[str] = self._get_field(f, b'ISBJ')
#: 'IART' Artist, composer, author
self.artist : Optional[str] = self._get_field(f, b'IART')
self.artist: Optional[str] = self._get_field(f, b'IART')
#: 'ICMT' Comment
self.comment : Optional[str] = self._get_field(f, b'ICMT')
self.comment: Optional[str] = self._get_field(f, b'ICMT')
#: 'ISFT' Software, encoding application
self.software : Optional[str] = self._get_field(f, b'ISFT')
self.software: Optional[str] = self._get_field(f, b'ISFT')
#: 'ICRD' Created date
self.created_date : Optional[str] = self._get_field(f, b'ICRD')
self.created_date: Optional[str] = self._get_field(f, b'ICRD')
#: 'IENG' Engineer
self.engineer : Optional[str] = self._get_field(f, b'IENG')
self.engineer: Optional[str] = self._get_field(f, b'IENG')
#: 'ITCH' Technician
self.technician : Optional[str] = self._get_field(f, b'ITCH')
self.technician: Optional[str] = self._get_field(f, b'ITCH')
#: 'IKEY' Keywords, keyword list
self.keywords : Optional[str] = self._get_field(f, b'IKEY')
self.keywords: Optional[str] = self._get_field(f, b'IKEY')
#: 'INAM' Name, title
self.title : Optional[str] = self._get_field(f, b'INAM')
self.title: Optional[str] = self._get_field(f, b'INAM')
#: 'ISRC' Source
self.source : Optional[str] = self._get_field(f, b'ISRC')
self.source: Optional[str] = self._get_field(f, b'ISRC')
#: 'TAPE' Tape
self.tape : Optional[str] = self._get_field(f, b'TAPE')
self.tape: Optional[str] = self._get_field(f, b'TAPE')
#: 'IARL' Archival Location
self.archival_location : Optional[str] = self._get_field(f, b'IARL')
self.archival_location: Optional[str] = self._get_field(f, b'IARL')
#: 'ICSM' Commissioned
self.commissioned : Optional[str] = self._get_field(f, b'ICMS')
self.commissioned: Optional[str] = self._get_field(f, b'ICMS')
def _get_field(self, f, field_ident) -> Optional[str]:
search = next(((chunk.start, chunk.length) \
for chunk in self.info_chunk.children \
if chunk.ident == field_ident),
search = next(((chunk.start, chunk.length)
for chunk in self.info_chunk.children
if chunk.ident == field_ident),
None)
if search is not None:
@@ -64,7 +65,7 @@ class WavInfoChunkReader:
else:
return None
def to_dict(self) -> dict: #FIXME should be asdict
def to_dict(self) -> dict: # FIXME should be asdict
"""
A dictionary with all of the key/values read from the INFO scope.
"""

View File

@@ -7,11 +7,12 @@ from typing import NamedTuple
class IXMLTrack(NamedTuple):
channel_index: int
interleave_index: int
name: str
channel_index: int
interleave_index: int
name: str
function: str
class SteinbergMetadata:
"""
Vendor-specific Steinberg metadata.
@@ -34,7 +35,7 @@ class SteinbergMetadata:
CINE_71 = 27
SDDS_70 = 24
SDDS_71 = 26
MUSIC_60 = 21 #??
MUSIC_60 = 21 # ??
MUSIC_61 = 23
ATMOS_712 = 33
ATMOS_504 = 35
@@ -78,7 +79,7 @@ class SteinbergMetadata:
`AudioSpeakerArrangement` property
"""
val = self.parsed.find(
"./ATTR_LIST/ATTR[NAME = 'AudioSpeakerArrangement']/VALUE")
"./ATTR_LIST/ATTR[NAME = 'AudioSpeakerArrangement']/VALUE")
if val is not None:
return type(self).AudioSpeakerArrangement(int(val.text))
@@ -88,7 +89,7 @@ class SteinbergMetadata:
AudioSampleFormatSize
"""
val = self.parsed.find(
"./ATTR_LIST/ATTR[NAME = 'AudioSampleFormatSize']/VALUE")
"./ATTR_LIST/ATTR[NAME = 'AudioSampleFormatSize']/VALUE")
if val is not None:
return int(val.text)
@@ -98,7 +99,7 @@ class SteinbergMetadata:
MediaCompany
"""
val = self.parsed.find(
"./ATTR_LIST/ATTR[NAME = 'MediaCompany']/VALUE")
"./ATTR_LIST/ATTR[NAME = 'MediaCompany']/VALUE")
if val is not None:
return val.text
@@ -108,7 +109,7 @@ class SteinbergMetadata:
MediaDropFrames
"""
val = self.parsed.find(
"./ATTR_LIST/ATTR[NAME = 'MediaDropFrames']/VALUE")
"./ATTR_LIST/ATTR[NAME = 'MediaDropFrames']/VALUE")
if val is not None:
return val.text == "1"
@@ -118,7 +119,7 @@ class SteinbergMetadata:
MediaDuration
"""
val = self.parsed.find(
"./ATTR_LIST/ATTR[NAME = 'MediaDuration']/VALUE")
"./ATTR_LIST/ATTR[NAME = 'MediaDuration']/VALUE")
if val is not None:
return float(val.text)
@@ -155,6 +156,7 @@ class WavIXMLFormat:
"""
iXML recorder metadata.
"""
def __init__(self, xml):
"""
Parse iXML.
@@ -163,13 +165,13 @@ class WavIXMLFormat:
self.source = xml
xml_bytes = io.BytesIO(xml)
parser = ET.XMLParser(recover=True)
self.parsed : ET.ElementTree = ET.parse(xml_bytes, parser=parser)
self.parsed: ET.ElementTree = ET.parse(xml_bytes, parser=parser)
def _get_text_value(self, xpath) -> Optional[str]:
e = self.parsed.find("./" + xpath)
if e is not None:
return e.text
else:
else:
return None
def xml_str(self) -> str:
@@ -192,15 +194,12 @@ class WavIXMLFormat:
for track in self.parsed.find("./TRACK_LIST").iter():
if track.tag == 'TRACK':
yield IXMLTrack(
channel_index=
track.xpath('string(CHANNEL_INDEX/text())'),
interleave_index=
track.xpath('string(INTERLEAVE_INDEX/text())'),
name=
track.xpath('string(NAME/text())'),
function=
track.xpath('string(FUNCTION/text())')
)
channel_index=track.xpath('string(CHANNEL_INDEX/text())'),
interleave_index=track.xpath(
'string(INTERLEAVE_INDEX/text())'),
name=track.xpath('string(NAME/text())'),
function=track.xpath('string(FUNCTION/text())')
)
@property
def project(self) -> Optional[str]:
@@ -217,7 +216,7 @@ class WavIXMLFormat:
return self._get_text_value("SCENE")
@property
def take(self) -> Optional[str]:
def take(self) -> Optional[str]:
"""
Take number.
"""
@@ -257,7 +256,7 @@ class WavIXMLFormat:
def to_dict(self):
return dict(
track_list=list(map(lambda x: x._asdict(), self.track_list)),
project=self.project, scene=self.scene, take=self.take,
tape=self.tape, family_uid=self.family_uid,
family_name=self.family_name )
track_list=list(map(lambda x: x._asdict(), self.track_list)),
project=self.project, scene=self.scene, take=self.take,
tape=self.tape, family_uid=self.family_uid,
family_name=self.family_name)

View File

@@ -1,4 +1,4 @@
#-*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import struct
import os
from typing import Optional, Generator, Any, NamedTuple
@@ -14,20 +14,23 @@ from .wave_dbmd_reader import WavDolbyMetadataReader
from .wave_cues_reader import WavCuesReader
#: Calculated statistics about the audio data.
class WavDataDescriptor(NamedTuple):
byte_count: int
byte_count: int
frame_count: int
#: The format of the audio samples.
class WavAudioFormat(NamedTuple):
audio_format: int
channel_count: int
sample_rate: int
byte_rate: int
audio_format: int
channel_count: int
sample_rate: int
byte_rate: int
block_align: int
bits_per_sample: int
class WavInfoReader:
"""
Parse a WAV audio file for metadata.
@@ -50,39 +53,39 @@ class WavInfoReader:
fields of the Broadcast-WAV extension. Per EBU 3285 this is ASCII
but this parameter is available to you if you encounter a weirdo.
"""
self.info_encoding = info_encoding
self.bext_encoding = bext_encoding
#: Wave audio data format.
self.fmt :Optional[WavAudioFormat] = None
self.fmt: Optional[WavAudioFormat] = None
#: Statistics of the `data` section.
self.data :Optional[WavDataDescriptor] = None
self.data: Optional[WavDataDescriptor] = None
#: Broadcast-Wave metadata.
self.bext :Optional[WavBextReader] = None
self.bext: Optional[WavBextReader] = None
#: iXML metadata.
self.ixml :Optional[WavIXMLFormat] = None
self.ixml: Optional[WavIXMLFormat] = None
#: ADM Audio Definiton Model metadata.
self.adm :Optional[WavADMReader]= None
self.adm: Optional[WavADMReader] = None
#: Dolby bitstream metadata.
self.dolby :Optional[WavDolbyMetadataReader] = None
self.dolby: Optional[WavDolbyMetadataReader] = None
#: RIFF INFO metadata.
self.info :Optional[WavInfoChunkReader]= None
self.info: Optional[WavInfoChunkReader] = None
#: RIFF cues markers, labels, and notes.
self.cues :Optional[WavCuesReader] = None
self.cues: Optional[WavCuesReader] = None
if hasattr(f, 'read'):
self.get_wav_info(f)
self.url = 'about:blank'
self.path = repr(f)
else:
absolute_path = os.path.abspath(f)
@@ -90,13 +93,13 @@ class WavInfoReader:
self.url: str = pathlib.Path(absolute_path).as_uri()
self.path = absolute_path
with open(f, 'rb') as f:
self.get_wav_info(f)
def get_wav_info(self, wavfile):
chunks = parse_chunk(wavfile)
assert type(chunks) is ListChunkDescriptor
assert type(chunks) is ListChunkDescriptor
self.main_list = chunks.children
wavfile.seek(0)
@@ -104,16 +107,16 @@ class WavInfoReader:
self.fmt = self._get_format(wavfile)
self.bext = self._get_bext(wavfile, encoding=self.bext_encoding)
self.ixml = self._get_ixml(wavfile)
self.adm = self._get_adm(wavfile)
self.adm = self._get_adm(wavfile)
self.info = self._get_info(wavfile, encoding=self.info_encoding)
self.dolby = self._get_dbmd(wavfile)
self.cues = self._get_cue(wavfile)
self.data = self._describe_data()
def _find_chunk_data(self, ident, from_stream,
def _find_chunk_data(self, ident, from_stream,
default_none=False) -> Optional[bytes]:
top_chunks = (chunk for chunk in self.main_list \
if type(chunk) is ChunkDescriptor and chunk.ident == ident)
top_chunks = (chunk for chunk in self.main_list
if type(chunk) is ChunkDescriptor and chunk.ident == ident)
chunk_descriptor = next(top_chunks, None) \
if default_none else next(top_chunks)
@@ -122,19 +125,19 @@ class WavInfoReader:
if chunk_descriptor else None
def _find_list_chunk(self, signature) -> Optional[ListChunkDescriptor]:
top_chunks = (chunk for chunk in self.main_list \
if type(chunk) is ListChunkDescriptor and \
chunk.signature == signature)
top_chunks = (chunk for chunk in self.main_list
if type(chunk) is ListChunkDescriptor and
chunk.signature == signature)
return next(top_chunks, None)
def _describe_data(self):
data_chunk = next(c for c in self.main_list \
if type(c) is ChunkDescriptor and c.ident == b'data')
data_chunk = next(c for c in self.main_list
if type(c) is ChunkDescriptor and c.ident == b'data')
assert isinstance(self.fmt, WavAudioFormat)
return WavDataDescriptor(
byte_count=data_chunk.length,
byte_count=data_chunk.length,
frame_count=int(data_chunk.length / self.fmt.block_align))
def _get_format(self, f):
@@ -155,8 +158,8 @@ class WavInfoReader:
)
def _get_info(self, f, encoding):
finder = (chunk.signature for chunk in self.main_list \
if type(chunk) is ListChunkDescriptor)
finder = (chunk.signature for chunk in self.main_list
if type(chunk) is ListChunkDescriptor)
if b'INFO' in finder:
return WavInfoChunkReader(f, encoding)
@@ -181,9 +184,9 @@ class WavInfoReader:
return WavIXMLFormat(ixml_data.rstrip(b'\0')) if ixml_data else None
def _get_cue(self, f):
cue = next((cue_chunk for cue_chunk in self.main_list if \
type(cue_chunk) is ChunkDescriptor and \
cue_chunk.ident == b'cue '), None)
cue = next((cue_chunk for cue_chunk in self.main_list if
type(cue_chunk) is ChunkDescriptor and
cue_chunk.ident == b'cue '), None)
adtl = self._find_list_chunk(b'adtl')
labls = []
@@ -194,20 +197,21 @@ class WavInfoReader:
ltxts = [c for c in adtl.children if c.ident == b'ltxt']
notes = [c for c in adtl.children if c.ident == b'note']
return WavCuesReader.read_all(f, cue, labls, ltxts, notes,
fallback_encoding=self.info_encoding)
return WavCuesReader.read_all(f, cue, labls, ltxts, notes,
fallback_encoding=self.info_encoding)
def walk(self) -> Generator[str,str,Any]: #FIXME: this should probably be named "iter()"
# FIXME: this should probably be named "iter()"
def walk(self) -> Generator[str, str, Any]:
"""
Walk all of the available metadata fields.
:yields: tuples of the *scope*, *key*, and *value* of
each metadatum. The *scope* value will be one of
"fmt", "data", "ixml", "bext", "info", "dolby", "cues" or "adm".
"""
scopes = ('fmt', 'data', 'ixml', 'bext', 'info', 'adm', 'cues',
'dolby')
scopes = ('fmt', 'data', 'ixml', 'bext', 'info', 'adm', 'cues',
'dolby')
for scope in scopes:
if scope in ['fmt', 'data']:
@@ -216,11 +220,12 @@ class WavInfoReader:
yield scope, field, attr.__getattribute__(field)
else:
dict = self.__getattribute__(scope).to_dict() if self.__getattribute__(scope) else {}
dict = self.__getattribute__(scope).to_dict(
) if self.__getattribute__(scope) else {}
for key in dict.keys():
yield scope, key, dict[key]
def __repr__(self):
return 'WavInfoReader({}, {}, {})'.format(self.path,
self.info_encoding,
return 'WavInfoReader({}, {}, {})'.format(self.path,
self.info_encoding,
self.bext_encoding)

View File

@@ -12,20 +12,19 @@ def main():
parser.usage = "wavfind [--scene=SCENE] [--take=TAKE] [--desc=DESC] <PATH> +"
primaries = OptionGroup(parser, title="Search Predicates",
description="Argument values can be globs, and are logically-AND'ed.")
description="Argument values can be globs, and are logically-AND'ed.")
primaries.add_option("--scene",
help='Search for this scene',
metavar='SCENE')
primaries.add_option("--scene",
help='Search for this scene',
metavar='SCENE')
primaries.add_option("--take",
help='Search for this take',
metavar='TAKE')
help='Search for this take',
metavar='TAKE')
primaries.add_option("--desc",
help='Search descriptions',
metavar='DESC')
help='Search descriptions',
metavar='DESC')
(options, args) = parser.parse_args(sys.argv)