from __future__ import annotations

import asyncio
import json
import logging
import os
import re
import shutil
import tempfile
from collections import deque
from typing import AsyncIterable, Dict, Deque

import requests
from fastapi import Header
from fastapi_poe import (
    PoeBot,
    QueryRequest,
    PartialResponse,
    SettingsRequest,
    SettingsResponse,
    make_app,
)
from modal import Image, App, asgi_app, Volume, Mount
|
|
|
|
|
|
# Public ngrok tunnel forwarding to the local LM Studio server.
NGROK_URL = "https://fca7-2601-2c1-280-1320-b881-aac7-9186-9365.ngrok-free.app"
LM_STUDIO_CHAT_URL = f"{NGROK_URL}/v1/chat/completions"

# Model identifier as registered in LM Studio.
MODEL_NAME = "bartowski/Qwen2.5-Coder-32B-Instruct-GGUF/Qwen2.5-Coder-32B-Instruct-IQ2_M.gguf"

# Poe access key. Read from the environment so the secret does not have to be
# committed to source control; falls back to the original placeholder so
# existing behavior is unchanged when the variable is unset.
NEW_BOT_ACCESS_KEY = os.environ.get("POE_ACCESS_KEY", "YOUR_ACCESS_KEY_HERE")

# Path (on the mounted /data directory) where histories are persisted as JSON.
VOLUME_PATH = "/data/user_histories.json"

logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] %(message)s')

# In-memory map of user_id -> bounded deque of {"role", "content"} messages.
user_histories: Dict[str, Deque[dict]] = {}

# Maximum messages retained per user; older entries are dropped by the deque.
MAX_HISTORY_MESSAGES = 50
|
|
|
|
|
|
# Restore previously persisted conversation histories at import time.
# BUG FIX: the original checked os.path.exists() and then opened the file,
# a check-then-use race; opening directly and handling FileNotFoundError
# (EAFP) preserves all three original outcomes: loaded, not-found, error.
try:
    with open(VOLUME_PATH, "r") as f:
        user_histories = {
            user_id: deque(history, maxlen=MAX_HISTORY_MESSAGES)
            for user_id, history in json.load(f).items()
        }
    logging.info("Loaded existing conversation histories.")
except FileNotFoundError:
    logging.info("No existing conversation history found. Initializing a new history store.")
except Exception as e:
    # Corrupt or unreadable file: start fresh rather than crash at import.
    logging.error(f"Failed to load user histories: {e}")
    user_histories = {}
|
|
|
|
class AnotherSecureLMStudioBot(PoeBot):
    """Poe bot that forwards each query, plus per-user conversation history,
    to an LM Studio server exposed through an ngrok tunnel.

    Histories live in the module-level ``user_histories`` dict (bounded
    deques) and are persisted to ``VOLUME_PATH`` after every successful
    reply.
    """

    async def get_response(
        self, request: QueryRequest, authorization: str = Header(...)
    ) -> AsyncIterable[PartialResponse]:
        """Yield a single response for the latest user message.

        NOTE: fastapi_poe invokes this method directly (no FastAPI dependency
        injection), so ``authorization`` is normally the ``Header``
        placeholder object rather than a string.
        """
        # BUG FIX: the original only logged a warning on a key mismatch and
        # then fell through, so unauthorized requests were still served.
        # Enforce the check only when a real string key was supplied, since
        # the framework does not inject the header here.
        if isinstance(authorization, str) and authorization != NEW_BOT_ACCESS_KEY:
            logging.warning("Unauthorized access key used.")
            yield PartialResponse(text="Error: Unauthorized.")
            return

        user_id = self.get_user_id(request)
        if not user_id:
            yield PartialResponse(text="Error: User identifier not provided.")
            return

        # Lazily create a bounded history for first-time users.
        if user_id not in user_histories:
            user_histories[user_id] = deque(maxlen=MAX_HISTORY_MESSAGES)
            logging.info(f"Initializing new conversation history for user {user_id}.")
        conversation_history = user_histories[user_id]

        # Guard against an empty query list before indexing the last turn.
        if not request.query:
            yield PartialResponse(text="Error: Empty query.")
            return
        user_message = request.query[-1].content

        # Strip angle brackets as a crude markup/injection sanitization.
        user_message = re.sub(r"[<>]", "", user_message)

        conversation_history.append({"role": "user", "content": user_message})
        logging.info(f"Conversation history for user {user_id}: {list(conversation_history)}")

        try:
            # BUG FIX: requests.post is blocking; run the HTTP call in a
            # worker thread so the event loop is not stalled for its duration.
            response_text = await asyncio.get_running_loop().run_in_executor(
                None, self.get_chat_completion_with_context, conversation_history
            )

            conversation_history.append({"role": "assistant", "content": response_text})
            logging.info(f"Generated response for user {user_id}: {response_text}")

            # Persist after each successful exchange.
            self.save_conversation_history()
        except Exception as e:
            # Best-effort error reporting back to the user; never crash the bot.
            logging.error(f"An error occurred while processing the request for user {user_id}: {e}")
            response_text = f"An error occurred: {e}"

        yield PartialResponse(text=response_text.strip())

    def get_user_id(self, request: QueryRequest) -> str:
        """Return a stable identifier for the requesting user.

        Falls back to a shared default when the request carries no
        ``user_id`` — all such users then share one history.
        """
        if hasattr(request, 'user_id') and request.user_id:
            return request.user_id
        return "default_user_id"

    def get_chat_completion_with_context(self, conversation_history: Deque[dict]) -> str:
        """Send the full conversation to LM Studio's /v1/chat/completions
        endpoint and return the assistant's reply text.

        Raises:
            requests.HTTPError: if the server responds with an error status.
        """
        payload = {
            "model": MODEL_NAME,
            "messages": list(conversation_history),
            "temperature": 0.7,
            "max_tokens": 1024,
            "stream": False
        }

        logging.info(f"Sending request to LM Studio with payload:\n{json.dumps(payload, indent=2)}")
        response = requests.post(LM_STUDIO_CHAT_URL, json=payload, timeout=120)
        response.raise_for_status()
        response_data = response.json()

        # OpenAI-compatible schema: choices[0].message.content holds the text.
        if "choices" in response_data and len(response_data["choices"]) > 0:
            generated_text = response_data["choices"][0].get("message", {}).get("content", "")
        else:
            generated_text = ""

        if not generated_text:
            generated_text = "I'm sorry, I couldn't generate a response. Could you please try again?"

        return generated_text

    def save_conversation_history(self):
        """Atomically persist all user histories to VOLUME_PATH.

        BUG FIX: the temp file is now created in the destination directory
        and swapped in with os.replace(); the original used the default temp
        directory plus shutil.move, which copies across filesystems and is
        therefore not atomic.
        """
        try:
            target_dir = os.path.dirname(VOLUME_PATH) or "."
            with tempfile.NamedTemporaryFile('w', dir=target_dir, delete=False) as tmp_file:
                json.dump(
                    {user_id: list(history) for user_id, history in user_histories.items()},
                    tmp_file,
                    indent=4
                )
                temp_file_path = tmp_file.name
            # Same-filesystem rename: atomic on POSIX.
            os.replace(temp_file_path, VOLUME_PATH)
            logging.info("Successfully saved user conversation histories.")
        except Exception as e:
            # Persistence is best-effort; failure must not break the reply path.
            logging.error(f"Failed to save user histories: {e}")

    async def get_settings(self, setting: SettingsRequest) -> SettingsResponse:
        """Advertise the bot's capabilities (attachments, media, links) to Poe."""
        return SettingsResponse(
            allow_attachments=True,
            allow_images=True,
            allow_audio=True,
            allow_video=True,
            allow_links=True,
        )
|
|
|
|
|
|
# Runtime dependencies installed into the Modal container image.
REQUIREMENTS = ["fastapi-poe==0.0.24", "requests==2.31.0"]

image = Image.debian_slim().pip_install(REQUIREMENTS)

# NOTE(review): a Mount is a snapshot of local files taken at deploy time, so
# writes to /data inside save_conversation_history presumably will not persist
# across containers; the imported-but-unused `Volume` looks like the intended
# persistence mechanism — confirm against the Modal docs.
app = App(
    "another-secure-lmstudio-poe-bot",
    mounts=[Mount.from_local_dir("/data", remote_path="/data", recursive=True)]
)


@app.function(image=image)
@asgi_app()
def fastapi_app():
    # Build the ASGI app that Poe will call.
    # NOTE(review): allow_without_key=True disables fastapi_poe's built-in
    # access-key validation, leaving only the bot's own check — confirm this
    # is intentional given NEW_BOT_ACCESS_KEY is defined above.
    bot = AnotherSecureLMStudioBot()
    return make_app(bot, allow_without_key=True)
|
|
|