Spaces:
Sleeping
Sleeping
| from PIL import Image, ImageDraw | |
| import io | |
| import base64 | |
| import requests | |
| import json | |
| from typing import List, Dict, Any | |
| from openai import OpenAI | |
| import streamlit as st | |
| import streamlit.components.v1 as components | |
| def custom_file_uploader(): | |
| uploader_html = """ | |
| <input type="file" id="fileUpload" accept=".png,.jpg,.jpeg" /> | |
| <script> | |
| const fileInput = document.getElementById("fileUpload"); | |
| fileInput.addEventListener("change", function(event) { | |
| if (fileInput.files.length > 0) { | |
| const file = fileInput.files[0]; | |
| const reader = new FileReader(); | |
| reader.readAsDataURL(file); | |
| reader.onload = function() { | |
| const result = reader.result; | |
| window.parent.postMessage({type: 'FILE_UPLOAD', file: result}, '*'); | |
| }; | |
| } | |
| }); | |
| </script> | |
| """ | |
| st.markdown(uploader_html, unsafe_allow_html=True) | |
| def resize_image(image: Image.Image) -> Image.Image: | |
| new_height = int(image.height * 512 / image.width) | |
| return image.resize((512, new_height)) | |
| def convert_image_to_base64(image: Image.Image) -> str: | |
| buffered = io.BytesIO() | |
| image.save(buffered, format="PNG") | |
| return base64.b64encode(buffered.getvalue()).decode() | |
| def post_request_and_parse_response(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: | |
| headers = {"Content-Type": "application/json"} | |
| response = requests.post(url, json=payload, headers=headers) | |
| byte_data = response.content | |
| decoded_string = byte_data.decode("utf-8") | |
| dict_data = json.loads(decoded_string) | |
| return dict_data | |
| def draw_bounding_boxes_for_textract(image: Image.Image, json_data: str) -> Image.Image: | |
| draw = ImageDraw.Draw(image) | |
| try: | |
| data = json_data | |
| blocks = json.loads(data['body']) if 'body' in data else None | |
| except json.JSONDecodeError: | |
| return image | |
| if blocks is None: | |
| return image | |
| for item in blocks: | |
| if 'BlockType' in item and item['BlockType'] in ['LINE', 'WORD']: | |
| bbox = item['Geometry']['BoundingBox'] | |
| left, top, width, height = bbox['Left'], bbox['Top'], bbox['Width'], bbox['Height'] | |
| left_top = (left * image.width, top * image.height) | |
| right_bottom = ((left + width) * image.width, (top + height) * image.height) | |
| draw.rectangle([left_top, right_bottom], outline='red', width=2) | |
| return image | |
| def extract_text_from_textract_blocks(blocks: List[Dict[str, Any]]) -> str: | |
| extracted_text = [] | |
| blocks = json.loads(blocks) | |
| for block in blocks: | |
| if isinstance(block, dict): | |
| if block.get('BlockType') in ['LINE', 'WORD'] and 'Text' in block: | |
| extracted_text.append(block['Text']) | |
| return ' '.join(extracted_text) | |
| class ChatGPTClient: | |
| def __init__(self, api_key: str, protocol: str = "You are a helpful assistant.", body=None): | |
| self.api_key = api_key | |
| self.client = OpenAI(api_key=self.api_key) | |
| self.protocol = protocol | |
| self.body = body | |
| self.history: List[Dict[str, str]] = [ | |
| {"role": "system", "content": self.protocol}, | |
| {"role": "user", "content": f"The content provided: {self.body}"} | |
| ] | |
| def append_message(self, role: str, content: str) -> None: | |
| if role in ["system", "user", "assistant"]: | |
| self.history.append({"role": role, "content": content}) | |
| def generate_response(self, question: str) -> str: | |
| try: | |
| self.append_message("user", question) | |
| response = self.client.chat.completions.create( | |
| model="gpt-3.5-turbo", | |
| messages=self.history | |
| ) | |
| output = response.choices[0].message.content | |
| self.append_message("assistant", output) | |
| except Exception as e: | |
| output = "Sorry, I couldn't get an answer for that." | |
| return output | |