Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, File, UploadFile, Response, HTTPException | |
from fastapi.responses import JSONResponse, FileResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
from PIL import Image | |
import io | |
import sqlite3 | |
from pydantic import BaseModel, EmailStr | |
from pathlib import Path | |
from model import YOLOModel | |
import shutil | |
from openpyxl import Workbook | |
from openpyxl.drawing.image import Image as ExcelImage | |
import os | |
yolo = YOLOModel() | |
UPLOAD_FOLDER = Path("./uploads") | |
UPLOAD_FOLDER.mkdir(exist_ok=True) | |
app = FastAPI() | |
cropped_images_dir = "cropped_images" | |
# Initialize SQLite database | |
def init_db(): | |
conn = sqlite3.connect('users.db') | |
c = conn.cursor() | |
c.execute(''' | |
CREATE TABLE IF NOT EXISTS users ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
firstName TEXT NOT NULL, | |
lastName TEXT NOT NULL, | |
country TEXT, | |
number TEXT, -- Phone number stored as TEXT to allow various formats | |
email TEXT UNIQUE NOT NULL, -- Email should be unique and non-null | |
password TEXT NOT NULL -- Password will be stored as a string (hashed ideally) | |
) | |
''') | |
conn.commit() | |
conn.close() | |
init_db() | |
class UserSignup(BaseModel): | |
firstName: str | |
lastName: str | |
country: str | |
number: str | |
email: EmailStr | |
password: str | |
class UserLogin(BaseModel): | |
email: str | |
password: str | |
async def signup(user_data: UserSignup): | |
try: | |
conn = sqlite3.connect('users.db') | |
c = conn.cursor() | |
# Check if user already exists | |
c.execute("SELECT * FROM users WHERE email = ?", (user_data.email,)) | |
if c.fetchone(): | |
raise HTTPException(status_code=400, detail="Email already registered") | |
# Insert new user | |
c.execute(""" | |
INSERT INTO users (firstName, lastName, country, number, email, password) | |
VALUES (?, ?, ?, ?, ?, ?) | |
""", (user_data.firstName, user_data.lastName, user_data.country, user_data.number, user_data.email, user_data.password)) | |
conn.commit() | |
conn.close() | |
return {"message": "User registered successfully", "email": user_data.email} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
async def login(user_data: UserLogin): | |
try: | |
conn = sqlite3.connect('users.db') | |
c = conn.cursor() | |
# Find user | |
c.execute("SELECT * FROM users WHERE email = ? AND password = ?", | |
(user_data.email, user_data.password)) | |
user = c.fetchone() | |
conn.close() | |
if not user: | |
raise HTTPException(status_code=401, detail="Invalid credentials") | |
return { | |
"message": "Login successful", | |
"user": { | |
"firstName": user[1], | |
"lastName": user[2], | |
"email": user[3] | |
} | |
} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
async def upload_image(image: UploadFile = File(...)): | |
# print(f'\n\t\tUPLOADED!!!!') | |
try: | |
file_path = UPLOAD_FOLDER / image.filename | |
with file_path.open("wb") as buffer: | |
shutil.copyfileobj(image.file, buffer) | |
# print(f'Starting to pass into model, {file_path}') | |
# Perform YOLO inference | |
predictions = yolo.predict(str(file_path)) | |
print(f'\n\n\n{predictions}\n\n\ \n\t\t\t\tare predictions') | |
# Clean up uploaded file | |
file_path.unlink() # Remove file after processing | |
return JSONResponse(content={"items": predictions}) | |
except Exception as e: | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
def cleanup_images(directory: str): | |
"""Remove all images in the directory.""" | |
for file in Path(directory).glob("*"): | |
file.unlink() | |
async def generate_excel(predictions: list): | |
# Create an Excel workbook | |
workbook = Workbook() | |
sheet = workbook.active | |
sheet.title = "Predictions" | |
# Add headers | |
headers = ["Category", "Confidence", "Predicted Brand", "Price", "Details", "Detected Text", "Image"] | |
sheet.append(headers) | |
for idx, prediction in enumerate(predictions): | |
# Extract details from the prediction | |
category = prediction["category"] | |
confidence = prediction["confidence"] | |
predicted_brand = prediction["predicted_brand"] | |
price = prediction["price"] | |
details = prediction["details"] | |
detected_text = prediction["detected_text"] | |
cropped_image_path = prediction["image_path"] | |
# Append data row | |
sheet.append([category, confidence, predicted_brand, price, details, detected_text]) | |
# Add the image to the Excel file (if it exists) | |
if os.path.exists(cropped_image_path): | |
img = ExcelImage(cropped_image_path) | |
img.width, img.height = 50, 50 # Resize image to fit into the cell | |
sheet.add_image(img, f"G{idx + 2}") # Place in the "Image" column | |
excel_file_path = "predictions_with_images.xlsx" | |
workbook.save(excel_file_path) | |
# Cleanup after saving | |
cleanup_images(cropped_images_dir) | |
# Serve the Excel file as a response | |
return FileResponse( | |
excel_file_path, | |
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", | |
filename="predictions_with_images.xlsx" | |
) | |
# code to accept the localhost to get images from | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["http://192.168.56.1:3000", "http://192.168.56.1:3001", "http://localhost:3000"], | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) |