Spaces:
Running
Running
import json | |
import os | |
import magic | |
from dotenv import load_dotenv | |
from docx import Document | |
from docx.shared import Inches | |
from docx.enum.text import WD_ALIGN_PARAGRAPH | |
from flask_cors import CORS | |
from flask import Flask, request, jsonify | |
from supabase import create_client, Client | |
import logging | |
# Configure logging
logging.basicConfig(level=logging.INFO)

# Load variables from a local .env file (if one exists) into the process
# environment before any configuration below is read.
# load_dotenv(dotenv_path='.env.local')
load_dotenv()

app = Flask(__name__)
CORS(app)

# Supabase connection settings. SUPABASE_URL / SUPABASE_KEY from the
# environment take precedence; the literals remain only as a fallback so an
# existing deployment without those variables keeps working.
# NOTE(review): credentials should not live in source control - move the key
# into the environment (or a secret store) and drop the fallback.
url: str = os.environ.get(
    "SUPABASE_URL",
    'https://dtzuqtvroalrjhgdcowq.supabase.co/',
)
key: str = os.environ.get(
    "SUPABASE_KEY",
    'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImR0enVxdHZyb2FscmpoZ2Rjb3dxIiwicm9sZSI6ImFub24iLCJpYXQiOjE3MjU0NDk3MzIsImV4cCI6MjA0MTAyNTczMn0.WrIvwEOq4CqCb8IkU8G4jiWkf9DM1JxGd2_aTN4vlV4',
)
supabase: Client = create_client(url, key)
def get_file_by_id(file_id):
    """Fetch a stored file's name and contents from Supabase.

    Looks up the row of the ``files`` table whose ``id`` equals *file_id*,
    then downloads the object found at that row's ``file_path`` from the
    ``files`` storage bucket.

    Args:
        file_id: Value matched against the ``id`` column of ``files``.

    Returns:
        A ``(file_name, file_data)`` tuple: the row's ``name`` value and
        whatever the storage client's ``download`` call returns.

    Raises:
        ValueError: If no row is found or the row has no ``file_path``.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        response = (
            supabase.table("files")
            .select("*")
            .eq("id", file_id)
            .single()
            .execute()
        )
        record = response.data
        if not record:
            # Read ``error`` defensively: the response object is not
            # guaranteed to expose that attribute.
            error = getattr(response, "error", None)
            raise ValueError(getattr(error, "message", None) or "File not found.")

        file_path = record.get("file_path")
        file_name = record.get("name")
        if not file_path:
            raise ValueError("File path is missing in the metadata.")

        # Fetch the actual file content from Supabase storage
        file_data = supabase.storage.from_('files').download(file_path)
        return file_name, file_data
    except Exception:
        # Raise instead of returning an HTTP response tuple: the caller
        # unpacks the result as (file_name, file_data), so an error tuple
        # would be mistaken for real data.
        logging.exception("Error fetching file %s", file_id)
        raise
def get_file_type(file_path):
    """Return the MIME type of the file at *file_path*.

    Detection is delegated to python-magic (``magic.Magic(mime=True)``).

    Args:
        file_path: Path of a local file to inspect.

    Returns:
        The value returned by ``from_file`` for that path.

    Raises:
        Exception: Any detection failure is logged and re-raised. Callers
            store the result as a MIME type, so an error object must never
            be returned in its place.
    """
    try:
        # Use python-magic to detect the MIME type of the file
        detector = magic.Magic(mime=True)
        return detector.from_file(file_path)
    except Exception:
        logging.exception("Error detecting file type of %s", file_path)
        raise
def insert_file_record(user_id, doc):
    """Insert a row describing the local file *doc* into the ``files`` table.

    The row is created with an empty ``file_path`` and ``description`` and a
    ``tokens`` value of 0; ``name`` is ``"letterhead-"`` followed by the
    file's base name, ``size`` is the file size on disk, and ``type`` is the
    MIME type reported by ``get_file_type``.

    Args:
        user_id: Value stored in the row's ``user_id`` column.
        doc: Path of the local file the row describes.

    Returns:
        The response object returned by the Supabase insert call.

    Raises:
        Exception: Any failure is logged and re-raised so callers never
            receive an error object where they expect an insert response.
    """
    try:
        file_record = {
            "user_id": user_id,
            "description": "",
            "file_path": "",
            "name": "letterhead-" + os.path.basename(doc),
            "size": os.path.getsize(doc),
            "tokens": 0,
            "type": get_file_type(doc),
        }
        return supabase.table("files").insert(file_record).execute()
    except Exception:
        logging.exception("Error inserting file record for %s", doc)
        raise
def upload_file_to_storage(file, metadata):
    """Upload a local file to the ``files`` storage bucket.

    Args:
        file: Path of the local file to upload.
        metadata: Mapping that provides ``user_id`` and ``file_id``; the
            object is stored at ``"<user_id>/<file_id>"``.

    Returns:
        The ``path`` attribute of the upload response.

    Raises:
        Exception: Any failure is logged and re-raised. Callers persist the
            return value as the stored path, so an error object must never
            be returned in its place.
    """
    storage_path = f"{metadata['user_id']}/{metadata['file_id']}"
    file_type = get_file_type(file)
    try:
        with open(file, 'rb') as f:
            response = supabase.storage.from_("files").upload(
                file=f,
                path=storage_path,
                file_options={
                    "cache-control": "3600",
                    "content-type": file_type,
                    "upsert": "false",
                },
            )
        return response.path
    except Exception:
        logging.exception("Error uploading file %s", file)
        raise
def update_file_record(file_id, updates):
    """Apply *updates* to the ``files`` row whose ``id`` equals *file_id*.

    Args:
        file_id: Value matched against the ``id`` column.
        updates: Mapping of column names to new values.

    Returns:
        The response object returned by the Supabase update call.

    Raises:
        Exception: Any failure is logged and re-raised, so a failed update
            cannot be mistaken for a successful one by a caller that
            ignores the return value.
    """
    try:
        return supabase.table("files").update(updates).eq("id", file_id).execute()
    except Exception:
        logging.exception("Error while updating record %s", file_id)
        raise
def insert_text_and_image_at_end(full_path, full_image_path, text_to_insert, include_signature, signature_position, letterhead_address):
    """Fill in a letterhead document and save it in place.

    Replaces the ``<<SENDER_ADDRESS>>`` placeholder with *letterhead_address*
    (when one is given), appends *text_to_insert* as a new paragraph and,
    if requested, appends a 1x1 inch signature image.

    Args:
        full_path: Path of the .docx file to modify; it is overwritten.
        full_image_path: Path of the signature image, or a falsy value.
        text_to_insert: Text of the paragraph appended to the document.
        include_signature: Whether the signature image should be appended.
        signature_position: ``'left'``, ``'right'`` or ``'center'``; any
            other value leaves the image paragraph's alignment untouched.
        letterhead_address: Replacement for the placeholder; when falsy the
            placeholder is left as is.

    Returns:
        *full_path*, the path of the saved document.

    Raises:
        RuntimeError: If the document cannot be opened, edited or saved.
            The original exception is chained as the cause.
    """
    placeholder = "<<SENDER_ADDRESS>>"
    alignments = {
        'left': WD_ALIGN_PARAGRAPH.LEFT,
        'right': WD_ALIGN_PARAGRAPH.RIGHT,
        'center': WD_ALIGN_PARAGRAPH.CENTER,
    }
    try:
        doc = Document(full_path)

        # Replace placeholder <<SENDER_ADDRESS>> with the letterhead address
        if letterhead_address:
            for paragraph in doc.paragraphs:
                if placeholder not in paragraph.text:
                    continue
                runs = paragraph.runs
                replaced = False
                for run in runs:
                    if placeholder in run.text:
                        run.text = run.text.replace(placeholder, letterhead_address)
                        replaced = True
                if replaced or not runs:
                    continue
                # The placeholder can be split across several runs, in which
                # case no single run contains it. Fall back to collapsing the
                # paragraph's text into its first run so the replacement
                # still happens (the first run's formatting then applies).
                merged = "".join(run.text for run in runs)
                if placeholder in merged:
                    runs[0].text = merged.replace(placeholder, letterhead_address)
                    for run in runs[1:]:
                        run.text = ""

        # Add the new text at the end of the document
        doc.add_paragraph(text_to_insert)

        # Add the image at the end of the document with position adjustment
        if include_signature and full_image_path:
            image_paragraph = doc.add_paragraph()
            image_paragraph.add_run().add_picture(
                full_image_path, width=Inches(1), height=Inches(1)
            )
            # Compare against None: an alignment member may itself be falsy.
            alignment = alignments.get(signature_position)
            if alignment is not None:
                image_paragraph.alignment = alignment

        # Save the document with the inserted text and image
        doc.save(full_path)
        return full_path
    except Exception as e:
        logging.exception("Error while inserting text into %s", full_path)
        # Wrap rather than re-raise as is: the request handler maps
        # ValueError to "not found", which would misreport a document
        # processing failure that happens to raise ValueError.
        raise RuntimeError(f"Could not update document {full_path}: {e}") from e
def fetch_image(bucket_name: str, image_path: str):
    """Download an image from Supabase storage into the local work directory.

    The object at *image_path* in *bucket_name* is downloaded, its MIME type
    is detected from the downloaded bytes, and the bytes are written to
    ``<module directory>/letterhead/<last path segment>.<MIME subtype>``.

    Args:
        bucket_name: Name of the storage bucket to read from.
        image_path: Path of the object inside the bucket.

    Returns:
        The absolute path of the file that was written.

    Raises:
        Exception: Any download, detection or write failure is logged and
            re-raised. Callers use the return value as a file path, so an
            error object must never be returned in its place.
    """
    try:
        # Download file from Supabase storage
        file_data = supabase.storage.from_(bucket_name).download(image_path)

        # Use python-magic to detect MIME type from the file data
        mime_type = magic.Magic(mime=True).from_buffer(file_data)

        current_directory = os.path.dirname(os.path.abspath(__file__))
        target_directory = os.path.join(current_directory, "letterhead")
        # Create the directory the file is actually written to. A bare
        # relative 'letterhead' resolves against the working directory,
        # which need not be the directory containing this module.
        os.makedirs(target_directory, exist_ok=True)

        extension = mime_type.split('/')[-1]
        letterhead_image_name = image_path.split('/')[-1] + "." + extension
        full_letterhead_image_path = os.path.join(target_directory, letterhead_image_name)
        with open(full_letterhead_image_path, 'wb') as f:
            f.write(file_data)
        return full_letterhead_image_path
    except Exception:
        logging.exception(
            "Error fetching image %s from bucket %s", image_path, bucket_name
        )
        raise
def delete_all_files(directory):
    """Best-effort removal of every regular file directly inside *directory*.

    The file named ``WARNING-DO-NOT-DELETE.txt`` is always kept, and
    sub-directories are left untouched. Failures are logged rather than
    raised so that cleanup problems never fail the surrounding operation,
    and one undeletable file does not stop the remaining files from being
    removed.

    Args:
        directory: Path of the directory to clean.

    Returns:
        None.
    """
    keep_file = "WARNING-DO-NOT-DELETE.txt"
    try:
        filenames = os.listdir(directory)
    except OSError:
        logging.exception("Could not list directory %s for cleanup", directory)
        return None

    for filename in filenames:
        if filename == keep_file:
            continue
        file_path = os.path.join(directory, filename)
        if not os.path.isfile(file_path):
            continue
        try:
            os.remove(file_path)
        except OSError:
            # Keep going: a single locked or vanished file should not
            # leave the rest of the directory uncleaned.
            logging.exception("Could not delete %s", file_path)
    return None
def letterhead():
    """Create a letterhead document from the JSON request body and store it.

    Expected body keys (as read below): ``chatSettings.letterheadFileId``,
    optional ``chatSettings.letterheadSignatureImagePath``,
    ``profile.user_id`` and ``letterheadData`` with ``letterheadContent``,
    ``includeSignature``, ``signaturePosition`` and ``letterheadAddress``.

    The template file is downloaded into the local ``letterhead`` work
    directory, filled in, registered in the ``files`` table, uploaded to
    storage, and the new row is updated with the stored path. The work
    directory is cleaned afterwards whether or not the request succeeded.

    NOTE(review): no ``@app.route`` decorator is visible on this function in
    this view of the file - confirm where it is registered as an endpoint.

    Returns:
        A ``(response, status)`` tuple: 200 with a ``message`` on success,
        400 for a malformed body, 404 when a ``ValueError`` is raised while
        processing (e.g. the template file is not found), 500 otherwise.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    # Log the data instead of saving it to a file
    logging.info("Received Data: %s", data)

    # Extract data
    chat_settings = data.get("chatSettings") or {}
    profile = data.get("profile") or {}
    letterhead_data = data.get("letterheadData") or {}
    try:
        letterhead_file_id = chat_settings["letterheadFileId"]
        user_id = profile["user_id"]
        text_to_insert = letterhead_data["letterheadContent"]
        include_signature = letterhead_data["includeSignature"]
        signature_position = letterhead_data["signaturePosition"]
        letterhead_address = letterhead_data["letterheadAddress"]
    except (KeyError, TypeError, AttributeError) as e:
        return jsonify({"error": f"Missing or invalid request field: {e}"}), 400
    signature_image_path = chat_settings.get("letterheadSignatureImagePath")

    current_directory = os.path.dirname(os.path.abspath(__file__))
    work_directory = os.path.join(current_directory, "letterhead")
    try:
        file_name, file_data = get_file_by_id(letterhead_file_id)

        os.makedirs(work_directory, exist_ok=True)
        # The name comes from a database row; keep only its final component
        # so it cannot point outside the work directory.
        full_letterhead_file_path = os.path.join(
            work_directory, os.path.basename(file_name)
        )

        full_letterhead_signature_path = None
        if include_signature and signature_image_path:
            full_letterhead_signature_path = fetch_image(
                "assistant_images", signature_image_path
            )

        with open(full_letterhead_file_path, "wb") as f:
            # The download may be a file-like object or raw bytes.
            if hasattr(file_data, "read"):
                f.write(file_data.read())
            else:
                f.write(file_data)

        modified_doc = insert_text_and_image_at_end(
            full_letterhead_file_path,
            full_letterhead_signature_path,
            text_to_insert,
            include_signature,
            signature_position,
            letterhead_address,
        )
        logging.info("modified doc: %s", modified_doc)

        created_file = insert_file_record(user_id, modified_doc)
        # Read the inserted row straight from the response instead of
        # serialising the response to JSON and parsing it back.
        record = created_file.data[0]

        file_path = upload_file_to_storage(modified_doc, {
            "name": record["name"],
            "user_id": record["user_id"],
            "file_id": record["id"],
        })
        logging.info("file path: %s", file_path)
        update_file_record(record["id"], {"file_path": file_path})

        message = f"letterheadFileId:{record['id']} Your letterhead is successfully created."
        return jsonify({"message": message}), 200
    except ValueError as e:
        return jsonify({"error": str(e)}), 404
    except Exception as e:
        # Request boundary: log the traceback and answer with JSON like
        # every other error path of this endpoint.
        logging.exception("Failed to create letterhead")
        return jsonify({"error": str(e)}), 500
    finally:
        # Remove temporary files on failure as well as on success.
        delete_all_files(work_directory)
if __name__ == '__main__':
    # Debug mode enables the Werkzeug interactive debugger, which lets anyone
    # who can reach the server execute arbitrary code. Make it opt-in through
    # the FLASK_DEBUG environment variable instead of hard-coding it on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled)
    # Reached only after the development server has stopped.
    print('working')