# planning-ai / app.py
# Commit 2b51dec (cjber): "fix: attempt to fix flickering on hf"
import base64
import re
import time
from os import getenv
import py7zr
import requests
import streamlit as st
import streamlit_authenticator as stauth
from bs4 import BeautifulSoup
from planning_ai.common.utils import Paths
from planning_ai.main import main as report_main
from planning_ai.preprocessing.azure_doc import azure_process_pdfs
from planning_ai.preprocessing.gcpt3 import main as preprocess_main
# Use the full browser width for the app layout.
st.set_page_config(layout="wide")

# Global CSS injected once at import time: hides Streamlit's default
# chrome and styles the fixed top bar, the footer logo, and buttons.
_PAGE_CSS = """
    <style>
    /* Hide Streamlit's default header, footer */
    header {visibility: hidden;}
    footer {visibility: hidden;}
    /* Custom footer with logo */
    .custom-footer {
        position: fixed;
        bottom: 10px;
        right: 10px;
        z-index: 100;
    }
    /* Styling the top bar */
    .top-bar {
        background-color: #0A3D91;
        color: white;
        padding: 15px;
        font-size: 32px;
        font-weight: bold;
        position: fixed;
        top: 0;
        left: 0;
        width: 100%;
        z-index: 1000;
        display: flex;
        justify-content: space-between;
        align-items: center;
    }
    /* Contact button styling to match the other buttons */
    .top-bar .contact-button {
        background-color: #0A3D91;
        border-radius: 0px;
        color: white;
        border: none;
        padding: 10px 20px;
        font-size: 16px;
        cursor: pointer;
        text-decoration: none; /* Remove underline */
    }
    .top-bar .contact-button:hover {
        background-color: #045D8C;
    }
    /* Footer image */
    .footer img {
        height: 40px;
        width: auto;
    }
    .stButton > button {
        background-color: #0A3D91;
        border-radius: 0px;
        color: white;
        border: none;
    }
    .stButton > button:hover {
        background-color: #045D8C;
    }
    </style>
    """

st.markdown(_PAGE_CSS, unsafe_allow_html=True)
def get_image_base64(path):
    """Read the file at *path* and return its bytes base64-encoded as UTF-8 text."""
    with open(path, "rb") as image_file:
        raw_bytes = image_file.read()
    return base64.b64encode(raw_bytes).decode("utf-8")
logo_base64 = get_image_base64("logo.png")
# Add the logo in the footer using base64
st.markdown(
f"""
<div class="custom-footer">
<img src="data:image/png;base64,{logo_base64}" width="200">
</div>
""",
unsafe_allow_html=True,
)
# Top bar content
st.markdown(
"""
<div class="top-bar">
<div class="title">Planning AI</div>
<a href="mailto:[email protected]" class="contact-button">Contact</a>
</div>
""",
unsafe_allow_html=True,
)
# Load authentication secrets. On the hosted Space these come from
# st.secrets; when absent we fall back to a single admin user built
# from environment variables.
auth = st.secrets.to_dict()
if "credentials" not in auth:
    auth["credentials"] = {
        "usernames": {
            "admin": {
                "email": getenv("EMAIL"),
                "password": getenv("PASSWORD"),
                "first_name": "Admin",
                "last_name": "Admin",
                "logged_in": False,
                "roles": ["viewer"],
            }
        }
    }
    auth["cookie"] = {
        "name": "some_cookie_name",
        # Prefer a real secret for the cookie signing key; the old
        # hard-coded value remains the default so existing deployments
        # keep working. NOTE(review): set COOKIE_KEY in production.
        "key": getenv("COOKIE_KEY", "some_signature_key"),
        "expiry_days": 30,
    }

# Initialize the authenticator used by main() for login/logout.
authenticator = stauth.Authenticate(
    auth["credentials"],
    auth["cookie"]["name"],
    auth["cookie"]["key"],
    auth["cookie"]["expiry_days"],
)

# Destination for the extracted JDi .json files.
UPLOAD_DIR = Paths.RAW / "gcpt3"
def initialize_session_state():
    """Seed st.session_state with the workflow flags this app relies on.

    Existing values are left untouched so reruns keep their state.
    """
    defaults = {
        "chapters": False,
        "files_extracted": False,
        "completed": False,
        "start_time": None,
        "end_time": None,
    }
    for key, default in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default
def get_chapters(consultation_url: str):
    """Scrape the title and chapter headings from a consultation page.

    Args:
        consultation_url: URL of the consultation document page.

    Returns:
        A ``(title, chapters)`` tuple. An empty/missing URL yields the
        placeholder ``("None", ["None"])``; any fetch or parse failure
        reports an error in the UI and returns ``("", [])``.
    """
    if not consultation_url:
        return "None", ["None"]
    try:
        # A timeout prevents the whole Streamlit script run from
        # hanging indefinitely on an unreachable host.
        response = requests.get(consultation_url, timeout=30)
    except requests.exceptions.RequestException:
        st.error("Use a valid URL.")
        return "", []
    if not response.ok:
        st.error("Failed to fetch consultation document")
        return "", []

    soup = BeautifulSoup(response.text, "html.parser")
    h2_tags = soup.find_all("h2")
    if len(h2_tags) < 2:
        st.error("Invalid page format - not enough <h2> headers")
        return "", []
    first_h2 = h2_tags[0]
    second_h2 = h2_tags[1]

    # Collect links between the first and second <h2>; their text is
    # treated as the chapter headings.
    links_between = []
    for sibling in first_h2.find_all_next():
        if sibling == second_h2:  # Stop when reaching the second <h2>
            break
        if sibling.name == "a":  # If it's a link
            link_text = sibling.text.strip()
            if link_text:
                links_between.append(link_text)

    # Strip trailing parenthesised suffixes, e.g. "Chapter 1 (2 pages)".
    cleaned_links = [re.sub(r"\s*\(.*?\)$", "", link) for link in links_between]
    cleaned_title = first_h2.text.strip()
    return cleaned_title, cleaned_links
def specify_chapters():
    """Render the chapter-specification step of the workflow.

    Scrapes the consultation page for a title and chapter headings,
    lets the user override the headings, and persists both to files
    under ``Paths.RAW``. The "Save Chapters" button advances the
    workflow by setting ``st.session_state["chapters"]``.
    """
    st.title("Specify Chapters")
    st.write(
        "Please specify the Consultation Document URL from the Consultation Hub. This will autopopulate the chapter headings for the final document. \n\n**Please ensure that the final chapter headings are correct.**"
    )
    consultation_url = st.text_input(
        "Consultation Document URL",
        key="consultation_url",
        placeholder="https://oc2.greatercambridgeplanning.org/document/1314",
    )
    title, chapters = get_chapters(consultation_url)
    st.write(f"**Title:** {title}")
    st.write("**Chapters:**", "\n- " + "\n- ".join(chapters))
    st.write(
        "**If the chapter headings are incorrect, please add them manually below, separated by commas.**"
    )
    # Pre-fill with the scraped headings so the user only edits when the
    # scrape got something wrong.
    chapters = st.text_input(
        "Chapter Headings",
        key="chapter_headings",
        placeholder=", ".join(chapters),
        value=", ".join(chapters),
    )
    chapters = [chapter.strip() for chapter in chapters.split(",")]

    # Persist on every rerun so the build step always reads the latest edit.
    with open(Paths.RAW / "chapters.txt", "w") as f:
        f.write("\n".join(chapters))
    with open(Paths.RAW / "title.txt", "w") as f:
        f.write(title)

    st.button(
        "Save Chapters", on_click=lambda: st.session_state.update({"chapters": True})
    )
def upload_and_extract_files():
    """Handle file upload and extraction.

    Left column: introduction text. Right column: a `.7z` uploader whose
    contents replace any previously extracted ``.json`` files in
    ``UPLOAD_DIR``. Sets ``st.session_state["files_extracted"]`` once
    extraction succeeds.
    """
    main1, main2 = st.columns(2)
    with main1:
        st.title("Introduction")
        st.write(
            """
            This program allows you to process JDi `.json` files automatically, to extract detailed information using AI, and produce comprehensive reports. For each _'representation document'_ two AI generated documents are produced.
            1. **Representation Summary** documents contain automatically generated summaries of each representation, these representations are numbered sequentially, and by their unique ID.
            2. **Executive Report** documents contain first an executive summary of the key points extracted from response documents, following this, a **Profile of Submissions** plots the demographic and geographic distribution of responses. Finally this document details **Themes and Policies**, where key themes and policies by response are highlighted, with notable information from responses bullet-pointed. This document contains inline citations, which relate back to the numbers associated with responses in the **Representation Summary Documents**. Citations are included to allow readers to manually verify the claims and points made by the AI model.
            """
        )
    with main2:
        st.title("Upload JDi files")
        st.write(
            """
            1. Upload your `.json` files here as a `7zip` file.
            2. Please ensure that the `.json` files follow the correct format:"""
        )
        with st.expander("**File Format example**"):
            st.write(
                r"""
                ```json
                {
                    "id": 10008,
                    "method": "Paper",
                    "respondentpostcode": "CB2 9NE",
                    "text": "",
                    "attachments": [
                        {
                            "id": 3803,
                            "url": "http://www.cambridge.gov.uk/public/ldf/localplan2031/15417.pdf",
                            "published": false
                        }
                    ],
                    "representations": [
                        {
                            "id": 15417,
                            "support/object": "Object",
                            "document": "Issues and Options Report",
                            "documentelementid": 29785,
                            "documentelementtitle": "3 - Spatial Strategy, Question 3.10",
                            "summary": "No more green belt taken away, which is prime agricultural land. Noise pollution & light pollution for surrounding villages and new houses being built, no bus services either!"
                        },
                    ]
                }
                ```
                """
            )
        if uploaded_file := st.file_uploader("Choose a `.7z` file:", type="7z"):
            with st.spinner("Extracting files...", show_time=True):
                try:
                    # Remove files from any previous upload so the
                    # extracted count below reflects this archive only.
                    for old_file in UPLOAD_DIR.glob("*.json"):
                        old_file.unlink()
                    # Extract new files
                    with py7zr.SevenZipFile(uploaded_file, mode="r") as archive:
                        archive.extractall(path=UPLOAD_DIR)
                    st.session_state["files_extracted"] = True
                    st.success(
                        f"Extracted `{len(list(UPLOAD_DIR.glob('*.json')))}` files."
                    )
                except Exception as e:
                    st.error(f"Failed to extract files: {e}")
def build_report():
    """Build the report from extracted files."""
    # Remove old files (stale PDFs from a previous build).
    _ = [file.unlink() for file in (Paths.OUT / "summaries").rglob("*.pdf")]
    st.title("Build Report")
    st.write(
        "Once the files are extracted, click the button below to build the report.\n\n"
        "Do **not** close this page while the report is being built."
    )
    # ``start_time`` doubles as the "build already started" flag, so the
    # button is only offered before the first click of this session.
    if not st.session_state["start_time"]:
        if st.button("Build Report", type="primary"):
            st.session_state["start_time"] = time.time()
            # Stage 1: normalise the raw JDi JSON files.
            with st.spinner("Preprocessing files...", show_time=True):
                try:
                    preprocess_main()
                    # NOTE: elapsed time is measured from the overall start,
                    # not per stage.
                    time_taken = time.time() - st.session_state["start_time"]
                    st.success(
                        f"Preprocessing completed successfully in {time_taken:.1f} seconds!"
                    )
                except Exception as e:
                    # Best-effort: report the failure but continue to the
                    # next stage rather than aborting the build.
                    st.error(f"An error occurred during preprocessing: {e}")
            # Stage 2: extract text from PDF attachments (Azure-based helper).
            with st.spinner("Extracting text from PDFs...", show_time=True):
                try:
                    azure_process_pdfs()
                    time_taken = time.time() - st.session_state["start_time"]
                    st.success(
                        f"Text extraction completed successfully in {time_taken:.1f} seconds!"
                    )
                except Exception as e:
                    st.error(f"An error occurred during PDF text extraction: {e}")
            # Stage 3: run the report pipeline and mark the workflow complete
            # so main() shows the download buttons on the next rerun.
            with st.spinner("Building report...", show_time=True):
                report_main()
                st.session_state["end_time"] = time.time()
                st.session_state["completed"] = True
                total_time = (
                    st.session_state["end_time"] - st.session_state["start_time"]
                )
                st.success(f"Report building completed in {total_time:.1f} seconds!")
def _render_download_section(header, subheader, base_name, key_prefix, rep):
    """Render one expander holding paired PDF/DOCX download buttons.

    Args:
        header: Expander label (markdown).
        subheader: Section title shown inside the expander.
        base_name: File-name stem, combined with ``rep`` as
            ``{base_name}-{rep}.pdf`` / ``.docx`` under ``Paths.SUMMARY``.
        key_prefix: Prefix for the unique Streamlit widget keys.
        rep: Report title/identifier.
    """
    pdf_path = Paths.SUMMARY / f"{base_name}-{rep}.pdf"
    docx_path = Paths.SUMMARY / f"{base_name}-{rep}.docx"
    with st.expander(header):
        with st.container():
            st.subheader(subheader)
            col1, col2 = st.columns(2)
            with col1:
                with open(pdf_path, "rb") as pdf_file:
                    st.download_button(
                        label="Download PDF Version",
                        data=pdf_file,
                        file_name=f"{base_name}-{rep}.pdf",
                        mime="application/pdf",
                        use_container_width=True,
                        key=f"{key_prefix}_pdf_{hash(rep)}",
                    )
            with col2:
                with open(docx_path, "rb") as docx_file:
                    st.download_button(
                        label="Download DOCX Version",
                        data=docx_file,
                        file_name=f"{base_name}-{rep}.docx",
                        mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                        use_container_width=True,
                        key=f"{key_prefix}_docx_{hash(rep)}",
                    )
    st.markdown("---")


def display_download_buttons(rep):
    """Display download buttons for the generated reports.

    Args:
        rep: Report title used in file names and section headings.
    """
    # remove some old intermediate files
    _ = [file.unlink() for file in (Paths.STAGING / "pdfs_azure").glob("*.pdf")]
    # Truncate the failed-downloads log for the next run.
    with open((Paths.RAW / "failed_downloads.txt"), "w") as f:
        f.write("")

    st.success("Reports built successfully! Please click download buttons below.")
    st.write("---")
    st.header("Download Reports")
    st.markdown(
        """
        The following download buttons provide links to the final report,
        alongside summaries of the representations used to build this report.
        """
    )
    # Add some spacing and better organization
    st.markdown("---")

    _render_download_section(
        "**Executive Reports**",
        f"Executive Report for {rep}",
        "Overview_of_Public_Submissions",
        "exec",
        rep,
    )
    _render_download_section(
        "**Representation Summaries**",
        f"Representation Summary for {rep}",
        "Summaries_of_Public_Submissions",
        "rep",
        rep,
    )
def reset_session():
    """Return the workflow flags in st.session_state to their initial values."""
    st.session_state.update(
        {
            "chapters": False,
            "files_extracted": False,
            "completed": False,
            "start_time": None,
            "end_time": None,
        }
    )
def main():
    """Main function to run the Streamlit app.

    Drives the workflow state machine: login -> specify chapters ->
    upload files -> build report -> download results. Each step is
    gated by a flag in ``st.session_state``.
    """
    authenticator.login()
    initialize_session_state()

    # Handle authentication states
    if st.session_state["authentication_status"] is False:
        st.error("Username/password is incorrect")
    elif st.session_state["authentication_status"] is None:
        st.warning("Please enter your username and password")

    # Guard clause: reset and bail out when not authenticated; everything
    # below runs only for logged-in users.
    if not st.session_state["authentication_status"]:
        reset_session()
        return

    authenticator.logout()  # show logout button

    # Step 1: Specify chapters
    if not st.session_state["chapters"]:
        specify_chapters()
    # Step 2: Upload and extract files
    if not st.session_state["files_extracted"] and st.session_state["chapters"]:
        upload_and_extract_files()
    # Step 3: Build report if files are ready
    if st.session_state["files_extracted"]:
        build_report()
    # Step 4: Show download buttons when complete. The title file is read
    # only when needed, so a missing title.txt cannot crash earlier steps.
    if st.session_state["completed"]:
        with open(Paths.RAW / "title.txt", "r") as f:
            rep = f.read().strip()
        display_download_buttons(rep)
# Script entry point: run the Streamlit app.
if __name__ == "__main__":
    main()