Spaces:
Runtime error
Runtime error
import fastapi | |
from typing import Optional, Dict, Any | |
import copy | |
import requests | |
import json | |
import os | |
import sys | |
from io import StringIO | |
import ctypes | |
import subprocess | |
import logging | |
from pathlib import Path | |
from llama_cpp import Llama | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import random | |
import time | |
import inspect | |
# Load the Functionary tokenizer and model from the Hugging Face Hub when this
# module is imported. trust_remote_code=True allows code shipped in the model
# repository to be executed by transformers.
from transformers import AutoTokenizer, AutoModelForCausalLM
tokenizer = AutoTokenizer.from_pretrained("meetkai/functionary-small-v3.1", trust_remote_code=True)
# NOTE(review): despite its name, `model_path` holds the loaded model object,
# not a filesystem path. Neither `tokenizer` nor `model_path` is read anywhere
# else in the visible code -- confirm whether this import-time load is needed.
model_path = AutoModelForCausalLM.from_pretrained("meetkai/functionary-small-v3.1", trust_remote_code=True)
# Configure logging: INFO level, "<timestamp> - <level> - <message>" records.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the functions below.
logger = logging.getLogger(__name__)
class Tool(fastapi.FastAPI):
    """FastAPI application that also publishes a plugin manifest.

    The constructor stores the manifest in ``self.api_info`` and serves it at
    ``/.well-known/ai-plugin.json``. The manifest's ``api.url`` field is
    rewritten per request so that it points at this server's
    ``/openapi.json``.

    Args:
        tool_name: Title of the FastAPI application; also the default for
            ``name_for_human``.
        description: Application description; also the default for
            ``description_for_human``.
        name_for_human: Display name. Defaults to ``tool_name``.
        name_for_model: Name presented to the model. Defaults to
            ``name_for_human``.
        description_for_human: Defaults to ``description``.
        description_for_model: Defaults to ``description_for_human``.
        logo_url: Stored verbatim in the manifest.
        author_github: Stored verbatim in the manifest.
        contact_email: Stored verbatim in the manifest.
        legal_info_url: Stored verbatim in the manifest.
        version: Version string passed to FastAPI.
    """

    def __init__(
        self,
        tool_name: str,
        description: str,
        name_for_human: Optional[str] = None,
        name_for_model: Optional[str] = None,
        description_for_human: Optional[str] = None,
        description_for_model: Optional[str] = None,
        logo_url: Optional[str] = None,
        author_github: Optional[str] = None,
        contact_email: str = "",
        legal_info_url: str = "",
        version: str = "0.1.0",
    ):
        super().__init__(
            title=tool_name,
            description=description,
            version=version,
        )

        # Each optional field falls back to the next more general one.
        if name_for_human is None:
            name_for_human = tool_name
        if name_for_model is None:
            name_for_model = name_for_human
        if description_for_human is None:
            description_for_human = description
        if description_for_model is None:
            description_for_model = description_for_human

        self.api_info = {
            "schema_version": "v1",
            "name_for_human": name_for_human,
            "name_for_model": name_for_model,
            "description_for_human": description_for_human,
            "description_for_model": description_for_model,
            "auth": {
                "type": "none",
            },
            "api": {
                "type": "openapi",
                "url": "/openapi.json",
                "is_user_authenticated": False,
            },
            "author_github": author_github,
            "logo_url": logo_url,
            "contact_email": contact_email,
            "legal_info_url": legal_info_url,
        }

        # Register the manifest handler. Previously this closure was defined
        # but never attached to the app, so the manifest was unreachable.
        # The manifest route itself is kept out of the generated OpenAPI schema.
        @self.get("/.well-known/ai-plugin.json", include_in_schema=False)
        def get_api_info(request: fastapi.Request):
            """Return the manifest with ``api.url`` set to this server's OpenAPI URL."""
            openapi_path = str(request.url).replace(
                "/.well-known/ai-plugin.json", "/openapi.json"
            )
            # Deep copy so the per-request URL never leaks into self.api_info.
            info = copy.deepcopy(self.api_info)
            info["api"]["url"] = openapi_path
            return info
class SelfLearningTool:
    """Registry that maps tool names to callables."""

    def __init__(self):
        # name -> callable
        self.tools = {}

    def add_tool(self, name: str, func: callable):
        """Register *func* under *name*, replacing any existing entry."""
        self.tools[name] = func

    def use_tool(self, name: str, *args, **kwargs):
        """Call the tool registered as *name* with the given arguments.

        Returns the tool's result, or a "not found" message string when no
        tool has that name.
        """
        if name not in self.tools:
            return f"Tool '{name}' not found."
        handler = self.tools[name]
        return handler(*args, **kwargs)

    def list_tools(self):
        """Return the registered tool names as a new list."""
        return [registered_name for registered_name in self.tools]

    def remove_tool(self, name: str):
        """Unregister *name* and return a status message string."""
        if name not in self.tools:
            return f"Tool '{name}' not found."
        self.tools.pop(name)
        return f"Tool '{name}' removed successfully."
class PythonREPL:
    """Stateful Python executor with an attached tool registry.

    Code is run with ``exec`` against the instance's own ``globals`` and
    ``locals`` dictionaries, so state persists between ``run`` calls. Text
    printed by the executed code is captured and returned.
    """

    def __init__(self):
        self.globals = {}
        self.locals = {}
        # Reused capture buffer; emptied after every run.
        self.output_buffer = StringIO()
        self.self_learning_tool = SelfLearningTool()

    def run(self, command: str) -> str:
        """Execute *command* and return what it printed.

        If the code raises an ``Exception``, the return value is
        ``"Error: <repr of the exception>"`` instead of the captured output.
        ``sys.stdout`` is swapped for the capture buffer during execution and
        always restored afterwards.
        """
        saved_stdout = sys.stdout
        sys.stdout = self.output_buffer
        try:
            exec(command, self.globals, self.locals)
        except Exception as exc:
            captured = f"Error: {repr(exc)}"
        else:
            captured = self.output_buffer.getvalue()
        finally:
            sys.stdout = saved_stdout
            # Reset the buffer so the next run starts empty.
            self.output_buffer.truncate(0)
            self.output_buffer.seek(0)
        return captured

    def add_tool(self, name: str, func: callable):
        """Register *func* as a tool named *name*."""
        self.self_learning_tool.add_tool(name, func)

    def use_tool(self, name: str, *args, **kwargs):
        """Invoke the tool *name*; delegates to the registry."""
        return self.self_learning_tool.use_tool(name, *args, **kwargs)

    def list_tools(self):
        """Return the names of all registered tools."""
        return self.self_learning_tool.list_tools()

    def remove_tool(self, name: str):
        """Unregister the tool *name*; returns the registry's status message."""
        return self.self_learning_tool.remove_tool(name)

    def self_reflect(self):
        """Return a text summary: variable count, tool count and tool names."""
        tool_names = self.list_tools()
        pieces = [
            "Self-reflection:\n",
            f"Number of defined variables: {len(self.locals)}\n",
            f"Number of available tools: {len(tool_names)}\n",
            "Available tools:\n",
        ]
        pieces.extend(f"- {tool}\n" for tool in tool_names)
        return "".join(pieces)

    def self_inspect(self):
        """Return a text listing of each entry in ``self.locals`` with its type.

        Callable entries additionally get their signature, or a fallback line
        when ``inspect.signature`` raises ``ValueError``.
        """
        pieces = ["Self-inspection:\n"]
        for name, value in self.locals.items():
            pieces.append(f"{name}: {type(value)}\n")
            if not callable(value):
                continue
            try:
                pieces.append(f" Signature: {inspect.signature(value)}\n")
            except ValueError:
                pieces.append(" Signature: Unable to inspect\n")
        return "".join(pieces)
def initialize_llm(model_path: str, n_ctx: int, n_threads: int = 4, n_batch: int = 512) -> Llama:
    """Construct a ``Llama`` instance for the model file at *model_path*.

    Args:
        model_path: Path of the model file, passed to ``Llama`` unchanged.
        n_ctx: Passed to ``Llama`` as ``n_ctx``.
        n_threads: Passed to ``Llama`` as ``n_threads``.
        n_batch: Passed to ``Llama`` as ``n_batch``.

    Returns:
        The constructed ``Llama`` object (created with ``verbose=True``).

    Raises:
        Exception: Whatever ``Llama`` raised; it is logged and re-raised.
    """
    try:
        return Llama(
            model_path=model_path,
            n_ctx=n_ctx,
            n_threads=n_threads,
            n_batch=n_batch,
            verbose=True,
        )
    except Exception as e:
        # %-style arguments let logging defer formatting; the emitted message
        # is the same as before.
        logger.error("Failed to initialize LLM: %s", e)
        raise
# Build the module-level Llama instance at import time (n_ctx=4096). The path
# is relative, so it is resolved against the process's working directory.
# NOTE(review): `llm` is not referenced elsewhere in the visible code -- confirm
# it is still needed, since a failure here aborts the import.
llm = initialize_llm('../models/lil.gguf', 4096)
def build_tool(config) -> Tool:
    """Assemble the "Advanced Python REPL" tool and register its endpoints.

    One ``PythonREPL`` instance is created per call and shared by every
    handler through closure, so variables and registered tools persist across
    requests to the returned application.

    Each handler is attached to the app under a path equal to its function
    name. Handlers that execute code or change state use POST
    (``/run_python``, ``/add_tool``, ``/use_tool``, ``/remove_tool``,
    ``/write_file``); read-only handlers use GET (``/list_tools``,
    ``/self_reflect``, ``/self_inspect``, ``/read_file``). Previously the
    handlers were defined but never registered, so the returned app exposed
    none of them.

    SECURITY: these endpoints run ``exec``/``eval`` on request input and read
    and write arbitrary file paths. Only expose the app to trusted callers.

    Args:
        config: Currently unused by this function.

    Returns:
        The configured ``Tool`` application.
    """
    # NOTE(review): logo_url, contact_email and legal_info_url look like
    # placeholder values (legal_info_url is not a URL) -- confirm real values.
    tool = Tool(
        "Advanced Python REPL",
        "Execute sophisticated Python commands with self-learning capabilities",
        name_for_model="Advanced Python REPL",
        description_for_model=(
            "An advanced Python shell for executing complex Python commands. "
            "Input should be a valid Python command or script. "
            "Use print(...) to see the output of expressions. "
            "Capable of handling multi-line code, advanced Python features, "
            "and self-learning tools."
        ),
        logo_url="https://your-app-url.com/.well-known/logo.png",
        contact_email="[email protected]",
        legal_info_url="[email protected]"
    )
    python_repl = PythonREPL()

    def sanitize_input(query: str) -> str:
        """Trim surrounding whitespace and any leading/trailing backticks."""
        # str.strip treats its argument as a set of characters, so this
        # removes every backtick at either end, not just a triple fence.
        return query.strip().strip("```").strip()

    @tool.post("/run_python")
    def run_python(query: str):
        """Execute *query* in the shared REPL and return its captured output."""
        sanitized_query = sanitize_input(query)
        result = python_repl.run(sanitized_query)
        # NOTE(review): "execution_time" is the epoch timestamp taken after
        # the run, not an elapsed duration -- confirm which one is intended.
        return {"result": result, "execution_time": time.time()}

    @tool.post("/add_tool")
    def add_tool(name: str, code: str):
        """Define a function called *name* from *code* and register it as a tool."""
        sanitized_code = sanitize_input(code)
        try:
            # NOTE(review): the generated source is ``def <name>(<code>)``,
            # which only compiles if *code* itself supplies the closing
            # parenthesis, colon and body -- confirm the expected input format.
            # SECURITY: exec on request-supplied text.
            exec(f"def {name}({sanitized_code})", python_repl.globals, python_repl.locals)
            python_repl.add_tool(name, python_repl.locals[name])
            return f"Tool '{name}' added successfully."
        except Exception as e:
            return f"Error adding tool: {str(e)}"

    @tool.post("/use_tool")
    def use_tool(name: str, args: str):
        """Call tool *name* with positional arguments parsed from *args*."""
        try:
            # SECURITY: eval on request-supplied text executes arbitrary
            # expressions. *args* must evaluate to an iterable of arguments.
            result = python_repl.use_tool(name, *eval(args))
            return {"result": result}
        except Exception as e:
            return {"error": str(e)}

    @tool.get("/list_tools")
    def list_tools():
        """Return the names of all registered tools."""
        return {"tools": python_repl.list_tools()}

    @tool.post("/remove_tool")
    def remove_tool(name: str):
        """Unregister tool *name* and return the status message."""
        return {"result": python_repl.remove_tool(name)}

    @tool.get("/self_reflect")
    def self_reflect():
        """Return the REPL's summary of variables and tools."""
        return {"reflection": python_repl.self_reflect()}

    @tool.get("/self_inspect")
    def self_inspect():
        """Return the REPL's listing of local names, types and signatures."""
        return {"inspection": python_repl.self_inspect()}

    @tool.post("/write_file")
    def write_file(file_path: str, text: str) -> str:
        """Write *text* to *file_path* as UTF-8; return a status message."""
        write_path = Path(file_path)
        try:
            # parents=False: only the immediate parent directory is created;
            # a missing grandparent is reported as an error below.
            write_path.parent.mkdir(exist_ok=True, parents=False)
            with write_path.open("w", encoding="utf-8") as f:
                f.write(text)
            return f"File written successfully to {file_path}."
        except Exception as e:
            return "Error: " + str(e)

    @tool.get("/read_file")
    def read_file(file_path: str) -> str:
        """Return the UTF-8 contents of *file_path*, or an error message."""
        read_path = Path(file_path)
        try:
            with read_path.open("r", encoding="utf-8") as f:
                content = f.read()
            return content
        except Exception as e:
            return "Error: " + str(e)

    return tool
# Script entry point: build the tool application and serve it with uvicorn.
if __name__ == "__main__":
    config = {}  # Add any necessary configuration
    advanced_python_repl = build_tool(config)
    # Run the FastAPI server
    # uvicorn is imported here, so it is only required when run as a script.
    import uvicorn
    # host="0.0.0.0" listens on all network interfaces, port 8000.
    uvicorn.run(advanced_python_repl, host="0.0.0.0", port=8000)