# Ursa_Minor GGUF Auto Quantizer — Hugging Face Space app.
# (Removed non-code residue "Spaces: / Sleeping / Sleeping" scraped from the
# Space's status badge; it was never part of the source.)
import json
import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from datetime import datetime
from pathlib import Path

import gradio as gr
from huggingface_hub import HfApi, hf_hub_download, login, whoami
# Logging: timestamped, level-tagged messages for the Space console.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration constants.
SOURCE_REPO = "Sculptor-AI/Ursa_Minor"        # upstream model repository to watch
HF_TOKEN = os.environ.get("HF_TOKEN")         # Hugging Face auth token (may be None)
CONVERSION_SCRIPT = "./llama.cpp/convert.py"  # llama.cpp GGUF conversion entry point
MODEL_CACHE_DIR = "model_cache"               # where source model files are downloaded
TEMP_DIR = "temp_outputs"                     # scratch dir for quantized outputs
# Quantization presets, ordered smallest to largest on-disk footprint.
# Each row: (GGUF type name, approximate output size in GB, short guidance note).
_QUANT_TABLE = [
    ("Q2_K", 0.8, "smallest size"),
    ("Q3_K_S", 0.9, "small size"),
    ("Q3_K_M", 0.9, "lower quality"),
    ("Q3_K_L", 1.0, ""),
    ("IQ4_XS", 1.0, ""),
    ("Q4_K_S", 1.0, "fast, recommended"),
    ("Q4_K_M", 1.1, "fast, recommended"),
    ("Q5_K_S", 1.2, "good balance"),
    ("Q5_K_M", 1.2, ""),
    ("Q6_K", 1.4, "very good quality"),
    ("Q8_0", 1.7, "fast, best quality"),
    ("f16", 3.2, "16 bpw, full precision"),
]
QUANT_CONFIGS = [
    {"name": name, "size_gb": size_gb, "notes": notes}
    for name, size_gb, notes in _QUANT_TABLE
]
# Mutable application state shared between the UI thread and the worker
# thread; persisted to state.json by save_state()/load_state().
state = dict(
    last_checked=None,        # datetime of the most recent update check
    last_commit_hash=None,    # sha of the newest commit seen on SOURCE_REPO
    is_up_to_date=True,
    is_processing=False,      # True while the quantization worker is running
    current_quant=None,       # name of the quant currently being produced
    progress=0,               # 0-100 percentage for the UI progress slider
    total_quants=len(QUANT_CONFIGS),
    completed_quants=[],      # names of successfully uploaded quants
    failed_quants=[],         # "NAME (reason)" entries for failures
    out_of_memory=False,      # set when the worker hits the memory ceiling
    last_error=None,
    status_message="Ready to check for updates",
)
# Initialize the HF API client once; the token may be None for public reads.
hf_api = HfApi(token=HF_TOKEN)

# Set up llama.cpp tools on first run.
# Fix: test for CONVERSION_SCRIPT instead of repeating the path literal, so
# this check cannot drift out of sync with the constant used elsewhere.
if not os.path.exists(CONVERSION_SCRIPT):
    try:
        logger.info("Setting up llama.cpp tools...")
        subprocess.run(["bash", "setup.sh"], check=True)
        logger.info("Setup completed successfully")
    except subprocess.CalledProcessError as e:
        # Without llama.cpp the Space cannot do anything useful; fail fast.
        logger.error(f"Error setting up llama.cpp tools: {e}")
        raise
# Helper functions | |
def save_state():
    """Persist the global ``state`` dict to state.json.

    ``last_checked`` is serialized with ``datetime.isoformat()`` — the
    documented inverse of the ``datetime.fromisoformat()`` call used by
    ``load_state()``. The previous ``str()`` round-trip only worked because
    ``str(datetime)`` happens to be ISO-like; ``isoformat()`` makes the
    contract explicit.
    """
    # Work on a copy so the live state dict is never mutated here.
    serializable_state = state.copy()
    last_checked = serializable_state["last_checked"]
    serializable_state["last_checked"] = last_checked.isoformat() if last_checked else None
    with open("state.json", "w") as f:
        json.dump(serializable_state, f)
def load_state():
    """Restore the global ``state`` dict from state.json, if present.

    A missing or unreadable file is logged and ignored, leaving the
    in-memory defaults in effect. Always returns None.
    """
    global state
    try:
        if not os.path.exists("state.json"):
            return
        with open("state.json", "r") as f:
            loaded_state = json.load(f)
        # Timestamps are serialized as strings; revive them as datetimes.
        saved_checked = loaded_state.get("last_checked")
        if saved_checked:
            loaded_state["last_checked"] = datetime.fromisoformat(saved_checked)
        state.update(loaded_state)
    except Exception as e:
        logger.error(f"Error loading state: {e}")
def get_latest_commit():
    """Return the current HEAD commit sha of SOURCE_REPO, or None on any failure."""
    try:
        return hf_api.repo_info(repo_id=SOURCE_REPO).sha
    except Exception as e:
        logger.error(f"Error getting latest commit: {e}")
        return None
def check_for_updates():
    """Poll SOURCE_REPO for a new commit and update the global state.

    Records the check time, flips ``is_up_to_date`` accordingly, persists
    the state, and returns the human-readable status message for the UI.
    """
    global state
    state["last_checked"] = datetime.now()
    latest_commit = get_latest_commit()
    has_new_commit = bool(latest_commit) and latest_commit != state["last_commit_hash"]
    if has_new_commit:
        logger.info(f"New commit detected: {latest_commit}")
        state["last_commit_hash"] = latest_commit
        state["is_up_to_date"] = False
        state["status_message"] = f"Updates detected in {SOURCE_REPO}. Ready to generate quantizations."
    else:
        state["is_up_to_date"] = True
        state["status_message"] = f"No updates detected in {SOURCE_REPO}. Last checked: {state['last_checked'].strftime('%Y-%m-%d %H:%M:%S')}"
    save_state()
    return state["status_message"]
def download_model():
    """Download the source model's config, tokenizer, and weights into MODEL_CACHE_DIR.

    Returns the local model directory on success, or None on failure (the
    error text is recorded in ``state["last_error"]``).

    Bug fix: the original downloaded only ``safetensors_files[0]``, which
    silently produces an incomplete model whenever the checkpoint is sharded
    into multiple .safetensors files. All shards are now fetched, along with
    the shard index file when present.
    """
    model_dir = os.path.join(MODEL_CACHE_DIR, os.path.basename(SOURCE_REPO))
    try:
        # Create cache directory if it doesn't exist
        os.makedirs(MODEL_CACHE_DIR, exist_ok=True)
        # Clean up any previous download so stale shards never mix with new ones.
        if os.path.exists(model_dir):
            shutil.rmtree(model_dir)
        # Get model repo information to find the weights files.
        logger.info(f"Getting repository information for {SOURCE_REPO}")
        files = hf_api.list_repo_files(repo_id=SOURCE_REPO)
        # The .safetensors files are the model weights (possibly sharded).
        safetensors_files = [f for f in files if f.endswith(".safetensors")]
        if not safetensors_files:
            raise Exception(f"No safetensors files found in {SOURCE_REPO}")
        # Download only the files conversion needs, not the whole repo, to
        # save disk space on the free tier. The index json maps shard files
        # for sharded checkpoints; it is skipped below if the repo lacks it.
        required_files = [
            "config.json",
            "tokenizer.json",
            "tokenizer_config.json",
            "model.safetensors.index.json",
        ] + safetensors_files
        os.makedirs(model_dir, exist_ok=True)
        for file in required_files:
            if file in files:
                logger.info(f"Downloading {file}")
                # Module-level hf_hub_download is the canonical API for
                # fetching a single file from the Hub.
                hf_hub_download(
                    repo_id=SOURCE_REPO,
                    filename=file,
                    local_dir=model_dir,
                    token=HF_TOKEN
                )
        return model_dir
    except Exception as e:
        logger.error(f"Error downloading model: {e}")
        state["last_error"] = str(e)
        return None
def process_quantization():
    """Kick off the background quantization run.

    Returns immediately with a status string for the UI; the actual work
    happens in a daemon thread so the Gradio handler is never blocked.
    Refuses to start a second run while one is in flight.
    """
    global state
    if state["is_processing"]:
        return "Already processing quantizations. Please wait."
    # Reset all per-run bookkeeping before the worker thread starts.
    state.update(
        is_processing=True,
        progress=0,
        completed_quants=[],
        failed_quants=[],
        out_of_memory=False,
        last_error=None,
        status_message="Starting quantization process...",
    )
    worker = threading.Thread(target=quantization_worker, daemon=True)
    worker.start()
    return "Quantization process started. Please wait for it to complete."
def quantization_worker() -> None:
    """Background thread body: download the source model, then produce and
    upload one GGUF quantization per entry in QUANT_CONFIGS.

    Designed for a free-tier Space: it processes one quant at a time,
    deletes intermediate files between iterations, checks free disk space
    before each conversion, and watches /proc/meminfo so it can abort
    before the container is OOM-killed. All progress and errors are
    reported through the global ``state`` dict; nothing is returned.
    """
    global state
    try:
        # Download the model; bail out (and clear the busy flag) on failure.
        model_path = download_model()
        if not model_path:
            state["is_processing"] = False
            state["status_message"] = "Failed to download model. Check logs for details."
            return
        # Create temporary output directory
        os.makedirs(TEMP_DIR, exist_ok=True)
        # Get model name from the source repo
        model_name = os.path.basename(SOURCE_REPO).lower()
        # Process each quantization configuration - one at a time to save memory
        total_quants = len(QUANT_CONFIGS)
        for i, quant_config in enumerate(QUANT_CONFIGS):
            if state["out_of_memory"]:
                # Configs are ordered smallest-first; once memory runs out,
                # every larger quant would also fail, so stop entirely.
                break
            quant_name = quant_config["name"]
            state["current_quant"] = quant_name
            state["progress"] = (i / total_quants) * 100
            state["status_message"] = f"Processing {quant_name} quantization ({i+1}/{total_quants})"
            logger.info(f"Processing quantization: {quant_name}")
            try:
                # Free disk space between quantizations - crucial for the free tier.
                if i > 0:
                    # Remove files left over from the previous iteration.
                    for file in os.listdir(TEMP_DIR):
                        file_path = os.path.join(TEMP_DIR, file)
                        if os.path.isfile(file_path):
                            os.remove(file_path)
                # Output path for this quantization
                quant_output_path = os.path.join(TEMP_DIR, f"{model_name}-{quant_name.lower()}.gguf")
                # Check available disk space before starting
                try:
                    statvfs = os.statvfs(TEMP_DIR)
                    free_space_gb = (statvfs.f_frsize * statvfs.f_bavail) / (1024 * 1024 * 1024)
                    logger.info(f"Available disk space: {free_space_gb:.2f} GB")
                    # Skip this quant if there isn't enough room (50% buffer
                    # over the estimated output size).
                    if free_space_gb < quant_config["size_gb"] * 1.5:  # 50% buffer
                        logger.warning(f"Not enough disk space for {quant_name} quantization. Need {quant_config['size_gb'] * 1.5:.2f} GB, have {free_space_gb:.2f} GB")
                        state["failed_quants"].append(f"{quant_name} (disk space)")
                        continue
                except Exception as e:
                    # os.statvfs is POSIX-only; if the check fails, proceed
                    # optimistically rather than refusing to work.
                    logger.warning(f"Could not check disk space: {e}")
                # Run the conversion+quantization in one step to save memory.
                logger.info(f"Converting and quantizing directly to {quant_name}")
                # NOTE(review): llama.cpp's convert.py has historically accepted
                # only f32/f16/q8_0 for --outtype; K-quants (Q2_K, Q4_K_M, ...)
                # normally need a separate llama-quantize pass. Confirm the
                # vendored convert.py really supports these outtype values.
                quantize_cmd = [
                    "python",
                    "./llama.cpp/convert.py",
                    model_path,
                    "--outfile", quant_output_path,
                    "--outtype", quant_name.lower()
                ]
                # Launch the conversion so we can poll it and watch memory.
                quantize_process = subprocess.Popen(
                    quantize_cmd,
                    shell=False,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True
                )
                # Poll the process and monitor system resources.
                while quantize_process.poll() is None:
                    # Check if we're getting low on memory (Linux only).
                    try:
                        with open('/proc/meminfo', 'r') as f:
                            meminfo = f.read()
                        # Extract available memory
                        available_mem = 0
                        for line in meminfo.split('\n'):
                            if 'MemAvailable:' in line:
                                available_mem = int(line.split()[1]) / 1024  # Convert to MB
                                break
                        # If memory is critically low (less than 500MB), kill the process
                        # before the kernel OOM-killer takes down the whole Space.
                        if available_mem < 500:
                            logger.warning(f"Memory critically low ({available_mem:.2f} MB). Terminating quantization.")
                            quantize_process.terminate()
                            state["out_of_memory"] = True
                            state["failed_quants"].append(f"{quant_name} (OOM)")
                            break
                    except Exception as e:
                        logger.warning(f"Could not check memory usage: {e}")
                    # Wait a bit before checking again
                    time.sleep(5)
                # If the process is somehow still alive here (e.g. terminate()
                # hasn't taken effect yet), force it down and treat the quant
                # as failed.
                # NOTE(review): after the OOM break above, poll() may still be
                # None briefly, so an OOM can surface as this generic failure
                # instead of the "(OOM)" entry already appended.
                if quantize_process.poll() is None:
                    # Process is still running, kill it
                    quantize_process.terminate()
                    try:
                        quantize_process.wait(timeout=10)
                    except subprocess.TimeoutExpired:
                        quantize_process.kill()
                    raise Exception("Quantization process timed out or was terminated")
                # Get process output
                stdout, stderr = quantize_process.communicate()
                if quantize_process.returncode != 0:
                    # Distinguish memory exhaustion (stop the whole run) from
                    # other conversion errors (fail just this quant).
                    if "out of memory" in stderr.lower() or "allocation failed" in stderr.lower() or "not enough memory" in stderr.lower():
                        logger.warning(f"Out of memory during {quant_name} quantization")
                        state["out_of_memory"] = True
                        state["failed_quants"].append(f"{quant_name} (OOM)")
                        continue
                    else:
                        raise Exception(f"Error during {quant_name} quantization: {stderr}")
                # Sanity-check the output: it must exist and be at least ~1 MB.
                if not os.path.exists(quant_output_path) or os.path.getsize(quant_output_path) < 1000000:
                    raise Exception(f"Quantization produced invalid or empty file")
                # Create or update the per-quant destination repository.
                repo_name = f"{model_name}-{quant_name.lower()}-gguf"
                username = hf_api.whoami()["name"]
                repo_id = f"{username}/{repo_name}"
                try:
                    # Check if repo exists
                    hf_api.repo_info(repo_id=repo_id)
                    logger.info(f"Repository {repo_id} already exists")
                except Exception:
                    # Create repo if it doesn't exist
                    logger.info(f"Creating repository {repo_id}")
                    hf_api.create_repo(repo_id=repo_id, exist_ok=True)
                # Upload quantized model
                logger.info(f"Uploading quantized model to {repo_id}")
                # Write the model-card README (small, uploaded first).
                readme_content = f"""# {model_name.capitalize()} - {quant_name} GGUF
This repository contains a {quant_name} quantized GGUF version of [{SOURCE_REPO}](https://huggingface.co/{SOURCE_REPO}).
## Details
- **Quantization Type:** {quant_name}
- **Approximate Size:** {quant_config['size_gb']} GB
- **Notes:** {quant_config['notes']}
- **Original Model:** [Sculptor-AI/Ursa_Minor](https://huggingface.co/{SOURCE_REPO})
- **Auto-generated by:** GGUF Quantizer Space
## Usage with llama.cpp
```bash
# CLI
llama-cli --hf-repo {repo_id} --hf-file {model_name}-{quant_name.lower()}.gguf -p "Your prompt here"
# Server
llama-server --hf-repo {repo_id} --hf-file {model_name}-{quant_name.lower()}.gguf -c 2048
```
"""
                readme_path = os.path.join(TEMP_DIR, "README.md")
                with open(readme_path, "w") as f:
                    f.write(readme_content)
                # Upload README first (it's smaller)
                hf_api.upload_file(
                    path_or_fileobj=readme_path,
                    path_in_repo="README.md",
                    repo_id=repo_id
                )
                # Then upload the model with LFS - this might take a while.
                try:
                    upload_start_time = time.time()
                    max_upload_time = 60 * 60  # 1 hour max upload time
                    # Single-element lists act as mutable cells shared with the
                    # upload thread (a closure cannot rebind outer locals).
                    upload_success = [False]
                    upload_error = [None]
                    upload_done = [False]
                    def upload_file_with_timeout():
                        # Runs in a daemon thread; reports outcome via the
                        # cells above so the main loop can enforce a timeout.
                        try:
                            hf_api.upload_file(
                                path_or_fileobj=quant_output_path,
                                path_in_repo=f"{model_name}-{quant_name.lower()}.gguf",
                                repo_id=repo_id
                            )
                            upload_success[0] = True
                        except Exception as e:
                            upload_error[0] = e
                        finally:
                            upload_done[0] = True
                    upload_thread = threading.Thread(target=upload_file_with_timeout)
                    upload_thread.daemon = True
                    upload_thread.start()
                    # Wait for upload to complete or timeout
                    while not upload_done[0]:
                        if time.time() - upload_start_time > max_upload_time:
                            logger.warning(f"Upload timed out after {max_upload_time/60:.1f} minutes")
                            break
                        time.sleep(10)
                    if upload_success[0]:
                        state["completed_quants"].append(quant_name)
                        logger.info(f"Successfully processed {quant_name} quantization")
                    else:
                        error_msg = str(upload_error[0]) if upload_error[0] else "Upload timed out"
                        logger.error(f"Failed to upload quantized model: {error_msg}")
                        state["failed_quants"].append(f"{quant_name} (upload failed)")
                        state["last_error"] = error_msg
                # NOTE(review): this exception variable shadows the
                # `upload_error` list defined above; harmless here because the
                # list is no longer read on this path, but worth renaming.
                except Exception as upload_error:
                    logger.error(f"Failed to upload quantized model: {upload_error}")
                    state["failed_quants"].append(f"{quant_name} (upload failed)")
                    state["last_error"] = str(upload_error)
                # Delete the large file immediately after upload to save space
                try:
                    os.remove(quant_output_path)
                except Exception as rm_error:
                    logger.warning(f"Could not remove temporary file: {rm_error}")
            except subprocess.TimeoutExpired as timeout_error:
                logger.error(f"Timeout during {quant_name} quantization: {timeout_error}")
                state["failed_quants"].append(f"{quant_name} (timeout)")
                state["last_error"] = f"Quantization timed out after 30 minutes"
            except Exception as e:
                # Any other per-quant failure: record it and move on to the next.
                logger.error(f"Error processing {quant_name} quantization: {e}")
                state["failed_quants"].append(quant_name)
                state["last_error"] = str(e)
        # Final cleanup of scratch outputs.
        try:
            shutil.rmtree(TEMP_DIR)
        except Exception as e:
            logger.warning(f"Error cleaning up temporary files: {e}")
        # Clean up model cache to save space
        try:
            shutil.rmtree(MODEL_CACHE_DIR)
        except Exception as e:
            logger.warning(f"Error cleaning up model cache: {e}")
        # Summarize the run for the UI.
        state["progress"] = 100
        state["is_up_to_date"] = True
        state["is_processing"] = False
        if state["out_of_memory"]:
            last_successful = state["completed_quants"][-1] if state["completed_quants"] else "None"
            state["status_message"] = f"Quantization process stopped due to memory limitations after {last_successful}. Smaller quantizations completed successfully."
        elif state["failed_quants"]:
            state["status_message"] = f"Quantization process completed with some failures. {len(state['completed_quants'])}/{total_quants} quantizations were successful."
        else:
            state["status_message"] = f"Quantization process completed successfully. All {len(state['completed_quants'])}/{total_quants} quantizations were created."
    except Exception as e:
        # Catch-all so the worker thread never dies without updating state.
        logger.error(f"Error in quantization worker: {e}")
        state["is_processing"] = False
        state["last_error"] = str(e)
        state["status_message"] = f"Error during quantization process: {str(e)}"
    save_state()
# Create Gradio interface
def create_interface():
    """Build the Gradio Blocks UI for the quantizer.

    Layout: a status/controls column (check + generate buttons, progress
    slider, completed/failed/error readouts) and a static reference table
    of the quantization presets. Also starts a daemon thread that re-checks
    SOURCE_REPO every 4 hours unless GRADIO_DEBUG is set. Returns the
    Blocks app for the caller to launch.
    """
    with gr.Blocks(title="Ursa_Minor GGUF Quantizer", css="footer {visibility: hidden}") as demo:
        with gr.Row():
            gr.Markdown("# Ursa_Minor GGUF Auto Quantizer")
        with gr.Row():
            with gr.Column(scale=2):
                # Left column: live status, controls, and per-run details.
                status_md = gr.Markdown(value=f"### Status: {state['status_message']}")
                with gr.Row():
                    check_button = gr.Button("Check for Updates", variant="primary")
                    process_button = gr.Button("Generate Quantizations", variant="secondary")
                with gr.Row():
                    last_check = gr.Markdown(value=f"Last Checked: {state['last_checked'].strftime('%Y-%m-%d %H:%M:%S') if state['last_checked'] else 'Never'}")
                with gr.Row():
                    up_to_date = gr.Markdown(value=f"Up to Date: {'Yes' if state['is_up_to_date'] else 'No'}")
                with gr.Accordion("Details", open=True):
                    with gr.Row():
                        # Slider doubles as a read-only progress bar.
                        progress = gr.Slider(
                            minimum=0,
                            maximum=100,
                            value=state["progress"],
                            label="Progress",
                            interactive=False
                        )
                        current_task = gr.Markdown(value="")
                    with gr.Row():
                        completed_md = gr.Markdown(value="### Completed Quantizations")
                        completed_list = gr.Markdown(value="None")
                    with gr.Row():
                        failed_md = gr.Markdown(value="### Failed Quantizations")
                        failed_list = gr.Markdown(value="None")
                    with gr.Row():
                        error_md = gr.Markdown(value="### Last Error")
                        error_text = gr.Markdown(value="None")
            with gr.Column(scale=1):
                # Right column: static reference table of quantization presets.
                gr.Markdown("### Quantization Types")
                quant_table = gr.DataFrame(
                    value=[[q["name"], f"{q['size_gb']} GB", q["notes"]] for q in QUANT_CONFIGS],
                    headers=["Type", "Size", "Notes"],
                    interactive=False
                )
        # Functions to update the UI
        def update_status():
            """Snapshot the global state into the 8 output component values
            (status, last-check, up-to-date, progress, task, completed,
            failed, error) — in the exact order the handlers declare."""
            # Simply update the text components without changing button properties
            status_text = f"### Status: {state['status_message']}"
            last_check_text = f"Last Checked: {state['last_checked'].strftime('%Y-%m-%d %H:%M:%S') if state['last_checked'] else 'Never'}"
            up_to_date_text = f"Up to Date: {'Yes' if state['is_up_to_date'] else 'No'}"
            current_task_text = ""
            if state["is_processing"]:
                current_quant = state["current_quant"] or "Preparing"
                current_task_text = f"Current Task: Processing {current_quant} quantization"
            completed_text = "None"
            if state["completed_quants"]:
                completed_items = []
                for q in state["completed_quants"]:
                    # NOTE(review): whoami() is a network round-trip made once
                    # per completed quant on every UI refresh — consider
                    # hoisting it out of the loop and caching the username.
                    model_name = os.path.basename(SOURCE_REPO).lower()
                    username = hf_api.whoami()["name"]
                    repo_id = f"{username}/{model_name}-{q.lower()}-gguf"
                    completed_items.append(f"- [{q}](https://huggingface.co/{repo_id})")
                completed_text = "\n".join(completed_items)
            failed_text = "None"
            if state["failed_quants"]:
                failed_items = []
                for q in state["failed_quants"]:
                    if "(" in q:  # Check if it has a reason in parentheses
                        name, reason = q.split(" (", 1)
                        reason = reason.rstrip(")")
                        failed_items.append(f"- {name} (Reason: {reason})")
                    else:
                        failed_items.append(f"- {q}")
                failed_text = "\n".join(failed_items)
            # NOTE(review): this local shadows the `error_text` Markdown
            # component defined above; it holds the component's new value
            # here, but a distinct name would be clearer.
            error_text = "None"
            if state["last_error"]:
                error_text = f"```\n{state['last_error']}\n```"
            return [
                status_text,
                last_check_text,
                up_to_date_text,
                state["progress"],
                current_task_text,
                completed_text,
                failed_text,
                error_text
            ]
        # Register event handlers: each button updates the status line, then
        # refreshes every readout from the shared state.
        check_button.click(
            fn=lambda: check_for_updates(),
            outputs=[status_md]
        ).then(
            fn=update_status,
            outputs=[
                status_md,
                last_check,
                up_to_date,
                progress,
                current_task,
                completed_list,
                failed_list,
                error_text
            ]
        )
        process_button.click(
            fn=lambda: process_quantization(),
            outputs=[status_md]
        ).then(
            fn=update_status,
            outputs=[
                status_md,
                last_check,
                up_to_date,
                progress,
                current_task,
                completed_list,
                failed_list,
                error_text
            ]
        )
        # Refresh all readouts once when the page loads.
        demo.load(
            fn=update_status,
            outputs=[
                status_md,
                last_check,
                up_to_date,
                progress,
                current_task,
                completed_list,
                failed_list,
                error_text
            ]
        )
        # Schedule periodic checks for updates - but less frequently for free tier
        def scheduled_check():
            """Daemon loop: poll for upstream commits while the worker is idle."""
            while True:
                try:
                    if not state["is_processing"]:
                        check_for_updates()
                except Exception as e:
                    logger.error(f"Error in scheduled check: {e}")
                # Check less frequently to avoid waking up the space too often
                time.sleep(14400)  # Check every 4 hours instead of hourly
        # Only start the scheduler thread if we're not in a debugging environment
        if not os.environ.get("GRADIO_DEBUG"):
            scheduler_thread = threading.Thread(target=scheduled_check)
            scheduler_thread.daemon = True
            scheduler_thread.start()
            logger.info("Started background update checker")
    return demo
# Restore any persisted state from disk before building the UI.
load_state()

# Build the interface and serve it; the queue caps concurrent requests.
demo = create_interface()
demo.queue(max_size=10).launch(debug=True, show_api=False)