|
import fastapi |
|
from typing import Optional, Dict, Any |
|
import copy |
|
import requests |
|
import json |
|
import os |
|
import sys |
|
from io import StringIO |
|
import ctypes |
|
import subprocess |
|
import logging |
|
from pathlib import Path |
|
from llama_cpp import Llama |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
import random |
|
import time |
|
import inspect |
|
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
|
|
# Tokenizer loaded at import time from the Hugging Face repo named below.
# trust_remote_code=True is passed through to transformers.
# NOTE(review): `tokenizer` is not referenced anywhere else in the visible part
# of this file, and it comes from the "v3.2-GGUF" repo while the model below
# comes from "v3.1" — confirm the version mismatch is intended.
tokenizer = AutoTokenizer.from_pretrained("meetkai/functionary-small-v3.2-GGUF", trust_remote_code=True)

# NOTE(review): despite its name, `model_path` holds whatever
# AutoModelForCausalLM.from_pretrained returns, not a filesystem path. It is
# later passed to initialize_llm(), whose `model_path` parameter is annotated
# `str` and forwarded to llama_cpp.Llama(model_path=...). Presumably a path to
# a local GGUF file is what is needed there — verify before relying on `llm`.
model_path = AutoModelForCausalLM.from_pretrained("meetkai/functionary-small-v3.1", trust_remote_code=True)

# Configure root logging (INFO level, timestamp - level - message) and create
# the module-level logger used by initialize_llm().
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
|
|
|
class Tool(fastapi.FastAPI):
    """FastAPI application that also publishes an ai-plugin manifest.

    The constructor forwards title/description/version to ``fastapi.FastAPI``,
    stores the plugin manifest in ``self.api_info`` and registers a GET route
    at ``/.well-known/ai-plugin.json`` that returns a copy of that manifest
    with ``api.url`` rewritten to the absolute ``/openapi.json`` URL derived
    from the incoming request.

    Args:
        tool_name: Application title; default for ``name_for_human``.
        description: Application description; default for
            ``description_for_human``.
        name_for_human: Display name; defaults to ``tool_name``.
        name_for_model: Name shown to the model; defaults to the human name.
        description_for_human: Defaults to ``description``.
        description_for_model: Defaults to the human description.
        logo_url: Stored verbatim in the manifest.
        author_github: Stored verbatim in the manifest.
        contact_email: Stored verbatim in the manifest.
        legal_info_url: Stored verbatim in the manifest.
        version: Application version passed to FastAPI.
    """

    def __init__(
        self,
        tool_name: str,
        description: str,
        name_for_human: Optional[str] = None,
        name_for_model: Optional[str] = None,
        description_for_human: Optional[str] = None,
        description_for_model: Optional[str] = None,
        logo_url: Optional[str] = None,
        author_github: Optional[str] = None,
        contact_email: str = "",
        legal_info_url: str = "",
        version: str = "0.1.0",
    ):
        super().__init__(title=tool_name, description=description, version=version)

        # Each optional label falls back to the next more general one. Only
        # None triggers the fallback; an empty string is kept as given.
        human_name = tool_name if name_for_human is None else name_for_human
        model_name = human_name if name_for_model is None else name_for_model
        human_description = (
            description if description_for_human is None else description_for_human
        )
        model_description = (
            human_description if description_for_model is None else description_for_model
        )

        auth_section = {"type": "none"}
        api_section = {
            "type": "openapi",
            "url": "/openapi.json",
            "is_user_authenticated": False,
        }
        self.api_info = {
            "schema_version": "v1",
            "name_for_human": human_name,
            "name_for_model": model_name,
            "description_for_human": human_description,
            "description_for_model": model_description,
            "auth": auth_section,
            "api": api_section,
            "author_github": author_github,
            "logo_url": logo_url,
            "contact_email": contact_email,
            "legal_info_url": legal_info_url,
        }

        manifest_route = "/.well-known/ai-plugin.json"

        @self.get(manifest_route, include_in_schema=False)
        def get_api_info(request: fastapi.Request):
            # Work on a deep copy so the stored manifest keeps its relative URL.
            manifest = copy.deepcopy(self.api_info)
            absolute_openapi_url = str(request.url).replace(manifest_route, "/openapi.json")
            manifest["api"]["url"] = str(absolute_openapi_url)
            return manifest
|
|
|
class SelfLearningTool:
    """In-memory registry that maps tool names to callables."""

    def __init__(self):
        # name -> callable, in insertion order
        self.tools = {}

    def add_tool(self, name: str, func: callable):
        """Register ``func`` under ``name``, replacing any previous entry."""
        self.tools.update({name: func})

    def use_tool(self, name: str, *args, **kwargs):
        """Call the tool registered as ``name`` with the given arguments.

        Returns the tool's result, or a "not found" message string when no
        tool with that name is registered. Exceptions raised by the tool
        itself propagate to the caller.
        """
        if name not in self.tools:
            return f"Tool '{name}' not found."
        registered = self.tools[name]
        return registered(*args, **kwargs)

    def list_tools(self):
        """Return the registered tool names as a new list."""
        return [tool_name for tool_name in self.tools]

    def remove_tool(self, name: str):
        """Unregister ``name`` and return a status message string."""
        if name not in self.tools:
            return f"Tool '{name}' not found."
        self.tools.pop(name)
        return f"Tool '{name}' removed successfully."
|
|
|
class PythonREPL:
    """Stateful Python executor with an attached tool registry.

    Code passed to :meth:`run` is executed with ``exec`` in a namespace that
    persists between calls; text written to ``sys.stdout`` during execution is
    captured and returned. Tool management methods delegate to a
    ``SelfLearningTool`` instance.
    """

    # Names that ``exec`` itself inserts into the namespace. They are not
    # user-defined variables, so reflection and inspection skip them.
    _INTERNAL_NAMES = frozenset({"__builtins__"})

    def __init__(self):
        # One dict serves as both globals and locals. When exec() is given two
        # distinct dicts the code runs as if inside a class body: functions
        # defined there cannot see other REPL-level names (or themselves, for
        # recursion) and fail with NameError. `locals` stays available as an
        # alias so existing readers of `repl.locals` keep working.
        self.globals = {}
        self.locals = self.globals
        # Reused capture buffer; emptied after every run().
        self.output_buffer = StringIO()
        self.self_learning_tool = SelfLearningTool()

    def run(self, command: str) -> str:
        """Execute ``command`` and return what it printed.

        Args:
            command: Python source to execute in the persistent namespace.

        Returns:
            The captured stdout text, or ``"Error: <repr of exception>"`` if
            the code raised an ``Exception`` (output printed before the error
            is discarded in that case).
        """
        # WARNING: exec() runs arbitrary code; callers must trust `command`.
        old_stdout = sys.stdout
        sys.stdout = self.output_buffer
        try:
            exec(command, self.globals)
            output = self.output_buffer.getvalue()
        except Exception as e:
            output = f"Error: {e!r}"
        finally:
            # Always restore stdout and reset the buffer, even for
            # BaseException subclasses that are not caught above.
            sys.stdout = old_stdout
            self.output_buffer.truncate(0)
            self.output_buffer.seek(0)
        return output

    def add_tool(self, name: str, func: callable):
        """Register ``func`` as a tool named ``name``."""
        self.self_learning_tool.add_tool(name, func)

    def use_tool(self, name: str, *args, **kwargs):
        """Invoke the named tool and return whatever the registry returns."""
        return self.self_learning_tool.use_tool(name, *args, **kwargs)

    def list_tools(self):
        """Return the names of all registered tools."""
        return self.self_learning_tool.list_tools()

    def remove_tool(self, name: str):
        """Remove the named tool and return the registry's status message."""
        return self.self_learning_tool.remove_tool(name)

    def _user_variables(self) -> dict:
        """Return the namespace entries defined by executed code."""
        return {
            key: value
            for key, value in self.locals.items()
            if key not in self._INTERNAL_NAMES
        }

    def self_reflect(self):
        """Return a text summary: variable count, tool count and tool names."""
        tool_names = self.list_tools()
        lines = [
            "Self-reflection:",
            f"Number of defined variables: {len(self._user_variables())}",
            f"Number of available tools: {len(tool_names)}",
            "Available tools:",
        ]
        lines.extend(f"- {tool}" for tool in tool_names)
        # Every line, including the last, is newline-terminated.
        return "\n".join(lines) + "\n"

    def self_inspect(self):
        """Return a text listing of each variable's type and call signature."""
        parts = ["Self-inspection:\n"]
        for name, value in self._user_variables().items():
            parts.append(f"{name}: {type(value)}\n")
            if not callable(value):
                continue
            try:
                signature = inspect.signature(value)
            except (ValueError, TypeError):
                # inspect.signature raises ValueError when no signature is
                # available and TypeError for unsupported object types.
                parts.append(" Signature: Unable to inspect\n")
            else:
                parts.append(f" Signature: {signature}\n")
        return "".join(parts)
|
|
|
def initialize_llm(model_path: str, n_ctx: int, n_threads: int = 4, n_batch: int = 512) -> Llama:
    """Construct a ``Llama`` instance, logging any failure before re-raising.

    Args:
        model_path: Forwarded to ``Llama`` as ``model_path``.
        n_ctx: Forwarded to ``Llama`` as ``n_ctx``.
        n_threads: Forwarded to ``Llama`` as ``n_threads``.
        n_batch: Forwarded to ``Llama`` as ``n_batch``.

    Returns:
        The constructed ``Llama`` object (always created with ``verbose=True``).

    Raises:
        Exception: Whatever ``Llama`` raised; it is logged at ERROR level and
            then re-raised unchanged.
    """
    llama_options = {
        "model_path": model_path,
        "n_ctx": n_ctx,
        "n_threads": n_threads,
        "n_batch": n_batch,
        "verbose": True,
    }
    try:
        engine = Llama(**llama_options)
    except Exception as e:
        logger.error(f"Failed to initialize LLM: {e}")
        raise
    return engine
|
|
|
# Module-level LLM instance created at import time with n_ctx=4096.
# NOTE(review): `model_path` here is the object returned by
# AutoModelForCausalLM.from_pretrained above, not a str path as
# initialize_llm's signature declares — confirm this call can succeed.
llm = initialize_llm(model_path, 4096)
|
|
|
def build_tool(config) -> Tool:
    """Create the "Advanced Python REPL" tool and register its HTTP endpoints.

    One ``PythonREPL`` instance is created per call and shared by every
    endpoint through closure, so state persists across requests.

    Endpoint notes are written as ``#`` comments, not docstrings, because
    FastAPI publishes endpoint docstrings as OpenAPI descriptions.

    Args:
        config: Not used by this function; accepted so callers can pass a
            configuration object.

    Returns:
        The configured ``Tool`` application with all routes registered.
    """
    tool = Tool(
        "Advanced Python REPL",
        "Execute sophisticated Python commands with self-learning capabilities",
        name_for_model="Advanced Python REPL",
        description_for_model=(
            "An advanced Python shell for executing complex Python commands. "
            "Input should be a valid Python command or script. "
            "Use print(...) to see the output of expressions. "
            "Capable of handling multi-line code, advanced Python features, "
            "and self-learning tools."
        ),
        logo_url="https://your-app-url.com/.well-known/logo.png",
        contact_email="[email protected]",
        legal_info_url="[email protected]",
    )

    python_repl = PythonREPL()

    def sanitize_input(query: str) -> str:
        # Remove surrounding whitespace and Markdown code-fence backticks.
        text = query.strip()
        fenced = text.startswith("```")
        # str.strip takes a *set* of characters, so "`" removes any run of
        # backticks at either end.
        text = text.strip("`").strip()
        if fenced:
            # A fenced block often starts with a language tag ("```python").
            # Left in place it would be executed as a bare name and raise
            # NameError, so drop it when it is alone on the first line.
            first_line, newline, remainder = text.partition("\n")
            if newline and first_line.strip().lower() in {"python", "python3", "py"}:
                text = remainder.strip()
        return text

    # SECURITY: this endpoint executes arbitrary Python from the request.
    @tool.get("/run_python")
    def run_python(query: str):
        sanitized_query = sanitize_input(query)
        result = python_repl.run(sanitized_query)
        # "execution_time" is the wall-clock timestamp (time.time()) taken
        # after the run, not a duration.
        return {"result": result, "execution_time": time.time()}

    # SECURITY: this endpoint executes arbitrary Python from the request.
    @tool.get("/add_tool")
    def add_tool(name: str, code: str):
        sanitized_code = sanitize_input(code)
        try:
            try:
                # Legacy form: `code` is spliced between the parentheses of a
                # generated `def`. Kept first so any input that worked before
                # still behaves the same.
                compiled = compile(f"def {name}({sanitized_code})", "<add_tool>", "exec")
            except SyntaxError:
                # The generated wrapper has no ':' or body, so ordinary
                # function source never compiles that way. Treat `code` as
                # complete source that is expected to define `name`.
                compiled = compile(sanitized_code, "<add_tool>", "exec")
            exec(compiled, python_repl.globals, python_repl.locals)
            if name not in python_repl.locals:
                return f"Error adding tool: code did not define '{name}'"
            python_repl.add_tool(name, python_repl.locals[name])
            return f"Tool '{name}' added successfully."
        except Exception as e:
            return f"Error adding tool: {str(e)}"

    @tool.get("/use_tool")
    def use_tool(name: str, args: str):
        try:
            # SECURITY: eval() on request data runs arbitrary expressions.
            # `args` must evaluate to an iterable of positional arguments.
            # NOTE(review): consider ast.literal_eval if only literals are needed.
            result = python_repl.use_tool(name, *eval(args))
            return {"result": result}
        except Exception as e:
            return {"error": str(e)}

    @tool.get("/list_tools")
    def list_tools():
        return {"tools": python_repl.list_tools()}

    @tool.get("/remove_tool")
    def remove_tool(name: str):
        return {"result": python_repl.remove_tool(name)}

    @tool.get("/self_reflect")
    def self_reflect():
        return {"reflection": python_repl.self_reflect()}

    @tool.get("/self_inspect")
    def self_inspect():
        return {"inspection": python_repl.self_inspect()}

    # SECURITY: writes to any path the server process can write to.
    @tool.get("/write_file")
    def write_file(file_path: str, text: str) -> str:
        write_path = Path(file_path)
        try:
            # parents=False: only the immediate parent directory is created;
            # a missing grandparent is reported as an error.
            write_path.parent.mkdir(exist_ok=True, parents=False)
            with write_path.open("w", encoding="utf-8") as f:
                f.write(text)
            return f"File written successfully to {file_path}."
        except Exception as e:
            return "Error: " + str(e)

    # SECURITY: reads any path the server process can read.
    @tool.get("/read_file")
    def read_file(file_path: str) -> str:
        read_path = Path(file_path)
        try:
            with read_path.open("r", encoding="utf-8") as f:
                content = f.read()
            return content
        except Exception as e:
            return "Error: " + str(e)

    return tool
|
|
|
if __name__ == "__main__":
    # uvicorn is only needed when the module is run as a script.
    import uvicorn

    config = {}
    advanced_python_repl = build_tool(config)

    # NOTE(review): host "0.0.0.0" listens on all network interfaces while the
    # app exposes endpoints such as /run_python that execute request-supplied
    # code — confirm this exposure is intended.
    uvicorn.run(advanced_python_repl, host="0.0.0.0", port=8000)
|
|