import json
import os

import streamlit as st
from streamlit import session_state as ss
from streamlit_local_storage import LocalStorage

from form.form import build_form_data_from_answers, write_pdf_form
from llm_manager.llm_parser import LlmParser
from local_storage.entities import PersonalDetails, LocationDetails, ContractorDetails
from prompts.prompts_manager import PromptsManager
from enums import Questions as Q
from repository.repository import get_repository
from repository import ModelRoles, Model
from utils.parsing_utils import check_for_missing_answers

# Browser local-storage handle shared by the whole module.
ls: LocalStorage = LocalStorage()


def in_hf() -> bool:
    """Return True when the ``env`` environment variable equals ``"hf"``."""
    return os.getenv("env") == "hf"


class UIManager:
    """Render the Streamlit page that matches the current ``ss["step"]``.

    Each ``build_ui_for_*`` method draws one step of the flow and, when the
    step is complete, stores the next step name in the session state and
    triggers a rerun.
    """

    def __init__(self):
        """Create the prompts manager and pick the LLM repository to use."""
        self.pm: PromptsManager = PromptsManager()
        # NOTE(review): build_repo_from_environment is not imported in the part
        # of the file visible here; confirm it is in scope at runtime.
        # When it returns a falsy value, fall back to the "testing" repository.
        self.repository = (
            build_repo_from_environment(self.pm.system_prompt)
            or get_repository("testing", Model("fakeModel", ModelRoles("a", "b", "c")))
        )

    @staticmethod
    def get_current_step():
        """Return the value stored under ``"step"`` in the session, or None."""
        return ss.get("step")

    @staticmethod
    def _build_base_ui():
        """Draw the page heading shared by every step."""
        st.markdown("## Dubai Asset Management red tape cutter")

    def build_ui_for_initial_state(self, user_message):
        """Show the free-text request form and the signature uploader.

        ``user_message`` is used both as the placeholder and as the help text
        of the text area. On submit, the text is stored in
        ``ss["user_input"]`` and the flow moves to ``"parsing_answers"``.
        """
        self._build_base_ui()
        with st.form("Please describe your request"):
            user_input = st.text_area(
                "Your input",
                height=700,
                label_visibility="hidden",
                placeholder=user_message,
                help=user_message,
            )
            # Stored on every run, whether or not the form was submitted.
            ss["signature"] = st.file_uploader("Your signature", key="file_upload")
            submit_button = st.form_submit_button()
        if submit_button:
            ss["user_input"] = user_input
            ss["step"] = "parsing_answers"
            st.rerun()

    def build_ui_for_parsing_answers(self):
        """Send the user input to the LLM and decide which step comes next.

        Next step is ``"parsing_error"`` when the number of parsed answers
        differs from the number of questions, ``"ask_again"`` when some
        answers are missing, and ``"check_category"`` otherwise.
        """
        self._build_base_ui()
        with st.status("initialising LLM"):
            self.repository.init()
        with st.status("waiting for LLM"):
            prompt = self.pm.verify_user_input_prompt(ss["user_input"])
            answer = self.repository.send_prompt(prompt)
            st.write(f"answers from LLM: {answer['content']}")
        with st.status("Checking for missing answers"):
            answers = LlmParser.parse_verification_prompt_answers(answer['content'])
            ss["answers"] = answers
            if len(answers) != len(Q):
                ss["step"] = "parsing_error"
                st.rerun()
            ss["missing_answers"] = check_for_missing_answers(ss["answers"])
        ss["step"] = "ask_again" if ss.get("missing_answers") else "check_category"
        st.rerun()

    def build_ui_for_ask_again(self):
        """Ask the user for every answer listed in ``ss["missing_answers"]``.

        On submit, the typed values are copied into ``ss["answers"]`` and the
        flow moves to ``"check_category"``.
        """
        self._build_base_ui()
        with st.form("form1"):
            for ma in ss["missing_answers"]:
                # The widget key is the question id, so the value is read back
                # from ss[ma] after submission.
                st.text_input(self.pm.questions[ma].lower(), key=ma)
            submitted = st.form_submit_button("Submit answers")
        if submitted:
            for ma in ss["missing_answers"]:
                ss["answers"][ma] = ss[ma]
            ss["step"] = "check_category"
            st.rerun()

    def build_ui_for_check_category(self):
        """Ask the LLM for the work categories, then build the PDF form.

        The PDF and its file name are stored in ``ss["pdf_form"]`` and
        ``ss["pdf_form_filename"]``; the flow then moves to ``"form_created"``.
        """
        self._build_base_ui()
        with st.status("finding the work categories applicable to your work"):
            answer = self.repository.send_prompt(self.pm.get_work_category(ss["answers"][1]))
            categories = LlmParser.parse_get_categories_answer(answer['content'])

        with st.status("categories found, creating PDF form"):
            form_data, filename = build_form_data_from_answers(
                ss["answers"], categories, ss.get("signature")
            )
            ss["pdf_form"] = write_pdf_form(form_data)
            ss["pdf_form_filename"] = filename
            ss["step"] = "form_created"
        st.rerun()

    def build_ui_for_form_created(self):
        """Offer the generated PDF for download and a "Start over" button.

        "Start over" removes the step, the PDF, its file name and (when
        present) the signature from the session state, then reruns.
        """
        self._build_base_ui()
        st.download_button(
            "download form",
            ss["pdf_form"],
            file_name=ss["pdf_form_filename"],
            mime="application/pdf",
        )
        if st.button("Start over"):
            del ss["step"]
            del ss["pdf_form"]
            del ss["pdf_form_filename"]
            if "signature" in ss:
                del ss["signature"]
            st.rerun()

    def build_ui_for_parsing_error(self):
        """Show every parsed answer in an editable form for manual correction.

        On submit, the edited values replace ``ss["answers"]`` and each group
        of details that was given a "Save as" name is written to local storage
        as JSON under that name.
        """

        def build_form_fragment(form_, col, title, *questions):
            """Add a titled group of inputs plus a "Save as" field."""
            form_.text(title)
            # NOTE(review): the widgets are created through form_ rather than
            # through col or st, so the `with col` context may not affect where
            # they are placed - confirm against Streamlit's container rules.
            for user_data in questions:
                with col:
                    form_.text_input(
                        self.pm.questions_to_field_labels()[user_data],
                        value=ss.get("answers", {}).get(user_data),
                        key=f"fq_{user_data.value}",
                    )
            with col:
                # Key becomes e.g. "your_details"; it is read back on submit.
                form_.text_input("Save as", key=title.replace(" ", "_"))

        self._build_base_ui()
        f = st.form("Please check the following information and correct fix any inaccuracies")
        col1, col2 = f.columns(2)
        build_form_fragment(f, col1, "your details", Q.FULL_NAME, Q.CONTACT_NUMBER, Q.YOUR_EMAIL)
        build_form_fragment(f, col2, "work details", Q.WORK_TO_DO, Q.START_DATE, Q.END_DATE)
        build_form_fragment(f, col1, "location details", Q.COMMUNITY, Q.BUILDING, Q.UNIT_APT_NUMBER,
                            Q.OWNER_OR_TENANT)
        build_form_fragment(f, col2, "contractor details", Q.COMPANY_NAME, Q.COMPANY_NUMBER,
                            Q.COMPANY_EMAIL)
        submit_data = f.form_submit_button()
        if not submit_data:
            return

        # Copy every edited field back, using the same key scheme as the
        # widgets above ("fq_<question value>").
        for question in Q:
            ss["answers"][question] = ss[f"fq_{question.value}"]

        savers = (
            ("your_details", self._get_personal_details),
            ("location_details", self._get_location_details),
            ("contractor_details", self._get_contractor_details),
        )
        for details_key, func in savers:
            details = func(details_key)
            if details:
                key = ss[details_key]  # get the name under which this data should be saved
                ls.setItem(key, json.dumps(details))
        # NOTE(review): ss["step"] is not changed here, so the session stays on
        # "parsing_error" after saving - confirm whether it should advance.

    @staticmethod
    def _field(question):
        """Return the form value for ``question`` using the widget key scheme.

        Widgets are registered as ``fq_<question.value>``; formatting the enum
        member itself would only give that key for int-based enums.
        """
        return ss[f"fq_{question.value}"]

    @staticmethod
    def _get_personal_details(personal_details_key) -> PersonalDetails | None:
        """Build PersonalDetails when a "Save as" name was entered, else None."""
        if not ss.get(personal_details_key):
            return None
        # The original passed FULL_NAME twice and never passed the email.
        # NOTE(review): assumes the constructor order is
        # (full name, email, contact number) - confirm against PersonalDetails.
        return PersonalDetails(
            UIManager._field(Q.FULL_NAME),
            UIManager._field(Q.YOUR_EMAIL),
            UIManager._field(Q.CONTACT_NUMBER),
        )

    @staticmethod
    def _get_location_details(location_details_key) -> LocationDetails | None:
        """Build LocationDetails when a "Save as" name was entered, else None."""
        if not ss.get(location_details_key):
            return None
        return LocationDetails(
            UIManager._field(Q.OWNER_OR_TENANT),
            UIManager._field(Q.COMMUNITY),
            UIManager._field(Q.BUILDING),
            UIManager._field(Q.UNIT_APT_NUMBER),
        )

    @staticmethod
    def _get_contractor_details(contractor_details_key) -> ContractorDetails | None:
        """Build ContractorDetails when a "Save as" name was entered, else None."""
        if not ss.get(contractor_details_key):
            return None
        return ContractorDetails(
            UIManager._field(Q.COMPANY_NAME),
            UIManager._field(Q.COMPANY_NUMBER),
            UIManager._field(Q.COMPANY_EMAIL),
        )