Spaces:
Runtime error
Runtime error
from typing import Iterable, Tuple | |
import torch | |
import numpy as np | |
from PIL import Image | |
from tqdm.auto import tqdm | |
from librosa.beat import beat_track | |
from diffusers import AudioDiffusionPipeline | |
VERSION = "1.3.2" | |
class AudioDiffusion: | |
def __init__(self, | |
model_id: str = "teticio/audio-diffusion-256", | |
cuda: bool = torch.cuda.is_available(), | |
progress_bar: Iterable = tqdm): | |
"""Class for generating audio using De-noising Diffusion Probabilistic Models. | |
Args: | |
model_id (String): name of model (local directory or Hugging Face Hub) | |
cuda (bool): use CUDA? | |
progress_bar (iterable): iterable callback for progress updates or None | |
""" | |
self.model_id = model_id | |
self.pipe = AudioDiffusionPipeline.from_pretrained(self.model_id) | |
if cuda: | |
self.pipe.to("cuda") | |
self.progress_bar = progress_bar or (lambda _: _) | |
def generate_spectrogram_and_audio( | |
self, | |
steps: int = None, | |
generator: torch.Generator = None, | |
step_generator: torch.Generator = None, | |
eta: float = 0, | |
noise: torch.Tensor = None | |
) -> Tuple[Image.Image, Tuple[int, np.ndarray]]: | |
"""Generate random mel spectrogram and convert to audio. | |
Args: | |
steps (int): number of de-noising steps (defaults to 50 for DDIM, 1000 for DDPM) | |
generator (torch.Generator): random number generator or None | |
step_generator (torch.Generator): random number generator used to de-noise or None | |
eta (float): parameter between 0 and 1 used with DDIM scheduler | |
noise (torch.Tensor): noisy image or None | |
Returns: | |
PIL Image: mel spectrogram | |
(float, np.ndarray): sample rate and raw audio | |
""" | |
images, (sample_rate, | |
audios) = self.pipe(batch_size=1, | |
steps=steps, | |
generator=generator, | |
step_generator=step_generator, | |
eta=eta, | |
noise=noise, | |
return_dict=False) | |
return images[0], (sample_rate, audios[0]) | |
def generate_spectrogram_and_audio_from_audio( | |
self, | |
audio_file: str = None, | |
raw_audio: np.ndarray = None, | |
slice: int = 0, | |
start_step: int = 0, | |
steps: int = None, | |
generator: torch.Generator = None, | |
mask_start_secs: float = 0, | |
mask_end_secs: float = 0, | |
step_generator: torch.Generator = None, | |
eta: float = 0, | |
noise: torch.Tensor = None | |
) -> Tuple[Image.Image, Tuple[int, np.ndarray]]: | |
"""Generate random mel spectrogram from audio input and convert to audio. | |
Args: | |
audio_file (str): must be a file on disk due to Librosa limitation or | |
raw_audio (np.ndarray): audio as numpy array | |
slice (int): slice number of audio to convert | |
start_step (int): step to start from | |
steps (int): number of de-noising steps (defaults to 50 for DDIM, 1000 for DDPM) | |
generator (torch.Generator): random number generator or None | |
mask_start_secs (float): number of seconds of audio to mask (not generate) at start | |
mask_end_secs (float): number of seconds of audio to mask (not generate) at end | |
step_generator (torch.Generator): random number generator used to de-noise or None | |
eta (float): parameter between 0 and 1 used with DDIM scheduler | |
noise (torch.Tensor): noisy image or None | |
Returns: | |
PIL Image: mel spectrogram | |
(float, np.ndarray): sample rate and raw audio | |
""" | |
images, (sample_rate, | |
audios) = self.pipe(batch_size=1, | |
audio_file=audio_file, | |
raw_audio=raw_audio, | |
slice=slice, | |
start_step=start_step, | |
steps=steps, | |
generator=generator, | |
mask_start_secs=mask_start_secs, | |
mask_end_secs=mask_end_secs, | |
step_generator=step_generator, | |
eta=eta, | |
noise=noise, | |
return_dict=False) | |
return images[0], (sample_rate, audios[0]) | |
def loop_it(audio: np.ndarray, | |
sample_rate: int, | |
loops: int = 12) -> np.ndarray: | |
"""Loop audio | |
Args: | |
audio (np.ndarray): audio as numpy array | |
sample_rate (int): sample rate of audio | |
loops (int): number of times to loop | |
Returns: | |
(float, np.ndarray): sample rate and raw audio or None | |
""" | |
_, beats = beat_track(y=audio, sr=sample_rate, units='samples') | |
for beats_in_bar in [16, 12, 8, 4]: | |
if len(beats) > beats_in_bar: | |
return np.tile(audio[beats[0]:beats[beats_in_bar]], loops) | |
return None | |
''' | |
# This code will be migrated to diffusers shortly | |
#-----------------------------------------------------------------------------# | |
import os | |
import warnings | |
from typing import Any, Dict, Optional, Union | |
from diffusers.configuration_utils import ConfigMixin, register_to_config | |
from diffusers.schedulers.scheduling_utils import SchedulerMixin | |
warnings.filterwarnings("ignore") | |
import numpy as np # noqa: E402 | |
import librosa # noqa: E402 | |
from PIL import Image # noqa: E402 | |
class Mel(ConfigMixin, SchedulerMixin): | |
""" | |
Parameters: | |
x_res (`int`): x resolution of spectrogram (time) | |
y_res (`int`): y resolution of spectrogram (frequency bins) | |
sample_rate (`int`): sample rate of audio | |
n_fft (`int`): number of Fast Fourier Transforms | |
hop_length (`int`): hop length (a higher number is recommended for lower than 256 y_res) | |
top_db (`int`): loudest in decibels | |
n_iter (`int`): number of iterations for Griffin Linn mel inversion | |
""" | |
config_name = "mel_config.json" | |
@register_to_config | |
def __init__( | |
self, | |
x_res: int = 256, | |
y_res: int = 256, | |
sample_rate: int = 22050, | |
n_fft: int = 2048, | |
hop_length: int = 512, | |
top_db: int = 80, | |
n_iter: int = 32, | |
): | |
self.hop_length = hop_length | |
self.sr = sample_rate | |
self.n_fft = n_fft | |
self.top_db = top_db | |
self.n_iter = n_iter | |
self.set_resolution(x_res, y_res) | |
self.audio = None | |
def set_resolution(self, x_res: int, y_res: int): | |
"""Set resolution. | |
Args: | |
x_res (`int`): x resolution of spectrogram (time) | |
y_res (`int`): y resolution of spectrogram (frequency bins) | |
""" | |
self.x_res = x_res | |
self.y_res = y_res | |
self.n_mels = self.y_res | |
self.slice_size = self.x_res * self.hop_length - 1 | |
def load_audio(self, audio_file: str = None, raw_audio: np.ndarray = None): | |
"""Load audio. | |
Args: | |
audio_file (`str`): must be a file on disk due to Librosa limitation or | |
raw_audio (`np.ndarray`): audio as numpy array | |
""" | |
if audio_file is not None: | |
self.audio, _ = librosa.load(audio_file, mono=True, sr=self.sr) | |
else: | |
self.audio = raw_audio | |
# Pad with silence if necessary. | |
if len(self.audio) < self.x_res * self.hop_length: | |
self.audio = np.concatenate([self.audio, np.zeros((self.x_res * self.hop_length - len(self.audio),))]) | |
def get_number_of_slices(self) -> int: | |
"""Get number of slices in audio. | |
Returns: | |
`int`: number of spectograms audio can be sliced into | |
""" | |
return len(self.audio) // self.slice_size | |
def get_audio_slice(self, slice: int = 0) -> np.ndarray: | |
"""Get slice of audio. | |
Args: | |
slice (`int`): slice number of audio (out of get_number_of_slices()) | |
Returns: | |
`np.ndarray`: audio as numpy array | |
""" | |
return self.audio[self.slice_size * slice : self.slice_size * (slice + 1)] | |
def get_sample_rate(self) -> int: | |
"""Get sample rate: | |
Returns: | |
`int`: sample rate of audio | |
""" | |
return self.sr | |
def audio_slice_to_image(self, slice: int) -> Image.Image: | |
"""Convert slice of audio to spectrogram. | |
Args: | |
slice (`int`): slice number of audio to convert (out of get_number_of_slices()) | |
Returns: | |
`PIL Image`: grayscale image of x_res x y_res | |
""" | |
S = librosa.feature.melspectrogram( | |
y=self.get_audio_slice(slice), sr=self.sr, n_fft=self.n_fft, hop_length=self.hop_length, n_mels=self.n_mels | |
) | |
log_S = librosa.power_to_db(S, ref=np.max, top_db=self.top_db) | |
bytedata = (((log_S + self.top_db) * 255 / self.top_db).clip(0, 255) + 0.5).astype(np.uint8) | |
image = Image.fromarray(bytedata) | |
return image | |
def image_to_audio(self, image: Image.Image) -> np.ndarray: | |
"""Converts spectrogram to audio. | |
Args: | |
image (`PIL Image`): x_res x y_res grayscale image | |
Returns: | |
audio (`np.ndarray`): raw audio | |
""" | |
bytedata = np.frombuffer(image.tobytes(), dtype="uint8").reshape((image.height, image.width)) | |
log_S = bytedata.astype("float") * self.top_db / 255 - self.top_db | |
S = librosa.db_to_power(log_S) | |
audio = librosa.feature.inverse.mel_to_audio( | |
S, sr=self.sr, n_fft=self.n_fft, hop_length=self.hop_length, n_iter=self.n_iter | |
) | |
return audio | |
#-----------------------------------------------------------------------------# | |
from math import acos, sin | |
from typing import List, Tuple, Union | |
import numpy as np | |
import torch | |
from PIL import Image | |
from diffusers import AutoencoderKL, UNet2DConditionModel, DiffusionPipeline, DDIMScheduler, DDPMScheduler | |
from diffusers.pipeline_utils import AudioPipelineOutput, BaseOutput, ImagePipelineOutput | |
class AudioDiffusionPipeline(DiffusionPipeline): | |
""" | |
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the | |
library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.) | |
Parameters: | |
vqae ([`AutoencoderKL`]): Variational AutoEncoder for Latent Audio Diffusion or None | |
unet ([`UNet2DConditionModel`]): UNET model | |
mel ([`Mel`]): transform audio <-> spectrogram | |
scheduler ([`DDIMScheduler` or `DDPMScheduler`]): de-noising scheduler | |
""" | |
_optional_components = ["vqvae"] | |
def __init__( | |
self, | |
vqvae: AutoencoderKL, | |
unet: UNet2DConditionModel, | |
mel: Mel, | |
scheduler: Union[DDIMScheduler, DDPMScheduler], | |
): | |
super().__init__() | |
self.register_modules(unet=unet, scheduler=scheduler, mel=mel, vqvae=vqvae) | |
def get_input_dims(self) -> Tuple: | |
"""Returns dimension of input image | |
Returns: | |
`Tuple`: (height, width) | |
""" | |
input_module = self.vqvae if self.vqvae is not None else self.unet | |
# For backwards compatibility | |
sample_size = ( | |
(input_module.sample_size, input_module.sample_size) | |
if type(input_module.sample_size) == int | |
else input_module.sample_size | |
) | |
return sample_size | |
def get_default_steps(self) -> int: | |
"""Returns default number of steps recommended for inference | |
Returns: | |
`int`: number of steps | |
""" | |
return 50 if isinstance(self.scheduler, DDIMScheduler) else 1000 | |
@torch.no_grad() | |
def __call__( | |
self, | |
batch_size: int = 1, | |
audio_file: str = None, | |
raw_audio: np.ndarray = None, | |
slice: int = 0, | |
start_step: int = 0, | |
steps: int = None, | |
generator: torch.Generator = None, | |
mask_start_secs: float = 0, | |
mask_end_secs: float = 0, | |
step_generator: torch.Generator = None, | |
eta: float = 0, | |
noise: torch.Tensor = None, | |
return_dict=True, | |
) -> Union[ | |
Union[AudioPipelineOutput, ImagePipelineOutput], Tuple[List[Image.Image], Tuple[int, List[np.ndarray]]] | |
]: | |
"""Generate random mel spectrogram from audio input and convert to audio. | |
Args: | |
batch_size (`int`): number of samples to generate | |
audio_file (`str`): must be a file on disk due to Librosa limitation or | |
raw_audio (`np.ndarray`): audio as numpy array | |
slice (`int`): slice number of audio to convert | |
start_step (int): step to start from | |
steps (`int`): number of de-noising steps (defaults to 50 for DDIM, 1000 for DDPM) | |
generator (`torch.Generator`): random number generator or None | |
mask_start_secs (`float`): number of seconds of audio to mask (not generate) at start | |
mask_end_secs (`float`): number of seconds of audio to mask (not generate) at end | |
step_generator (`torch.Generator`): random number generator used to de-noise or None | |
eta (`float`): parameter between 0 and 1 used with DDIM scheduler | |
noise (`torch.Tensor`): noise tensor of shape (batch_size, 1, height, width) or None | |
return_dict (`bool`): if True return AudioPipelineOutput, ImagePipelineOutput else Tuple | |
Returns: | |
`List[PIL Image]`: mel spectrograms (`float`, `List[np.ndarray]`): sample rate and raw audios | |
""" | |
steps = steps or self.get_default_steps() | |
self.scheduler.set_timesteps(steps) | |
step_generator = step_generator or generator | |
# For backwards compatibility | |
if type(self.unet.sample_size) == int: | |
self.unet.sample_size = (self.unet.sample_size, self.unet.sample_size) | |
input_dims = self.get_input_dims() | |
self.mel.set_resolution(x_res=input_dims[1], y_res=input_dims[0]) | |
if noise is None: | |
noise = torch.randn( | |
(batch_size, self.unet.in_channels, self.unet.sample_size[0], self.unet.sample_size[1]), | |
generator=generator, | |
device=self.device, | |
) | |
images = noise | |
mask = None | |
if audio_file is not None or raw_audio is not None: | |
self.mel.load_audio(audio_file, raw_audio) | |
input_image = self.mel.audio_slice_to_image(slice) | |
input_image = np.frombuffer(input_image.tobytes(), dtype="uint8").reshape( | |
(input_image.height, input_image.width) | |
) | |
input_image = (input_image / 255) * 2 - 1 | |
input_images = torch.tensor(input_image[np.newaxis, :, :], dtype=torch.float).to(self.device) | |
if self.vqvae is not None: | |
input_images = self.vqvae.encode(torch.unsqueeze(input_images, 0)).latent_dist.sample( | |
generator=generator | |
)[0] | |
input_images = 0.18215 * input_images | |
if start_step > 0: | |
images[0, 0] = self.scheduler.add_noise(input_images, noise, self.scheduler.timesteps[start_step - 1]) | |
pixels_per_second = ( | |
self.unet.sample_size[1] * self.mel.get_sample_rate() / self.mel.x_res / self.mel.hop_length | |
) | |
mask_start = int(mask_start_secs * pixels_per_second) | |
mask_end = int(mask_end_secs * pixels_per_second) | |
mask = self.scheduler.add_noise(input_images, noise, torch.tensor(self.scheduler.timesteps[start_step:])) | |
for step, t in enumerate(self.progress_bar(self.scheduler.timesteps[start_step:])): | |
model_output = self.unet(images, t)["sample"] | |
if isinstance(self.scheduler, DDIMScheduler): | |
images = self.scheduler.step( | |
model_output=model_output, timestep=t, sample=images, eta=eta, generator=step_generator | |
)["prev_sample"] | |
else: | |
images = self.scheduler.step( | |
model_output=model_output, timestep=t, sample=images, generator=step_generator | |
)["prev_sample"] | |
if mask is not None: | |
if mask_start > 0: | |
images[:, :, :, :mask_start] = mask[:, step, :, :mask_start] | |
if mask_end > 0: | |
images[:, :, :, -mask_end:] = mask[:, step, :, -mask_end:] | |
if self.vqvae is not None: | |
# 0.18215 was scaling factor used in training to ensure unit variance | |
images = 1 / 0.18215 * images | |
images = self.vqvae.decode(images)["sample"] | |
images = (images / 2 + 0.5).clamp(0, 1) | |
images = images.cpu().permute(0, 2, 3, 1).numpy() | |
images = (images * 255).round().astype("uint8") | |
images = list( | |
map(lambda _: Image.fromarray(_[:, :, 0]), images) | |
if images.shape[3] == 1 | |
else map(lambda _: Image.fromarray(_, mode="RGB").convert("L"), images) | |
) | |
audios = list(map(lambda _: self.mel.image_to_audio(_), images)) | |
if not return_dict: | |
return images, (self.mel.get_sample_rate(), audios) | |
return BaseOutput(**AudioPipelineOutput(np.array(audios)[:, np.newaxis, :]), **ImagePipelineOutput(images)) | |
@torch.no_grad() | |
def encode(self, images: List[Image.Image], steps: int = 50) -> np.ndarray: | |
"""Reverse step process: recover noisy image from generated image. | |
Args: | |
images (`List[PIL Image]`): list of images to encode | |
steps (`int`): number of encoding steps to perform (defaults to 50) | |
Returns: | |
`np.ndarray`: noise tensor of shape (batch_size, 1, height, width) | |
""" | |
# Only works with DDIM as this method is deterministic | |
assert isinstance(self.scheduler, DDIMScheduler) | |
self.scheduler.set_timesteps(steps) | |
sample = np.array( | |
[np.frombuffer(image.tobytes(), dtype="uint8").reshape((1, image.height, image.width)) for image in images] | |
) | |
sample = (sample / 255) * 2 - 1 | |
sample = torch.Tensor(sample).to(self.device) | |
for t in self.progress_bar(torch.flip(self.scheduler.timesteps, (0,))): | |
prev_timestep = t - self.scheduler.num_train_timesteps // self.scheduler.num_inference_steps | |
alpha_prod_t = self.scheduler.alphas_cumprod[t] | |
alpha_prod_t_prev = ( | |
self.scheduler.alphas_cumprod[prev_timestep] | |
if prev_timestep >= 0 | |
else self.scheduler.final_alpha_cumprod | |
) | |
beta_prod_t = 1 - alpha_prod_t | |
model_output = self.unet(sample, t)["sample"] | |
pred_sample_direction = (1 - alpha_prod_t_prev) ** (0.5) * model_output | |
sample = (sample - pred_sample_direction) * alpha_prod_t_prev ** (-0.5) | |
sample = sample * alpha_prod_t ** (0.5) + beta_prod_t ** (0.5) * model_output | |
return sample | |
@staticmethod | |
def slerp(x0: torch.Tensor, x1: torch.Tensor, alpha: float) -> torch.Tensor: | |
"""Spherical Linear intERPolation | |
Args: | |
x0 (`torch.Tensor`): first tensor to interpolate between | |
x1 (`torch.Tensor`): seconds tensor to interpolate between | |
alpha (`float`): interpolation between 0 and 1 | |
Returns: | |
`torch.Tensor`: interpolated tensor | |
""" | |
theta = acos(torch.dot(torch.flatten(x0), torch.flatten(x1)) / torch.norm(x0) / torch.norm(x1)) | |
return sin((1 - alpha) * theta) * x0 / sin(theta) + sin(alpha * theta) * x1 / sin(theta) | |
import sys | |
import diffusers | |
class audio_diffusion(): | |
__name__ = 'audio_diffusion' | |
pass | |
sys.modules['audio_diffusion'] = audio_diffusion | |
setattr(audio_diffusion, Mel.__name__, Mel) | |
diffusers.AudioDiffusionPipeline = AudioDiffusionPipeline | |
setattr(diffusers, AudioDiffusionPipeline.__name__, AudioDiffusionPipeline) | |
diffusers.pipeline_utils.LOADABLE_CLASSES['audio_diffusion'] = {} | |
diffusers.pipeline_utils.LOADABLE_CLASSES['audio_diffusion']['Mel'] = ["save_pretrained", "from_pretrained"] | |
''' | |