"""Streamlit chat UI for local GGUF models served through llama.cpp.

The script lets the user pick a model from ``MODELS``, downloads it from the
Hugging Face Hub into ``./models`` when it is not on disk, and runs a chat
loop. When "Enable Web Search" is ticked, DuckDuckGo snippets for the user's
query are injected into the system prompt.
"""

import gc
import os
import re
import shutil
from itertools import islice

import streamlit as st
from duckduckgo_search import DDGS  # Latest class-based interface
from huggingface_hub import hf_hub_download
from llama_cpp import Llama

# ----- Custom CSS for pretty formatting of internal reasoning -----
# NOTE(review): the stylesheet body is effectively empty here; confirm whether
# a <style> block is supposed to live in this string.
CUSTOM_CSS = """ """
st.markdown(CUSTOM_CSS, unsafe_allow_html=True)

# ----- Set a threshold for required free storage (in bytes) -----
REQUIRED_SPACE_BYTES = 5 * 1024 ** 3  # 5 GB

# ----- Patterns used to hide internal reasoning from the visible reply -----
# A completed reasoning section: <think> ... </think>
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", flags=re.DOTALL)
# A reasoning section that is still being streamed (no closing tag yet).
_THINK_OPEN_RE = re.compile(r"<think>.*$", flags=re.DOTALL)


def strip_reasoning(text):
    """Return *text* with ``<think>`` reasoning sections removed.

    Closed ``<think>...</think>`` sections are dropped first; anything from a
    still-unclosed ``<think>`` to the end of the text is dropped afterwards,
    so partially streamed reasoning never reaches the screen.
    """
    without_closed = _THINK_BLOCK_RE.sub("", text)
    return _THINK_OPEN_RE.sub("", without_closed)


# ----- Function to perform DuckDuckGo search and retrieve concise context -----
def retrieve_context(query, max_results=2, max_chars_per_result=150):
    """Search DuckDuckGo for *query* and return a compact context string.

    Args:
        query: Search text passed to ``DDGS().text()``.
        max_results: Maximum number of results taken (via ``islice``).
        max_chars_per_result: Maximum number of characters kept from each
            result's ``body`` snippet.

    Returns:
        The results formatted as ``Result N / Title / Snippet`` sections, or
        an empty string when nothing was found or the search failed. Failures
        are reported to the UI with ``st.error`` rather than raised.
    """
    try:
        with DDGS() as ddgs:
            results_gen = ddgs.text(
                query, region="wt-wt", safesearch="off", timelimit="y"
            )
            results = list(islice(results_gen, max_results))

        sections = []
        for i, result in enumerate(results, start=1):
            title = result.get("title", "No Title")
            # `or ""` also covers a key that is present but set to None.
            snippet = (result.get("body") or "")[:max_chars_per_result]
            sections.append(
                f"Result {i}:\nTitle: {title}\nSnippet: {snippet}\n\n"
            )
        return "".join(sections).strip()
    except Exception as e:
        # UI boundary: show the problem and continue without search context.
        st.error(f"Error during retrieval: {e}")
        return ""


# ----- Available models -----
MODELS = {
    "Qwen2.5-0.5B-Instruct (Q4_K_M)": {
        "repo_id": "Qwen/Qwen2.5-0.5B-Instruct-GGUF",
        "filename": "qwen2.5-0.5b-instruct-q4_k_m.gguf",
        "description": "Qwen2.5-0.5B-Instruct (Q4_K_M)",
    },
    "Gemma-3.1B-it (Q4_K_M)": {
        "repo_id": "unsloth/gemma-3-1b-it-GGUF",
        "filename": "gemma-3-1b-it-Q4_K_M.gguf",
        "description": "Gemma-3.1B-it (Q4_K_M)",
    },
    "Qwen2.5-7B-Instruct (Q2_K)": {
        "repo_id": "Qwen/Qwen2.5-7B-Instruct-GGUF",
        "filename": "qwen2.5-7b-instruct-q2_k.gguf",
        "description": "Qwen2.5-7B Instruct (Q2_K)",
    },
    "Gemma-3-4B-IT (Q4_K_M)": {
        "repo_id": "unsloth/gemma-3-4b-it-GGUF",
        "filename": "gemma-3-4b-it-Q4_K_M.gguf",
        "description": "Gemma 3 4B IT (Q4_K_M)",
    },
    "Phi-4-mini-Instruct (Q4_K_M)": {
        "repo_id": "unsloth/Phi-4-mini-instruct-GGUF",
        "filename": "Phi-4-mini-instruct-Q4_K_M.gguf",
        "description": "Phi-4 Mini Instruct (Q4_K_M)",
    },
    "Meta-Llama-3.1-8B-Instruct (Q2_K)": {
        "repo_id": "MaziyarPanahi/Meta-Llama-3.1-8B-Instruct-GGUF",
        "filename": "Meta-Llama-3.1-8B-Instruct.Q2_K.gguf",
        "description": "Meta-Llama-3.1-8B-Instruct (Q2_K)",
    },
    "DeepSeek-R1-Distill-Llama-8B (Q2_K)": {
        "repo_id": "unsloth/DeepSeek-R1-Distill-Llama-8B-GGUF",
        "filename": "DeepSeek-R1-Distill-Llama-8B-Q2_K.gguf",
        "description": "DeepSeek-R1-Distill-Llama-8B (Q2_K)",
    },
    "Mistral-7B-Instruct-v0.3 (IQ3_XS)": {
        "repo_id": "MaziyarPanahi/Mistral-7B-Instruct-v0.3-GGUF",
        "filename": "Mistral-7B-Instruct-v0.3.IQ3_XS.gguf",
        "description": "Mistral-7B-Instruct-v0.3 (IQ3_XS)",
    },
    "Qwen2.5-Coder-7B-Instruct (Q2_K)": {
        "repo_id": "Qwen/Qwen2.5-Coder-7B-Instruct-GGUF",
        "filename": "qwen2.5-coder-7b-instruct-q2_k.gguf",
        "description": "Qwen2.5-Coder-7B-Instruct (Q2_K)",
    },
}

# ----- Sidebar settings -----
with st.sidebar:
    st.header("⚙️ Settings")
    selected_model_name = st.selectbox("Select Model", list(MODELS.keys()))
    system_prompt_base = st.text_area(
        "System Prompt", value="You are a helpful assistant.", height=80
    )
    # Adjust for lower memory usage
    max_tokens = st.slider("Max tokens", 64, 1024, 256, step=32)
    temperature = st.slider("Temperature", 0.1, 2.0, 0.7)
    top_k = st.slider("Top-K", 1, 100, 40)
    top_p = st.slider("Top-P", 0.1, 1.0, 0.95)
    repeat_penalty = st.slider("Repetition Penalty", 1.0, 2.0, 1.1)

    # Checkbox to enable the DuckDuckGo search feature (disabled by default)
    enable_search = st.checkbox("Enable Web Search", value=False)

    if st.button("📦 Show Disk Usage"):
        try:
            usage = shutil.disk_usage(".")
            used = usage.used / (1024 ** 3)
            free = usage.free / (1024 ** 3)
            st.info(f"Disk Used: {used:.2f} GB | Free: {free:.2f} GB")
        except Exception as e:
            st.error(f"Disk usage error: {e}")

# ----- Define selected model and path -----
selected_model = MODELS[selected_model_name]
model_path = os.path.join("models", selected_model["filename"])

# Ensure model directory exists
os.makedirs("models", exist_ok=True)


# ----- Helper functions for model management -----
def try_load_model(path):
    """Try to load the GGUF model at *path*.

    Returns:
        A ``Llama`` instance on success, or the error message as a ``str``
        when construction raised. Callers tell the two apart with
        ``isinstance(result, str)``.
    """
    try:
        return Llama(
            model_path=path,
            n_ctx=512,  # Reduced context window to save memory
            n_threads=1,  # Fewer threads for resource-constrained environments
            n_threads_batch=1,
            n_batch=2,  # Lower batch size to conserve memory
            n_gpu_layers=0,
            use_mlock=False,
            use_mmap=True,
            verbose=False,
        )
    except Exception as e:
        return str(e)


def download_model():
    """Download the currently selected model file into ``./models``."""
    with st.spinner(f"Downloading {selected_model['filename']}..."):
        hf_hub_download(
            repo_id=selected_model["repo_id"],
            filename=selected_model["filename"],
            local_dir="./models",
            local_dir_use_symlinks=False,
        )


def validate_or_download_model():
    """Return a loaded model, downloading (and re-downloading once) as needed.

    If the model file is missing it is downloaded first. If loading fails,
    the file is removed, downloaded again and loaded a second time. When the
    second load also fails, the error is shown and the script run is halted
    with ``st.stop()``.
    """
    if not os.path.exists(model_path):
        free_space = shutil.disk_usage(".").free
        if free_space < REQUIRED_SPACE_BYTES:
            # Advisory only: the download below is still attempted.
            st.info("Insufficient storage. Consider cleaning up old models.")
        download_model()

    result = try_load_model(model_path)
    if not isinstance(result, str):
        return result

    st.warning(f"Initial load failed: {result}\nAttempting re-download...")
    try:
        os.remove(model_path)
    except OSError:
        # Best effort: the file may already be gone or not removable; the
        # fresh download below is attempted either way.
        pass
    download_model()

    result = try_load_model(model_path)
    if isinstance(result, str):
        st.error(f"Model still failed after re-download: {result}")
        st.stop()
    return result


# ----- Session state initialization -----
if "model_name" not in st.session_state:
    st.session_state.model_name = None
if "llm" not in st.session_state:
    st.session_state.llm = None
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []
if "pending_response" not in st.session_state:
    st.session_state.pending_response = False

# ----- Load model if changed -----
if st.session_state.model_name != selected_model_name:
    # Drop the previous model and forget its name *before* loading. If the
    # load below halts the run, the session must not claim that the old
    # model is still loaded, otherwise re-selecting it would skip loading.
    st.session_state.llm = None
    st.session_state.model_name = None
    gc.collect()
    st.session_state.llm = validate_or_download_model()
    st.session_state.model_name = selected_model_name

llm = st.session_state.llm

# ----- Display title and caption -----
st.title(f"🧠 {selected_model['description']} (Streamlit + GGUF)")
st.caption(f"Powered by `llama.cpp` | Model: {selected_model['filename']}")

# Render existing chat history
for chat in st.session_state.chat_history:
    with st.chat_message(chat["role"]):
        st.markdown(chat["content"])

# ----- Chat input and integrated RAG with memory optimizations -----
user_input = st.chat_input("Ask something...")

if user_input:
    if st.session_state.pending_response:
        st.warning("Please wait for the assistant to finish responding.")
    else:
        # Append the user query to chat history
        st.session_state.chat_history.append(
            {"role": "user", "content": user_input}
        )
        with st.chat_message("user"):
            st.markdown(user_input)

        st.session_state.pending_response = True
        try:
            # Only retrieve search context if search feature is enabled
            if enable_search:
                retrieved_context = retrieve_context(
                    user_input, max_results=2, max_chars_per_result=150
                )
            else:
                retrieved_context = ""

            st.sidebar.markdown(
                "### Retrieved Context" if enable_search else "Web Search Disabled"
            )
            st.sidebar.text(retrieved_context or "No context found.")

            # Build an augmented system prompt that includes the retrieved
            # context if available
            if retrieved_context:
                augmented_prompt = (
                    "Use the following recent web search context to help answer the query:\n\n"
                    f"{retrieved_context}\n\nUser Query: {user_input}"
                )
            else:
                augmented_prompt = f"User Query: {user_input}"
            full_system_prompt = system_prompt_base.strip() + "\n\n" + augmented_prompt

            # Limit conversation history to the last 2 turns
            MAX_TURNS = 2
            trimmed_history = st.session_state.chat_history[-(MAX_TURNS * 2):]
            messages = [
                {"role": "system", "content": full_system_prompt}
            ] + trimmed_history

            # Generate response with the LLM in a streaming fashion
            with st.chat_message("assistant"):
                visible_placeholder = st.empty()
                full_response = ""
                stream = llm.create_chat_completion(
                    messages=messages,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_k=top_k,
                    top_p=top_p,
                    repeat_penalty=repeat_penalty,
                    stream=True,
                )
                for chunk in stream:
                    if "choices" in chunk:
                        # `or ""` guards against a delta whose content is None.
                        delta = chunk["choices"][0]["delta"].get("content") or ""
                        full_response += delta
                        # Clean internal reasoning markers before display
                        visible_placeholder.markdown(strip_reasoning(full_response))

            # The unfiltered text (reasoning included) is what gets stored.
            st.session_state.chat_history.append(
                {"role": "assistant", "content": full_response}
            )
        finally:
            # Always release the "busy" flag, even when generation raised or
            # the run was interrupted, so the session cannot get stuck.
            st.session_state.pending_response = False
            gc.collect()  # Trigger garbage collection to free memory