# Commit e382003 by enricorampazzo: "fixed Dockerfile and package imports"
import re
import streamlit as st
from streamlit import session_state as ss
from streamlit.runtime.scriptrunner_utils.exceptions import StopException
from email_manager.email_manager import EmailManager
from enums.enums import Steps, DetailsType, Questions as Q
from form.form import work_categories, build_form_data_from_answers, write_pdf_form
from llm_manager.llm_parser import LlmParser
from local_storage.entities import PersonalDetails, LocationDetails, ContractorDetails, SavedDetails, \
detail_classes
from local_storage.ls_manager import LocalStorageManager
from prompts.prompts_manager import PromptsManager
from repository import Model, ModelRoles
from repository.repository import build_repo_from_environment, get_repository
from utils.parsing_utils import check_for_missing_answers
class UIManager:
    """Streamlit user interface for the work permit request flow.

    Each ``build_ui_*`` method renders the page for one step of the flow.
    The current step, the user input, the parsed answers and the generated
    PDF form are all kept in ``st.session_state`` (imported as ``ss``).
    """

    def __init__(self):
        """Create the prompts manager, the LLM repository and the email manager."""
        # A tag is "@" followed by a run of non-whitespace characters.
        self.find_tags_regex = re.compile(r"@(\S*)")
        self.pm: PromptsManager = PromptsManager(work_categories=work_categories)
        # Use the "testing" repository when nothing is returned for the environment.
        self.repository = (build_repo_from_environment(self.pm.system_prompt)
                           or get_repository("testing", Model("fakeModel", ModelRoles("a", "b", "c"))))
        # Guard flag checked by _update_user_prompt so the callback does not re-enter itself.
        self.update_in_progress = False
        # Created on first use by _build_base_ui.
        self.lsm = None
        self.em = EmailManager(self.pm, self.repository)

    @staticmethod
    def get_current_step() -> int:
        """Return the step value stored in the session, or the initial step value.

        The initial step value is also returned when reading the session
        state raises ``StopException``.
        """
        try:
            return ss.get("step") or Steps.INITIAL_STATE.value
        except StopException:
            return Steps.INITIAL_STATE.value

    @staticmethod
    def set_current_step(step: Steps) -> None:
        """Store the value of *step* in the session under the ``"step"`` key."""
        ss["step"] = step.value

    def _build_base_ui(self) -> None:
        """Render the page title and the sidebar shared by every step."""
        st.markdown("## Dubai Asset Management red tape cutter")
        if not self.lsm:
            self.lsm = LocalStorageManager()
        sidebar_sections = (
            ("### Personal details", DetailsType.PERSONAL_DETAILS),
            ("### Locations details", DetailsType.LOCATION_DETAILS),
            ("### Contractors details", DetailsType.CONTRACTOR_DETAILS),
        )
        with st.sidebar:
            for heading, details_type in sidebar_sections:
                st.markdown(heading)
                self.build_details_checkboxes(details_type)

    def build_ui_for_initial_state(self, user_message):
        """Render the request form: a free-text area and a signature upload.

        :param user_message: text shown, followed by the list of questions,
            as placeholder and help of the text area.

        Submitting the form moves the flow to ``Steps.PARSING_ANSWERS``.
        """
        help_ = user_message + "\n".join(self.pm.questions)
        self._build_base_ui()
        with st.form("Please describe your request"):
            st.text_area("Your input", height=700, label_visibility="hidden", placeholder=help_,
                         help=help_, key="user_input")
            ss["signature"] = st.file_uploader("Your signature", key="file_upload")
            submit_button = st.form_submit_button()
        if submit_button:
            self.set_current_step(Steps.PARSING_ANSWERS)
            st.rerun()

    def build_ui_for_parsing_answers(self):
        """Send the user input to the LLM and store the parsed answers.

        Tags found in the user input are removed from it and looked up as
        saved details. The next step is ``PARSING_ERROR`` when the number of
        parsed answers differs from the number of questions, ``ASK_AGAIN``
        when some answers are missing, ``FIND_CATEGORIES`` otherwise.
        """
        self._build_base_ui()
        with st.status("parsing user input for tags"):
            tags = self.find_tags_regex.findall(ss["user_input"])
            ss["user_input"] = self.find_tags_regex.sub("", ss["user_input"])
            details = [self.lsm.get_detail(t) for t in tags]
        with st.status("initialising LLM"):
            self.repository.init()
        with st.status("waiting for LLM"):
            prompt = self.pm.verify_user_input_prompt(ss["user_input"], details)
            answer = self.repository.send_prompt(prompt)
            st.write(f"answers from LLM: {answer['content']}")
        with st.status("Checking for missing answers"):
            answers = LlmParser.parse_verification_prompt_answers(answer['content'], details)
            ss["answers"] = answers
            if len(answers) != len(Q):
                self.set_current_step(Steps.PARSING_ERROR)
                st.rerun()
            ss["missing_answers"] = check_for_missing_answers(ss["answers"])
        next_step = Steps.ASK_AGAIN if ss.get("missing_answers") else Steps.FIND_CATEGORIES
        self.set_current_step(next_step)
        st.rerun()

    def build_ui_for_ask_again(self):
        """Render one text input per missing answer and merge the replies into the answers."""
        self._build_base_ui()
        with st.form("form1"):
            for ma in ss["missing_answers"]:
                st.text_input(self.pm.questions[ma.value].lower(), key=ma)
            submitted = st.form_submit_button("Submit answers")
        if submitted:
            for ma in ss["missing_answers"]:
                ss["answers"][ma] = ss[ma]
            self.set_current_step(Steps.FIND_CATEGORIES)
            st.rerun()

    def build_ui_for_check_category(self):
        """Ask the LLM for the work categories and move to ``Steps.VALIDATE_DATA``."""
        self._build_base_ui()
        with st.status("finding the work categories applicable to your work"):
            answer = self.repository.send_prompt(self.pm.get_work_category(ss["answers"][Q.WORK_TO_DO]))
            ss["categories"] = LlmParser.parse_get_categories_answer(answer['content'])
        self.set_current_step(Steps.VALIDATE_DATA)
        st.rerun()

    def build_ui_for_form_created(self):
        """Offer the PDF form and an email with the form attached for download.

        The "Start over" button removes the step, the form and the signature
        from the session and reruns the script.
        """
        self._build_base_ui()
        st.download_button("download form", ss["pdf_form"],
                           file_name=ss["pdf_form_filename"], mime="application/pdf")
        email = self.em.create_email(ss["answers"], ss["pdf_form"], ss["pdf_form_filename"])
        st.download_button("download email with attached form", email.as_bytes(),
                           file_name="work_permit_email.eml")
        if st.button("Start over"):
            # Every key is guarded so that a partially populated session cannot raise here.
            for state_key in ("step", "pdf_form", "pdf_form_filename", "signature"):
                if state_key in ss:
                    del ss[state_key]
            st.rerun()

    def _integrate_llm_answers_with_user_corrections(self):
        """Copy the validation form values into the answers and save the named details.

        A group of details is saved only when its "Save as" field was filled in;
        the content of that field is the name under which the group is saved.
        """
        for question in Q:
            ss["answers"][question] = ss[f"fq_{question.name}"]
        details_builders = (
            ("your_details", self._get_personal_details),
            ("location_details", self._get_location_details),
            ("contractor_details", self._get_contractor_details),
        )
        for details_key, build_details in details_builders:
            details = build_details(details_key)
            if details:
                # ss[details_key] holds the name under which this data should be saved
                self.lsm.save_details(details, ss[details_key])
        self.set_current_step(Steps.FIND_CATEGORIES)
        st.rerun()

    def _create_pdf_form(self):
        """Build the PDF form from the answers and move to ``Steps.FORM_CREATED``."""
        with st.status("categories found, creating PDF form"):
            form_data, filename = build_form_data_from_answers(ss["answers"], ss["categories"],
                                                               ss.get("signature"))
            ss["pdf_form"] = write_pdf_form(form_data)
            ss["pdf_form_filename"] = filename
        self.set_current_step(Steps.FORM_CREATED)
        st.rerun()

    def build_ui_for_validate_data_after_correction(self):
        """Render the validation form without categories; submitting stores the corrections."""
        self._build_validation_form(False, self._integrate_llm_answers_with_user_corrections,
                                    "Find work categories")

    def build_ui_to_confirm_form_data(self):
        """Render the validation form with categories; submitting creates the PDF form."""
        self._build_validation_form(True, self._create_pdf_form,
                                    "Create work permit request")

    @staticmethod
    def _get_personal_details(personal_details_key) -> PersonalDetails | None:
        """Return the personal details from the validation form, or None.

        None is returned when the session holds no value under *personal_details_key*.
        """
        if not ss.get(personal_details_key):
            return None
        return PersonalDetails(ss[f"fq_{Q.FULL_NAME.name}"], ss[f"fq_{Q.YOUR_EMAIL.name}"],
                               ss[f"fq_{Q.CONTACT_NUMBER.name}"])

    @staticmethod
    def _get_location_details(location_details_key) -> LocationDetails | None:
        """Return the location details from the validation form, or None.

        None is returned when the session holds no value under *location_details_key*.
        """
        if not ss.get(location_details_key):
            return None
        return LocationDetails(ss[f"fq_{Q.OWNER_OR_TENANT.name}"], ss[f"fq_{Q.COMMUNITY.name}"],
                               ss[f"fq_{Q.BUILDING.name}"], ss[f"fq_{Q.UNIT_APT_NUMBER.name}"])

    @staticmethod
    def _get_contractor_details(contractor_details_key) -> ContractorDetails | None:
        """Return the contractor details from the validation form, or None.

        None is returned when the session holds no value under *contractor_details_key*.
        """
        if not ss.get(contractor_details_key):
            return None
        return ContractorDetails(ss[f"fq_{Q.COMPANY_NAME.name}"], ss[f"fq_{Q.COMPANY_NUMBER.name}"],
                                 ss[f"fq_{Q.COMPANY_EMAIL.name}"])

    def _build_validation_form(self, show_categories: bool, on_submit, submit_button_label):
        """Render the form where the user checks and corrects the parsed answers.

        :param show_categories: when true, one checkbox per work category is added.
        :param on_submit: callable invoked without arguments when the form is submitted.
        :param submit_button_label: label of the submit button.
        """

        def build_form_fragment(form_, col, title, add_save, *questions):
            # One titled group of inputs, optionally followed by a "Save as" field
            # whose widget key is the title with spaces replaced by underscores.
            form_.text(title)
            labels = self.pm.questions_to_field_labels()
            current_answers = ss.get("answers", {})
            # NOTE(review): the widgets are created through form_ rather than st, so the
            # surrounding `with col` may not place them in the column - confirm the layout.
            for user_data in questions:
                with col:
                    form_.text_input(labels[user_data], value=current_answers.get(user_data),
                                     key=f"fq_{user_data.name}")
            if add_save:
                with col:
                    form_.text_input("Save as", key=title.replace(" ", "_"))

        self._build_base_ui()
        f = st.form("Please check the following information and correct fix any inaccuracies")
        col1, col2 = f.columns(2)
        build_form_fragment(f, col1, "your details", True, Q.FULL_NAME, Q.CONTACT_NUMBER, Q.YOUR_EMAIL)
        build_form_fragment(f, col2, "work details", False, Q.WORK_TO_DO, Q.START_DATE, Q.END_DATE)
        build_form_fragment(f, col1, "location details", True, Q.COMMUNITY, Q.BUILDING, Q.UNIT_APT_NUMBER,
                            Q.OWNER_OR_TENANT)
        build_form_fragment(f, col2, "contractor details", True, Q.COMPANY_NAME, Q.COMPANY_NUMBER,
                            Q.COMPANY_EMAIL)
        if show_categories:
            for k, wc in work_categories.items():
                f.checkbox(label=wc, key=k, value=k in ss["categories"])
        if f.form_submit_button(label=submit_button_label):
            on_submit()

    def build_details_checkboxes(self, dt: DetailsType):
        """Render the saved details of type *dt* as checkboxes with an "Add" button.

        Each checkbox uses the widget key ``"<dt.name>_<detail key>"`` and calls
        ``_update_user_prompt`` when its value changes.
        """
        details = self.lsm.get_details(dt)
        with st.container(border=True):
            col1, col2 = st.columns(2)
            with col1:
                st.markdown(f"#### {dt.title()}")
            with col2:
                st.markdown("#### Default")
            for d in details:
                with col1:
                    st.checkbox(label=d.short_description(), key=f"{dt.name}_{d.key}",
                                on_change=self._update_user_prompt, args=[dt, d.key])
                with col2:
                    st.toggle(f"favourite_{d.key}", label_visibility="hidden",
                              value=ss.get(f"DEFAULT_{dt.name}"))
            if st.button(f"Add {dt.title()}"):
                self.add_new_detail_dialog(dt)

    @st.dialog("Add new")
    def add_new_detail_dialog(self, type_: DetailsType):
        """Show a dialog with one text input per field of a new detail of *type_* and save it."""
        new_item = detail_classes[type_]()
        fields_labels = new_item.widget_labels()
        with st.form("new item", border=False):
            for k, v in fields_labels.items():
                st.text_input(label=v, key=k)
            btn_save = st.form_submit_button("Save")
        if btn_save:
            for k in fields_labels:
                setattr(new_item, k, ss[k])
            self.lsm.save_details(new_item)
            st.rerun()

    @staticmethod
    def _remove_tag(text: str, tag_key: str) -> str:
        """Return *text* with every whole ``@tag_key`` token removed.

        A token is whole when it is followed by whitespace or by the end of the
        text, so removing ``@home`` leaves ``@home2`` untouched.
        """
        return re.sub(rf"@{re.escape(tag_key)}(?!\S)", "", text)

    def _update_user_prompt(self, type_: DetailsType, key: str) -> None:
        """Keep the ``@tag`` list in the user input in sync with a toggled checkbox.

        :param type_: type of the detail whose checkbox changed.
        :param key: key of the detail whose checkbox changed.

        Selecting a checkbox deselects the other checkboxes of the same type,
        removes their tags from the user input and appends ``@key``.
        Deselecting it removes ``@key`` from the user input.
        """
        if self.update_in_progress:
            return
        self.update_in_progress = True
        try:
            # The text area may not hold a value yet
            text = ss.get("user_input") or ""
            if ss.get(f"{type_.name}_{key}") is True:
                # the checkbox is selected: deselect the others of the same type
                for other in self.lsm.get_details(type_):
                    if other.key == key:
                        continue
                    # widget keys and tags are built from the detail key, not from the detail itself
                    ss[f"{type_.name}_{other.key}"] = False
                    text = self._remove_tag(text, other.key)
                # add the tag of the newly selected checkbox
                ss["user_input"] = f"{text} @{key}".strip()
            else:
                # the checkbox is deselected: drop its tag when present
                without_tag = self._remove_tag(text, key)
                if without_tag != text:
                    ss["user_input"] = without_tag.strip()
        finally:
            # always release the guard, even when the update fails
            self.update_in_progress = False