File size: 7,319 Bytes
9ae77d3 26e1774 0d1fedf 26e1774 f173779 9d05186 68856cf 10f6029 d5f755c 10f6029 d0dd26e 10f6029 c153d6e 10f6029 26e1774 0d1fedf 26e1774 9ae77d3 0d1fedf 635344d 68856cf 26e1774 68856cf 26e1774 edfcb6a 26e1774 0d1fedf 9ae77d3 26e1774 4393529 aae2a9b 10f6029 aae2a9b 9ae77d3 4393529 d2df03b bb1f655 d2df03b 9d05186 b2b8786 9d05186 d2df03b f0efa3b d2df03b 9ae77d3 d2df03b 18f5113 aae2a9b bb1f655 aae2a9b 9d05186 b2b8786 9d05186 b2b8786 9d05186 aae2a9b fbe3348 10f6029 aae2a9b 2ad6adc aae2a9b d6f16db 9ae77d3 d6f16db bf01bb4 c38cffb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
from flask import Flask, request, jsonify
import os
from hiou import hugging
import json
import io
import random
import string
from mimetypes import guess_type
import requests
import time
from PIL import Image
def compress_image(input_stream: io.BytesIO, max_size_mb=4) -> io.BytesIO:
with Image.open(input_stream) as img:
img = Image.new(img.mode, img.size)
img.paste(img)
quality = 95
step = 5
output_stream = io.BytesIO()
while True:
output_stream.seek(0)
img.save(output_stream, format="JPEG", quality=quality)
output_stream_size_mb = len(output_stream.getvalue()) / (1024 * 1024)
if output_stream_size_mb <= max_size_mb:
output_stream.seek(0)
print(f"Image compressed to {output_stream_size_mb:.2f} MB")
return output_stream
quality -= step
if quality < step:
print("Cannot compress the image to the desired size.")
output_stream.seek(0)
return output_stream
def read_image_from_stream(image_data, subscription_key, endpoint):
headers = {
'Ocp-Apim-Subscription-Key': subscription_key,
'Content-Type': 'application/octet-stream'
}
img = image_data.read()
response = requests.post(f"{endpoint}/vision/v3.2/read/analyze", headers=headers, data=img)
if response.status_code == 202:
read_response_headers = response.headers
operation_location = read_response_headers["Operation-Location"]
else:
raise Exception(f"Unexpected response status: {response.status_code}, {response.text}")
return operation_location
def get_read_result(operation_location, subscription_key):
headers = {
'Ocp-Apim-Subscription-Key': subscription_key
}
while True:
response = requests.get(operation_location, headers=headers)
if response.status_code == 200: # HTTP 200 indicates success
read_result = response.json()
status = read_result['status']
if status == 'succeeded':
break
elif status in ['failed', 'notStarted', 'running']:
time.sleep(1) # Wait before polling again
else:
raise Exception(f"Unexpected status: {status}")
else:
raise Exception(f"Unexpected response status: {response.status_code}")
return read_result
def process_image(image_path, subscription_key, endpoint):
operation_location = read_image_from_stream(image_path, subscription_key, endpoint)
operation_id = operation_location.split("/")[-1]
operation_location = f"{endpoint}/vision/v3.2/read/analyzeResults/{operation_id}"
read_result = get_read_result(operation_location, subscription_key)
if read_result['status'] == 'succeeded':
output = []
for text_result in read_result['analyzeResult']['readResults']:
for line in text_result['lines']:
output.append(line['text'])
return " ".join(output).replace("\n", " ") # Join lines and replace newlines with spaces
else:
return "Processing failed or did not succeed."
def get_answer(image: io.BytesIO, question: str):
filename = image.name
spid = os.getenv('SPID')
hug = hugging(spid)
filelink = hug.upload(image)
image.seek(0, 2)
file_size = image.tell()
image.seek(0)
image_mime_type, _ = guess_type(image.name)
data = [
{
"meta": {
"_type": "gradio.FileData"
},
"mime_type": image_mime_type,
"orig_name": filename,
"path": filelink.split("=", 1)[1],
"size": file_size,
"url": filelink
}
]
datas = os.getenv('DATA')
jsnd = json.loads(datas)
data.append(jsnd)
data.append(
question
)
data.append(
{
"tab_index": 0
}
)
hug.filnal_setup(data, 2, 15)
hug.start()
return hug.output.get("data")[0]
def valuate_qna(p: str, r: str) -> str:
spid2 = os.getenv('SPID2')
hug = hugging(spid2)
data = [
p,
r
]
hug.filnal_setup(data, 0, 12)
hug.start()
return hug.output.get("data", [None])[0]
def ocr(image: io.BytesIO) -> str:
return process_image(image, os.getenv('AZKEY'), os.getenv('AZURL'))
app = Flask(__name__)
@app.route('/getans', methods=['POST'])
def gettans_task():
if 'file' not in request.files:
if len(request.data) > 0:
pass
else:
return jsonify({"status" : False, "msg" : "No file found"}), 400
if 'file' in request.files:
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
file_content = io.BytesIO(file.read())
file_content.name = file.filename
else:
rawFile = request.data
file_content = io.BytesIO(rawFile)
filename = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) + ".png"
file_content.name = request.args.get("name", filename)
headers = dict(request.headers)
if not headers.get("KEY") != os.getenv("KEY"):
return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404
print(f"QUESTION ASKED : {headers.get('QU', '')}")
answer = get_answer(file_content, headers.get('QU', ''))
return jsonify({"status" : True, "ANS" : answer})
@app.route('/ocr', methods=['POST'])
def task_ocr():
if 'file' not in request.files:
if len(request.data) > 0:
pass
else:
return jsonify({"status" : False, "msg" : "No file found"}), 400
if 'file' in request.files:
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
file_content = io.BytesIO(file.read())
file_content.name = file.filename
else:
rawFile = request.data
file_content = io.BytesIO(rawFile)
filename = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10)) + ".png"
file_content.name = request.args.get("name", filename)
headers = dict(request.headers)
if not headers.get("KEY") != os.getenv("KEY"):
return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404
size = len(file_content.getvalue())
if size > 4194304:
return jsonify({"status" : False, "msg" : "File size is greater then 4MB"}), 404
answer = ocr(file_content)
return jsonify({"status" : True, "data" : answer})
@app.route('/check', methods=['POST', 'GET'])
def check_f():
jsndata = request.json
p = jsndata.get("p")
r = jsndata.get("r")
headers = dict(request.headers)
if not headers.get("KEY") != os.getenv("KEY"):
return jsonify({"status" : False, "msg" : "Invalid API Key"}), 404
data = valuate_qna(p, r)
return jsonify(
{
"status" : True,
"data" : data
}
)
@app.route('/')
def home():
return jsonify({"message": "Welcome to my Flask API!"})
@app.route('/confirm')
def confirm_product():
return jsonify({'status' : True})
|