import bpy from os.path import dirname from typing import List from .geom_utils import (speaker_active_time_range, speakers_by_min_distance, speakers_by_start_time) from .object_mix import (ObjectMixPool, object_mixes_from_source_groups) from .speaker_utils import (all_speakers) def group_speakers(speakers, scene) -> List[List[bpy.types.Object]]: """ Accepts a list of speakers and a scene, and returns a list of lists. Each list contains a list of speakers which are guaranteed to not have overlapping sounds. Each of the child lists contains a list of speaker objects in ascending order by start time. Speakers are allocated to lists on the basis of their minimum distance to the camera according to `speakers_by_min_distance`. Closer sounds will appear on the earliest list if there is no overlap. """ def list_can_accept_speaker(speaker_list, speaker_to_test): test_range = speaker_active_time_range(speaker_to_test) def filter_f(spk): spk_range = speaker_active_time_range(spk) return test_range.overlaps(spk_range) result = next((x for x in speaker_list if filter_f(x) is True), None) return result is None by_priority = speakers_by_min_distance(scene, speakers) ret_val = [[]] for spk in by_priority: success = False # flaggy-flag because I can't do a break->continue # from the inner for elem in ret_val: if list_can_accept_speaker(elem, spk): elem.append(spk) success = True break if not success: ret_val.append([spk]) for i in range(len(ret_val)): ret_val[i] = speakers_by_start_time(ret_val[i]) return ret_val # def adm_for_object(scene: bpy.types.Scene, sound_object: ObjectMix, room_size, # adm_builder, object_index): # fps = scene.render.fps # frame_start = scene.frame_start # frame_end = scene.frame_end # # # block_formats = sound_object.adm_block_formats(room_size=room_size) # # created = adm_builder.create_item_objects(track_index=object_index, # name=sound_object.object_name, # block_formats=block_formats) # # created.audio_object.start = Fraction(frame_start, fps) # created.audio_object.duration = Fraction(frame_end - frame_start, fps) # created.track_uid.sampleRate = sound_object.sample_rate # created.track_uid.bitDepth = sound_object.bits_per_sample # def adm_for_scene(scene: bpy.types.Scene, sound_object_mixes: List[ObjectMix], # room_size): # adm_builder = ADMBuilder() # # frame_start = scene.frame_start # frame_end = scene.frame_end # fps = scene.render.fps # # adm_builder.create_programme(audioProgrammeName=scene.name, # start=Fraction(frame_start, fps), # end=Fraction(frame_end, fps)) # # adm_builder.create_content(audioContentName="Objects") # # for object_index, sound_object in enumerate(sound_object_mixes): # adm_for_object(scene, sound_object, room_size, # adm_builder, object_index) # # adm = adm_builder.adm # # generate_ids(adm) # chna = ChnaChunk() # adm_chna.populate_chna_chunk(chna, adm) # # return adm_to_xml(adm), chna # # def bext_data(scene, sample_rate, room_size): # description = "SCENE={};ROOM_SIZE={}\n".format( # scene.name, room_size).encode("ascii") # originator_name = "Blender {}".format( # bpy.app.version_string).encode("ascii") # originator_ref = uuid.uuid1().hex.encode("ascii") # date10 = strftime("%Y-%m-%d").encode("ascii") # time8 = strftime("%H:%M:%S").encode("ascii") # timeref = int(float(scene.frame_start) * # sample_rate / float(scene.render.fps)) # version = 0 # umid = b"\0" * 64 # pad = b"\0" * 190 # # data = struct.pack("<256s32s32s10s8sQH64s190s", description, # originator_name, originator_ref, date10, time8, timeref, # version, umid, pad) # # return data # # # def attach_outfile_metadata(out_format, outfile, room_size, scene, # sound_objects): # adm, chna = adm_for_scene(scene, sound_objects, room_size=room_size) # outfile.axml = lxml.etree.tostring(adm, pretty_print=True) # outfile.chna = chna # outfile.bext = bext_data(scene, out_format.sampleRate, room_size=room_size) # # # def write_outfile_audio_data(outfile, shortest_file, sound_objects): # READ_BLOCK = 1024 # cursor = 0 # # # Not sure if this is necessary but lets do it # for obj in sound_objects: # obj.mixdown_reader.seek(0) # # while True: # remainder = shortest_file - cursor # to_read = min(READ_BLOCK, remainder) # if to_read == 0: # break # # buffer = numpy.zeros((to_read, len(sound_objects))) # for i, sound_object in enumerate(sound_objects): # buffer[:, i] = sound_object.mixdown_reader.read(to_read)[:, 0] # # outfile.write(buffer) # cursor = cursor + to_read # def write_muxed_wav(mix_pool: ObjectMixPool, scene, out_format, room_size, # outfile, shortest_file): # sound_objects = mix_pool.object_mixes # attach_outfile_metadata(out_format, outfile, # room_size, scene, sound_objects) # write_outfile_audio_data(outfile, shortest_file, sound_objects) # # # def mux_adm_from_object_mix_pool(scene, mix_pool: ObjectMixPool, # output_filename, room_size=1.): # object_count = len(mix_pool.object_mixes) # assert object_count > 0 # # out_format = FormatInfoChunk(channelCount=object_count, # sampleRate=scene.render.ffmpeg.audio_mixrate, # bitsPerSample=24) # # with openBw64(output_filename, 'w', formatInfo=out_format) as outfile: # write_muxed_wav(mix_pool, scene, out_format, room_size, # outfile, mix_pool.shortest_file_length) # def print_partition_results(object_groups, sound_sources, too_far_speakers): print("Will create {} objects for {} sources, ignoring {} sources".format( len(object_groups), len(sound_sources), len(too_far_speakers))) for i, group in enumerate(object_groups): print("Object Group %i" % i) for source in group: print(" - %s" % source.name) def partition_sounds_to_objects(scene, max_objects) -> \ tuple[list[list[bpy.types.Object]], list[list[bpy.types.Object]]]: """ Allocates sounds in the scene into non-overlapping lists of sounds. The second return value is the list of sounds that could not be allocated because the max_objects limit was exceeded. Sounds are allocated to lists according to `group_speakers`. """ sound_sources = all_speakers(scene) if len(sound_sources) == 0: return [], [] object_groups = group_speakers(sound_sources, scene) too_far_speakers = [] if len(object_groups) > max_objects: too_far_speakers = object_groups[max_objects:] object_groups = object_groups[0:max_objects] print_partition_results(object_groups, sound_sources, too_far_speakers) return object_groups, too_far_speakers def generate_adm(context: bpy.types.Context, filepath: str, room_size: float, max_objects: int) -> set[str]: scene = context.scene object_groups, _ = partition_sounds_to_objects(scene, max_objects) if len(object_groups) == 0: return {'FINISHED'} mix_groups = object_mixes_from_source_groups(object_groups, scene=scene, base_dir=dirname(filepath)) with ObjectMixPool(object_mixes=mix_groups) as pool: # here is where we place the mix objects into the session pass # print("generate_adm exiting") return {'FINISHED'}