# letterhead / analyzePdf.py
# gosign's picture
# Update analyzePdf.py
# f19b84d verified
import json
import os
import magic
from dotenv import load_dotenv
from docx import Document
from docx.shared import Inches
from docx.enum.text import WD_ALIGN_PARAGRAPH
from flask_cors import CORS
from flask import Flask, request, jsonify
from supabase import create_client, Client
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)

# Load variables from a local .env file (if present) into the process
# environment so the Supabase settings below can be overridden without
# editing this file.
# load_dotenv(dotenv_path='.env.local')
load_dotenv()

app = Flask(__name__)
CORS(app)

# Supabase connection settings. Values from the environment take precedence;
# the literals are kept as fallbacks so behaviour is unchanged when the
# variables are not set.
# NOTE(review): credentials should not live in source control -- consider
# supplying SUPABASE_URL / SUPABASE_KEY via the environment only.
url: str = os.environ.get("SUPABASE_URL", 'https://dtzuqtvroalrjhgdcowq.supabase.co/')
key: str = os.environ.get(
    "SUPABASE_KEY",
    'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImR0enVxdHZyb2FscmpoZ2Rjb3dxIiwicm9sZSI6ImFub24iLCJpYXQiOjE3MjU0NDk3MzIsImV4cCI6MjA0MTAyNTczMn0.WrIvwEOq4CqCb8IkU8G4jiWkf9DM1JxGd2_aTN4vlV4',
)
supabase: Client = create_client(url, key)
def get_file_by_id(file_id):
    """Fetch the name and content of the file stored under ``file_id``.

    Looks up the matching row in the ``files`` table and downloads the object
    found at the row's ``file_path`` from the ``files`` storage bucket.

    Args:
        file_id: Value matched against the ``id`` column of ``files``.

    Returns:
        A ``(file_name, file_data)`` tuple: the row's ``name`` value and
        whatever the storage client's ``download`` call returns.

    Raises:
        ValueError: If no row comes back or the row has no ``file_path``.
        Exception: Any other error from the Supabase client is logged and
            re-raised, so a failure can never be mistaken for a
            ``(file_name, file_data)`` result.
    """
    try:
        response = supabase.table("files").select("*").eq("id", file_id).single().execute()
        record = response.data
        if not record:
            # Read ``error`` defensively: the response object is not
            # guaranteed to expose that attribute.
            error = getattr(response, "error", None)
            raise ValueError(getattr(error, "message", None) or "File not found.")

        file_path = record.get("file_path")
        file_name = record.get("name")
        if not file_path:
            raise ValueError("File path is missing in the metadata.")

        # Fetch the actual file content from Supabase storage
        file_data = supabase.storage.from_('files').download(file_path)
        return file_name, file_data
    except ValueError:
        # Propagate "not found" style failures untouched so the caller can
        # turn them into its own response.
        raise
    except Exception:
        logging.exception("Error fetching file")
        raise
def get_file_type(file_path):
    """Return the MIME type of the file at ``file_path`` as a string.

    The type is detected from the file's content with python-magic. If that
    detection fails for any reason, the type is guessed from the file name
    instead, and ``"application/octet-stream"`` is used when no guess is
    available. A string is therefore always returned, which keeps callers
    that store the value or send it as a content type working.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The MIME type, e.g. ``"application/pdf"``.
    """
    import mimetypes

    try:
        # Use python-magic to detect the MIME type of the file
        mime = magic.Magic(mime=True)
        return mime.from_file(file_path)
    except Exception:
        logging.exception("Error detecting file type for %r", file_path)
        guessed_type, _ = mimetypes.guess_type(str(file_path))
        return guessed_type or "application/octet-stream"
def insert_file_record(user_id, doc):
    """Insert a row describing the local document ``doc`` into ``files``.

    The row's ``name`` is the document's base name prefixed with
    ``letterhead-``, ``size`` is the file size on disk and ``type`` is the
    value returned by ``get_file_type``. ``file_path`` is inserted empty.

    Args:
        user_id: Stored in the row's ``user_id`` column.
        doc: Path of the local document the row describes.

    Returns:
        The response object returned by the insert query's ``execute()``.

    Raises:
        Exception: Any failure (unreadable file, rejected insert, ...) is
            logged and re-raised so the caller never receives an error
            payload in place of the insert response.
    """
    try:
        file_record = {
            "user_id": user_id,
            "description": "",
            "file_path": "",
            "name": "letterhead-" + os.path.basename(doc),
            "size": os.path.getsize(doc),
            "tokens": 0,
            "type": get_file_type(doc),
        }
        return supabase.table("files").insert(file_record).execute()
    except Exception:
        logging.exception("Error inserting file record")
        raise
def upload_file_to_storage(file, metadata):
    """Upload the local file at path ``file`` to the ``files`` storage bucket.

    Args:
        file: Path of the local file to upload.
        metadata: Mapping providing ``user_id`` and ``file_id``; the object is
            stored under ``"<user_id>/<file_id>"``.

    Returns:
        The ``path`` attribute of the storage client's upload response.

    Raises:
        KeyError: If ``metadata`` lacks ``user_id`` or ``file_id``.
        Exception: Any failure while reading or uploading the file is logged
            and re-raised, so an error payload is never returned where a
            storage path is expected.
    """
    storage_path = f"{metadata['user_id']}/{metadata['file_id']}"
    try:
        content_type = get_file_type(file)
        with open(file, 'rb') as f:
            response = supabase.storage.from_("files").upload(
                file=f,
                path=storage_path,
                file_options={"cache-control": "3600", "content-type": content_type, "upsert": "false"},
            )
        return response.path
    except Exception:
        logging.exception("Error uploading file")
        raise
def update_file_record(file_id, updates):
    """Apply ``updates`` to the ``files`` row whose ``id`` equals ``file_id``.

    Args:
        file_id: Value matched against the ``id`` column.
        updates: Mapping of column names to their new values.

    Returns:
        The response object returned by the update query's ``execute()``.

    Raises:
        Exception: Any failure is logged and re-raised. Callers that ignore
            the return value would otherwise report success after a failed
            update.
    """
    try:
        return supabase.table("files").update(updates).eq("id", file_id).execute()
    except Exception:
        logging.exception("Error while updating record")
        raise
def insert_text_and_image_at_end(full_path, full_image_path, text_to_insert, include_signature, signature_position, letterhead_address):
    """Fill in the sender address, append text and an optional signature image.

    The document at ``full_path`` is modified and saved in place.

    Args:
        full_path: Path of the .docx document to edit.
        full_image_path: Path of the signature image, or a falsy value.
        text_to_insert: Text appended as a new paragraph at the end.
        include_signature: Whether to append the signature image.
        signature_position: ``'left'``, ``'right'`` or ``'center'``; any other
            value leaves the image paragraph's alignment untouched.
        letterhead_address: Replacement for the ``<<SENDER_ADDRESS>>``
            placeholder; when falsy the placeholder is left as is.

    Returns:
        ``full_path``, the path of the saved document.

    Raises:
        Exception: Any failure while opening, editing or saving the document
            is logged and re-raised, so an error payload is never returned
            where a document path is expected.
    """
    placeholder = "<<SENDER_ADDRESS>>"
    try:
        doc = Document(full_path)

        # Replace placeholder <<SENDER_ADDRESS>> with the letterhead address
        if letterhead_address:
            for paragraph in doc.paragraphs:
                if placeholder not in paragraph.text:
                    continue
                runs = paragraph.runs
                # Only rewrite runs that actually contain the placeholder so
                # the remaining runs of the paragraph are left untouched.
                matching_runs = [run for run in runs if placeholder in run.text]
                if matching_runs:
                    for run in matching_runs:
                        run.text = run.text.replace(placeholder, letterhead_address)
                    continue
                # The placeholder can be split across several runs, in which
                # case no single run contains it. Collapse the paragraph's
                # text into the first run and replace it there.
                joined_text = "".join(run.text for run in runs)
                if placeholder in joined_text:
                    runs[0].text = joined_text.replace(placeholder, letterhead_address)
                    for run in runs[1:]:
                        run.text = ""

        # Add the new text at the end of the document
        doc.add_paragraph(text_to_insert)

        # Add the image at the end of the document with position adjustment
        if include_signature and full_image_path:
            image_paragraph = doc.add_paragraph()
            image_paragraph.add_run().add_picture(full_image_path, width=Inches(1), height=Inches(1))
            # Adjust the alignment based on signature_position
            if signature_position == 'left':
                image_paragraph.alignment = WD_ALIGN_PARAGRAPH.LEFT
            elif signature_position == 'right':
                image_paragraph.alignment = WD_ALIGN_PARAGRAPH.RIGHT
            elif signature_position == 'center':
                image_paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER

        # Save the document with the inserted text and image
        doc.save(full_path)
        return full_path
    except Exception:
        logging.exception("Error while inserting text")
        raise
def fetch_image(bucket_name: str, image_path: str):
    """Download an image from Supabase storage into the local work directory.

    The image is written to the ``letterhead`` directory next to this file.
    Its name is the last ``/``-separated segment of ``image_path`` followed by
    an extension taken from the subtype of the detected MIME type.

    Args:
        bucket_name: Storage bucket to download from.
        image_path: Path of the object inside the bucket.

    Returns:
        The absolute path of the file that was written.

    Raises:
        Exception: Any download, detection or write failure is logged and
            re-raised, so an error payload is never returned where an image
            path is expected.
    """
    try:
        # Download file from Supabase storage
        file_data = supabase.storage.from_(bucket_name).download(image_path)
        # Use python-magic to detect MIME type from the file data
        mime_type = magic.Magic(mime=True).from_buffer(file_data)

        # Create the directory that is actually written to, rather than a
        # ``letterhead`` folder relative to the current working directory.
        target_directory = os.path.join(os.path.dirname(os.path.abspath(__file__)), "letterhead")
        os.makedirs(target_directory, exist_ok=True)

        image_name = image_path.split('/')[-1] + "." + mime_type.split('/')[-1]
        full_letterhead_image_path = os.path.join(target_directory, image_name)
        with open(full_letterhead_image_path, 'wb') as f:
            f.write(file_data)
        return full_letterhead_image_path
    except Exception:
        logging.exception("Error fetching image")
        raise
def delete_all_files(directory):
    """Remove the regular files directly inside ``directory``.

    The file named ``WARNING-DO-NOT-DELETE.txt`` is kept, and entries that are
    not regular files (such as subdirectories) are left in place.

    Args:
        directory: Path of the directory to clean.

    Returns:
        ``None`` when the sweep completes. If an exception occurs it is
        printed and a ``(jsonify({"error": ...}), 500)`` pair is returned.
    """
    protected_name = "WARNING-DO-NOT-DELETE.txt"
    try:
        for entry_name in os.listdir(directory):
            # Never touch the marker file.
            if entry_name == protected_name:
                continue
            candidate = os.path.join(directory, entry_name)
            # Skip anything that is not a regular file.
            if not os.path.isfile(candidate):
                continue
            os.remove(candidate)
    except Exception as e:
        print(f"An error occurred: {e}")
        return jsonify({"error": str(e)}), 500
def _raise_on_error_tuple(result, action):
    """Return ``result`` unless it is a ``(response, 500)`` failure pair.

    Helpers in this module may report failure by returning a Flask
    ``(response, 500)`` pair instead of raising; convert that into an
    exception so it is never used as if it were real data.
    """
    if (
        isinstance(result, tuple)
        and len(result) == 2
        and isinstance(result[1], int)
        and result[1] == 500
    ):
        raise RuntimeError(f"Failed to {action}.")
    return result


@app.route("/api/letterhead", methods=["POST"])
def letterhead():
    """Build a letterhead document from the posted settings and store it.

    Expects a JSON object with:
      * ``chatSettings``: ``letterheadFileId`` and, optionally,
        ``letterheadSignatureImagePath``;
      * ``profile``: ``user_id``;
      * ``letterheadData``: ``includeSignature``, ``letterheadContent``,
        ``signaturePosition`` and ``letterheadAddress``.

    The template file is downloaded, edited locally, registered in the
    ``files`` table, uploaded to storage, and the new row is then updated
    with the storage path.

    Returns:
        ``200`` with a ``message`` containing the new file id on success;
        ``400`` when the body is not a JSON object or a required field is
        missing; ``404`` when a ``ValueError`` is raised (e.g. the template
        cannot be found); ``500`` for any other failure.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    # Log the data instead of saving it to a file
    logging.info("Received Data: %s", data)

    # Extract data
    chat_settings = data.get("chatSettings") or {}
    profile = data.get("profile") or {}
    letterhead_data = data.get("letterheadData") or {}
    try:
        letterhead_file_id = chat_settings["letterheadFileId"]
        user_id = profile["user_id"]
        include_signature = letterhead_data["includeSignature"]
        text_to_insert = letterhead_data["letterheadContent"]
        signature_position = letterhead_data["signaturePosition"]
        letterhead_address = letterhead_data["letterheadAddress"]
    except (KeyError, TypeError) as e:
        return jsonify({"error": f"Missing or invalid field: {e}"}), 400
    # Only needed when a signature is requested, so it is optional.
    signature_image_path = chat_settings.get("letterheadSignatureImagePath")

    current_directory = os.path.dirname(os.path.abspath(__file__))
    letterhead_directory = os.path.join(current_directory, "letterhead")
    try:
        os.makedirs(letterhead_directory, exist_ok=True)

        file_name, file_data = _raise_on_error_tuple(
            get_file_by_id(letterhead_file_id), "fetch the letterhead file"
        )
        # The name comes from stored metadata; keep only its final component
        # so it cannot point outside the work directory.
        safe_file_name = os.path.basename(str(file_name or ""))
        if not safe_file_name:
            raise ValueError("File name is missing in the metadata.")
        full_letterhead_file_path = os.path.join(letterhead_directory, safe_file_name)

        full_letterhead_signature_path = None
        if include_signature and signature_image_path:
            full_letterhead_signature_path = _raise_on_error_tuple(
                fetch_image("assistant_images", signature_image_path),
                "fetch the signature image",
            )

        with open(full_letterhead_file_path, "wb") as f:
            # The download may be a file-like object or raw bytes.
            f.write(file_data.read() if hasattr(file_data, "read") else file_data)

        modified_doc = _raise_on_error_tuple(
            insert_text_and_image_at_end(
                full_letterhead_file_path,
                full_letterhead_signature_path,
                text_to_insert,
                include_signature,
                signature_position,
                letterhead_address,
            ),
            "build the letterhead document",
        )

        created_file = _raise_on_error_tuple(
            insert_file_record(user_id, modified_doc), "create the file record"
        )
        record = created_file.data[0]

        file_path = _raise_on_error_tuple(
            upload_file_to_storage(modified_doc, {
                "name": record["name"],
                "user_id": record["user_id"],
                "file_id": record["id"],
            }),
            "upload the letterhead document",
        )
        logging.info("Uploaded letterhead %s to %s", record["id"], file_path)

        _raise_on_error_tuple(
            update_file_record(record["id"], {"file_path": file_path}),
            "update the file record",
        )

        message = f"letterheadFileId:{record['id']} Your letterhead is successfully created."
        return jsonify({"message": message}), 200
    except ValueError as e:
        return jsonify({"error": str(e)}), 404
    except Exception as e:
        logging.exception("Letterhead generation failed")
        return jsonify({"error": str(e)}), 500
    finally:
        # Remove the temporary files whether or not the request succeeded.
        # NOTE(review): the directory is shared by all requests, so this can
        # remove files belonging to a request that is still in progress.
        delete_all_files(letterhead_directory)
if __name__ == '__main__':
    # Debug mode stays on by default, as before, but can be switched off by
    # setting FLASK_DEBUG to 0, false or no.
    # NOTE(review): debug mode is not meant for production deployments.
    debug_enabled = os.environ.get("FLASK_DEBUG", "1").strip().lower() not in {"0", "false", "no"}
    app.run(debug=debug_enabled)
    # Reached only after the development server has stopped.
    print('working')