Merge pull request #8 from elibroftw/master

Updated Code
This commit is contained in:
Jamie Hardt
2020-08-17 11:15:50 -07:00
committed by GitHub
15 changed files with 155 additions and 156 deletions

View File

@@ -2,8 +2,8 @@ dist: xenial
language: python
python:
# - "2.7"
- "3.6"
- "3.5"
- "3.6"
- "3.7"
- "3.8"
script:

View File

@@ -6,7 +6,6 @@
# wavinfo
The `wavinfo` package allows you to probe WAVE and [RF64/WAVE files][eburf64] and extract extended metadata, with an emphasis on film, video and professional music production metadata.
`wavinfo` reads:
@@ -32,8 +31,6 @@ In progress:
[eburf64]:https://tech.ebu.ch/docs/tech/tech3306v1_1.pdf
[info-tags]:https://exiftool.org/TagNames/RIFF.html#Info
## Demonstration
The entry point for wavinfo is the WavInfoReader class.
@@ -67,11 +64,6 @@ The length of the file in frames (interleaved samples) and bytes is available, a
Python 3.5 support is deprecated.
## Other Resources
* For other file formats and ID3 decoding, look at [audio-metadata](https://github.com/thebigmunch/audio-metadata).

View File

@@ -1,6 +1,7 @@
import unittest
import wavinfo
class TestWalk(unittest.TestCase):
def test_walk_metadata(self):
test_file = 'tests/test_files/protools/PT A101_4.A1.wav'

View File

@@ -7,33 +7,34 @@ from .utils import all_files, ffprobe
import wavinfo
class TestWaveInfo(TestCase):
def test_sanity(self):
for wav_file in all_files():
info = wavinfo.WavInfoReader(wav_file)
self.assertTrue(info is not None)
self.assertEqual(info.__repr__(), 'WavInfoReader(%s, %s, %s)'.format(wav_file, 'latin_1', 'ascii'))
self.assertIsNotNone(info)
def test_fmt_against_ffprobe(self):
for wav_file in all_files():
info = wavinfo.WavInfoReader(wav_file)
ffprobe_info = ffprobe(wav_file)
self.assertEqual( info.fmt.channel_count , ffprobe_info['streams'][0]['channels'] )
self.assertEqual( info.fmt.sample_rate , int(ffprobe_info['streams'][0]['sample_rate']) )
self.assertEqual( info.fmt.bits_per_sample, int(ffprobe_info['streams'][0]['bits_per_raw_sample']) )
self.assertEqual(info.fmt.channel_count, ffprobe_info['streams'][0]['channels'])
self.assertEqual(info.fmt.sample_rate, int(ffprobe_info['streams'][0]['sample_rate']))
self.assertEqual(info.fmt.bits_per_sample, int(ffprobe_info['streams'][0]['bits_per_raw_sample']))
if info.fmt.audio_format == 1:
self.assertTrue(ffprobe_info['streams'][0]['codec_name'].startswith('pcm') )
byte_rate = int(ffprobe_info['streams'][0]['sample_rate']) \
* ffprobe_info['streams'][0]['channels'] \
* int(ffprobe_info['streams'][0]['bits_per_raw_sample']) / 8
self.assertEqual( info.fmt.byte_rate , byte_rate )
self.assertTrue(ffprobe_info['streams'][0]['codec_name'].startswith('pcm'))
streams = ffprobe_info['streams'][0]
byte_rate = int(streams['sample_rate']) * streams['channels'] * int(streams['bits_per_raw_sample']) / 8
self.assertEqual(info.fmt.byte_rate, byte_rate)
def test_data_against_ffprobe(self):
for wav_file in all_files():
info = wavinfo.WavInfoReader(wav_file)
ffprobe_info = ffprobe(wav_file)
self.assertEqual( info.data.frame_count, int(ffprobe_info['streams'][0]['duration_ts'] ))
self.assertEqual(info.data.frame_count, int(ffprobe_info['streams'][0]['duration_ts']))
def test_bext_against_ffprobe(self):
for wav_file in all_files():
@@ -41,38 +42,38 @@ class TestWaveInfo(TestCase):
ffprobe_info = ffprobe(wav_file)
if info.bext:
if 'comment' in ffprobe_info['format']['tags']:
self.assertEqual( info.bext.description, ffprobe_info['format']['tags']['comment'] )
self.assertEqual(info.bext.description, ffprobe_info['format']['tags']['comment'])
else:
self.assertEqual( info.bext.description , '')
self.assertEqual(info.bext.description, '')
if 'encoded_by' in ffprobe_info['format']['tags']:
self.assertEqual( info.bext.originator, ffprobe_info['format']['tags']['encoded_by'] )
self.assertEqual(info.bext.originator, ffprobe_info['format']['tags']['encoded_by'])
else:
self.assertEqual( info.bext.originator, '')
self.assertEqual(info.bext.originator, '')
if 'originator_reference' in ffprobe_info['format']['tags']:
self.assertEqual( info.bext.originator_ref, ffprobe_info['format']['tags']['originator_reference'] )
self.assertEqual(info.bext.originator_ref, ffprobe_info['format']['tags']['originator_reference'])
else:
self.assertEqual( info.bext.originator_ref, '')
self.assertEqual(info.bext.originator_ref, '')
# these don't always reflect the bext info
# self.assertEqual( info.bext.originator_date, ffprobe_info['format']['tags']['date'] )
# self.assertEqual( info.bext.originator_time, ffprobe_info['format']['tags']['creation_time'] )
self.assertEqual( info.bext.time_reference, int(ffprobe_info['format']['tags']['time_reference']) )
# self.assertEqual(info.bext.originator_date, ffprobe_info['format']['tags']['date'])
# self.assertEqual(info.bext.originator_time, ffprobe_info['format']['tags']['creation_time'])
self.assertEqual(info.bext.time_reference, int(ffprobe_info['format']['tags']['time_reference']))
if 'coding_history' in ffprobe_info['format']['tags']:
self.assertEqual( info.bext.coding_history, ffprobe_info['format']['tags']['coding_history'] )
self.assertEqual(info.bext.coding_history, ffprobe_info['format']['tags']['coding_history'])
else:
self.assertEqual( info.bext.coding_history, '' )
self.assertEqual(info.bext.coding_history, '')
def test_ixml(self):
expected = {'A101_4.WAV': {'project' : 'BMH', 'scene': 'A101', 'take': '4',
expected = {'A101_4.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '4',
'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124015008231000'},
'A101_3.WAV': {'project' : 'BMH', 'scene': 'A101', 'take': '3',
'A101_3.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '3',
'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124014008228300'},
'A101_2.WAV': {'project' : 'BMH', 'scene': 'A101', 'take': '2',
'A101_2.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '2',
'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124004008218600'},
'A101_1.WAV': {'project' : 'BMH', 'scene': 'A101', 'take': '1',
'A101_1.WAV': {'project': 'BMH', 'scene': 'A101', 'take': '1',
'tape': '18Y12M31', 'family_uid': 'USSDVGR1112089007124001008206300'},
}
@@ -82,15 +83,32 @@ class TestWaveInfo(TestCase):
info = wavinfo.WavInfoReader(wav_file)
e = expected[basename]
self.assertEqual( e['project'], info.ixml.project )
self.assertEqual( e['scene'], info.ixml.scene )
self.assertEqual( e['take'], info.ixml.take )
self.assertEqual( e['tape'], info.ixml.tape )
self.assertEqual( e['family_uid'], info.ixml.family_uid )
self.assertEqual(e['project'], info.ixml.project)
self.assertEqual(e['scene'], info.ixml.scene)
self.assertEqual(e['take'], info.ixml.take)
self.assertEqual(e['tape'], info.ixml.tape)
self.assertEqual(e['family_uid'], info.ixml.family_uid)
for track in info.ixml.track_list:
self.assertIsNotNone(track.channel_index)
if basename == 'A101_4.WAV' and track.channel_index == '1':
self.assertTrue(track.name == 'MKH516 A')
self.assertEqual(track.name, 'MKH516 A')
def test_metadata(self):
file_with_metadata = 'tests/test_files/sound_grinder_pro/new_camera bumb 1.wav'
self.assertTrue(os.path.exists(file_with_metadata))
info = wavinfo.WavInfoReader(file_with_metadata).info
self.assertEqual(info.title, 'camera bumb 1')
self.assertEqual(info.artist, 'Jamie Hardt')
self.assertEqual(info.copyright, '© 2010 Jamie Hardt')
self.assertEqual(info.product, 'Test Sounds') # album
self.assertEqual(info.album, info.product)
self.assertEqual(info.comment, 'Comments')
self.assertEqual(info.software, 'Sound Grinder Pro')
self.assertEqual(info.created_date, '2010-12-28')
self.assertEqual(info.engineer, 'JPH')
self.assertEqual(info.keywords, 'Sound Effect, movement, microphone, bump')
self.assertEqual(info.title, 'camera bumb 1')
self.assertEqual(type(info.to_dict()), dict)
self.assertEqual(type(info.__repr__()), str)

View File

@@ -8,5 +8,6 @@ from unittest import TestCase
import wavinfo
class TestZoomF8(TestCase):
pass

View File

@@ -4,10 +4,11 @@ import subprocess
from subprocess import PIPE
import json
FFPROBE='ffprobe'
FFPROBE = 'ffprobe'
def ffprobe(path):
arguments = [ FFPROBE , "-of", "json" , "-show_format", "-show_streams", path ]
arguments = [FFPROBE, "-of", "json", "-show_format", "-show_streams", path]
if int(sys.version[0]) < 3:
process = subprocess.Popen(arguments, stdout=PIPE)
process.wait()
@@ -27,13 +28,9 @@ def ffprobe(path):
return None
def all_files():
for dirpath, _, filenames in os.walk('tests/test_files'):
for filename in filenames:
_, ext = os.path.splitext(filename)
if ext in ['.wav','.WAV']:
if ext in ['.wav', '.WAV']:
yield os.path.join(dirpath, filename)

View File

@@ -7,6 +7,6 @@ Go to the documentation for wavinfo.WavInfoReader for more information.
from .wave_reader import WavInfoReader
from .riff_parser import WavInfoEOFError
__version__ = '1.5'
__version__ = '1.6'
__author__ = 'Jamie Hardt <jamiehardt@gmail.com>'
__license__ = "MIT"

View File

@@ -4,6 +4,7 @@ from . import WavInfoReader
import sys
import json
def main():
parser = OptionParser()
@@ -28,5 +29,6 @@ def main():
except Exception as e:
print(e)
if __name__ == "__main__":
main()

View File

@@ -6,7 +6,7 @@ RF64Context = namedtuple('RF64Context','sample_count bigchunk_table')
def parse_rf64(stream, signature = b'RF64'):
#print("starting parse_rf64")
# print("starting parse_rf64")
start = stream.tell()
assert( stream.read(4) == b'WAVE' )
@@ -19,13 +19,13 @@ def parse_rf64(stream, signature = b'RF64'):
ds64_data = ds64_chunk.read_data(stream)
assert(len(ds64_data) >= ds64_fields_size )
#print("Read ds64 chunk: len()",len(ds64_data))
# print("Read ds64 chunk: len()",len(ds64_data))
riff_size, data_size, sample_count, length_lookup_table = struct.unpack( ds64_field_spec , ds64_data[0:ds64_fields_size] )
bigchunk_table = {}
chunksize64format = "<4sL"
chunksize64size = struct.calcsize(chunksize64format)
#print("Found chunks64s:", length_lookup_table)
# print("Found chunks64s:", length_lookup_table)
for n in range(length_lookup_table):
bigname, bigsize = struct.unpack_from( chunksize64format , ds64_data, offset= ds64_fields_size )
@@ -35,6 +35,6 @@ def parse_rf64(stream, signature = b'RF64'):
bigchunk_table[signature] = riff_size
stream.seek(start, 0)
#print("returning from parse_rf64, context: ",RF64Context( sample_count=sample_count, bigchunk_table=bigchunk_table ) )
# print("returning from parse_rf64, context: ", RF64Context(sample_count=sample_count, bigchunk_table=bigchunk_table))
return RF64Context( sample_count=sample_count, bigchunk_table=bigchunk_table )

View File

@@ -36,7 +36,7 @@ def parse_list_chunk(stream, length, rf64_context=None):
signature = stream.read(4)
children = []
while (stream.tell() - start + 8) < length:
while stream.tell() - start + 8 < length:
child_chunk = parse_chunk(stream, rf64_context=rf64_context)
children.append(child_chunk)
@@ -56,16 +56,16 @@ def parse_chunk(stream, rf64_context=None):
data_size = struct.unpack('<I', size_bytes)[0]
if data_size == 0xFFFFFFFF:
if rf64_context is None and ident in [b'RF64', b'BW64']:
if rf64_context is None and ident in {b'RF64', b'BW64'}:
rf64_context = parse_rf64(stream=stream, signature=ident)
data_size = rf64_context.bigchunk_table[ident]
displacement = data_size
if (displacement % 2) != 0:
displacement = displacement + 1
if displacement % 2:
displacement += 1
if ident in [b'RIFF', b'LIST', b'RF64', b'BW64']:
if ident in {b'RIFF', b'LIST', b'RF64', b'BW64'}:
return parse_list_chunk(stream=stream, length=data_size, rf64_context=rf64_context)
else:
data_start = stream.tell()

View File

@@ -1,14 +1,10 @@
from typing import Union
import binascii
from functools import reduce
def binary_to_string(binary_value):
retval = ''
for n in range(0, len(binary_value)):
sr = "{:02x}".format(binary_value[n])
retval += sr
return retval
return reduce(lambda val, el: val + "{:02x}".format(el), binary_value, '')
class UMIDParser:
@@ -125,5 +121,3 @@ class UMIDParser:
# return self.raw_umid[32:32]
# else:
# return None

View File

@@ -2,12 +2,13 @@ import struct
import binascii
from .umid_parser import UMIDParser
class WavBextReader:
def __init__(self, bext_data, encoding):
"""
Read Broadcast-WAV extended metadata.
:param best_data: The bytes-like data.
"param encoding: The encoding to use when decoding the text fields of the
:param bext_data: The bytes-like data.
:param encoding: The encoding to use when decoding the text fields of the
BEXT metadata scope. According to EBU Rec 3285 this shall be ASCII.
"""
packstring = "<256s" + "32s" + "32s" + "10s" + "8s" + "QH" + "64s" + "hhhhh" + "180s"
@@ -15,33 +16,29 @@ class WavBextReader:
rest_starts = struct.calcsize(packstring)
unpacked = struct.unpack(packstring, bext_data[:rest_starts])
def sanatize_bytes(bytes):
first_null = next((index for index, byte in enumerate(bytes) if byte == 0), None)
if first_null is not None:
trimmed = bytes[:first_null]
else:
trimmed = bytes
def sanitize_bytes(b):
first_null = next((index for index, byte in enumerate(b) if byte == 0), None)
trimmed = b if first_null is None else b[:first_null]
decoded = trimmed.decode(encoding)
return decoded
#: Description. A free-text field up to 256 characters long.
self.description = sanatize_bytes(unpacked[0])
self.description = sanitize_bytes(unpacked[0])
#: Originator. Usually the name of the encoding application, sometimes
#: a artist name.
self.originator = sanatize_bytes(unpacked[1])
#: A unique identifer for the file, a serial number.
self.originator_ref = sanatize_bytes(unpacked[2])
self.originator = sanitize_bytes(unpacked[1])
#: A unique identifier for the file, a serial number.
self.originator_ref = sanitize_bytes(unpacked[2])
#: Date of the recording, in the format YYY-MM-DD
self.originator_date = sanatize_bytes(unpacked[3])
self.originator_date = sanitize_bytes(unpacked[3])
#: Time of the recording, in the format HH:MM:SS.
self.originator_time = sanatize_bytes(unpacked[4])
self.originator_time = sanitize_bytes(unpacked[4])
#: The sample offset of the start of the file relative to an
#: epoch, usually midnight the day of the recording.
self.time_reference = unpacked[5]
#: A variable-length text field containing a list of processes and
#: and conversions performed on the file.
self.coding_history = sanatize_bytes(bext_data[rest_starts:])
self.coding_history = sanitize_bytes(bext_data[rest_starts:])
#: BEXT version.
self.version = unpacked[6]
#: SMPTE 330M UMID of this audio file, 64 bytes are allocated though the UMID

View File

@@ -17,6 +17,7 @@ class WavInfoChunkReader:
self.copyright = self._get_field(f, b'ICOP')
#: 'IPRD' Product
self.product = self._get_field(f, b'IPRD')
self.album = self.product
#: 'IGNR' Genre
self.genre = self._get_field(f, b'IGNR')
#: 'ISBJ' Supject
@@ -63,6 +64,7 @@ class WavInfoChunkReader:
"""
return {'copyright': self.copyright,
'product': self.product,
'album': self.album,
'genre': self.genre,
'artist': self.artist,
'comment': self.comment,
@@ -78,3 +80,8 @@ class WavInfoChunkReader:
'subject': self.subject,
'technician': self.technician
}
def __repr__(self):
return_val = self.to_dict()
return_val.update({'encoding': self.encoding})
return str(return_val)

View File

@@ -1,4 +1,4 @@
#import xml.etree.ElementTree as ET
# import xml.etree.ElementTree as ET
from lxml import etree as ET
import io
from collections import namedtuple
@@ -6,6 +6,7 @@ from collections import namedtuple
IXMLTrack = namedtuple('IXMLTrack', ['channel_index', 'interleave_index', 'name', 'function'])
class WavIXMLFormat:
"""
iXML recorder metadata.
@@ -16,9 +17,9 @@ class WavIXMLFormat:
:param xml: A bytes-like object containing the iXML payload.
"""
self.source = xml
xmlBytes = io.BytesIO(xml)
xml_bytes = io.BytesIO(xml)
parser = ET.XMLParser(recover=True)
self.parsed = ET.parse(xmlBytes, parser=parser)
self.parsed = ET.parse(xml_bytes, parser=parser)
def _get_text_value(self, xpath):
e = self.parsed.find("./" + xpath)
@@ -87,5 +88,3 @@ class WavIXMLFormat:
The name of this file's file family.
"""
return self._get_text_value("FILE_SET/FAMILY_NAME")

View File

@@ -12,12 +12,14 @@ from .wave_bext_reader import WavBextReader
from .wave_info_reader import WavInfoChunkReader
#: Calculated statistics about the audio data.
WavDataDescriptor = namedtuple('WavDataDescriptor','byte_count frame_count')
WavDataDescriptor = namedtuple('WavDataDescriptor', 'byte_count frame_count')
#: The format of the audio samples.
WavAudioFormat = namedtuple('WavAudioFormat','audio_format channel_count sample_rate byte_rate block_align bits_per_sample')
WavAudioFormat = namedtuple('WavAudioFormat',
'audio_format channel_count sample_rate byte_rate block_align bits_per_sample')
class WavInfoReader():
class WavInfoReader:
"""
Parse a WAV audio file for metadata.
"""
@@ -33,13 +35,18 @@ class WavInfoReader():
:param bext_encoding: The text encoding to use when decoding the string
fields of the Broadcast-WAV extension. Per EBU 3285 this is ASCII
but this parameter is available to you if you encounter a werido.
but this parameter is available to you if you encounter a weirdo.
"""
absolute_path = os.path.abspath(path)
#: `file://` url for the file.
self.url = pathlib.Path(absolute_path).as_uri()
# for __repr__()
self.path = absolute_path
self.info_encoding = info_encoding
self.bext_encoding = bext_encoding
with open(path, 'rb') as f:
chunks = parse_chunk(f)
@@ -57,32 +64,21 @@ class WavInfoReader():
#: :class:`wavinfo.wave_info_reader.WavInfoChunkReader` with RIFF INFO metadata
self.info = self._get_info(f, encoding=info_encoding)
self.data = self._describe_data(f)
self.data = self._describe_data()
def _find_chunk_data(self, ident, from_stream, default_none=False):
chunk_descriptor = None
top_chunks = (chunk for chunk in self.main_list if type(chunk) is ChunkDescriptor)
top_chunks = (chunk for chunk in self.main_list if type(chunk) is ChunkDescriptor and chunk.ident == ident)
chunk_descriptor = next(top_chunks, None) if default_none else next(top_chunks)
return chunk_descriptor.read_data(from_stream) if chunk_descriptor else None
if default_none:
chunk_descriptor = next((chunk for chunk in top_chunks if chunk.ident == ident),None)
else:
chunk_descriptor = next((chunk for chunk in top_chunks if chunk.ident == ident))
if chunk_descriptor:
return chunk_descriptor.read_data(from_stream)
else:
return None
def _describe_data(self,f):
def _describe_data(self):
data_chunk = next(c for c in self.main_list if c.ident == b'data')
return WavDataDescriptor(byte_count= data_chunk.length,
frame_count= int(data_chunk.length / self.fmt.block_align))
return WavDataDescriptor(byte_count=data_chunk.length,
frame_count=int(data_chunk.length / self.fmt.block_align))
def _get_format(self,f):
fmt_data = self._find_chunk_data(b'fmt ',f)
def _get_format(self, f):
fmt_data = self._find_chunk_data(b'fmt ', f)
# The format chunk is
# audio_format U16
@@ -96,42 +92,34 @@ class WavInfoReader():
unpacked = struct.unpack(packstring, fmt_data[:rest_starts])
#0x0001 WAVE_FORMAT_PCM PCM
#0x0003 WAVE_FORMAT_IEEE_FLOAT IEEE float
#0x0006 WAVE_FORMAT_ALAW 8-bit ITU-T G.711 A-law
#0x0007 WAVE_FORMAT_MULAW 8-bit ITU-T G.711 µ-law
#0xFFFE WAVE_FORMAT_EXTENSIBLE Determined by SubFormat
# 0x0001 WAVE_FORMAT_PCM PCM
# 0x0003 WAVE_FORMAT_IEEE_FLOAT IEEE float
# 0x0006 WAVE_FORMAT_ALAW 8-bit ITU-T G.711 A-law
# 0x0007 WAVE_FORMAT_MULAW 8-bit ITU-T G.711 µ-law
# 0xFFFE WAVE_FORMAT_EXTENSIBLE Determined by SubFormat
#https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html
return WavAudioFormat(audio_format = unpacked[0],
channel_count = unpacked[1],
sample_rate = unpacked[2],
byte_rate = unpacked[3],
block_align = unpacked[4],
bits_per_sample = unpacked[5]
# https://sno.phy.queensu.ca/~phil/exiftool/TagNames/RIFF.html
return WavAudioFormat(audio_format=unpacked[0],
channel_count=unpacked[1],
sample_rate=unpacked[2],
byte_rate=unpacked[3],
block_align=unpacked[4],
bits_per_sample=unpacked[5]
)
def _get_info(self, f, encoding):
finder = (chunk.signature for chunk in self.main_list \
if type(chunk) is ListChunkDescriptor)
finder = (chunk.signature for chunk in self.main_list if type(chunk) is ListChunkDescriptor)
if b'INFO' in finder:
return WavInfoChunkReader(f, encoding)
def _get_bext(self, f, encoding):
bext_data = self._find_chunk_data(b'bext',f,default_none=True)
if bext_data:
return WavBextReader(bext_data, encoding)
else:
return None
bext_data = self._find_chunk_data(b'bext', f, default_none=True)
return WavBextReader(bext_data, encoding) if bext_data else None
def _get_ixml(self,f):
ixml_data = self._find_chunk_data(b'iXML',f,default_none=True)
if ixml_data is None:
return None
ixml_string = ixml_data.rstrip(b'\0')
return WavIXMLFormat(ixml_string)
def _get_ixml(self, f):
ixml_data = self._find_chunk_data(b'iXML', f, default_none=True)
return None if ixml_data is None else WavIXMLFormat(ixml_data.rstrip(b'\0'))
def walk(self):
"""
@@ -141,7 +129,7 @@ class WavInfoReader():
metadata field, and the value.
"""
scopes = ('fmt', 'data') #'bext', 'ixml', 'info')
scopes = ('fmt', 'data') # 'bext', 'ixml', 'info')
for scope in scopes:
attr = self.__getattribute__(scope)
@@ -157,3 +145,6 @@ class WavInfoReader():
info_dict = self.info.to_dict()
for key in info_dict.keys():
yield 'info', key, info_dict[key]
def __repr__(self):
return 'WavInfoReader(%s, %s, %s)'.format(self.path, self.info_encoding, self.bext_encoding)