Compare commits
6 Commits
04332b73ee
...
master
Author | SHA1 | Date | |
---|---|---|---|
615d8ab279 | |||
9a887c4ed5 | |||
63b140209b | |||
d181ac73b1 | |||
bddce23c76 | |||
b4758dd138 |
@@ -26,3 +26,6 @@ build-backend = "poetry.core.masonry.api"
|
|||||||
ipython = "^9.4.0"
|
ipython = "^9.4.0"
|
||||||
jupyter = "^1.1.1"
|
jupyter = "^1.1.1"
|
||||||
|
|
||||||
|
[tool.poetry.scripts]
|
||||||
|
ucsinfer = "ucsinfer.__main__:ucsinfer"
|
||||||
|
|
||||||
|
@@ -1,7 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
# import csv
|
# import csv
|
||||||
import logging
|
import logging
|
||||||
from subprocess import CalledProcessError
|
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
|
|
||||||
import tqdm
|
import tqdm
|
||||||
@@ -9,7 +8,8 @@ import click
|
|||||||
# from tabulate import tabulate, SEPARATING_LINE
|
# from tabulate import tabulate, SEPARATING_LINE
|
||||||
|
|
||||||
from .inference import InferenceContext, load_ucs
|
from .inference import InferenceContext, load_ucs
|
||||||
from .gather import build_sentence_class_dataset, print_dataset_stats
|
from .gather import (build_sentence_class_dataset, print_dataset_stats,
|
||||||
|
ucs_definitions_generator, scan_metadata, walk_path)
|
||||||
from .recommend import print_recommendation
|
from .recommend import print_recommendation
|
||||||
from .util import ffmpeg_description, parse_ucs
|
from .util import ffmpeg_description, parse_ucs
|
||||||
|
|
||||||
@@ -103,6 +103,11 @@ def recommend(ctx, text, paths, interactive, skip_ucs):
|
|||||||
catlist = [x.catid for x in inference_ctx.catlist]
|
catlist = [x.catid for x in inference_ctx.catlist]
|
||||||
|
|
||||||
for path in paths:
|
for path in paths:
|
||||||
|
_, ext = os.path.splitext(path)
|
||||||
|
|
||||||
|
if ext not in (".wav", ".flac"):
|
||||||
|
continue
|
||||||
|
|
||||||
basename = os.path.basename(path)
|
basename = os.path.basename(path)
|
||||||
if skip_ucs and parse_ucs(basename, catlist):
|
if skip_ucs and parse_ucs(basename, catlist):
|
||||||
continue
|
continue
|
||||||
@@ -152,67 +157,30 @@ def gather(ctx, paths, out, ucs_data):
|
|||||||
"""
|
"""
|
||||||
logger.debug("GATHER mode")
|
logger.debug("GATHER mode")
|
||||||
|
|
||||||
types = ['.wav', '.flac']
|
|
||||||
|
|
||||||
logger.debug(f"Loading category list...")
|
logger.debug(f"Loading category list...")
|
||||||
ucs = load_ucs(full_ucs=ctx.obj['complete_ucs'])
|
ucs = load_ucs(full_ucs=ctx.obj['complete_ucs'])
|
||||||
|
|
||||||
scan_list = []
|
scan_list: list[tuple[str,str]] = []
|
||||||
catid_list = [cat.catid for cat in ucs]
|
catid_list = [cat.catid for cat in ucs]
|
||||||
|
|
||||||
if ucs_data:
|
if ucs_data:
|
||||||
logger.info('Creating dataset for UCS categories instead of from PATH')
|
logger.info('Creating dataset for UCS categories instead of from PATH')
|
||||||
paths = []
|
paths = []
|
||||||
|
|
||||||
walker_p = tqdm.tqdm(total=None, unit='dir', desc="Walking filesystem...")
|
|
||||||
for path in paths:
|
for path in paths:
|
||||||
for dirpath, _, filenames in os.walk(path):
|
scan_list += walk_path(path, catid_list)
|
||||||
logger.info(f"Walking directory {dirpath}")
|
|
||||||
for filename in filenames:
|
|
||||||
walker_p.update()
|
|
||||||
root, ext = os.path.splitext(filename)
|
|
||||||
if ext not in types or filename.startswith("._"):
|
|
||||||
continue
|
|
||||||
|
|
||||||
if (ucs_components := parse_ucs(root, catid_list)):
|
|
||||||
p = os.path.join(dirpath, filename)
|
|
||||||
logger.info(f"Adding path to scan list {p}")
|
|
||||||
scan_list.append((ucs_components.cat_id, p))
|
|
||||||
walker_p.close()
|
|
||||||
|
|
||||||
logger.info(f"Found {len(scan_list)} files to process.")
|
logger.info(f"Found {len(scan_list)} files to process.")
|
||||||
|
|
||||||
def scan_metadata():
|
|
||||||
for pair in tqdm.tqdm(scan_list, unit='files'):
|
|
||||||
logger.info(f"Scanning file with ffprobe: {pair[1]}")
|
|
||||||
try:
|
|
||||||
desc = ffmpeg_description(pair[1])
|
|
||||||
except CalledProcessError as e:
|
|
||||||
logger.error(f"ffprobe returned error (){e.returncode}): " \
|
|
||||||
+ e.stderr)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if desc:
|
|
||||||
yield desc, str(pair[0])
|
|
||||||
else:
|
|
||||||
comps = parse_ucs(os.path.basename(pair[1]), catid_list)
|
|
||||||
assert comps
|
|
||||||
yield comps.fx_name, str(pair[0])
|
|
||||||
|
|
||||||
def ucs_metadata():
|
|
||||||
for cat in ucs:
|
|
||||||
yield cat.explanations, cat.catid
|
|
||||||
yield ", ".join(cat.synonymns), cat.catid
|
|
||||||
|
|
||||||
logger.info("Building dataset...")
|
logger.info("Building dataset...")
|
||||||
|
|
||||||
dataset = build_sentence_class_dataset(chain(scan_metadata(),
|
dataset = build_sentence_class_dataset(
|
||||||
ucs_metadata()),
|
chain(scan_metadata(scan_list, catid_list),
|
||||||
catid_list)
|
ucs_definitions_generator(ucs)),
|
||||||
|
catid_list)
|
||||||
|
|
||||||
logger.info(f"Saving dataset to disk at {out}")
|
logger.info(f"Saving dataset to disk at {out}")
|
||||||
print_dataset_stats(dataset)
|
print_dataset_stats(dataset, catid_list)
|
||||||
dataset.save_to_disk(out)
|
dataset.save_to_disk(out)
|
||||||
|
|
||||||
|
|
||||||
|
@@ -1,9 +1,63 @@
|
|||||||
|
from .inference import Ucs
|
||||||
|
from .util import ffmpeg_description, parse_ucs
|
||||||
|
|
||||||
|
from subprocess import CalledProcessError
|
||||||
|
import os.path
|
||||||
|
|
||||||
from datasets import Dataset, Features, Value, ClassLabel, DatasetInfo
|
from datasets import Dataset, Features, Value, ClassLabel, DatasetInfo
|
||||||
from datasets.dataset_dict import DatasetDict
|
from datasets.dataset_dict import DatasetDict
|
||||||
|
|
||||||
from typing import Iterator
|
from typing import Iterator, Generator
|
||||||
|
|
||||||
from tabulate import tabulate
|
from tabulate import tabulate
|
||||||
|
import logging
|
||||||
|
import tqdm
|
||||||
|
|
||||||
|
def walk_path(path:str, catid_list) -> list[tuple[str,str]]:
|
||||||
|
types = ['.wav', '.flac']
|
||||||
|
logger = logging.getLogger('ucsinfer')
|
||||||
|
walker_p = tqdm.tqdm(total=None, unit='dir', desc="Walking filesystem...")
|
||||||
|
scan_list = []
|
||||||
|
|
||||||
|
for dirpath, _, filenames in os.walk(path):
|
||||||
|
logger.info(f"Walking directory {dirpath}")
|
||||||
|
for filename in filenames:
|
||||||
|
walker_p.update()
|
||||||
|
root, ext = os.path.splitext(filename)
|
||||||
|
if ext not in types or filename.startswith("._"):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if (ucs_components := parse_ucs(root, catid_list)):
|
||||||
|
p = os.path.join(dirpath, filename)
|
||||||
|
logger.info(f"Adding path to scan list {p}")
|
||||||
|
scan_list.append((ucs_components.cat_id, p))
|
||||||
|
|
||||||
|
return scan_list
|
||||||
|
|
||||||
|
def scan_metadata(scan_list: list[tuple[str,str]], catid_list: list[str]):
|
||||||
|
logger = logging.getLogger('ucsinfer')
|
||||||
|
for pair in tqdm.tqdm(scan_list, unit='files'):
|
||||||
|
logger.info(f"Scanning file with ffprobe: {pair[1]}")
|
||||||
|
try:
|
||||||
|
desc = ffmpeg_description(pair[1])
|
||||||
|
except CalledProcessError as e:
|
||||||
|
logger.error(f"ffprobe returned error (){e.returncode}): " \
|
||||||
|
+ e.stderr)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if desc:
|
||||||
|
yield desc, str(pair[0])
|
||||||
|
else:
|
||||||
|
comps = parse_ucs(os.path.basename(pair[1]), catid_list)
|
||||||
|
assert comps
|
||||||
|
yield comps.fx_name, str(pair[0])
|
||||||
|
|
||||||
|
|
||||||
|
def ucs_definitions_generator(ucs: list[Ucs]) \
|
||||||
|
-> Generator[tuple[str,str],None, None]:
|
||||||
|
for cat in ucs:
|
||||||
|
yield cat.explanations, cat.catid
|
||||||
|
yield ", ".join(cat.synonymns), cat.catid
|
||||||
|
|
||||||
def print_dataset_stats(dataset: DatasetDict, catlist: list[str]):
|
def print_dataset_stats(dataset: DatasetDict, catlist: list[str]):
|
||||||
|
|
||||||
|
@@ -1,10 +1,13 @@
|
|||||||
# recommend.py
|
# recommend.py
|
||||||
|
|
||||||
from re import match
|
from re import match
|
||||||
|
|
||||||
from .inference import InferenceContext
|
from .inference import InferenceContext
|
||||||
|
|
||||||
|
from tabulate import tabulate
|
||||||
|
|
||||||
def print_recommendation(path: str | None, text: str, ctx: InferenceContext,
|
def print_recommendation(path: str | None, text: str, ctx: InferenceContext,
|
||||||
interactive_rename: bool):
|
interactive_rename: bool, recommend_limit=10):
|
||||||
"""
|
"""
|
||||||
Print recommendations interactively.
|
Print recommendations interactively.
|
||||||
|
|
||||||
@@ -16,7 +19,7 @@ def print_recommendation(path: str | None, text: str, ctx: InferenceContext,
|
|||||||
`print_recommendation` should be called again with this argument.
|
`print_recommendation` should be called again with this argument.
|
||||||
- if retval[2] is a str, this is the catid the user has selected.
|
- if retval[2] is a str, this is the catid the user has selected.
|
||||||
"""
|
"""
|
||||||
recs = ctx.classify_text_ranked(text)
|
recs = ctx.classify_text_ranked(text, limit=recommend_limit)
|
||||||
print("----------")
|
print("----------")
|
||||||
if path:
|
if path:
|
||||||
print(f"Path: {path}")
|
print(f"Path: {path}")
|
||||||
@@ -28,7 +31,7 @@ def print_recommendation(path: str | None, text: str, ctx: InferenceContext,
|
|||||||
print(f"- {i}: {r} ({cat}-{subcat})")
|
print(f"- {i}: {r} ({cat}-{subcat})")
|
||||||
|
|
||||||
if interactive_rename and path is not None:
|
if interactive_rename and path is not None:
|
||||||
response = input("#, t [text], ?, q > ")
|
response = input("(n#), t, c, b, ?, q > ")
|
||||||
|
|
||||||
if m := match(r'^([0-9]+)', response):
|
if m := match(r'^([0-9]+)', response):
|
||||||
selection = int(m.group(1))
|
selection = int(m.group(1))
|
||||||
@@ -43,16 +46,27 @@ def print_recommendation(path: str | None, text: str, ctx: InferenceContext,
|
|||||||
text = m.group(1)
|
text = m.group(1)
|
||||||
return True, text, None
|
return True, text, None
|
||||||
|
|
||||||
elif m := match(r'^c (.*)', response):
|
elif m := match(r'^c (.+)', response):
|
||||||
return True, None, m.group(1)
|
return True, None, m.group(1)
|
||||||
|
|
||||||
|
elif m := match(r'^b (.+)', response):
|
||||||
|
expt = []
|
||||||
|
for cat in ctx.catlist:
|
||||||
|
if cat.catid.startswith(m.group(1)):
|
||||||
|
expt.append([f"{cat.catid}: ({cat.category}-{cat.subcategory})",
|
||||||
|
cat.explanations])
|
||||||
|
|
||||||
|
print(tabulate(expt, maxcolwidths=80))
|
||||||
|
return True, text, None
|
||||||
|
|
||||||
elif response.startswith("?"):
|
elif response.startswith("?"):
|
||||||
print("""
|
print("""
|
||||||
Choices:
|
Choices:
|
||||||
- Enter recommendation number to rename file,
|
- Enter recommendation number to rename file,
|
||||||
- "t [text]" to search for new recommendations based on [text]
|
- "t <text>" to search for new recommendations based on <text>
|
||||||
- "p" re-use the last selected cat-id
|
- "p" re-use the last selected cat-id
|
||||||
- "c [cat]" to type in a category by hand
|
- "c <cat>" to type in a category by hand
|
||||||
|
- "b <cat>" browse category list for categories starting with <cat>
|
||||||
- "?" for this message
|
- "?" for this message
|
||||||
- "q" to quit
|
- "q" to quit
|
||||||
- or any other key to skip this file and continue to next file
|
- or any other key to skip this file and continue to next file
|
||||||
@@ -60,6 +74,8 @@ Choices:
|
|||||||
return True, text, None
|
return True, text, None
|
||||||
elif response.startswith('q'):
|
elif response.startswith('q'):
|
||||||
return (False, None, None)
|
return (False, None, None)
|
||||||
|
else:
|
||||||
|
print()
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user