Merge pull request #29 from iluvcapra/maint-reds

Added flake8 Linting
This commit is contained in:
Jamie Hardt
2023-11-08 21:14:56 -08:00
committed by GitHub
20 changed files with 657 additions and 531 deletions

3
.flake8 Normal file
View File

@@ -0,0 +1,3 @@
[flake8]
per-file-ignores =
wavinfo/__init__.py: F401

View File

@@ -40,3 +40,4 @@ jobs:
- name: Test with pytest
run: |
pytest
flake8 wavinfo

View File

@@ -15,17 +15,20 @@ class TestADMWave(TestCase):
adm = info.adm
self.assertIsNotNone(adm)
assert adm is not None
self.assertEqual(len(adm.channel_uids), 14)
def test_to_dict(self):
info = wavinfo.WavInfoReader(self.protools_adm_wav)
adm = info.adm
assert adm is not None
dict = adm.to_dict()
self.assertIsNotNone(dict)
def test_programme(self):
info = wavinfo.WavInfoReader(self.protools_adm_wav)
adm = info.adm
assert adm is not None
pdict = adm.programme()
self.assertIn("programme_id", pdict.keys())
self.assertIn("programme_name", pdict.keys())
@@ -37,7 +40,7 @@ class TestADMWave(TestCase):
def test_track_info(self):
info = wavinfo.WavInfoReader(self.protools_adm_wav)
adm = info.adm
assert adm is not None
t1 = adm.track_info(0)
self.assertTrue("channel_format_name" in t1.keys())
self.assertEqual("RoomCentricLeft", t1["channel_format_name"])

View File

@@ -12,6 +12,7 @@ class TestCue(TestCase):
file1 = "tests/test_files/cue_chunks/STE-000.wav"
w1 = wavinfo.WavInfoReader(file1)
self.assertIsNotNone(w1.cues)
assert w1.cues is not None
vals = list(w1.cues.each_cue())
self.assertEqual(vals, [(1,29616),(2,74592),(3,121200)])

View File

@@ -1,7 +1,7 @@
from unittest import TestCase
import wavinfo
from wavinfo.wave_dbmd_reader import SegmentType, DolbyAtmosMetadata, DolbyDigitalPlusMetadata
from wavinfo.wave_dbmd_reader import SegmentType, DolbyDigitalPlusMetadata
class TestDolby(TestCase):
def setUp(self):
@@ -19,8 +19,10 @@ class TestDolby(TestCase):
d = t1.dolby
assert d is not None
ddp = [x for x in d.segment_list if x[0] == SegmentType.DolbyDigitalPlus]
atmos = [x for x in d.segment_list if x[0] == SegmentType.DolbyAtmos]
ddp = [x for x in d.segment_list \
if x[0] == SegmentType.DolbyDigitalPlus]
atmos = [x for x in d.segment_list \
if x[0] == SegmentType.DolbyAtmos]
self.assertEqual(len(ddp), 1)
self.assertEqual(len(atmos), 1)
@@ -38,8 +40,13 @@ class TestDolby(TestCase):
d = t1.dolby
assert d is not None
ddp = d.dolby_digital_plus()
self.assertEqual(len(ddp), 1, "Failed to find exactly one Dolby Digital Plus metadata segment")
self.assertTrue( ddp[0].audio_coding_mode, DolbyDigitalPlusMetadata.AudioCodingMode.CH_ORD_3_2 )
self.assertEqual(len(ddp), 1,
("Failed to find exactly one Dolby Digital Plus "
"metadata segment")
)
self.assertTrue( ddp[0].audio_coding_mode,
DolbyDigitalPlusMetadata.AudioCodingMode.CH_ORD_3_2 )
self.assertTrue( ddp[0].lfe_on)
def test_atmos(self):
@@ -47,6 +54,7 @@ class TestDolby(TestCase):
d = t1.dolby
assert d is not None
atmos = d.dolby_atmos()
self.assertEqual(len(atmos), 1, "Failed to find exactly one Atmos metadata segment")
self.assertEqual(len(atmos), 1,
"Failed to find exactly one Atmos metadata segment")

View File

@@ -26,7 +26,8 @@ class MainTest(unittest.TestCase):
def test_ixml(self):
with patch.object(sys, 'argv',
['TEST', '--ixml', 'tests/test_files/sounddevices/A101_1.WAV']):
['TEST', '--ixml',
'tests/test_files/sounddevices/A101_1.WAV']):
try:
main()
except:

View File

@@ -13,7 +13,9 @@ class TestWaveInfo(TestCase):
def test_sanity(self):
for wav_file in all_files():
info = wavinfo.WavInfoReader(wav_file)
self.assertEqual(info.__repr__(), 'WavInfoReader({}, latin_1, ascii)'.format(os.path.abspath(wav_file)))
self.assertEqual(info.__repr__(),
'WavInfoReader({}, latin_1, ascii)'
.format(os.path.abspath(wav_file)))
self.assertIsNotNone(info)
def test_fmt_against_ffprobe(self):
@@ -24,14 +26,21 @@ class TestWaveInfo(TestCase):
assert info.fmt is not None
assert ffprobe_info is not None
self.assertEqual(info.fmt.channel_count, ffprobe_info['streams'][0]['channels'])
self.assertEqual(info.fmt.sample_rate, int(ffprobe_info['streams'][0]['sample_rate']))
self.assertEqual(info.fmt.bits_per_sample, int(ffprobe_info['streams'][0]['bits_per_sample']))
self.assertEqual(info.fmt.channel_count,
ffprobe_info['streams'][0]['channels'])
self.assertEqual(info.fmt.sample_rate,
int(ffprobe_info['streams'][0]['sample_rate']))
self.assertEqual(info.fmt.bits_per_sample,
int(ffprobe_info['streams'][0]['bits_per_sample']
))
if info.fmt.audio_format == 1:
self.assertTrue(ffprobe_info['streams'][0]['codec_name'].startswith('pcm'))
self.assertTrue(ffprobe_info['streams'][0]['codec_name']\
.startswith('pcm'))
streams = ffprobe_info['streams'][0]
byte_rate = int(streams['sample_rate']) * streams['channels'] * int(streams['bits_per_sample']) / 8
byte_rate = int(streams['sample_rate']) * \
streams['channels'] * \
int(streams['bits_per_sample']) / 8
self.assertEqual(info.fmt.byte_rate, byte_rate)
def test_data_against_ffprobe(self):
@@ -40,7 +49,8 @@ class TestWaveInfo(TestCase):
ffprobe_info = cast(Dict[str,Any], ffprobe(wav_file))
assert ffprobe_info is not None
assert info.data is not None
self.assertEqual(info.data.frame_count, int(ffprobe_info['streams'][0]['duration_ts']))
self.assertEqual(info.data.frame_count,
int(ffprobe_info['streams'][0]['duration_ts']))
def test_bext_against_ffprobe(self):
for wav_file in all_files():
@@ -50,39 +60,63 @@ class TestWaveInfo(TestCase):
if info.bext:
if 'comment' in ffprobe_info['format']['tags']:
self.assertEqual(info.bext.description, ffprobe_info['format']['tags']['comment'])
self.assertEqual(info.bext.description,
ffprobe_info['format']['tags']\
['comment'])
else:
self.assertEqual(info.bext.description, '')
if 'encoded_by' in ffprobe_info['format']['tags']:
self.assertEqual(info.bext.originator, ffprobe_info['format']['tags']['encoded_by'])
self.assertEqual(info.bext.originator,
ffprobe_info['format']['tags']\
['encoded_by'])
else:
self.assertEqual(info.bext.originator, '')
if 'originator_reference' in ffprobe_info['format']['tags']:
self.assertEqual(info.bext.originator_ref, ffprobe_info['format']['tags']['originator_reference'])
self.assertEqual(info.bext.originator_ref,
ffprobe_info['format']['tags']\
['originator_reference'])
else:
self.assertEqual(info.bext.originator_ref, '')
# these don't always reflect the bext info
# self.assertEqual(info.bext.originator_date, ffprobe_info['format']['tags']['date'])
# self.assertEqual(info.bext.originator_time, ffprobe_info['format']['tags']['creation_time'])
self.assertEqual(info.bext.time_reference, int(ffprobe_info['format']['tags']['time_reference']))
# self.assertEqual(info.bext.originator_date,
# ffprobe_info['format']['tags']['date'])
# self.assertEqual(info.bext.originator_time,
# ffprobe_info['format']['tags']['creation_time'])
self.assertEqual(info.bext.time_reference,
int(ffprobe_info['format']['tags']\
['time_reference']))
if 'coding_history' in ffprobe_info['format']['tags']:
self.assertEqual(info.bext.coding_history, ffprobe_info['format']['tags']['coding_history'])
self.assertEqual(info.bext.coding_history,
ffprobe_info['format']['tags']\
['coding_history'])
else:
self.assertEqual(info.bext.coding_history, '')
def test_ixml(self):
expected = {'A101_4.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '4',
'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124015008231000'},
'A101_3.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '3',
'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124014008228300'},
'A101_2.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '2',
'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124004008218600'},
'A101_1.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '1',
'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124001008206300'},
expected = {'A101_4.WAV': {'project': 'BMH',
'scene': 'A101', 'take': '4',
'tape': '18Y12M31',
'family_uid':
'USSDVGR1112089007124015008231000'},
'A101_3.WAV': {'project': 'BMH',
'scene': 'A101', 'take': '3',
'tape': '18Y12M31',
'family_uid':
'USSDVGR1112089007124014008228300'},
'A101_2.WAV': {'project': 'BMH',
'scene': 'A101', 'take': '2',
'tape': '18Y12M31',
'family_uid':
'USSDVGR1112089007124004008218600'},
'A101_1.WAV': {'project': 'BMH',
'scene': 'A101', 'take': '1',
'tape': '18Y12M31',
'family_uid':
'USSDVGR1112089007124001008206300'},
}
for wav_file in all_files():
@@ -112,7 +146,8 @@ class TestWaveInfo(TestCase):
assert info.ixml.steinberg is not None
self.assertIsNotNone(info.ixml.steinberg.audio_speaker_arrangement)
self.assertEqual(info.ixml.steinberg.sample_format_size, 3)
self.assertEqual(info.ixml.steinberg.media_company, "https://github.com/iluvcapra/wavinfo")
self.assertEqual(info.ixml.steinberg.media_company,
"https://github.com/iluvcapra/wavinfo")
self.assertFalse(info.ixml.steinberg.media_drop_frames)
self.assertEqual(info.ixml.steinberg.media_duration, 1200.0)
@@ -124,7 +159,8 @@ class TestWaveInfo(TestCase):
self.assertIsNone(info.ixml.steinberg)
def test_info_metadata(self):
file_with_metadata = 'tests/test_files/sound_grinder_pro/new_camera bumb 1.wav'
file_with_metadata = \
'tests/test_files/sound_grinder_pro/new_camera bumb 1.wav'
self.assertTrue(os.path.exists(file_with_metadata))
info = wavinfo.WavInfoReader(file_with_metadata).info
@@ -138,7 +174,8 @@ class TestWaveInfo(TestCase):
self.assertEqual(info.software, 'Sound Grinder Pro')
self.assertEqual(info.created_date, '2010-12-28')
self.assertEqual(info.engineer, 'JPH')
self.assertEqual(info.keywords, 'Sound Effect, movement, microphone, bump')
self.assertEqual(info.keywords,
'Sound Effect, movement, microphone, bump')
self.assertEqual(info.title, 'camera bumb 1')
self.assertEqual(type(info.to_dict()), dict)
self.assertEqual(type(info.__repr__()), str)

View File

@@ -8,7 +8,8 @@ FFPROBE = 'ffprobe'
def ffprobe(path):
arguments = [FFPROBE, "-of", "json", "-show_format", "-show_streams", path]
arguments = [FFPROBE, "-of", "json",
"-show_format", "-show_streams", path]
if int(sys.version[0]) < 3:
process = subprocess.Popen(arguments, stdout=PIPE)
process.wait()
@@ -20,7 +21,8 @@ def ffprobe(path):
else:
return None
else:
process = subprocess.run(arguments, stdin=None, stdout=PIPE, stderr=PIPE)
process = subprocess.run(arguments, stdin=None,
stdout=PIPE, stderr=PIPE)
if process.returncode == 0:
output_str = process.stdout.decode('utf-8')
return json.loads(output_str)

View File

@@ -1,11 +1,13 @@
from optparse import OptionParser, OptionGroup
import datetime
from . import WavInfoReader
from . import __version__
from optparse import OptionParser
import sys
import json
from enum import Enum
class MyJSONEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, Enum):
@@ -13,28 +15,30 @@ class MyJSONEncoder(json.JSONEncoder):
else:
return super().default(o)
class MissingDataError(RuntimeError):
pass
def main():
parser = OptionParser()
parser.usage = 'wavinfo (--adm | --ixml) <FILE> +'
# parser.add_option('-f', dest='output_format', help='Set the output format',
# default='json',
# metavar='FORMAT')
parser.add_option('--adm', dest='adm',
help='Output ADM XML',
default=False,
action='store_true')
parser.add_option('--adm', dest='adm', help='Output ADM XML',
default=False, action='store_true')
parser.add_option('--ixml', dest='ixml', help='Output iXML',
default=False, action='store_true')
parser.add_option('--ixml', dest='ixml',
help='Output iXML',
default=False,
action='store_true')
(options, args) = parser.parse_args(sys.argv)
for arg in args[1:]:
try:
this_file = WavInfoReader(path=arg)
this_file = WavInfoReader(f=arg)
if options.adm:
if this_file.adm:
sys.stdout.write(this_file.adm.xml_str())
@@ -47,9 +51,9 @@ def main():
raise MissingDataError("ixml")
else:
ret_dict = {
'filename': arg,
'run_date': datetime.datetime.now().isoformat() ,
'application': "wavinfo " + __version__,
'filename': arg,
'run_date': datetime.datetime.now().isoformat(),
'application': "wavinfo " + __version__,
'scopes': {}
}
for scope, name, value in this_file.walk():
@@ -60,7 +64,8 @@ def main():
json.dump(ret_dict, cls=MyJSONEncoder, fp=sys.stdout, indent=2)
except MissingDataError as e:
print("MissingDataError: Missing metadata (%s) in file %s" % (e, arg), file=sys.stderr)
print("MissingDataError: Missing metadata (%s) in file %s" %
(e, arg), file=sys.stderr)
continue
except Exception as e:
raise e

View File

@@ -1,14 +1,18 @@
import struct
from collections import namedtuple
# from collections import namedtuple
from typing import NamedTuple, Dict
from . import riff_parser
RF64Context = namedtuple('RF64Context','sample_count bigchunk_table')
class RF64Context(NamedTuple):
sample_count: int
bigchunk_table: Dict[str, int]
def parse_rf64(stream, signature = b'RF64') -> RF64Context:
def parse_rf64(stream, signature=b'RF64') -> RF64Context:
start = stream.tell()
assert( stream.read(4) == b'WAVE' )
assert stream.read(4) == b'WAVE'
ds64_chunk = riff_parser.parse_chunk(stream)
assert type(ds64_chunk) is riff_parser.ChunkDescriptor, \
@@ -16,27 +20,28 @@ def parse_rf64(stream, signature = b'RF64') -> RF64Context:
ds64_field_spec = "<QQQI"
ds64_fields_size = struct.calcsize(ds64_field_spec)
assert(ds64_chunk.ident == b'ds64')
assert ds64_chunk.ident == b'ds64'
ds64_data = ds64_chunk.read_data(stream)
assert(len(ds64_data) >= ds64_fields_size)
assert len(ds64_data) >= ds64_fields_size
riff_size, data_size, sample_count, length_lookup_table = struct.unpack(
ds64_field_spec, ds64_data[0:ds64_fields_size])
ds64_field_spec, ds64_data[0:ds64_fields_size]
)
bigchunk_table = {}
chunksize64format = "<4sL"
# chunksize64size = struct.calcsize(chunksize64format)
for _ in range(length_lookup_table):
bigname, bigsize = struct.unpack_from(chunksize64format, ds64_data,
offset= ds64_fields_size)
bigname, bigsize = struct.unpack_from(chunksize64format,
ds64_data,
offset=ds64_fields_size)
bigchunk_table[bigname] = bigsize
bigchunk_table[b'data'] = data_size
bigchunk_table[signature] = riff_size
stream.seek(start, 0)
return RF64Context( sample_count=sample_count,
return RF64Context(sample_count=sample_count,
bigchunk_table=bigchunk_table)

View File

@@ -1,7 +1,7 @@
# from optparse import Option
import struct
from collections import namedtuple
from .rf64_parser import parse_rf64
from .rf64_parser import parse_rf64, RF64Context
from typing import NamedTuple, Union, List, Optional
class WavInfoEOFError(EOFError):
@@ -10,11 +10,17 @@ class WavInfoEOFError(EOFError):
self.chunk_start = chunk_start
class ListChunkDescriptor(namedtuple('ListChunkDescriptor', 'signature children')):
pass
class ListChunkDescriptor(NamedTuple):
signature: bytes
children: List[Union['ChunkDescriptor', 'ListChunkDescriptor']]
class ChunkDescriptor(namedtuple('ChunkDescriptor', 'ident start length rf64_context')):
class ChunkDescriptor(NamedTuple):
ident: bytes
start: int
length: int
rf64_context: Optional[RF64Context]
def read_data(self, from_stream) -> bytes:
from_stream.seek(self.start)
return from_stream.read(self.length)
@@ -49,8 +55,8 @@ def parse_chunk(stream, rf64_context=None):
rf64_context = parse_rf64(stream=stream, signature=ident)
assert rf64_context is not None, \
f"Sentinel data size 0xFFFFFFFF found outside of RF64 context"
"Sentinel data size 0xFFFFFFFF found outside of RF64 context"
data_size = rf64_context.bigchunk_table[ident]
displacement = data_size
@@ -64,5 +70,7 @@ def parse_chunk(stream, rf64_context=None):
else:
data_start = stream.tell()
stream.seek(displacement, 1)
return ChunkDescriptor(ident=ident, start=data_start, length=data_size,
return ChunkDescriptor(ident=ident,
start=data_start,
length=data_size,
rf64_context=rf64_context)

View File

@@ -2,8 +2,8 @@
# def binary_to_string(binary_value):
# return reduce(lambda val, el: val + "{:02x}".format(el), binary_value, '')
# return reduce(lambda val, el: val + "{:02x}".format(el),
# binary_value, '')
# class UMIDParser:
# """
@@ -13,109 +13,109 @@
# """
# def __init__(self, raw_umid: bytes):
# self.raw_umid = raw_umid
#
# @property
# def universal_label(self) -> bytearray:
# return self.raw_umid[0:12]
#
# @property
# def basic_umid(self):
# return self.raw_umid[0:32]
#
# @property
# def universal_label(self) -> bytearray:
# return self.raw_umid[0:12]
#
# @property
# def basic_umid(self):
# return self.raw_umid[0:32]
# def basic_umid_to_str(self):
# return binary_to_string(self.raw_umid[0:32])
#
# @property
# def universal_label_is_valid(self) -> bool:
# valid_preamble = b'\x06\x0a\x2b\x34\x01\x01\x01\x05\x01\x01'
# return self.universal_label[0:len(valid_preamble)] == valid_preamble
#
# @property
# def material_type(self) -> str:
# material_byte = self.raw_umid[10]
# if material_byte == 0x1:
# return 'picture'
# elif material_byte == 0x2:
# return 'audio'
# elif material_byte == 0x3:
# return 'data'
# elif material_byte == 0x4:
# return 'other'
# elif material_byte == 0x5:
# return 'picture_single_component'
# elif material_byte == 0x6:
# return 'picture_multiple_component'
# elif material_byte == 0x7:
# return 'audio_single_component'
# elif material_byte == 0x9:
# return 'audio_multiple_component'
# elif material_byte == 0xb:
# return 'auxiliary_single_component'
# elif material_byte == 0xc:
# return 'auxiliary_multiple_component'
# elif material_byte == 0xd:
# return 'mixed_components'
# elif material_byte == 0xf:
# return 'not_identified'
# else:
# return 'not_recognized'
#
# @property
# def material_number_creation_method(self) -> str:
# method_byte = self.raw_umid[11]
# method_byte = (method_byte << 4) & 0xf
# if method_byte == 0x0:
# return 'undefined'
# elif method_byte == 0x1:
# return 'smpte'
# elif method_byte == 0x2:
# return 'uuid'
# elif method_byte == 0x3:
# return 'masked'
# elif method_byte == 0x4:
# return 'ieee1394'
# elif 0x5 <= method_byte <= 0x7:
# return 'reserved_undefined'
# else:
# return 'unrecognized'
#
# @property
# def instance_number_creation_method(self) -> str:
# method_byte = self.raw_umid[11]
# method_byte = method_byte & 0xf
# if method_byte == 0x0:
# return 'undefined'
# elif method_byte == 0x01:
# return 'local_registration'
# elif method_byte == 0x02:
# return '24_bit_prs'
# elif method_byte == 0x03:
# return 'copy_number_and_16_bit_prs'
# elif 0x04 <= method_byte <= 0x0e:
# return 'reserved_undefined'
# elif method_byte == 0x0f:
# return 'live_stream'
# else:
# return 'unrecognized'
#
# @property
# def indicated_length(self) -> str:
# if self.raw_umid[12] == 0x13:
# return 'basic'
# elif self.raw_umid[12] == 0x33:
# return 'extended'
#
# @property
# def instance_number(self) -> bytearray:
# return self.raw_umid[13:3]
#
# @property
# def material_number(self) -> bytearray:
# return self.raw_umid[16:16]
#
# @property
# def source_pack(self) -> Union[bytearray, None]:
# if self.indicated_length == 'extended':
# return self.raw_umid[32:32]
# else:
# return None
# def basic_umid_to_str(self):
# return binary_to_string(self.raw_umid[0:32])
#
# @property
# def universal_label_is_valid(self) -> bool:
# valid_preamble = b'\x06\x0a\x2b\x34\x01\x01\x01\x05\x01\x01'
# return self.universal_label[0:len(valid_preamble)] == valid_preamble
#
# @property
# def material_type(self) -> str:
# material_byte = self.raw_umid[10]
# if material_byte == 0x1:
# return 'picture'
# elif material_byte == 0x2:
# return 'audio'
# elif material_byte == 0x3:
# return 'data'
# elif material_byte == 0x4:
# return 'other'
# elif material_byte == 0x5:
# return 'picture_single_component'
# elif material_byte == 0x6:
# return 'picture_multiple_component'
# elif material_byte == 0x7:
# return 'audio_single_component'
# elif material_byte == 0x9:
# return 'audio_multiple_component'
# elif material_byte == 0xb:
# return 'auxiliary_single_component'
# elif material_byte == 0xc:
# return 'auxiliary_multiple_component'
# elif material_byte == 0xd:
# return 'mixed_components'
# elif material_byte == 0xf:
# return 'not_identified'
# else:
# return 'not_recognized'
#
# @property
# def material_number_creation_method(self) -> str:
# method_byte = self.raw_umid[11]
# method_byte = (method_byte << 4) & 0xf
# if method_byte == 0x0:
# return 'undefined'
# elif method_byte == 0x1:
# return 'smpte'
# elif method_byte == 0x2:
# return 'uuid'
# elif method_byte == 0x3:
# return 'masked'
# elif method_byte == 0x4:
# return 'ieee1394'
# elif 0x5 <= method_byte <= 0x7:
# return 'reserved_undefined'
# else:
# return 'unrecognized'
#
# @property
# def instance_number_creation_method(self) -> str:
# method_byte = self.raw_umid[11]
# method_byte = method_byte & 0xf
# if method_byte == 0x0:
# return 'undefined'
# elif method_byte == 0x01:
# return 'local_registration'
# elif method_byte == 0x02:
# return '24_bit_prs'
# elif method_byte == 0x03:
# return 'copy_number_and_16_bit_prs'
# elif 0x04 <= method_byte <= 0x0e:
# return 'reserved_undefined'
# elif method_byte == 0x0f:
# return 'live_stream'
# else:
# return 'unrecognized'
#
# @property
# def indicated_length(self) -> str:
# if self.raw_umid[12] == 0x13:
# return 'basic'
# elif self.raw_umid[12] == 0x33:
# return 'extended'
#
# @property
# def instance_number(self) -> bytearray:
# return self.raw_umid[13:3]
#
# @property
# def material_number(self) -> bytearray:
# return self.raw_umid[16:16]
#
# @property
# def source_pack(self) -> Union[bytearray, None]:
# if self.indicated_length == 'extended':
# return self.raw_umid[32:32]
# else:
# return None

View File

@@ -5,12 +5,14 @@ ADM Reader
from struct import unpack, unpack_from, calcsize
from io import BytesIO
from collections import namedtuple
from typing import Iterable, Tuple
from typing import Optional
from lxml import etree as ET
ChannelEntry = namedtuple('ChannelEntry', "track_index uid track_ref pack_ref")
class WavADMReader:
"""
Reads XML data from an EBU ADM (Audio Definiton Model) WAV File.
@@ -26,24 +28,24 @@ class WavADMReader:
_, uid_count = unpack(header_fmt, chna_data[0:4])
#: A list of :class:`ChannelEntry` objects parsed from the
#: `chna` metadata chunk.
#:
#: .. note::
#: In-file, the `chna` track indexes start at 1. However, this interface
#: numbers the first track 0, in order to maintain consistency with other
#: libraries.
self.channel_uids = []
offset = calcsize(header_fmt)
for _ in range(uid_count):
track_index, uid, track_ref, pack_ref = unpack_from(uid_fmt, chna_data, offset)
track_index, uid, track_ref, pack_ref = unpack_from(uid_fmt,
chna_data,
offset)
# these values are either ascii or all null
self.channel_uids.append(ChannelEntry(track_index - 1,
uid.decode('ascii') , track_ref.decode('ascii'), pack_ref.decode('ascii')))
self.channel_uids.append(
ChannelEntry(track_index - 1,
uid.decode('ascii'),
track_ref.decode('ascii'),
pack_ref.decode('ascii')
)
)
offset += calcsize(uid_fmt)
@@ -53,12 +55,13 @@ class WavADMReader:
def programme(self) -> dict:
"""
Read the ADM `audioProgramme` data structure and some of its reference properties.
Read the ADM `audioProgramme` data structure and some of its reference
properties.
"""
ret_dict = dict()
nsmap = self.axml.getroot().nsmap
nsmap = self.axml.getroot().nsmap
afext = self.axml.find(".//audioFormatExtended", namespaces=nsmap)
program = afext.find("audioProgramme", namespaces=nsmap)
@@ -68,17 +71,21 @@ class WavADMReader:
ret_dict['programme_end'] = program.get("end")
ret_dict['contents'] = []
for content_ref in program.findall("audioContentIDRef", namespaces=nsmap):
for content_ref in program.findall("audioContentIDRef",
namespaces=nsmap):
content_dict = dict()
content_dict['content_id'] = cid = content_ref.text
content = afext.find("audioContent[@audioContentID='%s']" % cid, namespaces=nsmap)
content = afext.find("audioContent[@audioContentID='%s']" % cid,
namespaces=nsmap)
content_dict['content_name'] = content.get("audioContentName")
content_dict['objects'] = []
for object_ref in content.findall("audioObjectIDRef", namespaces=nsmap):
for object_ref in content.findall("audioObjectIDRef",
namespaces=nsmap):
object_dict = dict()
object_dict['object_id'] = oid = object_ref.text
object = afext.find("audioObject[@audioObjectID='%s']" % oid, namespaces=nsmap)
object = afext.find("audioObject[@audioObjectID='%s']" % oid,
namespaces=nsmap)
pack = object.find("audioPackFormatIDRef", namespaces=nsmap)
object_dict['object_name'] = object.get("audioObjectName")
object_dict['object_start'] = object.get("start")
@@ -95,16 +102,18 @@ class WavADMReader:
return ret_dict
def track_info(self, index) -> dict:
def track_info(self, index) -> Optional[dict]:
"""
Information about a track in the WAV file.
:param index: index of audio track (indexed from zero)
:returns: a dictionary with *content_name*, *content_id*, *object_name*, *object_id*,
:param index: index of audio track (indexed from zero)
:returns: a dictionary with *content_name*, *content_id*,
*object_name*, *object_id*,
*pack_format_name*, *pack_type*, *channel_format_name*
"""
channel_info = next((x for x in self.channel_uids if x.track_index == index), None)
channel_info = next((x for x in self.channel_uids
if x.track_index == index), None)
if channel_info is None:
return None
@@ -112,46 +121,60 @@ class WavADMReader:
nsmap = self.axml.getroot().nsmap
afext = self.axml.find(".//audioFormatExtended", namespaces=nsmap)
afext = self.axml.find(".//audioFormatExtended",
namespaces=nsmap)
trackformat_elem = afext.find("audioTrackFormat[@audioTrackFormatID='%s']" % channel_info.track_ref,
namespaces=nsmap)
trackformat_elem = afext.find(
"audioTrackFormat[@audioTrackFormatID='%s']"
% channel_info.track_ref, namespaces=nsmap)
stream_id = trackformat_elem[0].text
channelformatref_elem = afext.find("audioStreamFormat[@audioStreamFormatID='%s']/audioChannelFormatIDRef" % stream_id,
channelformatref_elem = afext.find(
("audioStreamFormat[@audioStreamFormatID='%s']"
"/audioChannelFormatIDRef") % stream_id,
namespaces=nsmap)
channelformat_id = channelformatref_elem.text
packformatref_elem = afext.find("audioStreamFormat[@audioStreamFormatID='%s']/audioPackFormatIDRef" % stream_id,
packformatref_elem = afext.find(
("audioStreamFormat[@audioStreamFormatID='%s']"
"/audioPackFormatIDRef") % stream_id,
namespaces=nsmap)
packformat_id = packformatref_elem.text
channelformat_elem = afext.find("audioChannelFormat[@audioChannelFormatID='%s']" % channelformat_id,
namespaces=nsmap)
ret_dict['channel_format_name'] = channelformat_elem.get("audioChannelFormatName")
channelformat_elem = afext\
.find("audioChannelFormat[@audioChannelFormatID='%s']"
% channelformat_id,
namespaces=nsmap)
ret_dict['channel_format_name'] = channelformat_elem.get(
"audioChannelFormatName")
packformat_elem = afext.find("audioPackFormat[@audioPackFormatID='%s']" % packformat_id,
packformat_elem = afext.find(
"audioPackFormat[@audioPackFormatID='%s']" % packformat_id,
namespaces=nsmap)
ret_dict['pack_type'] = packformat_elem.get("typeDefinition")
ret_dict['pack_format_name'] = packformat_elem.get("audioPackFormatName")
ret_dict['pack_type'] = packformat_elem.get(
"typeDefinition")
ret_dict['pack_format_name'] = packformat_elem.get(
"audioPackFormatName")
object_elem = afext.find("audioObject[audioPackFormatIDRef = '%s']" % packformat_id,
namespaces=nsmap)
object_elem = afext.find("audioObject[audioPackFormatIDRef = '%s']"
% packformat_id,
namespaces=nsmap)
ret_dict['audio_object_name'] = object_elem.get("audioObjectName")
object_id = object_elem.get("audioObjectID")
ret_dict['object_id'] = object_id
content_elem = afext.find("audioContent/[audioObjectIDRef = '%s']" % object_id,
namespaces=nsmap)
content_elem = afext.find("audioContent/[audioObjectIDRef = '%s']"
% object_id,
namespaces=nsmap)
ret_dict['content_name'] = content_elem.get("audioContentName")
ret_dict['content_id'] = content_elem.get("audioContentID")
return ret_dict
def to_dict(self) -> dict: #FIXME should be "asdict"
def to_dict(self) -> dict: # FIXME should be "asdict"
"""
Get ADM metadata as a dictionary.
"""
@@ -161,5 +184,6 @@ class WavADMReader:
rd.update(self.track_info(channel_uid_rec.track_index))
return rd
return dict(channel_entries=list(map(lambda z: make_entry(z), self.channel_uids)),
programme=self.programme())
return dict(channel_entries=list(map(lambda z: make_entry(z),
self.channel_uids)),
programme=self.programme())

View File

@@ -3,71 +3,75 @@ import struct
from typing import Optional
class WavBextReader:
def __init__(self, bext_data, encoding):
"""
Read Broadcast-WAV extended metadata.
:param bext_data: The bytes-like data.
:param encoding: The encoding to use when decoding the text fields of the
BEXT metadata scope. According to EBU Rec 3285 this shall be ASCII.
:param encoding: The encoding to use when decoding the text fields of
the BEXT metadata scope. According to EBU Rec 3285 this shall be
ASCII.
"""
packstring = "<256s" + "32s" + "32s" + "10s" + "8s" + "QH" + "64s" + "hhhhh" + "180s"
packstring = "<256s" + "32s" + "32s" + "10s" + "8s" + "QH" + "64s" + \
"hhhhh" + "180s"
rest_starts = struct.calcsize(packstring)
unpacked = struct.unpack(packstring, bext_data[:rest_starts])
def sanitize_bytes(b : bytes) -> str:
def sanitize_bytes(b: bytes) -> str:
# honestly can't remember why I'm stripping nulls this way
first_null = next((index for index, byte in enumerate(b) if byte == 0), None)
first_null = next((index for index, byte in enumerate(b)
if byte == 0), None)
trimmed = b if first_null is None else b[:first_null]
decoded = trimmed.decode(encoding)
return decoded
#: Description. A free-text field up to 256 characters long.
self.description : str = sanitize_bytes(unpacked[0])
self.description: str = sanitize_bytes(unpacked[0])
#: Originator. Usually the name of the encoding application, sometimes
#: an artist name.
self.originator : str = sanitize_bytes(unpacked[1])
self.originator: str = sanitize_bytes(unpacked[1])
#: A unique identifier for the file, a serial number.
self.originator_ref : str = sanitize_bytes(unpacked[2])
self.originator_ref: str = sanitize_bytes(unpacked[2])
#: Date of the recording, in the format YYYY-MM-DD.
self.originator_date : str = sanitize_bytes(unpacked[3])
self.originator_date: str = sanitize_bytes(unpacked[3])
#: Time of the recording, in the format HH:MM:SS.
self.originator_time : str = sanitize_bytes(unpacked[4])
#: The sample offset of the start, usually relative
#: to midnight.
self.time_reference : int = unpacked[5]
self.originator_time: str = sanitize_bytes(unpacked[4])
#: The sample offset of the start, usually relative
#: to midnight.
self.time_reference: int = unpacked[5]
#: A variable-length text field containing a list of processes and
#: and conversions performed on the file.
self.coding_history : str = sanitize_bytes(bext_data[rest_starts:])
#: BEXT version.
self.version : int = unpacked[6]
#: SMPTE 330M UMID of this audio file, 64 bytes are allocated though the UMID
#: may only be 32 bytes long.
self.umid : Optional[bytes] = None
self.coding_history: str = sanitize_bytes(bext_data[rest_starts:])
#: BEXT version.
self.version: int = unpacked[6]
#: SMPTE 330M UMID of this audio file, 64 bytes are allocated though
#: the UMID may only be 32 bytes long.
self.umid: Optional[bytes] = None
#: EBU R128 Integrated loudness, in LUFS.
self.loudness_value : Optional[float] = None
self.loudness_value: Optional[float] = None
#: EBU R128 Loudness range, in LUFS.
self.loudness_range : Optional[float] = None
self.loudness_range: Optional[float] = None
#: True peak level, in dBFS TP
self.max_true_peak : Optional[float] = None
self.max_true_peak: Optional[float] = None
#: EBU R128 Maximum momentary loudness, in LUFS
self.max_momentary_loudness : Optional[float] = None
self.max_momentary_loudness: Optional[float] = None
#: EBU R128 Maximum short-term loudness, in LUFS.
self.max_shortterm_loudness : Optional[float] = None
self.max_shortterm_loudness: Optional[float] = None
if self.version > 0:
self.umid = unpacked[7]
@@ -84,7 +88,7 @@ class WavBextReader:
# umid_parsed = UMIDParser(self.umid)
# umid_str = umid_parsed.basic_umid_to_str()
# else:
umid_str = None
return {'description': self.description,

View File

@@ -1,26 +1,25 @@
"""
Cues metadata
For reference on implementation of cues and related metadata see:
For reference on implementation of cues and related metadata see:
August 1991, "Multimedia Programming Interface and Data Specifications 1.0",
IBM Corporation and Microsoft Corporation
https://www.aelius.com/njh/wavemetatools/doc/riffmci.pdf
"""
from dataclasses import dataclass
import encodings
from .riff_parser import ChunkDescriptor
from struct import unpack, calcsize
from typing import Optional, Tuple, NamedTuple, List, Dict, Any, Generator
#: Country Codes used in the RIFF standard to resolve locale. These codes
#: Country Codes used in the RIFF standard to resolve locale. These codes
#: appear in CSET and LTXT metadata.
CountryCodes = """000 None Indicated
001,USA
002,Canada
003,Latin America
030,Greece
030,Greece
031,Netherlands
032,Belgium
033,France
@@ -44,50 +43,50 @@ CountryCodes = """000 None Indicated
090,Turkey
351,Portugal
352,Luxembourg
354,Iceland
354,Iceland
358,Finland"""
#: Language and Dialect codes used in the RIFF standard to resolve native
#: Language and Dialect codes used in the RIFF standard to resolve native
#: language of text fields. These codes appear in CSET and LTXT metadata.
LanguageDialectCodes = """0 0 None Indicated
1,1,Arabic
2,1,Bulgarian
3,1,Catalan
4,1,Traditional Chinese
4,2,Simplified Chinese
4,1,Traditional Chinese
4,2,Simplified Chinese
5,1,Czech
6,1,Danish
7,1,German
7,2,Swiss German
7,2,Swiss German
8,1,Greek
9,1,US English
9,2,UK English
9,2,UK English
10,1,Spanish
10,2,Spanish Mexican
10,2,Spanish Mexican
11,1,Finnish
12,1,French
12,2,Belgian French
12,3,Canadian French
12,4,Swiss French
13,1,Hebrew
12,2,Belgian French
12,3,Canadian French
12,4,Swiss French
13,1,Hebrew
14,1,Hungarian
15,1,Icelandic
16,1,Italian
16,2,Swiss Italian
16,2,Swiss Italian
17,1,Japanese
18,1,Korean
19,1,Dutch
19,2,Belgian Dutch
20,1,Norwegian - Bokmal
20,2,Norwegian - Nynorsk
19,2,Belgian Dutch
20,1,Norwegian - Bokmal
20,2,Norwegian - Nynorsk
21,1,Polish
22,1,Brazilian Portuguese
22,2,Portuguese
23,1,Rhaeto-Romanic
22,1,Brazilian Portuguese
22,2,Portuguese
23,1,Rhaeto-Romanic
24,1,Romanian
25,1,Russian
26,1,Serbo-Croatian (Latin)
26,2,Serbo-Croatian (Cyrillic)
26,1,Serbo-Croatian (Latin)
26,2,Serbo-Croatian (Cyrillic)
27,1,Slovak
28,1,Albanian
29,1,Swedish
@@ -101,10 +100,10 @@ class CueEntry(NamedTuple):
"""
A ``cue`` element structure.
"""
#: Cue "name" or id number
#: Cue "name" or id number
name: int
#: Cue position, as a frame count in the play order of the WAVE file. In
#: principle this can be affected by playlists and ``wavl`` chunk
#: Cue position, as a frame count in the play order of the WAVE file. In
#: principle this can be affected by playlists and ``wavl`` chunk
#: placement.
position: int
chunk_id: bytes
@@ -113,7 +112,7 @@ class CueEntry(NamedTuple):
sample_offset: int
Format = "<II4sIII"
@classmethod
def format_size(cls) -> int:
return calcsize(cls.Format)
@@ -121,13 +120,13 @@ class CueEntry(NamedTuple):
@classmethod
def read(cls, data: bytes) -> 'CueEntry':
assert len(data) == cls.format_size(), \
(f"cue data size incorrect, expected {calcsize(cls.Format)}"
(f"cue data size incorrect, expected {calcsize(cls.Format)} "
"found {len(data)}")
parsed = unpack(cls.Format, data)
return cls(name=parsed[0], position=parsed[1], chunk_id=parsed[2],
chunk_start=parsed[3], block_start=parsed[4],
chunk_start=parsed[3], block_start=parsed[4],
sample_offset=parsed[5])
@@ -170,8 +169,8 @@ class RangeLabel(NamedTuple):
fallback_encoding = f"cp{data[6]}"
return cls(name=parsed[0], length=parsed[1], purpose=parsed[2],
country=parsed[3], language=parsed[4],
dialect=parsed[5], codepage=parsed[6],
country=parsed[3], language=parsed[4],
dialect=parsed[5], codepage=parsed[6],
text=text_data.decode(fallback_encoding))
@@ -192,19 +191,19 @@ class WavCuesReader:
@classmethod
def read_all(cls, f,
cues: Optional[ChunkDescriptor],
labls: List[ChunkDescriptor],
ltxts: List[ChunkDescriptor],
notes: List[ChunkDescriptor],
fallback_encoding: str) -> 'WavCuesReader':
cues: Optional[ChunkDescriptor],
labls: List[ChunkDescriptor],
ltxts: List[ChunkDescriptor],
notes: List[ChunkDescriptor],
fallback_encoding: str) -> 'WavCuesReader':
cue_list = []
if cues is not None:
cues_data = cues.read_data(f)
assert len(cues_data) >= 4, "cue metadata too short"
offset = calcsize("<I")
cues_count = unpack("<I", cues_data[0:offset])
for _ in range(cues_count[0]):
cue_bytes = cues_data[offset: offset + CueEntry.format_size()]
cue_list.append(CueEntry.read(cue_bytes))
@@ -213,14 +212,14 @@ class WavCuesReader:
label_list = []
for labl in labls:
label_list.append(
LabelEntry.read(labl.read_data(f),
LabelEntry.read(labl.read_data(f),
encoding=fallback_encoding)
)
range_list = []
for r in ltxts:
range_list.append(
RangeLabel.read(r.read_data(f),
RangeLabel.read(r.read_data(f),
fallback_encoding=fallback_encoding)
)
@@ -236,7 +235,7 @@ class WavCuesReader:
def each_cue(self) -> Generator[Tuple[int, int], None, None]:
"""
Iterate through each cue.
Iterate through each cue.
:yields: the cue's ``name`` and ``sample_offset``
"""
@@ -244,17 +243,17 @@ class WavCuesReader:
yield (cue.name, cue.sample_offset)
def label_and_note(self, cue_ident: int) -> Tuple[Optional[str],
Optional[str]]:
Optional[str]]:
"""
Get the label and note (extended comment) for a cue.
:param cue_ident: the cue's name, its unique identifying number
:returns: a tuple of the the cue's label (if present) and note (if
present)
present)
"""
label = next((l.text for l in self.labels
if l.name == cue_ident), None)
note = next((n.text for n in self.notes
label = next((label.text for label in self.labels
if label.name == cue_ident), None)
note = next((n.text for n in self.notes
if n.name == cue_ident), None)
return (label, note)
@@ -265,7 +264,7 @@ class WavCuesReader:
:param cue_ident: the cue's name, its unique identifying number
:returns: the length of the marker's range, or `None`
"""
return next((r.length for r in self.ranges
return next((r.length for r in self.ranges
if r.name == cue_ident), None)
def to_dict(self) -> Dict[str, Any]:
@@ -280,15 +279,8 @@ class WavCuesReader:
if label is not None:
retval[n]['label'] = label
if note is not None:
retval[n]['note'] = note
retval[n]['note'] = note
if r is not None:
retval[n]['length'] = r
return retval
# return dict(cues=[c._asdict() for c in self.cues],
# labels=[l._asdict() for l in self.labels],
# ranges=[r._asdict() for r in self.ranges],
# notes=[n._asdict() for n in self.notes])
retval[n]['length'] = r
return retval

View File

@@ -1,7 +1,7 @@
"""
Reading Dolby Bitstream Metadata
Unless otherwise stated, all § references here are to
Unless otherwise stated, all § references here are to
`EBU Tech 3285 Supplement 6`_.
.. _EBU Tech 3285 Supplement 6: https://tech.ebu.ch/docs/tech/tech3285s6.pdf
@@ -10,10 +10,11 @@ Unless otherwise stated, all § references here are to
from enum import IntEnum, Enum
from struct import unpack
from dataclasses import dataclass, asdict
from typing import List, Optional, Tuple, Any, Union
from typing import List, Tuple, Any, Union
from io import BytesIO
class SegmentType(IntEnum):
"""
Metadata segment type.
@@ -31,7 +32,7 @@ class SegmentType(IntEnum):
DolbyAtmosSupplemental = 0xa
@classmethod
def _missing_(cls,val):
def _missing_(cls, val):
return val
@@ -39,11 +40,11 @@ class SegmentType(IntEnum):
class DolbyDigitalPlusMetadata:
"""
*Dolby Digital Plus* is Dolby's brand for multichannel surround
on discrete formats that aren't AC-3 (Dolby Digital) or Dolby E. This
metadata segment is present in ADM wave files created with a Dolby Atmos
on discrete formats that aren't AC-3 (Dolby Digital) or Dolby E. This
metadata segment is present in ADM wave files created with a Dolby Atmos
Production Suite.
Where an AC-3 bitstream can contain multiple programs, a Dolby Digital
Where an AC-3 bitstream can contain multiple programs, a Dolby Digital
Plus bitstream will only contain one program.
"""
@@ -77,7 +78,6 @@ class DolbyDigitalPlusMetadata:
MUTE = 0b111
"-∞ dB"
class DolbySurroundEncodingMode(Enum):
"""
Dolby surround endcoding mode.
@@ -87,7 +87,6 @@ class DolbyDigitalPlusMetadata:
NOT_IN_USE = 0b01
NOT_INDICATED = 0b00
class BitStreamMode(Enum):
"""
Dolby Digital Plus `bsmod` field
@@ -122,7 +121,6 @@ class DolbyDigitalPlusMetadata:
should be interpreted as karaoke.
"""
class AudioCodingMode(Enum):
"""
Dolby Digital Plus `acmod` field
@@ -144,7 +142,6 @@ class DolbyDigitalPlusMetadata:
CH_ORD_3_2 = 0b111
"LCR + LR surround"
class CenterDownMixLevel(Enum):
"""
§ 4.3.3.1
@@ -152,16 +149,15 @@ class DolbyDigitalPlusMetadata:
DOWN_3DB = 0b00
"Attenuate 3 dB"
DOWN_45DB = 0b01
"Attenuate 4.5 dB"
DOWN_6DB = 0b10
"Attenuate 6 dB"
RESERVED = 0b11
class SurroundDownMixLevel(Enum):
"""
Dolby Digital Plus `surmixlev` field
@@ -172,7 +168,6 @@ class DolbyDigitalPlusMetadata:
MUTE = 0b10
RESERVED = 0b11
class LanguageCode(int):
"""
§ 4.3.4.1
@@ -181,21 +176,18 @@ class DolbyDigitalPlusMetadata:
"""
pass
class MixLevel(int):
"""
§ 4.3.6.2
"""
pass
class DialnormLevel(int):
"""
§ 4.3.4.4
"""
pass
class RoomType(Enum):
"""
`roomtyp` 4.3.6.3
@@ -205,11 +197,10 @@ class DolbyDigitalPlusMetadata:
SMALL_ROOM_FLAT_CURVE = 0b10
RESERVED = 0b11
class PreferredDownMixMode(Enum):
"""
Indicates the creating engineer's preference of what the receiver should
downmix.
Indicates the creating engineer's preference of what the receiver
should downmix.
§ 4.3.8.1
"""
NOT_INDICATED = 0b00
@@ -217,7 +208,6 @@ class DolbyDigitalPlusMetadata:
STEREO = 0b10
PRO_LOGIC_2 = 0b11
class SurroundEXMode(IntEnum):
"""
Dolby Surround-EX mode.
@@ -228,7 +218,6 @@ class DolbyDigitalPlusMetadata:
SEX = 0b10
PRO_LOGIC_2 = 0b11
class HeadphoneMode(IntEnum):
"""
`dheadphonmod` § 4.3.9.2
@@ -238,12 +227,10 @@ class DolbyDigitalPlusMetadata:
DOLBY_HEADPHONE = 0b10
RESERVED = 0b11
class ADConverterType(Enum):
STANDARD = 0
HDCD = 1
class StreamDependency(Enum):
"""
Encodes `ddplus_info1.stream_type` field § 4.3.12.1
@@ -254,7 +241,6 @@ class DolbyDigitalPlusMetadata:
INDEPENDENT_FROM_DOLBY_DIGITAL = 2
RESERVED = 3
class RFCompressionProfile(Enum):
"""
`compr1` RF compression profile
@@ -267,7 +253,7 @@ class DolbyDigitalPlusMetadata:
MUSIC_LIGHT = 4
SPEECH = 5
#: Program ID number, this identifies the program in a multi-program
#: Program ID number, this identifies the program in a multi-program
#: element. § 4.3.1
program_id: int
@@ -317,13 +303,13 @@ class DolbyDigitalPlusMetadata:
#: LoRo preferred center downmix level
loro_center_downmix_level: DownMixLevelToken
#: LoRo preferred surround downmix level
loro_surround_downmix_level: DownMixLevelToken
#: Preferred downmix mode
downmix_mode: PreferredDownMixMode
#: LtRt preferred center downmix level
ltrt_center_downmix_level: DownMixLevelToken
@@ -332,20 +318,20 @@ class DolbyDigitalPlusMetadata:
#: Surround-EX mode
surround_ex_mode: SurroundEXMode
#: Dolby Headphone mode
dolby_headphone_encoded: HeadphoneMode
ad_converter_type: ADConverterType
compression_profile: RFCompressionProfile
dynamic_range: RFCompressionProfile
#: Indicates if this stream can be decoded independently or not
stream_dependency: StreamDependency
#: Data rate of this bitstream in kilobits per second
datarate_kbps: int
@staticmethod
def load(buffer: bytes):
assert len(buffer) == 96, "Dolby Digital Plus segment incorrect size, "
@@ -363,12 +349,14 @@ class DolbyDigitalPlusMetadata:
pass
def surround_config(b):
return DolbyDigitalPlusMetadata.CenterDownMixLevel(b & 0x30 >> 4), \
DolbyDigitalPlusMetadata.SurroundDownMixLevel(b & 0xc >> 2), \
return (
DolbyDigitalPlusMetadata.CenterDownMixLevel(b & 0x30 >> 4),
DolbyDigitalPlusMetadata.SurroundDownMixLevel(b & 0xc >> 2),
DolbyDigitalPlusMetadata.DolbySurroundEncodingMode(b & 0x3)
)
def dialnorm_info(b):
return (b & 0x80) > 0 , b & 0x40 > 0, b & 0x20 > 0, \
return (b & 0x80) > 0, b & 0x40 > 0, b & 0x20 > 0, \
DolbyDigitalPlusMetadata.DialnormLevel(b & 0x1f)
def langcod(b) -> int:
@@ -379,22 +367,23 @@ class DolbyDigitalPlusMetadata:
DolbyDigitalPlusMetadata.MixLevel(b & 0x7c >> 2), \
DolbyDigitalPlusMetadata.RoomType(b & 0x3)
# loro_center_downmix_level, loro_surround_downmix_level
# loro_center_downmix_level, loro_surround_downmix_level
def ext_bsi1_word1(b):
return DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x38 >> 3), \
DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x7)
# downmix_mode, ltrt_center_downmix_level, ltrt_surround_downmix_level
def ext_bsi1_word2(b):
return DolbyDigitalPlusMetadata.PreferredDownMixMode(b & 0xC0 >> 6), \
return DolbyDigitalPlusMetadata\
.PreferredDownMixMode(b & 0xC0 >> 6), \
DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x38 >> 3), \
DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x7)
#surround_ex_mode, dolby_headphone_encoded, ad_converter_type
# surround_ex_mode, dolby_headphone_encoded, ad_converter_type
def ext_bsi2_word1(b):
return DolbyDigitalPlusMetadata.SurroundEXMode(b & 0x60 >> 5), \
DolbyDigitalPlusMetadata.HeadphoneMode(b & 0x18 >> 3), \
DolbyDigitalPlusMetadata.ADConverterType( b & 0x4 >> 2)
DolbyDigitalPlusMetadata.ADConverterType(b & 0x4 >> 2)
def ddplus_reserved2(_):
pass
@@ -403,7 +392,7 @@ class DolbyDigitalPlusMetadata:
return DolbyDigitalPlusMetadata.RFCompressionProfile(b)
def dynrng1(b):
DolbyDigitalPlusMetadata.RFCompressionProfile(b)
DolbyDigitalPlusMetadata.RFCompressionProfile(b)
def ddplus_reserved3(_):
pass
@@ -423,14 +412,19 @@ class DolbyDigitalPlusMetadata:
pid = program_id(buffer[0])
lfe_on, bitstream_mode, audio_coding_mode = program_info(buffer[1])
ddplus_reserved1(buffer[2:2])
center_downmix_level, surround_downmix_level, dolby_surround_encoded = surround_config(buffer[4])
langcode_present, copyright_bitstream, original_bitstream, dialnorm = dialnorm_info(buffer[5])
center_downmix_level, surround_downmix_level, \
dolby_surround_encoded = surround_config(buffer[4])
langcode_present, copyright_bitstream, original_bitstream, \
dialnorm = dialnorm_info(buffer[5])
langcode = langcod(buffer[6])
prod_info_exists, mixlevel, roomtype = audio_prod_info(buffer[7])
loro_center_downmix_level, loro_surround_downmix_level = ext_bsi1_word1(buffer[8])
downmix_mode, ltrt_center_downmix_level, ltrt_surround_downmix_level = ext_bsi1_word2(buffer[9])
surround_ex_mode, dolby_headphone_encoded, ad_converter_type = ext_bsi2_word1(buffer[10])
loro_center_downmix_level, \
loro_surround_downmix_level = ext_bsi1_word1(buffer[8])
downmix_mode, ltrt_center_downmix_level, \
ltrt_surround_downmix_level = ext_bsi1_word2(buffer[9])
surround_ex_mode, dolby_headphone_encoded, \
ad_converter_type = ext_bsi2_word1(buffer[10])
ddplus_reserved2(buffer[11:14])
compression = compr1(buffer[14])
@@ -441,33 +435,33 @@ class DolbyDigitalPlusMetadata:
data_rate = datarate(buffer[25:27])
reserved(buffer[27:69])
return DolbyDigitalPlusMetadata(program_id=pid,
lfe_on=lfe_on,
bitstream_mode=bitstream_mode,
audio_coding_mode=audio_coding_mode,
center_downmix_level=center_downmix_level,
surround_downmix_level=surround_downmix_level,
dolby_surround_encoded=dolby_surround_encoded,
langcode_present=langcode_present,
copyright_bitstream=copyright_bitstream,
original_bitstream=original_bitstream,
dialnorm=dialnorm,
langcode=langcode,
prod_info_exists=prod_info_exists,
mixlevel=mixlevel,
roomtype=roomtype,
loro_center_downmix_level=loro_center_downmix_level,
loro_surround_downmix_level=loro_surround_downmix_level,
downmix_mode=downmix_mode,
ltrt_center_downmix_level=ltrt_center_downmix_level,
ltrt_surround_downmix_level=ltrt_surround_downmix_level,
surround_ex_mode=surround_ex_mode,
dolby_headphone_encoded=dolby_headphone_encoded,
ad_converter_type=ad_converter_type,
compression_profile=compression,
dynamic_range=dynamic_range,
stream_dependency=stream_info,
datarate_kbps=data_rate)
return DolbyDigitalPlusMetadata(
program_id=pid, lfe_on=lfe_on,
bitstream_mode=bitstream_mode,
audio_coding_mode=audio_coding_mode,
center_downmix_level=center_downmix_level,
surround_downmix_level=surround_downmix_level,
dolby_surround_encoded=dolby_surround_encoded,
langcode_present=langcode_present,
copyright_bitstream=copyright_bitstream,
original_bitstream=original_bitstream,
dialnorm=dialnorm,
langcode=langcode,
prod_info_exists=prod_info_exists,
mixlevel=mixlevel,
roomtype=roomtype,
loro_center_downmix_level=loro_center_downmix_level,
loro_surround_downmix_level=loro_surround_downmix_level,
downmix_mode=downmix_mode,
ltrt_center_downmix_level=ltrt_center_downmix_level,
ltrt_surround_downmix_level=ltrt_surround_downmix_level,
surround_ex_mode=surround_ex_mode,
dolby_headphone_encoded=dolby_headphone_encoded,
ad_converter_type=ad_converter_type,
compression_profile=compression,
dynamic_range=dynamic_range,
stream_dependency=stream_info,
datarate_kbps=data_rate)
@dataclass
@@ -486,7 +480,7 @@ class DolbyAtmosMetadata:
NOT_INDICATED = 0x04
tool_name: str
tool_version: Tuple[int,int,int]
tool_version: Tuple[int, int, int]
warp_mode: WarpMode
SEGMENT_LENGTH = 248
@@ -494,8 +488,10 @@ class DolbyAtmosMetadata:
@classmethod
def load(cls, data: bytes):
assert len(data) == cls.SEGMENT_LENGTH, "DolbyAtmosMetadata segment "\
"is incorrect length, expected %i actual was %i" % (cls.SEGMENT_LENGTH, len(data))
assert len(data) == cls.SEGMENT_LENGTH
# (f"DolbyAtmosMetadata segment is incorrect length, "
# f"expected {cls.SEGMENT_LENGTH} actual was {len(data)}")
h = BytesIO(data)
@@ -512,17 +508,20 @@ class DolbyAtmosMetadata:
a_val = unpack("B", h.read(1))[0]
warp_mode = a_val & 0x7
return DolbyAtmosMetadata(tool_name=toolname,
tool_version=(major, minor, fix), warp_mode=DolbyAtmosMetadata.WarpMode(warp_mode))
return DolbyAtmosMetadata(tool_name=toolname,
tool_version=(major, minor, fix),
warp_mode=DolbyAtmosMetadata
.WarpMode(warp_mode))
@dataclass
class DolbyAtmosSupplementalMetadata:
"""
Dolby Atmos supplemental metadata segment.
https://github.com/DolbyLaboratories/dbmd-atmos-parser/blob/master/dbmd_atmos_parse/src/dbmd_atmos_parse.c
"""
https://github.com/DolbyLaboratories/dbmd-atmos-parser/blob/
master/dbmd_atmos_parse/src/dbmd_atmos_parse.c
"""
class BinauralRenderMode(Enum):
BYPASS = 0x00
@@ -531,12 +530,10 @@ class DolbyAtmosSupplementalMetadata:
MID = 0x03
NOT_INDICATED = 0x04
object_count: int
render_modes: List['DolbyAtmosSupplementalMetadata.BinauralRenderMode']
trim_modes: List[int]
MAGIC = 0xf8726fbd
TRIM_CONFIG_COUNT = 9
@@ -552,15 +549,15 @@ class DolbyAtmosSupplementalMetadata:
object_count = unpack("<H", h.read(2))[0]
h.read(1) #skip 1
h.read(1) # skip 1
for _ in range(cls.TRIM_CONFIG_COUNT):
auto_trim = unpack("B", h.read(1))
trim_modes.append(auto_trim)
h.read(14) #skip 14
h.read(object_count) # skip object_count bytes
h.read(14) # skip 14
h.read(object_count) # skip object_count bytes
for _ in range(object_count):
binaural_mode = unpack("B", h.read(1))[0]
@@ -568,7 +565,8 @@ class DolbyAtmosSupplementalMetadata:
render_modes.append(binaural_mode)
return DolbyAtmosSupplementalMetadata(object_count=object_count,
render_modes=render_modes,trim_modes=trim_modes)
render_modes=render_modes,
trim_modes=trim_modes)
class WavDolbyMetadataReader:
@@ -580,11 +578,11 @@ class WavDolbyMetadataReader:
#:
#: Each list entry is a tuple of `SegmentType`, a `bool`
#: indicating if the segment's checksum was valid, and the
#: segment's parsed dataclass (or a `bytes` array if it was
#: segment's parsed dataclass (or a `bytes` array if it was
#: not recognized).
segment_list: List[Tuple[Union[SegmentType, int], bool, Any]]
version: Tuple[int,int,int,int]
version: Tuple[int, int, int, int]
@staticmethod
def segment_checksum(bs: bytes, size: int):
@@ -597,7 +595,6 @@ class WavDolbyMetadataReader:
return retval
def __init__(self, dbmd_data):
self.segment_list = []
@@ -606,18 +603,19 @@ class WavDolbyMetadataReader:
v_vec = []
for _ in range(4):
b = h.read(1)
v_vec.insert(0, unpack("B",b)[0])
v_vec.insert(0, unpack("B", b)[0])
self.version = tuple(v_vec)
while True:
stype= SegmentType(unpack("B", h.read(1))[0])
stype = SegmentType(unpack("B", h.read(1))[0])
if stype == SegmentType.EndMarker:
break
else:
seg_size = unpack("<H", h.read(2))[0]
seg_payload = h.read(seg_size)
expected_checksum = WavDolbyMetadataReader.segment_checksum(seg_payload, seg_size)
expected_checksum = WavDolbyMetadataReader\
.segment_checksum(seg_payload, seg_size)
checksum = unpack("B", h.read(1))[0]
segment = seg_payload
@@ -627,35 +625,36 @@ class WavDolbyMetadataReader:
segment = DolbyAtmosMetadata.load(segment)
elif stype == SegmentType.DolbyAtmosSupplemental:
segment = DolbyAtmosSupplementalMetadata.load(segment)
self.segment_list.append( (stype, checksum == expected_checksum, segment) )
self.segment_list\
.append((stype, checksum == expected_checksum, segment))
def dolby_digital_plus(self) -> List[DolbyDigitalPlusMetadata]:
"""
Every valid Dolby Digital Plus metadata segment in the file.
"""
return [x[2] for x in self.segment_list \
if x[0] == SegmentType.DolbyDigitalPlus and x[1]]
return [x[2] for x in self.segment_list
if x[0] == SegmentType.DolbyDigitalPlus and x[1]]
def dolby_atmos(self) -> List[DolbyAtmosMetadata]:
"""
Every valid Dolby Atmos metadata segment in the file.
"""
return [x[2] for x in self.segment_list \
if x[0] == SegmentType.DolbyAtmos and x[1]]
return [x[2] for x in self.segment_list
if x[0] == SegmentType.DolbyAtmos and x[1]]
def dolby_atmos_supplemental(self) -> List[DolbyAtmosSupplementalMetadata]:
"""
Every valid Dolby Atmos Supplemental metadata segment in the file.
"""
return [x[2] for x in self.segment_list \
if x[0] == SegmentType.DolbyAtmosSupplemental and x[1]]
return [x[2] for x in self.segment_list
if x[0] == SegmentType.DolbyAtmosSupplemental and x[1]]
def to_dict(self) -> dict:
ddp = map(lambda x: asdict(x), self.dolby_digital_plus())
atmos = map(lambda x: asdict(x), self.dolby_atmos())
#atmos_sup = map(lambda x: asdict(x), self.dolby_atmos_supplemental())
# atmos_sup = map(lambda x: asdict(x), self.dolby_atmos_supplemental())
return dict(dolby_digital_plus=list(ddp),
dolby_atmos=list(atmos))
dolby_atmos=list(atmos))

View File

@@ -2,6 +2,7 @@ from .riff_parser import parse_chunk, ListChunkDescriptor
from typing import Optional
class WavInfoChunkReader:
def __init__(self, f, encoding):
@@ -9,47 +10,52 @@ class WavInfoChunkReader:
f.seek(0)
parsed_chunks = parse_chunk(f)
assert type(parsed_chunks) is ListChunkDescriptor
list_chunks = [chunk for chunk in parsed_chunks.children if type(chunk) is ListChunkDescriptor]
list_chunks = [chunk for chunk in parsed_chunks.children
if type(chunk) is ListChunkDescriptor]
self.info_chunk = next((chunk for chunk in list_chunks if chunk.signature == b'INFO'), None)
self.info_chunk = next((chunk for chunk in list_chunks
if chunk.signature == b'INFO'), None)
#: 'ICOP' Copyright
self.copyright : Optional[str] = self._get_field(f, b'ICOP')
self.copyright: Optional[str] = self._get_field(f, b'ICOP')
#: 'IPRD' Product
self.product : Optional[str]= self._get_field(f, b'IPRD')
self.album : Optional[str] = self.product
self.product: Optional[str] = self._get_field(f, b'IPRD')
self.album: Optional[str] = self.product
#: 'IGNR' Genre
self.genre : Optional[str] = self._get_field(f, b'IGNR')
self.genre: Optional[str] = self._get_field(f, b'IGNR')
#: 'ISBJ' Subject
self.subject : Optional[str] = self._get_field(f, b'ISBJ')
self.subject: Optional[str] = self._get_field(f, b'ISBJ')
#: 'IART' Artist, composer, author
self.artist : Optional[str] = self._get_field(f, b'IART')
self.artist: Optional[str] = self._get_field(f, b'IART')
#: 'ICMT' Comment
self.comment : Optional[str] = self._get_field(f, b'ICMT')
self.comment: Optional[str] = self._get_field(f, b'ICMT')
#: 'ISFT' Software, encoding application
self.software : Optional[str] = self._get_field(f, b'ISFT')
self.software: Optional[str] = self._get_field(f, b'ISFT')
#: 'ICRD' Created date
self.created_date : Optional[str] = self._get_field(f, b'ICRD')
self.created_date: Optional[str] = self._get_field(f, b'ICRD')
#: 'IENG' Engineer
self.engineer : Optional[str] = self._get_field(f, b'IENG')
self.engineer: Optional[str] = self._get_field(f, b'IENG')
#: 'ITCH' Technician
self.technician : Optional[str] = self._get_field(f, b'ITCH')
self.technician: Optional[str] = self._get_field(f, b'ITCH')
#: 'IKEY' Keywords, keyword list
self.keywords : Optional[str] = self._get_field(f, b'IKEY')
self.keywords: Optional[str] = self._get_field(f, b'IKEY')
#: 'INAM' Name, title
self.title : Optional[str] = self._get_field(f, b'INAM')
self.title: Optional[str] = self._get_field(f, b'INAM')
#: 'ISRC' Source
self.source : Optional[str] = self._get_field(f, b'ISRC')
self.source: Optional[str] = self._get_field(f, b'ISRC')
#: 'TAPE' Tape
self.tape : Optional[str] = self._get_field(f, b'TAPE')
self.tape: Optional[str] = self._get_field(f, b'TAPE')
#: 'IARL' Archival Location
self.archival_location : Optional[str] = self._get_field(f, b'IARL')
self.archival_location: Optional[str] = self._get_field(f, b'IARL')
#: 'ICSM' Commissioned
self.commissioned : Optional[str] = self._get_field(f, b'ICMS')
self.commissioned: Optional[str] = self._get_field(f, b'ICMS')
def _get_field(self, f, field_ident) -> Optional[str]:
search = next(((chunk.start, chunk.length) for chunk in self.info_chunk.children if chunk.ident == field_ident),
search = next(((chunk.start, chunk.length)
for chunk in self.info_chunk.children
if chunk.ident == field_ident),
None)
if search is not None:
@@ -59,7 +65,7 @@ class WavInfoChunkReader:
else:
return None
def to_dict(self) -> dict: #FIXME should be asdict
def to_dict(self) -> dict: # FIXME should be asdict
"""
A dictionary with all of the key/values read from the INFO scope.
"""

View File

@@ -1,10 +1,16 @@
from lxml import etree as ET
import io
from collections import namedtuple
# from collections import namedtuple
from typing import Optional
from enum import IntEnum
from typing import NamedTuple
IXMLTrack = namedtuple('IXMLTrack', ['channel_index', 'interleave_index', 'name', 'function'])
class IXMLTrack(NamedTuple):
channel_index: int
interleave_index: int
name: str
function: str
class SteinbergMetadata:
@@ -29,7 +35,7 @@ class SteinbergMetadata:
CINE_71 = 27
SDDS_70 = 24
SDDS_71 = 26
MUSIC_60 = 21 #??
MUSIC_60 = 21 # ??
MUSIC_61 = 23
ATMOS_712 = 33
ATMOS_504 = 35
@@ -72,7 +78,8 @@ class SteinbergMetadata:
"""
`AudioSpeakerArrangement` property
"""
val = self.parsed.find("./ATTR_LIST/ATTR[NAME = 'AudioSpeakerArrangement']/VALUE")
val = self.parsed.find(
"./ATTR_LIST/ATTR[NAME = 'AudioSpeakerArrangement']/VALUE")
if val is not None:
return type(self).AudioSpeakerArrangement(int(val.text))
@@ -81,7 +88,8 @@ class SteinbergMetadata:
"""
AudioSampleFormatSize
"""
val = self.parsed.find("./ATTR_LIST/ATTR[NAME = 'AudioSampleFormatSize']/VALUE")
val = self.parsed.find(
"./ATTR_LIST/ATTR[NAME = 'AudioSampleFormatSize']/VALUE")
if val is not None:
return int(val.text)
@@ -90,7 +98,8 @@ class SteinbergMetadata:
"""
MediaCompany
"""
val = self.parsed.find("./ATTR_LIST/ATTR[NAME = 'MediaCompany']/VALUE")
val = self.parsed.find(
"./ATTR_LIST/ATTR[NAME = 'MediaCompany']/VALUE")
if val is not None:
return val.text
@@ -99,7 +108,8 @@ class SteinbergMetadata:
"""
MediaDropFrames
"""
val = self.parsed.find("./ATTR_LIST/ATTR[NAME = 'MediaDropFrames']/VALUE")
val = self.parsed.find(
"./ATTR_LIST/ATTR[NAME = 'MediaDropFrames']/VALUE")
if val is not None:
return val.text == "1"
@@ -108,7 +118,8 @@ class SteinbergMetadata:
"""
MediaDuration
"""
val = self.parsed.find("./ATTR_LIST/ATTR[NAME = 'MediaDuration']/VALUE")
val = self.parsed.find(
"./ATTR_LIST/ATTR[NAME = 'MediaDuration']/VALUE")
if val is not None:
return float(val.text)
@@ -145,6 +156,7 @@ class WavIXMLFormat:
"""
iXML recorder metadata.
"""
def __init__(self, xml):
"""
Parse iXML.
@@ -153,13 +165,13 @@ class WavIXMLFormat:
self.source = xml
xml_bytes = io.BytesIO(xml)
parser = ET.XMLParser(recover=True)
self.parsed : ET.ElementTree = ET.parse(xml_bytes, parser=parser)
self.parsed: ET.ElementTree = ET.parse(xml_bytes, parser=parser)
def _get_text_value(self, xpath) -> Optional[str]:
e = self.parsed.find("./" + xpath)
if e is not None:
return e.text
else:
else:
return None
def xml_str(self) -> str:
@@ -181,10 +193,13 @@ class WavIXMLFormat:
"""
for track in self.parsed.find("./TRACK_LIST").iter():
if track.tag == 'TRACK':
yield IXMLTrack(channel_index=track.xpath('string(CHANNEL_INDEX/text())'),
interleave_index=track.xpath('string(INTERLEAVE_INDEX/text())'),
name=track.xpath('string(NAME/text())'),
function=track.xpath('string(FUNCTION/text())'))
yield IXMLTrack(
channel_index=track.xpath('string(CHANNEL_INDEX/text())'),
interleave_index=track.xpath(
'string(INTERLEAVE_INDEX/text())'),
name=track.xpath('string(NAME/text())'),
function=track.xpath('string(FUNCTION/text())')
)
@property
def project(self) -> Optional[str]:
@@ -201,7 +216,7 @@ class WavIXMLFormat:
return self._get_text_value("SCENE")
@property
def take(self) -> Optional[str]:
def take(self) -> Optional[str]:
"""
Take number.
"""
@@ -218,7 +233,8 @@ class WavIXMLFormat:
def family_uid(self) -> Optional[str]:
"""
The globally-unique ID for this file family. This may be in the format
of a GUID, or an EBU Rec 9 source identifier, or some other dumb number.
of a GUID, or an EBU Rec 9 source identifier, or some other dumb
number.
"""
return self._get_text_value("FILE_SET/FAMILY_UID")
@@ -240,11 +256,8 @@ class WavIXMLFormat:
return None
def to_dict(self):
return dict(track_list=list(map(lambda x: x._asdict(), self.track_list)),
project=self.project,
scene=self.scene,
take=self.take,
tape=self.tape,
family_uid=self.family_uid,
family_name=self.family_name
)
return dict(
track_list=list(map(lambda x: x._asdict(), self.track_list)),
project=self.project, scene=self.scene, take=self.take,
tape=self.tape, family_uid=self.family_uid,
family_name=self.family_name)

View File

@@ -1,9 +1,7 @@
#-*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import struct
import os
from collections import namedtuple
from typing import Optional, Generator, Any
from typing import Optional, Generator, Any, NamedTuple
import pathlib
@@ -16,12 +14,21 @@ from .wave_dbmd_reader import WavDolbyMetadataReader
from .wave_cues_reader import WavCuesReader
#: Calculated statistics about the audio data.
WavDataDescriptor = namedtuple('WavDataDescriptor', 'byte_count frame_count')
class WavDataDescriptor(NamedTuple):
byte_count: int
frame_count: int
#: The format of the audio samples.
WavAudioFormat = namedtuple('WavAudioFormat',
['audio_format', 'channel_count', 'sample_rate',
'byte_rate', 'block_align', 'bits_per_sample'])
class WavAudioFormat(NamedTuple):
audio_format: int
channel_count: int
sample_rate: int
byte_rate: int
block_align: int
bits_per_sample: int
class WavInfoReader:
@@ -29,70 +36,70 @@ class WavInfoReader:
Parse a WAV audio file for metadata.
"""
def __init__(self, path, info_encoding='latin_1', bext_encoding='ascii'):
def __init__(self, f, info_encoding='latin_1', bext_encoding='ascii'):
"""
Create a new reader object.
:param path:
A pathlike object or IO to the wav file you wish to probe or a
:param path:
A pathlike object or IO to the wav file you wish to probe or a
file handle to an open file.
:param info_encoding:
:param info_encoding:
The text encoding of the ``INFO``, ``LABL`` and other RIFF-defined
metadata fields.
metadata fields.
:param bext_encoding:
:param bext_encoding:
The text encoding to use when decoding the string
fields of the Broadcast-WAV extension. Per EBU 3285 this is ASCII
but this parameter is available to you if you encounter a weirdo.
"""
self.info_encoding = info_encoding
self.bext_encoding = bext_encoding
#: Wave audio data format.
self.fmt :Optional[WavAudioFormat] = None
self.fmt: Optional[WavAudioFormat] = None
#: Statistics of the `data` section.
self.data :Optional[WavDataDescriptor] = None
self.data: Optional[WavDataDescriptor] = None
#: Broadcast-Wave metadata.
self.bext :Optional[WavBextReader] = None
self.bext: Optional[WavBextReader] = None
#: iXML metadata.
self.ixml :Optional[WavIXMLFormat] = None
self.ixml: Optional[WavIXMLFormat] = None
#: ADM Audio Definiton Model metadata.
self.adm :Optional[WavADMReader]= None
self.adm: Optional[WavADMReader] = None
#: Dolby bitstream metadata.
self.dolby :Optional[WavDolbyMetadataReader] = None
self.dolby: Optional[WavDolbyMetadataReader] = None
#: RIFF INFO metadata.
self.info :Optional[WavInfoChunkReader]= None
self.info: Optional[WavInfoChunkReader] = None
#: RIFF cues markers, labels, and notes.
self.cues :Optional[WavCuesReader] = None
self.cues: Optional[WavCuesReader] = None
if hasattr(path, 'read'):
self.get_wav_info(path)
if hasattr(f, 'read'):
self.get_wav_info(f)
self.url = 'about:blank'
self.path = repr(path)
self.path = repr(f)
else:
absolute_path = os.path.abspath(path)
absolute_path = os.path.abspath(f)
#: `file://` url for the file.
self.url: str = pathlib.Path(absolute_path).as_uri()
self.path = absolute_path
with open(path, 'rb') as f:
with open(f, 'rb') as f:
self.get_wav_info(f)
def get_wav_info(self, wavfile):
chunks = parse_chunk(wavfile)
assert type(chunks) is ListChunkDescriptor
assert type(chunks) is ListChunkDescriptor
self.main_list = chunks.children
wavfile.seek(0)
@@ -100,15 +107,17 @@ class WavInfoReader:
self.fmt = self._get_format(wavfile)
self.bext = self._get_bext(wavfile, encoding=self.bext_encoding)
self.ixml = self._get_ixml(wavfile)
self.adm = self._get_adm(wavfile)
self.adm = self._get_adm(wavfile)
self.info = self._get_info(wavfile, encoding=self.info_encoding)
self.dolby = self._get_dbmd(wavfile)
self.cues = self._get_cue(wavfile)
self.data = self._describe_data()
def _find_chunk_data(self, ident, from_stream, default_none=False) -> Optional[bytes]:
top_chunks = (chunk for chunk in self.main_list \
if type(chunk) is ChunkDescriptor and chunk.ident == ident)
def _find_chunk_data(self, ident, from_stream,
default_none=False) -> Optional[bytes]:
top_chunks = (chunk for chunk in self.main_list
if type(chunk) is ChunkDescriptor and
chunk.ident == ident)
chunk_descriptor = next(top_chunks, None) \
if default_none else next(top_chunks)
@@ -117,19 +126,19 @@ class WavInfoReader:
if chunk_descriptor else None
def _find_list_chunk(self, signature) -> Optional[ListChunkDescriptor]:
top_chunks = (chunk for chunk in self.main_list \
if type(chunk) is ListChunkDescriptor and \
chunk.signature == signature)
top_chunks = (chunk for chunk in self.main_list
if type(chunk) is ListChunkDescriptor and
chunk.signature == signature)
return next(top_chunks, None)
def _describe_data(self):
data_chunk = next(c for c in self.main_list \
if type(c) is ChunkDescriptor and c.ident == b'data')
data_chunk = next(c for c in self.main_list
if type(c) is ChunkDescriptor and c.ident == b'data')
assert isinstance(self.fmt, WavAudioFormat)
return WavDataDescriptor(
byte_count=data_chunk.length,
byte_count=data_chunk.length,
frame_count=int(data_chunk.length / self.fmt.block_align))
def _get_format(self, f):
@@ -150,8 +159,8 @@ class WavInfoReader:
)
def _get_info(self, f, encoding):
finder = (chunk.signature for chunk in self.main_list \
if type(chunk) is ListChunkDescriptor)
finder = (chunk.signature for chunk in self.main_list
if type(chunk) is ListChunkDescriptor)
if b'INFO' in finder:
return WavInfoChunkReader(f, encoding)
@@ -176,9 +185,9 @@ class WavInfoReader:
return WavIXMLFormat(ixml_data.rstrip(b'\0')) if ixml_data else None
def _get_cue(self, f):
cue = next((cue_chunk for cue_chunk in self.main_list if \
type(cue_chunk) is ChunkDescriptor and \
cue_chunk.ident == b'cue '), None)
cue = next((cue_chunk for cue_chunk in self.main_list if
type(cue_chunk) is ChunkDescriptor and
cue_chunk.ident == b'cue '), None)
adtl = self._find_list_chunk(b'adtl')
labls = []
@@ -189,20 +198,21 @@ class WavInfoReader:
ltxts = [c for c in adtl.children if c.ident == b'ltxt']
notes = [c for c in adtl.children if c.ident == b'note']
return WavCuesReader.read_all(f, cue, labls, ltxts, notes,
fallback_encoding=self.info_encoding)
return WavCuesReader.read_all(f, cue, labls, ltxts, notes,
fallback_encoding=self.info_encoding)
def walk(self) -> Generator[str,str,Any]: #FIXME: this should probably be named "iter()"
# FIXME: this should probably be named "iter()"
def walk(self) -> Generator[str, str, Any]:
"""
Walk all of the available metadata fields.
:yields: tuples of the *scope*, *key*, and *value* of
each metadatum. The *scope* value will be one of
"fmt", "data", "ixml", "bext", "info", "dolby", "cues" or "adm".
"""
scopes = ('fmt', 'data', 'ixml', 'bext', 'info', 'adm', 'cues',
'dolby')
scopes = ('fmt', 'data', 'ixml', 'bext', 'info', 'adm', 'cues',
'dolby')
for scope in scopes:
if scope in ['fmt', 'data']:
@@ -211,9 +221,12 @@ class WavInfoReader:
yield scope, field, attr.__getattribute__(field)
else:
dict = self.__getattribute__(scope).to_dict() if self.__getattribute__(scope) else {}
dict = self.__getattribute__(scope).to_dict(
) if self.__getattribute__(scope) else {}
for key in dict.keys():
yield scope, key, dict[key]
def __repr__(self):
return 'WavInfoReader({}, {}, {})'.format(self.path, self.info_encoding, self.bext_encoding)
return 'WavInfoReader({}, {}, {})'.format(self.path,
self.info_encoding,
self.bext_encoding)

View File

@@ -9,23 +9,24 @@ import sys
def main():
parser = OptionParser()
parser.usage = "wavfind [--scene=SCENE] [--take=TAKE] [--desc=DESC] <PATH> +"
parser.usage = ("wavfind [--scene=SCENE] [--take=TAKE] [--desc=DESC] "
"<PATH> +")
primaries = OptionGroup(parser, title="Search Predicates",
description="Argument values can be globs, and are logically-AND'ed.")
description="Argument values can be globs, "
"and are logically-AND'ed.")
primaries.add_option("--scene",
help='Search for this scene',
metavar='SCENE')
primaries.add_option("--scene",
help='Search for this scene',
metavar='SCENE')
primaries.add_option("--take",
help='Search for this take',
metavar='TAKE')
help='Search for this take',
metavar='TAKE')
primaries.add_option("--desc",
help='Search descriptions',
metavar='DESC')
help='Search descriptions',
metavar='DESC')
(options, args) = parser.parse_args(sys.argv)