Backg / app.py
LapStore
make better error handling
51a8240
raw
history blame
6.44 kB
from fastapi import FastAPI ,Request ,Form, UploadFile, File
from fastapi.responses import HTMLResponse, FileResponse,StreamingResponse,JSONResponse
import os
import io
from PIL import ImageOps,Image ,ImageFilter
#from transformers import pipeline
import matplotlib.pyplot as plt
import numpy as np
import ast
from server import *
import cv2
from typing import Optional
import base64
from fastapi import FastAPI, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from fastapi.responses import JSONResponse
#http://localhost:8000
app = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
# Register the Limiter with FastAPI
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
response = await call_next(request)
return response
@app.exception_handler(429)
async def rate_limit_exceeded(request: Request, exc):
'''
return JSONResponse(
status_code=429,
content={"detail": "Too Many Requests ,please just try 3 times per hour"},
)'''
return JSONResponse(
status_code=429,
content={
"detail": "Too Many Requests ,please just try 3 times per hour",
"status": 0
})
# Root route
@app.get('/')
def main():
return "Hello From Background remover !"
@app.post("/items")
@limiter.limit("3/hour") # must have parameter request
async def read_items(request: Request, type_of_filters: str = Form(...)):
return {"items": str(type_of_filters)}
@app.post('/imageStep1')
@limiter.limit("1/hour") # must have parameter request
async def image_step1(request: Request,image_file: UploadFile = File(...),background_image: Optional [UploadFile] = File(None),type_of_filters: str = Form(...), blur_radius: str = Form(...)):
#return {"type ":type_of_filters ,"radius":blur_radius,"image":image_file,"back":background_image}
#type_of_filters : cam / ...remove / ...back
input_to_type_of_filters = None
if background_image and background_image.filename:
contents__back = await background_image.read()
image_back = Image.open(io.BytesIO(contents__back))
input_to_type_of_filters = image_back
else:
input_to_type_of_filters = type_of_filters
contents_img = await image_file.read()
image = Image.open(io.BytesIO(contents_img))
output_step1 =SegmenterBackground().Back_step1(image,input_to_type_of_filters,int(blur_radius))
if (output_step1[-1] == 0):
return {
"detail": output_step1[0],
"status": output_step1[-1]
}
produced_image = output_step1[0]
'''
# Save the processed image to a temporary file
#output_file_path_tmp = "/tmp/tmp_processed_image.png"
#produced_image.save(output_file_path_tmp)
# return FileResponse(output_file_path_tmp, media_type='image/png', filename="/tmp/tmp_processed_image.png")
'''
# Convert the image to base64 to return it in the response
buffered = io.BytesIO()
produced_image.save(buffered, format="PNG")
encoded_img = base64.b64encode(buffered.getvalue()).decode("utf-8")
# Returning both text and the base64 image
return {
"message": output_step1[1],
"image_base64": encoded_img,
"status": output_step1[-1]
}
@app.post('/imageStep2')
async def image_step2(image_file: UploadFile = File(...),background_image: Optional [UploadFile] = File(None),type_of_filters: str = Form(...),
things_replace: str = Form(...), blur_radius: str = Form(...)):
#things_replace : from what detected.
things_replace=ast.literal_eval(things_replace)
blur_radius=int(blur_radius)
input_to_type_of_filters=None
if background_image and background_image.filename:
contents__back = await background_image.read()
image_back = Image.open(io.BytesIO(contents__back))
input_to_type_of_filters = image_back
else:
input_to_type_of_filters = type_of_filters
contents = await image_file.read()
image = Image.open(io.BytesIO(contents))
produced_image=SegmenterBackground().Back_step2(image,input_to_type_of_filters,things_replace,int(blur_radius))
# Save the processed image to a temporary file
output_file_path_tmp = "/tmp/tmp_processed_image.png"
produced_image.save(output_file_path_tmp)
# Return the processed image for download
return FileResponse(output_file_path_tmp, media_type='image/png', filename="/tmp/tmp_processed_image.png")
@app.post('/Video')
async def Video(video_file: UploadFile = File(...),background_image: Optional [UploadFile] = File(None),kind_back: str = Form(...)
,type_of_filters: str = Form(...),blur_radius: str = Form(...)):#--->,background_image: UploadFile = File(...)):
#video_data = await video_file.read()
#nparr = np.frombuffer(video_data, np.uint8)
#video_path=cv2.imdecode(nparr, cv2.IMREAD_COLOR) #named this as just passed as it's path
blur_radius=int(blur_radius)
kind_back=ast.literal_eval(kind_back)
input_to_type_of_filters=None
if background_image and background_image.filename:
contents__back = await background_image.read()
image_back = Image.open(io.BytesIO(contents__back))
input_to_type_of_filters = image_back
else:
input_to_type_of_filters = type_of_filters
input_path_toWrite = f'/tmp/tmp_imput.avi'#{video_file.filename}
output_path = '/tmp/tmp_output.avi'
with open(input_path_toWrite, 'wb') as f:
f.write(await video_file.read())
#--------> mp4? (when tried ,worked on it although) sound??
SegmenterBackground().Back_video(input_path_toWrite, output_path,input_to_type_of_filters,kind_back,blur_radius)#video,background_image,what_remove,blur_radius=23)
return StreamingResponse(open(output_path, "rb"), media_type="video/avi")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)