File size: 7,655 Bytes
542f845
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
from __future__ import annotations

import asyncio
import json
import logging
import os
import re
import shutil
import tempfile
from collections import deque
from typing import AsyncIterable, Deque, Dict

import requests
from fastapi import Header
from fastapi_poe import (
    PartialResponse,
    PoeBot,
    QueryRequest,
    SettingsRequest,
    SettingsResponse,
    make_app,
)
from modal import App, Image, Mount, Volume, asgi_app

# --- LM Studio endpoint ------------------------------------------------------
# Base address of the ngrok tunnel and the chat-completions route built on it.
NGROK_URL = "https://fca7-2601-2c1-280-1320-b881-aac7-9186-9365.ngrok-free.app"
LM_STUDIO_CHAT_URL = f"{NGROK_URL}/v1/chat/completions"

# Model identifier sent with every LM Studio request (fixed, not configurable).
MODEL_NAME = "bartowski/Qwen2.5-Coder-32B-Instruct-GGUF/Qwen2.5-Coder-32B-Instruct-IQ2_M.gguf"

# --- Poe ---------------------------------------------------------------------
# Access key the bot compares incoming requests against.
NEW_BOT_ACCESS_KEY = "YOUR_ACCESS_KEY_HERE"  # Replace with your actual access key

# --- Conversation history storage --------------------------------------------
# JSON file that holds every user's conversation history.
VOLUME_PATH = "/data/user_histories.json"

# Cap on stored messages per user; the bounded deques drop the oldest entries.
MAX_HISTORY_MESSAGES = 50  # Adjustable based on expected conversation length

# --- Logging -----------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s")

# In-memory store: user id -> bounded deque of chat messages (starts empty).
user_histories: Dict[str, Deque[dict]] = {}

# Restore previously saved histories, if the history file is present.
if not os.path.exists(VOLUME_PATH):
    logging.info("No existing conversation history found. Initializing a new history store.")
else:
    try:
        with open(VOLUME_PATH, "r") as history_file:
            stored_histories = json.load(history_file)
        user_histories = {
            stored_user: deque(messages, maxlen=MAX_HISTORY_MESSAGES)
            for stored_user, messages in stored_histories.items()
        }
        logging.info("Loaded existing conversation histories.")
    except Exception as exc:
        # Any read/parse problem falls back to an empty store instead of
        # preventing the module from importing.
        logging.error(f"Failed to load user histories: {exc}")
        user_histories = {}

class AnotherSecureLMStudioBot(PoeBot):
    """Poe server bot that relays each user's conversation to LM Studio.

    Per-user message history lives in the module-level ``user_histories``
    mapping (bounded by ``MAX_HISTORY_MESSAGES``) and is written to
    ``VOLUME_PATH`` after every successful exchange.
    """

    async def get_response(
        self, request: QueryRequest, authorization: str = Header(...)
    ) -> AsyncIterable[PartialResponse]:
        """Answer the latest user message using the stored conversation context.

        Args:
            request: Incoming Poe query; only the last entry of
                ``request.query`` is read.
            authorization: Value compared against ``NEW_BOT_ACCESS_KEY``. A
                mismatch is only logged, never rejected.

        Yields:
            A single ``PartialResponse`` holding the model reply, or an error
            message when the request could not be processed.
        """
        # Validate the Poe access key. A mismatch is only logged so the bot
        # stays usable while testing.
        # NOTE(review): if the framework calls get_response(request) without an
        # authorization argument, this compares against the Header(...) default
        # and always warns — confirm how the key is meant to be enforced.
        if authorization != NEW_BOT_ACCESS_KEY:
            logging.warning("Unauthorized access key used.")

        # Extract user identifier. get_user_id() always falls back to a fixed
        # id, so this guard only matters if a subclass overrides it.
        user_id = self.get_user_id(request)
        if not user_id:
            yield PartialResponse(text="Error: User identifier not provided.")
            return

        # Guard against an empty query so the [-1] lookup below cannot raise.
        if not request.query:
            yield PartialResponse(text="Error: No message provided.")
            return

        # Get or create user-specific conversation history
        if user_id not in user_histories:
            user_histories[user_id] = deque(maxlen=MAX_HISTORY_MESSAGES)
            logging.info(f"Initializing new conversation history for user {user_id}.")
        conversation_history = user_histories[user_id]

        # Strip angle brackets from the incoming text.
        # NOTE(review): this also removes "<" and ">" from code snippets sent
        # to a coder model — confirm this sanitisation is really wanted.
        user_message = re.sub(r"[<>]", "", request.query[-1].content)

        # Keep a handle on the entry so it can be rolled back on failure.
        user_entry = {"role": "user", "content": user_message}
        conversation_history.append(user_entry)

        # Log the conversation history before generating a response
        logging.info(f"Conversation history for user {user_id}: {list(conversation_history)}")

        try:
            # requests.post blocks for up to 120 s; run it in a worker thread so
            # the event loop keeps serving other users. The thread gets a copy
            # so it never iterates a deque the event loop may be appending to.
            loop = asyncio.get_running_loop()
            response_text = await loop.run_in_executor(
                None,
                self.get_chat_completion_with_context,
                conversation_history.copy(),
            )

            # Add bot response to conversation history
            conversation_history.append({"role": "assistant", "content": response_text})

            # Log the response generated
            logging.info(f"Generated response for user {user_id}: {response_text}")

            # Save updated conversation history to volume
            self.save_conversation_history()
        except Exception as e:
            # Graceful error handling
            logging.error(f"An error occurred while processing the request for user {user_id}: {e}")
            # Roll back the unanswered user turn so later requests do not send
            # consecutive user messages to the model.
            if conversation_history and conversation_history[-1] is user_entry:
                conversation_history.pop()
            response_text = f"An error occurred: {e}"

        # Yield the response
        yield PartialResponse(text=response_text.strip())

    def get_user_id(self, request: QueryRequest) -> str:
        """Return ``request.user_id`` when set, else the fixed fallback id.

        Args:
            request: Incoming Poe query.

        Returns:
            The request's non-empty ``user_id`` attribute, or
            ``"default_user_id"`` when the attribute is missing or empty.
        """
        return getattr(request, "user_id", None) or "default_user_id"

    def get_chat_completion_with_context(self, conversation_history: Deque[dict]) -> str:
        """Request a chat completion from LM Studio for the given history.

        Posts the full conversation to ``LM_STUDIO_CHAT_URL`` and blocks until
        the reply arrives (120 s timeout).

        Args:
            conversation_history: Messages (``role``/``content`` dicts) sent as
                the ``messages`` field of the request.

        Returns:
            The content of the first choice, or a fixed apology text when the
            reply carries no usable content.

        Raises:
            requests.RequestException: On network failure, timeout, or a
                non-success HTTP status (via ``raise_for_status``).
        """
        # Prepare the payload
        payload = {
            "model": MODEL_NAME,
            "messages": list(conversation_history),
            "temperature": 0.7,  # Adjust as needed
            "max_tokens": 1024,
            "stream": False,
        }

        logging.info(f"Sending request to LM Studio with payload:\n{json.dumps(payload, indent=2)}")
        response = requests.post(LM_STUDIO_CHAT_URL, json=payload, timeout=120)
        response.raise_for_status()
        response_data = response.json()

        # Tolerate a missing/empty "choices" list and null "message"/"content"
        # values; all of them lead to the fallback text below.
        choices = response_data.get("choices") if isinstance(response_data, dict) else None
        first_choice = choices[0] if choices else {}
        message = first_choice.get("message") or {}
        generated_text = message.get("content") or ""

        # Fallback in case of empty content
        if not generated_text:
            generated_text = "I'm sorry, I couldn't generate a response. Could you please try again?"

        return generated_text

    def save_conversation_history(self):
        """Write all user histories to ``VOLUME_PATH`` atomically.

        The JSON is written to a temporary file in the same directory as
        ``VOLUME_PATH`` and then swapped in with ``os.replace``, so a reader
        never sees a partially written file. Failures are logged and
        swallowed; saving is best effort.
        """
        target_dir = os.path.dirname(VOLUME_PATH) or "."
        temp_file_path = None
        try:
            os.makedirs(target_dir, exist_ok=True)
            # The temp file must share a filesystem with the target: a rename
            # across filesystems degrades to a non-atomic copy.
            with tempfile.NamedTemporaryFile(
                "w", dir=target_dir, suffix=".tmp", delete=False
            ) as tmp_file:
                temp_file_path = tmp_file.name
                json.dump(
                    {user_id: list(history) for user_id, history in user_histories.items()},
                    tmp_file,
                    indent=4,
                )
            os.replace(temp_file_path, VOLUME_PATH)
            temp_file_path = None  # Ownership passed to VOLUME_PATH.
            logging.info("Successfully saved user conversation histories.")
        except Exception as e:
            logging.error(f"Failed to save user histories: {e}")
        finally:
            # Do not leave a stray temp file behind after a failed save.
            if temp_file_path is not None:
                try:
                    os.remove(temp_file_path)
                except OSError:
                    pass

    async def get_settings(self, setting: SettingsRequest) -> SettingsResponse:
        """Report the bot's capability flags to Poe.

        Args:
            setting: Incoming settings request (not inspected).

        Returns:
            A ``SettingsResponse`` with every attachment-related flag enabled.
        """
        # NOTE(review): only allow_attachments is known to be a
        # SettingsResponse field; confirm the other flags are accepted by the
        # pinned fastapi-poe version. get_response() reads only the message
        # text, so attachments are accepted but not used.
        return SettingsResponse(
            allow_attachments=True,
            allow_images=True,
            allow_audio=True,
            allow_video=True,
            allow_links=True,
        )

# Modal configuration for the new bot
REQUIREMENTS = ["fastapi-poe==0.0.24", "requests==2.31.0"]
image = Image.debian_slim().pip_install(REQUIREMENTS)

# Persistent storage for user histories. A Mount only copies a local directory
# into the container, so files written there are lost when the container
# stops; a named Volume keeps them across containers and deployments.
# NOTE(review): durability relies on Modal committing volume writes in the
# background — confirm for the deployed Modal version, or call
# history_volume.commit() after each save.
history_volume = Volume.from_name(
    "another-secure-lmstudio-poe-bot-histories", create_if_missing=True
)

app = App("another-secure-lmstudio-poe-bot")


# Attach the volume at the directory that contains VOLUME_PATH so the history
# file read at import time and written after each exchange lives on the volume.
@app.function(image=image, volumes={os.path.dirname(VOLUME_PATH): history_volume})
@asgi_app()
def fastapi_app():
    """Build the ASGI application that serves the Poe bot on Modal.

    Returns:
        The FastAPI app produced by ``make_app`` for a fresh
        ``AnotherSecureLMStudioBot`` instance.
    """
    bot = AnotherSecureLMStudioBot()
    # NOTE(review): allow_without_key=True lets the app start without an access
    # key; pass access_key=NEW_BOT_ACCESS_KEY once a real key is configured.
    return make_app(bot, allow_without_key=True)