from flask import Flask, request, jsonify |
import os |
from hiou import hugging |
import json |
import io |
import random |
import string |
from mimetypes import guess_type |
import requests |
import time |
from PIL import Image |
def compress_image(input_stream: io.BytesIO, max_size_mb=4) -> io.BytesIO: |
with Image.open(input_stream) as img: |
img = Image.new(img.mode, img.size) |
img.paste(img) |
quality = 95 |
step = 5 |
output_stream = io.BytesIO() |
while True: |
output_stream.seek(0) |
img.save(output_stream, format="JPEG", quality=quality) |
output_stream_size_mb = len(output_stream.getvalue()) / (1024 * 1024) |
if output_stream_size_mb <= max_size_mb: |
output_stream.seek(0) |
print(f"Image compressed to {output_stream_size_mb:.2f} MB") |
return output_stream |
quality -= step |
if quality < step: |
print("Cannot compress the image to the desired size.") |
output_stream.seek(0) |
return output_stream |
def read_image_from_stream(image_data, subscription_key, endpoint): |
headers = { |
'Ocp-Apim-Subscription-Key': subscription_key, |
'Content-Type': 'application/octet-stream' |
} |
img = image_data.read() |
response = requests.post(f"{endpoint}/vision/v3.2/read/analyze", headers=headers, data=img) |
if response.status_code == 202: |
read_response_headers = response.headers |
operation_location = read_response_headers["Operation-Location"] |
else: |
raise Exception(f"Unexpected response status: {response.status_code}, {response.text}") |
return operation_location |
def get_read_result(operation_location, subscription_key): |
headers = { |
'Ocp-Apim-Subscription-Key': subscription_key |
} |
while True: |
response = requests.get(operation_location, headers=headers) |
if response.status_code == 200: |
read_result = response.json() |
status = read_result['status'] |
if status == 'succeeded': |
break |
elif status in ['failed', 'notStarted', 'running']: |
time.sleep(1) |
else: |
raise Exception(f"Unexpected status: {status}") |
else: |
raise Exception(f"Unexpected response status: {response.status_code}") |
return read_result |
def process_image(image_path, subscription_key, endpoint): |
operation_location = read_image_from_stream(image_path, subscription_key, endpoint) |
operation_id = operation_location.split("/")[-1] |
operation_location = f"{endpoint}/vision/v3.2/read/analyzeResults/{operation_id}" |
read_result = get_read_result(operation_location, subscription_key) |
if read_result['status'] == 'succeeded': |
output = [] |
for text_result in read_result['analyzeResult']['readResults']: |
for line in text_result['lines']: |
output.append(line['text']) |
return " ".join(output).replace("\n", " ") |
else: |
return "Processing failed or did not succeed." |
def get_answer(image: io.BytesIO, question: str): |
filename = image.name |
spid = os.getenv('SPID') |
hug = hugging(spid) |
filelink = hug.upload(image) |
image.seek(0, 2) |
file_size = image.tell() |
image.seek(0) |
image_mime_type, _ = guess_type(image.name) |
data = [ |
{ |
"meta": { |
"_type": "gradio.FileData" |
}, |
"mime_type": image_mime_type, |
"orig_name": filename, |
"path": filelink.split("=", 1)[1], |
"size": file_size, |
"url": filelink |
} |
] |
datas = os.getenv('DATA') |
jsnd = json.loads(datas) |
data.append(jsnd) |
data.append( |
question |
) |
data.append( |
{ |
"tab_index": 0 |
} |
) |
hug.filnal_setup(data, 2, 15) |
hug.start() |
return hug.output.get("data")[0] |
def valuate_qna(p: str, r: str) -> str: |
spid2 = os.getenv('SPID2') |
hug = hugging(spid2) |
data = [ |
p, |
r |
] |
hug.filnal_setup(data, 0, 12) |
hug.start() |
return hug.output.get("data", [None])[0] |
def ocr(image: io.BytesIO) -> str: |
return process_image(image, os.getenv('AZKEY'), os.getenv('AZURL')) |
app = Flask(__name__) |
@app.route('/getans', methods=['POST']) |
def gettans_task(): |
if 'file' not in request.files: |
if len(request.data) > 0: |
pass |
else: |
return jsonify({"status" : False, "msg" : "No file found"}), 400 |
if 'file' in request.files: |
file = request.files['file'] |
if file.filename == '': |
return jsonify({"error": "No selected file"}), 400 |
file_content = io.BytesIO(file.read()) |
file_content.name = file.filename |
else: |
rawFile = request.data |
file_content = io.BytesIO(rawFile) |
filename = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) + ".png" |
file_content.name = request.args.get("name", filename) |
headers = dict(request.headers) |
if not headers.get("KEY") != os.getenv("KEY"): |
return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404 |
print(f"QUESTION ASKED : {headers.get('QU', '')}") |
answer = get_answer(file_content, headers.get('QU', '')) |
return jsonify({"status" : True, "ANS" : answer}) |
@app.route('/ocr', methods=['POST']) |
def task_ocr(): |
if 'file' not in request.files: |
if len(request.data) > 0: |
pass |
else: |
return jsonify({"status" : False, "msg" : "No file found"}), 400 |
if 'file' in request.files: |
file = request.files['file'] |
if file.filename == '': |
return jsonify({"error": "No selected file"}), 400 |
file_content = io.BytesIO(file.read()) |
file_content.name = file.filename |
else: |
rawFile = request.data |
file_content = io.BytesIO(rawFile) |
filename = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) + ".png" |
file_content.name = request.args.get("name", filename) |
headers = dict(request.headers) |
if not headers.get("KEY") != os.getenv("KEY"): |
return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404 |
size = len(file_content.getvalue()) |
if size > 4194304: |
return jsonify({"status" : False, "msg" : "File size is greater then 4MB"}), 404 |
answer = ocr(file_content) |
return jsonify({"status" : True, "data" : answer}) |
@app.route('/check', methods=['POST', 'GET']) |
def check_f(): |
jsndata = request.json |
p = jsndata.get("p") |
r = jsndata.get("r") |
headers = dict(request.headers) |
if not headers.get("KEY") != os.getenv("KEY"): |
return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404 |
data = valuate_qna(p, r) |
return jsonify( |
{ |
"status" : True, |
"data" : data |
} |
) |
@app.route('/') |
def home(): |
return jsonify({"message": "Welcome to my Flask API!"}) |
@app.route('/confirm') |
def confirm_product(): |
return jsonify({'status' : True}) |