File size: 8,981 Bytes
7998f26 db31752 00f7a31 7998f26 e0c0600 7998f26 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
import fastapi
from typing import Optional, Dict, Any
import copy
import requests
import json
import os
import sys
from io import StringIO
import ctypes
import subprocess
import logging
from pathlib import Path
from llama_cpp import Llama
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time
import inspect
# Load model directly
from transformers import AutoTokenizer, AutoModelForCausalLM
# Load the tokenizer and model eagerly; both calls run every time this module is imported.
# NOTE(review): `tokenizer` is not referenced again in the visible part of this file — confirm it is needed.
tokenizer = AutoTokenizer.from_pretrained("meetkai/functionary-small-v3.2-GGUF", trust_remote_code=True)
# NOTE(review): despite its name, `model_path` is bound to whatever from_pretrained() returns,
# and it is later handed to initialize_llm(), whose `model_path` parameter is annotated `str`.
# Confirm whether a filesystem path to a model file was intended here.
model_path = AutoModelForCausalLM.from_pretrained("sentence-transformers/all-MiniLM-L6-v2", trust_remote_code=True)
# Configure root logging: INFO level, "<timestamp> - <level> - <message>" format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger; initialize_llm() uses it to report load failures.
logger = logging.getLogger(__name__)
class Tool(fastapi.FastAPI):
    """FastAPI application that also publishes an AI-plugin manifest.

    The manifest is stored in ``self.api_info`` and served from
    ``/.well-known/ai-plugin.json`` (excluded from the OpenAPI schema).
    Names and descriptions that are left as ``None`` fall back along the
    chain: model value -> human value -> tool-level value.
    """

    def __init__(
        self,
        tool_name: str,
        description: str,
        name_for_human: Optional[str] = None,
        name_for_model: Optional[str] = None,
        description_for_human: Optional[str] = None,
        description_for_model: Optional[str] = None,
        logo_url: Optional[str] = None,
        author_github: Optional[str] = None,
        contact_email: str = "",
        legal_info_url: str = "",
        version: str = "0.1.0",
    ):
        super().__init__(title=tool_name, description=description, version=version)

        # Resolve the fallbacks in dependency order; only ``None`` triggers a
        # fallback, so an explicitly passed empty string is kept.
        human_name = tool_name if name_for_human is None else name_for_human
        model_name = human_name if name_for_model is None else name_for_model
        human_desc = description if description_for_human is None else description_for_human
        model_desc = human_desc if description_for_model is None else description_for_model

        auth_section = {"type": "none"}
        api_section = {
            "type": "openapi",
            "url": "/openapi.json",
            "is_user_authenticated": False,
        }
        self.api_info = {
            "schema_version": "v1",
            "name_for_human": human_name,
            "name_for_model": model_name,
            "description_for_human": human_desc,
            "description_for_model": model_desc,
            "auth": auth_section,
            "api": api_section,
            "author_github": author_github,
            "logo_url": logo_url,
            "contact_email": contact_email,
            "legal_info_url": legal_info_url,
        }

        manifest_route = "/.well-known/ai-plugin.json"

        def get_api_info(request: fastapi.Request):
            # Work on a deep copy so the stored manifest keeps its relative URL,
            # then point the copy at the OpenAPI document of the requesting URL.
            manifest = copy.deepcopy(self.api_info)
            manifest["api"]["url"] = str(request.url).replace(manifest_route, "/openapi.json")
            return manifest

        self.get(manifest_route, include_in_schema=False)(get_api_info)
class SelfLearningTool:
    """In-memory registry that maps tool names to callables."""

    def __init__(self):
        # name -> callable, in registration order
        self.tools = {}

    def add_tool(self, name: str, func: callable):
        """Register ``func`` under ``name``, replacing any existing entry."""
        self.tools[name] = func

    def use_tool(self, name: str, *args, **kwargs):
        """Call the tool registered as ``name`` and return its result.

        An unknown name yields a "not found" message string rather than an
        exception.
        """
        if name not in self.tools:
            return f"Tool '{name}' not found."
        registered = self.tools[name]
        return registered(*args, **kwargs)

    def list_tools(self):
        """Return the registered tool names as a new list."""
        return [tool_name for tool_name in self.tools]

    def remove_tool(self, name: str):
        """Unregister ``name`` and return a status message string."""
        if name not in self.tools:
            return f"Tool '{name}' not found."
        self.tools.pop(name)
        return f"Tool '{name}' removed successfully."
class PythonREPL:
    """Stateful Python command executor with captured output and a tool registry.

    Commands passed to :meth:`run` share one persistent namespace, so names
    defined by one command are available to later ones. Tool management is
    delegated to a ``SelfLearningTool`` instance.
    """

    def __init__(self):
        # Namespace every command executes in (exec() adds ``__builtins__`` to it).
        self.globals = {}
        # User-visible names: mirrors ``globals`` minus ``__builtins__`` after each run.
        self.locals = {}
        # Reusable buffer that receives everything the command prints.
        self.output_buffer = StringIO()
        self.self_learning_tool = SelfLearningTool()

    def run(self, command: str) -> str:
        """Execute ``command`` and return what it printed.

        Args:
            command: Python source to execute.

        Returns:
            The text written to stdout by the command, or
            ``"Error: <repr of exception>"`` if it raised an ``Exception``
            (output printed before the failure is discarded).
        """
        old_stdout = sys.stdout
        sys.stdout = self.output_buffer
        # Names placed in ``self.locals`` from outside (e.g. functions defined
        # for tools) must be visible to the command.
        self.globals.update(self.locals)
        try:
            # SECURITY: this executes arbitrary code by design; never expose it
            # to untrusted callers without sandboxing.
            # A single namespace is used on purpose: exec() with distinct
            # globals/locals dicts behaves like a class body, so functions
            # defined by the command could not see its other top-level names.
            exec(command, self.globals)
            output = self.output_buffer.getvalue()
        except Exception as e:
            output = f"Error: {repr(e)}"
        finally:
            sys.stdout = old_stdout
            self.output_buffer.truncate(0)
            self.output_buffer.seek(0)
            self._sync_locals()
        return output

    def _sync_locals(self) -> None:
        """Refresh ``self.locals`` in place from the execution namespace."""
        self.locals.clear()
        self.locals.update(
            (key, value) for key, value in self.globals.items() if key != "__builtins__"
        )

    def add_tool(self, name: str, func: callable):
        """Register ``func`` as a tool named ``name``."""
        self.self_learning_tool.add_tool(name, func)

    def use_tool(self, name: str, *args, **kwargs):
        """Invoke the tool ``name`` with the given arguments and return its result."""
        return self.self_learning_tool.use_tool(name, *args, **kwargs)

    def list_tools(self):
        """Return the names of all registered tools."""
        return self.self_learning_tool.list_tools()

    def remove_tool(self, name: str):
        """Unregister the tool ``name`` and return the registry's status message."""
        return self.self_learning_tool.remove_tool(name)

    def self_reflect(self):
        """Return a text summary of the variable count and the registered tools."""
        tool_names = self.list_tools()
        lines = [
            "Self-reflection:",
            f"Number of defined variables: {len(self.locals)}",
            f"Number of available tools: {len(tool_names)}",
            "Available tools:",
        ]
        lines.extend(f"- {tool}" for tool in tool_names)
        # Every line, including the last, is newline-terminated.
        return "\n".join(lines) + "\n"

    def self_inspect(self):
        """Return a text listing of each defined name, its type and, if callable, its signature."""
        parts = ["Self-inspection:\n"]
        for name, value in self.locals.items():
            parts.append(f"{name}: {type(value)}\n")
            if callable(value):
                try:
                    signature = inspect.signature(value)
                    parts.append(f" Signature: {signature}\n")
                except (ValueError, TypeError):
                    # inspect.signature documents both exceptions for objects
                    # whose signature cannot be determined.
                    parts.append(" Signature: Unable to inspect\n")
        return "".join(parts)
def initialize_llm(model_path: str, n_ctx: int, n_threads: int = 4, n_batch: int = 512) -> Llama:
    """Construct a verbose ``Llama`` instance from the given settings.

    Any exception raised by the constructor is logged through the module
    logger and then re-raised unchanged.
    """
    llama_options = {
        "model_path": model_path,
        "n_ctx": n_ctx,
        "n_threads": n_threads,
        "n_batch": n_batch,
        "verbose": True,
    }
    try:
        llm_instance = Llama(**llama_options)
    except Exception as exc:
        logger.error(f"Failed to initialize LLM: {exc}")
        raise
    return llm_instance
# Create the module-level LLM at import time with n_ctx=4096 (other settings use initialize_llm defaults).
# NOTE(review): `llm` is not used in the visible part of this file, and `model_path` is the object
# returned by AutoModelForCausalLM.from_pretrained() rather than a `str` — confirm this call works as intended.
llm = initialize_llm(model_path, 4096)
def build_tool(config) -> Tool:
    """Create the "Advanced Python REPL" application and register its endpoints.

    One ``PythonREPL`` instance is shared by all endpoints, so state persists
    across requests. Registered GET routes: ``/run_python``, ``/add_tool``,
    ``/use_tool``, ``/list_tools``, ``/remove_tool``, ``/self_reflect``,
    ``/self_inspect``, ``/write_file`` and ``/read_file``.

    SECURITY: these endpoints execute caller-supplied code and read/write
    caller-chosen file paths. That is this tool's purpose, but it must not be
    reachable by untrusted clients.

    Args:
        config: Currently unused.

    Returns:
        The configured ``Tool`` application.
    """
    tool = Tool(
        "Advanced Python REPL",
        "Execute sophisticated Python commands with self-learning capabilities",
        name_for_model="Advanced Python REPL",
        description_for_model=(
            "An advanced Python shell for executing complex Python commands. "
            "Input should be a valid Python command or script. "
            "Use print(...) to see the output of expressions. "
            "Capable of handling multi-line code, advanced Python features, "
            "and self-learning tools."
        ),
        logo_url="https://your-app-url.com/.well-known/logo.png",
        # NOTE(review): the two values below look like placeholders (the legal
        # info "URL" is not a URL) — confirm the real values.
        contact_email="[email protected]",
        legal_info_url="[email protected]"
    )
    python_repl = PythonREPL()

    def sanitize_input(query: str) -> str:
        """Trim whitespace and any surrounding backticks (e.g. a Markdown code fence)."""
        # str.strip treats its argument as a character set, so this removes
        # every leading/trailing backtick, not only exact triple fences.
        return query.strip().strip("```").strip()

    @tool.get("/run_python")
    def run_python(query: str):
        """Execute ``query`` in the shared REPL; report its output and duration in seconds."""
        sanitized_query = sanitize_input(query)
        # Measure the elapsed time; a raw time.time() value would be an epoch
        # timestamp, not an execution time.
        started = time.perf_counter()
        result = python_repl.run(sanitized_query)
        return {"result": result, "execution_time": time.perf_counter() - started}

    @tool.get("/add_tool")
    def add_tool(name: str, code: str):
        """Define a function called ``name`` from ``code`` and register it as a tool.

        ``code`` is first spliced into ``def <name>(<code>)`` (the legacy
        form). If that is not valid Python, ``code`` is treated as complete
        source that is expected to define ``name`` itself.
        """
        sanitized_code = sanitize_input(code)
        try:
            try:
                compiled = compile(f"def {name}({sanitized_code})", "<string>", "exec")
            except SyntaxError:
                compiled = compile(sanitized_code, "<string>", "exec")
            # SECURITY: executes caller-supplied code (see function docstring above).
            exec(compiled, python_repl.globals, python_repl.locals)
            func = python_repl.locals.get(name)
            if not callable(func):
                return f"Error adding tool: code did not define a callable named '{name}'."
            python_repl.add_tool(name, func)
            return f"Tool '{name}' added successfully."
        except Exception as e:
            return f"Error adding tool: {str(e)}"

    @tool.get("/use_tool")
    def use_tool(name: str, args: str):
        """Call tool ``name`` with positional arguments given as a Python expression string."""
        try:
            # SECURITY: eval() on request input runs arbitrary expressions.
            # ``args`` must evaluate to an iterable, which is unpacked as the
            # tool's positional arguments.
            result = python_repl.use_tool(name, *eval(args))
            return {"result": result}
        except Exception as e:
            return {"error": str(e)}

    @tool.get("/list_tools")
    def list_tools():
        """Return the names of all registered tools."""
        return {"tools": python_repl.list_tools()}

    @tool.get("/remove_tool")
    def remove_tool(name: str):
        """Unregister tool ``name`` and return the status message."""
        return {"result": python_repl.remove_tool(name)}

    @tool.get("/self_reflect")
    def self_reflect():
        """Return the REPL's summary of defined variables and tools."""
        return {"reflection": python_repl.self_reflect()}

    @tool.get("/self_inspect")
    def self_inspect():
        """Return the REPL's listing of defined names, types and signatures."""
        return {"inspection": python_repl.self_inspect()}

    @tool.get("/write_file")
    def write_file(file_path: str, text: str) -> str:
        """Write ``text`` to ``file_path`` as UTF-8, returning a status or error string."""
        # SECURITY: the path is used as given; callers can write anywhere the
        # process has permission to.
        write_path = Path(file_path)
        try:
            # Only the immediate parent is created; missing grandparents
            # surface as an error string.
            write_path.parent.mkdir(exist_ok=True, parents=False)
            with write_path.open("w", encoding="utf-8") as f:
                f.write(text)
            return f"File written successfully to {file_path}."
        except Exception as e:
            return "Error: " + str(e)

    @tool.get("/read_file")
    def read_file(file_path: str) -> str:
        """Return the UTF-8 contents of ``file_path``, or an error string."""
        # SECURITY: the path is used as given; callers can read any file the
        # process has permission to.
        read_path = Path(file_path)
        try:
            with read_path.open("r", encoding="utf-8") as f:
                content = f.read()
            return content
        except Exception as e:
            return "Error: " + str(e)

    return tool
if __name__ == "__main__":
    import uvicorn

    # No configuration is needed yet, so an empty mapping is passed.
    app = build_tool({})
    # Serve the FastAPI application on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
|