Spaces:
Running
Running
import argparse | |
from concurrent.futures import ProcessPoolExecutor | |
import logging | |
import os | |
from pathlib import Path | |
import subprocess as sp | |
import sys | |
from tempfile import NamedTemporaryFile | |
import time | |
import typing as tp | |
import warnings | |
import torch | |
import gradio as gr | |
from audiocraft.data.audio_utils import convert_audio | |
from audiocraft.data.audio import audio_write | |
from audiocraft.models import MusicGen | |
MODEL = None # Last used model | |
INTERRUPTING = False | |
pool = ProcessPoolExecutor(4) | |
pool.__enter__() | |
class FileCleaner: | |
def __init__(self, file_lifetime: float = 3600): | |
self.file_lifetime = file_lifetime | |
self.files = [] | |
def add(self, path: tp.Union[str, Path]): | |
self._cleanup() | |
self.files.append((time.time(), Path(path))) | |
def _cleanup(self): | |
now = time.time() | |
for time_added, path in list(self.files): | |
if now - time_added > self.file_lifetime: | |
if path.exists(): | |
path.unlink() | |
self.files.pop(0) | |
else: | |
break | |
file_cleaner = FileCleaner() | |
def load_model(version='facebook/musicgen-small'): | |
global MODEL | |
print("Loading model", version) | |
if MODEL is None or MODEL.name != version: | |
del MODEL | |
torch.cuda.empty_cache() | |
MODEL = None | |
MODEL = MusicGen.get_pretrained(version) | |
def _do_predictions(texts, duration): | |
MODEL.set_generation_params(duration=duration) | |
outputs = MODEL.generate(texts) | |
outputs = outputs.detach().cpu().float() | |
out_wavs = [] | |
for output in outputs: | |
with NamedTemporaryFile("wb", suffix=".wav", delete=False) as file: | |
audio_write( | |
file.name, output, MODEL.sample_rate, strategy="loudness", | |
loudness_headroom_db=16, loudness_compressor=True, add_suffix=False) | |
out_wavs.append(file.name) | |
file_cleaner.add(file.name) | |
return out_wavs | |
def predict(text, duration): | |
load_model('facebook/musicgen-small') | |
wav_files = _do_predictions([text], duration) | |
return wav_files[0] # Return the first file in the list | |
def ui(launch_kwargs): | |
with gr.Blocks() as demo: | |
gr.Markdown( | |
""" | |
# MusicGen | |
This demo uses the MusicGen model to generate music based on a text prompt. | |
""" | |
) | |
with gr.Row(): | |
text = gr.Text(label="Input Text", interactive=True) | |
duration = gr.Slider(minimum=1, maximum=120, value=10, label="Duration", interactive=True) | |
submit = gr.Button("Submit") | |
with gr.Row(): | |
audio_output = gr.Audio(label="Generated Music", type='filepath') | |
submit.click(predict, inputs=[text, duration], outputs=[audio_output]) | |
gr.Markdown(""" | |
### More details | |
This model generates audio based on a textual description. You can specify the duration of the generated audio. | |
""") | |
demo.queue(max_size=8 * 4).launch(**launch_kwargs) | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
'--listen', | |
type=str, | |
default='0.0.0.0' if 'SPACE_ID' in os.environ else '127.0.0.1', | |
help='IP to listen on for connections to Gradio', | |
) | |
parser.add_argument( | |
'--username', type=str, default='', help='Username for authentication' | |
) | |
parser.add_argument( | |
'--password', type=str, default='', help='Password for authentication' | |
) | |
parser.add_argument( | |
'--server_port', | |
type=int, | |
default=0, | |
help='Port to run the server listener on', | |
) | |
parser.add_argument( | |
'--inbrowser', action='store_true', help='Open in browser' | |
) | |
parser.add_argument( | |
'--share', action='store_true', help='Share the gradio UI' | |
) | |
args = parser.parse_args() | |
launch_kwargs = {} | |
launch_kwargs['server_name'] = args.listen | |
if args.username and args.password: | |
launch_kwargs['auth'] = (args.username, args.password) | |
if args.server_port: | |
launch_kwargs['server_port'] = args.server_port | |
if args.inbrowser: | |
launch_kwargs['inbrowser'] = args.inbrowser | |
if args.share: | |
launch_kwargs['share'] = args.share | |
logging.basicConfig(level=logging.INFO, stream=sys.stderr) | |
# Show the interface | |
ui(launch_kwargs) | |