|
from fastapi import FastAPI, File, Form, UploadFile |
|
from fastapi.responses import JSONResponse |
|
from google.cloud import storage |
|
import os |
|
import io |
|
import numpy as np |
|
import cv2 |
|
from PIL import Image |
|
import uuid |
|
import base64 |
|
|
|
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "./da-kalbe-63ee33c9cdbb.json" |
|
bucket_name = "da-kalbe-ml-result-png" |
|
storage_client = storage.Client() |
|
bucket = storage_client.bucket(bucket_name) |
|
|
|
app = FastAPI() |
|
|
|
|
|
def upload_to_gcs(image: Image, filename: str): |
|
"""Uploads an image to Google Cloud Storage.""" |
|
try: |
|
blob = bucket.blob(filename) |
|
image_buffer = io.BytesIO() |
|
image.save(image_buffer, format='PNG') |
|
image_buffer.seek(0) |
|
blob.upload_from_file(image_buffer, content_type='image/png') |
|
except Exception as e: |
|
return {'error': f"An unexpected error occurred: {e}"} |
|
|
|
def upload_folder_images(image_path, enhanced_image_path): |
|
|
|
folder_name = os.path.splitext(os.path.basename(image_path))[0] |
|
|
|
|
|
bucket.blob(folder_name + '/').upload_from_string('', content_type='application/x-www-form-urlencoded') |
|
|
|
|
|
original_image = Image.open(image_path) |
|
enhanced_image = Image.open(enhanced_image_path) |
|
|
|
|
|
upload_to_gcs(original_image, folder_name + '/' + 'original_image.png') |
|
upload_to_gcs(enhanced_image, folder_name + '/' + enhancement_type + '.png') |
|
|
|
def calculate_mse(original_image, enhanced_image): |
|
mse = np.mean((original_image - enhanced_image) ** 2) |
|
return mse |
|
|
|
def calculate_psnr(original_image, enhanced_image): |
|
mse = calculate_mse(original_image, enhanced_image) |
|
if mse == 0: |
|
return float('inf') |
|
max_pixel_value = 255.0 |
|
psnr = 20 * np.log10(max_pixel_value / np.sqrt(mse)) |
|
return psnr |
|
|
|
def calculate_maxerr(original_image, enhanced_image): |
|
maxerr = np.max((original_image - enhanced_image) ** 2) |
|
return maxerr |
|
|
|
def calculate_l2rat(original_image, enhanced_image): |
|
l2norm_ratio = np.sum(original_image ** 2) / np.sum((original_image - enhanced_image) ** 2) |
|
return l2norm_ratio |
|
|
|
def process_image(original_image, enhancement_type, fix_monochrome=True): |
|
if fix_monochrome and original_image.shape[-1] == 3: |
|
original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2GRAY) |
|
|
|
image = original_image - np.min(original_image) |
|
image = image / np.max(original_image) |
|
image = (image * 255).astype(np.uint8) |
|
|
|
enhanced_image = enhance_image(image, enhancement_type) |
|
|
|
mse = calculate_mse(original_image, enhanced_image) |
|
psnr = calculate_psnr(original_image, enhanced_image) |
|
maxerr = calculate_maxerr(original_image, enhanced_image) |
|
l2rat = calculate_l2rat(original_image, enhanced_image) |
|
|
|
return enhanced_image, mse, psnr, maxerr, l2rat |
|
|
|
def apply_clahe(image): |
|
clahe = cv2.createCLAHE(clipLimit=40.0, tileGridSize=(8, 8)) |
|
return clahe.apply(image) |
|
|
|
def invert(image): |
|
return cv2.bitwise_not(image) |
|
|
|
def hp_filter(image, kernel=None): |
|
if kernel is None: |
|
kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]]) |
|
return cv2.filter2D(image, -1, kernel) |
|
|
|
def unsharp_mask(image, radius=5, amount=2): |
|
def usm(image, radius, amount): |
|
blurred = cv2.GaussianBlur(image, (0, 0), radius) |
|
sharpened = cv2.addWeighted(image, 1.0 + amount, blurred, -amount, 0) |
|
return sharpened |
|
return usm(image, radius, amount) |
|
|
|
def hist_eq(image): |
|
return cv2.equalizeHist(image) |
|
|
|
def enhance_image(image, enhancement_type): |
|
if enhancement_type == "Invert": |
|
return invert(image) |
|
elif enhancement_type == "High Pass Filter": |
|
return hp_filter(image) |
|
elif enhancement_type == "Unsharp Masking": |
|
return unsharp_mask(image) |
|
elif enhancement_type == "Histogram Equalization": |
|
return hist_eq(image) |
|
elif enhancement_type == "CLAHE": |
|
return apply_clahe(image) |
|
else: |
|
raise ValueError(f"Unknown enhancement type: {enhancement_type}") |
|
|
|
@app.post("/process_image") |
|
async def process_image_api(image: UploadFile = File(...), enhancement_type: str = Form(...)): |
|
"""Processes an uploaded image and returns the enhanced image and metrics.""" |
|
|
|
if not image: |
|
return JSONResponse(status_code=400, content={'error': 'No image file provided'}) |
|
|
|
allowed_extensions = {'png', 'jpg', 'jpeg'} |
|
if '.' not in image.filename or image.filename.split('.')[-1].lower() not in allowed_extensions: |
|
return JSONResponse(status_code=400, content={'error': 'Invalid image file'}) |
|
|
|
try: |
|
|
|
image_pil = Image.open(image.file).convert('RGB') |
|
|
|
|
|
image_np = np.array(image_pil) |
|
|
|
|
|
enhanced_image, mse, psnr, maxerr, l2rat = process_image(image_np, enhancement_type) |
|
|
|
|
|
enhanced_image_pil = Image.fromarray(enhanced_image) |
|
|
|
|
|
image_buffer = io.BytesIO() |
|
enhanced_image_pil.save(image_buffer, format='PNG') |
|
image_buffer.seek(0) |
|
|
|
|
|
image_base64 = base64.b64encode(image_buffer.getvalue()).decode('utf-8') |
|
|
|
response = { |
|
'message': 'Image processed successfully!', |
|
'processed_image': image_base64, |
|
'mse': float(mse), |
|
'psnr': float(psnr), |
|
'maxerr': float(maxerr), |
|
'l2rat': float(l2rat) |
|
} |
|
upload_folder_images(image, enhanced_image_pil) |
|
return JSONResponse(status_code=200, content=response) |
|
|
|
except Exception as e: |
|
return JSONResponse(status_code=500, content={'error': f'Error processing image: {str(e)}'}) |
|
|