# Streamlit demo app: upload an image, draw AWS Textract bounding boxes,
# extract the detected text, and chat about it via the OpenAI API.
import base64
import io
import json
from typing import Any, Dict, List, Union

import requests
import streamlit as st
import streamlit.components.v1 as components
from openai import OpenAI
from PIL import Image, ImageDraw
def custom_file_uploader() -> None:
    """Render a raw HTML file input whose JavaScript posts the chosen file
    (as a base64 data URL) to the parent window via ``postMessage``.

    Uses ``components.html`` rather than ``st.markdown(..., unsafe_allow_html=True)``
    because Streamlit's markdown renderer strips ``<script>`` tags, so the
    upload handler would never execute.
    """
    uploader_html = """
    <input type="file" id="fileUpload" accept=".png,.jpg,.jpeg" />
    <script>
    const fileInput = document.getElementById("fileUpload");
    fileInput.addEventListener("change", function(event) {
        if (fileInput.files.length > 0) {
            const file = fileInput.files[0];
            const reader = new FileReader();
            reader.readAsDataURL(file);
            reader.onload = function() {
                const result = reader.result;
                window.parent.postMessage({type: 'FILE_UPLOAD', file: result}, '*');
            };
        }
    });
    </script>
    """
    # Renders inside an iframe where the script actually runs.
    components.html(uploader_html, height=60)
def resize_image(image: Image.Image, target_width: int = 512) -> Image.Image:
    """Return *image* scaled to *target_width* pixels wide, preserving the
    aspect ratio.

    The width was previously hard-coded to 512; it is now a defaulted
    parameter, so existing callers are unaffected. The computed height is
    clamped to at least 1 pixel because ``Image.resize`` rejects a zero
    dimension (possible for extremely wide inputs).
    """
    new_height = max(1, int(image.height * target_width / image.width))
    return image.resize((target_width, new_height))
def convert_image_to_base64(image: Image.Image) -> str:
    """Serialize *image* as PNG and return the base64-encoded ASCII string."""
    with io.BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        png_bytes = png_buffer.getvalue()
    return base64.b64encode(png_bytes).decode()
def post_request_and_parse_response(
    url: str, payload: Dict[str, Any], timeout: float = None
) -> Dict[str, Any]:
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    Args:
        url: Endpoint to POST to.
        payload: JSON-serializable request body.
        timeout: Seconds before ``requests`` raises ``requests.Timeout``.
            Defaults to ``None`` (wait forever) to preserve the original
            behavior, but callers should pass a value — a request with no
            timeout can hang indefinitely.

    Returns:
        The response body parsed as a dict.

    Raises:
        requests.RequestException: On connection-level failure.
        json.JSONDecodeError / requests.JSONDecodeError: If the body is not JSON.
    """
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    # response.json() replaces the previous manual decode("utf-8") +
    # json.loads round-trip and honors the declared response encoding.
    return response.json()
def draw_bounding_boxes_for_textract(
    image: Image.Image, json_data: Dict[str, Any]
) -> Image.Image:
    """Draw red rectangles on *image* around each Textract LINE/WORD block.

    Args:
        image: Image to annotate (modified in place and also returned).
        json_data: Lambda-style response dict whose ``'body'`` value is a
            JSON-encoded list of Textract blocks. (The old ``str`` annotation
            was wrong — the code has always subscripted this as a dict.)

    Returns:
        The annotated image; returned unchanged if the payload is missing
        or malformed.
    """
    draw = ImageDraw.Draw(image)
    try:
        blocks = json.loads(json_data['body']) if 'body' in json_data else None
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a non-string 'body'; previously only bad JSON
        # was caught and anything else crashed the app.
        return image
    if blocks is None:
        return image
    for item in blocks:
        if not isinstance(item, dict) or item.get('BlockType') not in ('LINE', 'WORD'):
            continue
        geometry = item.get('Geometry', {})
        bbox = geometry.get('BoundingBox')
        if not bbox:
            # Guard: a block without geometry used to raise KeyError.
            continue
        left, top = bbox['Left'], bbox['Top']
        width, height = bbox['Width'], bbox['Height']
        # Textract coordinates are ratios of the page size; scale to pixels.
        left_top = (left * image.width, top * image.height)
        right_bottom = ((left + width) * image.width, (top + height) * image.height)
        draw.rectangle([left_top, right_bottom], outline='red', width=2)
    return image
def extract_text_from_textract_blocks(
    blocks: Union[str, List[Dict[str, Any]]]
) -> str:
    """Concatenate the ``Text`` of every Textract LINE/WORD block.

    Args:
        blocks: Either a JSON-encoded string of Textract blocks or an
            already-parsed list. (The old annotation claimed ``List[Dict]``
            but the unconditional ``json.loads`` required a string — passing
            an actual list raised ``TypeError``. Both now work.)

    Returns:
        The block texts joined with single spaces; ``''`` for empty or
        undecodable input.
    """
    if isinstance(blocks, (str, bytes, bytearray)):
        try:
            blocks = json.loads(blocks)
        except json.JSONDecodeError:
            return ''
    texts = [
        block['Text']
        for block in blocks
        if isinstance(block, dict)
        and block.get('BlockType') in ('LINE', 'WORD')
        and 'Text' in block
    ]
    return ' '.join(texts)
class ChatGPTClient:
    """Thin wrapper around the OpenAI chat-completions API that maintains a
    running conversation history seeded with a system prompt and an initial
    user message containing the provided document body.
    """

    def __init__(self, api_key: str, protocol: str = "You are a helpful assistant.", body=None):
        """
        Args:
            api_key: OpenAI API key.
            protocol: System prompt for the conversation.
            body: Content (e.g. OCR-extracted text) injected into the first
                user message so follow-up questions can reference it.
        """
        self.api_key = api_key
        self.client = OpenAI(api_key=self.api_key)
        self.protocol = protocol
        self.body = body
        self.history: List[Dict[str, str]] = [
            {"role": "system", "content": self.protocol},
            {"role": "user", "content": f"The content provided: {self.body}"}
        ]

    def append_message(self, role: str, content: str) -> None:
        """Append a message to the history; unknown roles are ignored
        (kept silent to preserve existing caller behavior)."""
        if role in ["system", "user", "assistant"]:
            self.history.append({"role": role, "content": content})

    def generate_response(self, question: str) -> str:
        """Send *question* with the full history to gpt-3.5-turbo and return
        the assistant's reply, recording both turns in the history.

        On any API failure a fallback string is returned and the failed
        question is rolled back, so it is not silently re-sent on the
        next call (previously it stayed in the history).
        """
        self.append_message("user", question)
        try:
            response = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=self.history
            )
            output = response.choices[0].message.content
            self.append_message("assistant", output)
        except Exception:
            # Broad catch is deliberate at this UI boundary: network,
            # auth, and API errors all degrade to the same fallback.
            self.history.pop()
            output = "Sorry, I couldn't get an answer for that."
        return output