import os
import subprocess
import signal
import time
import json
from datetime import datetime
from pathlib import Path
import threading
import traceback
import contextlib
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
import gradio as gr
from huggingface_hub import HfApi, list_repo_files, hf_hub_download, login, whoami
from apscheduler.schedulers.background import BackgroundScheduler
# MODEL_REPO to monitor
SOURCE_MODEL_REPO = "Sculptor-AI/Ursa_Minor"
CONVERSION_SCRIPT = "./llama.cpp/convert-hf-to-gguf.py"  # default converter path; adjusted at runtime if the script is found under a different name
STATUS_FILE = "status.json"
# Quantization configurations in order of processing
QUANT_CONFIGS = [
{"type": "Q2_K", "size_gb": 0.8, "notes": ""},
{"type": "Q3_K_S", "size_gb": 0.9, "notes": ""},
{"type": "Q3_K_M", "size_gb": 0.9, "notes": "lower quality"},
{"type": "Q3_K_L", "size_gb": 1.0, "notes": ""},
{"type": "IQ4_XS", "size_gb": 1.0, "notes": ""},
{"type": "Q4_K_S", "size_gb": 1.0, "notes": "fast, recommended"},
{"type": "Q4_K_M", "size_gb": 1.1, "notes": "fast, recommended"},
{"type": "Q5_K_S", "size_gb": 1.2, "notes": ""},
{"type": "Q5_K_M", "size_gb": 1.2, "notes": ""},
{"type": "Q6_K", "size_gb": 1.4, "notes": "very good quality"},
{"type": "Q8_0", "size_gb": 1.7, "notes": "fast, best quality"},
{"type": "f16", "size_gb": 3.2, "notes": "16 bpw, overkill"}
]
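# Quantizations are ordered smallest to largest so that an out-of-memory failure in
# process_model() skips only the larger remaining variants. Types prefixed with "IQ"
# additionally require an importance matrix (see generate_importance_matrix below).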
# Global variables for process state
processing_lock = threading.Lock()
current_status = {
"status": "Not started",
"last_check": None,
"last_updated": None,
"last_commit_hash": None,
"current_quant": None,
"quant_status": {},
"progress": 0,
"error": None,
"log": []
}
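# current_status is written to STATUS_FILE after every log/status change (save_status)
# and reloaded on startup by load_status().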
def escape(s: str) -> str:
"""Escape HTML for logging"""
s = s.replace("&", "&")
s = s.replace("<", "&lt;")
s = s.replace(">", "&gt;")
s = s.replace('"', "&quot;")
s = s.replace("\n", "<br/>")
return s
def log_message(message: str, error: bool = False):
"""Add message to log with timestamp"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[{timestamp}] {message}"
print(log_entry)
current_status["log"].append(log_entry)
if error:
current_status["error"] = message
# Keep log size manageable
if len(current_status["log"]) > 100:
current_status["log"] = current_status["log"][-100:]
# Save current status to file
save_status()
def save_status():
"""Save current status to file"""
with open(STATUS_FILE, 'w') as f:
json.dump(current_status, f)
def load_status():
"""Load status from file if it exists"""
global current_status
if os.path.exists(STATUS_FILE):
try:
with open(STATUS_FILE, 'r') as f:
current_status = json.load(f)
except Exception as e:
log_message(f"Error loading status file: {str(e)}", error=True)
def generate_importance_matrix(model_path: str, train_data_path: str, output_path: str):
"""Generate importance matrix for a model"""
imatrix_command = [
"./llama.cpp/llama-imatrix",
"-m", model_path,
"-f", train_data_path,
"-ngl", "99",
"--output-frequency", "10",
"-o", output_path,
]
if not os.path.isfile(model_path):
raise Exception(f"Model file not found: {model_path}")
log_message(f"Running imatrix command for {model_path}...")
process = subprocess.Popen(imatrix_command, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
try:
# Monitor the process for output to provide updates
for line in process.stdout:
log_message(f"imatrix: {line.strip()}")
process.wait(timeout=3600) # 1 hour timeout
except subprocess.TimeoutExpired:
log_message("Imatrix computation timed out. Sending SIGINT to allow graceful termination...", error=True)
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=60) # 1 minute grace period
except subprocess.TimeoutExpired:
log_message("Imatrix process still didn't terminate. Forcefully terminating process...", error=True)
process.kill()
stderr = process.stderr.read()
if stderr:
log_message(f"Imatrix stderr: {stderr}")
log_message("Importance matrix generation completed.")
def get_last_commit(repo_id: str):
"""Get the last commit hash of a repository"""
try:
api = HfApi()
# Use the model_info function instead of commit_info
info = api.model_info(repo_id)
# Get the commit hash from the info
return info.sha
except Exception as e:
log_message(f"Error getting commit info: {str(e)}", error=True)
return None
def check_for_updates():
"""Check if the source model has been updated"""
if processing_lock.locked():
log_message("Already processing, skipping update check")
return False
current_status["status"] = "Checking for updates"
current_status["last_check"] = datetime.now().isoformat()
try:
# Get the latest commit hash
latest_commit = get_last_commit(SOURCE_MODEL_REPO)
if latest_commit is None:
current_status["status"] = "Error checking for updates"
return False
log_message(f"Latest commit hash: {latest_commit}")
log_message(f"Previous commit hash: {current_status.get('last_commit_hash')}")
if current_status.get("last_commit_hash") != latest_commit:
current_status["status"] = "Update detected"
current_status["last_commit_hash"] = latest_commit
save_status()
return True
else:
current_status["status"] = "Up to date"
save_status()
return False
except Exception as e:
log_message(f"Error checking for updates: {str(e)}", error=True)
current_status["status"] = "Error checking for updates"
save_status()
return False
def check_llama_cpp():
"""Check if llama.cpp is properly set up and build if needed"""
try:
if not os.path.exists("llama.cpp"):
log_message("llama.cpp directory not found, cloning repository...")
subprocess.run(["git", "clone", "https://github.com/ggerganov/llama.cpp"], check=True)
# Check for critical files
converter_path = os.path.join("llama.cpp", "convert-hf-to-gguf.py")
if not os.path.exists(converter_path):
# Try alternative path
old_converter_path = os.path.join("llama.cpp", "convert_hf_to_gguf.py")
if os.path.exists(old_converter_path):
log_message(f"Found converter at {old_converter_path}, using this path")
global CONVERSION_SCRIPT
CONVERSION_SCRIPT = old_converter_path
else:
log_message("Converter script not found, listing files in llama.cpp...")
files = os.listdir("llama.cpp")
log_message(f"Files in llama.cpp: {files}")
# Search for any converter script
for file in files:
if file.startswith("convert") and file.endswith(".py"):
log_message(f"Found alternative converter: {file}")
CONVERSION_SCRIPT = os.path.join("llama.cpp", file)
break
# Build the tools
log_message("Building llama.cpp tools...")
os.chdir("llama.cpp")
# Check if build directory exists
if not os.path.exists("build"):
os.makedirs("build")
# Configure and build
subprocess.run(["cmake", "-B", "build", "-DBUILD_SHARED_LIBS=OFF"], check=True)
subprocess.run(["cmake", "--build", "build", "--config", "Release", "-j", "--target", "llama-quantize", "llama-gguf-split", "llama-imatrix"], check=True)
# Copy binaries
log_message("Copying built binaries...")
try:
# Different builds may put binaries in different places
if os.path.exists(os.path.join("build", "bin")):
for binary in ["llama-quantize", "llama-gguf-split", "llama-imatrix"]:
src = os.path.join("build", "bin", binary)
if os.path.exists(src):
subprocess.run(["cp", src, "./"], check=True)
else:
for binary in ["llama-quantize", "llama-gguf-split", "llama-imatrix"]:
src = os.path.join("build", binary)
if os.path.exists(src):
subprocess.run(["cp", src, "./"], check=True)
except Exception as e:
log_message(f"Error copying binaries: {str(e)}", error=True)
# Return to the original directory
os.chdir("..")
# Make sure we have the calibration data
if not os.path.exists(os.path.join("llama.cpp", "groups_merged.txt")):
log_message("Copying calibration data...")
if os.path.exists("groups_merged.txt"):
subprocess.run(["cp", "groups_merged.txt", "llama.cpp/"], check=True)
log_message("llama.cpp setup completed successfully")
return True
except Exception as e:
log_message(f"Error setting up llama.cpp: {str(e)}", error=True)
traceback.print_exc()
return False
def process_model():
"""Process the model to create all quantized versions"""
if processing_lock.locked():
log_message("Already processing, cannot start another process")
return
with processing_lock:
try:
# Check llama.cpp is set up
if not check_llama_cpp():
log_message("Failed to set up llama.cpp, aborting", error=True)
current_status["status"] = "Error setting up llama.cpp"
save_status()
return
# Validate authentication
try:
user_info = whoami()
log_message(f"Processing as user: {user_info['name']}")
except Exception as e:
log_message(f"Authentication error: {str(e)}. Please make sure you're logged in.", error=True)
current_status["status"] = "Authentication error"
save_status()
return
api = HfApi()
model_name = SOURCE_MODEL_REPO.split('/')[-1]
current_status["status"] = "Processing"
current_status["progress"] = 0
save_status()
# Prepare directories
if not os.path.exists("downloads"):
os.makedirs("downloads")
if not os.path.exists("outputs"):
os.makedirs("outputs")
log_message(f"Starting model processing for {SOURCE_MODEL_REPO}")
# Create temp directories for processing
with Path("outputs").resolve() as outdir:
log_message(f"Output directory: {outdir}")
# Download the model
log_message(f"Downloading model from {SOURCE_MODEL_REPO}")
try:
local_dir = Path("downloads") / model_name
log_message(f"Local directory: {local_dir}")
# Check and download pattern
dl_pattern = ["*.md", "*.json", "*.model"]
try:
files = list_repo_files(SOURCE_MODEL_REPO)
has_safetensors = any(file.endswith(".safetensors") for file in files)
pattern = "*.safetensors" if has_safetensors else "*.bin"
dl_pattern.append(pattern)
log_message(f"Using download pattern: {dl_pattern}")
except Exception as e:
log_message(f"Error checking repo files: {str(e)}", error=True)
dl_pattern.append("*.safetensors")
dl_pattern.append("*.bin")
# Download the model
api.snapshot_download(
repo_id=SOURCE_MODEL_REPO,
local_dir=local_dir,
local_dir_use_symlinks=False,
allow_patterns=dl_pattern
)
log_message("Model downloaded successfully!")
# Check for adapter config - if it's a LoRA adapter, this won't work
config_dir = local_dir / "config.json"
adapter_config_dir = local_dir / "adapter_config.json"
if os.path.exists(adapter_config_dir) and not os.path.exists(config_dir):
raise Exception('adapter_config.json is present. If you are converting a LoRA adapter to GGUF, please use a different tool.')
# Convert to FP16 first
fp16_path = str(outdir / f"{model_name}.fp16.gguf")
log_message(f"Converting model to FP16: {fp16_path}")
# Check if the converter script exists
if not os.path.exists(CONVERSION_SCRIPT):
log_message(f"Converter script not found at {CONVERSION_SCRIPT}, searching for alternatives", error=True)
                        for root, dirs, files in os.walk("llama.cpp"):
                            for file in files:
                                if file.startswith("convert") and file.endswith(".py"):
                                    CONVERSION_SCRIPT = os.path.join(root, file)
                                    log_message(f"Found converter at {CONVERSION_SCRIPT}")
                                    break
log_message(f"Using converter script: {CONVERSION_SCRIPT}")
result = subprocess.run([
"python", CONVERSION_SCRIPT, str(local_dir), "--outtype", "f16", "--outfile", fp16_path
], shell=False, capture_output=True, text=True)
if result.returncode != 0:
log_message(f"Converter stderr: {result.stderr}")
log_message(f"Converter stdout: {result.stdout}")
raise Exception(f"Error converting to fp16: {result.stderr}")
log_message("Model converted to fp16 successfully!")
# Generate importance matrix for IQ quantizations
imatrix_path = str(outdir / "imatrix.dat")
train_data_path = "llama.cpp/groups_merged.txt" # Default calibration dataset
if not os.path.isfile(train_data_path):
log_message(f"Warning: Training data file not found at {train_data_path}, searching alternatives...")
# Try to find it elsewhere
if os.path.exists("groups_merged.txt"):
train_data_path = "groups_merged.txt"
log_message(f"Found training data at {train_data_path}")
else:
log_message("Calibration data not found. Some quantizations may not work.", error=True)
try:
if os.path.isfile(train_data_path):
generate_importance_matrix(fp16_path, train_data_path, imatrix_path)
else:
imatrix_path = None
except Exception as e:
log_message(f"Error generating importance matrix: {str(e)}", error=True)
imatrix_path = None
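                    # When imatrix_path is None, IQ* quantizations are skipped in the loop below.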
# Process each quantization type
total_quants = len(QUANT_CONFIGS)
for i, quant_config in enumerate(QUANT_CONFIGS):
quant_type = quant_config["type"]
current_status["current_quant"] = quant_type
current_status["progress"] = int((i / total_quants) * 100)
save_status()
log_message(f"Processing quantization {i+1}/{total_quants}: {quant_type}")
try:
# Check if this is an IQ quantization
is_iq_quant = quant_type.startswith("IQ")
# Skip if we don't have imatrix and this is an IQ quant
if is_iq_quant and (imatrix_path is None or not os.path.exists(imatrix_path)):
log_message(f"Skipping {quant_type} as importance matrix is not available", error=True)
current_status["quant_status"][quant_type] = "Skipped - No imatrix"
continue
# Set up the repo name
username = user_info["name"]
repo_name = f"{model_name}-{quant_type}-GGUF"
repo_id = f"{username}/{repo_name}"
# Set up output path
quant_file_name = f"{model_name.lower()}-{quant_type.lower()}.gguf"
if is_iq_quant and quant_type != "f16":
quant_file_name = f"{model_name.lower()}-{quant_type.lower()}-imat.gguf"
quant_file_path = str(outdir / quant_file_name)
# Run quantization
if is_iq_quant and quant_type != "f16":
quantize_cmd = [
"./llama.cpp/llama-quantize",
"--imatrix", imatrix_path, fp16_path, quant_file_path, quant_type
]
else:
quantize_cmd = [
"./llama.cpp/llama-quantize",
fp16_path, quant_file_path, quant_type
]
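                            # llama-quantize takes the source GGUF, the output path and the quantization
                            # type as positional arguments; --imatrix supplies the importance matrix
                            # generated earlier for IQ-type quantizations.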
log_message(f"Running quantization command: {' '.join(quantize_cmd)}")
result = subprocess.run(quantize_cmd, shell=False, capture_output=True, text=True)
if result.returncode != 0:
if "out of memory" in result.stderr.lower():
log_message(f"Out of memory error quantizing {quant_type}. Skipping larger models.", error=True)
current_status["quant_status"][quant_type] = "Failed - Out of memory"
# Break the loop to skip larger models
break
else:
raise Exception(f"Error quantizing {quant_type}: {result.stderr}")
log_message(f"Quantized successfully with {quant_type}!")
# Create the repo if it doesn't exist
log_message(f"Creating/updating repo {repo_id}")
try:
repo_url = api.create_repo(repo_id=repo_id, exist_ok=True)
log_message(f"Repo URL: {repo_url}")
except Exception as e:
log_message(f"Error creating repo: {str(e)}", error=True)
current_status["quant_status"][quant_type] = "Failed - Repo creation error"
continue
# Create README with model info
log_message("Creating README")
readme_content = f"""# {repo_name}
This model was converted to GGUF format from [`{SOURCE_MODEL_REPO}`](https://huggingface.co/{SOURCE_MODEL_REPO}) using llama.cpp.
## Quantization: {quant_type}
Approximate size: {quant_config['size_gb']} GB
Notes: {quant_config['notes']}
## Use with llama.cpp
Install llama.cpp through brew (works on Mac and Linux)
```bash
brew install llama.cpp
```
Invoke the llama.cpp server or the CLI.
### CLI:
```bash
llama-cli --hf-repo {repo_id} --hf-file {quant_file_name} -p "The meaning to life and the universe is"
```
### Server:
```bash
llama-server --hf-repo {repo_id} --hf-file {quant_file_name} -c 2048
```
Note: You can also use this checkpoint directly through the [usage steps](https://github.com/ggerganov/llama.cpp?tab=readme-ov-file#usage) listed in the llama.cpp repo.
Step 1: Clone llama.cpp from GitHub.
```
git clone https://github.com/ggerganov/llama.cpp
```
Step 2: Move into the llama.cpp folder and build it with the `LLAMA_CURL=1` flag, along with any hardware-specific flags (for example, `LLAMA_CUDA=1` for NVIDIA GPUs on Linux).
```
cd llama.cpp && LLAMA_CURL=1 make
```
Step 3: Run inference through the main binary.
```
./llama-cli --hf-repo {repo_id} --hf-file {quant_file_name} -p "The meaning to life and the universe is"
```
or
```
./llama-server --hf-repo {repo_id} --hf-file {quant_file_name} -c 2048
```
## Auto-generated
This model version was automatically generated when updates were detected in the source repository.
Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
readme_path = outdir / "README.md"
with open(readme_path, 'w') as f:
f.write(readme_content)
# Upload the quantized model and README
log_message(f"Uploading quantized model: {quant_file_path}")
try:
api.upload_file(
path_or_fileobj=quant_file_path,
path_in_repo=quant_file_name,
repo_id=repo_id,
)
api.upload_file(
path_or_fileobj=str(readme_path),
path_in_repo="README.md",
repo_id=repo_id,
)
                                # Only upload the imatrix for IQ quants, and only if it was actually generated
                                if is_iq_quant and imatrix_path and os.path.isfile(imatrix_path):
                                    log_message("Uploading imatrix.dat")
api.upload_file(
path_or_fileobj=imatrix_path,
path_in_repo="imatrix.dat",
repo_id=repo_id,
)
log_message(f"Successfully uploaded {quant_type} quantization!")
current_status["quant_status"][quant_type] = "Success"
except Exception as e:
log_message(f"Error uploading files: {str(e)}", error=True)
current_status["quant_status"][quant_type] = f"Failed - Upload error: {str(e)}"
except Exception as e:
log_message(f"Error processing {quant_type}: {str(e)}", error=True)
current_status["quant_status"][quant_type] = f"Failed: {str(e)}"
# Continue with the next quantization
# Update status after completion
current_status["status"] = "Completed"
current_status["progress"] = 100
current_status["last_updated"] = datetime.now().isoformat()
log_message("Model processing completed!")
except Exception as e:
log_message(f"Error during model processing: {str(e)}", error=True)
current_status["status"] = "Error"
current_status["error"] = str(e)
traceback.print_exc()
except Exception as e:
log_message(f"Error: {str(e)}", error=True)
current_status["status"] = "Error"
current_status["error"] = str(e)
traceback.print_exc()
finally:
save_status()
def check_and_process():
"""Check for updates and process if needed"""
log_message("Running scheduled check for updates")
if check_for_updates():
log_message("Updates detected, starting processing")
threading.Thread(target=process_model).start()
else:
log_message("No updates detected")
def create_ui():
"""Create the Gradio interface"""
with gr.Blocks(css="body { margin: 0; padding: 0; }") as demo:
gr.Markdown("# 🦙 Automatic GGUF Quantization for Ursa_Minor")
gr.Markdown(f"This space automatically creates quantized GGUF versions of the [Sculptor-AI/Ursa_Minor](https://huggingface.co/{SOURCE_MODEL_REPO}) model whenever it's updated.")
with gr.Row():
with gr.Column(scale=2):
status_info = gr.HTML(label="Status", value="<p>Loading status...</p>")
with gr.Column(scale=1):
with gr.Row():
check_button = gr.Button("Check for Updates", variant="primary")
process_button = gr.Button("Force Processing", variant="secondary")
            # gr.Progress() does not accept a 'label' argument; it is created here but not wired to any event handler.
progress_bar = gr.Progress()
with gr.Tab("Quantization Status"):
quant_status = gr.DataFrame(
headers=["Type", "Size (GB)", "Notes", "Status"],
value=lambda: [[q["type"], q["size_gb"], q["notes"], current_status["quant_status"].get(q["type"], "Not processed")] for q in QUANT_CONFIGS],
label="Quantization Status"
)
with gr.Tab("Logs"):
logs = gr.HTML(label="Logs", value="<p>Loading logs...</p>")
def update_status():
"""Update the status display"""
status_html = f"""
<div style="border: 1px solid #ddd; padding: 15px; border-radius: 5px;">
<h3>Current Status: <span style="color: {'green' if current_status['status'] == 'Up to date' else 'blue' if current_status['status'] == 'Processing' else 'red' if 'Error' in current_status['status'] else 'orange'}">{current_status['status']}</span></h3>
<p><strong>Last Checked:</strong> {current_status.get('last_check', 'Never').replace('T', ' ').split('.')[0] if current_status.get('last_check') else 'Never'}</p>
<p><strong>Last Updated:</strong> {current_status.get('last_updated', 'Never').replace('T', ' ').split('.')[0] if current_status.get('last_updated') else 'Never'}</p>
<p><strong>Current Quantization:</strong> {current_status.get('current_quant', 'None')}</p>
{f'<p style="color: red;"><strong>Error:</strong> {current_status["error"]}</p>' if current_status.get('error') else ''}
</div>
"""
return status_html
def update_logs():
"""Update the logs display"""
logs_html = "<div style='height: 400px; overflow-y: auto; background-color: #f9f9f9; padding: 10px; font-family: monospace; white-space: pre-wrap;'>"
for log in current_status["log"]:
if "Error" in log or "error" in log:
logs_html += f"<div style='color: red;'>{log}</div>"
else:
logs_html += f"<div>{log}</div>"
logs_html += "</div>"
return logs_html
def on_check_button():
"""Handle check button click"""
if check_for_updates():
threading.Thread(target=process_model).start()
return update_status(), [[q["type"], q["size_gb"], q["notes"], current_status["quant_status"].get(q["type"], "Not processed")] for q in QUANT_CONFIGS], update_logs()
def on_process_button():
"""Handle process button click"""
threading.Thread(target=process_model).start()
return update_status(), [[q["type"], q["size_gb"], q["notes"], current_status["quant_status"].get(q["type"], "Not processed")] for q in QUANT_CONFIGS], update_logs()
check_button.click(on_check_button, outputs=[status_info, quant_status, logs])
process_button.click(on_process_button, outputs=[status_info, quant_status, logs])
# Set up periodic refresh
demo.load(update_status, outputs=[status_info])
demo.load(lambda: [[q["type"], q["size_gb"], q["notes"], current_status["quant_status"].get(q["type"], "Not processed")] for q in QUANT_CONFIGS], outputs=[quant_status])
demo.load(update_logs, outputs=[logs])
refresh_interval = 5 # seconds
gr.HTML("<script>setInterval(function(){ Array.from(document.querySelectorAll('button[id*=Refresh-Button]')).forEach(b => b.click()); }, " + str(refresh_interval * 1000) + ");</script>")
return demo
# Initialize
def initialize():
"""Initialize the application"""
# Load status from file
load_status()
# Check and setup llama.cpp
check_llama_cpp()
# Schedule regular checks for updates
scheduler = BackgroundScheduler()
scheduler.add_job(check_and_process, 'interval', minutes=60) # Check every hour
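    # check_for_updates() bails out while processing_lock is held, so an hourly check
    # cannot start a second conversion while one is still running.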
scheduler.start()
# Run initial check
threading.Thread(target=check_and_process).start()
if __name__ == "__main__":
initialize()
demo = create_ui()
    demo.queue(concurrency_count=1).launch()  # 'concurrency_count' is a Gradio 3.x argument; on Gradio 4+ use queue(default_concurrency_limit=1)