diff --git a/examples/wavinfo.ipynb b/examples/wavinfo.ipynb index 513b43d..b0b5d95 100644 --- a/examples/wavinfo.ipynb +++ b/examples/wavinfo.ipynb @@ -96,7 +96,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "WavBextFormat(description='dUBITS=12311804\\r\\ndSCENE=A101\\r\\ndTAKE=4\\r\\ndTAPE=18Y12M31\\r\\ndFRAMERATE=23.976ND\\r\\ndSPEED=023.976-NDF\\r\\ndTRK1=MKH516 A\\r\\ndTRK2=Boom\\r\\n', originator='Sound Dev: 702T S#GR1112089007', originator_ref='aa4CKtcd13Vk', originator_date='2018-12-31', originator_time='12:40:07', time_reference=2191709524, version=0, umid=None, loudness_value=0.0, loudness_range=0.0, max_true_peak=0.0, max_momentary_loudness=0.0, max_shortterm_loudness=0.0, coding_history='A=PCM,F=48000,W=24,M=stereo,R=48000,T=2 Ch\\r\\n')\n" + "\n" ] } ], @@ -106,7 +106,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 7, "metadata": {}, "outputs": [ { @@ -126,6 +126,50 @@ "print(len(regn_bin))" ] }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{ 'artist': 'Frank Bry',\n", + " 'comment': 'BULLET Impact Plastic LCD TV Screen Shatter Debris 2x',\n", + " 'copyright': '2018 Creative Sound Design, LLC (The Recordist Christmas '\n", + " '2018) www.therecordist.com',\n", + " 'created_date': '2018-11-15',\n", + " 'engineer': None,\n", + " 'genre': 'Bullets',\n", + " 'keywords': None,\n", + " 'product': 'The Recordist Christmas 2018',\n", + " 'software': 'Soundminer',\n", + " 'source': None,\n", + " 'tape': None,\n", + " 'title': None}\n" + ] + } + ], + "source": [ + "path = '../tests/test_files/BULLET Impact Plastic LCD TV Screen Shatter Debris 2x.wav'\n", + "\n", + "info = wavinfo.WavInfoReader(path)\n", + "\n", + "with open(path,'rb') as f:\n", + " chunk_tree = wavinfo.wave_parser.parse_chunk(f)\n", + " \n", + "pp.pprint(info.info.to_dict())\n", + "pp.pprint(info.bext.to_dict())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": null, diff --git a/tests/test_wave_parsing.py b/tests/test_wave_parsing.py index 29295d9..df98a57 100644 --- a/tests/test_wave_parsing.py +++ b/tests/test_wave_parsing.py @@ -74,8 +74,8 @@ class TestWaveInfo(TestCase): self.assertEqual( info.bext.originator_ref, '') # these don't always reflect the bext info - #self.assertEqual( info.bext.originator_date, ffprobe_info['format']['tags']['date'] ) - #self.assertEqual( info.bext.originator_time, ffprobe_info['format']['tags']['creation_time'] ) + # self.assertEqual( info.bext.originator_date, ffprobe_info['format']['tags']['date'] ) + # self.assertEqual( info.bext.originator_time, ffprobe_info['format']['tags']['creation_time'] ) self.assertEqual( info.bext.time_reference, int(ffprobe_info['format']['tags']['time_reference']) ) if 'coding_history' in ffprobe_info['format']['tags']: diff --git a/wavinfo/wave_bext_reader.py b/wavinfo/wave_bext_reader.py new file mode 100644 index 0000000..cf0166a --- /dev/null +++ b/wavinfo/wave_bext_reader.py @@ -0,0 +1,86 @@ +import struct + +class WavBextReader: + def __init__(self,bext_data,encoding): + # description[256] + + # originator[32] + # originatorref[32] + # originatordate[10] "YYYY:MM:DD" + # originatortime[8] "HH:MM:SS" + # lowtimeref U32 + # hightimeref U32 + # version U16 + # umid[64] + # + # EBU 3285 fields + # loudnessvalue S16 (in LUFS*100) + # loudnessrange S16 (in LUFS*100) + # maxtruepeak S16 (in dbTB*100) + # maxmomentaryloudness S16 (LUFS*100) + # maxshorttermloudness S16 (LUFS*100) + # reserved[180] + # codinghistory [] + if bext_data is None: + return None + + packstring = "<256s"+ "32s" + "32s" + "10s" + "8s" + "QH" + "64s" + "hhhhh" + "180s" + + rest_starts = struct.calcsize(packstring) + unpacked = struct.unpack(packstring, bext_data[:rest_starts]) + + def sanatize_bytes(bytes): + first_null = next( (index for index, byte in enumerate(bytes) if byte == 0 ), None ) + if first_null is not None: + trimmed = bytes[:first_null] + else: + trimmed = bytes + + decoded = trimmed.decode(encoding) + return decoded + + bext_version = unpacked[6] + if bext_version > 0: + self.umid = unpacked[6] + else: + self.umid = None + + if bext_version > 1: + self.loudness_value = unpacked[8] / 100.0, + self.loudness_range = unpacked[9] / 100.0 + self.max_true_peak = unpacked[10] / 100.0 + self.max_momentary_loudness = unpacked[11] / 100.0 + self.max_shortterm_loudness = unpacked[12] / 100.0 + else: + self.loudness_value = None + self.loudness_range = None + self.max_true_peak = None + self.max_momentary_loudness = None + self.max_shortterm_loudness = None + + self.description = sanatize_bytes(unpacked[0]) + self.originator = sanatize_bytes(unpacked[1]) + self.originator_ref = sanatize_bytes(unpacked[2]) + self.originator_date = sanatize_bytes(unpacked[3]) + self.originator_time = sanatize_bytes(unpacked[4]) + self.time_reference = unpacked[5] + self.version = unpacked[6] + self.coding_history = sanatize_bytes(bext_data[rest_starts:]) + + + def to_dict(self): + return {'description': self.description, + 'originator': self.originator, + 'originator_ref': self.originator_ref, + 'originator_date': self.originator_date, + 'originator_time': self.originator_time, + 'time_reference': self.time_reference, + 'version': self.version, + 'coding_history': self.coding_history, + 'loudness_value': self.loudness_value, + 'loudness_range': self.loudness_range, + 'max_true_peak': self.max_true_peak, + 'max_momentary_loudness': self.max_momentary_loudness, + 'max_shortterm_loudness': self.max_shortterm_loudness + } + diff --git a/wavinfo/wave_info_reader.py b/wavinfo/wave_info_reader.py new file mode 100644 index 0000000..570d9a7 --- /dev/null +++ b/wavinfo/wave_info_reader.py @@ -0,0 +1,64 @@ + +from .wave_parser import parse_chunk, ListChunkDescriptor + +class WavInfoChunkReader: + + def __init__(self, f, encoding): + self.encoding = encoding + + f.seek(0) + parsed_chunks = parse_chunk(f) + + list_chunks = [chunk for chunk in parsed_chunks.children \ + if type(chunk) is ListChunkDescriptor] + + self.info_chunk = next((chunk for chunk in list_chunks \ + if chunk.signature == b'INFO'), None) + + self.copyright = self._get_field(f,b'ICOP') + self.product = self._get_field(f,b'IPRD') + self.genre = self._get_field(f,b'IGNR') + self.artist = self._get_field(f,b'IART') + self.comment = self._get_field(f,b'ICMT') + self.software = self._get_field(f,b'ISFT') + self.created_date = self._get_field(f,b'ICRD') + self.engineer = self._get_field(f,b'IENG') + self.keywords = self._get_field(f,b'IKEY') + self.title = self._get_field(f,b'INAM') + self.source = self._get_field(f,b'ISRC') + self.tape = self._get_field(f,b'TAPE') + + + def _get_field(self, f, field_ident): + + search = next( ( (chunk.start, chunk.length) for chunk in self.info_chunk.children \ + if chunk.ident == field_ident ), None) + + if search is not None: + f.seek(search[0]) + data = f.read(search[1]) + return data.decode(self.encoding).rstrip('\0') + else: + return None + + + def to_dict(self): + return {'copyright': self.copyright, + 'product': self.product, + 'genre': self.genre, + 'artist': self.artist, + 'comment': self.comment, + 'software': self.software, + 'created_date': self.created_date, + 'engineer': self.engineer, + 'keywords': self.keywords, + 'title': self.title, + 'source': self.source, + 'tape': self.tape + } + + + + + + diff --git a/wavinfo/wave_parser.py b/wavinfo/wave_parser.py index 5d59bb0..dfa65b7 100644 --- a/wavinfo/wave_parser.py +++ b/wavinfo/wave_parser.py @@ -3,30 +3,40 @@ import struct from collections import namedtuple -ListChunkDescriptor = namedtuple('ListChunkDescriptor' , 'signature children') +class ListChunkDescriptor(namedtuple('ListChunkDescriptor' , 'signature children')): + + def find(chunk_path): + if len(chunk_path) > 1: + for chunk in self.children: + if type(chunk) is ListChunkDescriptor and \ + chunk.signature is chunk_path[0]: + return chunk.find(chunk_path[1:]) + else: + for chunk in self.children: + if type(chunk) is ChunkDescriptor and \ + chunk.ident is chunk_path[0]: + return chunk + class ChunkDescriptor(namedtuple('ChunkDescriptor', 'ident start length') ): def read_data(self, from_stream): from_stream.seek(self.start) return from_stream.read(self.length) - def parse_list_chunk(stream, length): - children = [] - start = stream.tell() signature = stream.read(4) + children = [] while (stream.tell() - start) < length: children.append(parse_chunk(stream)) return ListChunkDescriptor(signature=signature, children=children) - def parse_chunk(stream): ident = stream.read(4) - if len(ident) != 4: + if len(ident) != 4: return sizeb = stream.read(4) @@ -47,11 +57,3 @@ def parse_chunk(stream): - - - - - - - - diff --git a/wavinfo/wave_reader.py b/wavinfo/wave_reader.py index 7978324..6147004 100644 --- a/wavinfo/wave_reader.py +++ b/wavinfo/wave_reader.py @@ -4,6 +4,8 @@ from collections import namedtuple from .wave_parser import parse_chunk, ChunkDescriptor, ListChunkDescriptor from .wave_ixml_reader import WavIXMLFormat +from .wave_bext_reader import WavBextReader +from .wave_info_reader import WavInfoChunkReader WavDataDescriptor = namedtuple('WavDataDescriptor','byte_count frame_count') @@ -22,7 +24,20 @@ class WavInfoReader(): """ - def __init__(self, path): + def __init__(self, path, info_encoding='latin_1', bext_encoding='ascii'): + """ + Parse a WAV audio file for metadata. + + * `path`: A filesystem path to the wav file you wish to probe. + + * `info_encoding`: The text encoding of the INFO metadata fields. + `latin_1`/Win CP1252 has always been a pretty good guess for this. + + * `bext_encoding`: The text encoding to use when decoding the string + fields of the Broadcast-WAV extension. Per EBU 3285 this is ASCII + but this parameter is available to you if you encounter a werido. + + """ with open(path, 'rb') as f: chunks = parse_chunk(f) @@ -30,9 +45,9 @@ class WavInfoReader(): f.seek(0) self.fmt = self._get_format(f) - self.bext = self._get_bext(f) + self.bext = self._get_bext(f, encoding=bext_encoding) self.ixml = self._get_ixml(f) - + self.info = self._get_info(f, encoding=info_encoding) self.data = self._describe_data(f) def _find_chunk_data(self, ident, from_stream, default_none=False): @@ -57,7 +72,6 @@ class WavInfoReader(): frame_count= int(data_chunk.length / self.fmt.block_align)) - def _get_format(self,f): fmt_data = self._find_chunk_data(b'fmt ',f) @@ -78,92 +92,28 @@ class WavInfoReader(): #0x0006 WAVE_FORMAT_ALAW 8-bit ITU-T G.711 A-law #0x0007 WAVE_FORMAT_MULAW 8-bit ITU-T G.711 ยต-law #0xFFFE WAVE_FORMAT_EXTENSIBLE Determined by SubFormat - if unpacked[0] == 0x0001: - return WavInfoFormat(audio_format = unpacked[0], - channel_count = unpacked[1], - sample_rate = unpacked[2], - byte_rate = unpacked[3], - block_align = unpacked[4], + + #https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html + return WavInfoFormat(audio_format = unpacked[0], + channel_count = unpacked[1], + sample_rate = unpacked[2], + byte_rate = unpacked[3], + block_align = unpacked[4], bits_per_sample = unpacked[5] ) - def _get_bext(self,f,encoding='ascii'): + def _get_info(self, f, encoding): + finder = (chunk.signature for chunk in self.main_list \ + if type(chunk) is ListChunkDescriptor) + if b'INFO' in finder: + return WavInfoChunkReader(f, encoding) + + def _get_bext(self, f, encoding): bext_data = self._find_chunk_data(b'bext',f,default_none=True) - - # description[256] - # originator[32] - # originatorref[32] - # originatordate[10] "YYYY:MM:DD" - # originatortime[8] "HH:MM:SS" - # lowtimeref U32 - # hightimeref U32 - # version U16 - # umid[64] - # - # EBU 3285 fields - # loudnessvalue S16 (in LUFS*100) - # loudnessrange S16 (in LUFS*100) - # maxtruepeak S16 (in dbTB*100) - # maxmomentaryloudness S16 (LUFS*100) - # maxshorttermloudness S16 (LUFS*100) - # reserved[180] - # codinghistory [] - if bext_data is None: - return None - - packstring = "<256s"+ "32s" + "32s" + "10s" + "8s" + "QH" + "64s" + "hhhhh" + "180s" - - rest_starts = struct.calcsize(packstring) - unpacked = struct.unpack(packstring, bext_data[:rest_starts]) - - def sanatize_bytes(bytes): - first_null = next( (index for index, byte in enumerate(bytes) if byte == 0 ), None ) - if first_null is not None: - trimmed = bytes[:first_null] - else: - trimmed = bytes - - decoded = trimmed.decode(encoding) - return decoded - - bext_version = unpacked[6] - if bext_version > 0: - umid = unpacked[6] - else: - umid = None - - if bext_version > 1: - loudness_value = unpacked[8] / 100.0, - loudness_range = unpacked[9] / 100.0 - max_true_peak = unpacked[10] / 100.0 - max_momentary_loudness = unpacked[11] / 100.0 - max_shortterm_loudness = unpacked[12] / 100.0 - else: - loudness_value = None - loudness_range = None - max_true_peak = None - max_momentary_loudness = None - max_shortterm_loudness = None - - return WavBextFormat(description=sanatize_bytes(unpacked[0]), - originator = sanatize_bytes(unpacked[1]), - originator_ref = sanatize_bytes(unpacked[2]), - originator_date = sanatize_bytes(unpacked[3]), - originator_time = sanatize_bytes(unpacked[4]), - time_reference = unpacked[5], - version = unpacked[6], - umid = umid, - loudness_value = loudness_value, - loudness_range = loudness_range, - max_true_peak = max_true_peak, - max_momentary_loudness = max_momentary_loudness, - max_shortterm_loudness = max_shortterm_loudness, - coding_history = sanatize_bytes(bext_data[rest_starts:]) - ) + return WavBextReader(bext_data, encoding) def _get_ixml(self,f): - ixml_data = self._find_chunk_data(b'iXML',f,default_none=True) if ixml_data is None: return None