Spaces:
Build error
Build error
| import os | |
| import subprocess | |
| import streamlit as st # For Streamlit | |
| import gradio as gr # For Gradio | |
| from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer | |
| import black | |
| from pylint import lint | |
| from io import StringIO | |
| import openai | |
| import sys | |
| from datetime import datetime | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from typing import List, Dict, Optional | |
| from utils.code_utils import ( | |
| refine_code, | |
| test_code, | |
| integrate_code, | |
| CodeRefinementError, | |
| CodeTestingError, | |
| CodeIntegrationError, | |
| ) | |
| # --- Define custom exceptions for better error handling --- | |
class InvalidActionError(Exception):
    """Signal that the requested action is not a valid one."""
class InvalidInputError(Exception):
    """Signal that the input supplied for an action is not valid."""
class CodeGenerationError(Exception):
    """Signal that generating code did not succeed."""
class AppTestingError(Exception):
    """Signal that testing the app did not succeed."""
class WorkspaceExplorerError(Exception):
    """Signal that exploring the workspace did not succeed."""
class PromptManagementError(Exception):
    """Signal that a prompt-management operation did not succeed."""
class SearchError(Exception):
    """Signal that a search did not succeed."""
| # --- Define a class for the AI Agent --- | |
class AIAgent:
    """Command-driven agent that maps action keywords to tool callables.

    A command is a line of text whose first whitespace-separated token names
    an entry in ``self.tools``. Any remaining tokens are joined with single
    spaces and passed to the tool as one string argument; a command with no
    further tokens calls the tool with no arguments.
    """

    # Seconds to wait for the search engine before giving up.
    _SEARCH_TIMEOUT_SECONDS = 10

    # Prompt text returned by ``action_prompt`` for each supported action.
    _ACTION_PROMPTS = {
        "SEARCH": "What do you want to search for?",
        "CODEGEN": "Provide a code snippet to generate code from.",
        "REFINE-CODE": "Provide the file path of the code to refine.",
        "TEST-CODE": "Provide the file path of the code to test.",
        "INTEGRATE-CODE": "Provide the file path and code snippet to integrate.",
        "TEST-APP": "Test the application.",
        "GENERATE-REPORT": "Generate a report based on the task history.",
        "WORKSPACE-EXPLORER": "Explore the current workspace.",
        "ADD_PROMPT": "Enter the new prompt to add.",
        "ACTION_PROMPT": "Enter the action to get a prompt for.",
        "COMPRESS_HISTORY_PROMPT": "Compress the task history.",
        "LOG_PROMPT": "Enter the event to log.",
        "LOG_RESPONSE": "Log the specified event.",
        "MODIFY_PROMPT": "Enter the prompt to modify.",
        "PREFIX": "Enter the text to add a prefix to.",
        "SEARCH_QUERY": "Enter the topic to generate a search query for.",
        "READ_PROMPT": "Enter the file path to read.",
        "TASK_PROMPT": "Enter the new task to start.",
        "UNDERSTAND_TEST_RESULTS_PROMPT": "Enter your question about the test results.",
    }

    def __init__(self):
        """Register the tool table and initialise the agent's state."""
        # --- Initialize tools and attributes ---
        # Maps each action keyword to the callable that implements it.
        self.tools = {
            "SEARCH": self.search,
            "CODEGEN": self.code_generation,
            "REFINE-CODE": refine_code,  # Use external function
            "TEST-CODE": test_code,  # Use external function
            "INTEGRATE-CODE": integrate_code,  # Use external function
            "TEST-APP": self.test_app,
            "GENERATE-REPORT": self.generate_report,
            "WORKSPACE-EXPLORER": self.workspace_explorer,
            "ADD_PROMPT": self.add_prompt,
            "ACTION_PROMPT": self.action_prompt,
            "COMPRESS_HISTORY_PROMPT": self.compress_history_prompt,
            "LOG_PROMPT": self.log_prompt,
            "LOG_RESPONSE": self.log_response,
            "MODIFY_PROMPT": self.modify_prompt,
            "PREFIX": self.prefix,
            "SEARCH_QUERY": self.search_query,
            "READ_PROMPT": self.read_prompt,
            "TASK_PROMPT": self.task_prompt,
            "UNDERSTAND_TEST_RESULTS_PROMPT": self.understand_test_results_prompt,
        }
        # One {"action", "input", "output"} record per successfully run command.
        self.task_history: List[Dict[str, str]] = []
        # Used only as the title of the generated report.
        self.current_task: Optional[str] = None
        # Default search engine; the query is appended to this URL.
        self.search_engine_url: str = "https://www.google.com/search?q="
        # Store prompts for future use.
        self.prompts: List[str] = []
        # Text-generation pipeline used by ``code_generation``.
        self.code_generator = pipeline('text-generation', model='gpt2')

    # --- Implement search functionality ---
    def search(self, query: str) -> List[str]:
        """Fetch the search page for ``query`` and return its link targets.

        Args:
            query: Free-text search terms; URL-encoded before being appended
                to ``self.search_engine_url``.

        Returns:
            The ``href`` value of every ``<a>`` tag in the response that has
            one, in document order.

        Raises:
            SearchError: If the HTTP request fails, times out, or returns an
                error status code.
        """
        from urllib.parse import quote_plus

        # Encode the query so spaces, '&', '#', etc. cannot corrupt the URL.
        search_url = self.search_engine_url + quote_plus(query)
        try:
            response = requests.get(search_url, timeout=self._SEARCH_TIMEOUT_SECONDS)
            response.raise_for_status()  # Raise an exception for bad status codes
        except requests.exceptions.RequestException as e:
            raise SearchError(f"Error during search: {e}") from e
        soup = BeautifulSoup(response.content, 'html.parser')
        return [link['href'] for link in soup.find_all('a', href=True)]

    # --- Implement code generation functionality ---
    def code_generation(self, snippet: str) -> str:
        """Run the text-generation pipeline on ``snippet``.

        Args:
            snippet: Text passed to ``self.code_generator`` as the prompt.

        Returns:
            The ``generated_text`` field of the first returned sequence.

        Raises:
            CodeGenerationError: If the pipeline call raises any exception.
        """
        try:
            sequences = self.code_generator(snippet, max_length=500, num_return_sequences=1)
            return sequences[0]['generated_text']
        except Exception as e:
            raise CodeGenerationError(f"Error during code generation: {e}") from e

    # --- Implement app testing functionality ---
    def test_app(self) -> str:
        """Run ``streamlit run app.py`` as a subprocess and wait for it to exit.

        NOTE(review): ``streamlit run`` normally keeps serving until it is
        stopped, so this call may not return on its own -- confirm the
        intended usage.

        Returns:
            A success message when the subprocess exits with status 0.

        Raises:
            AppTestingError: If the subprocess exits with a non-zero status
                or cannot be started (e.g. ``streamlit`` is not installed).
        """
        try:
            subprocess.run(['streamlit', 'run', 'app.py'], check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            raise AppTestingError(f"Error during app testing: {e}") from e
        return "App tested successfully."

    # --- Implement report generation functionality ---
    def generate_report(self) -> str:
        """Render ``self.task_history`` as a Markdown-style report.

        Returns:
            A heading containing ``self.current_task`` followed by one
            Action/Input/Output section per history record.
        """
        sections = [f"## Task Report: {self.current_task}\n\n"]
        sections.extend(
            f"**Action:** {task['action']}\n"
            f"**Input:** {task['input']}\n"
            f"**Output:** {task['output']}\n\n"
            for task in self.task_history
        )
        return "".join(sections)

    # --- Implement workspace exploration functionality ---
    def workspace_explorer(self) -> str:
        """List the directories and files in the current working directory.

        Returns:
            A two-line string with the directory names and the file names,
            each formatted as a Python list.

        Raises:
            WorkspaceExplorerError: If the directory cannot be listed.
        """
        try:
            current_directory = os.getcwd()
            directories = []
            files = []
            for item in os.listdir(current_directory):
                item_path = os.path.join(current_directory, item)
                if os.path.isdir(item_path):
                    directories.append(item)
                elif os.path.isfile(item_path):
                    files.append(item)
        except OSError as e:
            raise WorkspaceExplorerError(f"Error during workspace exploration: {e}") from e
        return f"**Directories:** {directories}\n**Files:** {files}"

    # --- Implement prompt management functionality ---
    def add_prompt(self, prompt: str) -> str:
        """Append ``prompt`` to ``self.prompts`` and return a confirmation."""
        self.prompts.append(prompt)
        return f"Prompt '{prompt}' added successfully."

    # --- Implement prompt generation functionality ---
    def action_prompt(self, action: str) -> str:
        """Return the prompt text associated with ``action``.

        Args:
            action: One of the action keywords registered in ``self.tools``.

        Raises:
            InvalidActionError: If ``action`` has no associated prompt.
        """
        try:
            return self._ACTION_PROMPTS[action]
        except KeyError:
            raise InvalidActionError("Please provide a valid action.") from None

    # --- Implement prompt generation functionality ---
    def compress_history_prompt(self) -> str:
        """Return the prompt asking whether to compress the task history."""
        return "Do you want to compress the task history?"

    # --- Implement prompt generation functionality ---
    def log_prompt(self) -> str:
        """Return the prompt asking which event to log."""
        return "What event do you want to log?"

    # --- Implement logging functionality ---
    def log_response(self, event: str) -> str:
        """Print ``event`` to standard output and return a confirmation."""
        print(f"Event logged: {event}")
        return "Event logged successfully."

    # --- Implement prompt modification functionality ---
    def modify_prompt(self, prompt: str) -> str:
        """Return a confirmation message for ``prompt``.

        NOTE: This is a placeholder. It neither looks up nor changes anything
        in ``self.prompts``; it only reports success.
        """
        # TODO: find the stored prompt and update it.
        return f"Prompt '{prompt}' modified successfully."

    # --- Implement prefix functionality ---
    def prefix(self, text: str) -> str:
        """Return ``text`` with ``"PREFIX: "`` prepended."""
        return f"PREFIX: {text}"

    # --- Implement search query generation functionality ---
    def search_query(self, query: str) -> str:
        """Return ``query`` with ``"Search query: "`` prepended."""
        return f"Search query: {query}"

    # --- Implement file reading functionality ---
    def read_prompt(self, file_path: str) -> str:
        """Read a text file and return its full contents.

        Args:
            file_path: Path of the file to open in text mode.

        Raises:
            InvalidInputError: If the file does not exist or cannot be read.
        """
        try:
            with open(file_path, 'r') as f:
                return f.read()
        except FileNotFoundError as e:
            raise InvalidInputError(f"Error: File not found: {file_path}") from e
        except (OSError, UnicodeDecodeError) as e:
            raise InvalidInputError(f"Error reading file: {e}") from e

    # --- Implement task prompt generation functionality ---
    def task_prompt(self) -> str:
        """Return the prompt asking which task to start."""
        return "What task do you want to start?"

    # --- Implement test results understanding prompt generation functionality ---
    def understand_test_results_prompt(self) -> str:
        """Return the prompt asking what the user wants to know about test results."""
        return "What do you want to know about the test results?"

    def _invoke_tool(self, action: str, argument: Optional[str]):
        """Call the tool registered for ``action`` exactly once.

        The call shape (no arguments, or one string argument) is checked
        against the tool's signature first, so that a mismatch is reported as
        an ``InvalidInputError`` instead of an uncaught ``TypeError``.
        """
        import inspect

        tool = self.tools[action]
        call_args = () if argument is None else (argument,)
        try:
            inspect.signature(tool).bind(*call_args)
        except TypeError:
            given = "an" if call_args else "no"
            raise InvalidInputError(
                f"Action '{action}' was given {given} input, "
                "which does not match what it expects."
            ) from None
        except ValueError:
            # The callable has no introspectable signature; skip the check
            # and let the call itself decide.
            pass
        return tool(*call_args)

    # --- Implement input handling functionality ---
    def handle_input(self, input_str: str):
        """Parse one command line, run the matching tool, and record the result.

        The tool is invoked once; its output is appended to
        ``self.task_history`` and printed. Errors of the known agent
        exception types are printed rather than propagated.

        Args:
            input_str: The action keyword, optionally followed by the input
                for that action. Empty or whitespace-only input is reported
                and ignored.
        """
        tokens = input_str.split()
        if not tokens:
            # Nothing to dispatch; unpacking an empty list would raise ValueError.
            print("Error: No command entered. Please choose a valid action from the list of tools.")
            return
        action, *args = tokens
        argument = " ".join(args) if args else None
        try:
            if action not in self.tools:
                raise InvalidActionError(
                    "Invalid action. Please choose a valid action from the list of tools."
                )
            output = self._invoke_tool(action, argument)
        except (InvalidActionError, InvalidInputError, CodeGenerationError,
                CodeRefinementError, CodeTestingError, CodeIntegrationError,
                AppTestingError, WorkspaceExplorerError, PromptManagementError,
                SearchError) as e:
            print(f"Error: {e}")
            return
        self.task_history.append({
            "action": action,
            "input": argument,
            "output": output,
        })
        # Reuse the stored output: calling the tool again would repeat its
        # side effects and fail for tools that take no argument.
        print(f"Action: {action}\nInput: {' '.join(args)}\nOutput: {output}")

    # --- Implement the main loop of the agent ---
    def run(self):
        """Read commands from standard input until input ends or is interrupted."""
        while True:
            try:
                input_str = input("Enter a command for the AI Agent: ")
            except (EOFError, KeyboardInterrupt):
                # stdin was closed or the user pressed Ctrl-C: stop cleanly.
                print()
                break
            self.handle_input(input_str)
| # --- Streamlit Integration --- | |
if __name__ == '__main__':
    # Script entry point: build the agent and render a minimal Streamlit form.
    agent = AIAgent()
    st.title("AI Agent")
    st.write("Enter a command for the AI Agent:")
    input_str = st.text_input("")
    # The text box is empty until the user submits something; only dispatch
    # once there is an actual command to run.
    if input_str.strip():
        agent.handle_input(input_str)
    # The console loop reads from stdin via input(); start it only when an
    # interactive terminal is attached, otherwise input() fails with EOFError.
    if sys.stdin is not None and sys.stdin.isatty():
        agent.run()