from __future__ import annotations
from typing import AsyncIterable, Dict, Deque
from collections import deque
import requests
from fastapi_poe import (
PoeBot,
QueryRequest,
PartialResponse,
SettingsRequest,
SettingsResponse,
make_app,
)
from fastapi import Header
from modal import Image, App, asgi_app, Volume, Mount
import os
import json
import re
import tempfile
import shutil
import logging
# LM Studio endpoint configurations.
# NOTE(review): free-tier ngrok URLs rotate every time the tunnel restarts;
# this hardcoded URL will go stale -- consider reading it from an env var.
NGROK_URL = "https://fca7-2601-2c1-280-1320-b881-aac7-9186-9365.ngrok-free.app"
LM_STUDIO_CHAT_URL = f"{NGROK_URL}/v1/chat/completions"
# Hardcoded model name for LM Studio (must match a model loaded in LM Studio).
MODEL_NAME = "bartowski/Qwen2.5-Coder-32B-Instruct-GGUF/Qwen2.5-Coder-32B-Instruct-IQ2_M.gguf"
# Poe bot access key for the new bot.
# NOTE(review): still the placeholder value; until replaced, every request
# trips the "Unauthorized" warning in get_response (which does not reject).
NEW_BOT_ACCESS_KEY = "YOUR_ACCESS_KEY_HERE" # Replace with your actual access key
# Path to store conversation history in volume.
VOLUME_PATH = "/data/user_histories.json"
# Configure logging for the whole module.
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] %(message)s')
# In-memory map of user_id -> bounded message deque; repopulated from
# VOLUME_PATH at import time (see the load block below) and persisted back
# after every exchange.  Per-process only: concurrent containers each hold
# their own copy.
user_histories: Dict[str, Deque[dict]] = {}
# Maximum number of messages kept per user; older entries are evicted by the
# deque's maxlen.
MAX_HISTORY_MESSAGES = 50 # Adjustable based on expected conversation length
# Load existing conversation history from volume if available
if os.path.exists(VOLUME_PATH):
try:
with open(VOLUME_PATH, "r") as f:
user_histories = {
user_id: deque(history, maxlen=MAX_HISTORY_MESSAGES)
for user_id, history in json.load(f).items()
}
logging.info("Loaded existing conversation histories.")
except Exception as e:
logging.error(f"Failed to load user histories: {e}")
user_histories = {}
else:
logging.info("No existing conversation history found. Initializing a new history store.")
class AnotherSecureLMStudioBot(PoeBot):
    """Poe bot that proxies chat requests to a local LM Studio server.

    Maintains a per-user rolling conversation history (bounded deque in the
    module-level ``user_histories``) and persists the whole store to JSON on
    the volume after every successful exchange.
    """

    async def get_response(
        self, request: QueryRequest, authorization: str = Header(...)
    ) -> AsyncIterable[PartialResponse]:
        """
        Handle user queries dynamically while validating the Poe access key.

        Yields exactly one PartialResponse: either the LM Studio completion
        or an error message.
        """
        # Validate the Poe access key.
        # NOTE(review): a mismatch is only logged, never rejected -- the
        # comment below says this is intentional for testing, but it leaves
        # the endpoint effectively unauthenticated.  Confirm before shipping.
        if authorization != NEW_BOT_ACCESS_KEY:
            logging.warning("Unauthorized access key used.")
            # Proceed without raising an error for testing purposes

        # Extract user identifier; every user gets an isolated history.
        user_id = self.get_user_id(request)
        if not user_id:
            yield PartialResponse(text="Error: User identifier not provided.")
            return

        # Get or create user-specific conversation history.
        if user_id not in user_histories:
            user_histories[user_id] = deque(maxlen=MAX_HISTORY_MESSAGES)
            logging.info(f"Initializing new conversation history for user {user_id}.")
        conversation_history = user_histories[user_id]

        # Fix: guard against an empty query list -- the original indexed
        # request.query[-1] unconditionally and would raise IndexError.
        if not request.query:
            yield PartialResponse(text="Error: Empty query received.")
            return
        user_message = request.query[-1].content

        # Strip angle brackets from user input.
        # NOTE(review): this is not real sanitization (no escaping, no other
        # metacharacters); confirm whether anything downstream renders this
        # as HTML before relying on it.
        user_message = re.sub(r"[<>]", "", user_message)

        # Record the user's turn before calling the model so the completion
        # request includes it.
        conversation_history.append({"role": "user", "content": user_message})
        logging.info(f"Conversation history for user {user_id}: {list(conversation_history)}")

        try:
            # NOTE(review): this is a blocking requests.post inside an async
            # handler -- it stalls the event loop for up to 120s per request.
            # Consider an async HTTP client or loop.run_in_executor.
            response_text = self.get_chat_completion_with_context(conversation_history)
            # Record the assistant's turn and persist the updated store.
            conversation_history.append({"role": "assistant", "content": response_text})
            logging.info(f"Generated response for user {user_id}: {response_text}")
            self.save_conversation_history()
        except Exception as e:
            # Graceful degradation: report the failure to the user instead of
            # crashing the handler.  (The user's message stays in history even
            # though no assistant reply was recorded.)
            logging.error(f"An error occurred while processing the request for user {user_id}: {e}")
            response_text = f"An error occurred: {e}"

        yield PartialResponse(text=response_text.strip())

    def get_user_id(self, request: QueryRequest) -> str:
        """
        Extract or generate a unique user identifier.

        Falls back to a fixed id, which means all anonymous traffic shares
        one conversation history.
        """
        # Use request.user_id if available.
        if hasattr(request, 'user_id') and request.user_id:
            return request.user_id
        # Fallback: use a fixed identifier.
        return "default_user_id"

    def get_chat_completion_with_context(self, conversation_history: Deque[dict]) -> str:
        """
        Send a chat completion request to LM Studio's /v1/chat/completions
        endpoint, including the full conversation history.

        Raises requests.RequestException (including HTTPError) on transport
        or HTTP failures; the caller handles these.
        """
        payload = {
            "model": MODEL_NAME,
            "messages": list(conversation_history),
            "temperature": 0.7,  # Adjust as needed
            "max_tokens": 1024,
            "stream": False,
        }
        logging.info(f"Sending request to LM Studio with payload:\n{json.dumps(payload, indent=2)}")
        response = requests.post(LM_STUDIO_CHAT_URL, json=payload, timeout=120)
        response.raise_for_status()
        response_data = response.json()
        # Pull the assistant content out of the OpenAI-style response shape.
        if "choices" in response_data and len(response_data["choices"]) > 0:
            generated_text = response_data["choices"][0].get("message", {}).get("content", "")
        else:
            generated_text = ""
        # Fallback in case the model returned an empty completion.
        if not generated_text:
            generated_text = "I'm sorry, I couldn't generate a response. Could you please try again?"
        return generated_text

    def save_conversation_history(self):
        """
        Save the current conversation history to the volume in an atomic way.

        Best-effort: failures are logged, never raised.
        """
        try:
            # Fix: create the temp file in the *destination* directory and
            # swap with os.replace.  The original used the system temp dir +
            # shutil.move, which across filesystems degrades to copy+delete
            # -- not atomic, so a crash could leave a truncated file.
            dest_dir = os.path.dirname(VOLUME_PATH) or "."
            with tempfile.NamedTemporaryFile(
                "w", dir=dest_dir, delete=False, encoding="utf-8"
            ) as tmp_file:
                json.dump(
                    {user_id: list(history) for user_id, history in user_histories.items()},
                    tmp_file,
                    indent=4,
                )
                temp_file_path = tmp_file.name
            os.replace(temp_file_path, VOLUME_PATH)
            logging.info("Successfully saved user conversation histories.")
        except Exception as e:
            logging.error(f"Failed to save user histories: {e}")

    async def get_settings(self, setting: SettingsRequest) -> SettingsResponse:
        """
        Configure the bot's capabilities for Poe, such as enabling attachments.
        """
        return SettingsResponse(
            allow_attachments=True,
            allow_images=True,
            allow_audio=True,
            allow_video=True,
            allow_links=True,
        )
# Modal configuration for the new bot: the remote image installs the pinned
# runtime dependencies used by this module.
REQUIREMENTS = ["fastapi-poe==0.0.24", "requests==2.31.0"]
image = Image.debian_slim().pip_install(REQUIREMENTS)
# Mount the volume to persist user histories.
# NOTE(review): Mount.from_local_dir takes a *deploy-time snapshot* of the
# local /data directory; writes made at runtime by save_conversation_history
# will not persist across containers.  `Volume` is imported above but never
# used -- a modal Volume attached to the function is likely what was
# intended here.  Verify against the Modal docs before deploying.
app = App(
    "another-secure-lmstudio-poe-bot",
    mounts=[Mount.from_local_dir("/data", remote_path="/data", recursive=True)]
)
@app.function(image=image)
@asgi_app()
def fastapi_app():
    """Build and return the ASGI application that serves the Poe bot."""
    # allow_without_key makes fastapi_poe skip its own key verification; the
    # bot's get_response performs (and merely logs) its own check instead.
    return make_app(AnotherSecureLMStudioBot(), allow_without_key=True)