|
import html
import os
import re
import sqlite3
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Any

from flask import Flask, render_template, request, jsonify
from huggingface_hub import InferenceClient
from langchain.chains import LLMChain
from langchain.llms.base import LLM
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain_community.llms import HuggingFaceHub
from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers import pipeline
from werkzeug.utils import secure_filename
|
|
|
|
|
app = Flask(__name__)


# Runtime configuration. PORT can be overridden through the environment;
# the remaining paths are fixed locations under /tmp.
PORT = int(os.environ.get("PORT", 7860))
CACHE_DIR = "/tmp/huggingface_cache"
UPLOAD_DIR = "/tmp/uploads"
DATABASE_PATH = "/tmp/chat_database.db"

# Make sure the working directories exist before anything tries to use them.
for dir_path in [CACHE_DIR, UPLOAD_DIR]:
    Path(dir_path).mkdir(parents=True, exist_ok=True)

# Point the Hugging Face libraries at the cache directory.
# NOTE(review): these variables are assigned after `transformers` and
# `huggingface_hub` have already been imported at the top of the file.
# Presumably those libraries read their cache settings at import time, in
# which case the assignments have no effect on them -- confirm, and move
# this ahead of the imports if so.
os.environ['TRANSFORMERS_CACHE'] = CACHE_DIR
os.environ['HUGGINGFACE_HUB_CACHE'] = CACHE_DIR

print(f"Transformers Cache Directory: {os.environ['TRANSFORMERS_CACHE']}")
print(f"Hugging Face Hub Cache Directory: {os.environ['HUGGINGFACE_HUB_CACHE']}")

# Startup diagnostic only: report whether the cache directory can be written.
if os.access(CACHE_DIR, os.W_OK):
    print(f"Cache directory {CACHE_DIR} is writable.")
else:
    print(f"Cache directory {CACHE_DIR} is NOT writable.")

app.config['UPLOAD_FOLDER'] = UPLOAD_DIR
|
|
|
|
|
# First definition of the prompt text. The name `prompt_template` is assigned
# again further down in this module, so by the time generate_prompt() runs it
# sees the later text, not this one. This version only feeds `prompt` below.
prompt_template = """
Role: You are Figr Code Assistant, specializing in providing clear, error-free Python code solutions.

Context:
{important_info}

Previous Conversation:
{chat_history}

Current Request:
{user_request}

Output Guidelines:
1. Code Format:
- Use ```python for code blocks
- Use `code` for inline code references
- Provide raw text without HTML formatting
- Include explanations after code blocks

2. Code Organization:
- Default to single, focused code snippets
- Only split into multiple snippets if necessary
- Mark critical information with [IMPORTANT] prefix

Please provide a clear and detailed response:
"""

# LangChain prompt object built from the text above.
# NOTE(review): no use of this object is visible in this file (the chat route
# binds its own local named `prompt`) -- confirm whether it is still needed.
prompt = PromptTemplate(
    input_variables=["user_request", "chat_history", "important_info"],
    template=prompt_template
)
|
|
|
class CustomHuggingFaceInference(LLM):
    """LangChain LLM that sends prompts to a model through `InferenceClient`.

    Each call issues one `text_generation` request for `self.model` with fixed
    sampling settings and returns the generated text.
    """

    # Client used to issue text-generation requests.
    client: Any
    # Identifier of the model the requests are sent to.
    model: str

    def __init__(self, token: str):
        """Build the wrapper.

        Args:
            token: API token handed to `InferenceClient`.
        """
        # The base class is a pydantic model: declared fields have to be
        # supplied to its constructor. A bare `super().__init__()` followed by
        # attribute assignment fails validation on the required `model` field.
        super().__init__(
            client=InferenceClient(token=token),
            model="mistralai/Mistral-7B-Instruct-v0.1",
        )

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Any = None,
        **kwargs: Any,
    ) -> str:
        """Generate a completion for *prompt*.

        Args:
            prompt: Full prompt text sent to the model.
            stop: Accepted for interface compatibility; not applied here.
            run_manager: Accepted because LangChain passes it to `_call`;
                unused.
            **kwargs: Extra call-time arguments from LangChain; ignored.

        Returns:
            The text returned by `InferenceClient.text_generation`.
        """
        response = self.client.text_generation(
            prompt,
            model=self.model,
            max_new_tokens=512,
            temperature=0.7,
            top_p=0.95,
            repetition_penalty=1.15
        )
        return response

    @property
    def _identifying_params(self):
        """Parameters that identify this LLM instance."""
        return {"model": self.model}

    @property
    def _llm_type(self):
        """Short type tag for this LLM implementation."""
        return "custom_huggingface"
|
|
|
|
|
|
|
@contextmanager
def get_db_connection():
    """Yield a SQLite connection to DATABASE_PATH and close it afterwards.

    Rows are returned as `sqlite3.Row`, so columns can be read by name. The
    connection is closed whether or not the managed body raises; committing
    is left to the caller.
    """
    connection = sqlite3.connect(DATABASE_PATH)
    connection.row_factory = sqlite3.Row
    try:
        yield connection
    finally:
        connection.close()
|
|
|
def init_db():
    """Create the chats, messages and important_info tables if they are missing."""
    schema_statements = (
        '''
            CREATE TABLE IF NOT EXISTS chats (
                id TEXT PRIMARY KEY,
                title TEXT,
                date TEXT,
                last_message TEXT
            )
        ''',
        '''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id TEXT,
                role TEXT,
                content TEXT,
                timestamp TEXT,
                FOREIGN KEY (chat_id) REFERENCES chats (id)
            )
        ''',
        '''
            CREATE TABLE IF NOT EXISTS important_info (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id TEXT,
                content TEXT,
                FOREIGN KEY (chat_id) REFERENCES chats (id)
            )
        ''',
    )

    with get_db_connection() as db:
        # Same three statements, in the same order, followed by one commit.
        for ddl in schema_statements:
            db.execute(ddl)
        db.commit()
|
|
|
def initialize_llm():
    """Build the text-generation pipeline, or return None if that fails.

    Any exception raised while creating the pipeline is printed and
    swallowed; callers must be prepared for a None result.
    """
    try:
        checkpoint = "mistralai/Mistral-7B-Instruct-v0.3"
        generator = pipeline("text-generation", model=checkpoint)
        print("LLM initialized successfully!")
    except Exception as err:
        print(f"LLM initialization error: {str(err)}")
        return None
    return generator
|
|
|
|
|
class ChatSession:
    """State of one chat, stored in SQLite and mirrored in memory.

    Messages and "important info" items are persisted under `session_id`.
    In-memory copies live in `chat_history` (list of dicts) and
    `important_info` (list of strings), and every message is also pushed
    into a LangChain `ConversationBufferMemory`.
    """

    def __init__(self, session_id):
        self.session_id = session_id
        self.memory = ConversationBufferMemory(
            return_messages=True,
            memory_key="chat_history",
        )
        # Populate the in-memory state from whatever is already stored.
        self._load_chat_history()
        self._load_important_info()

    def _remember(self, role, content):
        """Push one message into the LangChain buffer.

        Only the role "user" is recorded as a user message; every other role
        is recorded as an AI message.
        """
        buffer = self.memory.chat_memory
        if role == "user":
            buffer.add_user_message(content)
        else:
            buffer.add_ai_message(content)

    def _load_chat_history(self):
        """Read this session's messages (ordered by timestamp) from the database."""
        with get_db_connection() as db:
            rows = db.execute(
                'SELECT role, content, timestamp FROM messages WHERE chat_id = ? ORDER BY timestamp',
                (self.session_id,)
            ).fetchall()

        self.chat_history = []
        for row in rows:
            entry = {
                "role": row['role'],
                "content": row['content'],
                "timestamp": row['timestamp'],
            }
            self.chat_history.append(entry)
            self._remember(entry["role"], entry["content"])

    def _load_important_info(self):
        """Read this session's important-info items from the database."""
        with get_db_connection() as db:
            rows = db.execute(
                'SELECT content FROM important_info WHERE chat_id = ?',
                (self.session_id,)
            ).fetchall()
            self.important_info = [row['content'] for row in rows]

    def add_message(self, role, content):
        """Persist a message, then append it to the in-memory history and buffer."""
        timestamp = datetime.now().isoformat()

        with get_db_connection() as db:
            db.execute(
                'INSERT INTO messages (chat_id, role, content, timestamp) VALUES (?, ?, ?, ?)',
                (self.session_id, role, content, timestamp)
            )
            db.commit()

        self.chat_history.append({
            "role": role,
            "content": content,
            "timestamp": timestamp,
        })
        self._remember(role, content)

    def add_important_info(self, content):
        """Persist one important-info item and append it to the in-memory list."""
        with get_db_connection() as db:
            db.execute(
                'INSERT INTO important_info (chat_id, content) VALUES (?, ?)',
                (self.session_id, content)
            )
            db.commit()
            self.important_info.append(content)

    def get_memory_variables(self):
        """Return the variables exposed by the LangChain memory object."""
        return self.memory.load_memory_variables({})

    def clear_memory(self):
        """Delete this session's messages and important info, and reset all in-memory state."""
        owner = (self.session_id,)
        with get_db_connection() as db:
            db.execute('DELETE FROM messages WHERE chat_id = ?', owner)
            db.execute('DELETE FROM important_info WHERE chat_id = ?', owner)
            db.commit()

        self.memory.clear()
        self.chat_history = []
        self.important_info = []

    def clear_chat_history(self):
        """Delete this session's messages and reset the history and buffer."""
        with get_db_connection() as db:
            db.execute('DELETE FROM messages WHERE chat_id = ?', (self.session_id,))
            db.commit()

        self.chat_history = []
        self.memory.chat_memory.clear()

    def clear_important_info(self):
        """Delete this session's important-info items and reset the in-memory list."""
        with get_db_connection() as db:
            db.execute('DELETE FROM important_info WHERE chat_id = ?', (self.session_id,))
            db.commit()

        self.important_info = []
|
|
|
# Module-level startup: runs once, when this module is first executed/imported.
print("Starting application initialization...")

try:
    print("Initializing database...")
    init_db()
    print("Database initialized successfully")
    # initialize_llm() prints and swallows its own failures, returning None
    # in that case, so `llm` may be None here; the chat route checks for it.
    llm = initialize_llm()

except Exception as e:
    # Report the failure and re-raise so that startup does not continue.
    print(f"Fatal initialization error: {e}")
    raise
|
|
|
|
|
|
|
# Prompt text filled in by generate_prompt() via str.format; the three
# placeholders are {important_info}, {chat_history} and {user_request}.
# This assignment rebinds the `prompt_template` name assigned earlier in the
# module, so this is the text that is used at request time.
prompt_template = """
Role: You are Figr Code Assistant, specializing in providing clear, error-free Python code solutions.

Context:
{important_info}

Previous Conversation:
{chat_history}

Current Request:
{user_request}

Output Guidelines:
1. Code Format:
- Use ```python for code blocks
- Use `code` for inline code references
- Provide raw text without HTML formatting
- Strictly include explanation only after code blocks

2. Code Organization:
- Default to single, focused code snippets for clarity
- Only split into multiple snippets(each individually runnable) if:
a) Multiple distinct concepts are requested
b) Complex functionality requires modular explanation

- Mark critical information with [IMPORTANT] prefix and give small explanations with some bold headings if required and in white font always.
"""
|
|
|
def generate_prompt(user_request, chat_history, important_info):
    """Return the module-level prompt template with its three placeholders filled in."""
    placeholders = {
        "user_request": user_request,
        "chat_history": chat_history,
        "important_info": important_info,
    }
    return prompt_template.format(**placeholders)
|
|
|
|
|
def convert_to_html(raw_text):
    """Convert markdown to HTML with pandoc and wrap code blocks with buttons.

    The text is written to a temporary ``.md`` file, converted by the
    ``pandoc`` executable into a temporary ``.html`` file, and every
    ``<pre><code>`` block in the result is wrapped in a
    ``code-block-wrapper`` div carrying "Test Code" / "Copy Code" buttons.

    Args:
        raw_text: Markdown source.

    Returns:
        The converted HTML, or a string starting with ``"Error: "`` when
        pandoc cannot be started or exits with a non-zero status.
    """
    # Bound up front so the cleanup in `finally` never hits an unbound name
    # when creating one of the temporary files fails.
    temp_input_path = None
    temp_output_path = None

    def replace_code_block(match):
        # The language class may sit on <pre> or on <code>; take whichever is present.
        code_class = match.group(1) or match.group(2) or ''
        code_content = match.group(3)
        return f'''
<div class="code-block-wrapper">
<button class="test-button">Test Code</button>
<button class="copy-button">Copy Code</button>
<pre><code class="hljs {code_class}">{code_content}</code></pre>
<div class="test-results"></div>
</div>
'''

    try:
        # pandoc reads and writes UTF-8, so use it explicitly rather than
        # whatever the locale default happens to be.
        with tempfile.NamedTemporaryFile(
            delete=False, mode="w", suffix=".md", encoding="utf-8"
        ) as temp_input:
            temp_input_path = temp_input.name
            temp_input.write(raw_text)

        with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as temp_output:
            temp_output_path = temp_output.name

        # Highlighting is disabled; the contradictory --highlight-style flag
        # that used to accompany --no-highlight is dropped.
        cmd = [
            "pandoc",
            temp_input_path,
            "-f", "markdown",
            "-t", "html",
            "--no-highlight",
            "-o", temp_output_path
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except OSError as exc:
            # e.g. pandoc is not installed: report it in the same "Error: ..."
            # form used for a failed conversion instead of crashing.
            return f"Error: {exc}"

        if result.returncode != 0:
            return f"Error: {result.stderr}"

        with open(temp_output_path, "r", encoding="utf-8") as f:
            html_content = f.read()

        # NOTE(review): with highlighting disabled pandoc presumably emits
        # `<pre class="lang"><code>` rather than `<pre><code class="lang">`;
        # the pattern accepts both forms (and no class at all) -- verify
        # against the installed pandoc version.
        pattern = r'<pre(?: class="([^"]*)")?><code(?: class="([^"]*)")?>(.*?)</code></pre>'
        return re.sub(pattern, replace_code_block, html_content, flags=re.DOTALL)

    finally:
        for temp_path in (temp_input_path, temp_output_path):
            if temp_path is not None and os.path.exists(temp_path):
                os.remove(temp_path)
|
|
|
|
|
# Extensions accepted by the upload endpoint (compared in lower case, without
# the dot). The upload handler asks the model to analyze "this Python code",
# hence ".py".
# NOTE(review): this constant was referenced but never defined anywhere in
# the file; confirm the intended set of extensions.
ALLOWED_EXTENSIONS = {"py"}


def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is whatever follows the last dot, compared
    case-insensitively. Names without a dot are rejected.
    """
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
|
|
|
|
|
def extract_important_info(response):
    """Collect the lines of *response* that carry an [IMPORTANT] marker.

    Every occurrence of the marker is removed from a matching line and the
    remainder is stripped of surrounding whitespace. Lines are returned in
    their original order.
    """
    marker = '[IMPORTANT]'
    return [
        line.replace(marker, '').strip()
        for line in response.split('\n')
        if marker in line
    ]
|
|
|
def create_new_chat(session_id: str):
    """Insert a new chat row and return its metadata.

    Args:
        session_id: Primary key for the new chat.

    Returns:
        A dict with the keys ``id``, ``title``, ``date`` and ``last_message``
        holding exactly the values that were stored.
    """
    # Build the record once so the returned date is the stored date; calling
    # datetime.now() separately for the INSERT and for the return value
    # produced two different timestamps.
    chat = {
        "id": session_id,
        "title": "New Chat",
        "date": datetime.now().isoformat(),
        "last_message": None
    }

    with get_db_connection() as conn:
        conn.execute(
            'INSERT INTO chats (id, title, date, last_message) VALUES (?, ?, ?, ?)',
            (chat["id"], chat["title"], chat["date"], chat["last_message"])
        )
        conn.commit()

    return chat
|
|
|
|
|
def update_chat_metadata(session_id: str, last_message: str):
    """Store *last_message* on the chat row and derive the chat title from it.

    The title is the message itself, or its first 30 characters followed by
    "..." when the message is longer than 30 characters.
    """
    if len(last_message) > 30:
        title = last_message[:30] + "..."
    else:
        title = last_message

    with get_db_connection() as db:
        db.execute(
            'UPDATE chats SET title = ?, last_message = ? WHERE id = ?',
            (title, last_message, session_id)
        )
        db.commit()
|
|
|
|
|
def format_response(response):
    """Turn markdown code fences and inline code in *response* into HTML.

    Fenced blocks (with or without a language tag) become a
    ``code-block-wrapper`` div with "Test Code" / "Copy Code" buttons around
    ``<pre><code class="hljs [lang]">``. Inline backtick spans become
    ``<code class="inline-code">``. Text outside code is returned unchanged.

    Args:
        response: Raw model output containing markdown.

    Returns:
        The text with code fences and inline code replaced by HTML.
    """
    def _to_html(match):
        language, block_code, inline_code = match.groups()
        if block_code is None:
            escaped = html.escape(inline_code, quote=False)
            return f'<code class="inline-code">{escaped}</code>'
        css_class = f"hljs {language}" if language else "hljs"
        # Escape the code so characters such as `<` and `&` are displayed
        # literally instead of being parsed as markup by the browser.
        escaped = html.escape(block_code, quote=False)
        return (
            '<div class="code-block-wrapper">\n'
            '<button class="test-button">Test Code</button>\n'
            '<button class="copy-button">Copy Code</button>\n'
            f'<pre><code class="{css_class}">{escaped}</code></pre>\n'
            '<div class="test-results"></div>\n'
            '</div>'
        )

    # One left-to-right pass: fenced blocks are tried first, inline spans
    # second. A separate inline pass over already-converted text could
    # rewrite backticks that sit inside a code block.
    return re.sub(
        r'```(\w*)\n(.*?)\n```|`([^`]+)`',
        _to_html,
        response,
        flags=re.DOTALL
    )
|
|
|
|
|
@app.route("/api/chat-list", methods=["GET"])
def get_chat_list():
    """Return every stored chat as JSON, ordered by date descending."""
    with get_db_connection() as db:
        rows = db.execute('SELECT * FROM chats ORDER BY date DESC').fetchall()
        chat_records = list(map(dict, rows))
        return jsonify({"chats": chat_records})
|
|
|
|
|
|
|
|
|
@app.route("/api/chat", methods=["POST"])
def chat():
    """Answer one chat message with the text-generation pipeline.

    Expects a JSON body with a ``message`` field. Responds with
    ``{"success": True, "response": <generated text>}``, or with
    ``{"success": False, "response": <generic apology>}`` when anything
    goes wrong (the underlying error is printed, not returned).
    """
    try:
        # A missing or non-JSON body is treated as an empty request rather
        # than failing on attribute access against None.
        data = request.get_json(silent=True) or {}
        user_input = data.get("message", "")
        print(f"Received message: {user_input}")

        chat_history = ""
        important_info = "Provide code examples for Python programming."

        # Named `prompt_text` so it does not shadow the module-level `prompt`.
        prompt_text = generate_prompt(user_input, chat_history, important_info)

        if not llm:
            raise ValueError("LLM not initialized properly.")

        # `max_length` counts the prompt tokens as well, and the prompt
        # template alone is longer than the previous limit of 150, which left
        # no room for an answer; bound only the newly generated tokens.
        # `return_full_text=False` keeps the prompt from being echoed back.
        outputs = llm(prompt_text, max_new_tokens=512, return_full_text=False)
        response = outputs[0]['generated_text']
        print(f"Raw response received: {response}")

        return jsonify({
            "success": True,
            "response": response
        })

    except Exception as e:
        print(f"Request error: {str(e)}")
        return jsonify({
            "success": False,
            "response": "Sorry, there was a problem with your request."
        })
|
|
|
|
|
|
|
@app.route("/api/new-chat", methods=["POST"])
def new_chat():
    """Create a chat whose id is the current timestamp and return its metadata."""
    now_stamp = datetime.now().timestamp()
    created = create_new_chat(str(now_stamp))
    return jsonify({"success": True, "chat": created})
|
|
|
|
|
@app.route("/api/chat-history", methods=["GET"])
def get_chat_history():
    """Return the messages and important info stored for one session.

    The session is taken from the ``sessionId`` query parameter (default
    "default"). Assistant messages containing a code fence are passed
    through format_response() before being returned.
    """
    session_id = request.args.get("sessionId", "default")
    owner = (session_id,)

    with get_db_connection() as db:
        message_rows = db.execute(
            'SELECT role, content, timestamp FROM messages WHERE chat_id = ? ORDER BY timestamp',
            owner
        ).fetchall()

        history = []
        for row in message_rows:
            entry = dict(row)
            has_fence = '```' in entry['content'] if entry['role'] == 'assistant' else False
            if has_fence:
                entry['content'] = format_response(entry['content'])
            history.append(entry)

        info_rows = db.execute(
            'SELECT content FROM important_info WHERE chat_id = ?',
            owner
        ).fetchall()

        payload = {
            "history": history,
            "important_info": [row['content'] for row in info_rows]
        }
        return jsonify(payload)
|
|
|
|
|
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Accept an uploaded source file and return the model's analysis of it.

    The file is saved into the upload folder, read back as text, embedded in
    an analysis prompt and sent to the text-generation pipeline. The saved
    copy is always removed afterwards.

    Responds with ``{'success': True, 'filename', 'content', 'analysis'}`` or
    ``{'success': False, 'error': <reason>}``.
    """
    if 'file' not in request.files:
        return jsonify({'success': False, 'error': 'No file part'})

    file = request.files['file']
    if file.filename == '':
        return jsonify({'success': False, 'error': 'No selected file'})

    if not (file and allowed_file(file.filename)):
        return jsonify({'success': False, 'error': 'Invalid file type'})

    # `llm` is None when model initialization failed at startup.
    if not llm:
        return jsonify({'success': False, 'error': 'LLM not initialized properly.'})

    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)

    try:
        file.save(filepath)

        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()

        analysis_prompt = f"""
Please analyze this Python code:

{content}

Provide:
1. A clear, small explanation
2. Any potential errors or improvements
3. Suggestions for better practices

- Each in separate neat paragraphs with highlighted headings.
"""

        # Call the pipeline and pull out the generated text, the same way the
        # chat route does.
        outputs = llm(analysis_prompt, max_new_tokens=512, return_full_text=False)
        analysis = outputs[0]['generated_text']

    except Exception as e:
        print(f"Upload error: {str(e)}")
        return jsonify({'success': False, 'error': 'Could not analyze the uploaded file.'})

    finally:
        # Remove the saved copy on every path, including read or model failures.
        if os.path.exists(filepath):
            os.remove(filepath)

    return jsonify({
        'success': True,
        'filename': filename,
        'content': content,
        'analysis': analysis
    })
|
|
|
|
|
@app.route("/api/clear-memory", methods=["POST"])
def clear_memory():
    """Delete stored data for a session according to ``clearOption``.

    ``clearOption`` may be "all" (messages and important info), "chat"
    (messages only) or "important" (important info only); anything else is
    rejected. ``sessionId`` defaults to "default".
    """
    session_id = request.json.get("sessionId", "default")
    clear_option = request.json.get("clearOption", "all")

    delete_messages = 'DELETE FROM messages WHERE chat_id = ?'
    delete_info = 'DELETE FROM important_info WHERE chat_id = ?'
    plans = {
        "all": ((delete_messages, delete_info), "All memory cleared successfully"),
        "chat": ((delete_messages,), "Chat history cleared successfully"),
        "important": ((delete_info,), "Important information cleared successfully"),
    }

    with get_db_connection() as conn:
        try:
            # Non-string options can never equal one of the known names, so
            # they are rejected without being used as a dictionary key.
            plan = plans.get(clear_option) if isinstance(clear_option, str) else None
            if plan is None:
                return jsonify({
                    "success": False,
                    "message": "Invalid clear option specified"
                })

            statements, message = plan
            for statement in statements:
                conn.execute(statement, (session_id,))

            conn.commit()
            return jsonify({
                "success": True,
                "message": message
            })

        except Exception as e:
            return jsonify({
                "success": False,
                "message": f"Error clearing memory: {str(e)}"
            })
|
|
|
|
|
@app.route("/api/test-code", methods=["POST"])
def test_code():
    """Run the submitted Python code in a subprocess and return its output.

    Expects a JSON body with a ``code`` field. The code is written to a
    temporary ``.py`` file and executed with a 5 second timeout. Responds
    with ``{"success": <exit status was 0>, "output": <stdout or stderr>}``.

    SECURITY NOTE(review): this executes arbitrary client-supplied code with
    the privileges of the server process and without any sandboxing. It must
    not be exposed to untrusted users as-is.
    """
    try:
        data = request.get_json(silent=True) or {}
        code = data.get("code", "")

        temp_file = None
        try:
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.py', delete=False, encoding='utf-8'
            ) as f:
                # Record the path before writing so a failed write still
                # gets cleaned up below.
                temp_file = f.name
                f.write(code)

            # Use the interpreter running this app; a bare "python" is not
            # guaranteed to be on PATH.
            result = subprocess.run(
                [sys.executable or 'python', temp_file],
                capture_output=True,
                text=True,
                timeout=5
            )

        except subprocess.TimeoutExpired:
            # Fixed message: the default one includes the temp file path.
            return jsonify({
                "success": False,
                "output": "Error executing code: execution timed out after 5 seconds"
            })

        finally:
            if temp_file is not None:
                os.unlink(temp_file)

        success = result.returncode == 0
        output = result.stdout if success else result.stderr

        return jsonify({
            "success": success,
            "output": output
        })

    except Exception as e:
        return jsonify({
            "success": False,
            "output": f"Error executing code: {str(e)}"
        })
|
|
|
|
|
@app.route("/")
def home():
    """Render and return the index.html template for the root URL."""
    page = render_template("index.html")
    return page
|
|
|
|
|
|
|
|
|
# Start Flask's built-in server only when this file is run as a script;
# it listens on all interfaces at the configured PORT.
if __name__ == "__main__":
    app.run(port=PORT, host="0.0.0.0")