"""Advanced Python REPL exposed as a FastAPI tool server.

The module defines:

* ``Tool`` - a FastAPI application that also serves an ai-plugin manifest.
* ``SelfLearningTool`` - a name -> callable registry.
* ``PythonREPL`` - an in-process ``exec``-based interpreter with a tool registry.
* ``build_tool`` - wires the REPL into HTTP routes on a ``Tool`` instance.

WARNING: every route executes or exposes arbitrary code / files on the host
without authentication.  Only run this in a sandboxed, trusted environment.
"""

import contextlib
import copy
import ctypes
import inspect
import json
import logging
import os
import random
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import StringIO
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

import fastapi
import requests
from llama_cpp import Llama
from transformers import AutoModelForCausalLM, AutoTokenizer

# Load model directly
MODEL_ID = "meetkai/functionary-small-v3.1"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(MODEL_ID, trust_remote_code=True)
# Backward-compatible alias: this name used to hold the loaded transformers
# model.  Despite the name it is a model object, not a filesystem path.
model_path = model

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class Tool(fastapi.FastAPI):
    """FastAPI application that also publishes an ai-plugin manifest.

    The manifest is stored in ``self.api_info`` and served from
    ``/.well-known/ai-plugin.json``.

    Args:
        tool_name: Title of the FastAPI app; default for ``name_for_human``.
        description: Description of the FastAPI app; default for
            ``description_for_human``.
        name_for_human: Display name; defaults to ``tool_name``.
        name_for_model: Model-facing name; defaults to ``name_for_human``.
        description_for_human: Defaults to ``description``.
        description_for_model: Defaults to ``description_for_human``.
        logo_url: Copied into the manifest as ``logo_url``.
        author_github: Copied into the manifest as ``author_github``.
        contact_email: Copied into the manifest as ``contact_email``.
        legal_info_url: Copied into the manifest as ``legal_info_url``.
        version: Version of the FastAPI app.
    """

    def __init__(
        self,
        tool_name: str,
        description: str,
        name_for_human: Optional[str] = None,
        name_for_model: Optional[str] = None,
        description_for_human: Optional[str] = None,
        description_for_model: Optional[str] = None,
        logo_url: Optional[str] = None,
        author_github: Optional[str] = None,
        contact_email: str = "",
        legal_info_url: str = "",
        version: str = "0.1.0",
    ):
        super().__init__(
            title=tool_name,
            description=description,
            version=version,
        )

        # Each optional field falls back to the next more general value.
        if name_for_human is None:
            name_for_human = tool_name
        if name_for_model is None:
            name_for_model = name_for_human
        if description_for_human is None:
            description_for_human = description
        if description_for_model is None:
            description_for_model = description_for_human

        self.api_info = {
            "schema_version": "v1",
            "name_for_human": name_for_human,
            "name_for_model": name_for_model,
            "description_for_human": description_for_human,
            "description_for_model": description_for_model,
            "auth": {
                "type": "none",
            },
            "api": {
                "type": "openapi",
                "url": "/openapi.json",
                "is_user_authenticated": False,
            },
            "author_github": author_github,
            "logo_url": logo_url,
            "contact_email": contact_email,
            "legal_info_url": legal_info_url,
        }

        @self.get("/.well-known/ai-plugin.json", include_in_schema=False)
        def get_api_info(request: fastapi.Request):
            # Point the manifest at the OpenAPI document of the server the
            # request actually reached by rewriting the request URL.
            openapi_path = str(request.url).replace(
                "/.well-known/ai-plugin.json", "/openapi.json"
            )
            # Deep copy so the shared template in self.api_info is not mutated.
            info = copy.deepcopy(self.api_info)
            info["api"]["url"] = str(openapi_path)
            return info


class SelfLearningTool:
    """Registry mapping tool names to callables."""

    def __init__(self) -> None:
        self.tools: Dict[str, Callable[..., Any]] = {}

    def add_tool(self, name: str, func: Callable[..., Any]) -> None:
        """Register ``func`` under ``name``, replacing any existing entry."""
        self.tools[name] = func

    def use_tool(self, name: str, *args: Any, **kwargs: Any) -> Any:
        """Call the tool ``name`` with the given arguments.

        Returns the tool's result, or a "not found" message string when no
        tool is registered under ``name``.
        """
        if name not in self.tools:
            return f"Tool '{name}' not found."
        return self.tools[name](*args, **kwargs)

    def list_tools(self) -> List[str]:
        """Return the registered tool names."""
        return list(self.tools)

    def remove_tool(self, name: str) -> str:
        """Unregister ``name`` and return a status message."""
        if name not in self.tools:
            return f"Tool '{name}' not found."
        del self.tools[name]
        return f"Tool '{name}' removed successfully."


class PythonREPL:
    """In-process Python interpreter that keeps state between commands."""

    def __init__(self) -> None:
        # One dict serves as both globals and locals.  With two separate
        # dicts, exec() runs the code as if it were a class body, so functions
        # defined by a command cannot see other top-level names.
        self.globals: Dict[str, Any] = {}
        self.locals: Dict[str, Any] = self.globals
        self.output_buffer = StringIO()
        self.self_learning_tool = SelfLearningTool()

    def run(self, command: str) -> str:
        """Execute ``command`` and return what it printed.

        If the command raises an ``Exception`` the return value is
        ``"Error: <repr of exception>"`` and output printed before the
        failure is discarded.
        """
        buffer = self.output_buffer
        try:
            # redirect_stdout swaps the process-wide sys.stdout for the
            # duration of the block and restores it on exit.
            with contextlib.redirect_stdout(buffer):
                exec(command, self.globals, self.locals)
            output = buffer.getvalue()
        except Exception as exc:
            output = f"Error: {exc!r}"
        finally:
            # Reset the shared buffer so the next command starts clean.
            buffer.seek(0)
            buffer.truncate(0)
        return output

    def add_tool(self, name: str, func: Callable[..., Any]) -> None:
        """Register ``func`` as a tool named ``name``."""
        self.self_learning_tool.add_tool(name, func)

    def use_tool(self, name: str, *args: Any, **kwargs: Any) -> Any:
        """Call the registered tool ``name`` (see ``SelfLearningTool.use_tool``)."""
        return self.self_learning_tool.use_tool(name, *args, **kwargs)

    def list_tools(self) -> List[str]:
        """Return the names of all registered tools."""
        return self.self_learning_tool.list_tools()

    def remove_tool(self, name: str) -> str:
        """Unregister the tool ``name`` and return a status message."""
        return self.self_learning_tool.remove_tool(name)

    def _user_variables(self) -> Dict[str, Any]:
        """Return the REPL namespace without the ``__builtins__`` entry.

        exec() inserts ``__builtins__`` into the globals dict; it is not
        something the user defined, so reports leave it out.
        """
        return {
            name: value
            for name, value in self.locals.items()
            if name != "__builtins__"
        }

    def self_reflect(self) -> str:
        """Return a text summary of defined variables and registered tools."""
        tool_names = self.list_tools()
        lines = [
            "Self-reflection:\n",
            f"Number of defined variables: {len(self._user_variables())}\n",
            f"Number of available tools: {len(tool_names)}\n",
            "Available tools:\n",
        ]
        lines.extend(f"- {tool}\n" for tool in tool_names)
        return "".join(lines)

    def self_inspect(self) -> str:
        """Return a text listing of each variable's type and, if callable, signature."""
        lines = ["Self-inspection:\n"]
        for name, value in self._user_variables().items():
            lines.append(f"{name}: {type(value)}\n")
            if callable(value):
                try:
                    signature = inspect.signature(value)
                    lines.append(f" Signature: {signature}\n")
                except (ValueError, TypeError):
                    # inspect.signature cannot describe every callable.
                    lines.append(" Signature: Unable to inspect\n")
        return "".join(lines)


def initialize_llm(model_path: str, n_ctx: int, n_threads: int = 4, n_batch: int = 512) -> Llama:
    """Create a ``Llama`` instance, logging and re-raising any failure.

    Args:
        model_path: Filesystem path of the model file handed to ``Llama``.
        n_ctx: Passed to ``Llama`` as ``n_ctx``.
        n_threads: Passed to ``Llama`` as ``n_threads``.
        n_batch: Passed to ``Llama`` as ``n_batch``.

    Raises:
        Exception: Whatever ``Llama`` raised, after it has been logged.
    """
    try:
        return Llama(
            model_path=model_path,
            n_ctx=n_ctx,
            n_threads=n_threads,
            n_batch=n_batch,
            verbose=True,
        )
    except Exception as e:
        logger.error("Failed to initialize LLM: %s", e)
        raise


# Llama() takes a path to a model file on disk, so it cannot be given the
# in-memory transformers model loaded above (doing so failed at import time).
# The path is therefore read from the environment; without it the llama.cpp
# model is simply not loaded.
_gguf_model_path = os.environ.get("LLAMA_MODEL_PATH")
if _gguf_model_path:
    llm: Optional[Llama] = initialize_llm(_gguf_model_path, 4096)
else:
    logger.warning("LLAMA_MODEL_PATH is not set; skipping llama.cpp model initialization")
    llm = None


def _as_call_args(value: Any) -> Tuple[Any, ...]:
    """Turn an evaluated ``args`` value into a tuple of positional arguments.

    Strings/bytes and non-iterables become a single argument; any other
    iterable is unpacked.
    """
    if isinstance(value, (str, bytes)):
        return (value,)
    try:
        return tuple(value)
    except TypeError:
        return (value,)


def build_tool(config) -> Tool:
    """Build the ``Tool`` application and register the REPL routes on it.

    Args:
        config: Accepted for interface compatibility; not read by this function.

    Returns:
        The configured ``Tool`` (a FastAPI app) backed by one shared
        ``PythonREPL`` instance.
    """
    tool = Tool(
        "Advanced Python REPL",
        "Execute sophisticated Python commands with self-learning capabilities",
        name_for_model="Advanced Python REPL",
        description_for_model=(
            "An advanced Python shell for executing complex Python commands. "
            "Input should be a valid Python command or script. "
            "Use print(...) to see the output of expressions. "
            "Capable of handling multi-line code, advanced Python features, "
            "and self-learning tools."
        ),
        logo_url="https://your-app-url.com/.well-known/logo.png",
        contact_email="hello@contact.com",
        # NOTE(review): this value is an e-mail address, not a URL - confirm.
        legal_info_url="hello@legal.com"
    )

    python_repl = PythonREPL()

    def sanitize_input(query: str) -> str:
        """Strip surrounding whitespace and Markdown code-fence backticks."""
        stripped = query.strip()
        was_fenced = stripped.startswith("```")
        # str.strip treats its argument as a set of characters, so this
        # removes any run of backticks at either end.
        text = stripped.strip("```").strip()
        if was_fenced:
            # Drop a language tag such as "```python" so it is not executed.
            first_line, _, remainder = text.partition("\n")
            if first_line.strip().lower() in {"python", "python3", "py"}:
                text = remainder.strip()
        return text

    # NOTE(review): the routes below change state but are registered as GET;
    # kept as-is so existing clients continue to work.

    @tool.get("/run_python")
    def run_python(query: str):
        sanitized_query = sanitize_input(query)
        result = python_repl.run(sanitized_query)
        # NOTE(review): "execution_time" is the wall-clock timestamp taken
        # after the run, not a duration - confirm what clients expect.
        return {"result": result, "execution_time": time.time()}

    @tool.get("/add_tool")
    def add_tool(name: str, code: str):
        sanitized_code = sanitize_input(code)
        try:
            if not name.isidentifier():
                raise ValueError(f"'{name}' is not a valid Python identifier")
            try:
                # Preferred form: `code` is complete source defining `name`.
                compiled = compile(sanitized_code, "<tool>", "exec")
            except SyntaxError:
                # Legacy form: `code` is spliced into a `def name(` header.
                compiled = compile(f"def {name}({sanitized_code})", "<tool>", "exec")
            # NOTE(security): executes request-supplied code in-process.
            exec(compiled, python_repl.globals, python_repl.locals)
            func = python_repl.locals.get(name)
            if not callable(func):
                raise ValueError(f"code does not define a callable named '{name}'")
            python_repl.add_tool(name, func)
            return f"Tool '{name}' added successfully."
        except Exception as e:
            return f"Error adding tool: {str(e)}"

    @tool.get("/use_tool")
    def use_tool(name: str, args: str = ""):
        try:
            if args.strip():
                # NOTE(security): eval() on request data runs arbitrary code.
                # It is evaluated in the REPL namespace so values created via
                # /run_python can be passed, and server internals are not
                # reachable by name.
                evaluated = eval(args, python_repl.globals, python_repl.locals)
                call_args = _as_call_args(evaluated)
            else:
                call_args = ()
            result = python_repl.use_tool(name, *call_args)
            return {"result": result}
        except Exception as e:
            return {"error": str(e)}

    @tool.get("/list_tools")
    def list_tools():
        return {"tools": python_repl.list_tools()}

    @tool.get("/remove_tool")
    def remove_tool(name: str):
        return {"result": python_repl.remove_tool(name)}

    @tool.get("/self_reflect")
    def self_reflect():
        return {"reflection": python_repl.self_reflect()}

    @tool.get("/self_inspect")
    def self_inspect():
        return {"inspection": python_repl.self_inspect()}

    @tool.get("/write_file")
    def write_file(file_path: str, text: str) -> str:
        # NOTE(security): writes to any path the server process can access.
        write_path = Path(file_path)
        try:
            # Only the immediate parent directory is created (parents=False).
            write_path.parent.mkdir(exist_ok=True, parents=False)
            with write_path.open("w", encoding="utf-8") as f:
                f.write(text)
            return f"File written successfully to {file_path}."
        except Exception as e:
            return "Error: " + str(e)

    @tool.get("/read_file")
    def read_file(file_path: str) -> str:
        # NOTE(security): reads any path the server process can access.
        read_path = Path(file_path)
        try:
            with read_path.open("r", encoding="utf-8") as f:
                content = f.read()
            return content
        except Exception as e:
            return "Error: " + str(e)

    return tool


if __name__ == "__main__":
    config = {}  # Add any necessary configuration
    advanced_python_repl = build_tool(config)

    # Run the FastAPI server
    import uvicorn

    # NOTE(security): 0.0.0.0 exposes the unauthenticated code-execution
    # routes on every network interface.
    uvicorn.run(advanced_python_repl, host="0.0.0.0", port=8000)