microhugs / definitions.py
acecalisto3's picture
Update definitions.py
240020f verified
raw
history blame
29.8 kB
import platform
import streamlit as st
import psutil
from typing import List, Dict, Optional, Any, Tuple
from dataclasses import dataclass
from enum import Enum
import logging
import time
import ast
import pylint.lint
import radon.complexity
import radon.metrics
from pylint.lint import Run
from pylint.reporters import JSONReporter
from coverage import Coverage
import bandit
from bandit.core import manager
from datetime import datetime
import os
import sys
import requests
import asyncio
import statistics
import json
import traceback
from pathlib import Path
# Set logging level from environment variable
logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))
class CodeMetricsAnalyzer:
"""Analyzes code metrics using various tools"""
def __init__(self):
self.metrics_history = []
def analyze_code_quality(self, file_path: str) -> Dict[str, Any]:
"""Analyzes code quality using multiple metrics"""
try:
# Pylint analysis
pylint_score = self._run_pylint(file_path)
# Complexity analysis
complexity_score = self._analyze_complexity(file_path)
# Test coverage analysis
coverage_score = self._analyze_test_coverage(file_path)
# Security analysis
security_score = self._analyze_security(file_path)
# Calculate overall quality score
quality_score = self._calculate_overall_score(
pylint_score,
complexity_score,
coverage_score,
security_score
)
metrics = {
"quality_score": quality_score,
"pylint_score": pylint_score,
"complexity_score": complexity_score,
"coverage_score": coverage_score,
"security_score": security_score,
"timestamp": datetime.now()
}
self.metrics_history.append(metrics)
return metrics
except Exception as e:
logging.error(f"Error analyzing code metrics: {str(e)}")
return {
"error": str(e),
"quality_score": 0.0,
"timestamp": datetime.now()
}
def _run_pylint(self, file_path: str) -> float:
"""Runs pylint analysis"""
try:
reporter = JSONReporter()
Run([file_path], reporter=reporter, do_exit=False)
score = reporter.data.get('score', 0.0)
return float(score) / 10.0 # Normalize to 0-1 scale
except Exception as e:
logging.error(f"Pylint analysis error: {str(e)}")
return 0.0
def _analyze_complexity(self, file_path: str) -> float:
"""Analyzes code complexity"""
try:
with open(file_path, 'r') as file:
code = file.read()
# Calculate cyclomatic complexity
complexity = radon.complexity.cc_visit(code)
avg_complexity = sum(item.complexity for item in complexity) / len(complexity) if complexity else 0
# Normalize complexity score (0-1 scale, lower is better)
normalized_score = 1.0 - min(avg_complexity / 10.0, 1.0)
return normalized_score
except Exception as e:
logging.error(f"Complexity analysis error: {str(e)}")
return 0.0
async def _analyze_current_state(self, project_name: str) -> Dict[str, Any]:
"""Analyze current project state with detailed metrics."""
try:
self.logger.info(f"Analyzing current state for project: {project_name}")
# Collect code metrics
code_metrics = await self._collect_code_metrics(project_name)
self.logger.info("Code metrics collected successfully.")
# Analyze test coverage
test_coverage = await self._analyze_test_coverage(project_name)
self.logger.info("Test coverage analysis completed.")
# Check security vulnerabilities
security_analysis = await self._analyze_security(project_name)
self.logger.info("Security analysis completed.")
# Measure performance metrics
performance_metrics = await self._measure_performance(project_name)
self.logger.info("Performance metrics measured.")
# Determine if requirements are met
meets_requirements = await self._check_requirements(
code_metrics,
test_coverage,
security_analysis,
performance_metrics
)
self.logger.info("Requirements check completed.")
return {
"code_metrics": code_metrics,
"test_coverage": test_coverage,
"security_analysis": security_analysis,
"performance_metrics": performance_metrics,
"meets_requirements": meets_requirements,
"timestamp": datetime.now()
}
except Exception as e:
self.logger.error(f"Error analyzing current state: {str(e)}")
raise
def _analyze_security(self, file_path: str) -> float:
"""Analyzes code security using bandit"""
try:
conf = manager.BanditManager()
conf.discover_files([file_path])
conf.run_tests()
# Calculate security score based on findings
total_issues = len(conf.get_issue_list())
max_severity = max((issue.severity for issue in conf.get_issue_list()), default=0)
# Normalize security score (0-1 scale, higher is better)
security_score = 1.0 - (total_issues * max_severity) / 10.0
return max(0.0, min(1.0, security_score))
except Exception as e:
logging.error(f"Security analysis error: {str(e)}")
return 0.0
def _calculate_overall_score(self, pylint_score: float, complexity_score: float,
coverage_score: float, security_score: float) -> float:
"""Calculates overall code quality score"""
weights = {
'pylint': 0.3,
'complexity': 0.2,
'coverage': 0.25,
'security': 0.25
}
overall_score = (
weights['pylint'] * pylint_score +
weights['complexity'] * complexity_score +
weights['coverage'] * coverage_score +
weights['security'] * security_score
)
return max(0.0, min(1.0, overall_score))
def get_metrics_history(self) -> List[Dict[str, Any]]:
"""Returns the history of metrics measurements"""
return self.metrics_history
def get_trend_analysis(self) -> Dict[str, Any]:
"""Analyzes trends in metrics over time"""
if not self.metrics_history:
return {"status": "No metrics history available"}
trends = {
"quality_score": self._calculate_trend([m["quality_score"] for m in self.metrics_history]),
"coverage_score": self._calculate_trend([m["coverage_score"] for m in self.metrics_history]),
"security_score": self._calculate_trend([m["security_score"] for m in self.metrics_history])
}
return trends
def _calculate_trend(self, values: List[float]) -> Dict[str, Any]:
"""Calculates trend statistics for a metric"""
if not values:
return {"trend": "unknown", "change": 0.0}
recent_values = values[-3:] # Look at last 3 measurements
if len(recent_values) < 2:
return {"trend": "insufficient data", "change": 0.0}
change = recent_values[-1] - recent_values[0]
trend = "improving" if change > 0 else "declining" if change < 0 else "stable"
return {
"trend": trend,
"change": change,
"current": recent_values[-1],
"previous": recent_values[0]
}
class WorkspaceManager:
"""Manages the workspace for the Autonomous Agent System."""
def __init__(self, workspace_dir: str):
self.workspace_dir = workspace_dir
def get_workspace_tree(self) -> Dict[str, Any]:
"""Get the structure of the workspace."""
# Placeholder implementation
return {"workspace": "tree_structure"}
def create_file(self, filename: str, content: str) -> str:
"""Create a new file in the workspace."""
file_path = os.path.join(self.workspace_dir, filename)
with open(file_path, 'w') as file:
file.write(content)
return f"File '{filename}' created successfully."
def delete_file(self, filename: str) -> str:
"""Delete a file from the workspace."""
file_path = os.path.join(self.workspace_dir, filename)
if os.path.exists(file_path):
os.remove(file_path)
return f"File '{filename}' deleted successfully."
return f"File '{filename}' not found."
class AutonomousAgentApp:
"""Main application class for the Autonomous Agent System"""
def __init__(self):
self.workspace_manager = WorkspaceManager(workspace_dir=os.getenv('WORKSPACE_DIR', 'workspace'))
self.pipeline = self._initialize_pipeline()
self.refinement_loop = self.RefinementLoop(pipeline=self.pipeline)
self.interface = self.StreamlitInterface(self)
def _initialize_pipeline(self) -> 'AutonomousAgentApp.DevelopmentPipeline':
"""Initialize the development pipeline"""
return self.DevelopmentPipeline(
workspace_manager=self.workspace_manager,
tool_manager=self._setup_tool_manager()
)
def _setup_tool_manager(self):
"""Setup tool manager with configuration"""
return self.ToolManager() # Use self.ToolManager
class ChatSystem:
"""Manages the chat interaction between users and the autonomous system"""
def __init__(self, agent: 'AutonomousAgentApp.AutonomousAgent'):
self.agent = agent
self.chat_history = []
self.active_tasks = {}
self.command_handlers = {
'/task': self.handle_task_command,
'/status': self.handle_status_command,
'/stop': self.handle_stop_command,
'/help': self.handle_help_command,
'/modify': self.handle_modify_command
}
def render_chat_interface(self):
"""Render the chat interface in Streamlit sidebar"""
with st.sidebar:
st.markdown("---")
st.subheader("System Chat")
# Chat controls
if st.button("Clear Chat History"):
self.clear_chat_history()
# Chat history display
chat_container = st.container()
with chat_container:
for message in self.chat_history:
self._render_message(message)
# Input area
user_input = st.text_input("Type message/command...", key="chat_input")
if st.button("Send", key="send_message"):
self.process_user_input(user_input)
class RefinementLoop:
"""Manages the iterative refinement process"""
def __init__(self, pipeline):
self.pipeline = pipeline
self.max_iterations = 10
self.quality_metrics = QualityMetrics()
self.logger = logging.getLogger(__name__)
self.current_iteration = 0
self.history = []
async def run_refinement_cycle(self, task: str) -> Dict[str, Any]:
"""Run a complete refinement cycle for the given task"""
self.logger.info(f"Starting refinement cycle for task: {task}")
self.current_iteration = 0
try:
while self.current_iteration < self.max_iterations:
self.logger.info(f"Starting iteration {self.current_iteration + 1}")
# Execute pipeline stages
planning_result = await self.pipeline.execute_stage(
self.pipeline.PipelineStage.PLANNING,
{"task": task}
)
development_result = await self.pipeline.execute_stage(
self.pipeline.PipelineStage.DEVELOPMENT,
planning_result["result"]
)
testing_result = await self.pipeline.execute_stage(
self.pipeline.PipelineStage.TESTING,
development_result["result"]
)
# Analyze results
quality_analysis = self._analyze_quality(testing_result["result"])
# Record iteration history
self.history.append({
"iteration": self.current_iteration,
"quality_metrics": quality_analysis,
"timestamp": datetime.now()
})
# Check if quality requirements are met
if self._meets_quality_requirements(quality_analysis):
self.logger.info("Quality requirements met. Refinement cycle complete.")
return self._prepare_final_result(quality_analysis)
self.current_iteration += 1
return {
"status": "max_iterations_reached",
"iterations_completed": self.current_iteration,
"final_quality": quality_analysis
}
except Exception as e:
self.logger.error(f"Error in refinement cycle: {str(e)}")
return {"status": "error", "error": str(e)}
def _analyze_quality(self, result: Dict[str, Any]) -> Dict[str, float]:
"""Analyze the quality metrics of the current iteration"""
return {
"code_quality": self.quality_metrics.code_quality_score,
"test_coverage": self.quality_metrics.test_coverage,
"security_score": float(self.quality_metrics.security_score)
}
def _meets_quality_requirements(self, quality_analysis: Dict[str, float]) -> bool:
"""Check if the current quality metrics meet the requirements"""
thresholds = self.quality_metrics.thresholds
return (
quality_analysis["code_quality"] >= thresholds["code_quality"] and
quality_analysis["test_coverage"] >= thresholds["test_coverage"] and
quality_analysis["security_score"] >= thresholds["security"]
)
def _prepare_final_result(self, quality_analysis: Dict[str, float]) -> Dict[str, Any]:
"""Prepare the final result of the refinement cycle"""
return {
"status": "success",
"iterations_completed": self.current_iteration,
"final_quality": quality_analysis,
"history": self.history
}
def get_refinement_history(self) -> List[Dict[str, Any]]:
"""Get the history of refinement iterations"""
return self.history
def run(self):
"""Main application entry point"""
try:
logging.info("Starting Autonomous Agent Application")
self.interface.render_main_interface()
except Exception as e:
logging.error(f"Application error: {str(e)}")
st.error("An error occurred while starting the application. Please check the logs.")
raise
@dataclass
class QualityMetrics:
"""Advanced quality metrics tracking and analysis"""
metrics_analyzer: CodeMetricsAnalyzer = None
code_quality_score: float = 0.0
test_coverage: float = 0.0
security_score: str = "unknown"
performance_score: float = 0.0
metrics_analyzer: CodeMetricsAnalyzer = None
def __post_init__(self):
self.metrics_analyzer = CodeMetricsAnalyzer()
self.history = []
self.thresholds = {
"code_quality": 0.85,
"test_coverage": 0.90,
"security": 0.85,
"performance": 0.80
}
class CodeAnalyzer:
def __init__(self):
self.history = []
self.code_quality_score = 0.0
self.test_coverage = 0.0
self.security_score = "0.0"
def _get_project_files(self, project_name: str) -> list:
# Dummy implementation for example purposes
return ["file1.py", "file2.py"]
def analyze_code(self, project_name: str) -> Dict[str, Any]:
"""Comprehensive code analysis"""
try:
# Get all Python files in the project
project_files = self._get_project_files(project_name)
aggregated_metrics = {
"code_quality": 0.0,
"test_coverage": 0.0,
"security": 0.0,
"performance": 0.0,
"files_analyzed": len(project_files),
"detailed_metrics": []
}
for file_path in project_files:
metrics = self.metrics_analyzer.analyze_code_quality(file_path)
aggregated_metrics["detailed_metrics"].append({
"file": file_path,
"metrics": metrics
})
# Update aggregated scores
aggregated_metrics["code_quality"] += metrics["quality_score"]
aggregated_metrics["test_coverage"] += metrics["coverage_score"]
aggregated_metrics["security"] += metrics["security_score"]
# Calculate averages
if project_files:
for key in ["code_quality", "test_coverage", "security"]:
aggregated_metrics[key] /= len(project_files)
# Update instance variables
self.code_quality_score = aggregated_metrics["code_quality"]
self.test_coverage = aggregated_metrics["test_coverage"]
self.security_score = str(aggregated_metrics["security"])
# Add to history
self.history.append({
"timestamp": datetime.now(),
"metrics": aggregated_metrics
})
return aggregated_metrics
except Exception as e:
print(f"An error occurred: {e}")
return {}
except Exception as e:
logging.error(f"Error in code analysis: {str(e)}")
return {
"error": str(e),
"code_quality": 0.0,
"test_coverage": 0.0,
"security": "error",
"performance": 0.0
}
def _get_project_files(self, project_name: str) -> List[str]:
"""Get all Python files in the project"""
project_dir = os.path.join(os.getcwd(), project_name)
python_files = []
for root, _, files in os.walk(project_dir):
for file in files:
if file.endswith('.py'):
python_files.append(os.path.join(root, file))
return python_files
def get_improvement_suggestions(self) -> List[str]:
"""Generate improvement suggestions based on metrics"""
suggestions = []
latest_metrics = self.history[-1]["metrics"] if self.history else None
if not latest_metrics:
return ["No metrics available for analysis"]
if latest_metrics["code_quality"] < self.thresholds["code_quality"]:
suggestions.append(
f"Code quality score ({latest_metrics['code_quality']:.2f}) is below threshold "
f"({self.thresholds['code_quality']}). Consider refactoring complex methods."
)
if latest_metrics["test_coverage"] < self.thresholds["test_coverage"]:
suggestions.append(
f"Test coverage ({latest_metrics['test_coverage']:.2f}) is below threshold "
f"({self.thresholds['test_coverage']}). Add more unit tests."
)
if float(latest_metrics["security"]) < self.thresholds["security"]:
suggestions.append(
f"Security score ({latest_metrics['security']}) is below threshold "
f"({self.thresholds['security']}). Address security vulnerabilities."
)
return suggestions
class StreamlitInterface:
"""Streamlit UI integration for the Autonomous Agent system."""
def main():
autonomous_agent_app = AutonomousAgentApp()
app.run()
def __init__(self, app: 'AutonomousAgentApp'): # Use string forward reference
self.app = app
self.chat_system = self.app.ChatSystem(self.app.autonomous_agent)
def render_main_interface(self):
"""Render the main Streamlit interface."""
st.title("Autonomous Agent System")
# Add chat interface to sidebar
self.chat_system.render_chat_interface()
# Main content tabs
tab_names = ["Autonomous Agent", "Workspace Management", "Settings"]
selected_tab = st.selectbox("Select a Tab", tab_names)
if selected_tab == "Autonomous Agent":
self.render_autonomous_agent_tab()
elif selected_tab == "Workspace Management":
self.render_workspace_management_tab()
elif selected_tab == "Settings":
self.render_settings_tab()
def render_autonomous_agent_tab(self):
"""Render the Autonomous Agent tab."""
st.header("Autonomous Agent")
task = st.text_area("Enter a task for the autonomous agent:")
if st.button("Run Autonomous Agent"):
if task:
# Run the autonomous agent with the provided task
try:
result = asyncio.run(self.app.refinement_loop.run_refinement_cycle(task))
st.success(f"Result: {result}")
except Exception as e:
st.error(f"An error occurred: {str(e)}")
def render_workspace_management_tab(self):
"""Render the Workspace Management tab with a workspace explorer."""
st.header("Workspace Management")
# Workspace Explorer
st.subheader("Workspace Explorer")
workspace_tree = self.app.workspace_manager.get_workspace_tree()
self._render_tree(workspace_tree)
# File creation
st.subheader("Create a File")
new_filename = st.text_input("Enter filename:")
new_file_content = st.text_area("Enter file content:")
if st.button("Create File"):
if new_filename and new_file_content:
result = self.app.workspace_manager.create_file(new_filename, new_file_content)
st.success(result)
else:
st.error("Filename and content are required.")
# File deletion
st.subheader("Delete a File")
delete_filename = st.text_input("Enter filename to delete:")
if st.button("Delete File"):
if delete_filename:
result = self.app.workspace_manager.delete_file(delete_filename)
st.success(result)
else:
st.error("Filename is required.")
def _render_tree(self, tree: Dict[str, Any], level: int = 0):
"""Recursively render the workspace directory tree."""
if tree["type"] == "file":
st.write(" " * level + f"📄 {tree['name']}")
elif tree["type"] == "directory":
st.write(" " * level + f"📁 {tree['name']}")
for child in tree["children"]:
self._render_tree(child, level + 1)
def render_settings_tab(self):
"""Render the Settings tab."""
st.header("Application Settings")
# Section 1: Refinement Process Configuration
st.subheader("Refinement Process Settings")
# Adjust maximum refinement iterations
current_max_iter = self.app.refinement_loop.max_iterations
new_max_iter = st.number_input(
"Maximum Refinement Iterations",
min_value=1,
max_value=20,
value=current_max_iter,
help="Maximum number of refinement cycles to perform"
)
if new_max_iter != current_max_iter:
self.app.refinement_loop.max_iterations = new_max_iter
st.success(f"Updated maximum iterations to {new_max_iter}")
# Section 2: Quality Threshold Configuration
st.subheader("Quality Thresholds")
# Get current thresholds
thresholds = self.app.refinement_loop.quality_metrics.thresholds
col1, col2, col3 = st.columns(3)
with col1:
new_code_quality = st.slider(
"Code Quality Threshold",
0.0, 1.0, thresholds["code_quality"],
help="Minimum acceptable code quality score"
)
with col2:
new_test_coverage = st.slider(
"Test Coverage Threshold",
0.0, 1.0, thresholds["test_coverage"],
help="Minimum required test coverage"
)
with col3:
new_security = st.slider(
"Security Threshold",
0.0, 1.0, thresholds["security"],
help="Minimum acceptable security score"
)
if st.button("Update Quality Thresholds"):
self.app.refinement_loop.quality_metrics.thresholds.update({
"code_quality": new_code_quality,
"test_coverage": new_test_coverage,
"security": new_security
})
st.success("Quality thresholds updated!")
# Section 3: Performance Configuration
st.subheader("Performance Settings")
# Concurrency settings
concurrency_level = st.selectbox(
"Max Concurrency",
options=[1, 2, 4, 8],
index=2,
help="Maximum parallel tasks for code analysis"
)
# Resource limits
mem_limit = st.slider(
"Memory Limit (GB)",
1, 16, 4,
help="Maximum memory allocation for pipeline operations"
)
# Section 4: Security Settings
st.subheader("Security Configuration")
# Security rules toggle
enable_security_scan = st.checkbox(
"Enable Real-time Security Scanning",
value=True,
help="Perform continuous security analysis during development"
)
# Severity level filtering
security_level = st.selectbox(
"Minimum Security Severity Level",
["Low", "Medium", "High", "Critical"],
index=1,
help="Minimum severity level to trigger security alerts"
)
# Section 5: Workspace Configuration
st.subheader("Workspace Settings")
current_workspace = self.app.workspace_manager.workspace_dir
st.write(f"Current Workspace: `{current_workspace}`")
# Workspace actions
if st.button("Clear Workspace Cache"):
self.app.workspace_manager.clean_cache()
st.success("Workspace cache cleared!")
# Section 6: Diagnostic Settings
st.subheader("Diagnostics")
# Logging controls
log_level = st.selectbox(
"Logging Level",
["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
index=1
)
st.session_state.log_level = log_level # Store in session state
logging.getLogger().setLevel(log_level)
# Debug mode toggle
debug_mode = st.checkbox("Enable Debug Mode")
st.session_state.debug_mode = debug_mode # Store in session state
if debug_mode:
self.app.refinement_loop.logger.setLevel(logging.DEBUG)
else:
self.app.refinement_loop.logger.setLevel(logging.INFO)
# Section 7: System Information
st.subheader("System Info")
st.write(f"Python Version: {sys.version}")
st.write(f"Platform: {platform.platform()}")
st.write(f"Available Memory: {psutil.virtual_memory().available / (1024**3):.1f} GB free")
if __name__ == "__main__":
app = AutonomousAgentApp() # Create an instance of the app
app.run() # Call the run method to start the application