Spaces:
Running
Running
import asyncio | |
import json | |
import os | |
import uuid | |
from datetime import datetime | |
from json import dumps | |
from fastapi import Body, FastAPI, HTTPException, Request | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import (FileResponse, HTMLResponse, JSONResponse, StreamingResponse) | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from pydantic import BaseModel | |
from loguru import logger | |
import uvicorn | |
import aiohttp | |
app = FastAPI() | |
OPENMANUS_ENDPOINT_URL = os.getenv("OPENMANUS_ENDPOINT_URL") | |
if not OPENMANUS_ENDPOINT_URL: | |
raise EnvironmentError("OPENMANUS_ENDPOINT_URL environment variable must be set") | |
app.mount("/static", StaticFiles(directory="static"), name="static") | |
templates = Jinja2Templates(directory="templates") | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
class Task(BaseModel): | |
id: str | |
prompt: str | |
created_at: datetime | |
status: str | |
steps: list = [] | |
def model_dump(self, *args, **kwargs): | |
data = super().model_dump(*args, **kwargs) | |
data["created_at"] = self.created_at.isoformat() | |
return data | |
class TaskManager: | |
def __init__(self): | |
self.tasks = {} | |
self.queues = {} | |
def create_task(self, prompt: str) -> Task: | |
task_id = str(uuid.uuid4()) | |
task = Task( | |
id=task_id, prompt=prompt, created_at=datetime.now(), status="pending" | |
) | |
self.tasks[task_id] = task | |
self.queues[task_id] = asyncio.Queue() | |
return task | |
async def update_task_step( | |
self, task_id: str, step: int, result: str, step_type: str = "step" | |
): | |
if task_id in self.tasks: | |
task = self.tasks[task_id] | |
task.steps.append({"step": step, "result": result, "type": step_type}) | |
await self.queues[task_id].put( | |
{"type": step_type, "step": step, "result": result} | |
) | |
await self.queues[task_id].put( | |
{"type": "status", "status": task.status, "steps": task.steps} | |
) | |
async def complete_task(self, task_id: str): | |
if task_id in self.tasks: | |
task = self.tasks[task_id] | |
task.status = "completed" | |
await self.queues[task_id].put( | |
{"type": "status", "status": task.status, "steps": task.steps} | |
) | |
await self.queues[task_id].put({"type": "complete"}) | |
async def fail_task(self, task_id: str, error: str): | |
if task_id in self.tasks: | |
self.tasks[task_id].status = f"failed: {error}" | |
await self.queues[task_id].put({"type": "error", "message": error}) | |
task_manager = TaskManager() | |
def get_available_themes(): | |
"""扫描themes目录获取所有可用主题""" | |
themes_dir = "static/themes" | |
if not os.path.exists(themes_dir): | |
return [{"id": "openmanus", "name": "Manus", "description": "默认主题"}] | |
themes = [] | |
for item in os.listdir(themes_dir): | |
theme_path = os.path.join(themes_dir, item) | |
if os.path.isdir(theme_path): | |
# 验证主题文件夹是否包含必要的文件 | |
templates_dir = os.path.join(theme_path, "templates") | |
static_dir = os.path.join(theme_path, "static") | |
config_file = os.path.join(theme_path, "theme.json") | |
if os.path.exists(templates_dir) and os.path.exists(static_dir): | |
if os.path.exists(os.path.join(templates_dir, "chat.html")): | |
theme_info = {"id": item, "name": item, "description": ""} | |
# 如果有配置文件,读取主题名称和描述 | |
if os.path.exists(config_file): | |
try: | |
with open(config_file, "r", encoding="utf-8") as f: | |
config = json.load(f) | |
theme_info["name"] = config.get("name", item) | |
theme_info["description"] = config.get( | |
"description", "" | |
) | |
except Exception as e: | |
print(f"读取主题配置文件出错: {str(e)}") | |
themes.append(theme_info) | |
# 确保Normal主题始终存在 | |
normal_exists = any(theme["id"] == "openmanus" for theme in themes) | |
if not normal_exists: | |
themes.append({"id": "openmanus", "name": "Manus", "description": "默认主题"}) | |
return themes | |
async def index(request: Request): | |
# 获取可用主题列表 | |
themes = get_available_themes() | |
# 对主题进行排序:Normal在前,cyberpunk在后,其他主题按原顺序 | |
sorted_themes = [] | |
normal_theme = None | |
cyberpunk_theme = None | |
other_themes = [] | |
for theme in themes: | |
if theme["id"] == "openmanus": | |
normal_theme = theme | |
elif theme["id"] == "cyberpunk": | |
cyberpunk_theme = theme | |
else: | |
other_themes.append(theme) | |
# 按照指定顺序组合主题 | |
if normal_theme: | |
sorted_themes.append(normal_theme) | |
if cyberpunk_theme: | |
sorted_themes.append(cyberpunk_theme) | |
sorted_themes.extend(other_themes) | |
return templates.TemplateResponse( | |
"index.html", {"request": request, "themes": sorted_themes} | |
) | |
async def chat(request: Request): | |
theme = request.query_params.get("theme", "openmanus") | |
# 尝试从主题文件夹加载chat.html | |
theme_chat_path = f"static/themes/{theme}/templates/chat.html" | |
if os.path.exists(theme_chat_path): | |
with open(theme_chat_path, "r", encoding="utf-8") as f: | |
content = f.read() | |
# 读取主题配置文件 | |
theme_config_path = f"static/themes/{theme}/theme.json" | |
theme_name = theme | |
if os.path.exists(theme_config_path): | |
try: | |
with open(theme_config_path, "r", encoding="utf-8") as f: | |
config = json.load(f) | |
theme_name = config.get("name", theme) | |
except Exception: | |
pass | |
# 将主题名称添加到HTML标题中 | |
content = content.replace( | |
"<title>Manus</title>", f"<title>Manus - {theme_name}</title>" | |
) | |
return HTMLResponse(content=content) | |
else: | |
# 默认使用templates中的chat.html | |
return templates.TemplateResponse("chat.html", {"request": request}) | |
async def download_file(file_path: str): | |
if not os.path.exists(file_path): | |
raise HTTPException(status_code=404, detail="File not found") | |
return FileResponse(file_path, filename=os.path.basename(file_path)) | |
async def create_task(prompt: str = Body(..., embed=True)): | |
task = task_manager.create_task(prompt) | |
asyncio.create_task(run_task(task.id, prompt)) | |
return {"task_id": task.id} | |
async def run_task(task_id: str, prompt: str): | |
try: | |
task_manager.tasks[task_id].status = "running" | |
async def on_think(thought): | |
await task_manager.update_task_step(task_id, 0, thought, "think") | |
async def on_tool_execute(tool, input): | |
await task_manager.update_task_step( | |
task_id, 0, f"Executing tool: {tool}\nInput: {input}", "tool" | |
) | |
async def on_action(action): | |
await task_manager.update_task_step( | |
task_id, 0, f"Executing action: {action}", "act" | |
) | |
async def on_run(step, result): | |
await task_manager.update_task_step(task_id, step, result, "run") | |
class SSELogHandler: | |
def __init__(self, task_id): | |
self.task_id = task_id | |
async def __call__(self, message): | |
import re | |
# Extract - Subsequent Content | |
cleaned_message = re.sub(r"^.*? - ", "", message) | |
cleaned_message = re.sub(r"^.*? - ", "", cleaned_message) | |
event_type = "log" | |
if "✨ Manus's thoughts:" in cleaned_message: | |
event_type = "think" | |
elif "🛠️ Manus selected" in cleaned_message: | |
event_type = "tool" | |
elif "🎯 Tool" in cleaned_message: | |
event_type = "act" | |
elif "📝 Oops!" in cleaned_message: | |
event_type = "error" | |
elif "🏁 Special tool" in cleaned_message: | |
event_type = "complete" | |
elif "🎉 Manus result:" in cleaned_message: | |
event_type = "result" | |
cleaned_message = cleaned_message.replace("🎉 Manus result:", "") | |
await task_manager.update_task_step( | |
self.task_id, 1, cleaned_message, event_type | |
) | |
return | |
await task_manager.update_task_step( | |
self.task_id, 0, cleaned_message, event_type | |
) | |
sse_handler = SSELogHandler(task_id) | |
logger.add(sse_handler) | |
import re | |
def has_log_prefix(message): | |
# 检查字符串是否包含两个 "|" 且以 " - " 分割前缀和内容 | |
return re.match(r"^.*?\|.*?\|.*? - ", message) is not None | |
async def call_manus(url: str, prompt: str): | |
generate_kwargs = { | |
"prompt": prompt, | |
} | |
async with aiohttp.ClientSession() as session: | |
async with session.post( | |
url=url, | |
json=generate_kwargs, | |
timeout=aiohttp.ClientTimeout(total=3600) | |
) as response: | |
buffer = "" | |
async for line in response.content: | |
decode_line = line.decode('utf-8') | |
if has_log_prefix(decode_line) and len(buffer)>0: | |
logger.info(buffer) | |
buffer = "" | |
else: | |
buffer += decode_line | |
if buffer: | |
logger.info(buffer) | |
await call_manus(OPENMANUS_ENDPOINT_URL, prompt) | |
await task_manager.update_task_step(task_id, 1, "", "result") | |
await task_manager.complete_task(task_id) | |
except Exception as e: | |
await task_manager.fail_task(task_id, str(e)) | |
async def task_events(task_id: str): | |
async def event_generator(): | |
if task_id not in task_manager.queues: | |
yield f"event: error\ndata: {dumps({'message': 'Task not found'})}\n\n" | |
return | |
queue = task_manager.queues[task_id] | |
task = task_manager.tasks.get(task_id) | |
if task: | |
yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n" | |
while True: | |
try: | |
event = await queue.get() | |
formatted_event = dumps(event) | |
yield ": heartbeat\n\n" | |
if event["type"] == "complete": | |
yield f"event: complete\ndata: {formatted_event}\n\n" | |
break | |
elif event["type"] == "error": | |
yield f"event: error\ndata: {formatted_event}\n\n" | |
break | |
elif event["type"] == "step": | |
task = task_manager.tasks.get(task_id) | |
if task: | |
yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n" | |
yield f"event: {event['type']}\ndata: {formatted_event}\n\n" | |
elif event["type"] in ["think", "tool", "act", "run"]: | |
yield f"event: {event['type']}\ndata: {formatted_event}\n\n" | |
else: | |
yield f"event: {event['type']}\ndata: {formatted_event}\n\n" | |
except asyncio.CancelledError: | |
print(f"Client disconnected for task {task_id}") | |
break | |
except Exception as e: | |
print(f"Error in event stream: {str(e)}") | |
yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n" | |
break | |
return StreamingResponse( | |
event_generator(), | |
media_type="text/event-stream", | |
headers={ | |
"Cache-Control": "no-cache", | |
"Connection": "keep-alive", | |
"X-Accel-Buffering": "no", | |
}, | |
) | |
async def get_tasks(): | |
sorted_tasks = sorted( | |
task_manager.tasks.values(), key=lambda task: task.created_at, reverse=True | |
) | |
return JSONResponse( | |
content=[task.model_dump() for task in sorted_tasks], | |
headers={"Content-Type": "application/json"}, | |
) | |
async def get_task(task_id: str): | |
if task_id not in task_manager.tasks: | |
raise HTTPException(status_code=404, detail="Task not found") | |
return task_manager.tasks[task_id] | |
async def generic_exception_handler(request: Request, exc: Exception): | |
return JSONResponse( | |
status_code=500, content={"message": f"Server error: {str(exc)}"} | |
) | |
if __name__ == "__main__": | |
uvicorn.run(app, host="0.0.0.0", port=7860) |