"""FastAPI application bootstrap: imports, rate limiter, 429 handler and root route."""

from fastapi import FastAPI, Request, Form, UploadFile, File
from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse, JSONResponse
import os
import io
from PIL import ImageOps, Image, ImageFilter
import matplotlib.pyplot as plt
import numpy as np
import ast
from server import *
import cv2
from typing import Optional
import base64
# The next imports repeat earlier ones on purpose: they come after the
# wildcard import above, so these bindings take precedence over any
# same-named export of `server` (import order is kept as in the original).
from fastapi import FastAPI, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from fastapi.responses import JSONResponse

# Local development URL: http://localhost:8000
app = FastAPI()

# Rate limiter keyed on the client's remote address; stored on app.state
# so it is registered with the FastAPI application.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter


@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Forward the request to the next handler and return its response as-is.

    NOTE(review): despite the name, no header is added here — confirm
    whether timing information was meant to be attached.
    """
    return await call_next(request)


@app.exception_handler(429)
async def rate_limit_exceeded(request: Request, exc):
    """Answer HTTP 429 errors with a JSON body carrying ``detail`` and ``status`` 0.

    NOTE(review): the message always says "3 times per hour", while routes
    may declare other limits (one route uses "1/hour") — confirm wording.
    """
    body = {
        "detail": "Too Many Requests ,please just try 3 times per hour",
        "status": 0,
    }
    return JSONResponse(status_code=429, content=body)


# Root route
@app.get('/')
def main():
    """Return a fixed greeting string."""
    return "Hello From Background remover !"
# Chunk size used when copying uploads to disk and streaming results back.
_CHUNK_SIZE = 1024 * 1024

# Download name previously produced by FileResponse(filename=...); kept
# verbatim so existing clients receive the same Content-Disposition value.
_STEP2_DOWNLOAD_NAME = "/tmp/tmp_processed_image.png"


def _error_response(detail, status_code=400):
    """Build a JSON error reply in the ``detail`` / ``status`` 0 shape used by this API."""
    return JSONResponse(
        status_code=status_code,
        content={"detail": detail, "status": 0},
    )


def _parse_int(raw, field_name):
    """Convert a form value to ``int``; raise ``ValueError`` naming the field on failure."""
    try:
        return int(raw)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"{field_name} must be an integer") from exc


def _parse_literal(raw, field_name):
    """Parse a form value as a Python literal; raise ``ValueError`` naming the field on failure.

    ``ast.literal_eval`` only evaluates literals (it does not execute code),
    but it raises several exception types on malformed input; they are
    normalised to ``ValueError`` here.
    """
    try:
        return ast.literal_eval(raw)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as exc:
        raise ValueError(f"{field_name} is not a valid literal") from exc


async def _read_image(upload, field_name):
    """Read an uploaded file and open it with PIL; raise ``ValueError`` if it cannot be opened."""
    raw = await upload.read()
    try:
        return Image.open(io.BytesIO(raw))
    except OSError as exc:  # includes PIL's UnidentifiedImageError
        raise ValueError(f"{field_name} is not a readable image") from exc


async def _resolve_filter_input(background_image, type_of_filters):
    """Return the uploaded background as a PIL image if one was sent, else ``type_of_filters``."""
    if background_image and background_image.filename:
        return await _read_image(background_image, "background_image")
    return type_of_filters


@app.post("/items")
@limiter.limit("3/hour")  # the limiter requires the `request` parameter
async def read_items(request: Request, type_of_filters: str = Form(...)):
    """Echo the submitted ``type_of_filters`` form value back under ``items``."""
    return {"items": str(type_of_filters)}


@app.post('/imageStep1')
@limiter.limit("1/hour")  # the limiter requires the `request` parameter
async def image_step1(
    request: Request,
    image_file: UploadFile = File(...),
    background_image: Optional[UploadFile] = File(None),
    type_of_filters: str = Form(...),
    blur_radius: str = Form(...),
):
    """Run the first segmentation step on an uploaded image.

    The background input passed to the segmenter is the uploaded
    ``background_image`` when present, otherwise the ``type_of_filters``
    string.

    Returns:
        On success, a dict with ``message``, ``image_base64`` (PNG encoded
        as base64 text) and ``status``. When the segmenter reports status 0,
        a dict with ``detail`` and ``status``. Malformed input yields an
        HTTP 400 JSON error instead of an unhandled exception.
    """
    try:
        radius = _parse_int(blur_radius, "blur_radius")
        filter_input = await _resolve_filter_input(background_image, type_of_filters)
        image = await _read_image(image_file, "image_file")
    except ValueError as exc:
        return _error_response(str(exc))

    # SegmenterBackground is not defined in this file (presumably provided by
    # the wildcard import from `server`). Its result is indexable: the last
    # item is a status flag, item 0 is the image (or an error detail when the
    # status is 0) and item 1 is a message.
    output_step1 = SegmenterBackground().Back_step1(image, filter_input, radius)
    status = output_step1[-1]
    if status == 0:
        return {"detail": output_step1[0], "status": status}

    # Encode the produced image as base64 so it can travel inside JSON.
    buffered = io.BytesIO()
    output_step1[0].save(buffered, format="PNG")
    encoded_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    return {
        "message": output_step1[1],
        "image_base64": encoded_img,
        "status": status,
    }


@app.post('/imageStep2')
async def image_step2(
    image_file: UploadFile = File(...),
    background_image: Optional[UploadFile] = File(None),
    type_of_filters: str = Form(...),
    things_replace: str = Form(...),
    blur_radius: str = Form(...),
):
    """Run the second segmentation step and return the result as a PNG download.

    ``things_replace`` is parsed as a Python literal and ``blur_radius`` as
    an integer. The PNG is built in memory rather than in a shared fixed
    file under /tmp, so concurrent requests cannot overwrite each other's
    output. Malformed input yields an HTTP 400 JSON error.
    """
    try:
        replacements = _parse_literal(things_replace, "things_replace")
        radius = _parse_int(blur_radius, "blur_radius")
        filter_input = await _resolve_filter_input(background_image, type_of_filters)
        image = await _read_image(image_file, "image_file")
    except ValueError as exc:
        return _error_response(str(exc))

    produced_image = SegmenterBackground().Back_step2(
        image, filter_input, replacements, radius
    )

    png_buffer = io.BytesIO()
    produced_image.save(png_buffer, format="PNG")
    payload = png_buffer.getvalue()

    return StreamingResponse(
        io.BytesIO(payload),
        media_type='image/png',
        headers={
            "Content-Disposition": f'attachment; filename="{_STEP2_DOWNLOAD_NAME}"',
            "Content-Length": str(len(payload)),
        },
    )


@app.post('/Video')
async def Video(
    video_file: UploadFile = File(...),
    background_image: Optional[UploadFile] = File(None),
    kind_back: str = Form(...),
    type_of_filters: str = Form(...),
    blur_radius: str = Form(...),
):
    """Process an uploaded video and stream the result back as ``video/avi``.

    ``kind_back`` is parsed as a Python literal and ``blur_radius`` as an
    integer. Each request works in its own temporary directory, so
    concurrent requests do not share input or output files. The directory
    is removed once the response body has been streamed, or immediately if
    processing fails. Malformed input yields an HTTP 400 JSON error.
    """
    import shutil
    import tempfile

    try:
        radius = _parse_int(blur_radius, "blur_radius")
        kinds = _parse_literal(kind_back, "kind_back")
        filter_input = await _resolve_filter_input(background_image, type_of_filters)
    except ValueError as exc:
        return _error_response(str(exc))

    work_dir = tempfile.mkdtemp(prefix="bg_video_")
    # File names (including the .avi suffix) are kept from the original code.
    # NOTE(review): the original author noted mp4 input also appeared to work
    # and left audio handling as an open question.
    input_path = os.path.join(work_dir, "tmp_imput.avi")
    output_path = os.path.join(work_dir, "tmp_output.avi")

    try:
        # Copy the upload to disk in chunks instead of loading it all at once.
        with open(input_path, "wb") as sink:
            while True:
                chunk = await video_file.read(_CHUNK_SIZE)
                if not chunk:
                    break
                sink.write(chunk)

        SegmenterBackground().Back_video(
            input_path, output_path, filter_input, kinds, radius
        )
        # Open before responding so a missing output still fails the request
        # up front rather than mid-stream.
        result_stream = open(output_path, "rb")
    except BaseException:
        shutil.rmtree(work_dir, ignore_errors=True)
        raise

    def _stream_then_cleanup():
        """Yield the output in fixed-size chunks, then close it and delete the work dir."""
        try:
            with result_stream:
                while True:
                    piece = result_stream.read(_CHUNK_SIZE)
                    if not piece:
                        break
                    yield piece
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    return StreamingResponse(_stream_then_cleanup(), media_type="video/avi")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)