"""Flask service that appends generated content to a stored letterhead document.

The single endpoint, ``POST /api/letterhead``, downloads a .docx letterhead
from Supabase storage, fills in the address placeholder, appends the supplied
text (and optionally a signature image), uploads the result back to storage
and records it in the ``files`` table.

Helper functions raise on failure; only the route converts exceptions into
HTTP responses.
"""
import json
import logging
import os

import magic
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.shared import Inches
from dotenv import load_dotenv
from flask import Flask, request, jsonify
from flask_cors import CORS
from supabase import create_client, Client

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# load_dotenv(dotenv_path='.env.local')
load_dotenv()

app = Flask(__name__)
CORS(app)

# NOTE(review): the Supabase URL and key are committed in source even though
# load_dotenv() is called above. Consider reading them from the environment
# and rotating this key -- confirm the intended variable names with the
# deployment configuration before changing.
url: str = 'https://dtzuqtvroalrjhgdcowq.supabase.co/'
key: str = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImR0enVxdHZyb2FscmpoZ2Rjb3dxIiwicm9sZSI6ImFub24iLCJpYXQiOjE3MjU0NDk3MzIsImV4cCI6MjA0MTAyNTczMn0.WrIvwEOq4CqCb8IkU8G4jiWkf9DM1JxGd2_aTN4vlV4'
supabase: Client = create_client(url, key)

# Paragraph alignment for each accepted ``signaturePosition`` value; any other
# value leaves the paragraph's default alignment untouched.
_SIGNATURE_ALIGNMENTS = {
    'left': WD_ALIGN_PARAGRAPH.LEFT,
    'right': WD_ALIGN_PARAGRAPH.RIGHT,
    'center': WD_ALIGN_PARAGRAPH.CENTER,
}


def _letterhead_dir():
    """Return the absolute ``letterhead`` scratch directory, creating it if needed.

    The directory lives next to this module so that every function reads and
    writes the same location regardless of the process's working directory.
    """
    directory = os.path.join(os.path.dirname(os.path.abspath(__file__)), "letterhead")
    os.makedirs(directory, exist_ok=True)
    return directory


def get_file_by_id(file_id):
    """Fetch a file's name and content for the given ``files`` row id.

    Args:
        file_id: Value matched against the ``id`` column of the ``files`` table.

    Returns:
        A ``(file_name, file_data)`` tuple, where ``file_data`` is whatever the
        storage client's ``download`` call returns.

    Raises:
        ValueError: If the row has no data, no ``file_path`` or no ``name``.
        Exception: Any error raised by the Supabase client propagates unchanged.
    """
    response = supabase.table("files").select("*").eq("id", file_id).single().execute()
    file = response.data
    if not file:
        # Not every client version exposes ``error`` on the response object.
        error = getattr(response, "error", None)
        raise ValueError(error.message if error else "File not found.")

    file_path = file.get("file_path")
    file_name = file.get("name")
    if not file_path:
        raise ValueError("File path is missing in the metadata.")
    if not file_name:
        raise ValueError("File name is missing in the metadata.")

    # Fetch the actual file content from Supabase storage
    file_data = supabase.storage.from_('files').download(file_path)
    return file_name, file_data


def get_file_type(file_path):
    """Return the MIME type python-magic detects for the file at ``file_path``."""
    return magic.Magic(mime=True).from_file(file_path)


def insert_file_record(user_id, doc):
    """Insert a ``files`` row describing the local document ``doc``.

    The row is created with an empty ``file_path``; the caller fills it in
    after the upload via ``update_file_record``.

    Args:
        user_id: Owner id stored in the row.
        doc: Path of the local document; its basename, size and detected MIME
            type are recorded.

    Returns:
        The response object returned by the Supabase insert call.
    """
    file_record = {
        "user_id": user_id,
        "description": "",
        "file_path": "",
        "name": "letterhead-" + os.path.basename(doc),
        "size": os.path.getsize(doc),
        "tokens": 0,
        "type": get_file_type(doc),
    }
    return supabase.table("files").insert(file_record).execute()


def upload_file_to_storage(file, metadata):
    """Upload the local file to the ``files`` bucket.

    Args:
        file: Path of the local file to upload.
        metadata: Mapping providing ``user_id`` and ``file_id``; the object is
            stored at ``"<user_id>/<file_id>"``.

    Returns:
        The ``path`` attribute of the storage client's upload response.
    """
    storage_path = f"{metadata['user_id']}/{metadata['file_id']}"
    file_type = get_file_type(file)
    with open(file, 'rb') as f:
        response = supabase.storage.from_("files").upload(
            file=f,
            path=storage_path,
            file_options={"cache-control": "3600", "content-type": file_type, "upsert": "false"},
        )
    return response.path


def update_file_record(file_id, updates):
    """Apply ``updates`` to the ``files`` row whose ``id`` equals ``file_id``.

    Returns:
        The response object returned by the Supabase update call.
    """
    return supabase.table("files").update(updates).eq("id", file_id).execute()


def insert_text_and_image_at_end(full_path, full_image_path, text_to_insert, include_signature,
                                 signature_position, letterhead_address):
    """Fill in the address placeholder and append text (and a signature) to a .docx.

    The document at ``full_path`` is modified and saved in place.

    Args:
        full_path: Path of the .docx file to modify.
        full_image_path: Path of the signature image, or a falsy value.
        text_to_insert: Text appended as a new final paragraph.
        include_signature: When truthy (and an image path is given), the image
            is appended in its own paragraph at 1x1 inch.
        signature_position: ``'left'``, ``'right'`` or ``'center'``; any other
            value leaves the image paragraph's alignment unchanged.
        letterhead_address: Replacement for the ``<>`` placeholder; when falsy
            the placeholder is left as is.

    Returns:
        ``full_path``.
    """
    doc = Document(full_path)

    # Replace placeholder <> with the letterhead address.
    # NOTE(review): replacement is done per run, so a placeholder whose "<"
    # and ">" sit in different runs is not replaced.
    if letterhead_address:
        for paragraph in doc.paragraphs:
            if "<>" in paragraph.text:
                for run in paragraph.runs:
                    run.text = run.text.replace("<>", letterhead_address)

    # Add the new text at the end of the document
    doc.add_paragraph(text_to_insert)

    # Add the image at the end of the document with position adjustment
    if include_signature and full_image_path:
        image_paragraph = doc.add_paragraph()
        image_paragraph.add_run().add_picture(full_image_path, width=Inches(1), height=Inches(1))
        alignment = _SIGNATURE_ALIGNMENTS.get(signature_position)
        if alignment is not None:
            image_paragraph.alignment = alignment

    # Save the document with the inserted text and image
    doc.save(full_path)
    return full_path


def fetch_image(bucket_name: str, image_path: str):
    """Download an image from storage into the local ``letterhead`` directory.

    The local file is named after the last ``/``-separated component of
    ``image_path`` with the detected MIME subtype appended as its extension.

    Returns:
        The absolute path of the file that was written.
    """
    # Download file from Supabase storage
    file_data = supabase.storage.from_(bucket_name).download(image_path)

    # Use python-magic to detect MIME type from the file data
    mime_type = magic.Magic(mime=True).from_buffer(file_data)

    image_name = image_path.split('/')[-1] + "." + mime_type.split('/')[-1]
    full_letterhead_image_path = os.path.join(_letterhead_dir(), image_name)
    with open(full_letterhead_image_path, 'wb') as f:
        f.write(file_data)
    return full_letterhead_image_path


def delete_all_files(directory):
    """Best-effort removal of every regular file in ``directory``.

    ``WARNING-DO-NOT-DELETE.txt`` is always kept. Failures are logged and
    never raised, so this is safe to call from cleanup paths.
    """
    keep_file = "WARNING-DO-NOT-DELETE.txt"
    try:
        filenames = os.listdir(directory)
    except OSError:
        logger.exception("Could not list %s for cleanup", directory)
        return

    for filename in filenames:
        file_path = os.path.join(directory, filename)
        # Check if it's a file and not the one to keep
        if filename == keep_file or not os.path.isfile(file_path):
            continue
        try:
            os.remove(file_path)
        except OSError:
            logger.exception("Could not delete %s", file_path)


@app.route("/api/letterhead", methods=["POST"])
def letterhead():
    """Create a letterhead document from the posted JSON and store it.

    Expects a JSON object with ``chatSettings``, ``profile`` and
    ``letterheadData`` members.

    Returns:
        200 with a ``message`` on success, 400 for a malformed request, 404
        when a ``ValueError`` is raised (e.g. the letterhead file is not
        found), and 500 for any other failure.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    # Log the data instead of saving it to a file
    logging.info("Received Data: %s", data)

    # Extract data
    chat_settings = data.get("chatSettings")
    profile = data.get("profile")
    letterhead_data = data.get("letterheadData")
    if not all(isinstance(section, dict) for section in (chat_settings, profile, letterhead_data)):
        return jsonify({"error": "chatSettings, profile and letterheadData are required."}), 400

    try:
        letterhead_file_id = chat_settings["letterheadFileId"]
        user_id = profile["user_id"]
        text_to_insert = letterhead_data["letterheadContent"]
        include_signature = letterhead_data["includeSignature"]
        signature_position = letterhead_data["signaturePosition"]
        letterhead_address = letterhead_data["letterheadAddress"]
    except KeyError as e:
        return jsonify({"error": f"Missing required field: {e}"}), 400
    # The signature path only matters when a signature was requested.
    signature_image_path = chat_settings.get("letterheadSignatureImagePath") if include_signature else None

    work_dir = _letterhead_dir()
    try:
        file_name, file_data = get_file_by_id(letterhead_file_id)
        # The name comes from the database; keep only its last component so it
        # cannot point outside the scratch directory.
        full_letterhead_file_path = os.path.join(work_dir, os.path.basename(file_name))

        full_letterhead_signature_path = None
        if signature_image_path:
            full_letterhead_signature_path = fetch_image("assistant_images", signature_image_path)

        with open(full_letterhead_file_path, "wb") as f:
            # The download result may be a file-like object or raw bytes.
            f.write(file_data.read() if hasattr(file_data, "read") else file_data)

        modified_doc = insert_text_and_image_at_end(
            full_letterhead_file_path, full_letterhead_signature_path, text_to_insert,
            include_signature, signature_position, letterhead_address)

        created_file = insert_file_record(user_id, modified_doc)
        record = created_file.data[0]
        logger.debug("created file record: %s", record)

        file_path = upload_file_to_storage(modified_doc, {
            "name": record["name"],
            "user_id": record["user_id"],
            "file_id": record["id"],
        })
        logger.debug("uploaded to: %s", file_path)

        update_file_record(record["id"], {"file_path": file_path})

        message = f"letterheadFileId:{record['id']} Your letterhead is successfully created."
        return jsonify({"message": message}), 200
    except ValueError as e:
        return jsonify({"error": str(e)}), 404
    except Exception as e:
        logger.exception("Failed to create letterhead")
        return jsonify({"error": str(e)}), 500
    finally:
        # Clean up scratch files whether or not the request succeeded.
        # NOTE(review): this empties the whole directory, including files that
        # belong to other requests still in flight.
        delete_all_files(work_dir)


if __name__ == '__main__':
    # NOTE(review): debug=True should not be used in production -- confirm
    # how this service is deployed.
    app.run(debug=True)
    # Reached only after the development server has stopped.
    print('working')