diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..4b03aea --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +per-file-ignores = + wavinfo/__init__.py: F401 diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index e697847..d045e45 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -40,3 +40,4 @@ jobs: - name: Test with pytest run: | pytest + flake8 wavinfo diff --git a/tests/test_adm.py b/tests/test_adm.py index 95c7bb5..9e9cbb5 100644 --- a/tests/test_adm.py +++ b/tests/test_adm.py @@ -15,17 +15,20 @@ class TestADMWave(TestCase): adm = info.adm self.assertIsNotNone(adm) + assert adm is not None self.assertEqual(len(adm.channel_uids), 14) def test_to_dict(self): info = wavinfo.WavInfoReader(self.protools_adm_wav) adm = info.adm + assert adm is not None dict = adm.to_dict() self.assertIsNotNone(dict) def test_programme(self): info = wavinfo.WavInfoReader(self.protools_adm_wav) adm = info.adm + assert adm is not None pdict = adm.programme() self.assertIn("programme_id", pdict.keys()) self.assertIn("programme_name", pdict.keys()) @@ -37,7 +40,7 @@ class TestADMWave(TestCase): def test_track_info(self): info = wavinfo.WavInfoReader(self.protools_adm_wav) adm = info.adm - + assert adm is not None t1 = adm.track_info(0) self.assertTrue("channel_format_name" in t1.keys()) self.assertEqual("RoomCentricLeft", t1["channel_format_name"]) diff --git a/tests/test_cue.py b/tests/test_cue.py index 4271769..4c5e3fa 100644 --- a/tests/test_cue.py +++ b/tests/test_cue.py @@ -12,6 +12,7 @@ class TestCue(TestCase): file1 = "tests/test_files/cue_chunks/STE-000.wav" w1 = wavinfo.WavInfoReader(file1) self.assertIsNotNone(w1.cues) + assert w1.cues is not None vals = list(w1.cues.each_cue()) self.assertEqual(vals, [(1,29616),(2,74592),(3,121200)]) diff --git a/tests/test_dolby.py b/tests/test_dolby.py index 6a01962..93793cc 100644 --- a/tests/test_dolby.py +++ b/tests/test_dolby.py @@ -1,7 +1,7 @@ from unittest import TestCase import wavinfo -from wavinfo.wave_dbmd_reader import SegmentType, DolbyAtmosMetadata, DolbyDigitalPlusMetadata +from wavinfo.wave_dbmd_reader import SegmentType, DolbyDigitalPlusMetadata class TestDolby(TestCase): def setUp(self): @@ -19,8 +19,10 @@ class TestDolby(TestCase): d = t1.dolby assert d is not None - ddp = [x for x in d.segment_list if x[0] == SegmentType.DolbyDigitalPlus] - atmos = [x for x in d.segment_list if x[0] == SegmentType.DolbyAtmos] + ddp = [x for x in d.segment_list \ + if x[0] == SegmentType.DolbyDigitalPlus] + atmos = [x for x in d.segment_list \ + if x[0] == SegmentType.DolbyAtmos] self.assertEqual(len(ddp), 1) self.assertEqual(len(atmos), 1) @@ -38,8 +40,13 @@ class TestDolby(TestCase): d = t1.dolby assert d is not None ddp = d.dolby_digital_plus() - self.assertEqual(len(ddp), 1, "Failed to find exactly one Dolby Digital Plus metadata segment") - self.assertTrue( ddp[0].audio_coding_mode, DolbyDigitalPlusMetadata.AudioCodingMode.CH_ORD_3_2 ) + self.assertEqual(len(ddp), 1, + ("Failed to find exactly one Dolby Digital Plus " + "metadata segment") + ) + + self.assertTrue( ddp[0].audio_coding_mode, + DolbyDigitalPlusMetadata.AudioCodingMode.CH_ORD_3_2 ) self.assertTrue( ddp[0].lfe_on) def test_atmos(self): @@ -47,6 +54,7 @@ class TestDolby(TestCase): d = t1.dolby assert d is not None atmos = d.dolby_atmos() - self.assertEqual(len(atmos), 1, "Failed to find exactly one Atmos metadata segment") + self.assertEqual(len(atmos), 1, + "Failed to find exactly one Atmos metadata segment") diff --git a/tests/test_main.py b/tests/test_main.py index 0130cf5..e1474b0 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -26,7 +26,8 @@ class MainTest(unittest.TestCase): def test_ixml(self): with patch.object(sys, 'argv', - ['TEST', '--ixml', 'tests/test_files/sounddevices/A101_1.WAV']): + ['TEST', '--ixml', + 'tests/test_files/sounddevices/A101_1.WAV']): try: main() except: diff --git a/tests/test_wave_parsing.py b/tests/test_wave_parsing.py index 37d074d..39127cf 100644 --- a/tests/test_wave_parsing.py +++ b/tests/test_wave_parsing.py @@ -13,7 +13,9 @@ class TestWaveInfo(TestCase): def test_sanity(self): for wav_file in all_files(): info = wavinfo.WavInfoReader(wav_file) - self.assertEqual(info.__repr__(), 'WavInfoReader({}, latin_1, ascii)'.format(os.path.abspath(wav_file))) + self.assertEqual(info.__repr__(), + 'WavInfoReader({}, latin_1, ascii)' + .format(os.path.abspath(wav_file))) self.assertIsNotNone(info) def test_fmt_against_ffprobe(self): @@ -24,14 +26,21 @@ class TestWaveInfo(TestCase): assert info.fmt is not None assert ffprobe_info is not None - self.assertEqual(info.fmt.channel_count, ffprobe_info['streams'][0]['channels']) - self.assertEqual(info.fmt.sample_rate, int(ffprobe_info['streams'][0]['sample_rate'])) - self.assertEqual(info.fmt.bits_per_sample, int(ffprobe_info['streams'][0]['bits_per_sample'])) + self.assertEqual(info.fmt.channel_count, + ffprobe_info['streams'][0]['channels']) + self.assertEqual(info.fmt.sample_rate, + int(ffprobe_info['streams'][0]['sample_rate'])) + self.assertEqual(info.fmt.bits_per_sample, + int(ffprobe_info['streams'][0]['bits_per_sample'] + )) if info.fmt.audio_format == 1: - self.assertTrue(ffprobe_info['streams'][0]['codec_name'].startswith('pcm')) + self.assertTrue(ffprobe_info['streams'][0]['codec_name']\ + .startswith('pcm')) streams = ffprobe_info['streams'][0] - byte_rate = int(streams['sample_rate']) * streams['channels'] * int(streams['bits_per_sample']) / 8 + byte_rate = int(streams['sample_rate']) * \ + streams['channels'] * \ + int(streams['bits_per_sample']) / 8 self.assertEqual(info.fmt.byte_rate, byte_rate) def test_data_against_ffprobe(self): @@ -40,7 +49,8 @@ class TestWaveInfo(TestCase): ffprobe_info = cast(Dict[str,Any], ffprobe(wav_file)) assert ffprobe_info is not None assert info.data is not None - self.assertEqual(info.data.frame_count, int(ffprobe_info['streams'][0]['duration_ts'])) + self.assertEqual(info.data.frame_count, + int(ffprobe_info['streams'][0]['duration_ts'])) def test_bext_against_ffprobe(self): for wav_file in all_files(): @@ -50,39 +60,63 @@ class TestWaveInfo(TestCase): if info.bext: if 'comment' in ffprobe_info['format']['tags']: - self.assertEqual(info.bext.description, ffprobe_info['format']['tags']['comment']) + self.assertEqual(info.bext.description, + ffprobe_info['format']['tags']\ + ['comment']) else: self.assertEqual(info.bext.description, '') if 'encoded_by' in ffprobe_info['format']['tags']: - self.assertEqual(info.bext.originator, ffprobe_info['format']['tags']['encoded_by']) + self.assertEqual(info.bext.originator, + ffprobe_info['format']['tags']\ + ['encoded_by']) else: self.assertEqual(info.bext.originator, '') if 'originator_reference' in ffprobe_info['format']['tags']: - self.assertEqual(info.bext.originator_ref, ffprobe_info['format']['tags']['originator_reference']) + self.assertEqual(info.bext.originator_ref, + ffprobe_info['format']['tags']\ + ['originator_reference']) else: self.assertEqual(info.bext.originator_ref, '') # these don't always reflect the bext info - # self.assertEqual(info.bext.originator_date, ffprobe_info['format']['tags']['date']) - # self.assertEqual(info.bext.originator_time, ffprobe_info['format']['tags']['creation_time']) - self.assertEqual(info.bext.time_reference, int(ffprobe_info['format']['tags']['time_reference'])) + # self.assertEqual(info.bext.originator_date, + # ffprobe_info['format']['tags']['date']) + # self.assertEqual(info.bext.originator_time, + # ffprobe_info['format']['tags']['creation_time']) + self.assertEqual(info.bext.time_reference, + int(ffprobe_info['format']['tags']\ + ['time_reference'])) if 'coding_history' in ffprobe_info['format']['tags']: - self.assertEqual(info.bext.coding_history, ffprobe_info['format']['tags']['coding_history']) + self.assertEqual(info.bext.coding_history, + ffprobe_info['format']['tags']\ + ['coding_history']) else: self.assertEqual(info.bext.coding_history, '') def test_ixml(self): - expected = {'A101_4.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '4', - 'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124015008231000'}, - 'A101_3.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '3', - 'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124014008228300'}, - 'A101_2.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '2', - 'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124004008218600'}, - 'A101_1.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '1', - 'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124001008206300'}, + expected = {'A101_4.WAV': {'project': 'BMH', + 'scene': 'A101', 'take': '4', + 'tape': '18Y12M31', + 'family_uid': + 'USSDVGR1112089007124015008231000'}, + 'A101_3.WAV': {'project': 'BMH', + 'scene': 'A101', 'take': '3', + 'tape': '18Y12M31', + 'family_uid': + 'USSDVGR1112089007124014008228300'}, + 'A101_2.WAV': {'project': 'BMH', + 'scene': 'A101', 'take': '2', + 'tape': '18Y12M31', + 'family_uid': + 'USSDVGR1112089007124004008218600'}, + 'A101_1.WAV': {'project': 'BMH', + 'scene': 'A101', 'take': '1', + 'tape': '18Y12M31', + 'family_uid': + 'USSDVGR1112089007124001008206300'}, } for wav_file in all_files(): @@ -112,7 +146,8 @@ class TestWaveInfo(TestCase): assert info.ixml.steinberg is not None self.assertIsNotNone(info.ixml.steinberg.audio_speaker_arrangement) self.assertEqual(info.ixml.steinberg.sample_format_size, 3) - self.assertEqual(info.ixml.steinberg.media_company, "https://github.com/iluvcapra/wavinfo") + self.assertEqual(info.ixml.steinberg.media_company, + "https://github.com/iluvcapra/wavinfo") self.assertFalse(info.ixml.steinberg.media_drop_frames) self.assertEqual(info.ixml.steinberg.media_duration, 1200.0) @@ -124,7 +159,8 @@ class TestWaveInfo(TestCase): self.assertIsNone(info.ixml.steinberg) def test_info_metadata(self): - file_with_metadata = 'tests/test_files/sound_grinder_pro/new_camera bumb 1.wav' + file_with_metadata = \ + 'tests/test_files/sound_grinder_pro/new_camera bumb 1.wav' self.assertTrue(os.path.exists(file_with_metadata)) info = wavinfo.WavInfoReader(file_with_metadata).info @@ -138,7 +174,8 @@ class TestWaveInfo(TestCase): self.assertEqual(info.software, 'Sound Grinder Pro') self.assertEqual(info.created_date, '2010-12-28') self.assertEqual(info.engineer, 'JPH') - self.assertEqual(info.keywords, 'Sound Effect, movement, microphone, bump') + self.assertEqual(info.keywords, + 'Sound Effect, movement, microphone, bump') self.assertEqual(info.title, 'camera bumb 1') self.assertEqual(type(info.to_dict()), dict) self.assertEqual(type(info.__repr__()), str) diff --git a/tests/utils.py b/tests/utils.py index b802b8e..17ae284 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -8,7 +8,8 @@ FFPROBE = 'ffprobe' def ffprobe(path): - arguments = [FFPROBE, "-of", "json", "-show_format", "-show_streams", path] + arguments = [FFPROBE, "-of", "json", + "-show_format", "-show_streams", path] if int(sys.version[0]) < 3: process = subprocess.Popen(arguments, stdout=PIPE) process.wait() @@ -20,7 +21,8 @@ def ffprobe(path): else: return None else: - process = subprocess.run(arguments, stdin=None, stdout=PIPE, stderr=PIPE) + process = subprocess.run(arguments, stdin=None, + stdout=PIPE, stderr=PIPE) if process.returncode == 0: output_str = process.stdout.decode('utf-8') return json.loads(output_str) diff --git a/wavinfo/__main__.py b/wavinfo/__main__.py index 81640c5..8a5e2d4 100644 --- a/wavinfo/__main__.py +++ b/wavinfo/__main__.py @@ -1,11 +1,13 @@ -from optparse import OptionParser, OptionGroup import datetime from . import WavInfoReader from . import __version__ + +from optparse import OptionParser import sys import json from enum import Enum + class MyJSONEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, Enum): @@ -13,28 +15,30 @@ class MyJSONEncoder(json.JSONEncoder): else: return super().default(o) + class MissingDataError(RuntimeError): pass + def main(): parser = OptionParser() parser.usage = 'wavinfo (--adm | --ixml) +' - # parser.add_option('-f', dest='output_format', help='Set the output format', - # default='json', - # metavar='FORMAT') + parser.add_option('--adm', dest='adm', + help='Output ADM XML', + default=False, + action='store_true') - parser.add_option('--adm', dest='adm', help='Output ADM XML', - default=False, action='store_true') - - parser.add_option('--ixml', dest='ixml', help='Output iXML', - default=False, action='store_true') + parser.add_option('--ixml', dest='ixml', + help='Output iXML', + default=False, + action='store_true') (options, args) = parser.parse_args(sys.argv) for arg in args[1:]: try: - this_file = WavInfoReader(path=arg) + this_file = WavInfoReader(f=arg) if options.adm: if this_file.adm: sys.stdout.write(this_file.adm.xml_str()) @@ -47,9 +51,9 @@ def main(): raise MissingDataError("ixml") else: ret_dict = { - 'filename': arg, - 'run_date': datetime.datetime.now().isoformat() , - 'application': "wavinfo " + __version__, + 'filename': arg, + 'run_date': datetime.datetime.now().isoformat(), + 'application': "wavinfo " + __version__, 'scopes': {} } for scope, name, value in this_file.walk(): @@ -60,7 +64,8 @@ def main(): json.dump(ret_dict, cls=MyJSONEncoder, fp=sys.stdout, indent=2) except MissingDataError as e: - print("MissingDataError: Missing metadata (%s) in file %s" % (e, arg), file=sys.stderr) + print("MissingDataError: Missing metadata (%s) in file %s" % + (e, arg), file=sys.stderr) continue except Exception as e: raise e diff --git a/wavinfo/rf64_parser.py b/wavinfo/rf64_parser.py index b0b077c..5a46961 100644 --- a/wavinfo/rf64_parser.py +++ b/wavinfo/rf64_parser.py @@ -1,14 +1,18 @@ import struct -from collections import namedtuple +# from collections import namedtuple +from typing import NamedTuple, Dict from . import riff_parser -RF64Context = namedtuple('RF64Context','sample_count bigchunk_table') + +class RF64Context(NamedTuple): + sample_count: int + bigchunk_table: Dict[str, int] -def parse_rf64(stream, signature = b'RF64') -> RF64Context: +def parse_rf64(stream, signature=b'RF64') -> RF64Context: start = stream.tell() - assert( stream.read(4) == b'WAVE' ) + assert stream.read(4) == b'WAVE' ds64_chunk = riff_parser.parse_chunk(stream) assert type(ds64_chunk) is riff_parser.ChunkDescriptor, \ @@ -16,27 +20,28 @@ def parse_rf64(stream, signature = b'RF64') -> RF64Context: ds64_field_spec = "= ds64_fields_size) + assert len(ds64_data) >= ds64_fields_size riff_size, data_size, sample_count, length_lookup_table = struct.unpack( - ds64_field_spec, ds64_data[0:ds64_fields_size]) + ds64_field_spec, ds64_data[0:ds64_fields_size] + ) bigchunk_table = {} chunksize64format = "<4sL" # chunksize64size = struct.calcsize(chunksize64format) for _ in range(length_lookup_table): - bigname, bigsize = struct.unpack_from(chunksize64format, ds64_data, - offset= ds64_fields_size) + bigname, bigsize = struct.unpack_from(chunksize64format, + ds64_data, + offset=ds64_fields_size) bigchunk_table[bigname] = bigsize bigchunk_table[b'data'] = data_size bigchunk_table[signature] = riff_size stream.seek(start, 0) - return RF64Context( sample_count=sample_count, + return RF64Context(sample_count=sample_count, bigchunk_table=bigchunk_table) - diff --git a/wavinfo/riff_parser.py b/wavinfo/riff_parser.py index b10489e..87ce6d9 100644 --- a/wavinfo/riff_parser.py +++ b/wavinfo/riff_parser.py @@ -1,7 +1,7 @@ - +# from optparse import Option import struct -from collections import namedtuple -from .rf64_parser import parse_rf64 +from .rf64_parser import parse_rf64, RF64Context +from typing import NamedTuple, Union, List, Optional class WavInfoEOFError(EOFError): @@ -10,11 +10,17 @@ class WavInfoEOFError(EOFError): self.chunk_start = chunk_start -class ListChunkDescriptor(namedtuple('ListChunkDescriptor', 'signature children')): - pass +class ListChunkDescriptor(NamedTuple): + signature: bytes + children: List[Union['ChunkDescriptor', 'ListChunkDescriptor']] -class ChunkDescriptor(namedtuple('ChunkDescriptor', 'ident start length rf64_context')): +class ChunkDescriptor(NamedTuple): + ident: bytes + start: int + length: int + rf64_context: Optional[RF64Context] + def read_data(self, from_stream) -> bytes: from_stream.seek(self.start) return from_stream.read(self.length) @@ -49,8 +55,8 @@ def parse_chunk(stream, rf64_context=None): rf64_context = parse_rf64(stream=stream, signature=ident) assert rf64_context is not None, \ - f"Sentinel data size 0xFFFFFFFF found outside of RF64 context" - + "Sentinel data size 0xFFFFFFFF found outside of RF64 context" + data_size = rf64_context.bigchunk_table[ident] displacement = data_size @@ -64,5 +70,7 @@ def parse_chunk(stream, rf64_context=None): else: data_start = stream.tell() stream.seek(displacement, 1) - return ChunkDescriptor(ident=ident, start=data_start, length=data_size, + return ChunkDescriptor(ident=ident, + start=data_start, + length=data_size, rf64_context=rf64_context) diff --git a/wavinfo/umid_parser.py b/wavinfo/umid_parser.py index b6e67ab..86b3000 100644 --- a/wavinfo/umid_parser.py +++ b/wavinfo/umid_parser.py @@ -2,8 +2,8 @@ # def binary_to_string(binary_value): -# return reduce(lambda val, el: val + "{:02x}".format(el), binary_value, '') - +# return reduce(lambda val, el: val + "{:02x}".format(el), +# binary_value, '') # class UMIDParser: # """ @@ -13,109 +13,109 @@ # """ # def __init__(self, raw_umid: bytes): # self.raw_umid = raw_umid - # - # @property - # def universal_label(self) -> bytearray: - # return self.raw_umid[0:12] - # - # @property - # def basic_umid(self): - # return self.raw_umid[0:32] +# +# @property +# def universal_label(self) -> bytearray: +# return self.raw_umid[0:12] +# +# @property +# def basic_umid(self): +# return self.raw_umid[0:32] - # def basic_umid_to_str(self): - # return binary_to_string(self.raw_umid[0:32]) - # - # @property - # def universal_label_is_valid(self) -> bool: - # valid_preamble = b'\x06\x0a\x2b\x34\x01\x01\x01\x05\x01\x01' - # return self.universal_label[0:len(valid_preamble)] == valid_preamble - # - # @property - # def material_type(self) -> str: - # material_byte = self.raw_umid[10] - # if material_byte == 0x1: - # return 'picture' - # elif material_byte == 0x2: - # return 'audio' - # elif material_byte == 0x3: - # return 'data' - # elif material_byte == 0x4: - # return 'other' - # elif material_byte == 0x5: - # return 'picture_single_component' - # elif material_byte == 0x6: - # return 'picture_multiple_component' - # elif material_byte == 0x7: - # return 'audio_single_component' - # elif material_byte == 0x9: - # return 'audio_multiple_component' - # elif material_byte == 0xb: - # return 'auxiliary_single_component' - # elif material_byte == 0xc: - # return 'auxiliary_multiple_component' - # elif material_byte == 0xd: - # return 'mixed_components' - # elif material_byte == 0xf: - # return 'not_identified' - # else: - # return 'not_recognized' - # - # @property - # def material_number_creation_method(self) -> str: - # method_byte = self.raw_umid[11] - # method_byte = (method_byte << 4) & 0xf - # if method_byte == 0x0: - # return 'undefined' - # elif method_byte == 0x1: - # return 'smpte' - # elif method_byte == 0x2: - # return 'uuid' - # elif method_byte == 0x3: - # return 'masked' - # elif method_byte == 0x4: - # return 'ieee1394' - # elif 0x5 <= method_byte <= 0x7: - # return 'reserved_undefined' - # else: - # return 'unrecognized' - # - # @property - # def instance_number_creation_method(self) -> str: - # method_byte = self.raw_umid[11] - # method_byte = method_byte & 0xf - # if method_byte == 0x0: - # return 'undefined' - # elif method_byte == 0x01: - # return 'local_registration' - # elif method_byte == 0x02: - # return '24_bit_prs' - # elif method_byte == 0x03: - # return 'copy_number_and_16_bit_prs' - # elif 0x04 <= method_byte <= 0x0e: - # return 'reserved_undefined' - # elif method_byte == 0x0f: - # return 'live_stream' - # else: - # return 'unrecognized' - # - # @property - # def indicated_length(self) -> str: - # if self.raw_umid[12] == 0x13: - # return 'basic' - # elif self.raw_umid[12] == 0x33: - # return 'extended' - # - # @property - # def instance_number(self) -> bytearray: - # return self.raw_umid[13:3] - # - # @property - # def material_number(self) -> bytearray: - # return self.raw_umid[16:16] - # - # @property - # def source_pack(self) -> Union[bytearray, None]: - # if self.indicated_length == 'extended': - # return self.raw_umid[32:32] - # else: - # return None +# def basic_umid_to_str(self): +# return binary_to_string(self.raw_umid[0:32]) +# +# @property +# def universal_label_is_valid(self) -> bool: +# valid_preamble = b'\x06\x0a\x2b\x34\x01\x01\x01\x05\x01\x01' +# return self.universal_label[0:len(valid_preamble)] == valid_preamble +# +# @property +# def material_type(self) -> str: +# material_byte = self.raw_umid[10] +# if material_byte == 0x1: +# return 'picture' +# elif material_byte == 0x2: +# return 'audio' +# elif material_byte == 0x3: +# return 'data' +# elif material_byte == 0x4: +# return 'other' +# elif material_byte == 0x5: +# return 'picture_single_component' +# elif material_byte == 0x6: +# return 'picture_multiple_component' +# elif material_byte == 0x7: +# return 'audio_single_component' +# elif material_byte == 0x9: +# return 'audio_multiple_component' +# elif material_byte == 0xb: +# return 'auxiliary_single_component' +# elif material_byte == 0xc: +# return 'auxiliary_multiple_component' +# elif material_byte == 0xd: +# return 'mixed_components' +# elif material_byte == 0xf: +# return 'not_identified' +# else: +# return 'not_recognized' +# +# @property +# def material_number_creation_method(self) -> str: +# method_byte = self.raw_umid[11] +# method_byte = (method_byte << 4) & 0xf +# if method_byte == 0x0: +# return 'undefined' +# elif method_byte == 0x1: +# return 'smpte' +# elif method_byte == 0x2: +# return 'uuid' +# elif method_byte == 0x3: +# return 'masked' +# elif method_byte == 0x4: +# return 'ieee1394' +# elif 0x5 <= method_byte <= 0x7: +# return 'reserved_undefined' +# else: +# return 'unrecognized' +# +# @property +# def instance_number_creation_method(self) -> str: +# method_byte = self.raw_umid[11] +# method_byte = method_byte & 0xf +# if method_byte == 0x0: +# return 'undefined' +# elif method_byte == 0x01: +# return 'local_registration' +# elif method_byte == 0x02: +# return '24_bit_prs' +# elif method_byte == 0x03: +# return 'copy_number_and_16_bit_prs' +# elif 0x04 <= method_byte <= 0x0e: +# return 'reserved_undefined' +# elif method_byte == 0x0f: +# return 'live_stream' +# else: +# return 'unrecognized' +# +# @property +# def indicated_length(self) -> str: +# if self.raw_umid[12] == 0x13: +# return 'basic' +# elif self.raw_umid[12] == 0x33: +# return 'extended' +# +# @property +# def instance_number(self) -> bytearray: +# return self.raw_umid[13:3] +# +# @property +# def material_number(self) -> bytearray: +# return self.raw_umid[16:16] +# +# @property +# def source_pack(self) -> Union[bytearray, None]: +# if self.indicated_length == 'extended': +# return self.raw_umid[32:32] +# else: +# return None diff --git a/wavinfo/wave_adm_reader.py b/wavinfo/wave_adm_reader.py index e3155e1..665aa73 100644 --- a/wavinfo/wave_adm_reader.py +++ b/wavinfo/wave_adm_reader.py @@ -5,12 +5,14 @@ ADM Reader from struct import unpack, unpack_from, calcsize from io import BytesIO from collections import namedtuple -from typing import Iterable, Tuple +from typing import Optional from lxml import etree as ET + ChannelEntry = namedtuple('ChannelEntry', "track_index uid track_ref pack_ref") + class WavADMReader: """ Reads XML data from an EBU ADM (Audio Definiton Model) WAV File. @@ -26,24 +28,24 @@ class WavADMReader: _, uid_count = unpack(header_fmt, chna_data[0:4]) - #: A list of :class:`ChannelEntry` objects parsed from the - #: `chna` metadata chunk. - #: - #: .. note:: - #: In-file, the `chna` track indexes start at 1. However, this interface - #: numbers the first track 0, in order to maintain consistency with other - #: libraries. self.channel_uids = [] offset = calcsize(header_fmt) for _ in range(uid_count): - track_index, uid, track_ref, pack_ref = unpack_from(uid_fmt, chna_data, offset) + track_index, uid, track_ref, pack_ref = unpack_from(uid_fmt, + chna_data, + offset) # these values are either ascii or all null - self.channel_uids.append(ChannelEntry(track_index - 1, - uid.decode('ascii') , track_ref.decode('ascii'), pack_ref.decode('ascii'))) + self.channel_uids.append( + ChannelEntry(track_index - 1, + uid.decode('ascii'), + track_ref.decode('ascii'), + pack_ref.decode('ascii') + ) + ) offset += calcsize(uid_fmt) @@ -53,12 +55,13 @@ class WavADMReader: def programme(self) -> dict: """ - Read the ADM `audioProgramme` data structure and some of its reference properties. + Read the ADM `audioProgramme` data structure and some of its reference + properties. """ ret_dict = dict() - nsmap = self.axml.getroot().nsmap - + nsmap = self.axml.getroot().nsmap + afext = self.axml.find(".//audioFormatExtended", namespaces=nsmap) program = afext.find("audioProgramme", namespaces=nsmap) @@ -68,17 +71,21 @@ class WavADMReader: ret_dict['programme_end'] = program.get("end") ret_dict['contents'] = [] - for content_ref in program.findall("audioContentIDRef", namespaces=nsmap): + for content_ref in program.findall("audioContentIDRef", + namespaces=nsmap): content_dict = dict() content_dict['content_id'] = cid = content_ref.text - content = afext.find("audioContent[@audioContentID='%s']" % cid, namespaces=nsmap) + content = afext.find("audioContent[@audioContentID='%s']" % cid, + namespaces=nsmap) content_dict['content_name'] = content.get("audioContentName") content_dict['objects'] = [] - for object_ref in content.findall("audioObjectIDRef", namespaces=nsmap): + for object_ref in content.findall("audioObjectIDRef", + namespaces=nsmap): object_dict = dict() object_dict['object_id'] = oid = object_ref.text - object = afext.find("audioObject[@audioObjectID='%s']" % oid, namespaces=nsmap) + object = afext.find("audioObject[@audioObjectID='%s']" % oid, + namespaces=nsmap) pack = object.find("audioPackFormatIDRef", namespaces=nsmap) object_dict['object_name'] = object.get("audioObjectName") object_dict['object_start'] = object.get("start") @@ -95,16 +102,18 @@ class WavADMReader: return ret_dict - def track_info(self, index) -> dict: + def track_info(self, index) -> Optional[dict]: """ Information about a track in the WAV file. - :param index: index of audio track (indexed from zero) - :returns: a dictionary with *content_name*, *content_id*, *object_name*, *object_id*, + :param index: index of audio track (indexed from zero) + :returns: a dictionary with *content_name*, *content_id*, + *object_name*, *object_id*, *pack_format_name*, *pack_type*, *channel_format_name* """ - channel_info = next((x for x in self.channel_uids if x.track_index == index), None) - + channel_info = next((x for x in self.channel_uids + if x.track_index == index), None) + if channel_info is None: return None @@ -112,46 +121,60 @@ class WavADMReader: nsmap = self.axml.getroot().nsmap - afext = self.axml.find(".//audioFormatExtended", namespaces=nsmap) + afext = self.axml.find(".//audioFormatExtended", + namespaces=nsmap) - trackformat_elem = afext.find("audioTrackFormat[@audioTrackFormatID='%s']" % channel_info.track_ref, - namespaces=nsmap) + trackformat_elem = afext.find( + "audioTrackFormat[@audioTrackFormatID='%s']" + % channel_info.track_ref, namespaces=nsmap) stream_id = trackformat_elem[0].text - channelformatref_elem = afext.find("audioStreamFormat[@audioStreamFormatID='%s']/audioChannelFormatIDRef" % stream_id, + channelformatref_elem = afext.find( + ("audioStreamFormat[@audioStreamFormatID='%s']" + "/audioChannelFormatIDRef") % stream_id, namespaces=nsmap) channelformat_id = channelformatref_elem.text - packformatref_elem = afext.find("audioStreamFormat[@audioStreamFormatID='%s']/audioPackFormatIDRef" % stream_id, + packformatref_elem = afext.find( + ("audioStreamFormat[@audioStreamFormatID='%s']" + "/audioPackFormatIDRef") % stream_id, namespaces=nsmap) packformat_id = packformatref_elem.text - channelformat_elem = afext.find("audioChannelFormat[@audioChannelFormatID='%s']" % channelformat_id, - namespaces=nsmap) - ret_dict['channel_format_name'] = channelformat_elem.get("audioChannelFormatName") + channelformat_elem = afext\ + .find("audioChannelFormat[@audioChannelFormatID='%s']" + % channelformat_id, + namespaces=nsmap) + ret_dict['channel_format_name'] = channelformat_elem.get( + "audioChannelFormatName") - packformat_elem = afext.find("audioPackFormat[@audioPackFormatID='%s']" % packformat_id, + packformat_elem = afext.find( + "audioPackFormat[@audioPackFormatID='%s']" % packformat_id, namespaces=nsmap) - ret_dict['pack_type'] = packformat_elem.get("typeDefinition") - ret_dict['pack_format_name'] = packformat_elem.get("audioPackFormatName") + ret_dict['pack_type'] = packformat_elem.get( + "typeDefinition") + ret_dict['pack_format_name'] = packformat_elem.get( + "audioPackFormatName") - object_elem = afext.find("audioObject[audioPackFormatIDRef = '%s']" % packformat_id, - namespaces=nsmap) + object_elem = afext.find("audioObject[audioPackFormatIDRef = '%s']" + % packformat_id, + namespaces=nsmap) ret_dict['audio_object_name'] = object_elem.get("audioObjectName") object_id = object_elem.get("audioObjectID") ret_dict['object_id'] = object_id - content_elem = afext.find("audioContent/[audioObjectIDRef = '%s']" % object_id, - namespaces=nsmap) + content_elem = afext.find("audioContent/[audioObjectIDRef = '%s']" + % object_id, + namespaces=nsmap) ret_dict['content_name'] = content_elem.get("audioContentName") ret_dict['content_id'] = content_elem.get("audioContentID") return ret_dict - def to_dict(self) -> dict: #FIXME should be "asdict" + def to_dict(self) -> dict: # FIXME should be "asdict" """ Get ADM metadata as a dictionary. """ @@ -161,5 +184,6 @@ class WavADMReader: rd.update(self.track_info(channel_uid_rec.track_index)) return rd - return dict(channel_entries=list(map(lambda z: make_entry(z), self.channel_uids)), - programme=self.programme()) \ No newline at end of file + return dict(channel_entries=list(map(lambda z: make_entry(z), + self.channel_uids)), + programme=self.programme()) diff --git a/wavinfo/wave_bext_reader.py b/wavinfo/wave_bext_reader.py index d0a1f3c..e65925f 100644 --- a/wavinfo/wave_bext_reader.py +++ b/wavinfo/wave_bext_reader.py @@ -3,71 +3,75 @@ import struct from typing import Optional + class WavBextReader: def __init__(self, bext_data, encoding): """ Read Broadcast-WAV extended metadata. :param bext_data: The bytes-like data. - :param encoding: The encoding to use when decoding the text fields of the - BEXT metadata scope. According to EBU Rec 3285 this shall be ASCII. + :param encoding: The encoding to use when decoding the text fields of + the BEXT metadata scope. According to EBU Rec 3285 this shall be + ASCII. """ - packstring = "<256s" + "32s" + "32s" + "10s" + "8s" + "QH" + "64s" + "hhhhh" + "180s" + packstring = "<256s" + "32s" + "32s" + "10s" + "8s" + "QH" + "64s" + \ + "hhhhh" + "180s" rest_starts = struct.calcsize(packstring) unpacked = struct.unpack(packstring, bext_data[:rest_starts]) - def sanitize_bytes(b : bytes) -> str: + def sanitize_bytes(b: bytes) -> str: # honestly can't remember why I'm stripping nulls this way - first_null = next((index for index, byte in enumerate(b) if byte == 0), None) + first_null = next((index for index, byte in enumerate(b) + if byte == 0), None) trimmed = b if first_null is None else b[:first_null] decoded = trimmed.decode(encoding) return decoded #: Description. A free-text field up to 256 characters long. - self.description : str = sanitize_bytes(unpacked[0]) - + self.description: str = sanitize_bytes(unpacked[0]) + #: Originator. Usually the name of the encoding application, sometimes #: an artist name. - self.originator : str = sanitize_bytes(unpacked[1]) - + self.originator: str = sanitize_bytes(unpacked[1]) + #: A unique identifier for the file, a serial number. - self.originator_ref : str = sanitize_bytes(unpacked[2]) - + self.originator_ref: str = sanitize_bytes(unpacked[2]) + #: Date of the recording, in the format YYYY-MM-DD. - self.originator_date : str = sanitize_bytes(unpacked[3]) - + self.originator_date: str = sanitize_bytes(unpacked[3]) + #: Time of the recording, in the format HH:MM:SS. - self.originator_time : str = sanitize_bytes(unpacked[4]) - - #: The sample offset of the start, usually relative - #: to midnight. - self.time_reference : int = unpacked[5] - + self.originator_time: str = sanitize_bytes(unpacked[4]) + + #: The sample offset of the start, usually relative + #: to midnight. + self.time_reference: int = unpacked[5] + #: A variable-length text field containing a list of processes and #: and conversions performed on the file. - self.coding_history : str = sanitize_bytes(bext_data[rest_starts:]) - - #: BEXT version. - self.version : int = unpacked[6] - - #: SMPTE 330M UMID of this audio file, 64 bytes are allocated though the UMID - #: may only be 32 bytes long. - self.umid : Optional[bytes] = None - + self.coding_history: str = sanitize_bytes(bext_data[rest_starts:]) + + #: BEXT version. + self.version: int = unpacked[6] + + #: SMPTE 330M UMID of this audio file, 64 bytes are allocated though + #: the UMID may only be 32 bytes long. + self.umid: Optional[bytes] = None + #: EBU R128 Integrated loudness, in LUFS. - self.loudness_value : Optional[float] = None - + self.loudness_value: Optional[float] = None + #: EBU R128 Loudness range, in LUFS. - self.loudness_range : Optional[float] = None - + self.loudness_range: Optional[float] = None + #: True peak level, in dBFS TP - self.max_true_peak : Optional[float] = None - + self.max_true_peak: Optional[float] = None + #: EBU R128 Maximum momentary loudness, in LUFS - self.max_momentary_loudness : Optional[float] = None - + self.max_momentary_loudness: Optional[float] = None + #: EBU R128 Maximum short-term loudness, in LUFS. - self.max_shortterm_loudness : Optional[float] = None + self.max_shortterm_loudness: Optional[float] = None if self.version > 0: self.umid = unpacked[7] @@ -84,7 +88,7 @@ class WavBextReader: # umid_parsed = UMIDParser(self.umid) # umid_str = umid_parsed.basic_umid_to_str() # else: - + umid_str = None return {'description': self.description, diff --git a/wavinfo/wave_cues_reader.py b/wavinfo/wave_cues_reader.py index 10c5a5a..9d6cfac 100644 --- a/wavinfo/wave_cues_reader.py +++ b/wavinfo/wave_cues_reader.py @@ -1,26 +1,25 @@ """ Cues metadata -For reference on implementation of cues and related metadata see: +For reference on implementation of cues and related metadata see: August 1991, "Multimedia Programming Interface and Data Specifications 1.0", IBM Corporation and Microsoft Corporation https://www.aelius.com/njh/wavemetatools/doc/riffmci.pdf """ from dataclasses import dataclass -import encodings from .riff_parser import ChunkDescriptor from struct import unpack, calcsize from typing import Optional, Tuple, NamedTuple, List, Dict, Any, Generator -#: Country Codes used in the RIFF standard to resolve locale. These codes +#: Country Codes used in the RIFF standard to resolve locale. These codes #: appear in CSET and LTXT metadata. CountryCodes = """000 None Indicated 001,USA 002,Canada 003,Latin America -030,Greece +030,Greece 031,Netherlands 032,Belgium 033,France @@ -44,50 +43,50 @@ CountryCodes = """000 None Indicated 090,Turkey 351,Portugal 352,Luxembourg -354,Iceland +354,Iceland 358,Finland""" -#: Language and Dialect codes used in the RIFF standard to resolve native +#: Language and Dialect codes used in the RIFF standard to resolve native #: language of text fields. These codes appear in CSET and LTXT metadata. LanguageDialectCodes = """0 0 None Indicated 1,1,Arabic 2,1,Bulgarian 3,1,Catalan -4,1,Traditional Chinese -4,2,Simplified Chinese +4,1,Traditional Chinese +4,2,Simplified Chinese 5,1,Czech 6,1,Danish 7,1,German -7,2,Swiss German +7,2,Swiss German 8,1,Greek 9,1,US English -9,2,UK English +9,2,UK English 10,1,Spanish -10,2,Spanish Mexican +10,2,Spanish Mexican 11,1,Finnish 12,1,French -12,2,Belgian French -12,3,Canadian French -12,4,Swiss French -13,1,Hebrew +12,2,Belgian French +12,3,Canadian French +12,4,Swiss French +13,1,Hebrew 14,1,Hungarian 15,1,Icelandic 16,1,Italian -16,2,Swiss Italian +16,2,Swiss Italian 17,1,Japanese 18,1,Korean 19,1,Dutch -19,2,Belgian Dutch -20,1,Norwegian - Bokmal -20,2,Norwegian - Nynorsk +19,2,Belgian Dutch +20,1,Norwegian - Bokmal +20,2,Norwegian - Nynorsk 21,1,Polish -22,1,Brazilian Portuguese -22,2,Portuguese -23,1,Rhaeto-Romanic +22,1,Brazilian Portuguese +22,2,Portuguese +23,1,Rhaeto-Romanic 24,1,Romanian 25,1,Russian -26,1,Serbo-Croatian (Latin) -26,2,Serbo-Croatian (Cyrillic) +26,1,Serbo-Croatian (Latin) +26,2,Serbo-Croatian (Cyrillic) 27,1,Slovak 28,1,Albanian 29,1,Swedish @@ -101,10 +100,10 @@ class CueEntry(NamedTuple): """ A ``cue`` element structure. """ - #: Cue "name" or id number + #: Cue "name" or id number name: int - #: Cue position, as a frame count in the play order of the WAVE file. In - #: principle this can be affected by playlists and ``wavl`` chunk + #: Cue position, as a frame count in the play order of the WAVE file. In + #: principle this can be affected by playlists and ``wavl`` chunk #: placement. position: int chunk_id: bytes @@ -113,7 +112,7 @@ class CueEntry(NamedTuple): sample_offset: int Format = " int: return calcsize(cls.Format) @@ -121,13 +120,13 @@ class CueEntry(NamedTuple): @classmethod def read(cls, data: bytes) -> 'CueEntry': assert len(data) == cls.format_size(), \ - (f"cue data size incorrect, expected {calcsize(cls.Format)}" + (f"cue data size incorrect, expected {calcsize(cls.Format)} " "found {len(data)}") parsed = unpack(cls.Format, data) return cls(name=parsed[0], position=parsed[1], chunk_id=parsed[2], - chunk_start=parsed[3], block_start=parsed[4], + chunk_start=parsed[3], block_start=parsed[4], sample_offset=parsed[5]) @@ -170,8 +169,8 @@ class RangeLabel(NamedTuple): fallback_encoding = f"cp{data[6]}" return cls(name=parsed[0], length=parsed[1], purpose=parsed[2], - country=parsed[3], language=parsed[4], - dialect=parsed[5], codepage=parsed[6], + country=parsed[3], language=parsed[4], + dialect=parsed[5], codepage=parsed[6], text=text_data.decode(fallback_encoding)) @@ -192,19 +191,19 @@ class WavCuesReader: @classmethod def read_all(cls, f, - cues: Optional[ChunkDescriptor], - labls: List[ChunkDescriptor], - ltxts: List[ChunkDescriptor], - notes: List[ChunkDescriptor], - fallback_encoding: str) -> 'WavCuesReader': - + cues: Optional[ChunkDescriptor], + labls: List[ChunkDescriptor], + ltxts: List[ChunkDescriptor], + notes: List[ChunkDescriptor], + fallback_encoding: str) -> 'WavCuesReader': + cue_list = [] if cues is not None: cues_data = cues.read_data(f) assert len(cues_data) >= 4, "cue metadata too short" offset = calcsize(" Generator[Tuple[int, int], None, None]: """ - Iterate through each cue. + Iterate through each cue. :yields: the cue's ``name`` and ``sample_offset`` """ @@ -244,17 +243,17 @@ class WavCuesReader: yield (cue.name, cue.sample_offset) def label_and_note(self, cue_ident: int) -> Tuple[Optional[str], - Optional[str]]: + Optional[str]]: """ Get the label and note (extended comment) for a cue. :param cue_ident: the cue's name, its unique identifying number :returns: a tuple of the the cue's label (if present) and note (if - present) + present) """ - label = next((l.text for l in self.labels - if l.name == cue_ident), None) - note = next((n.text for n in self.notes + label = next((label.text for label in self.labels + if label.name == cue_ident), None) + note = next((n.text for n in self.notes if n.name == cue_ident), None) return (label, note) @@ -265,7 +264,7 @@ class WavCuesReader: :param cue_ident: the cue's name, its unique identifying number :returns: the length of the marker's range, or `None` """ - return next((r.length for r in self.ranges + return next((r.length for r in self.ranges if r.name == cue_ident), None) def to_dict(self) -> Dict[str, Any]: @@ -280,15 +279,8 @@ class WavCuesReader: if label is not None: retval[n]['label'] = label if note is not None: - retval[n]['note'] = note + retval[n]['note'] = note if r is not None: - retval[n]['length'] = r - - return retval - # return dict(cues=[c._asdict() for c in self.cues], - # labels=[l._asdict() for l in self.labels], - # ranges=[r._asdict() for r in self.ranges], - # notes=[n._asdict() for n in self.notes]) - - + retval[n]['length'] = r + return retval diff --git a/wavinfo/wave_dbmd_reader.py b/wavinfo/wave_dbmd_reader.py index cfd19ca..e255420 100644 --- a/wavinfo/wave_dbmd_reader.py +++ b/wavinfo/wave_dbmd_reader.py @@ -1,7 +1,7 @@ """ Reading Dolby Bitstream Metadata -Unless otherwise stated, all § references here are to +Unless otherwise stated, all § references here are to `EBU Tech 3285 Supplement 6`_. .. _EBU Tech 3285 Supplement 6: https://tech.ebu.ch/docs/tech/tech3285s6.pdf @@ -10,10 +10,11 @@ Unless otherwise stated, all § references here are to from enum import IntEnum, Enum from struct import unpack from dataclasses import dataclass, asdict -from typing import List, Optional, Tuple, Any, Union +from typing import List, Tuple, Any, Union from io import BytesIO + class SegmentType(IntEnum): """ Metadata segment type. @@ -31,7 +32,7 @@ class SegmentType(IntEnum): DolbyAtmosSupplemental = 0xa @classmethod - def _missing_(cls,val): + def _missing_(cls, val): return val @@ -39,11 +40,11 @@ class SegmentType(IntEnum): class DolbyDigitalPlusMetadata: """ *Dolby Digital Plus* is Dolby's brand for multichannel surround - on discrete formats that aren't AC-3 (Dolby Digital) or Dolby E. This - metadata segment is present in ADM wave files created with a Dolby Atmos + on discrete formats that aren't AC-3 (Dolby Digital) or Dolby E. This + metadata segment is present in ADM wave files created with a Dolby Atmos Production Suite. - Where an AC-3 bitstream can contain multiple programs, a Dolby Digital + Where an AC-3 bitstream can contain multiple programs, a Dolby Digital Plus bitstream will only contain one program. """ @@ -77,7 +78,6 @@ class DolbyDigitalPlusMetadata: MUTE = 0b111 "-∞ dB" - class DolbySurroundEncodingMode(Enum): """ Dolby surround endcoding mode. @@ -87,7 +87,6 @@ class DolbyDigitalPlusMetadata: NOT_IN_USE = 0b01 NOT_INDICATED = 0b00 - class BitStreamMode(Enum): """ Dolby Digital Plus `bsmod` field @@ -122,7 +121,6 @@ class DolbyDigitalPlusMetadata: should be interpreted as karaoke. """ - class AudioCodingMode(Enum): """ Dolby Digital Plus `acmod` field @@ -144,7 +142,6 @@ class DolbyDigitalPlusMetadata: CH_ORD_3_2 = 0b111 "LCR + LR surround" - class CenterDownMixLevel(Enum): """ § 4.3.3.1 @@ -152,16 +149,15 @@ class DolbyDigitalPlusMetadata: DOWN_3DB = 0b00 "Attenuate 3 dB" - + DOWN_45DB = 0b01 "Attenuate 4.5 dB" - + DOWN_6DB = 0b10 "Attenuate 6 dB" RESERVED = 0b11 - class SurroundDownMixLevel(Enum): """ Dolby Digital Plus `surmixlev` field @@ -172,7 +168,6 @@ class DolbyDigitalPlusMetadata: MUTE = 0b10 RESERVED = 0b11 - class LanguageCode(int): """ § 4.3.4.1 @@ -181,21 +176,18 @@ class DolbyDigitalPlusMetadata: """ pass - class MixLevel(int): """ § 4.3.6.2 """ pass - class DialnormLevel(int): """ § 4.3.4.4 """ pass - class RoomType(Enum): """ `roomtyp` 4.3.6.3 @@ -205,11 +197,10 @@ class DolbyDigitalPlusMetadata: SMALL_ROOM_FLAT_CURVE = 0b10 RESERVED = 0b11 - class PreferredDownMixMode(Enum): """ - Indicates the creating engineer's preference of what the receiver should - downmix. + Indicates the creating engineer's preference of what the receiver + should downmix. § 4.3.8.1 """ NOT_INDICATED = 0b00 @@ -217,7 +208,6 @@ class DolbyDigitalPlusMetadata: STEREO = 0b10 PRO_LOGIC_2 = 0b11 - class SurroundEXMode(IntEnum): """ Dolby Surround-EX mode. @@ -228,7 +218,6 @@ class DolbyDigitalPlusMetadata: SEX = 0b10 PRO_LOGIC_2 = 0b11 - class HeadphoneMode(IntEnum): """ `dheadphonmod` § 4.3.9.2 @@ -238,12 +227,10 @@ class DolbyDigitalPlusMetadata: DOLBY_HEADPHONE = 0b10 RESERVED = 0b11 - class ADConverterType(Enum): STANDARD = 0 HDCD = 1 - class StreamDependency(Enum): """ Encodes `ddplus_info1.stream_type` field § 4.3.12.1 @@ -254,7 +241,6 @@ class DolbyDigitalPlusMetadata: INDEPENDENT_FROM_DOLBY_DIGITAL = 2 RESERVED = 3 - class RFCompressionProfile(Enum): """ `compr1` RF compression profile @@ -267,7 +253,7 @@ class DolbyDigitalPlusMetadata: MUSIC_LIGHT = 4 SPEECH = 5 - #: Program ID number, this identifies the program in a multi-program + #: Program ID number, this identifies the program in a multi-program #: element. § 4.3.1 program_id: int @@ -317,13 +303,13 @@ class DolbyDigitalPlusMetadata: #: LoRo preferred center downmix level loro_center_downmix_level: DownMixLevelToken - + #: LoRo preferred surround downmix level loro_surround_downmix_level: DownMixLevelToken #: Preferred downmix mode downmix_mode: PreferredDownMixMode - + #: LtRt preferred center downmix level ltrt_center_downmix_level: DownMixLevelToken @@ -332,20 +318,20 @@ class DolbyDigitalPlusMetadata: #: Surround-EX mode surround_ex_mode: SurroundEXMode - + #: Dolby Headphone mode dolby_headphone_encoded: HeadphoneMode ad_converter_type: ADConverterType compression_profile: RFCompressionProfile dynamic_range: RFCompressionProfile - + #: Indicates if this stream can be decoded independently or not stream_dependency: StreamDependency #: Data rate of this bitstream in kilobits per second datarate_kbps: int - + @staticmethod def load(buffer: bytes): assert len(buffer) == 96, "Dolby Digital Plus segment incorrect size, " @@ -363,12 +349,14 @@ class DolbyDigitalPlusMetadata: pass def surround_config(b): - return DolbyDigitalPlusMetadata.CenterDownMixLevel(b & 0x30 >> 4), \ - DolbyDigitalPlusMetadata.SurroundDownMixLevel(b & 0xc >> 2), \ + return ( + DolbyDigitalPlusMetadata.CenterDownMixLevel(b & 0x30 >> 4), + DolbyDigitalPlusMetadata.SurroundDownMixLevel(b & 0xc >> 2), DolbyDigitalPlusMetadata.DolbySurroundEncodingMode(b & 0x3) + ) def dialnorm_info(b): - return (b & 0x80) > 0 , b & 0x40 > 0, b & 0x20 > 0, \ + return (b & 0x80) > 0, b & 0x40 > 0, b & 0x20 > 0, \ DolbyDigitalPlusMetadata.DialnormLevel(b & 0x1f) def langcod(b) -> int: @@ -379,22 +367,23 @@ class DolbyDigitalPlusMetadata: DolbyDigitalPlusMetadata.MixLevel(b & 0x7c >> 2), \ DolbyDigitalPlusMetadata.RoomType(b & 0x3) - # loro_center_downmix_level, loro_surround_downmix_level + # loro_center_downmix_level, loro_surround_downmix_level def ext_bsi1_word1(b): return DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x38 >> 3), \ DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x7) # downmix_mode, ltrt_center_downmix_level, ltrt_surround_downmix_level def ext_bsi1_word2(b): - return DolbyDigitalPlusMetadata.PreferredDownMixMode(b & 0xC0 >> 6), \ + return DolbyDigitalPlusMetadata\ + .PreferredDownMixMode(b & 0xC0 >> 6), \ DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x38 >> 3), \ DolbyDigitalPlusMetadata.DownMixLevelToken(b & 0x7) - #surround_ex_mode, dolby_headphone_encoded, ad_converter_type + # surround_ex_mode, dolby_headphone_encoded, ad_converter_type def ext_bsi2_word1(b): return DolbyDigitalPlusMetadata.SurroundEXMode(b & 0x60 >> 5), \ DolbyDigitalPlusMetadata.HeadphoneMode(b & 0x18 >> 3), \ - DolbyDigitalPlusMetadata.ADConverterType( b & 0x4 >> 2) + DolbyDigitalPlusMetadata.ADConverterType(b & 0x4 >> 2) def ddplus_reserved2(_): pass @@ -403,7 +392,7 @@ class DolbyDigitalPlusMetadata: return DolbyDigitalPlusMetadata.RFCompressionProfile(b) def dynrng1(b): - DolbyDigitalPlusMetadata.RFCompressionProfile(b) + DolbyDigitalPlusMetadata.RFCompressionProfile(b) def ddplus_reserved3(_): pass @@ -423,14 +412,19 @@ class DolbyDigitalPlusMetadata: pid = program_id(buffer[0]) lfe_on, bitstream_mode, audio_coding_mode = program_info(buffer[1]) ddplus_reserved1(buffer[2:2]) - center_downmix_level, surround_downmix_level, dolby_surround_encoded = surround_config(buffer[4]) - langcode_present, copyright_bitstream, original_bitstream, dialnorm = dialnorm_info(buffer[5]) + center_downmix_level, surround_downmix_level, \ + dolby_surround_encoded = surround_config(buffer[4]) + langcode_present, copyright_bitstream, original_bitstream, \ + dialnorm = dialnorm_info(buffer[5]) langcode = langcod(buffer[6]) prod_info_exists, mixlevel, roomtype = audio_prod_info(buffer[7]) - loro_center_downmix_level, loro_surround_downmix_level = ext_bsi1_word1(buffer[8]) - downmix_mode, ltrt_center_downmix_level, ltrt_surround_downmix_level = ext_bsi1_word2(buffer[9]) - surround_ex_mode, dolby_headphone_encoded, ad_converter_type = ext_bsi2_word1(buffer[10]) + loro_center_downmix_level, \ + loro_surround_downmix_level = ext_bsi1_word1(buffer[8]) + downmix_mode, ltrt_center_downmix_level, \ + ltrt_surround_downmix_level = ext_bsi1_word2(buffer[9]) + surround_ex_mode, dolby_headphone_encoded, \ + ad_converter_type = ext_bsi2_word1(buffer[10]) ddplus_reserved2(buffer[11:14]) compression = compr1(buffer[14]) @@ -441,33 +435,33 @@ class DolbyDigitalPlusMetadata: data_rate = datarate(buffer[25:27]) reserved(buffer[27:69]) - return DolbyDigitalPlusMetadata(program_id=pid, - lfe_on=lfe_on, - bitstream_mode=bitstream_mode, - audio_coding_mode=audio_coding_mode, - center_downmix_level=center_downmix_level, - surround_downmix_level=surround_downmix_level, - dolby_surround_encoded=dolby_surround_encoded, - langcode_present=langcode_present, - copyright_bitstream=copyright_bitstream, - original_bitstream=original_bitstream, - dialnorm=dialnorm, - langcode=langcode, - prod_info_exists=prod_info_exists, - mixlevel=mixlevel, - roomtype=roomtype, - loro_center_downmix_level=loro_center_downmix_level, - loro_surround_downmix_level=loro_surround_downmix_level, - downmix_mode=downmix_mode, - ltrt_center_downmix_level=ltrt_center_downmix_level, - ltrt_surround_downmix_level=ltrt_surround_downmix_level, - surround_ex_mode=surround_ex_mode, - dolby_headphone_encoded=dolby_headphone_encoded, - ad_converter_type=ad_converter_type, - compression_profile=compression, - dynamic_range=dynamic_range, - stream_dependency=stream_info, - datarate_kbps=data_rate) + return DolbyDigitalPlusMetadata( + program_id=pid, lfe_on=lfe_on, + bitstream_mode=bitstream_mode, + audio_coding_mode=audio_coding_mode, + center_downmix_level=center_downmix_level, + surround_downmix_level=surround_downmix_level, + dolby_surround_encoded=dolby_surround_encoded, + langcode_present=langcode_present, + copyright_bitstream=copyright_bitstream, + original_bitstream=original_bitstream, + dialnorm=dialnorm, + langcode=langcode, + prod_info_exists=prod_info_exists, + mixlevel=mixlevel, + roomtype=roomtype, + loro_center_downmix_level=loro_center_downmix_level, + loro_surround_downmix_level=loro_surround_downmix_level, + downmix_mode=downmix_mode, + ltrt_center_downmix_level=ltrt_center_downmix_level, + ltrt_surround_downmix_level=ltrt_surround_downmix_level, + surround_ex_mode=surround_ex_mode, + dolby_headphone_encoded=dolby_headphone_encoded, + ad_converter_type=ad_converter_type, + compression_profile=compression, + dynamic_range=dynamic_range, + stream_dependency=stream_info, + datarate_kbps=data_rate) @dataclass @@ -486,7 +480,7 @@ class DolbyAtmosMetadata: NOT_INDICATED = 0x04 tool_name: str - tool_version: Tuple[int,int,int] + tool_version: Tuple[int, int, int] warp_mode: WarpMode SEGMENT_LENGTH = 248 @@ -494,8 +488,10 @@ class DolbyAtmosMetadata: @classmethod def load(cls, data: bytes): - assert len(data) == cls.SEGMENT_LENGTH, "DolbyAtmosMetadata segment "\ - "is incorrect length, expected %i actual was %i" % (cls.SEGMENT_LENGTH, len(data)) + + assert len(data) == cls.SEGMENT_LENGTH + # (f"DolbyAtmosMetadata segment is incorrect length, " + # f"expected {cls.SEGMENT_LENGTH} actual was {len(data)}") h = BytesIO(data) @@ -512,17 +508,20 @@ class DolbyAtmosMetadata: a_val = unpack("B", h.read(1))[0] warp_mode = a_val & 0x7 - return DolbyAtmosMetadata(tool_name=toolname, - tool_version=(major, minor, fix), warp_mode=DolbyAtmosMetadata.WarpMode(warp_mode)) + return DolbyAtmosMetadata(tool_name=toolname, + tool_version=(major, minor, fix), + warp_mode=DolbyAtmosMetadata + .WarpMode(warp_mode)) + - @dataclass class DolbyAtmosSupplementalMetadata: """ Dolby Atmos supplemental metadata segment. - https://github.com/DolbyLaboratories/dbmd-atmos-parser/blob/master/dbmd_atmos_parse/src/dbmd_atmos_parse.c - """ + https://github.com/DolbyLaboratories/dbmd-atmos-parser/blob/ + master/dbmd_atmos_parse/src/dbmd_atmos_parse.c + """ class BinauralRenderMode(Enum): BYPASS = 0x00 @@ -531,12 +530,10 @@ class DolbyAtmosSupplementalMetadata: MID = 0x03 NOT_INDICATED = 0x04 - object_count: int render_modes: List['DolbyAtmosSupplementalMetadata.BinauralRenderMode'] trim_modes: List[int] - MAGIC = 0xf8726fbd TRIM_CONFIG_COUNT = 9 @@ -552,15 +549,15 @@ class DolbyAtmosSupplementalMetadata: object_count = unpack(" List[DolbyDigitalPlusMetadata]: """ Every valid Dolby Digital Plus metadata segment in the file. """ - return [x[2] for x in self.segment_list \ - if x[0] == SegmentType.DolbyDigitalPlus and x[1]] + return [x[2] for x in self.segment_list + if x[0] == SegmentType.DolbyDigitalPlus and x[1]] def dolby_atmos(self) -> List[DolbyAtmosMetadata]: """ Every valid Dolby Atmos metadata segment in the file. """ - return [x[2] for x in self.segment_list \ - if x[0] == SegmentType.DolbyAtmos and x[1]] + return [x[2] for x in self.segment_list + if x[0] == SegmentType.DolbyAtmos and x[1]] def dolby_atmos_supplemental(self) -> List[DolbyAtmosSupplementalMetadata]: """ Every valid Dolby Atmos Supplemental metadata segment in the file. """ - return [x[2] for x in self.segment_list \ - if x[0] == SegmentType.DolbyAtmosSupplemental and x[1]] + return [x[2] for x in self.segment_list + if x[0] == SegmentType.DolbyAtmosSupplemental and x[1]] def to_dict(self) -> dict: ddp = map(lambda x: asdict(x), self.dolby_digital_plus()) atmos = map(lambda x: asdict(x), self.dolby_atmos()) - #atmos_sup = map(lambda x: asdict(x), self.dolby_atmos_supplemental()) - + # atmos_sup = map(lambda x: asdict(x), self.dolby_atmos_supplemental()) + return dict(dolby_digital_plus=list(ddp), - dolby_atmos=list(atmos)) + dolby_atmos=list(atmos)) diff --git a/wavinfo/wave_info_reader.py b/wavinfo/wave_info_reader.py index dd4de12..34b9b96 100644 --- a/wavinfo/wave_info_reader.py +++ b/wavinfo/wave_info_reader.py @@ -2,6 +2,7 @@ from .riff_parser import parse_chunk, ListChunkDescriptor from typing import Optional + class WavInfoChunkReader: def __init__(self, f, encoding): @@ -9,47 +10,52 @@ class WavInfoChunkReader: f.seek(0) parsed_chunks = parse_chunk(f) + assert type(parsed_chunks) is ListChunkDescriptor - list_chunks = [chunk for chunk in parsed_chunks.children if type(chunk) is ListChunkDescriptor] + list_chunks = [chunk for chunk in parsed_chunks.children + if type(chunk) is ListChunkDescriptor] - self.info_chunk = next((chunk for chunk in list_chunks if chunk.signature == b'INFO'), None) + self.info_chunk = next((chunk for chunk in list_chunks + if chunk.signature == b'INFO'), None) #: 'ICOP' Copyright - self.copyright : Optional[str] = self._get_field(f, b'ICOP') + self.copyright: Optional[str] = self._get_field(f, b'ICOP') #: 'IPRD' Product - self.product : Optional[str]= self._get_field(f, b'IPRD') - self.album : Optional[str] = self.product + self.product: Optional[str] = self._get_field(f, b'IPRD') + self.album: Optional[str] = self.product #: 'IGNR' Genre - self.genre : Optional[str] = self._get_field(f, b'IGNR') + self.genre: Optional[str] = self._get_field(f, b'IGNR') #: 'ISBJ' Subject - self.subject : Optional[str] = self._get_field(f, b'ISBJ') + self.subject: Optional[str] = self._get_field(f, b'ISBJ') #: 'IART' Artist, composer, author - self.artist : Optional[str] = self._get_field(f, b'IART') + self.artist: Optional[str] = self._get_field(f, b'IART') #: 'ICMT' Comment - self.comment : Optional[str] = self._get_field(f, b'ICMT') + self.comment: Optional[str] = self._get_field(f, b'ICMT') #: 'ISFT' Software, encoding application - self.software : Optional[str] = self._get_field(f, b'ISFT') + self.software: Optional[str] = self._get_field(f, b'ISFT') #: 'ICRD' Created date - self.created_date : Optional[str] = self._get_field(f, b'ICRD') + self.created_date: Optional[str] = self._get_field(f, b'ICRD') #: 'IENG' Engineer - self.engineer : Optional[str] = self._get_field(f, b'IENG') + self.engineer: Optional[str] = self._get_field(f, b'IENG') #: 'ITCH' Technician - self.technician : Optional[str] = self._get_field(f, b'ITCH') + self.technician: Optional[str] = self._get_field(f, b'ITCH') #: 'IKEY' Keywords, keyword list - self.keywords : Optional[str] = self._get_field(f, b'IKEY') + self.keywords: Optional[str] = self._get_field(f, b'IKEY') #: 'INAM' Name, title - self.title : Optional[str] = self._get_field(f, b'INAM') + self.title: Optional[str] = self._get_field(f, b'INAM') #: 'ISRC' Source - self.source : Optional[str] = self._get_field(f, b'ISRC') + self.source: Optional[str] = self._get_field(f, b'ISRC') #: 'TAPE' Tape - self.tape : Optional[str] = self._get_field(f, b'TAPE') + self.tape: Optional[str] = self._get_field(f, b'TAPE') #: 'IARL' Archival Location - self.archival_location : Optional[str] = self._get_field(f, b'IARL') + self.archival_location: Optional[str] = self._get_field(f, b'IARL') #: 'ICSM' Commissioned - self.commissioned : Optional[str] = self._get_field(f, b'ICMS') + self.commissioned: Optional[str] = self._get_field(f, b'ICMS') def _get_field(self, f, field_ident) -> Optional[str]: - search = next(((chunk.start, chunk.length) for chunk in self.info_chunk.children if chunk.ident == field_ident), + search = next(((chunk.start, chunk.length) + for chunk in self.info_chunk.children + if chunk.ident == field_ident), None) if search is not None: @@ -59,7 +65,7 @@ class WavInfoChunkReader: else: return None - def to_dict(self) -> dict: #FIXME should be asdict + def to_dict(self) -> dict: # FIXME should be asdict """ A dictionary with all of the key/values read from the INFO scope. """ diff --git a/wavinfo/wave_ixml_reader.py b/wavinfo/wave_ixml_reader.py index 76ac4fc..5f3b4df 100644 --- a/wavinfo/wave_ixml_reader.py +++ b/wavinfo/wave_ixml_reader.py @@ -1,10 +1,16 @@ from lxml import etree as ET import io -from collections import namedtuple +# from collections import namedtuple from typing import Optional from enum import IntEnum +from typing import NamedTuple -IXMLTrack = namedtuple('IXMLTrack', ['channel_index', 'interleave_index', 'name', 'function']) + +class IXMLTrack(NamedTuple): + channel_index: int + interleave_index: int + name: str + function: str class SteinbergMetadata: @@ -29,7 +35,7 @@ class SteinbergMetadata: CINE_71 = 27 SDDS_70 = 24 SDDS_71 = 26 - MUSIC_60 = 21 #?? + MUSIC_60 = 21 # ?? MUSIC_61 = 23 ATMOS_712 = 33 ATMOS_504 = 35 @@ -72,7 +78,8 @@ class SteinbergMetadata: """ `AudioSpeakerArrangement` property """ - val = self.parsed.find("./ATTR_LIST/ATTR[NAME = 'AudioSpeakerArrangement']/VALUE") + val = self.parsed.find( + "./ATTR_LIST/ATTR[NAME = 'AudioSpeakerArrangement']/VALUE") if val is not None: return type(self).AudioSpeakerArrangement(int(val.text)) @@ -81,7 +88,8 @@ class SteinbergMetadata: """ AudioSampleFormatSize """ - val = self.parsed.find("./ATTR_LIST/ATTR[NAME = 'AudioSampleFormatSize']/VALUE") + val = self.parsed.find( + "./ATTR_LIST/ATTR[NAME = 'AudioSampleFormatSize']/VALUE") if val is not None: return int(val.text) @@ -90,7 +98,8 @@ class SteinbergMetadata: """ MediaCompany """ - val = self.parsed.find("./ATTR_LIST/ATTR[NAME = 'MediaCompany']/VALUE") + val = self.parsed.find( + "./ATTR_LIST/ATTR[NAME = 'MediaCompany']/VALUE") if val is not None: return val.text @@ -99,7 +108,8 @@ class SteinbergMetadata: """ MediaDropFrames """ - val = self.parsed.find("./ATTR_LIST/ATTR[NAME = 'MediaDropFrames']/VALUE") + val = self.parsed.find( + "./ATTR_LIST/ATTR[NAME = 'MediaDropFrames']/VALUE") if val is not None: return val.text == "1" @@ -108,7 +118,8 @@ class SteinbergMetadata: """ MediaDuration """ - val = self.parsed.find("./ATTR_LIST/ATTR[NAME = 'MediaDuration']/VALUE") + val = self.parsed.find( + "./ATTR_LIST/ATTR[NAME = 'MediaDuration']/VALUE") if val is not None: return float(val.text) @@ -145,6 +156,7 @@ class WavIXMLFormat: """ iXML recorder metadata. """ + def __init__(self, xml): """ Parse iXML. @@ -153,13 +165,13 @@ class WavIXMLFormat: self.source = xml xml_bytes = io.BytesIO(xml) parser = ET.XMLParser(recover=True) - self.parsed : ET.ElementTree = ET.parse(xml_bytes, parser=parser) + self.parsed: ET.ElementTree = ET.parse(xml_bytes, parser=parser) def _get_text_value(self, xpath) -> Optional[str]: e = self.parsed.find("./" + xpath) if e is not None: return e.text - else: + else: return None def xml_str(self) -> str: @@ -181,10 +193,13 @@ class WavIXMLFormat: """ for track in self.parsed.find("./TRACK_LIST").iter(): if track.tag == 'TRACK': - yield IXMLTrack(channel_index=track.xpath('string(CHANNEL_INDEX/text())'), - interleave_index=track.xpath('string(INTERLEAVE_INDEX/text())'), - name=track.xpath('string(NAME/text())'), - function=track.xpath('string(FUNCTION/text())')) + yield IXMLTrack( + channel_index=track.xpath('string(CHANNEL_INDEX/text())'), + interleave_index=track.xpath( + 'string(INTERLEAVE_INDEX/text())'), + name=track.xpath('string(NAME/text())'), + function=track.xpath('string(FUNCTION/text())') + ) @property def project(self) -> Optional[str]: @@ -201,7 +216,7 @@ class WavIXMLFormat: return self._get_text_value("SCENE") @property - def take(self) -> Optional[str]: + def take(self) -> Optional[str]: """ Take number. """ @@ -218,7 +233,8 @@ class WavIXMLFormat: def family_uid(self) -> Optional[str]: """ The globally-unique ID for this file family. This may be in the format - of a GUID, or an EBU Rec 9 source identifier, or some other dumb number. + of a GUID, or an EBU Rec 9 source identifier, or some other dumb + number. """ return self._get_text_value("FILE_SET/FAMILY_UID") @@ -240,11 +256,8 @@ class WavIXMLFormat: return None def to_dict(self): - return dict(track_list=list(map(lambda x: x._asdict(), self.track_list)), - project=self.project, - scene=self.scene, - take=self.take, - tape=self.tape, - family_uid=self.family_uid, - family_name=self.family_name - ) + return dict( + track_list=list(map(lambda x: x._asdict(), self.track_list)), + project=self.project, scene=self.scene, take=self.take, + tape=self.tape, family_uid=self.family_uid, + family_name=self.family_name) diff --git a/wavinfo/wave_reader.py b/wavinfo/wave_reader.py index 56a2f77..6aeb5d8 100644 --- a/wavinfo/wave_reader.py +++ b/wavinfo/wave_reader.py @@ -1,9 +1,7 @@ -#-*- coding: utf-8 -*- +# -*- coding: utf-8 -*- import struct import os -from collections import namedtuple - -from typing import Optional, Generator, Any +from typing import Optional, Generator, Any, NamedTuple import pathlib @@ -16,12 +14,21 @@ from .wave_dbmd_reader import WavDolbyMetadataReader from .wave_cues_reader import WavCuesReader #: Calculated statistics about the audio data. -WavDataDescriptor = namedtuple('WavDataDescriptor', 'byte_count frame_count') + + +class WavDataDescriptor(NamedTuple): + byte_count: int + frame_count: int + #: The format of the audio samples. -WavAudioFormat = namedtuple('WavAudioFormat', - ['audio_format', 'channel_count', 'sample_rate', - 'byte_rate', 'block_align', 'bits_per_sample']) +class WavAudioFormat(NamedTuple): + audio_format: int + channel_count: int + sample_rate: int + byte_rate: int + block_align: int + bits_per_sample: int class WavInfoReader: @@ -29,70 +36,70 @@ class WavInfoReader: Parse a WAV audio file for metadata. """ - def __init__(self, path, info_encoding='latin_1', bext_encoding='ascii'): + def __init__(self, f, info_encoding='latin_1', bext_encoding='ascii'): """ Create a new reader object. - :param path: - A pathlike object or IO to the wav file you wish to probe or a + :param path: + A pathlike object or IO to the wav file you wish to probe or a file handle to an open file. - :param info_encoding: + :param info_encoding: The text encoding of the ``INFO``, ``LABL`` and other RIFF-defined - metadata fields. + metadata fields. - :param bext_encoding: + :param bext_encoding: The text encoding to use when decoding the string fields of the Broadcast-WAV extension. Per EBU 3285 this is ASCII but this parameter is available to you if you encounter a weirdo. """ - + self.info_encoding = info_encoding self.bext_encoding = bext_encoding - + #: Wave audio data format. - self.fmt :Optional[WavAudioFormat] = None + self.fmt: Optional[WavAudioFormat] = None #: Statistics of the `data` section. - self.data :Optional[WavDataDescriptor] = None + self.data: Optional[WavDataDescriptor] = None #: Broadcast-Wave metadata. - self.bext :Optional[WavBextReader] = None + self.bext: Optional[WavBextReader] = None #: iXML metadata. - self.ixml :Optional[WavIXMLFormat] = None + self.ixml: Optional[WavIXMLFormat] = None #: ADM Audio Definiton Model metadata. - self.adm :Optional[WavADMReader]= None + self.adm: Optional[WavADMReader] = None #: Dolby bitstream metadata. - self.dolby :Optional[WavDolbyMetadataReader] = None + self.dolby: Optional[WavDolbyMetadataReader] = None #: RIFF INFO metadata. - self.info :Optional[WavInfoChunkReader]= None + self.info: Optional[WavInfoChunkReader] = None #: RIFF cues markers, labels, and notes. - self.cues :Optional[WavCuesReader] = None + self.cues: Optional[WavCuesReader] = None - if hasattr(path, 'read'): - self.get_wav_info(path) + if hasattr(f, 'read'): + self.get_wav_info(f) self.url = 'about:blank' - self.path = repr(path) - + self.path = repr(f) + else: - absolute_path = os.path.abspath(path) + absolute_path = os.path.abspath(f) #: `file://` url for the file. self.url: str = pathlib.Path(absolute_path).as_uri() self.path = absolute_path - - with open(path, 'rb') as f: + + with open(f, 'rb') as f: self.get_wav_info(f) - + def get_wav_info(self, wavfile): chunks = parse_chunk(wavfile) - assert type(chunks) is ListChunkDescriptor + assert type(chunks) is ListChunkDescriptor self.main_list = chunks.children wavfile.seek(0) @@ -100,15 +107,17 @@ class WavInfoReader: self.fmt = self._get_format(wavfile) self.bext = self._get_bext(wavfile, encoding=self.bext_encoding) self.ixml = self._get_ixml(wavfile) - self.adm = self._get_adm(wavfile) + self.adm = self._get_adm(wavfile) self.info = self._get_info(wavfile, encoding=self.info_encoding) self.dolby = self._get_dbmd(wavfile) self.cues = self._get_cue(wavfile) self.data = self._describe_data() - def _find_chunk_data(self, ident, from_stream, default_none=False) -> Optional[bytes]: - top_chunks = (chunk for chunk in self.main_list \ - if type(chunk) is ChunkDescriptor and chunk.ident == ident) + def _find_chunk_data(self, ident, from_stream, + default_none=False) -> Optional[bytes]: + top_chunks = (chunk for chunk in self.main_list + if type(chunk) is ChunkDescriptor and + chunk.ident == ident) chunk_descriptor = next(top_chunks, None) \ if default_none else next(top_chunks) @@ -117,19 +126,19 @@ class WavInfoReader: if chunk_descriptor else None def _find_list_chunk(self, signature) -> Optional[ListChunkDescriptor]: - top_chunks = (chunk for chunk in self.main_list \ - if type(chunk) is ListChunkDescriptor and \ - chunk.signature == signature) + top_chunks = (chunk for chunk in self.main_list + if type(chunk) is ListChunkDescriptor and + chunk.signature == signature) return next(top_chunks, None) def _describe_data(self): - data_chunk = next(c for c in self.main_list \ - if type(c) is ChunkDescriptor and c.ident == b'data') + data_chunk = next(c for c in self.main_list + if type(c) is ChunkDescriptor and c.ident == b'data') assert isinstance(self.fmt, WavAudioFormat) return WavDataDescriptor( - byte_count=data_chunk.length, + byte_count=data_chunk.length, frame_count=int(data_chunk.length / self.fmt.block_align)) def _get_format(self, f): @@ -150,8 +159,8 @@ class WavInfoReader: ) def _get_info(self, f, encoding): - finder = (chunk.signature for chunk in self.main_list \ - if type(chunk) is ListChunkDescriptor) + finder = (chunk.signature for chunk in self.main_list + if type(chunk) is ListChunkDescriptor) if b'INFO' in finder: return WavInfoChunkReader(f, encoding) @@ -176,9 +185,9 @@ class WavInfoReader: return WavIXMLFormat(ixml_data.rstrip(b'\0')) if ixml_data else None def _get_cue(self, f): - cue = next((cue_chunk for cue_chunk in self.main_list if \ - type(cue_chunk) is ChunkDescriptor and \ - cue_chunk.ident == b'cue '), None) + cue = next((cue_chunk for cue_chunk in self.main_list if + type(cue_chunk) is ChunkDescriptor and + cue_chunk.ident == b'cue '), None) adtl = self._find_list_chunk(b'adtl') labls = [] @@ -189,20 +198,21 @@ class WavInfoReader: ltxts = [c for c in adtl.children if c.ident == b'ltxt'] notes = [c for c in adtl.children if c.ident == b'note'] - return WavCuesReader.read_all(f, cue, labls, ltxts, notes, - fallback_encoding=self.info_encoding) + return WavCuesReader.read_all(f, cue, labls, ltxts, notes, + fallback_encoding=self.info_encoding) - def walk(self) -> Generator[str,str,Any]: #FIXME: this should probably be named "iter()" + # FIXME: this should probably be named "iter()" + def walk(self) -> Generator[str, str, Any]: """ Walk all of the available metadata fields. - + :yields: tuples of the *scope*, *key*, and *value* of each metadatum. The *scope* value will be one of "fmt", "data", "ixml", "bext", "info", "dolby", "cues" or "adm". """ - scopes = ('fmt', 'data', 'ixml', 'bext', 'info', 'adm', 'cues', - 'dolby') + scopes = ('fmt', 'data', 'ixml', 'bext', 'info', 'adm', 'cues', + 'dolby') for scope in scopes: if scope in ['fmt', 'data']: @@ -211,9 +221,12 @@ class WavInfoReader: yield scope, field, attr.__getattribute__(field) else: - dict = self.__getattribute__(scope).to_dict() if self.__getattribute__(scope) else {} + dict = self.__getattribute__(scope).to_dict( + ) if self.__getattribute__(scope) else {} for key in dict.keys(): yield scope, key, dict[key] - + def __repr__(self): - return 'WavInfoReader({}, {}, {})'.format(self.path, self.info_encoding, self.bext_encoding) + return 'WavInfoReader({}, {}, {})'.format(self.path, + self.info_encoding, + self.bext_encoding) diff --git a/wavinfo/wavfind.py b/wavinfo/wavfind.py index afb82a5..0b2aedf 100644 --- a/wavinfo/wavfind.py +++ b/wavinfo/wavfind.py @@ -9,23 +9,24 @@ import sys def main(): parser = OptionParser() - parser.usage = "wavfind [--scene=SCENE] [--take=TAKE] [--desc=DESC] +" + parser.usage = ("wavfind [--scene=SCENE] [--take=TAKE] [--desc=DESC] " + " +") primaries = OptionGroup(parser, title="Search Predicates", - description="Argument values can be globs, and are logically-AND'ed.") + description="Argument values can be globs, " + "and are logically-AND'ed.") + + primaries.add_option("--scene", + help='Search for this scene', + metavar='SCENE') - primaries.add_option("--scene", - help='Search for this scene', - metavar='SCENE') - primaries.add_option("--take", - help='Search for this take', - metavar='TAKE') + help='Search for this take', + metavar='TAKE') primaries.add_option("--desc", - help='Search descriptions', - metavar='DESC') - + help='Search descriptions', + metavar='DESC') (options, args) = parser.parse_args(sys.argv)