import os
import subprocess
import signal
import time
import json
from datetime import datetime
from pathlib import Path
import threading
import traceback

os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"

import gradio as gr
from huggingface_hub import HfApi, list_repo_files, hf_hub_download, login, whoami
from apscheduler.schedulers.background import BackgroundScheduler

# MODEL_REPO to monitor
SOURCE_MODEL_REPO = "Sculptor-AI/Ursa_Minor"
CONVERSION_SCRIPT = "./llama.cpp/convert-hf-to-gguf.py"  # Updated script path
STATUS_FILE = "status.json"

# Quantization configurations in order of processing
QUANT_CONFIGS = [
    {"type": "Q2_K", "size_gb": 0.8, "notes": ""},
    {"type": "Q3_K_S", "size_gb": 0.9, "notes": ""},
    {"type": "Q3_K_M", "size_gb": 0.9, "notes": "lower quality"},
    {"type": "Q3_K_L", "size_gb": 1.0, "notes": ""},
    {"type": "IQ4_XS", "size_gb": 1.0, "notes": ""},
    {"type": "Q4_K_S", "size_gb": 1.0, "notes": "fast, recommended"},
    {"type": "Q4_K_M", "size_gb": 1.1, "notes": "fast, recommended"},
    {"type": "Q5_K_S", "size_gb": 1.2, "notes": ""},
    {"type": "Q5_K_M", "size_gb": 1.2, "notes": ""},
    {"type": "Q6_K", "size_gb": 1.4, "notes": "very good quality"},
    {"type": "Q8_0", "size_gb": 1.7, "notes": "fast, best quality"},
    {"type": "f16", "size_gb": 3.2, "notes": "16 bpw, overkill"}
]

# Global variables for process state
processing_lock = threading.Lock()
current_status = {
    "status": "Not started",
    "last_check": None,
    "last_updated": None,
    "last_commit_hash": None,
    "current_quant": None,
    "quant_status": {},
    "progress": 0,
    "error": None,
    "log": []
}


def escape(s: str) -> str:
    """Escape HTML for logging"""
    s = s.replace("&", "&amp;")
    s = s.replace("<", "&lt;")
    s = s.replace(">", "&gt;")
    s = s.replace('"', "&quot;")
    s = s.replace("\n", "<br/>")
    return s


def log_message(message: str, error: bool = False):
    """Add message to log with timestamp"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[{timestamp}] {message}"
    print(log_entry)
    current_status["log"].append(log_entry)
    if error:
        current_status["error"] = message
    # Keep log size manageable
    if len(current_status["log"]) > 100:
        current_status["log"] = current_status["log"][-100:]
    # Save current status to file
    save_status()


def save_status():
    """Save current status to file"""
    with open(STATUS_FILE, 'w') as f:
        json.dump(current_status, f)


def load_status():
    """Load status from file if it exists"""
    global current_status
    if os.path.exists(STATUS_FILE):
        try:
            with open(STATUS_FILE, 'r') as f:
                current_status = json.load(f)
        except Exception as e:
            log_message(f"Error loading status file: {str(e)}", error=True)


def generate_importance_matrix(model_path: str, train_data_path: str, output_path: str):
    """Generate importance matrix for a model"""
    imatrix_command = [
        "./llama.cpp/llama-imatrix",
        "-m", model_path,
        "-f", train_data_path,
        "-ngl", "99",
        "--output-frequency", "10",
        "-o", output_path,
    ]

    if not os.path.isfile(model_path):
        raise Exception(f"Model file not found: {model_path}")

    log_message(f"Running imatrix command for {model_path}...")
    process = subprocess.Popen(imatrix_command, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    try:
        # Monitor the process for output to provide updates
        for line in process.stdout:
            log_message(f"imatrix: {line.strip()}")
        process.wait(timeout=3600)  # 1 hour timeout
    except subprocess.TimeoutExpired:
        log_message("Imatrix computation timed out. Sending SIGINT to allow graceful termination...", error=True)
        process.send_signal(signal.SIGINT)
        try:
            process.wait(timeout=60)  # 1 minute grace period
        except subprocess.TimeoutExpired:
            log_message("Imatrix process still didn't terminate. Forcefully terminating process...", error=True)
            process.kill()

    stderr = process.stderr.read()
    if stderr:
        log_message(f"Imatrix stderr: {stderr}")
    log_message("Importance matrix generation completed.")


def get_last_commit(repo_id: str):
    """Get the last commit hash of a repository"""
    try:
        api = HfApi()
        # Use the model_info function instead of commit_info
        info = api.model_info(repo_id)
        # Get the commit hash from the info
        return info.sha
    except Exception as e:
        log_message(f"Error getting commit info: {str(e)}", error=True)
        return None


def check_for_updates():
    """Check if the source model has been updated"""
    if processing_lock.locked():
        log_message("Already processing, skipping update check")
        return False

    current_status["status"] = "Checking for updates"
    current_status["last_check"] = datetime.now().isoformat()

    try:
        # Get the latest commit hash
        latest_commit = get_last_commit(SOURCE_MODEL_REPO)
        if latest_commit is None:
            current_status["status"] = "Error checking for updates"
            return False

        log_message(f"Latest commit hash: {latest_commit}")
        log_message(f"Previous commit hash: {current_status.get('last_commit_hash')}")

        if current_status.get("last_commit_hash") != latest_commit:
            current_status["status"] = "Update detected"
            current_status["last_commit_hash"] = latest_commit
            save_status()
            return True
        else:
            current_status["status"] = "Up to date"
            save_status()
            return False
    except Exception as e:
        log_message(f"Error checking for updates: {str(e)}", error=True)
        current_status["status"] = "Error checking for updates"
        save_status()
        return False


def check_llama_cpp():
    """Check if llama.cpp is properly set up and build if needed"""
    global CONVERSION_SCRIPT
    try:
        if not os.path.exists("llama.cpp"):
            log_message("llama.cpp directory not found, cloning repository...")
            subprocess.run(["git", "clone", "https://github.com/ggerganov/llama.cpp"], check=True)

        # Check for critical files
        converter_path = os.path.join("llama.cpp", "convert-hf-to-gguf.py")
        if not os.path.exists(converter_path):
            # Try alternative path
            old_converter_path = os.path.join("llama.cpp", "convert_hf_to_gguf.py")
            if os.path.exists(old_converter_path):
                log_message(f"Found converter at {old_converter_path}, using this path")
                CONVERSION_SCRIPT = old_converter_path
            else:
                log_message("Converter script not found, listing files in llama.cpp...")
                files = os.listdir("llama.cpp")
                log_message(f"Files in llama.cpp: {files}")
                # Search for any converter script
                for file in files:
                    if file.startswith("convert") and file.endswith(".py"):
                        log_message(f"Found alternative converter: {file}")
                        CONVERSION_SCRIPT = os.path.join("llama.cpp", file)
                        break

        # Build the tools
        log_message("Building llama.cpp tools...")
        os.chdir("llama.cpp")

        # Check if build directory exists
        if not os.path.exists("build"):
            os.makedirs("build")

        # Configure and build
        subprocess.run(["cmake", "-B", "build", "-DBUILD_SHARED_LIBS=OFF"], check=True)
        subprocess.run(["cmake", "--build", "build", "--config", "Release", "-j",
                        "--target", "llama-quantize", "llama-gguf-split", "llama-imatrix"], check=True)

        # Copy binaries
        log_message("Copying built binaries...")
        try:
            # Different builds may put binaries in different places
            if os.path.exists(os.path.join("build", "bin")):
                for binary in ["llama-quantize", "llama-gguf-split", "llama-imatrix"]:
                    src = os.path.join("build", "bin", binary)
                    if os.path.exists(src):
                        subprocess.run(["cp", src, "./"], check=True)
            else:
                for binary in ["llama-quantize", "llama-gguf-split", "llama-imatrix"]:
                    src = os.path.join("build", binary)
                    if os.path.exists(src):
                        subprocess.run(["cp", src, "./"], check=True)
        except Exception as e:
            log_message(f"Error copying binaries: {str(e)}", error=True)

        # Return to the original directory
        os.chdir("..")

        # Make sure we have the calibration data
        if not os.path.exists(os.path.join("llama.cpp", "groups_merged.txt")):
            log_message("Copying calibration data...")
            if os.path.exists("groups_merged.txt"):
                subprocess.run(["cp", "groups_merged.txt", "llama.cpp/"], check=True)

        log_message("llama.cpp setup completed successfully")
        return True
    except Exception as e:
        log_message(f"Error setting up llama.cpp: {str(e)}", error=True)
        traceback.print_exc()
        return False


def process_model():
    """Process the model to create all quantized versions"""
    global CONVERSION_SCRIPT

    if processing_lock.locked():
        log_message("Already processing, cannot start another process")
        return

    with processing_lock:
        try:
            # Check llama.cpp is set up
            if not check_llama_cpp():
                log_message("Failed to set up llama.cpp, aborting", error=True)
                current_status["status"] = "Error setting up llama.cpp"
                save_status()
                return

            # Validate authentication
            try:
                user_info = whoami()
                log_message(f"Processing as user: {user_info['name']}")
            except Exception as e:
                log_message(f"Authentication error: {str(e)}. Please make sure you're logged in.", error=True)
                current_status["status"] = "Authentication error"
                save_status()
                return

            api = HfApi()
            model_name = SOURCE_MODEL_REPO.split('/')[-1]

            current_status["status"] = "Processing"
            current_status["progress"] = 0
            save_status()

            # Prepare directories
            if not os.path.exists("downloads"):
                os.makedirs("downloads")
            if not os.path.exists("outputs"):
                os.makedirs("outputs")

            log_message(f"Starting model processing for {SOURCE_MODEL_REPO}")

            # Resolve the output directory used for all generated files
            outdir = Path("outputs").resolve()
            log_message(f"Output directory: {outdir}")

            # Download the model
            log_message(f"Downloading model from {SOURCE_MODEL_REPO}")
            try:
                local_dir = Path("downloads") / model_name
                log_message(f"Local directory: {local_dir}")

                # Check and download pattern
                dl_pattern = ["*.md", "*.json", "*.model"]
                try:
                    files = list_repo_files(SOURCE_MODEL_REPO)
                    has_safetensors = any(file.endswith(".safetensors") for file in files)
                    pattern = "*.safetensors" if has_safetensors else "*.bin"
                    dl_pattern.append(pattern)
                    log_message(f"Using download pattern: {dl_pattern}")
                except Exception as e:
                    log_message(f"Error checking repo files: {str(e)}", error=True)
                    dl_pattern.append("*.safetensors")
                    dl_pattern.append("*.bin")

                # Download the model
                api.snapshot_download(
                    repo_id=SOURCE_MODEL_REPO,
                    local_dir=local_dir,
                    local_dir_use_symlinks=False,
                    allow_patterns=dl_pattern
                )
                log_message("Model downloaded successfully!")

                # Check for adapter config - if it's a LoRA adapter, this won't work
                config_dir = local_dir / "config.json"
                adapter_config_dir = local_dir / "adapter_config.json"
                if os.path.exists(adapter_config_dir) and not os.path.exists(config_dir):
                    raise Exception('adapter_config.json is present. If you are converting a LoRA adapter to GGUF, please use a different tool.')
                # Convert to FP16 first
                fp16_path = str(outdir / f"{model_name}.fp16.gguf")
                log_message(f"Converting model to FP16: {fp16_path}")

                # Check if the converter script exists
                if not os.path.exists(CONVERSION_SCRIPT):
                    log_message(f"Converter script not found at {CONVERSION_SCRIPT}, searching for alternatives", error=True)
                    for root, dirs, files in os.walk("llama.cpp"):
                        for file in files:
                            if file.startswith("convert") and file.endswith(".py"):
                                CONVERSION_SCRIPT = os.path.join(root, file)
                                log_message(f"Found converter at {CONVERSION_SCRIPT}")
                                break

                log_message(f"Using converter script: {CONVERSION_SCRIPT}")

                result = subprocess.run([
                    "python", CONVERSION_SCRIPT, str(local_dir),
                    "--outtype", "f16",
                    "--outfile", fp16_path
                ], shell=False, capture_output=True, text=True)

                if result.returncode != 0:
                    log_message(f"Converter stderr: {result.stderr}")
                    log_message(f"Converter stdout: {result.stdout}")
                    raise Exception(f"Error converting to fp16: {result.stderr}")

                log_message("Model converted to fp16 successfully!")

                # Generate importance matrix for IQ quantizations
                imatrix_path = str(outdir / "imatrix.dat")
                train_data_path = "llama.cpp/groups_merged.txt"  # Default calibration dataset

                if not os.path.isfile(train_data_path):
                    log_message(f"Warning: Training data file not found at {train_data_path}, searching alternatives...")
                    # Try to find it elsewhere
                    if os.path.exists("groups_merged.txt"):
                        train_data_path = "groups_merged.txt"
                        log_message(f"Found training data at {train_data_path}")
                    else:
                        log_message("Calibration data not found. Some quantizations may not work.", error=True)

                try:
                    if os.path.isfile(train_data_path):
                        generate_importance_matrix(fp16_path, train_data_path, imatrix_path)
                    else:
                        imatrix_path = None
                except Exception as e:
                    log_message(f"Error generating importance matrix: {str(e)}", error=True)
                    imatrix_path = None

                # Process each quantization type
                total_quants = len(QUANT_CONFIGS)
                for i, quant_config in enumerate(QUANT_CONFIGS):
                    quant_type = quant_config["type"]
                    current_status["current_quant"] = quant_type
                    current_status["progress"] = int((i / total_quants) * 100)
                    save_status()

                    log_message(f"Processing quantization {i+1}/{total_quants}: {quant_type}")

                    try:
                        # Check if this is an IQ quantization
                        is_iq_quant = quant_type.startswith("IQ")

                        # Skip if we don't have imatrix and this is an IQ quant
                        if is_iq_quant and (imatrix_path is None or not os.path.exists(imatrix_path)):
                            log_message(f"Skipping {quant_type} as importance matrix is not available", error=True)
                            current_status["quant_status"][quant_type] = "Skipped - No imatrix"
                            continue

                        # Set up the repo name
                        username = user_info["name"]
                        repo_name = f"{model_name}-{quant_type}-GGUF"
                        repo_id = f"{username}/{repo_name}"

                        # Set up output path
                        quant_file_name = f"{model_name.lower()}-{quant_type.lower()}.gguf"
                        if is_iq_quant and quant_type != "f16":
                            quant_file_name = f"{model_name.lower()}-{quant_type.lower()}-imat.gguf"
                        quant_file_path = str(outdir / quant_file_name)

                        # Run quantization
                        if is_iq_quant and quant_type != "f16":
                            quantize_cmd = [
                                "./llama.cpp/llama-quantize",
                                "--imatrix", imatrix_path,
                                fp16_path, quant_file_path, quant_type
                            ]
                        else:
                            quantize_cmd = [
                                "./llama.cpp/llama-quantize",
                                fp16_path, quant_file_path, quant_type
                            ]

                        log_message(f"Running quantization command: {' '.join(quantize_cmd)}")
                        result = subprocess.run(quantize_cmd, shell=False, capture_output=True, text=True)
                        if result.returncode != 0:
                            if "out of memory" in result.stderr.lower():
                                log_message(f"Out of memory error quantizing {quant_type}. Skipping larger models.", error=True)
                                current_status["quant_status"][quant_type] = "Failed - Out of memory"
                                # Break the loop to skip larger models
                                break
                            else:
                                raise Exception(f"Error quantizing {quant_type}: {result.stderr}")

                        log_message(f"Quantized successfully with {quant_type}!")

                        # Create the repo if it doesn't exist
                        log_message(f"Creating/updating repo {repo_id}")
                        try:
                            repo_url = api.create_repo(repo_id=repo_id, exist_ok=True)
                            log_message(f"Repo URL: {repo_url}")
                        except Exception as e:
                            log_message(f"Error creating repo: {str(e)}", error=True)
                            current_status["quant_status"][quant_type] = "Failed - Repo creation error"
                            continue

                        # Create README with model info
                        log_message("Creating README")
                        readme_content = f"""# {repo_name}

This model was converted to GGUF format from [`{SOURCE_MODEL_REPO}`](https://huggingface.co/{SOURCE_MODEL_REPO}) using llama.cpp.

## Quantization: {quant_type}

Approximate size: {quant_config['size_gb']} GB

Notes: {quant_config['notes']}

## Use with llama.cpp

Install llama.cpp through brew (works on Mac and Linux):

```bash
brew install llama.cpp
```

Invoke the llama.cpp server or the CLI.

### CLI:

```bash
llama-cli --hf-repo {repo_id} --hf-file {quant_file_name} -p "The meaning to life and the universe is"
```

### Server:

```bash
llama-server --hf-repo {repo_id} --hf-file {quant_file_name} -c 2048
```

Note: You can also use this checkpoint directly through the [usage steps](https://github.com/ggerganov/llama.cpp?tab=readme-ov-file#usage) listed in the llama.cpp repo.

Step 1: Clone llama.cpp from GitHub.

```
git clone https://github.com/ggerganov/llama.cpp
```

Step 2: Move into the llama.cpp folder and build it with the `LLAMA_CURL=1` flag along with other hardware-specific flags (for example, LLAMA_CUDA=1 for Nvidia GPUs on Linux).

```
cd llama.cpp && LLAMA_CURL=1 make
```

Step 3: Run inference through the main binary.

```
./llama-cli --hf-repo {repo_id} --hf-file {quant_file_name} -p "The meaning to life and the universe is"
```

or

```
./llama-server --hf-repo {repo_id} --hf-file {quant_file_name} -c 2048
```

## Auto-generated

This model version was automatically generated when updates were detected in the source repository.

Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
                        readme_path = outdir / "README.md"
                        with open(readme_path, 'w') as f:
                            f.write(readme_content)

                        # Upload the quantized model and README
                        log_message(f"Uploading quantized model: {quant_file_path}")
                        try:
                            api.upload_file(
                                path_or_fileobj=quant_file_path,
                                path_in_repo=quant_file_name,
                                repo_id=repo_id,
                            )
                            api.upload_file(
                                path_or_fileobj=str(readme_path),
                                path_in_repo="README.md",
                                repo_id=repo_id,
                            )
                            if is_iq_quant and imatrix_path and os.path.isfile(imatrix_path):
                                log_message("Uploading imatrix.dat")
                                api.upload_file(
                                    path_or_fileobj=imatrix_path,
                                    path_in_repo="imatrix.dat",
                                    repo_id=repo_id,
                                )
                            log_message(f"Successfully uploaded {quant_type} quantization!")
                            current_status["quant_status"][quant_type] = "Success"
                        except Exception as e:
                            log_message(f"Error uploading files: {str(e)}", error=True)
                            current_status["quant_status"][quant_type] = f"Failed - Upload error: {str(e)}"

                    except Exception as e:
                        log_message(f"Error processing {quant_type}: {str(e)}", error=True)
                        current_status["quant_status"][quant_type] = f"Failed: {str(e)}"
                        # Continue with the next quantization

                # Update status after completion
                current_status["status"] = "Completed"
                current_status["progress"] = 100
                current_status["last_updated"] = datetime.now().isoformat()
                log_message("Model processing completed!")

            except Exception as e:
                log_message(f"Error during model processing: {str(e)}", error=True)
                current_status["status"] = "Error"
                current_status["error"] = str(e)
                traceback.print_exc()

        except Exception as e:
            log_message(f"Error: {str(e)}", error=True)
            current_status["status"] = "Error"
            current_status["error"] = str(e)
            traceback.print_exc()
        finally:
            save_status()


def check_and_process():
    """Check for updates and process if needed"""
    log_message("Running scheduled check for updates")
    if check_for_updates():
        log_message("Updates detected, starting processing")
        threading.Thread(target=process_model).start()
    else:
        log_message("No updates detected")


def create_ui():
    """Create the Gradio interface"""
    with gr.Blocks(css="body { margin: 0; padding: 0; }") as demo:
        gr.Markdown("# 🦙 Automatic GGUF Quantization for Ursa_Minor")
        gr.Markdown(f"This space automatically creates quantized GGUF versions of the [Sculptor-AI/Ursa_Minor](https://huggingface.co/{SOURCE_MODEL_REPO}) model whenever it's updated.")

        with gr.Row():
            with gr.Column(scale=2):
                status_info = gr.HTML(label="Status", value="<div>Loading status...</div>")
            with gr.Column(scale=1):
                with gr.Row():
                    check_button = gr.Button("Check for Updates", variant="primary")
                    process_button = gr.Button("Force Processing", variant="secondary")
                # gr.Progress() does not support a 'label' parameter
                progress_bar = gr.Progress()

        with gr.Tab("Quantization Status"):
            quant_status = gr.DataFrame(
                headers=["Type", "Size (GB)", "Notes", "Status"],
                value=lambda: [[q["type"], q["size_gb"], q["notes"],
                                current_status["quant_status"].get(q["type"], "Not processed")]
                               for q in QUANT_CONFIGS],
                label="Quantization Status"
            )

        with gr.Tab("Logs"):
            logs = gr.HTML(label="Logs", value="<div>Loading logs...</div>")
        def update_status():
            """Update the status display"""
            status_html = f"""
            <div>
                <p>Current Status: {current_status['status']}</p>
                <p>Last Checked: {current_status.get('last_check', 'Never').replace('T', ' ').split('.')[0] if current_status.get('last_check') else 'Never'}</p>
                <p>Last Updated: {current_status.get('last_updated', 'Never').replace('T', ' ').split('.')[0] if current_status.get('last_updated') else 'Never'}</p>
                <p>Current Quantization: {current_status.get('current_quant', 'None')}</p>
                {f'<p style="color: red;">Error: {current_status["error"]}</p>' if current_status.get('error') else ''}
            </div>
            """
            return status_html

        def update_logs():
            """Update the logs display"""
            logs_html = "<div>"
            for log in current_status["log"]:
                if "Error" in log or "error" in log:
                    logs_html += f"<p style='color: red;'>{log}</p>"
                else:
                    logs_html += f"<p>{log}</p>"
            logs_html += "</div>"
            return logs_html

        def on_check_button():
            """Handle check button click"""
            if check_for_updates():
                threading.Thread(target=process_model).start()
            return update_status(), [[q["type"], q["size_gb"], q["notes"],
                                      current_status["quant_status"].get(q["type"], "Not processed")]
                                     for q in QUANT_CONFIGS], update_logs()

        def on_process_button():
            """Handle process button click"""
            threading.Thread(target=process_model).start()
            return update_status(), [[q["type"], q["size_gb"], q["notes"],
                                      current_status["quant_status"].get(q["type"], "Not processed")]
                                     for q in QUANT_CONFIGS], update_logs()

        check_button.click(on_check_button, outputs=[status_info, quant_status, logs])
        process_button.click(on_process_button, outputs=[status_info, quant_status, logs])

        # Set up periodic refresh
        demo.load(update_status, outputs=[status_info])
        demo.load(lambda: [[q["type"], q["size_gb"], q["notes"],
                            current_status["quant_status"].get(q["type"], "Not processed")]
                           for q in QUANT_CONFIGS], outputs=[quant_status])
        demo.load(update_logs, outputs=[logs])

        refresh_interval = 5  # seconds
        gr.HTML("")

    return demo


# Initialize
def initialize():
    """Initialize the application"""
    # Load status from file
    load_status()

    # Check and setup llama.cpp
    check_llama_cpp()

    # Schedule regular checks for updates
    scheduler = BackgroundScheduler()
    scheduler.add_job(check_and_process, 'interval', minutes=60)  # Check every hour
    scheduler.start()

    # Run initial check
    threading.Thread(target=check_and_process).start()


if __name__ == "__main__":
    initialize()
    demo = create_ui()
    demo.queue(concurrency_count=1).launch()