|
import base64
import io
import json
from typing import Any, Dict, List, Union

import requests
import streamlit as st
import streamlit.components.v1 as components
from openai import OpenAI
from PIL import Image, ImageDraw
|
|
|
|
|
def custom_file_uploader():
    """Render a raw HTML file input that accepts only PNG/JPEG images.

    Embeds the markup through a Streamlit component; the inline script
    clears the input's ``capture`` attribute, which on some browsers
    suppresses the 'Take photo' option (not universally enforced).
    """
    html_markup = """
    <input type="file" id="fileUpload" accept=".png,.jpg,.jpeg" />
    <script>
    // Hide the 'Take photo' option (works on some browsers, not universally enforced)
    document.getElementById("fileUpload").capture = false;
    </script>
    """

    components.html(html_markup, height=100)
|
|
|
def resize_image(image: Image.Image, target_width: int = 512) -> Image.Image:
    """Resize *image* to a fixed width, preserving its aspect ratio.

    Args:
        image: Source PIL image; assumed to have a non-zero width.
        target_width: Desired output width in pixels. Defaults to 512,
            the original hard-coded value, so existing callers are
            unaffected.

    Returns:
        A new resized image; the original is left untouched
        (``Image.resize`` returns a copy).
    """
    # Scale the height by the same factor as the width; int() truncates,
    # matching the original behavior.
    new_height = int(image.height * target_width / image.width)
    return image.resize((target_width, new_height))
|
|
|
def convert_image_to_base64(image: Image.Image) -> str:
    """Return *image* encoded as PNG and serialized to a base64 string."""
    with io.BytesIO() as png_buffer:
        # Serialize to PNG in memory, then base64-encode the raw bytes.
        image.save(png_buffer, format="PNG")
        raw_bytes = png_buffer.getvalue()
    return base64.b64encode(raw_bytes).decode()
|
|
|
def post_request_and_parse_response(
    url: str, payload: Dict[str, Any], timeout: float = 30.0
) -> Dict[str, Any]:
    """POST *payload* as JSON to *url* and return the parsed JSON response.

    Args:
        url: Endpoint to post to.
        payload: JSON-serializable request body.
        timeout: Seconds before the request is aborted. Fix: the original
            call had no timeout and could hang indefinitely; 30s default
            keeps the signature backward-compatible.

    Returns:
        The response body decoded from JSON (typically a dict).

    Raises:
        requests.RequestException: on connection failure or timeout.
        ValueError: when the response body is not valid JSON.
    """
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    # Fix: response.json() replaces the manual bytes -> utf-8 -> json.loads
    # chain and honors the charset the server actually declares.
    return response.json()
|
|
|
def draw_bounding_boxes_for_textract(image: Image.Image, json_data: Dict[str, Any]) -> Image.Image:
    """Draw red rectangles around Textract LINE/WORD blocks on *image*.

    Args:
        image: PIL image to annotate; modified in place and also returned.
        json_data: Response dict whose 'body' key holds a JSON-encoded
            list of Textract blocks. (Fix: the original annotation said
            ``str`` although the code indexes it as a dict.)

    Returns:
        The same image — annotated when valid blocks were found,
        untouched otherwise.
    """
    draw = ImageDraw.Draw(image)
    try:
        # 'body' arrives as a JSON string; decode it to a list of blocks.
        blocks = json.loads(json_data['body']) if 'body' in json_data else None
    except (json.JSONDecodeError, TypeError):
        # Fix: also catch TypeError so a non-string 'body' degrades
        # gracefully instead of propagating out of the function.
        return image

    if blocks is None:
        return image

    for item in blocks:
        if 'BlockType' in item and item['BlockType'] in ['LINE', 'WORD']:
            # Textract coordinates are fractions of the page; scale them
            # to pixel coordinates before drawing.
            bbox = item['Geometry']['BoundingBox']
            left, top, width, height = bbox['Left'], bbox['Top'], bbox['Width'], bbox['Height']
            left_top = (left * image.width, top * image.height)
            right_bottom = ((left + width) * image.width, (top + height) * image.height)
            draw.rectangle([left_top, right_bottom], outline='red', width=2)

    return image
|
|
|
def extract_text_from_textract_blocks(blocks: Union[str, List[Dict[str, Any]]]) -> str:
    """Concatenate the text of Textract LINE/WORD blocks.

    Args:
        blocks: Either a JSON-encoded string of Textract blocks (the
            original calling convention) or an already-parsed list of
            block dicts. (Fix: the original annotation claimed a list,
            but the code always called ``json.loads``, which raises
            TypeError on an actual list — now both are accepted.)

    Returns:
        The 'Text' values of LINE/WORD blocks joined with single spaces;
        empty string when there are none.
    """
    if isinstance(blocks, str):
        blocks = json.loads(blocks)
    extracted_text = []
    for block in blocks:
        # Skip non-dict entries defensively, as the original did.
        if isinstance(block, dict):
            if block.get('BlockType') in ['LINE', 'WORD'] and 'Text' in block:
                extracted_text.append(block['Text'])
    return ' '.join(extracted_text)
|
|
|
class ChatGPTClient:
    """Minimal wrapper around the OpenAI chat completions API that keeps
    a running conversation history across calls."""

    def __init__(self, api_key: str, protocol: str = "You are a helpful assistant.", body=None):
        """Create a client seeded with a system prompt and initial context.

        Args:
            api_key: OpenAI API key.
            protocol: System prompt that frames the assistant's behavior.
            body: Arbitrary context object; interpolated into the opening
                user message.
        """
        self.api_key = api_key
        self.client = OpenAI(api_key=self.api_key)
        self.protocol = protocol
        self.body = body
        # Seed the conversation: system prompt first, then the supplied
        # context as a user message.
        self.history: List[Dict[str, str]] = [
            {"role": "system", "content": self.protocol},
            {"role": "user", "content": f"The content provided: {self.body}"},
        ]

    def append_message(self, role: str, content: str) -> None:
        """Append a message to the history; unknown roles are ignored
        silently (deliberate no-op, matching the original behavior)."""
        if role in ["system", "user", "assistant"]:
            self.history.append({"role": role, "content": content})

    def generate_response(self, question: str) -> str:
        """Send *question* with the full history and return the reply.

        Returns:
            The assistant's reply, or a fixed apology string when the API
            call fails for any reason.
        """
        self.append_message("user", question)
        try:
            response = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=self.history,
            )
            output = response.choices[0].message.content
            self.append_message("assistant", output)
        except Exception:
            # Fix: drop the unanswered question so a failed call does not
            # leave a dangling user turn that pollutes every later request.
            self.history.pop()
            output = "Sorry, I couldn't get an answer for that."
        return output
|
|