# Scripts/main2.py
from __future__ import annotations
from typing import AsyncIterable, Dict, Deque
from collections import deque
import requests
from fastapi_poe import (
PoeBot,
QueryRequest,
PartialResponse,
SettingsRequest,
SettingsResponse,
make_app,
)
from fastapi import Header
from modal import Image, App, asgi_app, Volume, Mount
import os
import json
import re
import tempfile
import shutil
import logging
# LM Studio endpoint configurations
NGROK_URL = "https://fca7-2601-2c1-280-1320-b881-aac7-9186-9365.ngrok-free.app"
LM_STUDIO_CHAT_URL = f"{NGROK_URL}/v1/chat/completions"
# Hardcoded model name for LM Studio
MODEL_NAME = "bartowski/Qwen2.5-Coder-32B-Instruct-GGUF/Qwen2.5-Coder-32B-Instruct-IQ2_M.gguf"
# Poe bot access key for the new bot
NEW_BOT_ACCESS_KEY = "YOUR_ACCESS_KEY_HERE" # Replace with your actual access key
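# Note (illustrative): fastapi_poe can also enforce this key for you via
# make_app(bot, access_key=NEW_BOT_ACCESS_KEY); the manual check in get_response
# below is only a non-fatal, logged guard.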
# Path to store conversation history in volume
VOLUME_PATH = "/data/user_histories.json"
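# On-disk layout written by save_conversation_history (illustrative):
#   {"<user_id>": [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]}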
# Configure logging
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] %(message)s')
# Dictionary to track user-specific conversation history (initially empty)
user_histories: Dict[str, Deque[dict]] = {}
# Set a maximum number of messages to keep in the history
MAX_HISTORY_MESSAGES = 50 # Adjustable based on expected conversation length
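# Example: with maxlen=50, appending a 51st message silently drops the oldest one,
# so the context sent to LM Studio never grows beyond the last 50 turns.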
# Load existing conversation history from volume if available
if os.path.exists(VOLUME_PATH):
    try:
        with open(VOLUME_PATH, "r") as f:
            user_histories = {
                user_id: deque(history, maxlen=MAX_HISTORY_MESSAGES)
                for user_id, history in json.load(f).items()
            }
        logging.info("Loaded existing conversation histories.")
    except Exception as e:
        logging.error(f"Failed to load user histories: {e}")
        user_histories = {}
else:
    logging.info("No existing conversation history found. Initializing a new history store.")
class AnotherSecureLMStudioBot(PoeBot):
    async def get_response(
        self, request: QueryRequest, authorization: str = Header(...)
    ) -> AsyncIterable[PartialResponse]:
        """
        Handle user queries dynamically while validating the Poe access key.
        """
        # Validate the Poe access key
        if authorization != NEW_BOT_ACCESS_KEY:
            logging.warning("Unauthorized access key used.")
            # Proceed without raising an error for testing purposes
        # Extract user identifier
        user_id = self.get_user_id(request)
        if not user_id:
            yield PartialResponse(text="Error: User identifier not provided.")
            return
        # Get or create user-specific conversation history
        if user_id not in user_histories:
            user_histories[user_id] = deque(maxlen=MAX_HISTORY_MESSAGES)
            logging.info(f"Initializing new conversation history for user {user_id}.")
        conversation_history = user_histories[user_id]
        # Extract the user's message
        user_message = request.query[-1].content
        # Sanitize user input to prevent injection attacks
        user_message = re.sub(r"[<>]", "", user_message)
        # Append the user's message to the conversation history
        conversation_history.append({"role": "user", "content": user_message})
        # Log the conversation history before generating a response
        logging.info(f"Conversation history for user {user_id}: {list(conversation_history)}")
        try:
            # Generate response based on conversation history
            response_text = self.get_chat_completion_with_context(conversation_history)
            # Add bot response to conversation history
            conversation_history.append({"role": "assistant", "content": response_text})
            # Log the response generated
            logging.info(f"Generated response for user {user_id}: {response_text}")
            # Save updated conversation history to volume
            self.save_conversation_history()
        except Exception as e:
            # Graceful error handling
            logging.error(f"An error occurred while processing the request for user {user_id}: {e}")
            response_text = f"An error occurred: {e}"
        # Yield the response
        yield PartialResponse(text=response_text.strip())

    def get_user_id(self, request: QueryRequest) -> str:
        """
        Extract or generate a unique user identifier.
        """
        # Use request.user_id if available
        if hasattr(request, 'user_id') and request.user_id:
            return request.user_id
        # Fallback: use a fixed identifier
        return "default_user_id"
    def get_chat_completion_with_context(self, conversation_history: Deque[dict]) -> str:
        """
        Send a chat completion request to LM Studio's /v1/chat/completions endpoint,
        including the full conversation history.
        """
        # Prepare the payload
        payload = {
            "model": MODEL_NAME,
            "messages": list(conversation_history),
            "temperature": 0.7,  # Adjust as needed
            "max_tokens": 1024,
            "stream": False
        }
        logging.info(f"Sending request to LM Studio with payload:\n{json.dumps(payload, indent=2)}")
        response = requests.post(LM_STUDIO_CHAT_URL, json=payload, timeout=120)
        response.raise_for_status()
        response_data = response.json()
        # Use the assistant response content from the response
        if "choices" in response_data and len(response_data["choices"]) > 0:
            generated_text = response_data["choices"][0].get("message", {}).get("content", "")
        else:
            generated_text = ""
        # Fallback in case of empty content
        if not generated_text:
            generated_text = "I'm sorry, I couldn't generate a response. Could you please try again?"
        return generated_text

    def save_conversation_history(self):
        """
        Save the current conversation history to the volume in an atomic way.
        """
        try:
            with tempfile.NamedTemporaryFile('w', delete=False) as tmp_file:
                json.dump(
                    {user_id: list(history) for user_id, history in user_histories.items()},
                    tmp_file,
                    indent=4
                )
                temp_file_path = tmp_file.name
            shutil.move(temp_file_path, VOLUME_PATH)
            logging.info("Successfully saved user conversation histories.")
        except Exception as e:
            logging.error(f"Failed to save user histories: {e}")

    async def get_settings(self, setting: SettingsRequest) -> SettingsResponse:
        """
        Configure the bot's capabilities for Poe, such as enabling attachments.
        """
        return SettingsResponse(
            allow_attachments=True,
            allow_images=True,
            allow_audio=True,
            allow_video=True,
            allow_links=True,
        )
# Modal configuration for the new bot
REQUIREMENTS = ["fastapi-poe==0.0.24", "requests==2.31.0"]
image = Image.debian_slim().pip_install(REQUIREMENTS)
# Mount the local /data directory into the container so previously saved histories are available
app = App(
    "another-secure-lmstudio-poe-bot",
    mounts=[Mount.from_local_dir("/data", remote_path="/data", recursive=True)],
)
@app.function(image=image)
@asgi_app()
def fastapi_app():
    bot = AnotherSecureLMStudioBot()
    return make_app(bot, allow_without_key=True)
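
# Deployment sketch (assumes the Modal CLI is installed and authenticated):
#   modal serve main2.py    # local dev server with live reload
#   modal deploy main2.py   # deploy, then point the Poe bot's server URL at the printed endpoint

if __name__ == "__main__":
    # Quick local smoke test against the LM Studio endpoint, bypassing Poe and Modal.
    # Illustrative only; it requires LM Studio to be reachable at NGROK_URL.
    test_history: Deque[dict] = deque(
        [{"role": "user", "content": "Say hello in one short sentence."}],
        maxlen=MAX_HISTORY_MESSAGES,
    )
    print(AnotherSecureLMStudioBot().get_chat_completion_with_context(test_history))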