from flask import Flask, request, jsonify
import os
from hiou import hugging
import json
import io
import random
import string
from mimetypes import guess_type
import requests
import time
from PIL import Image


def compress_image(input_stream: io.BytesIO, max_size_mb=4) -> io.BytesIO:
    """Re-encode an image as JPEG, lowering quality until it fits a size limit.

    Quality starts at 95 and drops in steps of 5. The first encoding whose
    size is at most ``max_size_mb`` is returned. If no quality level fits,
    the last (lowest-quality) encoding is returned anyway.

    Args:
        input_stream: Stream containing the encoded source image.
        max_size_mb: Maximum allowed size of the result, in mebibytes.

    Returns:
        A new ``io.BytesIO`` positioned at offset 0 holding the JPEG bytes.
    """
    with Image.open(input_stream) as source:
        # copy()/convert() load the pixel data, so ``img`` stays usable after
        # the source file handle is closed by the with-block.
        if source.mode in ("RGB", "L"):
            img = source.copy()
        else:
            # JPEG cannot store alpha/palette modes; convert before saving.
            img = source.convert("RGB")

    quality = 95
    step = 5
    while True:
        # A fresh buffer per attempt: reusing one buffer without truncating
        # would keep stale trailing bytes and the measured size would never
        # shrink.
        output_stream = io.BytesIO()
        img.save(output_stream, format="JPEG", quality=quality)
        output_stream_size_mb = len(output_stream.getvalue()) / (1024 * 1024)
        output_stream.seek(0)

        if output_stream_size_mb <= max_size_mb:
            print(f"Image compressed to {output_stream_size_mb:.2f} MB")
            return output_stream

        quality -= step
        if quality < step:
            print("Cannot compress the image to the desired size.")
            return output_stream


def read_image_from_stream(image_data, subscription_key, endpoint, timeout=30):
    """Submit an image to the ``/vision/v3.2/read/analyze`` endpoint.

    Args:
        image_data: Readable binary stream; its remaining content is sent as
            the request body.
        subscription_key: Value for the ``Ocp-Apim-Subscription-Key`` header.
        endpoint: Base URL of the service (no trailing slash is added or
            removed here).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The value of the ``Operation-Location`` response header.

    Raises:
        Exception: If the service answers with any status other than 202.
    """
    headers = {
        'Ocp-Apim-Subscription-Key': subscription_key,
        'Content-Type': 'application/octet-stream'
    }
    response = requests.post(
        f"{endpoint}/vision/v3.2/read/analyze",
        headers=headers,
        data=image_data.read(),
        timeout=timeout,
    )
    if response.status_code != 202:
        raise Exception(f"Unexpected response status: {response.status_code}, {response.text}")
    return response.headers["Operation-Location"]


def get_read_result(operation_location, subscription_key, timeout=30):
    """Poll a read operation until it reaches a terminal status.

    The URL is fetched once per second while the reported status is
    ``notStarted`` or ``running``. Both ``succeeded`` and ``failed`` end the
    polling; callers must inspect ``result['status']`` to tell them apart.

    Args:
        operation_location: URL of the operation result to poll.
        subscription_key: Value for the ``Ocp-Apim-Subscription-Key`` header.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        The decoded JSON body of the final poll response.

    Raises:
        Exception: On a non-200 HTTP status or an unrecognised status value.
    """
    headers = {
        'Ocp-Apim-Subscription-Key': subscription_key
    }
    while True:
        response = requests.get(operation_location, headers=headers, timeout=timeout)
        if response.status_code != 200:
            raise Exception(f"Unexpected response status: {response.status_code}")

        read_result = response.json()
        status = read_result['status']
        # 'failed' is terminal: retrying it would poll forever.
        if status in ('succeeded', 'failed'):
            return read_result
        if status in ('notStarted', 'running'):
            time.sleep(1)  # Wait before polling again
            continue
        raise Exception(f"Unexpected status: {status}")
def process_image(image_path, subscription_key, endpoint):
    """Run the read (OCR) operation on an image and return its text.

    Args:
        image_path: Despite the name, this is passed straight to
            ``read_image_from_stream`` and must be a readable binary stream.
        subscription_key: Subscription key forwarded to the service calls.
        endpoint: Base URL of the service.

    Returns:
        All recognised lines joined with single spaces, or the string
        ``"Processing failed or did not succeed."`` when the final status is
        not ``'succeeded'``.
    """
    operation_location = read_image_from_stream(image_path, subscription_key, endpoint)
    # Rebuild the result URL from the configured endpoint and the operation id
    # (last path segment of the returned location).
    operation_id = operation_location.split("/")[-1]
    operation_location = f"{endpoint}/vision/v3.2/read/analyzeResults/{operation_id}"

    read_result = get_read_result(operation_location, subscription_key)
    if read_result['status'] != 'succeeded':
        return "Processing failed or did not succeed."

    output = [
        line['text']
        for text_result in read_result['analyzeResult']['readResults']
        for line in text_result['lines']
    ]
    # Join lines and replace newlines with spaces
    return " ".join(output).replace("\n", " ")


def get_answer(image: io.BytesIO, question: str):
    """Upload an image and ask a question about it through ``hugging``.

    Reads the ``SPID`` and ``DATA`` environment variables. ``DATA`` must hold
    a JSON document; ``json.loads`` raises ``TypeError`` if it is unset.

    Args:
        image: In-memory image; must carry a ``name`` attribute, which is used
            as the original file name and to guess the MIME type.
        question: Question text appended to the request payload.

    Returns:
        The first element of the ``"data"`` entry of ``hug.output``.
    """
    filename = image.name
    hug = hugging(os.getenv('SPID'))
    filelink = hug.upload(image)

    # Measure the stream by seeking to its end, then rewind.
    image.seek(0, 2)
    file_size = image.tell()
    image.seek(0)

    image_mime_type, _ = guess_type(image.name)
    data = [
        {
            "meta": {
                "_type": "gradio.FileData"
            },
            "mime_type": image_mime_type,
            "orig_name": filename,
            # Everything after the first '=' in the upload link.
            "path": filelink.split("=", 1)[1],
            "size": file_size,
            "url": filelink
        },
        json.loads(os.getenv('DATA')),
        question,
        {
            "tab_index": 0
        },
    ]
    # NOTE(review): meaning of the numeric arguments is defined by the
    # external `hugging` class and is not visible here.
    hug.filnal_setup(data, 2, 15)
    hug.start()
    return hug.output.get("data")[0]


def valuate_qna(p: str, r: str) -> str:
    """Send the pair ``[p, r]`` through ``hugging`` and return the first result.

    Reads the ``SPID2`` environment variable.

    Returns:
        The first element of the ``"data"`` entry of ``hug.output``; ``None``
        when that entry is missing.
    """
    hug = hugging(os.getenv('SPID2'))
    # NOTE(review): meaning of the numeric arguments is defined by the
    # external `hugging` class and is not visible here.
    hug.filnal_setup([p, r], 0, 12)
    hug.start()
    return hug.output.get("data", [None])[0]


def ocr(image: io.BytesIO) -> str:
    """Run ``process_image`` using the ``AZKEY`` / ``AZURL`` environment values."""
    return process_image(image, os.getenv('AZKEY'), os.getenv('AZURL'))


app = Flask(__name__)


def _is_authorized() -> bool:
    """Return True when the request's ``KEY`` header matches the ``KEY`` env var."""
    import hmac  # local import keeps this helper self-contained

    expected = os.getenv("KEY")
    # Look the header up on request.headers directly: that lookup is
    # case-insensitive, unlike a plain dict copy of the headers.
    supplied = request.headers.get("KEY")
    if not expected or supplied is None:
        # An unset server key must never match a missing header.
        return False
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


def _invalid_key_response():
    """Build the response returned when the API key check fails."""
    # Status 404 is kept for compatibility with existing clients.
    return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404


def _read_upload():
    """Extract the uploaded image from the current request.

    Accepts either a multipart ``file`` field or a raw request body. For raw
    bodies the name comes from the ``name`` query argument, falling back to a
    random ten-character ``.png`` name.

    Returns:
        ``(stream, None)`` on success or ``(None, error_response)`` on failure.
    """
    if 'file' in request.files:
        file = request.files['file']
        if file.filename == '':
            return None, (jsonify({"error": "No selected file"}), 400)
        file_content = io.BytesIO(file.read())
        file_content.name = file.filename
        return file_content, None

    raw_file = request.data
    if len(raw_file) == 0:
        return None, (jsonify({"status" : False, "msg" : "No file found"}), 400)

    file_content = io.BytesIO(raw_file)
    filename = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) + ".png"
    file_content.name = request.args.get("name", filename)
    return file_content, None


@app.route('/getans', methods=['POST'])
def gettans_task():
    """Answer the question in the ``QU`` header about the uploaded image."""
    file_content, error = _read_upload()
    if error is not None:
        return error
    if not _is_authorized():
        return _invalid_key_response()

    question = request.headers.get('QU', '')
    print(f"QUESTION ASKED : {question}")
    answer = get_answer(file_content, question)
    return jsonify({"status" : True, "ANS" : answer})


@app.route('/ocr', methods=['POST'])
def task_ocr():
    """Return the text recognised in the uploaded image (4 MiB limit)."""
    file_content, error = _read_upload()
    if error is not None:
        return error
    if not _is_authorized():
        return _invalid_key_response()

    size = len(file_content.getvalue())
    if size > 4194304:
        return jsonify({"status" : False, "msg" : "File size is greater then 4MB"}), 404

    answer = ocr(file_content)
    return jsonify({"status" : True, "data" : answer})


@app.route('/check', methods=['POST', 'GET'])
def check_f():
    """Evaluate the JSON fields ``p`` and ``r`` with ``valuate_qna``."""
    jsndata = request.json
    if not isinstance(jsndata, dict):
        # A JSON null/array/scalar body has no .get(); reject it explicitly.
        return jsonify({"status" : False, "msg" : "JSON object body required"}), 400
    p = jsndata.get("p")
    r = jsndata.get("r")

    if not _is_authorized():
        return _invalid_key_response()

    data = valuate_qna(p, r)
    return jsonify(
        {
            "status" : True,
            "data" : data
        }
    )


@app.route('/')
def home():
    """Return a static welcome message."""
    return jsonify({"message": "Welcome to my Flask API!"})


@app.route('/confirm')
def confirm_product():
    """Return a static ``{"status": true}`` payload."""
    return jsonify({'status' : True})