Spaces:
Running
on
Zero
Running
on
Zero
import os | |
import gradio as gr | |
import shutil | |
from PIL import Image | |
import tempfile | |
import json | |
import zipfile | |
# Set the directory where files will be saved | |
SAVE_DIR = os.path.abspath("saved_media") #make sure its the as media_folder Above | |
# Ensure the save directory exists | |
os.makedirs(SAVE_DIR, exist_ok=True) | |
# Define supported file extensions | |
SUPPORTED_EXTENSIONS = { | |
"image": [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"], | |
"audio": [".mp3", ".wav", ".ogg"], | |
"video": [".mp4", ".avi", ".mov", ".webm"] | |
} | |
def save_file(file): | |
if file is None: | |
return "No file uploaded.", gr.update() | |
try: | |
# Get the original filename and extension | |
original_filename = os.path.basename(file.name) | |
_, extension = os.path.splitext(original_filename) | |
# Check if the file extension is supported | |
if not any(extension.lower() in exts for exts in SUPPORTED_EXTENSIONS.values()): | |
return f"Unsupported file type: {extension}", gr.update() | |
# Create a unique filename to avoid overwriting | |
base_name = os.path.splitext(original_filename)[0] | |
counter = 1 | |
new_filename = f"{base_name}{extension}" | |
while os.path.exists(os.path.join(SAVE_DIR, new_filename)): | |
new_filename = f"{base_name}_{counter}{extension}" | |
counter += 1 | |
# Copy the file from the temporary location to our save directory | |
dest_path = os.path.join(SAVE_DIR, new_filename) | |
shutil.copy2(file.name, dest_path) | |
# Return success message and updated FileExplorer | |
return f"File saved as {SAVE_DIR}/{new_filename}", gr.update(value=SAVE_DIR), gr.update(value=None) | |
except Exception as e: | |
return f"Error saving file: {str(e)}", gr.update(value=SAVE_DIR), gr.update() | |
def view_file(file_path): | |
if not file_path: | |
return None, None, None, "No file selected." | |
try: | |
full_path = os.path.join(SAVE_DIR, file_path) | |
_, extension = os.path.splitext(full_path) | |
extension = extension.lower() | |
if extension in SUPPORTED_EXTENSIONS["image"]: | |
return Image.open(full_path), None, None, None | |
elif extension in SUPPORTED_EXTENSIONS["audio"]: | |
return None, full_path, None, None | |
elif extension in SUPPORTED_EXTENSIONS["video"]: | |
return None, None, full_path, None | |
else: | |
return None, None, None, f"Unsupported file type: {extension}" | |
except Exception as e: | |
return None, None, None, f"Error viewing file: {str(e)}" | |
def refresh_file_explorer(): | |
return gr.update() | |
def import_config_with_media(zip_path): | |
global SAVE_DIR | |
target_folder = SAVE_DIR | |
""" | |
Import the config JSON and media files from a zip file. | |
Extract only the media files that don't already exist in the target folder. | |
:param zip_path: Path to the zip file containing the config and media files | |
:param target_folder: Path to the folder where media files should be extracted | |
:return: Tuple containing the loaded config (as a dictionary) and a list of newly extracted files | |
""" | |
config = None | |
extracted_files = [] | |
with tempfile.TemporaryDirectory() as temp_dir: | |
# Extract all files to a temporary directory | |
with zipfile.ZipFile(zip_path, 'r') as zip_ref: | |
zip_ref.extractall(temp_dir) | |
# Load the config | |
config_path = os.path.join(temp_dir, 'config.json') | |
if os.path.exists(config_path): | |
with open(config_path, 'r') as f: | |
config = json.load(f) | |
else: | |
raise FileNotFoundError("config.json not found in the zip file") | |
# Create the target folder if it doesn't exist | |
os.makedirs(target_folder, exist_ok=True) | |
# Copy media files that don't already exist in the target folder | |
for root, _, files in os.walk(temp_dir): | |
for file in files: | |
if file != 'config.json': | |
src_path = os.path.join(root, file) | |
dst_path = os.path.join(target_folder, file) | |
if not os.path.exists(dst_path): | |
shutil.copy2(src_path, dst_path) | |
extracted_files.append(file) | |
return config, extracted_files |