mirror of
https://github.com/iluvcapra/wavinfo.git
synced 2025-12-31 08:50:41 +00:00
Formatting, refactoring, __repr__
This commit is contained in:
@@ -7,6 +7,6 @@ Go to the documentation for wavinfo.WavInfoReader for more information.
|
||||
from .wave_reader import WavInfoReader
|
||||
from .riff_parser import WavInfoEOFError
|
||||
|
||||
__version__ = '1.5'
|
||||
__version__ = '1.6'
|
||||
__author__ = 'Jamie Hardt <jamiehardt@gmail.com>'
|
||||
__license__ = "MIT"
|
||||
@@ -4,6 +4,7 @@ from . import WavInfoReader
|
||||
import sys
|
||||
import json
|
||||
|
||||
|
||||
def main():
|
||||
parser = OptionParser()
|
||||
|
||||
@@ -28,5 +29,6 @@ def main():
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -6,7 +6,7 @@ RF64Context = namedtuple('RF64Context','sample_count bigchunk_table')
|
||||
|
||||
|
||||
def parse_rf64(stream, signature = b'RF64'):
|
||||
#print("starting parse_rf64")
|
||||
# print("starting parse_rf64")
|
||||
start = stream.tell()
|
||||
assert( stream.read(4) == b'WAVE' )
|
||||
|
||||
@@ -19,13 +19,13 @@ def parse_rf64(stream, signature = b'RF64'):
|
||||
ds64_data = ds64_chunk.read_data(stream)
|
||||
assert(len(ds64_data) >= ds64_fields_size )
|
||||
|
||||
#print("Read ds64 chunk: len()",len(ds64_data))
|
||||
# print("Read ds64 chunk: len()",len(ds64_data))
|
||||
riff_size, data_size, sample_count, length_lookup_table = struct.unpack( ds64_field_spec , ds64_data[0:ds64_fields_size] )
|
||||
|
||||
bigchunk_table = {}
|
||||
chunksize64format = "<4sL"
|
||||
chunksize64size = struct.calcsize(chunksize64format)
|
||||
#print("Found chunks64s:", length_lookup_table)
|
||||
# print("Found chunks64s:", length_lookup_table)
|
||||
|
||||
for n in range(length_lookup_table):
|
||||
bigname, bigsize = struct.unpack_from( chunksize64format , ds64_data, offset= ds64_fields_size )
|
||||
@@ -35,6 +35,6 @@ def parse_rf64(stream, signature = b'RF64'):
|
||||
bigchunk_table[signature] = riff_size
|
||||
|
||||
stream.seek(start, 0)
|
||||
#print("returning from parse_rf64, context: ",RF64Context( sample_count=sample_count, bigchunk_table=bigchunk_table ) )
|
||||
# print("returning from parse_rf64, context: ", RF64Context(sample_count=sample_count, bigchunk_table=bigchunk_table))
|
||||
return RF64Context( sample_count=sample_count, bigchunk_table=bigchunk_table )
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ def parse_list_chunk(stream, length, rf64_context=None):
|
||||
signature = stream.read(4)
|
||||
|
||||
children = []
|
||||
while (stream.tell() - start + 8) < length:
|
||||
while stream.tell() - start + 8 < length:
|
||||
child_chunk = parse_chunk(stream, rf64_context=rf64_context)
|
||||
children.append(child_chunk)
|
||||
|
||||
@@ -56,16 +56,16 @@ def parse_chunk(stream, rf64_context=None):
|
||||
data_size = struct.unpack('<I', size_bytes)[0]
|
||||
|
||||
if data_size == 0xFFFFFFFF:
|
||||
if rf64_context is None and ident in [b'RF64', b'BW64']:
|
||||
if rf64_context is None and ident in {b'RF64', b'BW64'}:
|
||||
rf64_context = parse_rf64(stream=stream, signature=ident)
|
||||
|
||||
data_size = rf64_context.bigchunk_table[ident]
|
||||
|
||||
displacement = data_size
|
||||
if (displacement % 2) != 0:
|
||||
displacement = displacement + 1
|
||||
if displacement % 2:
|
||||
displacement += 1
|
||||
|
||||
if ident in [b'RIFF', b'LIST', b'RF64', b'BW64']:
|
||||
if ident in {b'RIFF', b'LIST', b'RF64', b'BW64'}:
|
||||
return parse_list_chunk(stream=stream, length=data_size, rf64_context=rf64_context)
|
||||
else:
|
||||
data_start = stream.tell()
|
||||
|
||||
@@ -1,14 +1,10 @@
|
||||
from typing import Union
|
||||
import binascii
|
||||
from functools import reduce
|
||||
|
||||
|
||||
def binary_to_string(binary_value):
|
||||
retval = ''
|
||||
for n in range(0, len(binary_value)):
|
||||
sr = "{:02x}".format(binary_value[n])
|
||||
retval += sr
|
||||
|
||||
return retval
|
||||
return reduce(lambda val, el: val + "{:02x}".format(el), binary_value, '')
|
||||
|
||||
|
||||
class UMIDParser:
|
||||
@@ -125,5 +121,3 @@ class UMIDParser:
|
||||
# return self.raw_umid[32:32]
|
||||
# else:
|
||||
# return None
|
||||
|
||||
|
||||
|
||||
@@ -2,12 +2,13 @@ import struct
|
||||
import binascii
|
||||
from .umid_parser import UMIDParser
|
||||
|
||||
|
||||
class WavBextReader:
|
||||
def __init__(self, bext_data, encoding):
|
||||
"""
|
||||
Read Broadcast-WAV extended metadata.
|
||||
:param best_data: The bytes-like data.
|
||||
"param encoding: The encoding to use when decoding the text fields of the
|
||||
:param bext_data: The bytes-like data.
|
||||
:param encoding: The encoding to use when decoding the text fields of the
|
||||
BEXT metadata scope. According to EBU Rec 3285 this shall be ASCII.
|
||||
"""
|
||||
packstring = "<256s" + "32s" + "32s" + "10s" + "8s" + "QH" + "64s" + "hhhhh" + "180s"
|
||||
@@ -15,33 +16,29 @@ class WavBextReader:
|
||||
rest_starts = struct.calcsize(packstring)
|
||||
unpacked = struct.unpack(packstring, bext_data[:rest_starts])
|
||||
|
||||
def sanatize_bytes(bytes):
|
||||
first_null = next((index for index, byte in enumerate(bytes) if byte == 0), None)
|
||||
if first_null is not None:
|
||||
trimmed = bytes[:first_null]
|
||||
else:
|
||||
trimmed = bytes
|
||||
|
||||
def sanitize_bytes(b):
|
||||
first_null = next((index for index, byte in enumerate(b) if byte == 0), None)
|
||||
trimmed = b if first_null is None else b[:first_null]
|
||||
decoded = trimmed.decode(encoding)
|
||||
return decoded
|
||||
|
||||
#: Description. A free-text field up to 256 characters long.
|
||||
self.description = sanatize_bytes(unpacked[0])
|
||||
self.description = sanitize_bytes(unpacked[0])
|
||||
#: Originator. Usually the name of the encoding application, sometimes
|
||||
#: a artist name.
|
||||
self.originator = sanatize_bytes(unpacked[1])
|
||||
#: A unique identifer for the file, a serial number.
|
||||
self.originator_ref = sanatize_bytes(unpacked[2])
|
||||
self.originator = sanitize_bytes(unpacked[1])
|
||||
#: A unique identifier for the file, a serial number.
|
||||
self.originator_ref = sanitize_bytes(unpacked[2])
|
||||
#: Date of the recording, in the format YYY-MM-DD
|
||||
self.originator_date = sanatize_bytes(unpacked[3])
|
||||
self.originator_date = sanitize_bytes(unpacked[3])
|
||||
#: Time of the recording, in the format HH:MM:SS.
|
||||
self.originator_time = sanatize_bytes(unpacked[4])
|
||||
self.originator_time = sanitize_bytes(unpacked[4])
|
||||
#: The sample offset of the start of the file relative to an
|
||||
#: epoch, usually midnight the day of the recording.
|
||||
self.time_reference = unpacked[5]
|
||||
#: A variable-length text field containing a list of processes and
|
||||
#: and conversions performed on the file.
|
||||
self.coding_history = sanatize_bytes(bext_data[rest_starts:])
|
||||
self.coding_history = sanitize_bytes(bext_data[rest_starts:])
|
||||
#: BEXT version.
|
||||
self.version = unpacked[6]
|
||||
#: SMPTE 330M UMID of this audio file, 64 bytes are allocated though the UMID
|
||||
|
||||
@@ -78,3 +78,8 @@ class WavInfoChunkReader:
|
||||
'subject': self.subject,
|
||||
'technician': self.technician
|
||||
}
|
||||
|
||||
def __repr__(self):
|
||||
return_val = self.to_dict()
|
||||
return_val.update({'encoding', self.encoding})
|
||||
return str(return_val)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#import xml.etree.ElementTree as ET
|
||||
# import xml.etree.ElementTree as ET
|
||||
from lxml import etree as ET
|
||||
import io
|
||||
from collections import namedtuple
|
||||
@@ -6,6 +6,7 @@ from collections import namedtuple
|
||||
|
||||
IXMLTrack = namedtuple('IXMLTrack', ['channel_index', 'interleave_index', 'name', 'function'])
|
||||
|
||||
|
||||
class WavIXMLFormat:
|
||||
"""
|
||||
iXML recorder metadata.
|
||||
@@ -16,9 +17,9 @@ class WavIXMLFormat:
|
||||
:param xml: A bytes-like object containing the iXML payload.
|
||||
"""
|
||||
self.source = xml
|
||||
xmlBytes = io.BytesIO(xml)
|
||||
xml_bytes = io.BytesIO(xml)
|
||||
parser = ET.XMLParser(recover=True)
|
||||
self.parsed = ET.parse(xmlBytes, parser=parser)
|
||||
self.parsed = ET.parse(xml_bytes, parser=parser)
|
||||
|
||||
def _get_text_value(self, xpath):
|
||||
e = self.parsed.find("./" + xpath)
|
||||
@@ -87,5 +88,3 @@ class WavIXMLFormat:
|
||||
The name of this file's file family.
|
||||
"""
|
||||
return self._get_text_value("FILE_SET/FAMILY_NAME")
|
||||
|
||||
|
||||
|
||||
@@ -12,12 +12,14 @@ from .wave_bext_reader import WavBextReader
|
||||
from .wave_info_reader import WavInfoChunkReader
|
||||
|
||||
#: Calculated statistics about the audio data.
|
||||
WavDataDescriptor = namedtuple('WavDataDescriptor','byte_count frame_count')
|
||||
WavDataDescriptor = namedtuple('WavDataDescriptor', 'byte_count frame_count')
|
||||
|
||||
#: The format of the audio samples.
|
||||
WavAudioFormat = namedtuple('WavAudioFormat','audio_format channel_count sample_rate byte_rate block_align bits_per_sample')
|
||||
WavAudioFormat = namedtuple('WavAudioFormat',
|
||||
'audio_format channel_count sample_rate byte_rate block_align bits_per_sample')
|
||||
|
||||
class WavInfoReader():
|
||||
|
||||
class WavInfoReader:
|
||||
"""
|
||||
Parse a WAV audio file for metadata.
|
||||
"""
|
||||
@@ -33,13 +35,18 @@ class WavInfoReader():
|
||||
|
||||
:param bext_encoding: The text encoding to use when decoding the string
|
||||
fields of the Broadcast-WAV extension. Per EBU 3285 this is ASCII
|
||||
but this parameter is available to you if you encounter a werido.
|
||||
but this parameter is available to you if you encounter a weirdo.
|
||||
"""
|
||||
absolute_path = os.path.abspath(path)
|
||||
|
||||
#: `file://` url for the file.
|
||||
self.url = pathlib.Path(absolute_path).as_uri()
|
||||
|
||||
# for __repr__()
|
||||
self.path = absolute_path
|
||||
self.info_encoding = info_encoding
|
||||
self.bext_encoding = bext_encoding
|
||||
|
||||
with open(path, 'rb') as f:
|
||||
chunks = parse_chunk(f)
|
||||
|
||||
@@ -57,32 +64,21 @@ class WavInfoReader():
|
||||
|
||||
#: :class:`wavinfo.wave_info_reader.WavInfoChunkReader` with RIFF INFO metadata
|
||||
self.info = self._get_info(f, encoding=info_encoding)
|
||||
self.data = self._describe_data(f)
|
||||
self.data = self._describe_data()
|
||||
|
||||
def _find_chunk_data(self, ident, from_stream, default_none=False):
|
||||
chunk_descriptor = None
|
||||
top_chunks = (chunk for chunk in self.main_list if type(chunk) is ChunkDescriptor)
|
||||
top_chunks = (chunk for chunk in self.main_list if type(chunk) is ChunkDescriptor and chunk.ident == ident)
|
||||
chunk_descriptor = next(top_chunks, None) if default_none else next(top_chunks)
|
||||
return chunk_descriptor.read_data(from_stream) if chunk_descriptor else None
|
||||
|
||||
if default_none:
|
||||
chunk_descriptor = next((chunk for chunk in top_chunks if chunk.ident == ident),None)
|
||||
else:
|
||||
chunk_descriptor = next((chunk for chunk in top_chunks if chunk.ident == ident))
|
||||
|
||||
if chunk_descriptor:
|
||||
return chunk_descriptor.read_data(from_stream)
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def _describe_data(self,f):
|
||||
def _describe_data(self):
|
||||
data_chunk = next(c for c in self.main_list if c.ident == b'data')
|
||||
|
||||
return WavDataDescriptor(byte_count= data_chunk.length,
|
||||
frame_count= int(data_chunk.length / self.fmt.block_align))
|
||||
return WavDataDescriptor(byte_count=data_chunk.length,
|
||||
frame_count=int(data_chunk.length / self.fmt.block_align))
|
||||
|
||||
|
||||
def _get_format(self,f):
|
||||
fmt_data = self._find_chunk_data(b'fmt ',f)
|
||||
def _get_format(self, f):
|
||||
fmt_data = self._find_chunk_data(b'fmt ', f)
|
||||
|
||||
# The format chunk is
|
||||
# audio_format U16
|
||||
@@ -96,42 +92,34 @@ class WavInfoReader():
|
||||
|
||||
unpacked = struct.unpack(packstring, fmt_data[:rest_starts])
|
||||
|
||||
#0x0001 WAVE_FORMAT_PCM PCM
|
||||
#0x0003 WAVE_FORMAT_IEEE_FLOAT IEEE float
|
||||
#0x0006 WAVE_FORMAT_ALAW 8-bit ITU-T G.711 A-law
|
||||
#0x0007 WAVE_FORMAT_MULAW 8-bit ITU-T G.711 µ-law
|
||||
#0xFFFE WAVE_FORMAT_EXTENSIBLE Determined by SubFormat
|
||||
# 0x0001 WAVE_FORMAT_PCM PCM
|
||||
# 0x0003 WAVE_FORMAT_IEEE_FLOAT IEEE float
|
||||
# 0x0006 WAVE_FORMAT_ALAW 8-bit ITU-T G.711 A-law
|
||||
# 0x0007 WAVE_FORMAT_MULAW 8-bit ITU-T G.711 µ-law
|
||||
# 0xFFFE WAVE_FORMAT_EXTENSIBLE Determined by SubFormat
|
||||
|
||||
#https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html
|
||||
return WavAudioFormat(audio_format = unpacked[0],
|
||||
channel_count = unpacked[1],
|
||||
sample_rate = unpacked[2],
|
||||
byte_rate = unpacked[3],
|
||||
block_align = unpacked[4],
|
||||
bits_per_sample = unpacked[5]
|
||||
# https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html
|
||||
return WavAudioFormat(audio_format=unpacked[0],
|
||||
channel_count=unpacked[1],
|
||||
sample_rate=unpacked[2],
|
||||
byte_rate=unpacked[3],
|
||||
block_align=unpacked[4],
|
||||
bits_per_sample=unpacked[5]
|
||||
)
|
||||
|
||||
def _get_info(self, f, encoding):
|
||||
finder = (chunk.signature for chunk in self.main_list \
|
||||
if type(chunk) is ListChunkDescriptor)
|
||||
finder = (chunk.signature for chunk in self.main_list if type(chunk) is ListChunkDescriptor)
|
||||
|
||||
if b'INFO' in finder:
|
||||
return WavInfoChunkReader(f, encoding)
|
||||
|
||||
def _get_bext(self, f, encoding):
|
||||
bext_data = self._find_chunk_data(b'bext',f,default_none=True)
|
||||
if bext_data:
|
||||
return WavBextReader(bext_data, encoding)
|
||||
else:
|
||||
return None
|
||||
bext_data = self._find_chunk_data(b'bext', f, default_none=True)
|
||||
return WavBextReader(bext_data, encoding) if bext_data else None
|
||||
|
||||
def _get_ixml(self,f):
|
||||
ixml_data = self._find_chunk_data(b'iXML',f,default_none=True)
|
||||
if ixml_data is None:
|
||||
return None
|
||||
|
||||
ixml_string = ixml_data.rstrip(b'\0')
|
||||
return WavIXMLFormat(ixml_string)
|
||||
def _get_ixml(self, f):
|
||||
ixml_data = self._find_chunk_data(b'iXML', f, default_none=True)
|
||||
return None if ixml_data else WavIXMLFormat(ixml_data.rstrip(b'\0'))
|
||||
|
||||
def walk(self):
|
||||
"""
|
||||
@@ -141,10 +129,10 @@ class WavInfoReader():
|
||||
metadata field, and the value.
|
||||
"""
|
||||
|
||||
scopes = ('fmt', 'data') #'bext', 'ixml', 'info')
|
||||
scopes = ('fmt', 'data') # 'bext', 'ixml', 'info')
|
||||
|
||||
for scope in scopes:
|
||||
attr = self.__getattribute__(scope)
|
||||
attr: WavAudioFormat = self.__getattribute__(scope)
|
||||
for field in attr._fields:
|
||||
yield scope, field, attr.__getattribute__(field)
|
||||
|
||||
@@ -157,3 +145,6 @@ class WavInfoReader():
|
||||
info_dict = self.info.to_dict()
|
||||
for key in info_dict.keys():
|
||||
yield 'info', key, info_dict[key]
|
||||
|
||||
def __repr__(self):
|
||||
return 'WavInfoReader(%s, %s, %s)'.format(self.path, self.info_encoding, self.bext_encoding)
|
||||
|
||||
Reference in New Issue
Block a user