from flask import Flask, render_template, request, jsonify import os import tempfile from pathlib import Path from langchain_community.llms import HuggingFaceHub from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from langchain.memory import ConversationBufferMemory from datetime import datetime import sqlite3 from contextlib import contextmanager from werkzeug.utils import secure_filename from huggingface_hub import InferenceClient from langchain.llms.base import LLM from typing import Optional, List, Any from transformers import AutoTokenizer, AutoModelForCausalLM from transformers import pipeline # Initialize Flask application app = Flask(__name__) # Configuration PORT = int(os.environ.get("PORT", 7860)) CACHE_DIR = "/tmp/huggingface_cache" UPLOAD_DIR = "/tmp/uploads" DATABASE_PATH = "/tmp/chat_database.db" # Create necessary directories for dir_path in [CACHE_DIR, UPLOAD_DIR]: Path(dir_path).mkdir(parents=True, exist_ok=True) # Set environment variables os.environ['TRANSFORMERS_CACHE'] = CACHE_DIR os.environ['HUGGINGFACE_HUB_CACHE'] = CACHE_DIR for dir_path in [CACHE_DIR, UPLOAD_DIR]: Path(dir_path).mkdir(parents=True, exist_ok=True) # Set environment variables os.environ['TRANSFORMERS_CACHE'] = CACHE_DIR os.environ['HUGGINGFACE_HUB_CACHE'] = CACHE_DIR # Add some debugging to verify the environment variables print(f"Transformers Cache Directory: {os.environ['TRANSFORMERS_CACHE']}") print(f"Hugging Face Hub Cache Directory: {os.environ['HUGGINGFACE_HUB_CACHE']}") # Check if cache directories are writable if os.access(CACHE_DIR, os.W_OK): print(f"Cache directory {CACHE_DIR} is writable.") else: print(f"Cache directory {CACHE_DIR} is NOT writable.") # Configure Flask app app.config['UPLOAD_FOLDER'] = UPLOAD_DIR # Initialize prompt template prompt_template = """ Role: You are Figr Code Assistant, specializing in providing clear, error-free Python code solutions. Context: {important_info} Previous Conversation: {chat_history} Current Request: {user_request} Output Guidelines: 1. Code Format: - Use ```python for code blocks - Use `code` for inline code references - Provide raw text without HTML formatting - Include explanations after code blocks 2. Code Organization: - Default to single, focused code snippets - Only split into multiple snippets if necessary - Mark critical information with [IMPORTANT] prefix Please provide a clear and detailed response: """ prompt = PromptTemplate( input_variables=["user_request", "chat_history", "important_info"], template=prompt_template ) class CustomHuggingFaceInference(LLM): client: Any model: str def __init__(self, token: str): super().__init__() self.client = InferenceClient(token=token) self.model = "mistralai/Mistral-7B-Instruct-v0.1" def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str: response = self.client.text_generation( prompt, model=self.model, max_new_tokens=512, temperature=0.7, top_p=0.95, repetition_penalty=1.15 ) return response @property def _identifying_params(self): return {"model": self.model} @property def _llm_type(self): return "custom_huggingface" @contextmanager def get_db_connection(): """Context manager for database connections""" conn = sqlite3.connect(DATABASE_PATH) conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() def init_db(): """Initialize the database""" with get_db_connection() as conn: conn.execute(''' CREATE TABLE IF NOT EXISTS chats ( id TEXT PRIMARY KEY, title TEXT, date TEXT, last_message TEXT ) ''') conn.execute(''' CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id TEXT, role TEXT, content TEXT, timestamp TEXT, FOREIGN KEY (chat_id) REFERENCES chats (id) ) ''') conn.execute(''' CREATE TABLE IF NOT EXISTS important_info ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id TEXT, content TEXT, FOREIGN KEY (chat_id) REFERENCES chats (id) ) ''') conn.commit() def initialize_llm(): """Initialize the LLM using transformers pipeline.""" try: model_name = "mistralai/Mistral-7B-Instruct-v0.3" llm = pipeline("text-generation", model=model_name) print("LLM initialized successfully!") return llm except Exception as e: print(f"LLM initialization error: {str(e)}") return None class ChatSession: def __init__(self, session_id): self.session_id = session_id self.memory = ConversationBufferMemory( memory_key="chat_history", return_messages=True ) self._load_chat_history() self._load_important_info() def _load_chat_history(self): """Load chat history from database""" with get_db_connection() as conn: messages = conn.execute( 'SELECT role, content, timestamp FROM messages WHERE chat_id = ? ORDER BY timestamp', (self.session_id,) ).fetchall() self.chat_history = [] for msg in messages: self.chat_history.append({ "role": msg['role'], "content": msg['content'], "timestamp": msg['timestamp'] }) if msg['role'] == "user": self.memory.chat_memory.add_user_message(msg['content']) else: self.memory.chat_memory.add_ai_message(msg['content']) def _load_important_info(self): """Load important info from database""" with get_db_connection() as conn: info = conn.execute( 'SELECT content FROM important_info WHERE chat_id = ?', (self.session_id,) ).fetchall() self.important_info = [row['content'] for row in info] def add_message(self, role, content): timestamp = datetime.now().isoformat() message = { "role": role, "content": content, "timestamp": timestamp } # Store in database with get_db_connection() as conn: conn.execute( 'INSERT INTO messages (chat_id, role, content, timestamp) VALUES (?, ?, ?, ?)', (self.session_id, role, content, timestamp) ) conn.commit() self.chat_history.append(message) # Update memory if role == "user": self.memory.chat_memory.add_user_message(content) else: self.memory.chat_memory.add_ai_message(content) def add_important_info(self, content): """Add important information to database""" with get_db_connection() as conn: conn.execute( 'INSERT INTO important_info (chat_id, content) VALUES (?, ?)', (self.session_id, content) ) conn.commit() self.important_info.append(content) def get_memory_variables(self): return self.memory.load_memory_variables({}) def clear_memory(self): """Clear all memory from database""" with get_db_connection() as conn: conn.execute('DELETE FROM messages WHERE chat_id = ?', (self.session_id,)) conn.execute('DELETE FROM important_info WHERE chat_id = ?', (self.session_id,)) conn.commit() self.memory.clear() self.chat_history = [] self.important_info = [] def clear_chat_history(self): """Clear chat history from database""" with get_db_connection() as conn: conn.execute('DELETE FROM messages WHERE chat_id = ?', (self.session_id,)) conn.commit() self.chat_history = [] self.memory.chat_memory.clear() def clear_important_info(self): """Clear important info from database""" with get_db_connection() as conn: conn.execute('DELETE FROM important_info WHERE chat_id = ?', (self.session_id,)) conn.commit() self.important_info = [] print("Starting application initialization...") try: print("Initializing database...") init_db() print("Database initialized successfully") llm = initialize_llm() except Exception as e: print(f"Fatal initialization error: {e}") raise # Rest of the prompt template and other configurations remain the same prompt_template = """ Role: You are Figr Code Assistant, specializing in providing clear, error-free Python code solutions. Context: {important_info} Previous Conversation: {chat_history} Current Request: {user_request} Output Guidelines: 1. Code Format: - Use ```python for code blocks - Use `code` for inline code references - Provide raw text without HTML formatting - Strictly include explanation only after code blocks 2. Code Organization: - Default to single, focused code snippets for clarity - Only split into multiple snippets(each individually runnable) if: a) Multiple distinct concepts are requested b) Complex functionality requires modular explanation - Mark critical information with [IMPORTANT] prefix and give small explanations with some bold headings if required and in white font always. """ def generate_prompt(user_request, chat_history, important_info): return prompt_template.format( user_request=user_request, chat_history=chat_history, important_info=important_info ) def convert_to_html(raw_text): """Convert markdown to HTML while preserving code blocks with custom buttons""" try: # Create a temporary markdown file with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".md") as temp_input: temp_input.write(raw_text) temp_input_path = temp_input.name # Use pandoc with specific options to preserve code blocks with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as temp_output: temp_output_path = temp_output.name # Use pandoc with specific options cmd = [ "pandoc", temp_input_path, "-f", "markdown", "-t", "html", "--highlight-style=pygments", "--no-highlight", # Disable pandoc's highlighting "-o", temp_output_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: with open(temp_output_path, "r") as f: html_content = f.read() # Add custom buttons to code blocks import re def replace_code_block(match): code_class = match.group(1) or '' code_content = match.group(2) return f'''
{code_content}
blocks with our custom wrapper
pattern = r'(.*?)
'
html_content = re.sub(pattern, replace_code_block, html_content, flags=re.DOTALL)
else:
html_content = f"Error: {result.stderr}"
finally:
# Clean up temporary files
if os.path.exists(temp_input_path):
os.remove(temp_input_path)
if os.path.exists(temp_output_path):
os.remove(temp_output_path)
return html_content
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def extract_important_info(response):
important_items = []
for line in response.split('\n'):
if '[IMPORTANT]' in line:
important_items.append(line.replace('[IMPORTANT]', '').strip())
return important_items
def create_new_chat(session_id: str):
"""Create a new chat session with metadata in database"""
with get_db_connection() as conn:
conn.execute(
'INSERT INTO chats (id, title, date, last_message) VALUES (?, ?, ?, ?)',
(session_id, "New Chat", datetime.now().isoformat(), None)
)
conn.commit()
return {
"id": session_id,
"title": "New Chat",
"date": datetime.now().isoformat(),
"last_message": None
}
def update_chat_metadata(session_id: str, last_message: str):
"""Update chat metadata in database"""
title = last_message[:30] + "..." if len(last_message) > 30 else last_message
with get_db_connection() as conn:
conn.execute(
'UPDATE chats SET title = ?, last_message = ? WHERE id = ?',
(title, last_message, session_id)
)
conn.commit()
def format_response(response):
"""Format response with proper code block structure"""
# First, handle code blocks with language specification
formatted = re.sub(
r'```(\w+)\n(.*?)\n```',
lambda
m: f'\n\n\n{m.group(2)}
\n\n',
response,
flags=re.DOTALL
)
# Then handle code blocks without language specification
formatted = re.sub(
r'```\n(.*?)\n```',
lambda
m: f'\n\n\n{m.group(1)}
\n\n',
formatted,
flags=re.DOTALL
)
# Handle inline code
formatted = re.sub(
r'`([^`]+)`',
r'\1
',
formatted
)
return formatted
@app.route("/api/chat-list", methods=["GET"])
def get_chat_list():
"""Get list of all chats from database"""
with get_db_connection() as conn:
chats = conn.execute('SELECT * FROM chats ORDER BY date DESC').fetchall()
return jsonify({
"chats": [dict(chat) for chat in chats]
})
# The rest of your route handlers (convert_to_html, extract_important_info, etc.) remain the same
@app.route("/api/chat", methods=["POST"])
def chat():
try:
# Get the user's message from the request
data = request.json
user_input = data.get("message", "")
print(f"Received message: {user_input}")
# Very simple example for the chat history and important information (can be more complex)
chat_history = "" # Assuming you maintain chat history somehow
important_info = "Provide code examples for Python programming."
# Generate the prompt
prompt = generate_prompt(user_input, chat_history, important_info)
# Get response from the model using transformers pipeline
if llm:
response = llm(prompt, max_length=150)[0]['generated_text']
print(f"Raw response received: {response}")
# Return the model's response
return jsonify({
"success": True,
"response": response
})
else:
raise ValueError("LLM not initialized properly.")
except Exception as e:
print(f"Request error: {str(e)}")
return jsonify({
"success": False,
"response": "Sorry, there was a problem with your request."
})
@app.route("/api/new-chat", methods=["POST"])
def new_chat():
"""Create a new chat session"""
session_id = str(datetime.now().timestamp())
chat = create_new_chat(session_id)
return jsonify({"success": True, "chat": chat})
@app.route("/api/chat-history", methods=["GET"])
def get_chat_history():
"""Get chat history for a specific session"""
session_id = request.args.get("sessionId", "default")
with get_db_connection() as conn:
# Get messages
messages = conn.execute(
'SELECT role, content, timestamp FROM messages WHERE chat_id = ? ORDER BY timestamp',
(session_id,)
).fetchall()
# Format assistant messages if they aren't already formatted
formatted_messages = []
for msg in messages:
message_dict = dict(msg)
if message_dict['role'] == 'assistant' and '```' in message_dict['content']:
# Format the response if it contains code blocks
message_dict['content'] = format_response(message_dict['content'])
formatted_messages.append(message_dict)
# Get important info
important_info = conn.execute(
'SELECT content FROM important_info WHERE chat_id = ?',
(session_id,)
).fetchall()
return jsonify({
"history": formatted_messages,
"important_info": [info['content'] for info in important_info]
})
@app.route('/api/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({'success': False, 'error': 'No file part'})
file = request.files['file']
if file.filename == '':
return jsonify({'success': False, 'error': 'No selected file'})
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
# Read the file content
with open(filepath, 'r') as f:
content = f.read()
# Analyze the code using the LLM
analysis_prompt = f"""
Please analyze this Python code:
{content}
Provide:
1. A clear, small explanation
2. Any potential errors or improvements
3. Suggestions for better practices
- Each in separate neat paragraphs with highlighted headings.
"""
analysis = llm.predict(analysis_prompt)
# Clean up the uploaded file
os.remove(filepath)
return jsonify({
'success': True,
'filename': filename,
'content': content,
'analysis': analysis
})
return jsonify({'success': False, 'error': 'Invalid file type'})
@app.route("/api/clear-memory", methods=["POST"])
def clear_memory():
"""Clear memory based on specified option"""
session_id = request.json.get("sessionId", "default")
clear_option = request.json.get("clearOption", "all")
with get_db_connection() as conn:
try:
if clear_option == "all":
conn.execute('DELETE FROM messages WHERE chat_id = ?', (session_id,))
conn.execute('DELETE FROM important_info WHERE chat_id = ?', (session_id,))
message = "All memory cleared successfully"
elif clear_option == "chat":
conn.execute('DELETE FROM messages WHERE chat_id = ?', (session_id,))
message = "Chat history cleared successfully"
elif clear_option == "important":
conn.execute('DELETE FROM important_info WHERE chat_id = ?', (session_id,))
message = "Important information cleared successfully"
else:
return jsonify({
"success": False,
"message": "Invalid clear option specified"
})
conn.commit()
return jsonify({
"success": True,
"message": message
})
except Exception as e:
return jsonify({
"success": False,
"message": f"Error clearing memory: {str(e)}"
})
@app.route("/api/test-code", methods=["POST"])
def test_code():
try:
data = request.json
code = data.get("code", "")
# Create a temporary file to store the code
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(code)
temp_file = f.name
try:
# Run the code using Python
result = subprocess.run(
['python', temp_file],
capture_output=True,
text=True,
timeout=5 # 5 second timeout for safety
)
# Prepare the response
success = result.returncode == 0
output = result.stdout if success else result.stderr
return jsonify({
"success": success,
"output": output
})
finally:
# Clean up the temporary file
os.unlink(temp_file)
except Exception as e:
return jsonify({
"success": False,
"output": f"Error executing code: {str(e)}"
})
@app.route("/")
def home():
"""Serve the main application page"""
return render_template("index.html")
if __name__ == "__main__":
app.run(host="0.0.0.0", port=PORT)