import hashlib
import hmac
import os
import sqlite3
import zipfile
from contextlib import closing

import google.generativeai as genai
import requests
import streamlit as st
import torch
from git import Repo
from transformers import AutoModelForCausalLM, AutoTokenizer

# Database setup
DB_FILE = "users.db"

# New passwords are stored as "pbkdf2_sha256$<iterations>$<salt hex>$<hash hex>".
# Rows written before this scheme hold a bare SHA-256 hex digest; they are
# still accepted by _verify_password so existing accounts keep working.
_PBKDF2_PREFIX = "pbkdf2_sha256"
_PBKDF2_ITERATIONS = 600_000


def create_user_table():
    """Create the ``users`` table in ``DB_FILE`` if it does not exist yet."""
    # closing() guarantees the connection is released; the inner ``conn``
    # context commits on success and rolls back on error.
    with closing(sqlite3.connect(DB_FILE)) as conn, conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS users (
                username TEXT PRIMARY KEY,
                password TEXT
            )
            """
        )


def _hash_password(password, salt=None, iterations=_PBKDF2_ITERATIONS):
    """Return a salted PBKDF2-HMAC-SHA256 record for *password*.

    Args:
        password (str): Plain-text password.
        salt (bytes | None): Salt to use; a random 16-byte salt when None.
        iterations (int): PBKDF2 iteration count, stored in the record.

    Returns:
        str: ``pbkdf2_sha256$<iterations>$<salt hex>$<hash hex>``.
    """
    if salt is None:
        salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations
    )
    return f"{_PBKDF2_PREFIX}${iterations}${salt.hex()}${digest.hex()}"


def _verify_password(password, stored):
    """Return True when *password* matches the *stored* password record.

    Supports both the PBKDF2 record format and the legacy unsalted SHA-256
    hex digest that older rows contain.
    """
    if not isinstance(stored, str):
        return False
    if stored.startswith(_PBKDF2_PREFIX + "$"):
        try:
            _, iterations, salt_hex, _ = stored.split("$")
            expected = _hash_password(
                password, bytes.fromhex(salt_hex), int(iterations)
            )
        except (ValueError, OverflowError):
            # Malformed record: treat as a failed login rather than crashing.
            return False
    else:
        expected = hashlib.sha256(password.encode()).hexdigest()
    # Constant-time comparison; bytes avoid the ASCII-only restriction on str.
    return hmac.compare_digest(expected.encode("utf-8"), stored.encode("utf-8"))


def add_user(username, password):
    """Insert a new user, reporting a duplicate username via ``st.error``.

    Args:
        username (str): Desired username (primary key of ``users``).
        password (str): Plain-text password; only its hash is stored.

    Returns:
        bool: True if the row was inserted, False if the username is taken.
    """
    try:
        with closing(sqlite3.connect(DB_FILE)) as conn, conn:
            conn.execute(
                "INSERT INTO users (username, password) VALUES (?, ?)",
                (username, _hash_password(password)),
            )
    except sqlite3.IntegrityError:
        st.error("Username already exists. Please choose a different username.")
        return False
    return True


def authenticate_user(username, password):
    """Look up *username* and check *password* against the stored hash.

    Returns:
        tuple | None: The ``(username, password_hash)`` row on success,
        otherwise None.
    """
    with closing(sqlite3.connect(DB_FILE)) as conn:
        row = conn.execute(
            "SELECT username, password FROM users WHERE username = ?",
            (username,),
        ).fetchone()
    if row is None or not _verify_password(password, row[1]):
        return None
    return row


def initialize_session_state():
    """Populate ``st.session_state`` with defaults for keys not yet set."""
    defaults = {
        "authenticated": False,
        "username": None,
        "page": "login",
        "current_project": None,
        "project_uploaded": False,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value


def main():
    """Entry point: set up state and the database, then render the current page."""
    st.title("SimplifAI")

    # Initialize session state
    initialize_session_state()

    # Initialize database
    create_user_table()

    # Page routing logic
    pages = {
        "login": login_page,
        "workspace": workspace_page,
        "project_view": project_view_page,
        "generate_documentation": generate_documentation_page,
        "view_documentation": view_documentation_page,
    }
    # Every page other than "login" requires an authenticated session.
    page = st.session_state.page if st.session_state.authenticated else "login"
    render = pages.get(page)
    if render is not None:
        render()


def _is_safe_path_component(name):
    """Return True if *name* can be used as a single directory name.

    Usernames and project names become folder names under ``user_projects``,
    so separators and the special entries ``.``/``..`` are rejected to keep
    paths inside the intended folder.
    """
    if not name or name in {".", ".."}:
        return False
    return not any(ch in name for ch in ("/", "\\", "\0"))


def login_page():
    """Render the log-in / registration form and handle its submission."""
    st.subheader("Please Log In or Register to Continue")
    auth_mode = st.radio("Choose an Option", ["Log In", "Register"], horizontal=True)

    if auth_mode == "Log In":
        st.subheader("Log In")
        username = st.text_input("Username", key="login_username")
        password = st.text_input("Password", type="password", key="login_password")

        # Handle single-click login
        if st.button("Log In"):
            if authenticate_user(username, password):
                st.session_state.authenticated = True
                st.session_state.username = username
                st.session_state.page = "workspace"
                # Rerun so the workspace renders now instead of on the next click.
                st.rerun()
            else:
                st.error("Invalid username or password. Please try again.")

    elif auth_mode == "Register":
        st.subheader("Register")
        username = st.text_input("Create Username", key="register_username")
        password = st.text_input("Create Password", type="password", key="register_password")

        # Handle single-click registration
        if st.button("Register"):
            if not (username and password):
                st.error("Please fill in all fields.")
            elif not _is_safe_path_component(username):
                st.error("Usernames cannot contain '/' or '\\' and cannot be '.' or '..'.")
            elif add_user(username, password):
                # Only report success when the row was actually inserted;
                # add_user already shows the duplicate-username error.
                st.success("Account created successfully! You can now log in.")


def _save_uploaded_file(uploaded_file, project_folder):
    """Write one uploaded file into *project_folder*, unpacking .zip archives.

    A valid archive is extracted in place and the .zip itself is removed;
    an invalid one is reported and left on disk.
    """
    # Keep only the final path component of the client-supplied file name.
    file_name = os.path.basename(uploaded_file.name)
    file_path = os.path.join(project_folder, file_name)
    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())

    if not file_name.lower().endswith(".zip"):
        st.success(f"File {uploaded_file.name} saved successfully!")
        return

    try:
        with zipfile.ZipFile(file_path, "r") as zip_ref:
            zip_ref.extractall(project_folder)
    except zipfile.BadZipFile:
        st.error(f"File {uploaded_file.name} is not a valid .zip file.")
        return
    # Remove the archive only after it has been closed.
    os.remove(file_path)
    st.success(f"Folder from {uploaded_file.name} extracted successfully!")


def workspace_page():
    """Render the workspace: project picker plus upload / clone forms."""
    # Sidebar with logout button
    st.sidebar.title(f"Hello, {st.session_state.username}!")
    if st.sidebar.button("Log Out"):
        st.session_state.authenticated = False
        st.session_state.username = None
        st.session_state.page = "login"
        # Stop this run here: the code below needs a non-None username.
        st.rerun()

    # User's folder for projects
    user_folder = os.path.join("user_projects", st.session_state.username)
    os.makedirs(user_folder, exist_ok=True)

    # Refresh project list dynamically
    projects = sorted(
        entry
        for entry in os.listdir(user_folder)
        if os.path.isdir(os.path.join(user_folder, entry))
    )

    # Display "Projects" dropdown
    selected_project = st.sidebar.selectbox("Projects", ["Select a project"] + projects)
    if selected_project != "Select a project":
        st.session_state.current_project = selected_project
        st.session_state.page = "project_view"
        st.rerun()

    # Display success message if a project was uploaded
    if st.session_state.project_uploaded:
        st.success(f"Project '{st.session_state.current_project}' uploaded successfully!")
        st.session_state.project_uploaded = False  # Reset after showing the message

    # Main content area
    st.subheader("Workspace")
    st.write("You can create a new project by uploading files or folders, or by cloning a GitHub repository.")

    # User action selection
    action = st.radio(
        "Choose an action",
        ["Upload Files or Folders", "Clone GitHub Repository"],
        horizontal=True,
    )
    project_name = st.text_input("Enter a project name").strip()
    invalid_name_message = (
        "Project names cannot contain '/' or '\\' and cannot be '.' or '..'."
    )

    if action == "Upload Files or Folders":
        st.subheader("Upload Files or Folders")
        uploaded_files = st.file_uploader(
            "Upload one or more files or a .zip archive for folders",
            accept_multiple_files=True,
        )
        if uploaded_files and project_name and st.button("Upload Project"):
            if not _is_safe_path_component(project_name):
                st.error(invalid_name_message)
            else:
                project_folder = os.path.join(user_folder, project_name)
                os.makedirs(project_folder, exist_ok=True)
                for uploaded_file in uploaded_files:
                    _save_uploaded_file(uploaded_file, project_folder)

                # Update session state and trigger a rerun
                st.session_state.current_project = project_name
                st.session_state.project_uploaded = True
                st.rerun()

    elif action == "Clone GitHub Repository":
        st.subheader("Clone GitHub Repository")
        # NOTE(review): repo_url is passed to git unvalidated; consider
        # restricting it to https://github.com/ URLs.
        repo_url = st.text_input("Enter the GitHub repository URL")
        if repo_url and project_name and st.button("Upload Project"):
            project_folder = os.path.join(user_folder, project_name)
            if not _is_safe_path_component(project_name):
                st.error(invalid_name_message)
            elif os.path.isdir(project_folder) and os.listdir(project_folder):
                st.error(
                    f"Project '{project_name}' already exists. "
                    "Please choose a different project name."
                )
            else:
                # The folder is not created up front: git creates it, so a
                # failed clone does not leave an empty project behind.
                try:
                    Repo.clone_from(repo_url, project_folder)
                except Exception as e:
                    st.error(f"Failed to clone repository: {e}")
                else:
                    # Update session state and trigger a rerun
                    st.session_state.current_project = project_name
                    st.session_state.project_uploaded = True
                    st.rerun()
# -----------------------------------------------------------------------------
# Configure Gemini API
gemini = os.getenv("GEMINI")
genai.configure(api_key=gemini)
model = genai.GenerativeModel("gemini-1.5-flash")

# Output format requested from Gemini. The angle-bracket placeholders are
# descriptive slots for the model to fill in; extract_cleaned_gemini_output
# keys off the "Project Summary:", "Functionality:" and "Functions:" prefixes.
_GEMINI_OUTPUT_FORMAT = """
Return your output in the following format providing no commentary:
Project Summary: <project summary>
Functionality: <functionality summary>
Functions:
<file name>:
-<function name>:
-Function Dependencies: <dependencies>
"""


def read_project_files(project_path):
    """Collect the paths of all files under *project_path*, skipping ``.git``.

    Args:
        project_path (str): Root directory of the project.

    Returns:
        list[str]: Paths of every file found outside ``.git`` directories.
    """
    file_paths = []
    for root, dirs, files in os.walk(project_path):
        # Prune by exact directory name so that ".github" or a project path
        # that merely contains ".git" is not skipped by accident.
        dirs[:] = [d for d in dirs if d != ".git"]
        file_paths.extend(os.path.join(root, name) for name in files)
    return file_paths


def read_files(file_paths):
    """Read the given files as UTF-8 text.

    Files that are missing, unreadable, or not valid UTF-8 are reported on
    stdout and left out of the result.

    Args:
        file_paths (Iterable[str]): Paths to read.

    Returns:
        dict[str, str]: Mapping of path to file content.
    """
    file_contents = {}
    for file_path in file_paths:
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                file_contents[file_path] = file.read()
        except FileNotFoundError:
            print(f"File not found: {file_path}")
        except UnicodeDecodeError:
            print(f"Skipping binary or non-UTF-8 file: {file_path}")
        except OSError as e:
            print(f"Skipping unreadable file: {file_path} ({e})")
    return file_contents


def generate_prompt(file_contents, functionality_description, project_path=None):
    """Build the Gemini prompt that asks which functions implement a feature.

    Args:
        file_contents (dict[str, str]): Mapping of file path to content.
        functionality_description (str): The functionality to analyze.
        project_path (str | None): When given, files are labelled with their
            path relative to this directory so that same-named files in
            different folders stay distinguishable; otherwise only the base
            name is used.

    Returns:
        str: The complete prompt text.
    """
    parts = [
        "Analyze the following code files to identify all functions required to implement the functionality: ",
        f"'{functionality_description}'.\n\n",
    ]
    for file_path, content in file_contents.items():
        if project_path:
            label = os.path.relpath(file_path, project_path)
        else:
            label = os.path.basename(file_path)
        parts.append(f"File: {label}\n{content}\n\n")
    parts.append("For each relevant function, provide:\n")
    parts.append("1. Which file the function is found in.\n")
    parts.append("2. The function name.\n")
    parts.append("3. Dependencies on other functions or modules.\n")
    parts.append(_GEMINI_OUTPUT_FORMAT)
    return "".join(parts)


def identify_required_functions(project_path, functionality_description):
    """Ask Gemini which functions in the project implement a functionality.

    Args:
        project_path (str): Root directory of the project to analyze.
        functionality_description (str): The functionality to look for.

    Returns:
        str: The text of Gemini's response.
    """
    # Gather all file paths in the project directory
    file_paths = read_project_files(project_path)

    # Read file contents
    file_contents = read_files(file_paths)

    # Generate a refined prompt for Gemini
    prompt = generate_prompt(file_contents, functionality_description, project_path)

    # Call the Gemini model
    response = model.generate_content(prompt)

    # Process and return the response
    return response.text


def strip_functions_section(gemini_output):
    """Remove the 'Functions' section and blank lines from Gemini output.

    This was originally a first definition of ``extract_cleaned_gemini_output``
    that the later definition of the same name silently replaced; it is kept
    under its own name so both behaviours remain available.

    Args:
        gemini_output (str): The raw output returned by Gemini.

    Returns:
        str: Cleaned output without the 'Functions' section.
    """
    cleaned_output = []
    skip_section = False

    for line in gemini_output.splitlines():
        line = line.strip()
        # Detect the start of the "Functions:" section and skip it
        if line.startswith("Functions:"):
            skip_section = True
        # Detect the end of the "Functions:" section and resume processing
        if skip_section and line.startswith(("Tasks:", "Project Summary:")):
            skip_section = False
        # Append non-blank lines outside the skipped section
        if not skip_section and line:
            cleaned_output.append(line)

    return "\n".join(cleaned_output)


def split_into_chunks(content, chunk_size=1000):
    """Split *content* into consecutive pieces of at most *chunk_size* items.

    Raises:
        ValueError: If *chunk_size* is not a positive integer.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    return [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]


# Hugging Face Inference API endpoint for the model
API_URL = "https://api-inference.huggingface.co/models/Qwen/Qwen2.5-Coder-32B-Instruct"
qwen = os.getenv("QWEN")
headers = {"Authorization": f"Bearer {qwen}"}


def extract_cleaned_gemini_output(gemini_output):
    """Extract the parts of Gemini's output that are forwarded to Qwen.

    Kept lines are: any line starting with "Project Summary:" or
    "Functionality:", the "Functions:" header, and every non-blank line
    after that header. Before the "Functions:" header, a line starting
    with "File:" or containing "Qwen," ends processing.

    Args:
        gemini_output (str): The output returned by Gemini.

    Returns:
        str: Cleaned and formatted output for Qwen.
    """
    cleaned_output = []
    functions_section = False

    for line in gemini_output.splitlines():
        line = line.strip()
        if line.startswith(("Project Summary:", "Functionality:")):
            cleaned_output.append(line)
        elif line.startswith("Functions:"):
            cleaned_output.append(line)
            functions_section = True
        elif functions_section and line:
            cleaned_output.append(line)
        # NOTE(review): once inside the Functions section every non-blank
        # line is consumed by the branch above, so this stop condition can
        # only fire before "Functions:" is seen — confirm that is intended.
        elif line.startswith("File:") or "Qwen," in line:
            break

    return "\n".join(cleaned_output)


def clean_output(output):
    """Keep only the documentation sections of the model output.

    A section starts at a line beginning with "Project Summary:",
    "Functionality Summary:", "Functionality Flow:" or
    "Function Documentation:" and continues over non-blank lines until a
    line beginning with "User-specified functionality:" or
    "Tasks identified by Gemini:".

    Args:
        output (str): Raw text generated by the model.

    Returns:
        str: The retained lines joined with newlines.
    """
    section_headers = (
        "Project Summary:",
        "Functionality Summary:",
        "Functionality Flow:",
        "Function Documentation:",
    )
    stop_markers = ("User-specified functionality:", "Tasks identified by Gemini:")

    filtered_lines = []
    in_valid_section = False

    for line in output.splitlines():
        line = line.strip()
        if line.startswith(section_headers):
            in_valid_section = True
            filtered_lines.append(line)
        elif line.startswith(stop_markers):
            # Stop collecting at any undesired section
            in_valid_section = False
        elif in_valid_section and line:
            filtered_lines.append(line)

    return "\n".join(filtered_lines)


def _build_qwen_prompt(cleaned_gemini_output, functionality_description):
    """Assemble the documentation prompt sent to Qwen."""
    return (
        "\n"
        f"User-specified functionality: '{functionality_description}'\n"
        "Functions identified by Gemini:\n"
        f"{cleaned_gemini_output}\n"
        "\n"
        "Tasks:\n"
        "1. Generate a project summary:\n"
        "'\n"
        "Project Summary:\n"
        "'\n"
        "2. Refine the user-defined functionality:\n"
        "'\n"
        "Functionality Summary:\n"
        "'\n"
        "3. Describe the functionality flow:\n"
        "'\n"
        "Functionality Flow:\n"
        "'\n"
        "4. Generate detailed documentation for each function:\n"
        "'\n"
        "Function Documentation:\n"
        "For each relevant function:\n"
        "- Summary:\n"
        "- Inputs:\n"
        "- Outputs:\n"
        "- Dependencies:\n"
        "- Data structures:\n"
        "- Algorithmic Details:\n"
        "- Error Handling:\n"
        "- Assumptions:\n"
        "- Example Usage:\n"
        "'\n"
        "5. Return only the required information for the above tasks, and exclude everything else.\n"
    )


def _extract_generated_text(api_response):
    """Pull ``generated_text`` out of a dict or list-of-dicts API response."""
    if isinstance(api_response, dict):
        return api_response.get("generated_text", "")
    if (
        isinstance(api_response, list)
        and api_response
        and isinstance(api_response[0], dict)
    ):
        return api_response[0].get("generated_text", "")
    raise ValueError(f"Unexpected API response format: {api_response!r}")


def validate_and_generate_documentation(
    api_url, headers, gemini_output, functionality_description, timeout=120
):
    """Generate documentation with Qwen through the Hugging Face Inference API.

    Args:
        api_url (str): Inference endpoint URL.
        headers (dict): HTTP headers, including the Authorization header.
        gemini_output (str): Raw output returned by Gemini.
        functionality_description (str): The functionality the user asked about.
        timeout (float): Seconds to wait for the HTTP request before giving up.

    Returns:
        str: The generated documentation, filtered by ``clean_output``.

    Raises:
        ValueError: If the API answers with a non-200 status or with a
            payload that is neither a dict nor a non-empty list of dicts.
    """
    # Clean Gemini output
    cleaned_gemini_output = extract_cleaned_gemini_output(gemini_output)

    # Generate the refined prompt for Qwen
    prompt = _build_qwen_prompt(cleaned_gemini_output, functionality_description)

    # Prepare payload and call API. return_full_text=False asks the API not
    # to echo the prompt back in front of the generated text.
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": 1024, "return_full_text": False},
    }
    response = requests.post(api_url, headers=headers, json=payload, timeout=timeout)

    # Handle API response
    if response.status_code != 200:
        raise ValueError(f"Error during API call: {response.status_code}, {response.text}")

    output = _extract_generated_text(response.json())
    # If the prompt was echoed anyway, drop it: it contains section headers
    # that clean_output would otherwise mistake for generated documentation.
    if output.startswith(prompt):
        output = output[len(prompt):]
    return clean_output(output)


def _analyze_and_show_documentation(functionality):
    """Run the Gemini + Qwen pipeline for the current project and display the result."""
    if not functionality.strip():
        st.error("Please enter the functionality to analyze.")
        return

    st.write("Analyzing project files... Please wait.")

    # Get the path of the current project
    user_folder = os.path.join("user_projects", st.session_state.username)
    project_folder = os.path.join(user_folder, st.session_state.current_project)
    if not os.path.exists(project_folder):
        st.error("Project folder not found. Ensure the GitHub repository was cloned successfully.")
        return

    try:
        # Call Gemini to identify required functions
        gemini_result = identify_required_functions(project_folder, functionality)

        # Generate documentation using Qwen
        final_documentation = validate_and_generate_documentation(
            API_URL, headers, gemini_result, functionality
        )
    except Exception as e:
        # UI boundary: show any failure to the user instead of crashing the page.
        st.error(f"An error occurred: {e}")
        return

    # Display the final documentation
    st.success("Documentation generated successfully!")
    st.text_area("Generated Documentation", final_documentation, height=600)


def generate_documentation_page():
    """Render the page where the user requests documentation for a project."""
    st.subheader(f"Generate Documentation for {st.session_state.current_project}")
    st.write("Enter the functionality or parts of the project for which you'd like to generate documentation.")

    # Prompt user for functionality description
    functionality = st.text_area(
        "Describe the functionality",
        placeholder="e.g., Explain the function of the file `main.py`",
    )

    # Button to start analyzing functionality
    if st.button("Analyze"):
        _analyze_and_show_documentation(functionality)

    # Same way back as on the view-documentation page; without it this
    # page offers no navigation.
    if st.button("Back to Project"):
        st.session_state.page = "project_view"
        st.rerun()


# -----------------------------------------------------------------------------
def view_documentation_page():
    """Render the (placeholder) page for viewing generated documentation."""
    st.subheader(f"View Documentation for {st.session_state.current_project}")
    st.write("This page will display the generated documentation for the selected project.")

    if st.button("Back to Project"):
        st.session_state.page = "project_view"
        st.rerun()


def _write_file_entries(files, indent):
    """Write one line per file name, prefixed with *indent*."""
    for file in files:
        st.write(f"{indent}📄 {file}")


def project_view_page():
    """Render the project page: navigation, documentation actions, file tree."""
    # Sidebar with logout and return buttons
    st.sidebar.title(f"Project: {st.session_state.current_project}")
    if st.sidebar.button("Back to Workspace"):
        st.session_state.page = "workspace"
        st.rerun()
    if st.sidebar.button("Log Out"):
        st.session_state.authenticated = False
        st.session_state.username = None
        st.session_state.page = "login"
        st.rerun()

    # Main content for project page
    st.subheader(f"Project: {st.session_state.current_project}")
    st.write("Manage your project and explore its files.")

    # Buttons for documentation functionality
    if st.button("Generate Documentation"):
        st.session_state.page = "generate_documentation"
        st.rerun()
    if st.button("View Documentation"):
        st.session_state.page = "view_documentation"
        st.rerun()

    # Toggle file structure display (if required)
    if "show_file_structure" not in st.session_state:
        st.session_state.show_file_structure = False
    if st.button("Show File Structure"):
        st.session_state.show_file_structure = not st.session_state.show_file_structure

    if not st.session_state.show_file_structure:
        return

    user_folder = os.path.join("user_projects", st.session_state.username)
    project_folder = os.path.join(user_folder, st.session_state.current_project)
    st.write("File structure:")
    for root, dirs, files in os.walk(project_folder):
        # Depth below the project root: 0 for the root, 1 for its children, ...
        relative = os.path.relpath(root, project_folder)
        level = 0 if relative == os.curdir else relative.count(os.sep) + 1
        indent = " " * 4 * level
        sub_indent = " " * 4 * (level + 1)
        if level == 0:
            st.write(f"📂 {os.path.basename(root)}")
            # Files directly in the project root are listed as well.
            _write_file_entries(files, sub_indent)
        else:
            with st.expander(f"{indent}📂 {os.path.basename(root)}"):
                _write_file_entries(files, sub_indent)


if __name__ == "__main__":
    main()