Spaces:
Running
Running
""" | |
Copyright 2023 Balacoon | |
contains implementation | |
for Revoice request | |
""" | |
import os | |
import asyncio | |
import base64 | |
import hashlib | |
import json | |
import ssl | |
import time | |
from typing import Tuple, Union | |
import numpy as np | |
import resampy | |
import websockets | |
def prepare_audio(audio: Tuple[int, np.ndarray]) -> np.ndarray: | |
""" | |
ensures that audio is in int16 format, 16khz mono | |
""" | |
sr, wav = audio | |
# ensure proper type | |
if wav.dtype == np.int32: | |
max_val = np.max(np.abs(wav)) | |
mult = (32767.0 / 2**31) if max_val > 32768 else 1.0 | |
wav = (wav.astype(np.float32) * mult).astype(np.int16) | |
elif wav.dtype == np.float32 or wav.dtype == np.float64: | |
mult = 32767.0 if np.max(np.abs(wav)) <= 1.0 else 1.0 | |
wav = (wav * mult).astype(np.int16) | |
if wav.ndim == 2: | |
# average channels | |
if wav.shape[0] == 2: | |
wav = np.mean(wav, axis=0, keepdims=False) | |
if wav.shape[1] == 2: | |
wav = np.mean(wav, axis=1, keepdims=False) | |
if wav.ndim != 1: | |
return None | |
# ensure proper sampling rate | |
if sr != 16000: | |
wav = (wav / 32768.0).astype(np.float) | |
wav = resampy.resample(wav, sr, 16000) | |
wav = (wav * 32768.0).astype(np.int16) | |
return wav | |
def create_signature(api_secret: str) -> str: | |
""" | |
helper function that creates signature, | |
required to authentificate the request | |
""" | |
int_time = int(time.time() / 1000) | |
signature_input = (api_secret + str(int_time)).encode() | |
signature = hashlib.sha256(signature_input).hexdigest() | |
return signature | |
async def async_service_request(source_str: str, source: np.ndarray, target: np.ndarray, api_key: str, api_secret: str) -> np.ndarray: | |
if target is None or len(target) == 0: | |
return None | |
ssl_context = ssl.create_default_context() | |
async with websockets.connect( | |
os.environ["endpoint"], close_timeout=1024, ssl=ssl_context | |
) as websocket: | |
request_dict = { | |
"target": base64.b64encode(target.tobytes()).decode("utf-8"), | |
"api_key": api_key, | |
"signature": create_signature(api_secret), | |
} | |
if source_str and len(source_str) > 0: | |
request_dict["source_str"] = source_str | |
elif source is not None and len(source) > 0: | |
request_dict["source"] = base64.b64encode(source.tobytes()).decode("utf-8") | |
else: | |
return None | |
request = json.dumps(request_dict) | |
await websocket.send(request) | |
# read reply | |
result_lst = [] | |
while True: | |
try: | |
data = await asyncio.wait_for(websocket.recv(), timeout=30) | |
result_lst.append(np.frombuffer(data, dtype="int16")) | |
except websockets.exceptions.ConnectionClosed: | |
break | |
except asyncio.TimeoutError: | |
break | |
if data is None: | |
break | |
result = np.concatenate(result_lst) if result_lst else None | |
return result | |
def service_request( | |
source_str: str, source_audio: Tuple[int, np.ndarray], target_audio: Tuple[int, np.ndarray], | |
api_key: str, api_secret: str, | |
) -> Tuple[int, np.ndarray]: | |
""" | |
prepares audio (has to be 16khz mono) | |
and runs request to a voice conversion service | |
""" | |
src = None | |
if source_audio is not None: | |
src = prepare_audio(source_audio) | |
tgt = prepare_audio(target_audio) | |
if tgt is None: | |
return | |
if source_str is None and src is None: | |
return | |
if len(tgt) >= 30 * 16000: | |
# too long | |
return | |
if src is not None and len(src) >= 60 * 16000: | |
return | |
if source_str is not None and len(source_str) > 256: | |
return | |
res = asyncio.run(async_service_request(source_str, src, tgt, api_key, api_secret)) | |
return 16000, res | |