# auto-gguf-quant / app.py -- uploaded by Kaileh57, commit 77060e9 ("fix")
import os
import subprocess
import signal
import time
import json
from datetime import datetime
from pathlib import Path
import threading
import traceback
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
import gradio as gr
from huggingface_hub import HfApi, list_repo_files, hf_hub_download, login, whoami
from apscheduler.schedulers.background import BackgroundScheduler
# Hugging Face repository that is watched for new commits.
SOURCE_MODEL_REPO = "Sculptor-AI/Ursa_Minor"
# Default location of the HF -> GGUF converter; check_llama_cpp() and
# process_model() may rebind this when the script lives under another name.
CONVERSION_SCRIPT = "./llama.cpp/convert-hf-to-gguf.py"
# JSON file used to persist `current_status`.
STATUS_FILE = "status.json"

# (type, approximate size in GB, notes) for every quantization, listed in
# processing order.
_QUANT_TABLE = (
    ("Q2_K", 0.8, ""),
    ("Q3_K_S", 0.9, ""),
    ("Q3_K_M", 0.9, "lower quality"),
    ("Q3_K_L", 1.0, ""),
    ("IQ4_XS", 1.0, ""),
    ("Q4_K_S", 1.0, "fast, recommended"),
    ("Q4_K_M", 1.1, "fast, recommended"),
    ("Q5_K_S", 1.2, ""),
    ("Q5_K_M", 1.2, ""),
    ("Q6_K", 1.4, "very good quality"),
    ("Q8_0", 1.7, "fast, best quality"),
    ("f16", 3.2, "16 bpw, overkill"),
)
QUANT_CONFIGS = [
    {"type": quant_type, "size_gb": size_gb, "notes": notes}
    for quant_type, size_gb, notes in _QUANT_TABLE
]

# Shared process state: the lock marks a processing run in flight, and the
# dict is the status record shown in the UI and written to STATUS_FILE.
processing_lock = threading.Lock()
current_status = dict(
    status="Not started",
    last_check=None,
    last_updated=None,
    last_commit_hash=None,
    current_quant=None,
    quant_status={},
    progress=0,
    error=None,
    log=[],
)
def escape(s: str) -> str:
    """Escape a string for safe embedding in HTML.

    Replaces ``&``, ``<``, ``>`` and ``"`` with their HTML entities and turns
    newlines into ``<br/>`` tags.

    Args:
        s: Raw text.

    Returns:
        The escaped text.
    """
    # The ampersand must be handled first; otherwise the "&" introduced by the
    # other entities would itself be escaped a second time.
    replacements = (
        ("&", "&amp;"),
        ("<", "&lt;"),
        (">", "&gt;"),
        ('"', "&quot;"),
        ("\n", "<br/>"),
    )
    for raw, entity in replacements:
        s = s.replace(raw, entity)
    return s
def log_message(message: str, error: bool = False):
    """Record a timestamped log line and persist the status.

    The line is printed, appended to ``current_status["log"]`` (which is
    capped at the 100 most recent entries) and the status is saved. When
    ``error`` is true the message is also stored as the current error.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    entry = "[{}] {}".format(stamp, message)
    print(entry)

    current_status["log"].append(entry)
    if error:
        current_status["error"] = message

    # Bound the in-memory (and persisted) log to the latest 100 lines.
    history = current_status["log"]
    if len(history) > 100:
        current_status["log"] = history[-100:]

    save_status()
def save_status():
    """Write ``current_status`` to ``STATUS_FILE`` as JSON."""
    with open(STATUS_FILE, 'w') as handle:
        handle.write(json.dumps(current_status))
def load_status():
    """Restore ``current_status`` from ``STATUS_FILE`` if the file exists.

    The saved values are layered on top of the in-memory defaults rather than
    replacing them, so a status file that lacks some keys (for example
    ``"log"`` or ``"quant_status"``) cannot cause ``KeyError`` in code that
    indexes ``current_status`` directly. Read or parse failures are logged
    and the defaults are kept.
    """
    global current_status
    if not os.path.exists(STATUS_FILE):
        return
    try:
        with open(STATUS_FILE, 'r') as f:
            loaded = json.load(f)
        if not isinstance(loaded, dict):
            raise ValueError("status file does not contain a JSON object")
        current_status = {**current_status, **loaded}
    except Exception as e:
        log_message(f"Error loading status file: {str(e)}", error=True)
def generate_importance_matrix(model_path: str, train_data_path: str, output_path: str):
    """Run ``./llama.cpp/llama-imatrix`` for a model and log its output.

    Args:
        model_path: Path to the model file passed via ``-m``.
        train_data_path: Path to the calibration text passed via ``-f``.
        output_path: Destination of the importance matrix passed via ``-o``.

    Raises:
        Exception: If ``model_path`` is not an existing file.

    The process is given one hour; after that it receives SIGINT and, if it
    is still alive a minute later, is killed. A non-zero exit code is logged
    but not raised.
    """
    if not os.path.isfile(model_path):
        raise Exception(f"Model file not found: {model_path}")

    imatrix_command = [
        "./llama.cpp/llama-imatrix",
        "-m", model_path,
        "-f", train_data_path,
        "-ngl", "99",
        "--output-frequency", "10",
        "-o", output_path,
    ]
    timeout_seconds = 3600  # 1 hour budget for the whole run
    grace_seconds = 60  # time allowed to exit after SIGINT

    log_message(f"Running imatrix command for {model_path}...")
    process = subprocess.Popen(imatrix_command, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    stderr_chunks = []

    def _drain_stderr():
        # Read stderr concurrently: if it were only read after the process
        # exits, a full stderr pipe would block the child forever.
        stderr_chunks.append(process.stderr.read())

    def _enforce_timeout():
        # Runs on the watchdog thread because the stdout loop below blocks
        # until the child closes its output and cannot time out by itself.
        if process.poll() is not None:
            return
        log_message("Imatrix computation timed out. Sending SIGINT to allow graceful termination...", error=True)
        process.send_signal(signal.SIGINT)
        try:
            process.wait(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            log_message("Imatrix process still didn't terminate. Forcefully terminating process...", error=True)
            process.kill()

    stderr_reader = threading.Thread(target=_drain_stderr, daemon=True)
    stderr_reader.start()
    watchdog = threading.Timer(timeout_seconds, _enforce_timeout)
    watchdog.daemon = True
    watchdog.start()
    try:
        # Stream stdout so progress shows up in the log while running.
        for line in process.stdout:
            log_message(f"imatrix: {line.strip()}")
        process.wait()
    finally:
        watchdog.cancel()
    stderr_reader.join()

    stderr = "".join(stderr_chunks)
    if stderr:
        log_message(f"Imatrix stderr: {stderr}")
    if process.returncode != 0:
        log_message(f"Imatrix process exited with code {process.returncode}", error=True)
    else:
        log_message("Importance matrix generation completed.")
def get_last_commit(repo_id: str):
    """Return the ``sha`` reported by ``HfApi.model_info`` for ``repo_id``.

    Any failure is logged as an error and ``None`` is returned instead.
    """
    try:
        return HfApi().model_info(repo_id).sha
    except Exception as exc:
        log_message(f"Error getting commit info: {str(exc)}", error=True)
    return None
def check_for_updates():
    """Check whether the source model has a commit we have not seen yet.

    Returns:
        ``True`` when the latest commit hash differs from the stored
        ``last_commit_hash`` (the stored hash is updated immediately),
        ``False`` when up to date, when a processing run currently holds
        ``processing_lock``, or when the check fails.

    Every exit that changes ``current_status["status"]`` also persists it.
    """
    if processing_lock.locked():
        log_message("Already processing, skipping update check")
        return False

    current_status["status"] = "Checking for updates"
    current_status["last_check"] = datetime.now().isoformat()
    try:
        latest_commit = get_last_commit(SOURCE_MODEL_REPO)
        if latest_commit is None:
            current_status["status"] = "Error checking for updates"
            # Persist the error state like every other exit path does.
            save_status()
            return False

        previous_commit = current_status.get("last_commit_hash")
        log_message(f"Latest commit hash: {latest_commit}")
        log_message(f"Previous commit hash: {previous_commit}")

        if previous_commit == latest_commit:
            current_status["status"] = "Up to date"
            save_status()
            return False

        current_status["status"] = "Update detected"
        current_status["last_commit_hash"] = latest_commit
        save_status()
        return True
    except Exception as e:
        log_message(f"Error checking for updates: {str(e)}", error=True)
        current_status["status"] = "Error checking for updates"
        save_status()
        return False
def check_llama_cpp():
    """Make sure llama.cpp is cloned, its converter located and its tools built.

    Steps: clone the repository when the ``llama.cpp`` directory is missing,
    locate a ``convert*.py`` script (rebinding ``CONVERSION_SCRIPT`` when the
    default name is absent), build ``llama-quantize``, ``llama-gguf-split``
    and ``llama-imatrix`` with cmake, copy the binaries into ``llama.cpp/``
    and copy ``groups_merged.txt`` there when available.

    Returns:
        ``True`` on success, ``False`` if any step raised (the error is
        logged and the traceback printed).
    """
    global CONVERSION_SCRIPT  # Declare global at the beginning of the function
    llama_dir = "llama.cpp"
    tool_names = ["llama-quantize", "llama-gguf-split", "llama-imatrix"]
    try:
        if not os.path.exists(llama_dir):
            log_message("llama.cpp directory not found, cloning repository...")
            subprocess.run(["git", "clone", "https://github.com/ggerganov/llama.cpp"], check=True)

        # Check for critical files
        converter_path = os.path.join(llama_dir, "convert-hf-to-gguf.py")
        if not os.path.exists(converter_path):
            # Try alternative path
            old_converter_path = os.path.join(llama_dir, "convert_hf_to_gguf.py")
            if os.path.exists(old_converter_path):
                log_message(f"Found converter at {old_converter_path}, using this path")
                CONVERSION_SCRIPT = old_converter_path
            else:
                log_message("Converter script not found, listing files in llama.cpp...")
                files = os.listdir(llama_dir)
                log_message(f"Files in llama.cpp: {files}")
                # Search for any converter script
                for file in files:
                    if file.startswith("convert") and file.endswith(".py"):
                        log_message(f"Found alternative converter: {file}")
                        CONVERSION_SCRIPT = os.path.join(llama_dir, file)
                        break

        # Build the tools. The commands run with cwd=llama_dir instead of
        # os.chdir(): chdir is process-wide (it would affect other threads)
        # and was left pointing at llama.cpp whenever cmake failed.
        log_message("Building llama.cpp tools...")
        os.makedirs(os.path.join(llama_dir, "build"), exist_ok=True)
        subprocess.run(["cmake", "-B", "build", "-DBUILD_SHARED_LIBS=OFF"], check=True, cwd=llama_dir)
        subprocess.run(
            ["cmake", "--build", "build", "--config", "Release", "-j", "--target"] + tool_names,
            check=True,
            cwd=llama_dir,
        )

        # Copy binaries
        log_message("Copying built binaries...")
        try:
            # Different builds may put binaries in different places
            bin_dir = os.path.join(llama_dir, "build", "bin")
            if not os.path.exists(bin_dir):
                bin_dir = os.path.join(llama_dir, "build")
            for binary in tool_names:
                src = os.path.join(bin_dir, binary)
                if os.path.exists(src):
                    subprocess.run(["cp", src, llama_dir + "/"], check=True)
        except Exception as e:
            # Best effort: a failed copy is reported but does not abort setup.
            log_message(f"Error copying binaries: {str(e)}", error=True)

        # Make sure we have the calibration data
        if not os.path.exists(os.path.join(llama_dir, "groups_merged.txt")):
            log_message("Copying calibration data...")
            if os.path.exists("groups_merged.txt"):
                subprocess.run(["cp", "groups_merged.txt", "llama.cpp/"], check=True)

        log_message("llama.cpp setup completed successfully")
        return True
    except Exception as e:
        log_message(f"Error setting up llama.cpp: {str(e)}", error=True)
        traceback.print_exc()
        return False
def _render_quant_readme(repo_name, repo_id, quant_type, quant_config, quant_file_name):
    """Return the README.md text uploaded next to one quantized model file."""
    return f"""# {repo_name}
This model was converted to GGUF format from [`{SOURCE_MODEL_REPO}`](https://huggingface.co/{SOURCE_MODEL_REPO}) using llama.cpp.
## Quantization: {quant_type}
Approximate size: {quant_config['size_gb']} GB
Notes: {quant_config['notes']}
## Use with llama.cpp
Install llama.cpp through brew (works on Mac and Linux)
```bash
brew install llama.cpp
```
Invoke the llama.cpp server or the CLI.
### CLI:
```bash
llama-cli --hf-repo {repo_id} --hf-file {quant_file_name} -p "The meaning to life and the universe is"
```
### Server:
```bash
llama-server --hf-repo {repo_id} --hf-file {quant_file_name} -c 2048
```
Note: You can also use this checkpoint directly through the [usage steps](https://github.com/ggerganov/llama.cpp?tab=readme-ov-file#usage) listed in the Llama.cpp repo as well.
Step 1: Clone llama.cpp from GitHub.
```
git clone https://github.com/ggerganov/llama.cpp
```
Step 2: Move into the llama.cpp folder and build it with `LLAMA_CURL=1` flag along with other hardware-specific flags (for ex: LLAMA_CUDA=1 for Nvidia GPUs on Linux).
```
cd llama.cpp && LLAMA_CURL=1 make
```
Step 3: Run inference through the main binary.
```
./llama-cli --hf-repo {repo_id} --hf-file {quant_file_name} -p "The meaning to life and the universe is"
```
or
```
./llama-server --hf-repo {repo_id} --hf-file {quant_file_name} -c 2048
```
## Auto-generated
This model version was automatically generated when updates were detected in the source repository.
Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""


def process_model():
    """Download the source model and publish every configured quantization.

    Pipeline: set up llama.cpp, verify Hugging Face authentication, download
    the repository snapshot, convert it to an FP16 GGUF, try to build an
    importance matrix, then for each entry of ``QUANT_CONFIGS`` quantize,
    create the target repo and upload the model file plus a README (and the
    importance matrix for IQ quantizations).

    Progress and per-quantization results are written to ``current_status``.
    Only one run can be active at a time; a call made while another run holds
    ``processing_lock`` logs a message and returns immediately. All errors
    are caught, logged and reflected in the status; nothing is raised.
    """
    global CONVERSION_SCRIPT  # Declare global at the beginning of the function

    # Take the lock atomically. Checking locked() and then blocking on the
    # lock lets two callers both pass the check and run back to back.
    if not processing_lock.acquire(blocking=False):
        log_message("Already processing, cannot start another process")
        return

    try:
        try:
            # Check llama.cpp is set up
            if not check_llama_cpp():
                log_message("Failed to set up llama.cpp, aborting", error=True)
                current_status["status"] = "Error setting up llama.cpp"
                save_status()
                return

            # Validate authentication
            try:
                user_info = whoami()
                log_message(f"Processing as user: {user_info['name']}")
            except Exception as e:
                log_message(f"Authentication error: {str(e)}. Please make sure you're logged in.", error=True)
                current_status["status"] = "Authentication error"
                save_status()
                return

            api = HfApi()
            model_name = SOURCE_MODEL_REPO.split('/')[-1]
            current_status["status"] = "Processing"
            current_status["progress"] = 0
            save_status()

            # Prepare directories
            os.makedirs("downloads", exist_ok=True)
            os.makedirs("outputs", exist_ok=True)
            log_message(f"Starting model processing for {SOURCE_MODEL_REPO}")

            # Plain assignment: Path objects are not meant to be used as
            # context managers (deprecated, then removed from pathlib).
            outdir = Path("outputs").resolve()
            log_message(f"Output directory: {outdir}")

            # Download the model
            log_message(f"Downloading model from {SOURCE_MODEL_REPO}")
            try:
                local_dir = Path("downloads") / model_name
                log_message(f"Local directory: {local_dir}")

                # Check and download pattern
                dl_pattern = ["*.md", "*.json", "*.model"]
                try:
                    files = list_repo_files(SOURCE_MODEL_REPO)
                    has_safetensors = any(file.endswith(".safetensors") for file in files)
                    pattern = "*.safetensors" if has_safetensors else "*.bin"
                    dl_pattern.append(pattern)
                    log_message(f"Using download pattern: {dl_pattern}")
                except Exception as e:
                    # Listing failed: fall back to fetching both weight formats.
                    log_message(f"Error checking repo files: {str(e)}", error=True)
                    dl_pattern.append("*.safetensors")
                    dl_pattern.append("*.bin")

                # Download the model
                api.snapshot_download(
                    repo_id=SOURCE_MODEL_REPO,
                    local_dir=local_dir,
                    local_dir_use_symlinks=False,
                    allow_patterns=dl_pattern
                )
                log_message("Model downloaded successfully!")

                # Check for adapter config - if it's a LoRA adapter, this won't work
                config_dir = local_dir / "config.json"
                adapter_config_dir = local_dir / "adapter_config.json"
                if os.path.exists(adapter_config_dir) and not os.path.exists(config_dir):
                    raise Exception('adapter_config.json is present. If you are converting a LoRA adapter to GGUF, please use a different tool.')

                # Convert to FP16 first
                fp16_path = str(outdir / f"{model_name}.fp16.gguf")
                log_message(f"Converting model to FP16: {fp16_path}")

                # Check if the converter script exists
                if not os.path.exists(CONVERSION_SCRIPT):
                    log_message(f"Converter script not found at {CONVERSION_SCRIPT}, searching for alternatives", error=True)
                    # Stop at the first match; a bare `break` in the inner
                    # loop would keep walking and overwrite the result.
                    found_script = next(
                        (
                            os.path.join(root, file)
                            for root, _dirs, files in os.walk("llama.cpp")
                            for file in files
                            if file.startswith("convert") and file.endswith(".py")
                        ),
                        None,
                    )
                    if found_script is not None:
                        CONVERSION_SCRIPT = found_script
                        log_message(f"Found converter at {CONVERSION_SCRIPT}")

                log_message(f"Using converter script: {CONVERSION_SCRIPT}")
                result = subprocess.run([
                    "python", CONVERSION_SCRIPT, str(local_dir), "--outtype", "f16", "--outfile", fp16_path
                ], shell=False, capture_output=True, text=True)
                if result.returncode != 0:
                    log_message(f"Converter stderr: {result.stderr}")
                    log_message(f"Converter stdout: {result.stdout}")
                    raise Exception(f"Error converting to fp16: {result.stderr}")
                log_message("Model converted to fp16 successfully!")

                # Generate importance matrix for IQ quantizations
                imatrix_path = str(outdir / "imatrix.dat")
                train_data_path = "llama.cpp/groups_merged.txt"  # Default calibration dataset
                if not os.path.isfile(train_data_path):
                    log_message(f"Warning: Training data file not found at {train_data_path}, searching alternatives...")
                    # Try to find it elsewhere
                    if os.path.exists("groups_merged.txt"):
                        train_data_path = "groups_merged.txt"
                        log_message(f"Found training data at {train_data_path}")
                    else:
                        log_message("Calibration data not found. Some quantizations may not work.", error=True)
                try:
                    if os.path.isfile(train_data_path):
                        generate_importance_matrix(fp16_path, train_data_path, imatrix_path)
                    else:
                        imatrix_path = None
                except Exception as e:
                    log_message(f"Error generating importance matrix: {str(e)}", error=True)
                    imatrix_path = None

                # Process each quantization type
                total_quants = len(QUANT_CONFIGS)
                for i, quant_config in enumerate(QUANT_CONFIGS):
                    quant_type = quant_config["type"]
                    current_status["current_quant"] = quant_type
                    current_status["progress"] = int((i / total_quants) * 100)
                    save_status()
                    log_message(f"Processing quantization {i+1}/{total_quants}: {quant_type}")
                    try:
                        # Check if this is an IQ quantization
                        is_iq_quant = quant_type.startswith("IQ")

                        # Skip if we don't have imatrix and this is an IQ quant
                        if is_iq_quant and (imatrix_path is None or not os.path.exists(imatrix_path)):
                            log_message(f"Skipping {quant_type} as importance matrix is not available", error=True)
                            current_status["quant_status"][quant_type] = "Skipped - No imatrix"
                            continue

                        # Set up the repo name
                        username = user_info["name"]
                        repo_name = f"{model_name}-{quant_type}-GGUF"
                        repo_id = f"{username}/{repo_name}"

                        # Set up output path and quantization command; IQ
                        # quantizations use the importance matrix.
                        if is_iq_quant:
                            quant_file_name = f"{model_name.lower()}-{quant_type.lower()}-imat.gguf"
                            quant_file_path = str(outdir / quant_file_name)
                            quantize_cmd = [
                                "./llama.cpp/llama-quantize",
                                "--imatrix", imatrix_path, fp16_path, quant_file_path, quant_type
                            ]
                        else:
                            quant_file_name = f"{model_name.lower()}-{quant_type.lower()}.gguf"
                            quant_file_path = str(outdir / quant_file_name)
                            quantize_cmd = [
                                "./llama.cpp/llama-quantize",
                                fp16_path, quant_file_path, quant_type
                            ]

                        # Run quantization
                        log_message(f"Running quantization command: {' '.join(quantize_cmd)}")
                        result = subprocess.run(quantize_cmd, shell=False, capture_output=True, text=True)
                        if result.returncode != 0:
                            if "out of memory" in result.stderr.lower():
                                log_message(f"Out of memory error quantizing {quant_type}. Skipping larger models.", error=True)
                                current_status["quant_status"][quant_type] = "Failed - Out of memory"
                                # Break the loop to skip larger models
                                break
                            raise Exception(f"Error quantizing {quant_type}: {result.stderr}")
                        log_message(f"Quantized successfully with {quant_type}!")

                        # Create the repo if it doesn't exist
                        log_message(f"Creating/updating repo {repo_id}")
                        try:
                            repo_url = api.create_repo(repo_id=repo_id, exist_ok=True)
                            log_message(f"Repo URL: {repo_url}")
                        except Exception as e:
                            log_message(f"Error creating repo: {str(e)}", error=True)
                            current_status["quant_status"][quant_type] = "Failed - Repo creation error"
                            continue

                        # Create README with model info
                        log_message("Creating README")
                        readme_content = _render_quant_readme(
                            repo_name, repo_id, quant_type, quant_config, quant_file_name
                        )
                        readme_path = outdir / "README.md"
                        with open(readme_path, 'w', encoding="utf-8") as f:
                            f.write(readme_content)

                        # Upload the quantized model and README
                        log_message(f"Uploading quantized model: {quant_file_path}")
                        try:
                            api.upload_file(
                                path_or_fileobj=quant_file_path,
                                path_in_repo=quant_file_name,
                                repo_id=repo_id,
                            )
                            api.upload_file(
                                path_or_fileobj=str(readme_path),
                                path_in_repo="README.md",
                                repo_id=repo_id,
                            )
                            # imatrix_path is None when no matrix was built;
                            # test that first so os.path.isfile(None) cannot
                            # raise and mark a finished upload as failed.
                            if is_iq_quant and imatrix_path and os.path.isfile(imatrix_path):
                                log_message(f"Uploading imatrix.dat")
                                api.upload_file(
                                    path_or_fileobj=imatrix_path,
                                    path_in_repo="imatrix.dat",
                                    repo_id=repo_id,
                                )
                            log_message(f"Successfully uploaded {quant_type} quantization!")
                            current_status["quant_status"][quant_type] = "Success"
                        except Exception as e:
                            log_message(f"Error uploading files: {str(e)}", error=True)
                            current_status["quant_status"][quant_type] = f"Failed - Upload error: {str(e)}"
                    except Exception as e:
                        log_message(f"Error processing {quant_type}: {str(e)}", error=True)
                        current_status["quant_status"][quant_type] = f"Failed: {str(e)}"
                        # Continue with the next quantization

                # Update status after completion
                current_status["status"] = "Completed"
                current_status["progress"] = 100
                current_status["last_updated"] = datetime.now().isoformat()
                log_message("Model processing completed!")
            except Exception as e:
                log_message(f"Error during model processing: {str(e)}", error=True)
                current_status["status"] = "Error"
                current_status["error"] = str(e)
                traceback.print_exc()
        except Exception as e:
            log_message(f"Error: {str(e)}", error=True)
            current_status["status"] = "Error"
            current_status["error"] = str(e)
            traceback.print_exc()
        finally:
            save_status()
    finally:
        processing_lock.release()
def check_and_process():
    """Run one update check and, if one is found, process on a new thread."""
    log_message("Running scheduled check for updates")
    if not check_for_updates():
        log_message("No updates detected")
        return
    log_message("Updates detected, starting processing")
    worker = threading.Thread(target=process_model)
    worker.start()
def create_ui():
    """Build the Gradio Blocks interface.

    The page shows a status panel, two buttons ("Check for Updates" and
    "Force Processing"), a table with one row per ``QUANT_CONFIGS`` entry and
    a log view. All three views are filled on page load and refreshed after
    either button is clicked.

    Returns:
        The ``gr.Blocks`` instance (not yet launched).
    """
    model_name = SOURCE_MODEL_REPO.split('/')[-1]

    def quant_rows():
        """Return one table row per configured quantization."""
        return [
            [q["type"], q["size_gb"], q["notes"], current_status["quant_status"].get(q["type"], "Not processed")]
            for q in QUANT_CONFIGS
        ]

    def format_timestamp(value):
        """Render a stored ISO timestamp without 'T' and fractional seconds."""
        if not value:
            return 'Never'
        return value.replace('T', ' ').split('.')[0]

    def update_status():
        """Update the status display"""
        status = current_status['status']
        if status == 'Up to date':
            color = 'green'
        elif status == 'Processing':
            color = 'blue'
        elif 'Error' in status:
            color = 'red'
        else:
            color = 'orange'

        error_html = ''
        if current_status.get('error'):
            # Error text can contain arbitrary tool output; escape it before
            # placing it inside markup.
            error_html = f'<p style="color: red;"><strong>Error:</strong> {escape(str(current_status["error"]))}</p>'

        status_html = f"""
<div style="border: 1px solid #ddd; padding: 15px; border-radius: 5px;">
<h3>Current Status: <span style="color: {color}">{status}</span></h3>
<p><strong>Last Checked:</strong> {format_timestamp(current_status.get('last_check'))}</p>
<p><strong>Last Updated:</strong> {format_timestamp(current_status.get('last_updated'))}</p>
<p><strong>Current Quantization:</strong> {current_status.get('current_quant', 'None')}</p>
{error_html}
</div>
"""
        return status_html

    def update_logs():
        """Update the logs display"""
        parts = ["<div style='height: 400px; overflow-y: auto; background-color: #f9f9f9; padding: 10px; font-family: monospace; white-space: pre-wrap;'>"]
        for log in current_status["log"]:
            # Log lines embed subprocess output, so escape them for HTML.
            safe_log = escape(str(log))
            if "Error" in log or "error" in log:
                parts.append(f"<div style='color: red;'>{safe_log}</div>")
            else:
                parts.append(f"<div>{safe_log}</div>")
        parts.append("</div>")
        return "".join(parts)

    def on_check_button():
        """Handle check button click"""
        if check_for_updates():
            threading.Thread(target=process_model).start()
        return update_status(), quant_rows(), update_logs()

    def on_process_button():
        """Handle process button click"""
        threading.Thread(target=process_model).start()
        return update_status(), quant_rows(), update_logs()

    with gr.Blocks(css="body { margin: 0; padding: 0; }") as demo:
        gr.Markdown(f"# 🦙 Automatic GGUF Quantization for {model_name}")
        gr.Markdown(f"This space automatically creates quantized GGUF versions of the [{SOURCE_MODEL_REPO}](https://huggingface.co/{SOURCE_MODEL_REPO}) model whenever it's updated.")
        with gr.Row():
            with gr.Column(scale=2):
                status_info = gr.HTML(label="Status", value="<p>Loading status...</p>")
            with gr.Column(scale=1):
                with gr.Row():
                    check_button = gr.Button("Check for Updates", variant="primary")
                    process_button = gr.Button("Force Processing", variant="secondary")
        # Remove the 'label' parameter since it's not supported
        progress_bar = gr.Progress()
        with gr.Tab("Quantization Status"):
            quant_status = gr.DataFrame(
                headers=["Type", "Size (GB)", "Notes", "Status"],
                value=quant_rows,
                label="Quantization Status"
            )
        with gr.Tab("Logs"):
            logs = gr.HTML(label="Logs", value="<p>Loading logs...</p>")

        check_button.click(on_check_button, outputs=[status_info, quant_status, logs])
        process_button.click(on_process_button, outputs=[status_info, quant_status, logs])

        # Fill every view when the page loads
        demo.load(update_status, outputs=[status_info])
        demo.load(quant_rows, outputs=[quant_status])
        demo.load(update_logs, outputs=[logs])

        # Set up periodic refresh
        # NOTE(review): nothing in this function creates an element whose id
        # contains "Refresh-Button", and a <script> passed to gr.HTML may not
        # be executed by the browser -- confirm this auto-refresh ever fires.
        refresh_interval = 5  # seconds
        gr.HTML("<script>setInterval(function(){ Array.from(document.querySelectorAll('button[id*=Refresh-Button]')).forEach(b => b.click()); }, " + str(refresh_interval * 1000) + ");</script>")
    return demo
# Initialize
def initialize():
    """Initialize the application.

    Sorts ``QUANT_CONFIGS`` by approximate size (smallest first), restores
    the saved status, sets up llama.cpp, schedules ``check_and_process`` to
    run every 60 minutes and kicks off one immediate check on a separate
    thread.
    """
    global QUANT_CONFIGS
    # Sort configurations by size (smallest first)
    QUANT_CONFIGS = sorted(QUANT_CONFIGS, key=lambda x: x["size_gb"])

    load_status()
    check_llama_cpp()

    scheduler = BackgroundScheduler()
    scheduler.add_job(check_and_process, 'interval', minutes=60)
    # Without start() the job is only registered and never executed.
    scheduler.start()

    # Run the first check right away instead of waiting for the first interval.
    threading.Thread(target=check_and_process).start()
if __name__ == "__main__":
    # Script entry point: prepare state and background jobs, then serve the UI.
    initialize()
    demo = create_ui()
    # Request queue capped at one pending item.
    demo.queue(max_size=1)
    demo.launch()