Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, UploadFile, File, Body, Form | |
from pathlib import Path | |
from fastapi.middleware.cors import CORSMiddleware | |
from typing import List | |
import numpy as np | |
from resemblyzer import preprocess_wav, VoiceEncoder | |
from itertools import groupby | |
from pathlib import Path | |
from tqdm import tqdm | |
import os | |
from sklearn.metrics.pairwise import cosine_similarity, cosine_distances | |
import glob | |
UPLOAD_DIR = Path() / "uploads" | |
UPLOAD_DIR.mkdir(parents=True, exist_ok=True) | |
os.environ[ 'NUMBA_CACHE_DIR' ] = '/tmp/' | |
app = FastAPI() | |
# Add a CORS middleware to allow cross-origin requests from the frontend | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# del all files in uploads folder | |
def delFiles(): | |
files = glob.glob('uploads/*') | |
for f in files: | |
os.remove(f) | |
# main function which returns the name of person which has highest similarity index with test audio | |
async def predictor(names, file_uploads, usersNum, recordingsNum): | |
speaker_embed_list = [] | |
encoder = VoiceEncoder() | |
# Iterating over list of files corresponding to each user | |
speaker_wavs_list = [] | |
fileInd = 0 | |
names.pop() # to remove key named "test" | |
# print("file_uploads ", file_uploads, "recordingNums ", recordingsNum) | |
for name in names: | |
wav_fpaths = [] | |
for ind in range(int(recordingsNum)): | |
print("inside yo") | |
file_upload = file_uploads[fileInd] | |
data = await file_upload.read() | |
# appending person's name to the his/her recordings | |
filename = name+"¬"+file_upload.filename | |
file_path = UPLOAD_DIR / filename | |
with open(file_path, "wb") as file_object: | |
file_object.write(data) | |
wav_fpaths.append(Path(file_path)) | |
fileInd += 1 | |
# print("wav_fpaths len", len(wav_fpaths), "name", name) | |
try: | |
speaker_wavs = {speaker: list(map(preprocess_wav, wav_fpaths)) for speaker, wav_fpaths in | |
groupby(tqdm(wav_fpaths, "Preprocessing wavs", len(wav_fpaths), unit="wavs"), | |
lambda wav_fpath: os.path.basename(wav_fpath).split("¬")[0])} # extracting person's name from file name | |
speaker_wavs_list.append(speaker_wavs) | |
except Exception as e: | |
print("An exception occurred:", type(error).__name__) | |
print("Exception details:", error) | |
# make a list of the pre-processed audios ki arrays | |
for sp_wvs in speaker_wavs_list: | |
speaker_embed_list.append( | |
np.array([encoder.embed_speaker(wavs) for wavs in sp_wvs.values()])) | |
# print("preprocessed audio ki array ", speaker_embed_list) | |
# making preprocessed test audio | |
wav_fpaths = [] | |
file_upload = file_uploads[-1] | |
data = await file_upload.read() | |
# print("data", data) | |
filename = "test¬"+file_upload.filename | |
file_path = UPLOAD_DIR / filename | |
# print("filepath", file_path) | |
with open(file_path, "wb") as file_object: | |
file_object.write(data) | |
wav_fpaths.append(Path(file_path)) | |
# print("wav_fpath", wav_fpaths) | |
print("about to test") | |
try: | |
test_pos_wavs = {speaker: list(map(preprocess_wav, wav_fpaths)) for speaker, wav_fpaths in | |
groupby(tqdm(wav_fpaths, "Preprocessing wavs", len(wav_fpaths), unit="wavs"), | |
lambda wav_fpath: "test")} | |
# print("test_pos_wavs", test_pos_wavs) | |
except Exception as error: | |
print("An exception occurred:", type(error).__name__) | |
print("Exception details:", error) | |
test_pos_emb = np.array([encoder.embed_speaker(wavs) | |
for wavs in test_pos_wavs.values()]) | |
# calculates cosine similarity between the ground truth (test file) and registered audios | |
speakers = {} | |
val = 0 | |
for spkr_embd in speaker_embed_list: | |
key_val = names[val] | |
spkr_sim = cosine_similarity(spkr_embd, test_pos_emb)[0][0] | |
speakers[key_val] = spkr_sim | |
val += 1 | |
norm = [float(i)/sum(speakers.values()) for i in speakers.values()] | |
for i in range(len(norm)): | |
key_val = names[i] | |
speakers[key_val] = norm[i] | |
identified = max(speakers, key=speakers.get) | |
print("\nThe identity of the test speaker:\n", identified, "with a similarity with test of", | |
speakers[identified]*100, "percent match as compared to all.") | |
return identified | |
# Update the function parameter to use the Body module and media_type | |
async def resultGenerator(names: List[str] = Form(...), file_uploads: List[UploadFile] = File(...), usersNum: str = Form(...), recordingsNum: str = Form(...)): | |
# equal to 2 because names list is of the form [name1, name2,..., test] | |
try: | |
if (len(names) <= 2): | |
return {"error: ", "Incorrect data provided"} | |
else: | |
result = await predictor(names, file_uploads, usersNum, recordingsNum) | |
print('## Test Audio Belonged To: {}'.format(result)) | |
delFiles() # to delete all files from backend, used in this identification | |
return {"result": result} | |
except: | |
return {"error": "Server not responding"} | |