# flaskapp / main.py
# 5m4ck3r's picture
# Update main.py
# c38cffb verified
# raw
# history blame
# 7.32 kB
from flask import Flask, request, jsonify
import os
from hiou import hugging
import json
import io
import random
import string
from mimetypes import guess_type
import requests
import time
from PIL import Image
def compress_image(input_stream: io.BytesIO, max_size_mb=4) -> io.BytesIO:
    """Re-encode an image as JPEG, lowering quality until it fits a size budget.

    Args:
        input_stream: Binary stream holding an image Pillow can open.
        max_size_mb: Target upper bound for the encoded size, in MiB.

    Returns:
        A ``BytesIO`` positioned at offset 0 containing the JPEG data. If the
        budget cannot be met, the smallest attempt (lowest quality tried) is
        returned instead and a message is printed.
    """
    with Image.open(input_stream) as source:
        # JPEG cannot store alpha channels or palettes, so normalise those
        # modes to RGB before encoding.
        if source.mode in ("L", "RGB", "CMYK"):
            pixels = source
        else:
            pixels = source.convert("RGB")
        # Copy the pixels onto a fresh canvas. The source must keep its own
        # name: rebinding it to the blank canvas would paste the canvas onto
        # itself and yield an empty image.
        img = Image.new(pixels.mode, pixels.size)
        img.paste(pixels)

    quality = 95
    step = 5
    output_stream = io.BytesIO()
    while True:
        # Rewind AND truncate: without truncation a smaller re-encode leaves
        # stale bytes from the previous attempt, so the measured size would
        # never shrink and the result would carry trailing garbage.
        output_stream.seek(0)
        output_stream.truncate()
        img.save(output_stream, format="JPEG", quality=quality)
        output_stream_size_mb = len(output_stream.getvalue()) / (1024 * 1024)
        if output_stream_size_mb <= max_size_mb:
            output_stream.seek(0)
            print(f"Image compressed to {output_stream_size_mb:.2f} MB")
            return output_stream
        quality -= step
        if quality < step:
            print("Cannot compress the image to the desired size.")
            output_stream.seek(0)
            return output_stream
def read_image_from_stream(image_data, subscription_key, endpoint, timeout=30):
    """Submit an image to the ``/vision/v3.2/read/analyze`` endpoint.

    Args:
        image_data: Readable binary stream; its full content is uploaded.
        subscription_key: Value sent in the ``Ocp-Apim-Subscription-Key`` header.
        endpoint: Base URL of the service (no trailing path).
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The ``Operation-Location`` response header, i.e. the URL to poll for
        the analysis result.

    Raises:
        Exception: If the service answers with anything other than HTTP 202.
    """
    headers = {
        'Ocp-Apim-Subscription-Key': subscription_key,
        'Content-Type': 'application/octet-stream'
    }
    img = image_data.read()
    response = requests.post(
        f"{endpoint}/vision/v3.2/read/analyze",
        headers=headers,
        data=img,
        timeout=timeout,
    )
    if response.status_code != 202:
        raise Exception(f"Unexpected response status: {response.status_code}, {response.text}")
    return response.headers["Operation-Location"]
def get_read_result(operation_location, subscription_key, timeout=30):
    """Poll a read operation until it reaches a terminal status.

    Args:
        operation_location: URL of the operation result to poll.
        subscription_key: Value sent in the ``Ocp-Apim-Subscription-Key`` header.
        timeout: Seconds to wait for each individual HTTP request.

    Returns:
        The decoded JSON body of the last poll, whose ``status`` is either
        ``'succeeded'`` or ``'failed'``. Callers must check the status.

    Raises:
        Exception: On a non-200 response or an unrecognised ``status`` value.
    """
    headers = {
        'Ocp-Apim-Subscription-Key': subscription_key
    }
    while True:
        response = requests.get(operation_location, headers=headers, timeout=timeout)
        if response.status_code != 200:
            raise Exception(f"Unexpected response status: {response.status_code}")
        read_result = response.json()
        status = read_result['status']
        # 'failed' is terminal: polling again would never change it, so it is
        # returned to the caller instead of being retried forever.
        if status in ('succeeded', 'failed'):
            return read_result
        if status not in ('notStarted', 'running'):
            raise Exception(f"Unexpected status: {status}")
        time.sleep(1)  # Still in progress; wait before polling again
def process_image(image_path, subscription_key, endpoint):
    """Run the read pipeline on an image and return its text as one string.

    Despite its name, ``image_path`` is handed straight to
    ``read_image_from_stream``, which reads it as a binary stream.

    Returns:
        All recognised lines joined by single spaces, or a fixed message when
        the operation did not end in the ``'succeeded'`` status.
    """
    submitted_url = read_image_from_stream(image_path, subscription_key, endpoint)
    # Only the trailing operation id is kept; the polling URL is rebuilt from
    # the configured endpoint rather than trusted verbatim.
    operation_id = submitted_url.rsplit("/", 1)[-1]
    results_url = f"{endpoint}/vision/v3.2/read/analyzeResults/{operation_id}"
    read_result = get_read_result(results_url, subscription_key)

    if read_result['status'] != 'succeeded':
        return "Processing failed or did not succeed."

    texts = [
        line['text']
        for page in read_result['analyzeResult']['readResults']
        for line in page['lines']
    ]
    # Flatten everything onto one line: join with spaces, then drop newlines.
    return " ".join(texts).replace("\n", " ")
def get_answer(image: io.BytesIO, question: str):
    """Upload an image through the ``hugging`` helper and ask a question about it.

    The helper is configured from the ``SPID`` environment variable, and an
    extra JSON payload entry is read from ``DATA``.

    Args:
        image: In-memory image; must carry a ``name`` attribute.
        question: Text sent alongside the uploaded file.

    Returns:
        The first element of the ``"data"`` entry in the helper's output.
    """
    original_name = image.name
    client = hugging(os.getenv('SPID'))
    uploaded_url = client.upload(image)

    # Measure the stream by jumping to its end, then rewind for later readers.
    image.seek(0, io.SEEK_END)
    size_in_bytes = image.tell()
    image.seek(0)

    mime_type = guess_type(image.name)[0]
    file_entry = {
        "meta": {
            "_type": "gradio.FileData"
        },
        "mime_type": mime_type,
        "orig_name": original_name,
        # The path is whatever follows the first "=" in the returned link.
        "path": uploaded_url.split("=", 1)[1],
        "size": size_in_bytes,
        "url": uploaded_url
    }
    extra_entry = json.loads(os.getenv('DATA'))
    payload = [file_entry, extra_entry, question, {"tab_index": 0}]

    # NOTE(review): the meaning of the numeric arguments (2, 15) is defined by
    # the external helper and is not visible here; confirm against its docs.
    client.filnal_setup(payload, 2, 15)
    client.start()
    return client.output.get("data")[0]
def valuate_qna(p: str, r: str) -> str:
    """Send the pair ``(p, r)`` through the ``hugging`` helper and return its result.

    The helper is configured from the ``SPID2`` environment variable.

    Returns:
        The first element of the ``"data"`` entry in the helper's output, or
        ``None`` when that entry is absent.
    """
    client = hugging(os.getenv('SPID2'))
    # NOTE(review): the numeric arguments (0, 12) are interpreted by the
    # external helper; their meaning is not visible here.
    client.filnal_setup([p, r], 0, 12)
    client.start()
    result = client.output.get("data", [None])
    return result[0]
def ocr(image: io.BytesIO) -> str:
    """Extract text from ``image`` using the credentials in ``AZKEY`` / ``AZURL``."""
    subscription_key = os.getenv('AZKEY')
    endpoint = os.getenv('AZURL')
    return process_image(image, subscription_key, endpoint)
# Flask application object; the @app.route decorators below register their
# handlers on it.
app = Flask(__name__)
@app.route('/getans', methods=['POST'])
def gettans_task():
    """Answer a question about an uploaded image.

    The image is taken from the multipart field ``file`` or, failing that,
    from the raw request body (optionally named via the ``name`` query
    argument). The API key is read from the ``KEY`` request header and the
    question from the ``QU`` header.

    Returns:
        JSON ``{"status": True, "ANS": ...}`` on success; a 404 JSON error for
        a missing or wrong key; a 400 JSON error when no file is supplied.
    """
    # Authenticate before touching the payload. The previous check,
    # `not a != b`, parses as `a == b` and so rejected exactly the callers
    # holding the right key. Lookups go through request.headers because it is
    # case-insensitive, unlike a plain dict copy of the headers. An unset
    # server-side key rejects everyone rather than matching a missing header.
    expected_key = os.getenv("KEY")
    if not expected_key or request.headers.get("KEY") != expected_key:
        return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404

    upload = request.files.get('file')
    if upload is not None:
        if upload.filename == '':
            return jsonify({"error": "No selected file"}), 400
        file_content = io.BytesIO(upload.read())
        file_content.name = upload.filename
    elif request.data:
        file_content = io.BytesIO(request.data)
        # Raw uploads carry no filename; fall back to a random ".png" name.
        filename = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) + ".png"
        file_content.name = request.args.get("name", filename)
    else:
        return jsonify({"status" : False, "msg" : "No file found"}), 400

    question = request.headers.get('QU', '')
    print(f"QUESTION ASKED : {question}")
    answer = get_answer(file_content, question)
    return jsonify({"status" : True, "ANS" : answer})
@app.route('/ocr', methods=['POST'])
def task_ocr():
    """Run OCR on an uploaded image of at most 4 MiB.

    The image is taken from the multipart field ``file`` or, failing that,
    from the raw request body (optionally named via the ``name`` query
    argument). The API key is read from the ``KEY`` request header.

    Returns:
        JSON ``{"status": True, "data": ...}`` on success; a 404 JSON error
        for a missing or wrong key or an oversized file; a 400 JSON error when
        no file is supplied.
    """
    # Authenticate before touching the payload. The previous check,
    # `not a != b`, parses as `a == b` and so rejected exactly the callers
    # holding the right key. Lookups go through request.headers because it is
    # case-insensitive, unlike a plain dict copy of the headers. An unset
    # server-side key rejects everyone rather than matching a missing header.
    expected_key = os.getenv("KEY")
    if not expected_key or request.headers.get("KEY") != expected_key:
        return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404

    upload = request.files.get('file')
    if upload is not None:
        if upload.filename == '':
            return jsonify({"error": "No selected file"}), 400
        file_content = io.BytesIO(upload.read())
        file_content.name = upload.filename
    elif request.data:
        file_content = io.BytesIO(request.data)
        # Raw uploads carry no filename; fall back to a random ".png" name.
        filename = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) + ".png"
        file_content.name = request.args.get("name", filename)
    else:
        return jsonify({"status" : False, "msg" : "No file found"}), 400

    size = len(file_content.getvalue())
    if size > 4194304:  # 4 MiB upload limit enforced by this endpoint
        return jsonify({"status" : False, "msg" : "File size is greater then 4MB"}), 404
    answer = ocr(file_content)
    return jsonify({"status" : True, "data" : answer})
@app.route('/check', methods=['POST', 'GET'])
def check_f():
    """Evaluate a ``p`` / ``r`` pair supplied as a JSON object.

    The API key is read from the ``KEY`` request header; the JSON body
    supplies the ``p`` and ``r`` values passed to ``valuate_qna``.

    Returns:
        JSON ``{"status": True, "data": ...}`` on success; a 404 JSON error
        for a missing or wrong key; a 400 JSON error when the body is not a
        JSON object.
    """
    # The previous check, `not a != b`, parses as `a == b` and so rejected
    # exactly the callers holding the right key. Lookups go through
    # request.headers because it is case-insensitive, unlike a plain dict copy.
    expected_key = os.getenv("KEY")
    if not expected_key or request.headers.get("KEY") != expected_key:
        return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404

    # silent=True yields None instead of raising on a missing or malformed
    # body, so the problem can be reported in this API's own error format.
    jsndata = request.get_json(silent=True)
    if not isinstance(jsndata, dict):
        return jsonify({"status" : False, "msg" : "Invalid JSON body"}), 400

    data = valuate_qna(jsndata.get("p"), jsndata.get("r"))
    return jsonify(
        {
            "status" : True,
            "data" : data
        }
    )
@app.route('/')
def home():
    """Return a fixed welcome message as JSON."""
    greeting = {"message": "Welcome to my Flask API!"}
    return jsonify(greeting)
@app.route('/confirm')
def confirm_product():
    """Return a fixed ``{"status": true}`` JSON acknowledgement."""
    acknowledgement = {'status' : True}
    return jsonify(acknowledgement)