import platform
import streamlit as st
import psutil
from typing import List, Dict, Optional, Any, Tuple
from dataclasses import dataclass
from enum import Enum
import logging
import time
import ast
import pylint.lint
import radon.complexity
import radon.metrics
from pylint.lint import Run
from pylint.reporters import JSONReporter
from coverage import Coverage
import bandit
from bandit.core import manager
from datetime import datetime
import os
import sys
import requests
import asyncio
import statistics
import json
import traceback
from pathlib import Path

# Set logging level from environment variable
logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))

class CodeMetricsAnalyzer:
    """Analyzes code metrics using various tools"""

    def __init__(self):
        self.metrics_history = []
        # Logger used by the async state-analysis method further down.
        self.logger = logging.getLogger(__name__)
    def analyze_code_quality(self, file_path: str) -> Dict[str, Any]:
        """Analyzes code quality using multiple metrics"""
        try:
            # Pylint analysis
            pylint_score = self._run_pylint(file_path)

            # Complexity analysis
            complexity_score = self._analyze_complexity(file_path)

            # Test coverage analysis
            coverage_score = self._analyze_test_coverage(file_path)

            # Security analysis
            security_score = self._analyze_security(file_path)

            # Calculate overall quality score
            quality_score = self._calculate_overall_score(
                pylint_score,
                complexity_score,
                coverage_score,
                security_score
            )

            metrics = {
                "quality_score": quality_score,
                "pylint_score": pylint_score,
                "complexity_score": complexity_score,
                "coverage_score": coverage_score,
                "security_score": security_score,
                "timestamp": datetime.now()
            }
            self.metrics_history.append(metrics)
            return metrics
        except Exception as e:
            logging.error(f"Error analyzing code metrics: {str(e)}")
            return {
                "error": str(e),
                "quality_score": 0.0,
                "timestamp": datetime.now()
            }
    def _run_pylint(self, file_path: str) -> float:
        """Runs pylint analysis"""
        try:
            reporter = JSONReporter()
            run = Run([file_path], reporter=reporter, do_exit=False)
            # JSONReporter collects messages, not the rating; read the global
            # score (0-10) from the linter stats instead.
            score = getattr(run.linter.stats, "global_note", 0.0) or 0.0
            return float(score) / 10.0  # Normalize to 0-1 scale
        except Exception as e:
            logging.error(f"Pylint analysis error: {str(e)}")
            return 0.0
    def _analyze_complexity(self, file_path: str) -> float:
        """Analyzes code complexity"""
        try:
            with open(file_path, 'r') as file:
                code = file.read()

            # Calculate cyclomatic complexity
            complexity = radon.complexity.cc_visit(code)
            avg_complexity = sum(item.complexity for item in complexity) / len(complexity) if complexity else 0

            # Normalize complexity score (0-1 scale, lower complexity is better)
            normalized_score = 1.0 - min(avg_complexity / 10.0, 1.0)
            return normalized_score
        except Exception as e:
            logging.error(f"Complexity analysis error: {str(e)}")
            return 0.0
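
    # analyze_code_quality() above calls _analyze_test_coverage(), but no such
    # method exists in this class. The sketch below is a hedged, minimal
    # placeholder (not the original author's implementation): it assumes a
    # .coverage data file produced by an earlier test run exists in the working
    # directory and reports the covered fraction for the given file.
    def _analyze_test_coverage(self, file_path: str) -> float:
        """Approximates test coverage for a single file from existing coverage data."""
        try:
            cov = Coverage()
            cov.load()  # expects a .coverage file from a previous test run
            _, statements, _, missing, _ = cov.analysis2(file_path)
            if not statements:
                return 0.0
            covered = len(statements) - len(missing)
            return covered / len(statements)  # 0-1 scale
        except Exception as e:
            logging.error(f"Coverage analysis error: {str(e)}")
            return 0.0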
    async def _analyze_current_state(self, project_name: str) -> Dict[str, Any]:
        """Analyze current project state with detailed metrics.

        Note: the awaited helpers here are project-level async collectors,
        distinct from the per-file synchronous methods in this class, and are
        expected to be provided elsewhere in the application.
        """
        try:
            self.logger.info(f"Analyzing current state for project: {project_name}")

            # Collect code metrics
            code_metrics = await self._collect_code_metrics(project_name)
            self.logger.info("Code metrics collected successfully.")

            # Analyze test coverage
            test_coverage = await self._analyze_test_coverage(project_name)
            self.logger.info("Test coverage analysis completed.")

            # Check security vulnerabilities
            security_analysis = await self._analyze_security(project_name)
            self.logger.info("Security analysis completed.")

            # Measure performance metrics
            performance_metrics = await self._measure_performance(project_name)
            self.logger.info("Performance metrics measured.")

            # Determine if requirements are met
            meets_requirements = await self._check_requirements(
                code_metrics,
                test_coverage,
                security_analysis,
                performance_metrics
            )
            self.logger.info("Requirements check completed.")

            return {
                "code_metrics": code_metrics,
                "test_coverage": test_coverage,
                "security_analysis": security_analysis,
                "performance_metrics": performance_metrics,
                "meets_requirements": meets_requirements,
                "timestamp": datetime.now()
            }
        except Exception as e:
            self.logger.error(f"Error analyzing current state: {str(e)}")
            raise
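
    # The state analysis above awaits three helpers that are not defined in this
    # section. These are hedged, illustrative stubs (names and return shapes are
    # assumptions, not the original implementation) so the method can run end to
    # end; replace them with real collectors where available.
    async def _collect_code_metrics(self, project_name: str) -> Dict[str, Any]:
        """Placeholder: aggregate per-file quality metrics for the project."""
        return {"quality_score": 0.0, "files_analyzed": 0}

    async def _measure_performance(self, project_name: str) -> Dict[str, Any]:
        """Placeholder: measure runtime/performance characteristics."""
        return {"performance_score": 0.0}

    async def _check_requirements(self, code_metrics: Dict[str, Any],
                                  test_coverage: Any, security_analysis: Any,
                                  performance_metrics: Dict[str, Any]) -> bool:
        """Placeholder: decide whether collected metrics satisfy requirements."""
        return False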
    def _analyze_security(self, file_path: str) -> float:
        """Analyzes code security using bandit"""
        try:
            # BanditManager requires a config object and an aggregation type;
            # a default BanditConfig is assumed here.
            from bandit.core import config as bandit_config
            mgr = manager.BanditManager(bandit_config.BanditConfig(), 'file')
            mgr.discover_files([file_path])
            mgr.run_tests()

            # Calculate security score based on findings
            issues = mgr.get_issue_list()
            total_issues = len(issues)
            # Bandit reports severity as strings; map them to numeric weights.
            severity_weights = {"LOW": 1, "MEDIUM": 2, "HIGH": 3}
            max_severity = max(
                (severity_weights.get(str(issue.severity).upper(), 0) for issue in issues),
                default=0
            )

            # Normalize security score (0-1 scale, higher is better)
            security_score = 1.0 - (total_issues * max_severity) / 10.0
            return max(0.0, min(1.0, security_score))
        except Exception as e:
            logging.error(f"Security analysis error: {str(e)}")
            return 0.0
    def _calculate_overall_score(self, pylint_score: float, complexity_score: float,
                                 coverage_score: float, security_score: float) -> float:
        """Calculates overall code quality score"""
        weights = {
            'pylint': 0.3,
            'complexity': 0.2,
            'coverage': 0.25,
            'security': 0.25
        }
        overall_score = (
            weights['pylint'] * pylint_score +
            weights['complexity'] * complexity_score +
            weights['coverage'] * coverage_score +
            weights['security'] * security_score
        )
        return max(0.0, min(1.0, overall_score))

    def get_metrics_history(self) -> List[Dict[str, Any]]:
        """Returns the history of metrics measurements"""
        return self.metrics_history

    def get_trend_analysis(self) -> Dict[str, Any]:
        """Analyzes trends in metrics over time"""
        if not self.metrics_history:
            return {"status": "No metrics history available"}

        # Use .get() so entries recorded after an analysis error (which lack
        # some keys) do not break the trend calculation.
        trends = {
            "quality_score": self._calculate_trend([m.get("quality_score", 0.0) for m in self.metrics_history]),
            "coverage_score": self._calculate_trend([m.get("coverage_score", 0.0) for m in self.metrics_history]),
            "security_score": self._calculate_trend([m.get("security_score", 0.0) for m in self.metrics_history])
        }
        return trends

    def _calculate_trend(self, values: List[float]) -> Dict[str, Any]:
        """Calculates trend statistics for a metric"""
        if not values:
            return {"trend": "unknown", "change": 0.0}

        recent_values = values[-3:]  # Look at the last 3 measurements
        if len(recent_values) < 2:
            return {"trend": "insufficient data", "change": 0.0}

        change = recent_values[-1] - recent_values[0]
        trend = "improving" if change > 0 else "declining" if change < 0 else "stable"
        return {
            "trend": trend,
            "change": change,
            "current": recent_values[-1],
            "previous": recent_values[0]
        }
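
    # Illustrative usage (assumes an existing Python file on disk; not part of
    # the application flow above):
    #
    #   analyzer = CodeMetricsAnalyzer()
    #   metrics = analyzer.analyze_code_quality("workspace/example.py")
    #   print(metrics["quality_score"], analyzer.get_trend_analysis())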

class WorkspaceManager:
    """Manages the workspace for the Autonomous Agent System."""

    def __init__(self, workspace_dir: str):
        self.workspace_dir = workspace_dir
        # Make sure the workspace directory exists before files are written to it.
        os.makedirs(self.workspace_dir, exist_ok=True)

    def get_workspace_tree(self) -> Dict[str, Any]:
        """Get the structure of the workspace."""
        # Placeholder implementation; the shape matches what
        # StreamlitInterface._render_tree expects (type/name/children).
        return {
            "type": "directory",
            "name": os.path.basename(self.workspace_dir) or self.workspace_dir,
            "children": []
        }
    def create_file(self, filename: str, content: str) -> str:
        """Create a new file in the workspace."""
        file_path = os.path.join(self.workspace_dir, filename)
        with open(file_path, 'w') as file:
            file.write(content)
        return f"File '{filename}' created successfully."

    def delete_file(self, filename: str) -> str:
        """Delete a file from the workspace."""
        file_path = os.path.join(self.workspace_dir, filename)
        if os.path.exists(file_path):
            os.remove(file_path)
            return f"File '{filename}' deleted successfully."
        return f"File '{filename}' not found."

class AutonomousAgentApp:
    """Main application class for the Autonomous Agent System"""

    def __init__(self):
        self.workspace_manager = WorkspaceManager(workspace_dir=os.getenv('WORKSPACE_DIR', 'workspace'))
        self.pipeline = self._initialize_pipeline()
        self.refinement_loop = self.RefinementLoop(pipeline=self.pipeline)
        # Placeholder: the full autonomous agent is constructed elsewhere in the
        # application; ChatSystem only needs a reference to it.
        self.autonomous_agent = None
        self.interface = self.StreamlitInterface(self)

    def _initialize_pipeline(self) -> 'AutonomousAgentApp.DevelopmentPipeline':
        """Initialize the development pipeline.

        DevelopmentPipeline and ToolManager are expected to be defined elsewhere
        in the application.
        """
        return self.DevelopmentPipeline(
            workspace_manager=self.workspace_manager,
            tool_manager=self._setup_tool_manager()
        )

    def _setup_tool_manager(self):
        """Set up the tool manager with configuration"""
        return self.ToolManager()
    class ChatSystem:
        """Manages the chat interaction between users and the autonomous system"""

        def __init__(self, agent: 'AutonomousAgentApp.AutonomousAgent'):
            self.agent = agent
            self.chat_history = []
            self.active_tasks = {}
            self.command_handlers = {
                '/task': self.handle_task_command,
                '/status': self.handle_status_command,
                '/stop': self.handle_stop_command,
                '/help': self.handle_help_command,
                '/modify': self.handle_modify_command
            }
        def render_chat_interface(self):
            """Render the chat interface in the Streamlit sidebar"""
            with st.sidebar:
                st.markdown("---")
                st.subheader("System Chat")

                # Chat controls
                if st.button("Clear Chat History"):
                    self.clear_chat_history()

                # Chat history display
                chat_container = st.container()
                with chat_container:
                    for message in self.chat_history:
                        self._render_message(message)

                # Input area
                user_input = st.text_input("Type message/command...", key="chat_input")
                if st.button("Send", key="send_message"):
                    self.process_user_input(user_input)
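
        # render_chat_interface() and command_handlers reference helpers that are
        # not defined in this section. The methods below are hedged, minimal
        # sketches (assumed behaviour, not the original implementation) so the
        # chat sidebar can render without raising AttributeError.
        def clear_chat_history(self):
            """Reset the chat transcript."""
            self.chat_history = []

        def _render_message(self, message: Dict[str, str]):
            """Render one chat message; assumes {'role': ..., 'content': ...} entries."""
            st.markdown(f"**{message.get('role', 'system')}:** {message.get('content', '')}")

        def process_user_input(self, user_input: str):
            """Dispatch slash commands to their handlers; echo plain messages."""
            if not user_input:
                return
            self.chat_history.append({"role": "user", "content": user_input})
            command = user_input.split()[0]
            handler = self.command_handlers.get(command)
            response = handler(user_input) if handler else "Message received."
            self.chat_history.append({"role": "system", "content": str(response)})

        # Minimal command handlers (placeholders).
        def handle_task_command(self, message: str) -> str:
            return f"Task queued: {message[len('/task'):].strip() or '(empty)'}"

        def handle_status_command(self, message: str) -> str:
            return f"Active tasks: {len(self.active_tasks)}"

        def handle_stop_command(self, message: str) -> str:
            self.active_tasks.clear()
            return "All active tasks stopped."

        def handle_help_command(self, message: str) -> str:
            return "Available commands: " + ", ".join(self.command_handlers)

        def handle_modify_command(self, message: str) -> str:
            return "Modify command received; no changes applied in this placeholder."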
    class RefinementLoop:
        """Manages the iterative refinement process"""

        def __init__(self, pipeline):
            self.pipeline = pipeline
            self.max_iterations = 10
            # QualityMetrics is a sibling nested class, so it is referenced
            # through the enclosing application class.
            self.quality_metrics = AutonomousAgentApp.QualityMetrics()
            self.logger = logging.getLogger(__name__)
            self.current_iteration = 0
            self.history = []
        async def run_refinement_cycle(self, task: str) -> Dict[str, Any]:
            """Run a complete refinement cycle for the given task"""
            self.logger.info(f"Starting refinement cycle for task: {task}")
            self.current_iteration = 0
            try:
                while self.current_iteration < self.max_iterations:
                    self.logger.info(f"Starting iteration {self.current_iteration + 1}")

                    # Execute pipeline stages
                    planning_result = await self.pipeline.execute_stage(
                        self.pipeline.PipelineStage.PLANNING,
                        {"task": task}
                    )
                    development_result = await self.pipeline.execute_stage(
                        self.pipeline.PipelineStage.DEVELOPMENT,
                        planning_result["result"]
                    )
                    testing_result = await self.pipeline.execute_stage(
                        self.pipeline.PipelineStage.TESTING,
                        development_result["result"]
                    )

                    # Analyze results
                    quality_analysis = self._analyze_quality(testing_result["result"])

                    # Record iteration history
                    self.history.append({
                        "iteration": self.current_iteration,
                        "quality_metrics": quality_analysis,
                        "timestamp": datetime.now()
                    })

                    # Check if quality requirements are met
                    if self._meets_quality_requirements(quality_analysis):
                        self.logger.info("Quality requirements met. Refinement cycle complete.")
                        return self._prepare_final_result(quality_analysis)

                    self.current_iteration += 1

                return {
                    "status": "max_iterations_reached",
                    "iterations_completed": self.current_iteration,
                    "final_quality": quality_analysis
                }
            except Exception as e:
                self.logger.error(f"Error in refinement cycle: {str(e)}")
                return {"status": "error", "error": str(e)}
        def _analyze_quality(self, result: Dict[str, Any]) -> Dict[str, float]:
            """Analyze the quality metrics of the current iteration"""
            return {
                "code_quality": self.quality_metrics.code_quality_score,
                "test_coverage": self.quality_metrics.test_coverage,
                "security_score": float(self.quality_metrics.security_score)
            }

        def _meets_quality_requirements(self, quality_analysis: Dict[str, float]) -> bool:
            """Check if the current quality metrics meet the requirements"""
            thresholds = self.quality_metrics.thresholds
            return (
                quality_analysis["code_quality"] >= thresholds["code_quality"] and
                quality_analysis["test_coverage"] >= thresholds["test_coverage"] and
                quality_analysis["security_score"] >= thresholds["security"]
            )

        def _prepare_final_result(self, quality_analysis: Dict[str, float]) -> Dict[str, Any]:
            """Prepare the final result of the refinement cycle"""
            return {
                "status": "success",
                "iterations_completed": self.current_iteration,
                "final_quality": quality_analysis,
                "history": self.history
            }

        def get_refinement_history(self) -> List[Dict[str, Any]]:
            """Get the history of refinement iterations"""
            return self.history
    def run(self):
        """Main application entry point"""
        try:
            logging.info("Starting Autonomous Agent Application")
            self.interface.render_main_interface()
        except Exception as e:
            logging.error(f"Application error: {str(e)}")
            st.error("An error occurred while starting the application. Please check the logs.")
            raise
    @dataclass
    class QualityMetrics:
        """Advanced quality metrics tracking and analysis"""
        code_quality_score: float = 0.0
        test_coverage: float = 0.0
        security_score: str = "0.0"
        performance_score: float = 0.0
        metrics_analyzer: Optional[CodeMetricsAnalyzer] = None

        def __post_init__(self):
            self.metrics_analyzer = CodeMetricsAnalyzer()
            self.history = []
            self.thresholds = {
                "code_quality": 0.85,
                "test_coverage": 0.90,
                "security": 0.85,
                "performance": 0.80
            }
    class CodeAnalyzer:
        def __init__(self):
            self.history = []
            self.code_quality_score = 0.0
            self.test_coverage = 0.0
            self.security_score = "0.0"
            # analyze_code() and get_improvement_suggestions() need these.
            self.metrics_analyzer = CodeMetricsAnalyzer()
            self.thresholds = {
                "code_quality": 0.85,
                "test_coverage": 0.90,
                "security": 0.85,
                "performance": 0.80
            }
        def analyze_code(self, project_name: str) -> Dict[str, Any]:
            """Comprehensive code analysis"""
            try:
                # Get all Python files in the project
                project_files = self._get_project_files(project_name)

                aggregated_metrics = {
                    "code_quality": 0.0,
                    "test_coverage": 0.0,
                    "security": 0.0,
                    "performance": 0.0,
                    "files_analyzed": len(project_files),
                    "detailed_metrics": []
                }

                for file_path in project_files:
                    metrics = self.metrics_analyzer.analyze_code_quality(file_path)
                    aggregated_metrics["detailed_metrics"].append({
                        "file": file_path,
                        "metrics": metrics
                    })
                    # Update aggregated scores (missing keys default to 0.0 for
                    # files whose analysis failed)
                    aggregated_metrics["code_quality"] += metrics.get("quality_score", 0.0)
                    aggregated_metrics["test_coverage"] += metrics.get("coverage_score", 0.0)
                    aggregated_metrics["security"] += metrics.get("security_score", 0.0)

                # Calculate averages
                if project_files:
                    for key in ["code_quality", "test_coverage", "security"]:
                        aggregated_metrics[key] /= len(project_files)

                # Update instance variables
                self.code_quality_score = aggregated_metrics["code_quality"]
                self.test_coverage = aggregated_metrics["test_coverage"]
                self.security_score = str(aggregated_metrics["security"])

                # Add to history
                self.history.append({
                    "timestamp": datetime.now(),
                    "metrics": aggregated_metrics
                })

                return aggregated_metrics
            except Exception as e:
                logging.error(f"Error in code analysis: {str(e)}")
                return {
                    "error": str(e),
                    "code_quality": 0.0,
                    "test_coverage": 0.0,
                    "security": "error",
                    "performance": 0.0
                }
        def _get_project_files(self, project_name: str) -> List[str]:
            """Get all Python files in the project"""
            project_dir = os.path.join(os.getcwd(), project_name)
            python_files = []
            for root, _, files in os.walk(project_dir):
                for file in files:
                    if file.endswith('.py'):
                        python_files.append(os.path.join(root, file))
            return python_files
        def get_improvement_suggestions(self) -> List[str]:
            """Generate improvement suggestions based on metrics"""
            suggestions = []
            latest_metrics = self.history[-1]["metrics"] if self.history else None

            if not latest_metrics:
                return ["No metrics available for analysis"]

            if latest_metrics["code_quality"] < self.thresholds["code_quality"]:
                suggestions.append(
                    f"Code quality score ({latest_metrics['code_quality']:.2f}) is below threshold "
                    f"({self.thresholds['code_quality']}). Consider refactoring complex methods."
                )

            if latest_metrics["test_coverage"] < self.thresholds["test_coverage"]:
                suggestions.append(
                    f"Test coverage ({latest_metrics['test_coverage']:.2f}) is below threshold "
                    f"({self.thresholds['test_coverage']}). Add more unit tests."
                )

            if float(latest_metrics["security"]) < self.thresholds["security"]:
                suggestions.append(
                    f"Security score ({latest_metrics['security']}) is below threshold "
                    f"({self.thresholds['security']}). Address security vulnerabilities."
                )

            return suggestions
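
        # Illustrative usage (assumes a "my_project" directory under the current
        # working directory; not wired into the Streamlit flow above):
        #
        #   analyzer = AutonomousAgentApp.CodeAnalyzer()
        #   analyzer.analyze_code("my_project")
        #   for suggestion in analyzer.get_improvement_suggestions():
        #       print(suggestion)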
    class StreamlitInterface:
        """Streamlit UI integration for the Autonomous Agent system."""

        def __init__(self, app: 'AutonomousAgentApp'):  # string forward reference
            self.app = app
            self.chat_system = self.app.ChatSystem(self.app.autonomous_agent)

        @staticmethod
        def main():
            """Convenience entry point; mirrors the __main__ block at the bottom of the file."""
            autonomous_agent_app = AutonomousAgentApp()
            autonomous_agent_app.run()
        def render_main_interface(self):
            """Render the main Streamlit interface."""
            st.title("Autonomous Agent System")

            # Add chat interface to sidebar
            self.chat_system.render_chat_interface()

            # Main content tabs
            tab_names = ["Autonomous Agent", "Workspace Management", "Settings"]
            selected_tab = st.selectbox("Select a Tab", tab_names)
            if selected_tab == "Autonomous Agent":
                self.render_autonomous_agent_tab()
            elif selected_tab == "Workspace Management":
                self.render_workspace_management_tab()
            elif selected_tab == "Settings":
                self.render_settings_tab()
        def render_autonomous_agent_tab(self):
            """Render the Autonomous Agent tab."""
            st.header("Autonomous Agent")
            task = st.text_area("Enter a task for the autonomous agent:")
            if st.button("Run Autonomous Agent"):
                if task:
                    # Run the autonomous agent with the provided task
                    try:
                        result = asyncio.run(self.app.refinement_loop.run_refinement_cycle(task))
                        st.success(f"Result: {result}")
                    except Exception as e:
                        st.error(f"An error occurred: {str(e)}")
        def render_workspace_management_tab(self):
            """Render the Workspace Management tab with a workspace explorer."""
            st.header("Workspace Management")

            # Workspace Explorer
            st.subheader("Workspace Explorer")
            workspace_tree = self.app.workspace_manager.get_workspace_tree()
            self._render_tree(workspace_tree)

            # File creation
            st.subheader("Create a File")
            new_filename = st.text_input("Enter filename:")
            new_file_content = st.text_area("Enter file content:")
            if st.button("Create File"):
                if new_filename and new_file_content:
                    result = self.app.workspace_manager.create_file(new_filename, new_file_content)
                    st.success(result)
                else:
                    st.error("Filename and content are required.")

            # File deletion
            st.subheader("Delete a File")
            delete_filename = st.text_input("Enter filename to delete:")
            if st.button("Delete File"):
                if delete_filename:
                    result = self.app.workspace_manager.delete_file(delete_filename)
                    st.success(result)
                else:
                    st.error("Filename is required.")
        def _render_tree(self, tree: Dict[str, Any], level: int = 0):
            """Recursively render the workspace directory tree."""
            if tree["type"] == "file":
                st.write("  " * level + f"📄 {tree['name']}")
            elif tree["type"] == "directory":
                st.write("  " * level + f"📁 {tree['name']}")
                for child in tree.get("children", []):
                    self._render_tree(child, level + 1)
        def render_settings_tab(self):
            """Render the Settings tab."""
            st.header("Application Settings")

            # Section 1: Refinement Process Configuration
            st.subheader("Refinement Process Settings")

            # Adjust maximum refinement iterations
            current_max_iter = self.app.refinement_loop.max_iterations
            new_max_iter = st.number_input(
                "Maximum Refinement Iterations",
                min_value=1,
                max_value=20,
                value=current_max_iter,
                help="Maximum number of refinement cycles to perform"
            )
            if new_max_iter != current_max_iter:
                self.app.refinement_loop.max_iterations = new_max_iter
                st.success(f"Updated maximum iterations to {new_max_iter}")

            # Section 2: Quality Threshold Configuration
            st.subheader("Quality Thresholds")

            # Get current thresholds
            thresholds = self.app.refinement_loop.quality_metrics.thresholds

            col1, col2, col3 = st.columns(3)
            with col1:
                new_code_quality = st.slider(
                    "Code Quality Threshold",
                    0.0, 1.0, thresholds["code_quality"],
                    help="Minimum acceptable code quality score"
                )
            with col2:
                new_test_coverage = st.slider(
                    "Test Coverage Threshold",
                    0.0, 1.0, thresholds["test_coverage"],
                    help="Minimum required test coverage"
                )
            with col3:
                new_security = st.slider(
                    "Security Threshold",
                    0.0, 1.0, thresholds["security"],
                    help="Minimum acceptable security score"
                )

            if st.button("Update Quality Thresholds"):
                self.app.refinement_loop.quality_metrics.thresholds.update({
                    "code_quality": new_code_quality,
                    "test_coverage": new_test_coverage,
                    "security": new_security
                })
                st.success("Quality thresholds updated!")

            # Section 3: Performance Configuration
            st.subheader("Performance Settings")

            # Concurrency settings
            concurrency_level = st.selectbox(
                "Max Concurrency",
                options=[1, 2, 4, 8],
                index=2,
                help="Maximum parallel tasks for code analysis"
            )

            # Resource limits
            mem_limit = st.slider(
                "Memory Limit (GB)",
                1, 16, 4,
                help="Maximum memory allocation for pipeline operations"
            )

            # Section 4: Security Settings
            st.subheader("Security Configuration")

            # Security rules toggle
            enable_security_scan = st.checkbox(
                "Enable Real-time Security Scanning",
                value=True,
                help="Perform continuous security analysis during development"
            )

            # Severity level filtering
            security_level = st.selectbox(
                "Minimum Security Severity Level",
                ["Low", "Medium", "High", "Critical"],
                index=1,
                help="Minimum severity level to trigger security alerts"
            )

            # Section 5: Workspace Configuration
            st.subheader("Workspace Settings")
            current_workspace = self.app.workspace_manager.workspace_dir
            st.write(f"Current Workspace: `{current_workspace}`")

            # Workspace actions
            if st.button("Clear Workspace Cache"):
                self.app.workspace_manager.clean_cache()
                st.success("Workspace cache cleared!")

            # Section 6: Diagnostic Settings
            st.subheader("Diagnostics")

            # Logging controls
            log_level = st.selectbox(
                "Logging Level",
                ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
                index=1
            )
            st.session_state.log_level = log_level  # Store in session state
            logging.getLogger().setLevel(log_level)

            # Debug mode toggle
            debug_mode = st.checkbox("Enable Debug Mode")
            st.session_state.debug_mode = debug_mode  # Store in session state
            if debug_mode:
                self.app.refinement_loop.logger.setLevel(logging.DEBUG)
            else:
                self.app.refinement_loop.logger.setLevel(logging.INFO)

            # Section 7: System Information
            st.subheader("System Info")
            st.write(f"Python Version: {sys.version}")
            st.write(f"Platform: {platform.platform()}")
            st.write(f"Available Memory: {psutil.virtual_memory().available / (1024**3):.1f} GB free")

if __name__ == "__main__":
    app = AutonomousAgentApp()  # Create an instance of the app
    app.run()  # Call the run method to start the application