| | import argparse |
| | import logging |
| | import os |
| | import sys |
| | import time |
| | import typing as tp |
| | import warnings |
| | import base64 |
| | from pathlib import Path |
| | from tempfile import NamedTemporaryFile |
| |
|
| | import gradio as gr |
| | import requests |
| |
|
| | from theme_wave import theme, css |
| |
|
| | |
# Base URLs of the two backend services. Each one can be overridden through
# an environment variable of the same name, so a deployment does not have to
# edit this file. A trailing slash is dropped because the endpoint paths are
# appended with their own leading "/".
MLLM_API_URL = os.environ.get("MLLM_API_URL", "http://localhost:8000").rstrip("/")
# The default below is a placeholder, not a working service.
MUSICGEN_API_URL = os.environ.get(
    "MUSICGEN_API_URL", "https://your-musicgen-api-endpoint.com"
).rstrip("/")

# Interruption flag: set to True by interrupt() and reset to False at the
# start of predict_full().
INTERRUPTING = False
| |
|
| |
|
| | |
def interrupt():
    """Raise the module-level interruption flag.

    Sets the global ``INTERRUPTING`` to ``True``; takes no arguments and
    returns ``None``.
    """
    global INTERRUPTING
    INTERRUPTING = True
| |
|
| |
|
class FileCleaner:
    """Track files and delete them once they exceed a maximum age.

    Files are stored in registration order together with the time they were
    added. Expired files are purged lazily, each time ``add`` is called.

    Args:
        file_lifetime: Age in seconds after which a registered file is
            deleted.
    """

    def __init__(self, file_lifetime: float = 3600):
        self.file_lifetime = file_lifetime
        # (registration time, path) pairs, oldest first.
        self.files: tp.List[tp.Tuple[float, Path]] = []

    def add(self, path: tp.Union[str, Path]):
        """Purge expired files, then register ``path`` for later deletion."""
        self._cleanup()
        self.files.append((time.time(), Path(path)))

    def _cleanup(self):
        """Delete every expired file and drop it from the tracking list.

        An entry is dropped even when its file cannot be deleted; the
        failure is logged as a warning instead of being raised.
        """
        now = time.time()
        expired = 0
        # Entries are appended with the current time, so the scan stops at
        # the first entry that is still within its lifetime.
        for time_added, path in self.files:
            if now - time_added <= self.file_lifetime:
                break
            expired += 1
            try:
                path.unlink()
            except FileNotFoundError:
                # Already removed by someone else; nothing left to do.
                pass
            except OSError as e:
                logging.getLogger(__name__).warning(
                    "Error deleting file %s: %s", path, e
                )
        # One slice deletion instead of repeated pop(0) calls.
        del self.files[:expired]
| |
|
| |
|
# Module-level cleaner instance, created with the default file lifetime.
file_cleaner = FileCleaner()
| |
|
| |
|
def make_waveform(*args, **kwargs):
    """Call ``gr.make_waveform`` with all warnings silenced.

    Positional and keyword arguments are forwarded unchanged, and the
    wrapped function's result is returned.
    """
    with warnings.catch_warnings():
        # The "ignore" filter only lives for the duration of this context.
        warnings.simplefilter("ignore")
        waveform = gr.make_waveform(*args, **kwargs)
    return waveform
| |
|
| |
|
| | |
| |
|
def get_mllm_description(
    media_path: str,
    user_prompt: str,
    timeout: tp.Optional[float] = 600.0,
) -> str:
    """Get a music description from the MLLM API.

    The endpoint is chosen from the file extension of ``media_path``:
    video files go to ``describe_video``, image files to ``describe_image``
    (both sent base64-encoded), and anything else to ``describe_text`` with
    only the prompt.

    Args:
        media_path: Path of the media file to describe.
        user_prompt: Prompt text sent along with the media.
        timeout: Seconds to wait for the API before giving up; ``None``
            waits indefinitely.

    Returns:
        The ``"description"`` field of the JSON response.

    Raises:
        gr.Error: If the request fails, or on any other error while
            reading the file or decoding the response.
    """
    video_exts = (".mp4", ".avi", ".mov", ".mkv")
    image_exts = (".png", ".jpg", ".jpeg", ".gif", ".bmp")

    try:
        lowered = media_path.lower()
        if lowered.endswith(video_exts):
            endpoint, media_key = "describe_video", "video"
        elif lowered.endswith(image_exts):
            endpoint, media_key = "describe_image", "image"
        else:
            endpoint, media_key = "describe_text", None

        payload = {"user_prompt": user_prompt}
        if media_key is not None:
            with open(media_path, "rb") as f:
                payload[media_key] = base64.b64encode(f.read()).decode("utf-8")

        # Without a timeout, an unresponsive server would block this call
        # forever.
        response = requests.post(
            f"{MLLM_API_URL}/{endpoint}/", json=payload, timeout=timeout
        )
        response.raise_for_status()
        return response.json()["description"]

    except requests.exceptions.RequestException as e:
        raise gr.Error(f"Error communicating with MLLM API: {e}") from e
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred: {e}") from e
| |
|
| |
|
def _write_temp_wav(audio_bytes: bytes) -> str:
    """Write ``audio_bytes`` to a new ``.wav`` temp file and return its path.

    The file is created with ``delete=False`` and registered with
    ``file_cleaner`` so that it is removed later.
    """
    with NamedTemporaryFile("wb", suffix=".wav", delete=False) as file:
        file.write(audio_bytes)
    file_cleaner.add(file.name)
    return file.name


def generate_music_from_api(
    description: str,
    melody=None,
    duration: int = 10,
    model_version: str = "facebook/musicgen-stereo-melody-large",
    topk: int = 250,
    topp: float = 0,
    temperature: float = 1.0,
    cfg_coef: float = 3.0,
    use_diffusion: bool = False,
    timeout: tp.Optional[float] = 600.0,
):
    """Generate music using the MusicGen API.

    Args:
        description: Text description sent to the API.
        melody: Optional ``(sample_rate, data)`` pair; ``data`` is sent as
            base64-encoded raw bytes together with the sample rate.
        duration: Requested duration, sent as ``"duration"``.
        model_version: Model identifier, sent as ``"model_version"``.
        topk: Sent as ``"topk"``.
        topp: Sent as ``"topp"``.
        temperature: Sent as ``"temperature"``.
        cfg_coef: Sent as ``"cfg_coef"``.
        use_diffusion: When true, the ``"diffusion_audio"`` field of the
            response is also decoded and saved.
        timeout: Seconds to wait for the API before giving up; ``None``
            waits indefinitely.

    Returns:
        A ``(audio_path, diffusion_audio_path)`` tuple of temp-file paths.
        The second item is ``None`` when diffusion output was not requested
        or the response carried none.

    Raises:
        gr.Error: If the request fails, or on any other error while
            decoding or saving the response.
    """
    payload = {
        "description": description,
        "duration": duration,
        "model_version": model_version,
        "topk": topk,
        "topp": topp,
        "temperature": temperature,
        "cfg_coef": cfg_coef,
        "use_diffusion": use_diffusion,
    }

    if melody is not None:
        sr, melody_data = melody
        # NOTE(review): the raw bytes carry no dtype or shape information;
        # presumably the API knows the expected layout -- confirm.
        melody_bytes = (
            melody_data.tobytes()
            if hasattr(melody_data, "tobytes")
            else melody_data.tostring()
        )
        payload["melody"] = base64.b64encode(melody_bytes).decode("utf-8")
        payload["melody_sample_rate"] = sr

    try:
        # Without a timeout, an unresponsive server would block this call
        # forever.
        response = requests.post(
            f"{MUSICGEN_API_URL}/generate", json=payload, timeout=timeout
        )
        response.raise_for_status()
        result = response.json()

        # Decode everything before writing, so a malformed response does
        # not leave a half-finished set of temp files behind.
        audio_data = base64.b64decode(result["audio"])
        diffusion_audio_data = None
        if use_diffusion:
            # "or" also covers a key that is present but null.
            diffusion_audio_data = base64.b64decode(
                result.get("diffusion_audio") or ""
            )

        audio_path = _write_temp_wav(audio_data)
        diffusion_path = (
            _write_temp_wav(diffusion_audio_data) if diffusion_audio_data else None
        )
        return audio_path, diffusion_path

    except requests.exceptions.RequestException as e:
        raise gr.Error(f"Error communicating with MusicGen API: {e}") from e
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred: {e}") from e
| |
|
| |
|
| | |
| |
|
def predict_full(
    model_version,
    media_type,
    image_input,
    video_input,
    text_prompt,
    melody,
    duration,
    topk,
    topp,
    temperature,
    cfg_coef,
    decoder,
    progress=gr.Progress(),
):
    """Run the pipeline: describe the media, then generate music from it.

    Args:
        model_version: Model identifier forwarded to the generation call.
        media_type: ``"Image"`` or ``"Video"`` selects which media input is
            used; any other value means no media.
        image_input: Image path, used when ``media_type`` is ``"Image"``.
        video_input: Video path, used when ``media_type`` is ``"Video"``.
        text_prompt: User prompt. It is sent along with the media, or used
            directly as the music description when there is no media.
        melody: Optional melody forwarded to the generation call.
        duration: Forwarded to the generation call.
        topk: Forwarded to the generation call.
        topp: Forwarded to the generation call.
        temperature: Forwarded to the generation call.
        cfg_coef: Forwarded to the generation call.
        decoder: ``"MultiBand_Diffusion"`` turns on diffusion decoding.
        progress: Progress callback used to update the status text.

    Returns:
        A ``(audio_path, diffusion_audio_path)`` tuple as returned by
        ``generate_music_from_api``.

    Raises:
        gr.Error: If either step fails, or if the run was interrupted.
    """
    global INTERRUPTING
    INTERRUPTING = False
    use_diffusion = decoder == "MultiBand_Diffusion"

    # Empty inputs are normalized to None.
    if media_type == "Image":
        media = image_input or None
    elif media_type == "Video":
        media = video_input or None
    else:
        media = None

    progress(progress=None, desc="Generating music description...")
    if media:
        try:
            music_description = get_mllm_description(media, text_prompt)
        except Exception as e:
            raise gr.Error(str(e)) from e
    else:
        music_description = text_prompt

    # An interrupt requested during the description step stops the run
    # here, before the generation request is sent.
    if INTERRUPTING:
        raise gr.Error("Generation interrupted.")

    progress(progress=None, desc="Generating music via API...")
    try:
        output_audio_path, output_audio_mbd_path = generate_music_from_api(
            description=music_description,
            melody=melody,
            duration=duration,
            model_version=model_version,
            topk=topk,
            topp=topp,
            temperature=temperature,
            cfg_coef=cfg_coef,
            use_diffusion=use_diffusion,
        )
    except Exception as e:
        raise gr.Error(f"Error generating music: {e}") from e

    # The generation call blocks, so an interrupt requested while it was
    # running is only noticed here.
    if INTERRUPTING:
        raise gr.Error("Generation interrupted.")

    return output_audio_path, output_audio_mbd_path
| |
|
| |
|
# Theme object returned by theme_wave.theme(); passed to gr.Blocks in create_ui().
Wave = theme()
| |
|
| |
|
def create_ui(launch_kwargs=None):
    """Build the WeaveWave Gradio interface, launch it, and return it.

    NOTE(review): this function reached review with its indentation
    stripped, so the nesting of the layout containers below (and the
    placement of the final ``launch`` call outside the ``gr.Blocks``
    context) is a reconstruction -- confirm against the intended layout.

    Args:
        launch_kwargs: Optional dict of keyword arguments forwarded to
            ``launch()``. An empty dict is used when omitted.

    Returns:
        The ``gr.Blocks`` interface, returned after ``launch()`` returns.
    """

    if launch_kwargs is None:
        launch_kwargs = {}

    def interrupt_handler():
        # Thin wrapper used as the click callback of the Interrupt button.
        interrupt()

    with gr.Blocks(theme=Wave, css=css) as interface:

        # Page header.
        gr.Markdown(
            """
            <div style="text-align: center;">
            <h1>WeaveWave</h1>
            <h2>Towards Multimodal Music Generation</h2>
            </div>
            """
        )

        with gr.Row():
            # Media inputs: only one of the image/video components is
            # visible at a time, selected by the radio button.
            with gr.Column():
                with gr.Group():
                    image_input = gr.Image(
                        value="./assets/WeaveWave.png",
                        label="Input Image",
                        type="filepath",
                        height=320,
                        visible=True,
                    )
                    video_input = gr.Video(
                        value="./assets/example_video_1.mp4",
                        label="Input Video",
                        height=320,
                        visible=False,
                    )
                    with gr.Row():
                        media_type = gr.Radio(
                            choices=["Image", "Video"],
                            value="Image",
                            label="",
                            interactive=True,
                            elem_classes="center-radio compact-radio",
                        )

                    def toggle_media(choice):
                        # Show the component matching the selected media
                        # type and hide the other one.
                        return {
                            image_input: gr.update(visible=(choice == "Image")),
                            video_input: gr.update(visible=(choice == "Video")),
                        }

                    media_type.change(
                        toggle_media, inputs=media_type, outputs=[image_input, video_input]
                    )
            # Prompt, melody, action buttons and generation parameters.
            with gr.Column():
                text_input = gr.Text(
                    value="Anything you like",
                    label="User Prompt",
                )
                melody_input = gr.Audio(
                    value="./assets/bach.mp3",
                    type="numpy",
                    label="Melody",
                )
                with gr.Row():
                    submit_button = gr.Button("Generate Music", variant="primary")
                    interrupt_button = gr.Button("Interrupt", variant="stop")
                with gr.Row():
                    model_version = gr.Dropdown(
                        [
                            "facebook/musicgen-melody",
                            "facebook/musicgen-medium",
                            "facebook/musicgen-small",
                            "facebook/musicgen-large",
                            "facebook/musicgen-melody-large",
                            "facebook/musicgen-stereo-small",
                            "facebook/musicgen-stereo-medium",
                            "facebook/musicgen-stereo-melody",
                            "facebook/musicgen-stereo-large",
                            "facebook/musicgen-stereo-melody-large",
                        ],
                        label="MusicGen Model",
                        value="facebook/musicgen-stereo-melody-large",
                    )
                    duration = gr.Slider(
                        minimum=1, maximum=120, value=10, label="Duration (seconds)"
                    )
                with gr.Row():
                    topk = gr.Number(label="Top-k", value=250)
                    topp = gr.Number(label="Top-p", value=0)
                    temperature = gr.Number(label="Temperature", value=1.0)
                    cfg_coef = gr.Number(label="Classifier-Free Guidance", value=3.0)
                    decoder = gr.Dropdown(
                        ["Default", "MultiBand_Diffusion"],
                        label="Decoder",
                        value="Default",
                        interactive=True,
                    )

        # Output players; both receive file paths from predict_full.
        with gr.Row():
            output_audio = gr.Audio(label="Generated Music", type="filepath")
            output_audio_mbd = gr.Audio(
                label="MultiBand Diffusion Decoder", type="filepath"
            )

        # The order of `inputs` must match predict_full's parameter order.
        submit_button.click(
            predict_full,
            inputs=[
                model_version,
                media_type,
                image_input,
                video_input,
                text_input,
                melody_input,
                duration,
                topk,
                topp,
                temperature,
                cfg_coef,
                decoder,
            ],
            outputs=[output_audio, output_audio_mbd],
        )
        interrupt_button.click(interrupt_handler, [], [])

        # Each example row follows the `inputs` list given to gr.Examples
        # below, which is ordered differently from the click inputs above.
        gr.Examples(
            examples=[
                [
                    "Image",
                    "./assets/example_image_1.jpg",
                    None,
                    "Acoustic guitar solo. Country and folk music.",
                    None,
                    "facebook/musicgen-stereo-melody-large",
                    10,
                    250,
                    0,
                    1.0,
                    3.0,
                    "MultiBand_Diffusion",
                ],
                [
                    "Video",
                    None,
                    "./assets/example_video_1.mp4",
                    "Space Rock, Synthwave, 80s. Electric guitar and Drums.",
                    None,
                    "facebook/musicgen-stereo-melody-large",
                    10,
                    250,
                    0,
                    1.0,
                    3.0,
                    "MultiBand_Diffusion",
                ],
                # Text-and-melody example: no media type or media file set.
                [
                    None,
                    None,
                    None,
                    "An 80s driving pop song with heavy drums and synth pads in the background",
                    "./assets/bach.mp3",
                    "facebook/musicgen-stereo-melody-large",
                    10,
                    250,
                    0,
                    1.0,
                    3.0,
                    "MultiBand_Diffusion",
                ],
            ],
            inputs=[
                media_type,
                image_input,
                video_input,
                text_input,
                melody_input,
                model_version,
                duration,
                topk,
                topp,
                temperature,
                cfg_coef,
                decoder,
            ],
        )
    interface.queue().launch(**launch_kwargs)
    return interface
| |
|
| |
|
# Script entry point: parse the command-line options, translate them into
# launch() keyword arguments, and start the UI.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--listen",
        type=str,
        default="0.0.0.0" if "SPACE_ID" in os.environ else "127.0.0.1",
        help="IP to listen on",
    )
    parser.add_argument(
        "--username", type=str, default="", help="Username for authentication"
    )
    parser.add_argument(
        "--password", type=str, default="", help="Password for authentication"
    )
    parser.add_argument(
        "--server_port", type=int, default=0, help="Port to run the server on"
    )
    parser.add_argument("--inbrowser", action="store_true", help="Open in browser")
    parser.add_argument("--share", action="store_true", help="Share the Gradio UI")

    args = parser.parse_args()

    # Supplying only one half of the credentials would otherwise start the
    # server with no authentication at all, without any warning.
    if bool(args.username) != bool(args.password):
        parser.error("--username and --password must be provided together")

    # Optional settings are only passed when set, so launch() keeps its own
    # defaults for the rest.
    launch_kwargs = {"server_name": args.listen}
    if args.username and args.password:
        launch_kwargs["auth"] = (args.username, args.password)
    if args.server_port:
        launch_kwargs["server_port"] = args.server_port
    if args.inbrowser:
        launch_kwargs["inbrowser"] = args.inbrowser
    if args.share:
        launch_kwargs["share"] = args.share

    logging.basicConfig(level=logging.INFO, stream=sys.stderr)
    create_ui(launch_kwargs)