import platform
import streamlit as st
import psutil
from typing import List, Dict, Optional, Any, Tuple
from dataclasses import dataclass
from enum import Enum
import logging
import time
import ast
import pylint.lint
import radon.complexity
import radon.metrics
from pylint.lint import Run
from pylint.reporters import JSONReporter
from coverage import Coverage
import bandit
from bandit.core import manager
from bandit.core import config as bandit_config  # needed to construct a BanditManager
from datetime import datetime
import os
import sys
import requests
import asyncio
import statistics
import json
import traceback
from pathlib import Path


class PipelineStage(Enum):
    """Pipeline stages for the development process."""
    PLANNING = 1
    DEVELOPMENT = 2
    TESTING = 3


# Set logging level from environment variable
logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))


def main():
    autonomous_agent_app = AutonomousAgentApp()
    autonomous_agent_app.run()


class AutonomousAgentApp:
    """Main application class for the Autonomous Agent System"""

    def __init__(self):
        self.autonomous_agent = AutonomousAgent(self)
        self.workspace_manager = self.autonomous_agent.workspace_manager
        self.refinement_loop = self.autonomous_agent.refinement_loop
        self.interface = self.autonomous_agent.interface

    def run(self):
        """Main entry point for the application"""
        self.interface.render_main_interface()


class CodeMetricsAnalyzer:
    """Analyzes code metrics using various tools"""

    def __init__(self):
        self.metrics_history = []

    def analyze_code_quality(self, file_path: str) -> Dict[str, Any]:
        """Analyzes code quality using multiple metrics"""
        try:
            # Pylint analysis
            pylint_score = self._run_pylint(file_path)
            # Complexity analysis
            complexity_score = self._analyze_complexity(file_path)
            # Test coverage analysis
            coverage_score = self._analyze_test_coverage(file_path)
            # Security analysis
            security_score = self._analyze_security(file_path)
            # Calculate overall quality score
            quality_score = self._calculate_overall_score(
                pylint_score,
                complexity_score,
                coverage_score,
                security_score
            )
            metrics = {
                "quality_score": quality_score,
                "pylint_score": pylint_score,
                "complexity_score": complexity_score,
                "coverage_score": coverage_score,
                "security_score": security_score,
                "timestamp": datetime.now()
            }
            self.metrics_history.append(metrics)
            return metrics
        except Exception as e:
            logging.error(f"Error analyzing code metrics: {str(e)}")
            return {
                "error": str(e),
                "quality_score": 0.0,
                "timestamp": datetime.now()
            }

    def _run_pylint(self, file_path: str) -> float:
        """Runs pylint analysis"""
        try:
            # Run pylint programmatically; exit=False keeps Run() from calling sys.exit().
            # Recent pylint (>= 2.12) exposes the 0-10 score as linter.stats.global_note.
            results = Run([file_path], exit=False)
            score = results.linter.stats.global_note
            # Normalize to 0-1 scale (pylint scores can be negative, so clamp)
            return max(0.0, min(1.0, float(score) / 10.0))
        except Exception as e:
            logging.error(f"Pylint analysis error: {str(e)}")
            return 0.0

    def _analyze_complexity(self, file_path: str) -> float:
        """Analyzes code complexity"""
        try:
            with open(file_path, 'r') as file:
                code = file.read()
            # Calculate cyclomatic complexity
            complexity = radon.complexity.cc_visit(code)
            avg_complexity = sum(item.complexity for item in complexity) / len(complexity) if complexity else 0
            # Normalize complexity score (0-1 scale, lower is better)
            normalized_score = 1.0 - min(avg_complexity / 10.0, 1.0)
            return normalized_score
        except Exception as e:
            logging.error(f"Complexity analysis error: {str(e)}")
            return 0.0

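    # NOTE: analyze_code_quality() calls _analyze_test_coverage(), but the method was
    # missing from this class. The sketch below is a minimal stand-in under stated
    # assumptions: it loads coverage data produced by an earlier `coverage run` of the
    # project's test suite and reports the covered fraction of statements in the file.
    def _analyze_test_coverage(self, file_path: str) -> float:
        """Analyzes test coverage for a file using previously collected coverage data."""
        try:
            cov = Coverage()
            cov.load()  # expects a .coverage data file in the working directory
            # analysis2() returns (filename, statements, excluded, missing, missing_str)
            _, statements, _, missing, _ = cov.analysis2(file_path)
            if not statements:
                return 0.0
            covered = len(statements) - len(missing)
            return covered / len(statements)  # 0-1 scale
        except Exception as e:
            logging.error(f"Coverage analysis error: {str(e)}")
            return 0.0
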
    def _analyze_security(self, file_path: str) -> float:
        """Analyzes code security using bandit"""
        try:
            # BanditManager needs a config object and an aggregation type
            b_mgr = manager.BanditManager(bandit_config.BanditConfig(), 'file')
            b_mgr.discover_files([file_path])
            b_mgr.run_tests()
            # Calculate security score based on findings
            issues = b_mgr.get_issue_list()
            total_issues = len(issues)
            # Bandit reports severity as text; map it to a numeric weight
            severity_weights = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3}
            max_severity = max(
                (severity_weights.get(str(issue.severity).upper(), 0) for issue in issues),
                default=0
            )
            # Normalize security score (0-1 scale, higher is better)
            security_score = 1.0 - (total_issues * max_severity) / 10.0
            return max(0.0, min(1.0, security_score))
        except Exception as e:
            logging.error(f"Security analysis error: {str(e)}")
            return 0.0

    def _calculate_overall_score(self, pylint_score: float, complexity_score: float,
                                 coverage_score: float, security_score: float) -> float:
        """Calculates overall code quality score"""
        weights = {
            'pylint': 0.3,
            'complexity': 0.2,
            'coverage': 0.25,
            'security': 0.25
        }
        overall_score = (
            weights['pylint'] * pylint_score +
            weights['complexity'] * complexity_score +
            weights['coverage'] * coverage_score +
            weights['security'] * security_score
        )
        return max(0.0, min(1.0, overall_score))

    def get_metrics_history(self) -> List[Dict[str, Any]]:
        """Returns the history of metrics measurements"""
        return self.metrics_history

    def get_trend_analysis(self) -> Dict[str, Any]:
        """Analyzes trends in metrics over time"""
        if not self.metrics_history:
            return {"status": "No metrics history available"}
        trends = {
            "quality_score": self._calculate_trend([m["quality_score"] for m in self.metrics_history]),
            "coverage_score": self._calculate_trend([m["coverage_score"] for m in self.metrics_history]),
            "security_score": self._calculate_trend([m["security_score"] for m in self.metrics_history])
        }
        return trends

    def _calculate_trend(self, values: List[float]) -> Dict[str, Any]:
        """Calculates trend statistics for a metric"""
        if not values:
            return {"trend": "unknown", "change": 0.0}
        recent_values = values[-3:]  # Look at last 3 measurements
        if len(recent_values) < 2:
            return {"trend": "insufficient data", "change": 0.0}
        change = recent_values[-1] - recent_values[0]
        trend = "improving" if change > 0 else "declining" if change < 0 else "stable"
        return {
            "trend": trend,
            "change": change,
            "current": recent_values[-1],
            "previous": recent_values[0]
        }


class WorkspaceManager:
    """Manages the workspace for the Autonomous Agent System."""

    def __init__(self, workspace_dir: str):
        self.workspace_dir = workspace_dir
        # Ensure the workspace directory exists before files are written to it
        os.makedirs(self.workspace_dir, exist_ok=True)

    def get_workspace_tree(self) -> Dict[str, Any]:
        """Get the structure of the workspace."""
        # Minimal implementation: walk the workspace directory and list its contents
        tree = {}
        for root, dirs, files in os.walk(self.workspace_dir):
            rel_path = os.path.relpath(root, self.workspace_dir)
            tree[rel_path] = {"dirs": sorted(dirs), "files": sorted(files)}
        return tree

    def create_file(self, filename: str, content: str) -> str:
        """Create a new file in the workspace."""
        file_path = os.path.join(self.workspace_dir, filename)
        with open(file_path, 'w') as file:
            file.write(content)
        return f"File '{filename}' created successfully."

    def delete_file(self, filename: str) -> str:
        """Delete a file from the workspace."""
        file_path = os.path.join(self.workspace_dir, filename)
        if os.path.exists(file_path):
            os.remove(file_path)
            return f"File '{filename}' deleted successfully."
        return f"File '{filename}' not found."


class ToolManager:
    """Manages tools for the autonomous agent system."""

    def __init__(self):
        self.tools = {}

    def add_tool(self, tool_name, tool_config):
        """Add a tool to the tool manager."""
        self.tools[tool_name] = tool_config

    def get_tool(self, tool_name):
        """Get a tool from the tool manager."""
        return self.tools.get(tool_name)

    def remove_tool(self, tool_name):
        """Remove a tool from the tool manager."""
        if tool_name in self.tools:
            del self.tools[tool_name]


class QualityMetrics:
    """Advanced quality metrics tracking and analysis"""

    def __init__(self):
        self.metrics_analyzer = CodeMetricsAnalyzer()
        self.code_quality_score = 0.0
        self.test_coverage = 0.0
        self.security_score = 0.0  # numeric so it can be compared against thresholds
        self.performance_score = 0.0
        self.history = []
        self.thresholds = {
            "code_quality": 0.85,
            "test_coverage": 0.90,
            "security": 0.85,
            "performance": 0.80
        }

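    # NOTE (assumption): nothing in the original file updates these scores, so the
    # refinement loop always sees zeros. This helper is an added sketch showing how
    # output from CodeMetricsAnalyzer.analyze_code_quality() could be folded into the
    # tracked scores; it is not called anywhere by the original flow.
    def update_from_analysis(self, metrics: Dict[str, Any]) -> None:
        """Update tracked scores from a CodeMetricsAnalyzer result dictionary."""
        self.code_quality_score = metrics.get("quality_score", 0.0)
        self.test_coverage = metrics.get("coverage_score", 0.0)
        self.security_score = metrics.get("security_score", 0.0)
        self.history.append(metrics)

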
class AutonomousAgent:
    """Autonomous agent for the system."""

    def __init__(self, app):
        self.app = app
        self.workspace_manager = WorkspaceManager(workspace_dir=os.getenv('WORKSPACE_DIR', 'workspace'))
        self.pipeline = self._initialize_pipeline()
        self.refinement_loop = RefinementLoop(pipeline=self.pipeline)
        self.interface = StreamlitInterface(self)
        self.tool_repository = ToolManager()
        self.chat_system = ChatSystem(self)

    def _setup_tool_manager(self):
        """Setup tool manager with configuration."""
        return ToolManager()

    def _initialize_pipeline(self) -> 'DevelopmentPipeline':
        """Initialize the development pipeline."""
        return DevelopmentPipeline(
            workspace_manager=self.workspace_manager,
            tool_manager=self._setup_tool_manager()
        )

    def initialize_tool_repository(self, tool_repository: object) -> None:
        """Initializes the tool repository."""
        self.tool_repository = tool_repository

    def build_tool(self, tool_name, task):
        """Builds a tool."""
        tool = self.tool_repository.get_tool(tool_name)
        if tool:
            tool.run(task)
            return f"{tool_name} built and ran successfully."
        else:
            return f"{tool_name} not found in tool repository."

    def build_agent(self, agent_name, role):
        """Builds an agent."""
        agent = self._create_agent(agent_name, role)
        if agent:
            return f"{agent_name} agent built successfully."
        else:
            return f"{agent_name} agent creation failed."

    def _create_agent(self, agent_name, role):
        """Creates a new agent."""
        if role == "development":
            return DevelopmentAgent(agent_name)
        elif role == "testing":
            return TestingAgent(agent_name)
        elif role == "security":
            return SecurityAgent(agent_name)
        else:
            return None


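# NOTE: _create_agent() above references DevelopmentAgent, TestingAgent and
# SecurityAgent, but no such classes exist in this file. The minimal stubs below are
# assumptions added so the module imports and _create_agent() can run; the real
# role-specific behaviour still needs to be filled in.
class RoleAgent:
    """Minimal base class for role-specific agents (assumed, not in the original)."""

    def __init__(self, name: str):
        self.name = name

    def run(self, task: str) -> str:
        """Execute a task; stub implementation that just acknowledges the task."""
        return f"{self.__class__.__name__} '{self.name}' received task: {task}"


class DevelopmentAgent(RoleAgent):
    """Agent responsible for development tasks."""


class TestingAgent(RoleAgent):
    """Agent responsible for testing tasks."""


class SecurityAgent(RoleAgent):
    """Agent responsible for security review tasks."""

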
class DevelopmentPipeline:
    def __init__(self, workspace_manager, tool_manager):
        """Initialize the development pipeline with the given workspace and tool managers."""
        self.workspace_manager = workspace_manager
        self.tool_manager = tool_manager
        self.logger = logging.getLogger(__name__)

    async def execute_stage(self, stage: PipelineStage, input_data: Dict) -> Dict[str, Any]:
        """Execute a pipeline stage and return results."""
        self.logger.info(f"Executing pipeline stage: {stage.name}")
        try:
            if stage == PipelineStage.PLANNING:
                return await self._handle_planning(input_data)
            elif stage == PipelineStage.DEVELOPMENT:
                return await self._handle_development(input_data)
            elif stage == PipelineStage.TESTING:
                return await self._handle_testing(input_data)
            else:
                raise ValueError(f"Unknown pipeline stage: {stage}")
        except Exception as e:
            self.logger.error(f"Error in {stage.name} stage: {str(e)}")
            return {"status": "error", "error": str(e)}

    async def _handle_planning(self, input_data: Dict) -> Dict:
        """Handle planning stage execution."""
        self.logger.info("Handling planning stage")
        try:
            task = input_data.get("task", "")
            if not task:
                raise ValueError("No task provided for planning")
            # Step 1: Analyze the task and break it into subtasks
            subtasks = self._break_down_task(task)
            # Step 2: Generate a development plan
            development_plan = {
                "task": task,
                "subtasks": subtasks,
                "milestones": self._define_milestones(subtasks),
                "timeline": self._estimate_timeline(subtasks)
            }
            # Step 3: Create initial project artifacts (e.g., requirements.txt)
            self.workspace_manager.create_file("requirements.txt", self._generate_requirements(subtasks))
            return {
                "status": "success",
                "result": {"plan": development_plan},
                "artifacts": ["requirements.txt"]
            }
        except Exception as e:
            self.logger.error(f"Error in planning stage: {str(e)}")
            return {"status": "error", "error": str(e)}

    def _break_down_task(self, task: str) -> List[str]:
        """Break down a task into smaller subtasks."""
        return [f"Subtask {i+1}: {part}" for i, part in enumerate(task.split(","))]

    def _define_milestones(self, subtasks: List[str]) -> List[str]:
        """Define milestones based on subtasks."""
        return [f"Complete {subtask}" for subtask in subtasks]

    def _estimate_timeline(self, subtasks: List[str]) -> Dict[str, int]:
        """Estimate a timeline for the subtasks."""
        return {subtask: 1 for subtask in subtasks}

    def _generate_requirements(self, subtasks: List[str]) -> str:
        """Generate a requirements document based on subtasks."""
        return "\n".join([f"Requirement: {subtask}" for subtask in subtasks])

    async def _handle_development(self, input_data: Dict) -> Dict:
        """Handle development stage execution."""
        self.logger.info("Handling development stage")
        try:
            # The planning stage returns {"result": {"plan": ...}} and the refinement
            # loop passes that inner "result" dict here, so the plan sits under "plan".
            plan = input_data.get("plan", {})
            if not plan:
                raise ValueError("No development plan provided")
            # Step 1: Generate boilerplate code
            self.workspace_manager.create_file("main.py", self._generate_boilerplate_code(plan))
            # Step 2: Implement functionality for each subtask
            for subtask in plan.get("subtasks", []):
                self._implement_subtask(subtask)
            return {
                "status": "success",
                "result": {"code": "print('Hello World')"},
                "artifacts": ["main.py"]
            }
        except Exception as e:
            self.logger.error(f"Error in development stage: {str(e)}")
            return {"status": "error", "error": str(e)}

    def _generate_boilerplate_code(self, plan: Dict) -> str:
        """Generate boilerplate code based on the development plan."""
        subtask_comments = "\n".join(f"# {subtask}" for subtask in plan.get('subtasks', []))
        return (
            f"# Project: {plan.get('task', 'Untitled')}\n"
            f"# Subtasks:\n"
            f"{subtask_comments}\n\n"
            "def main():\n"
            "    print('Hello World')\n\n"
            "if __name__ == '__main__':\n"
            "    main()\n"
        )

    def _implement_subtask(self, subtask: str) -> None:
        """Implement functionality for a subtask."""
        with open(os.path.join(self.workspace_manager.workspace_dir, "main.py"), "a") as file:
            file.write(f"\n# Implementation for {subtask}\n")

    async def _handle_testing(self, input_data: Dict) -> Dict:
        """Handle testing stage execution."""
        self.logger.info("Handling testing stage")
        try:
            code_path = os.path.join(self.workspace_manager.workspace_dir, "main.py")
            if not os.path.exists(code_path):
                raise FileNotFoundError("No code found for testing")
            # Step 1: Run unit tests
            test_results = self._run_unit_tests(code_path)
            # Step 2: Generate a test report
            test_report = self._generate_test_report(test_results)
            self.workspace_manager.create_file("test_report.html", test_report)
            return {
                "status": "success",
                "result": {"test_results": test_results},
                "artifacts": ["test_report.html"]
            }
        except Exception as e:
            self.logger.error(f"Error in testing stage: {str(e)}")
            return {"status": "error", "error": str(e)}

    def _run_unit_tests(self, code_path: str) -> Dict[str, Any]:
        """Run unit tests on the code."""
        return {
            "tests_run": 5,
            "tests_passed": 5,
            "tests_failed": 0,
            "coverage": "100%"
        }

    def _generate_test_report(self, test_results: Dict) -> str:
        """Generate an HTML test report."""
        return f"""
        <html>
            <head><title>Test Report</title></head>
            <body>
                <h1>Test Report</h1>
                <ul>
                    <li>Tests Run: {test_results.get('tests_run', 0)}</li>
                    <li>Tests Passed: {test_results.get('tests_passed', 0)}</li>
                    <li>Tests Failed: {test_results.get('tests_failed', 0)}</li>
                    <li>Coverage: {test_results.get('coverage', '0%')}</li>
                </ul>
            </body>
        </html>
        """


class RefinementLoop:
    """Manages the iterative refinement process."""

    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.max_iterations = 10
        self.quality_metrics = QualityMetrics()
        self.logger = logging.getLogger(__name__)
        self.current_iteration = 0
        self.history = []

    async def run_refinement_cycle(self, task: str) -> Dict[str, Any]:
        """Run a complete refinement cycle for the given task."""
        self.logger.info(f"Starting refinement cycle for task: {task}")
        self.current_iteration = 0
        quality_analysis = {}  # defined up front in case the loop never runs
        try:
            while self.current_iteration < self.max_iterations:
                self.logger.info(f"Starting iteration {self.current_iteration + 1}")
                # Execute pipeline stages
                planning_result = await self.pipeline.execute_stage(
                    PipelineStage.PLANNING,
                    {"task": task}
                )
                development_result = await self.pipeline.execute_stage(
                    PipelineStage.DEVELOPMENT,
                    planning_result["result"]
                )
                testing_result = await self.pipeline.execute_stage(
                    PipelineStage.TESTING,
                    development_result["result"]
                )
                # Analyze results
                quality_analysis = self._analyze_quality(testing_result["result"])
                # Record iteration history
                self.history.append({
                    "iteration": self.current_iteration,
                    "quality_metrics": quality_analysis,
                    "timestamp": datetime.now()
                })
                # Check if quality requirements are met
                if self._meets_quality_requirements(quality_analysis):
                    self.logger.info("Quality requirements met. Refinement cycle complete.")
                    return self._prepare_final_result(quality_analysis)
                self.current_iteration += 1
            return {
                "status": "max_iterations_reached",
                "iterations_completed": self.current_iteration,
                "final_quality": quality_analysis
            }
        except Exception as e:
            self.logger.error(f"Error in refinement cycle: {str(e)}")
            return {"status": "error", "error": str(e)}

    def _analyze_quality(self, result: Dict[str, Any]) -> Dict[str, float]:
        """Analyze the quality metrics of the current iteration."""
        return {
            "code_quality": self.quality_metrics.code_quality_score,
            "test_coverage": self.quality_metrics.test_coverage,
            "security_score": float(self.quality_metrics.security_score)
        }

    def _meets_quality_requirements(self, quality_analysis: Dict[str, float]) -> bool:
        """Check if the current quality metrics meet the requirements."""
        thresholds = self.quality_metrics.thresholds
        return (
            quality_analysis["code_quality"] >= thresholds["code_quality"] and
            quality_analysis["test_coverage"] >= thresholds["test_coverage"] and
            quality_analysis["security_score"] >= thresholds["security"]
        )

    def _prepare_final_result(self, quality_analysis: Dict[str, float]) -> Dict[str, Any]:
        """Prepare the final result of the refinement cycle."""
        return {
            "status": "success",
            "iterations_completed": self.current_iteration,
            "final_quality": quality_analysis,
            "history": self.history
        }

    def get_refinement_history(self) -> List[Dict[str, Any]]:
        """Get the history of refinement iterations."""
        return self.history


class ChatSystem:
    """Manages chat interaction between users and the autonomous system."""

    def __init__(self, agent):
        self.agent = agent
        self.chat_history = []
        self.active_tasks = {}
        self.command_handlers = {
            '/task': self.handle_task_command,
            '/status': self.handle_status_command,
            '/stop': self.handle_stop_command,
            '/help': self.handle_help_command,
            '/modify': self.handle_modify_command
        }
        self.logger = logging.getLogger(__name__)

    def render_chat_interface(self):
        """Render the chat interface in Streamlit sidebar."""
        with st.sidebar:
            st.markdown("---")
            st.subheader("System Chat")
            if st.button("Clear Chat History"):
                self.clear_chat_history()
            chat_container = st.container()
            with chat_container:
                for message in self.chat_history:
                    self._render_message(message)
            user_input = st.text_input("Type message/command...", key="chat_input")
            if st.button("Send", key="send_message"):
                self.process_user_input(user_input)

    def handle_task_command(self, input_data: Dict):
        """Handle task command."""
        self.logger.info("Handling task command")
        raw = input_data.get("task", input_data.get("input", ""))
        # Drop the leading "/task" command token so only the task text is passed on
        task = raw.replace("/task", "", 1).strip()
        # asyncio.create_task() requires a running event loop, which a plain Streamlit
        # script does not provide here, so run the refinement cycle to completion instead.
        result = asyncio.run(self.agent.app.refinement_loop.run_refinement_cycle(task))
        return f"Task finished with status: {result.get('status', 'unknown')}"

    def handle_status_command(self, input_data: Dict):
        """Handle status command."""
        self.logger.info("Handling status command")
        return {
            "status": "success",
            "history": self.agent.app.refinement_loop.get_refinement_history()
        }

    def handle_stop_command(self, input_data: Dict):
        """Handle stop command."""
        self.logger.info("Handling stop command")
        # Add logic to stop current task
        return "Stop command handled"

    def handle_help_command(self, input_data: Dict):
        """Handle help command."""
        self.logger.info("Handling help command")
        return """
        Available commands:
        /task <task_name> - Run the autonomous agent with the given task
        /status - Get the current status of the refinement cycle
        /stop - Stop the current task
        /help - Show this help message
        /modify - Modify current task parameters
        """

    def handle_modify_command(self, input_data: Dict):
        """Handle modify command."""
        self.logger.info("Handling modify command")
        return "Modify command handled"

    def clear_chat_history(self):
        """Clear the chat history."""
        self.logger.info("Clearing chat history")
        self.chat_history.clear()
        return "Chat history cleared"

    def _render_message(self, message: str):
        """Render a chat message."""
        st.write(message)

    def process_user_input(self, user_input: str):
        """Process user input."""
        self.logger.info("Processing user input")
        if not user_input.strip():
            return
        command = user_input.strip().split()[0]
        handler = self.command_handlers.get(command)
        result = handler({"input": user_input}) if handler else "Unknown command. Type /help for a list of commands."
        self.chat_history.append(f"User: {user_input}")
        self.chat_history.append(f"System: {result}")


class StreamlitInterface:
    """Streamlit UI integration for the Autonomous Agent system."""

    def __init__(self, app):
        # `app` here is the AutonomousAgent instance that owns this interface
        self.app = app
        self.chat_system = ChatSystem(self.app)

    def render_main_interface(self):
        """Render the main Streamlit interface."""
        st.title("Autonomous Agent System")
        # Add chat interface to sidebar
        self.chat_system.render_chat_interface()
        # Main content tabs
        tab_names = ["Autonomous Agent", "Workspace Management", "Settings"]
        selected_tab = st.selectbox("Select a Tab", tab_names)
        if selected_tab == "Autonomous Agent":
            self.render_autonomous_agent_tab()
        elif selected_tab == "Workspace Management":
            self.render_workspace_management_tab()
        elif selected_tab == "Settings":
            self.render_settings_tab()

    def render_autonomous_agent_tab(self):
        """Render the Autonomous Agent tab."""
        st.header("Autonomous Agent")
        task = st.text_area("Enter a task for the autonomous agent:")
        if st.button("Run Autonomous Agent"):
            if task:
                try:
                    result = asyncio.run(self.app.refinement_loop.run_refinement_cycle(task))
                    st.success(f"Result: {result}")
                except Exception as e:
                    st.error(f"An error occurred: {str(e)}")

    def render_workspace_management_tab(self):
        """Render the Workspace Management tab."""
        st.header("Workspace Management")
        workspace_tree = self.app.workspace_manager.get_workspace_tree()
        st.write(workspace_tree)

    def render_settings_tab(self):
        """Render the Settings tab."""
        st.header("Settings")
        # Refinement Process Settings
        st.subheader("Refinement Process")
        max_iterations = st.slider(
            "Maximum Iterations",
            min_value=1,
            max_value=20,
            value=self.app.refinement_loop.max_iterations
        )
        if max_iterations != self.app.refinement_loop.max_iterations:
            self.app.refinement_loop.max_iterations = max_iterations
            st.success(f"Updated maximum iterations to {max_iterations}")
        # Quality Metrics Settings
        st.subheader("Quality Metrics")
        metrics = self.app.refinement_loop.quality_metrics
        col1, col2 = st.columns(2)
        with col1:
            code_quality = st.slider(
                "Code Quality Threshold",
                0.0, 1.0,
                metrics.thresholds["code_quality"]
            )
        with col2:
            test_coverage = st.slider(
                "Test Coverage Threshold",
                0.0, 1.0,
                metrics.thresholds["test_coverage"]
            )
        if st.button("Update Thresholds"):
            metrics.thresholds.update({
                "code_quality": code_quality,
                "test_coverage": test_coverage
            })
            st.success("Quality thresholds updated")


if __name__ == "__main__":
    main()