|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import argparse
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
import subprocess as sp
|
|
import sys
|
|
from tempfile import NamedTemporaryFile
|
|
import time
|
|
import typing as tp
|
|
import warnings
|
|
|
|
from einops import rearrange
|
|
import torch
|
|
import gradio as gr
|
|
|
|
from audiocraft.data.audio_utils import convert_audio
|
|
from audiocraft.data.audio import audio_write
|
|
from audiocraft.models import MusicGen, MultiBandDiffusion
|
|
|
|
|
|
MODEL = None
|
|
SPACE_ID = os.environ.get('SPACE_ID', '')
|
|
INTERRUPTING = False
|
|
MBD = None
|
|
|
|
_old_call = sp.call
|
|
|
|
|
|
def _call_nostderr(*args, **kwargs):
|
|
|
|
kwargs['stderr'] = sp.DEVNULL
|
|
kwargs['stdout'] = sp.DEVNULL
|
|
_old_call(*args, **kwargs)
|
|
|
|
|
|
sp.call = _call_nostderr
|
|
|
|
pool = ProcessPoolExecutor(4)
|
|
pool.__enter__()
|
|
|
|
|
|
def interrupt():
|
|
global INTERRUPTING
|
|
INTERRUPTING = True
|
|
|
|
|
|
class FileCleaner:
|
|
def __init__(self, file_lifetime: float = 3600):
|
|
self.file_lifetime = file_lifetime
|
|
self.files = []
|
|
|
|
def add(self, path: tp.Union[str, Path]):
|
|
self._cleanup()
|
|
self.files.append((time.time(), Path(path)))
|
|
|
|
def _cleanup(self):
|
|
now = time.time()
|
|
for time_added, path in list(self.files):
|
|
if now - time_added > self.file_lifetime:
|
|
if path.exists():
|
|
path.unlink()
|
|
self.files.pop(0)
|
|
else:
|
|
break
|
|
|
|
file_cleaner = FileCleaner()
|
|
|
|
|
|
def make_waveform(*args, **kwargs):
|
|
|
|
be = time.time()
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter('ignore')
|
|
out = gr.make_waveform(*args, **kwargs)
|
|
print("Make a video took", time.time() - be)
|
|
return out
|
|
|
|
|
|
def load_model(version='facebook/musicgen-style'):
|
|
global MODEL
|
|
print("Loading model", version)
|
|
if MODEL is None or MODEL.name != version:
|
|
|
|
del MODEL
|
|
torch.cuda.empty_cache()
|
|
MODEL = None
|
|
MODEL = MusicGen.get_pretrained(version)
|
|
|
|
|
|
def load_diffusion():
|
|
global MBD
|
|
if MBD is None:
|
|
print("loading MBD")
|
|
MBD = MultiBandDiffusion.get_mbd_musicgen()
|
|
|
|
|
|
def _do_predictions(texts, melodies, duration, top_k, top_p, temperature, cfg_coef, cfg_coef_beta, eval_q, excerpt_length, progress=False, gradio_progress=None):
|
|
MODEL.set_generation_params(duration=duration, top_k=top_k, top_p=top_p, temperature=temperature, cfg_coef=cfg_coef, cfg_coef_beta=cfg_coef_beta)
|
|
MODEL.set_style_conditioner_params(eval_q=eval_q, excerpt_length=excerpt_length)
|
|
print("new batch", len(texts), texts, [None if m is None else (m[0], m[1].shape) for m in melodies])
|
|
be = time.time()
|
|
processed_melodies = []
|
|
target_sr = 32000
|
|
target_ac = 1
|
|
for melody in melodies:
|
|
if melody is None:
|
|
processed_melodies.append(None)
|
|
else:
|
|
sr, melody = melody[0], torch.from_numpy(melody[1]).to(MODEL.device).float().t()
|
|
if melody.dim() == 1:
|
|
melody = melody[None]
|
|
melody = melody[..., :int(sr * duration)]
|
|
melody = convert_audio(melody, sr, target_sr, target_ac)
|
|
processed_melodies.append(melody)
|
|
|
|
try:
|
|
if any(m is not None for m in processed_melodies):
|
|
outputs = MODEL.generate_with_chroma(
|
|
descriptions=texts,
|
|
melody_wavs=processed_melodies,
|
|
melody_sample_rate=target_sr,
|
|
progress=progress,
|
|
return_tokens=USE_DIFFUSION
|
|
)
|
|
else:
|
|
outputs = MODEL.generate(texts, progress=progress, return_tokens=USE_DIFFUSION)
|
|
except RuntimeError as e:
|
|
raise gr.Error("Error while generating " + e.args[0])
|
|
if USE_DIFFUSION:
|
|
if gradio_progress is not None:
|
|
gradio_progress(1, desc='Running MultiBandDiffusion...')
|
|
tokens = outputs[1]
|
|
outputs_diffusion = MBD.tokens_to_wav(tokens)
|
|
outputs = torch.cat([outputs[0], outputs_diffusion], dim=0)
|
|
outputs = outputs.detach().cpu().float()
|
|
pending_videos = []
|
|
out_wavs = []
|
|
for output in outputs:
|
|
with NamedTemporaryFile("wb", suffix=".wav", delete=False) as file:
|
|
audio_write(
|
|
file.name, output, MODEL.sample_rate, strategy="loudness",
|
|
loudness_headroom_db=16, loudness_compressor=True, add_suffix=False)
|
|
pending_videos.append(pool.submit(make_waveform, file.name))
|
|
out_wavs.append(file.name)
|
|
file_cleaner.add(file.name)
|
|
out_videos = [pending_video.result() for pending_video in pending_videos]
|
|
for video in out_videos:
|
|
file_cleaner.add(video)
|
|
print("batch finished", len(texts), time.time() - be)
|
|
print("Tempfiles currently stored: ", len(file_cleaner.files))
|
|
return out_videos, out_wavs
|
|
|
|
|
|
def predict_full(model, model_path, decoder, text, melody, duration, topk, topp, temperature, cfg_coef, double_cfg, cfg_coef_beta, eval_q, excerpt_length, progress=gr.Progress()):
|
|
global INTERRUPTING
|
|
global USE_DIFFUSION
|
|
INTERRUPTING = False
|
|
progress(0, desc="Loading model...")
|
|
model_path = model_path.strip()
|
|
if model_path:
|
|
if not Path(model_path).exists():
|
|
raise gr.Error(f"Model path {model_path} doesn't exist.")
|
|
if not Path(model_path).is_dir():
|
|
raise gr.Error(f"Model path {model_path} must be a folder containing "
|
|
"state_dict.bin and compression_state_dict_.bin.")
|
|
model = model_path
|
|
if temperature < 0:
|
|
raise gr.Error("Temperature must be >= 0.")
|
|
if topk < 0:
|
|
raise gr.Error("Topk must be non-negative.")
|
|
if topp < 0:
|
|
raise gr.Error("Topp must be non-negative.")
|
|
if eval_q < 1 or eval_q > 6:
|
|
raise gr.Error("eval_q must be an integer between 1 and 6 included.")
|
|
if excerpt_length > 4.5:
|
|
raise gr.Error("excerpt_length must be <= 4.5 seconds")
|
|
|
|
topk = int(topk)
|
|
eval_q = int(eval_q)
|
|
if decoder == "MultiBand_Diffusion":
|
|
USE_DIFFUSION = True
|
|
progress(0, desc="Loading diffusion model...")
|
|
load_diffusion()
|
|
else:
|
|
USE_DIFFUSION = False
|
|
load_model(model)
|
|
|
|
if double_cfg != "Yes":
|
|
cfg_coef_beta = None
|
|
max_generated = 0
|
|
|
|
def _progress(generated, to_generate):
|
|
nonlocal max_generated
|
|
max_generated = max(generated, max_generated)
|
|
progress((min(max_generated, to_generate), to_generate))
|
|
if INTERRUPTING:
|
|
raise gr.Error("Interrupted.")
|
|
MODEL.set_custom_progress_callback(_progress)
|
|
|
|
videos, wavs = _do_predictions(
|
|
[text], [melody], duration, progress=True,
|
|
top_k=topk, top_p=topp, temperature=temperature, cfg_coef=cfg_coef,
|
|
cfg_coef_beta=cfg_coef_beta, eval_q=eval_q, excerpt_length=excerpt_length,
|
|
gradio_progress=progress)
|
|
if USE_DIFFUSION:
|
|
return videos[0], wavs[0], videos[1], wavs[1]
|
|
return videos[0], wavs[0], None, None
|
|
|
|
|
|
def toggle_audio_src(choice):
|
|
if choice == "mic":
|
|
return gr.update(source="microphone", value=None, label="Microphone")
|
|
else:
|
|
return gr.update(source="upload", value=None, label="File")
|
|
|
|
|
|
def toggle_diffusion(choice):
|
|
if choice == "MultiBand_Diffusion":
|
|
return [gr.update(visible=True)] * 2
|
|
else:
|
|
return [gr.update(visible=False)] * 2
|
|
|
|
|
|
def ui_full(launch_kwargs):
|
|
with gr.Blocks() as interface:
|
|
gr.Markdown(
|
|
"""
|
|
# MusicGen-Style
|
|
This is your private demo for [MusicGen-Style](https://github.com/facebookresearch/audiocraft),
|
|
a simple and controllable model for music generation
|
|
presented at: ["Audio Conditioning for Music Generation via Discrete Bottleneck Features"](https://arxiv.org/abs/2407.12563)
|
|
"""
|
|
)
|
|
with gr.Row():
|
|
with gr.Column():
|
|
with gr.Row():
|
|
text = gr.Text(label="Input Text", interactive=True)
|
|
with gr.Column():
|
|
radio = gr.Radio(["file", "mic"], value="file",
|
|
label="Condition on a melody (optional) File or Mic")
|
|
melody = gr.Audio(sources=["upload"], type="numpy", label="File",
|
|
interactive=True, elem_id="melody-input")
|
|
with gr.Row():
|
|
submit = gr.Button("Submit")
|
|
|
|
_ = gr.Button("Interrupt").click(fn=interrupt, queue=False)
|
|
with gr.Row():
|
|
model = gr.Radio(["facebook/musicgen-style"],
|
|
label="Model", value="facebook/musicgen-style", interactive=True)
|
|
model_path = gr.Text(label="Model Path (custom models)")
|
|
with gr.Row():
|
|
decoder = gr.Radio(["Default", "MultiBand_Diffusion"],
|
|
label="Decoder", value="Default", interactive=True)
|
|
with gr.Row():
|
|
duration = gr.Slider(minimum=1, maximum=120, value=10, label="Duration", interactive=True)
|
|
eval_q = gr.Slider(minimum=1, maximum=6, value=3, step=1, label="Number of RVQ in the style conditioner", interactive=True)
|
|
with gr.Row():
|
|
topk = gr.Number(label="Top-k", value=250, interactive=True)
|
|
topp = gr.Number(label="Top-p", value=0, interactive=True)
|
|
temperature = gr.Number(label="Temperature", value=1.0, interactive=True)
|
|
cfg_coef = gr.Number(label="CFG alpha", value=3.0, interactive=True)
|
|
double_cfg = gr.Radio(["Yes", "No"],
|
|
label="Use Double Classifier Free Guidance (if No, CFG beta is useless). Only use it if you have input text and a melody file.", value="Yes", interactive=True)
|
|
cfg_coef_beta = gr.Number(label="CFG beta (double CFG)", value=5.0, interactive=True)
|
|
excerpt_length = gr.Number(label="length used of the conditioning (has to be <= 4.5 seconds)", value=3.0, interactive=True)
|
|
with gr.Column():
|
|
output = gr.Video(label="Generated Music")
|
|
audio_output = gr.Audio(label="Generated Music (wav)", type='filepath')
|
|
diffusion_output = gr.Video(label="MultiBand Diffusion Decoder")
|
|
audio_diffusion = gr.Audio(label="MultiBand Diffusion Decoder (wav)", type='filepath')
|
|
submit.click(toggle_diffusion, decoder, [diffusion_output, audio_diffusion], queue=False,
|
|
show_progress=False).then(predict_full, inputs=[model, model_path, decoder, text, melody, duration, topk, topp,
|
|
temperature, cfg_coef, double_cfg, cfg_coef_beta, eval_q, excerpt_length],
|
|
outputs=[output, audio_output, diffusion_output, audio_diffusion])
|
|
radio.change(toggle_audio_src, radio, [melody], queue=False, show_progress=False)
|
|
|
|
gr.Examples(
|
|
fn=predict_full,
|
|
examples=[
|
|
[
|
|
"80s New Wave with synthesizer",
|
|
"./assets/electronic.mp3",
|
|
"facebook/musicgen-style",
|
|
"Default"
|
|
],
|
|
],
|
|
inputs=[text, melody, model, decoder],
|
|
outputs=[output]
|
|
)
|
|
gr.Markdown(
|
|
"""
|
|
### More details
|
|
|
|
The model can generate a short music extract based on 3 different input setups:
|
|
1) A textual description. In that case we recommend to use simple (not double!) classifier free guidance with the CFG coef = 3.
|
|
|
|
2) A audio excerpt that it use for style conditioning. The audio shouldn't be longer that 4.5 seconds. If so,
|
|
a random subsequence will be subsample with the length being chosen by the user. We recommend this length to be between 1.5 and 4.5 seconds.
|
|
We recommend simple CFG with the coef = 3.
|
|
|
|
3) Both a textual description and an audio input. In that case the user should use double CFG with alpha=3 and beta=4. Then, if the model
|
|
adheres too much to the text description, the user should lower beta. If the model adheres too much to the style, the user can augment beta.
|
|
The model can generate up to 30 seconds of audio in one pass.
|
|
|
|
The model was trained with description from a stock music catalog, descriptions that will work best
|
|
should include some level of details on the instruments present, along with some intended use case
|
|
(e.g. adding "perfect for a commercial" can somehow help).
|
|
|
|
We also present two way of decoding the audio tokens
|
|
1. Use the default GAN based compression model. It can suffer from artifacts especially
|
|
for crashes, snares etc.
|
|
2. Use [MultiBand Diffusion](https://arxiv.org/abs/2308.02560). Should improve the audio quality,
|
|
at an extra computational cost. When this is selected, we provide both the GAN based decoded
|
|
audio, and the one obtained with MBD.
|
|
|
|
See [github.com/facebookresearch/audiocraft](https://github.com/facebookresearch/audiocraft/blob/main/docs/MUSICGEN_STYLE.md)
|
|
for more details.
|
|
"""
|
|
)
|
|
|
|
interface.queue().launch(**launch_kwargs)
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
'--listen',
|
|
type=str,
|
|
default='0.0.0.0' if 'SPACE_ID' in os.environ else '127.0.0.1',
|
|
help='IP to listen on for connections to Gradio',
|
|
)
|
|
parser.add_argument(
|
|
'--username', type=str, default='', help='Username for authentication'
|
|
)
|
|
parser.add_argument(
|
|
'--password', type=str, default='', help='Password for authentication'
|
|
)
|
|
parser.add_argument(
|
|
'--server_port',
|
|
type=int,
|
|
default=0,
|
|
help='Port to run the server listener on',
|
|
)
|
|
parser.add_argument(
|
|
'--inbrowser', action='store_true', help='Open in browser'
|
|
)
|
|
parser.add_argument(
|
|
'--share', action='store_true', help='Share the gradio UI'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
launch_kwargs = {}
|
|
launch_kwargs['server_name'] = args.listen
|
|
|
|
if args.username and args.password:
|
|
launch_kwargs['auth'] = (args.username, args.password)
|
|
if args.server_port:
|
|
launch_kwargs['server_port'] = args.server_port
|
|
if args.inbrowser:
|
|
launch_kwargs['inbrowser'] = args.inbrowser
|
|
if args.share:
|
|
launch_kwargs['share'] = args.share
|
|
|
|
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
|
|
|
|
|
|
ui_full(launch_kwargs) |