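"""Extract Whisper-encoder PPG features from a wav file.

Loads a truncated Whisper large-v2 model (decoder removed, last quarter of
the encoder blocks dropped) and saves the encoder output as a .npy array.

Usage:
    python <this script> -w input.wav -p output.ppg.npy
"""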
import sys, os

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import argparse

import numpy as np
import requests
import torch
from tqdm import tqdm

from whisper.audio import load_audio, log_mel_spectrogram
from whisper.model import Whisper, ModelDimensions
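
# Frame-rate arithmetic used throughout: load_audio() resamples to 16 kHz,
# log_mel_spectrogram() uses a 160-sample hop (100 mel frames/s), and the
# encoder's conv stem downsamples by 2, so one encoder frame covers
# 320 audio samples (50 frames/s); hence the `samples // 320` below.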

def load_model(path, device) -> Whisper:
    checkpoint = torch.load(path, map_location="cpu")
    dims = ModelDimensions(**checkpoint["dims"])
    model = Whisper(dims)
    # Only the encoder is needed for PPG extraction: drop the decoder and
    # the last quarter of the encoder blocks to save memory.
    del model.decoder
    cut = len(model.encoder.blocks) // 4
    del model.encoder.blocks[-cut:]
    # strict=False because the checkpoint still contains weights for the
    # layers deleted above.
    model.load_state_dict(checkpoint["model_state_dict"], strict=False)
    model.eval()
    if device != "cpu":
        model.half()
    model.to(device)
    return model
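
# Note: large-v2 has 32 encoder blocks, so the truncation above keeps the
# first 24. A minimal usage sketch (assumes the checkpoint is already at
# /tmp/large-v2.pt):
#
#   model = load_model("/tmp/large-v2.pt", "cpu")
#   print(model.dims.n_audio_state)  # 1280 for large-v2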

def check_and_download_model():
    temp_dir = "/tmp"
    model_path = os.path.join(temp_dir, "large-v2.pt")
    if os.path.exists(model_path):
        return f"Model already exists: {model_path}"
    url = "https://openaipublic.azureedge.net/main/whisper/models/81f7c96c852ee8fc832187b0132e569d6c3065a3252ed18e56effd0b6a73e524/large-v2.pt"
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()
        total_size = int(response.headers.get("content-length", 0))
        with open(model_path, "wb") as f, tqdm(
            desc=model_path,
            total=total_size,
            unit="iB",
            unit_scale=True,
            unit_divisor=1024,
        ) as pbar:
            for data in response.iter_content(chunk_size=1024):
                size = f.write(data)
                pbar.update(size)
        return f"Model download complete: {model_path}"
    except Exception as e:
        return f"An error occurred: {e}"

def pred_ppg(whisper: Whisper, wavPath, ppgPath, device):
    audio = load_audio(wavPath)  # 16 kHz mono float32
    audln = audio.shape[0]
    ppg_a = []
    idx_s = 0
    # Process the audio in 15-second windows (15 * 16000 samples).
    while idx_s + 15 * 16000 < audln:
        short = audio[idx_s:idx_s + 15 * 16000]
        idx_s = idx_s + 15 * 16000
        ppgln = 15 * 16000 // 320  # encoder frames per 15-second window
        mel = log_mel_spectrogram(short).to(device)
        if device != "cpu":
            mel = mel.half()
        with torch.no_grad():
            # Light Gaussian noise on the mel input before encoding.
            mel = mel + torch.randn_like(mel) * 0.1
            ppg = whisper.encoder(mel.unsqueeze(0)).squeeze().data.cpu().float().numpy()
            ppg = ppg[:ppgln]  # [length, n_audio_state] (1280 for large-v2)
            ppg_a.extend(ppg)
    # Handle the remaining tail, shorter than 15 seconds.
    if idx_s < audln:
        short = audio[idx_s:audln]
        ppgln = (audln - idx_s) // 320
        mel = log_mel_spectrogram(short).to(device)
        if device != "cpu":
            mel = mel.half()
        with torch.no_grad():
            mel = mel + torch.randn_like(mel) * 0.1
            ppg = whisper.encoder(mel.unsqueeze(0)).squeeze().data.cpu().float().numpy()
            ppg = ppg[:ppgln]  # [length, n_audio_state] (1280 for large-v2)
            ppg_a.extend(ppg)
    np.save(ppgPath, np.asarray(ppg_a, dtype=np.float32), allow_pickle=False)
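
# The saved array can be loaded back with np.load(ppgPath); note that
# np.save() appends ".npy" to the path if it does not already end with it.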

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-w", "--wav", help="input wav path", dest="wav", required=True)
    parser.add_argument("-p", "--ppg", help="output ppg (.npy) path", dest="ppg", required=True)
    args = parser.parse_args()
    print(args.wav)
    print(args.ppg)
    wavPath = args.wav
    ppgPath = args.ppg
    device = "cuda" if torch.cuda.is_available() else "cpu"
    _ = check_and_download_model()
    whisper = load_model("/tmp/large-v2.pt", device)
    pred_ppg(whisper, wavPath, ppgPath, device)