# Copyright 2024 LY Corporation # LY Corporation licenses this file to you under the Apache License, # version 2.0 (the "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at: # https://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import argparse import sys from concurrent.futures import ProcessPoolExecutor from pathlib import Path import librosa import numpy as np import pyloudnorm as pyln import pyworld import soundfile as sf import syllables import yaml from common import getLogger, load_libritts_spk_metadata from promptttspp.utils.textgrid import read_textgrid from tqdm.auto import tqdm def compute_speaking_rate(textgrid_path): labels = read_textgrid(textgrid_path.as_posix(), "words") if len(labels) < 2: return -1 assert len(labels) >= 2 start_time = None end_time = 0 num_syllables = 0 sil_dur = 0 for label in labels: if start_time is None and len(label.name) > 0: start_time = label.start if len(label.name) > 0: num_syllables += syllables.estimate(label.name) else: sil_dur += label.stop - label.start end_time = labels[-1].stop try: rate = num_syllables / (end_time - start_time - sil_dur) except ZeroDivisionError: print(f"warn: {textgrid_path}. {end_time}, {start_time}, {sil_dur}") rate = -1 if rate < 0: print(f"warn: {textgrid_path}. {end_time}, {start_time}, {sil_dur}") rate = -1 return round(rate, 2) def loudness_extract(audio, sampling_rate, n_fft=1024, hop_length=240): stft = librosa.stft(audio, n_fft=n_fft, hop_length=hop_length) + 1e-7 power_spectrum = np.abs(stft) ** 2 bins = librosa.fft_frequencies(sr=sampling_rate, n_fft=n_fft) bins[0] += 1e-5 # To prevent zero division loudness = librosa.perceptual_weighting(power_spectrum, bins) loudness = librosa.db_to_power(loudness) loudness = np.log(np.mean(loudness, axis=0) + 1e-5) return loudness def get_parser(): parser = argparse.ArgumentParser( description="Compute per-utterance statistics", ) parser.add_argument( "in_dir", type=str, help="LibriTTS per-speaker restructured data root" ) parser.add_argument("f0_stats", type=str, help="f0 stats") parser.add_argument( "--out_filename", type=str, default="libritts_r_metadata.yaml", help="Output filename", ) parser.add_argument("--num_jobs", type=int, default=8, help="Number of jobs") parser.add_argument("--debug", action="store_true", help="Debug") return parser def process_utterance(logger, wav_file, textgrid_file, f0_stats): utt_id = wav_file.stem spk = utt_id.split("_")[0] x, sr = sf.read(wav_file) hop_length = int(sr * 0.010) invalid = 0 # Loudness in LUFS block_size = min(0.4, len(x) / sr - 0.01) meter = pyln.Meter(sr, block_size=block_size) loudness_lufs = round(meter.integrated_loudness(x), 2) # Per-frame loudness frame_loudness = loudness_extract(x, sr, n_fft=1024, hop_length=hop_length) # F0 if spk in f0_stats: f0_floor = f0_stats[spk]["f0_floor"] f0_ceil = f0_stats[spk]["f0_ceil"] else: f0_floor = 70 f0_ceil = 800 logger.warning(f"Using default f0_floor={f0_floor}, f0_ceil={f0_ceil}") f0, timeaxis = pyworld.dio( x, sr, frame_period=5, f0_floor=f0_floor, f0_ceil=f0_ceil ) f0 = pyworld.stonemask(x, f0, timeaxis, sr) f0_v = f0[f0 > 0] lf0_v = np.log(f0_v) # e.g. 14_212_000011_000004, 14_212_000011_000009, 14_212_000018_000001 if len(f0_v) == 0: logger.warning(f"{utt_id} has no f0") f0_mean = 0 f0_scale = 1.0 invalid = 1 lf0_mean = 0 lf0_scale = 1.0 else: lf0_mean = np.mean(lf0_v) lf0_scale = np.std(lf0_v) f0_mean = np.mean(f0_v) f0_scale = np.std(f0_v) try: speaking_rate = compute_speaking_rate(textgrid_file) if speaking_rate < 0: invalid = 1 except RuntimeError: logger.warning(f"{utt_id} has no valid speaking rate") speaking_rate = 0 invalid = 1 out = { "raw_loudness_lufs": round(float(loudness_lufs), 2), "raw_loudness_mean": round(float(frame_loudness.mean()), 2), "raw_loudness_scale": round(float(frame_loudness.std()), 2), "raw_f0_mean": round(float(f0_mean), 2), "raw_f0_scale": round(float(f0_scale), 2), "raw_lf0_mean": round(float(lf0_mean), 2), "raw_lf0_scale": round(float(lf0_scale), 2), "raw_speaking_rate": round(float(speaking_rate), 2), "invalid": invalid, } return utt_id, out if __name__ == "__main__": args = get_parser().parse_args(sys.argv[1:]) num_jobs = int(args.num_jobs) spk2meta = load_libritts_spk_metadata(debug=args.debug) in_dir = Path(args.in_dir) logger = getLogger( verbose=100, filename="log/compute_utt_stats.log", name="compute_utt_stats" ) executor = ProcessPoolExecutor(max_workers=num_jobs) futures = [] with open(args.f0_stats) as f: f0_stats = yaml.load(f, Loader=yaml.SafeLoader) for spk, _ in tqdm(spk2meta.items()): spk_in_dir = in_dir / spk spk_mfa_dir = spk_in_dir / "textgrid" if not spk_in_dir.exists(): continue textgrid_files = sorted(list(spk_mfa_dir.glob("*.TextGrid"))) # valid utt_ids utt_ids = [f.stem for f in textgrid_files] wav_files = [spk_in_dir / "wav24k" / f"{utt_id}.wav" for utt_id in utt_ids] for wav_file, textgrid_file in zip(wav_files, textgrid_files): futures.append( executor.submit( process_utterance, logger, wav_file, textgrid_file, f0_stats, ) ) metadata = {} for future in tqdm(futures): utt_id, meta = future.result() metadata[utt_id] = meta with open(args.out_filename, "w") as of: yaml.dump(metadata, of)