# NOTE: stray "Spaces: / Sleeping / Sleeping" banner text (Hugging Face Spaces
# scrape artifact) — not part of the program; kept here as a comment only.
import base64 | |
import re | |
import time | |
from os import getenv | |
import py7zr | |
import requests | |
import streamlit as st | |
import streamlit_authenticator as stauth | |
from bs4 import BeautifulSoup | |
from planning_ai.common.utils import Paths | |
from planning_ai.main import main as report_main | |
from planning_ai.preprocessing.azure_doc import azure_process_pdfs | |
from planning_ai.preprocessing.gcpt3 import main as preprocess_main | |
# Use the full browser width; Streamlit's default is a narrow centred column.
st.set_page_config(layout="wide")

# Inject global CSS: hide Streamlit's built-in header/footer and style the
# fixed top bar, buttons, and bottom-right footer logo. unsafe_allow_html is
# required because st.markdown escapes HTML/CSS by default.
st.markdown(
    """
    <style>
    /* Hide Streamlit's default header, footer */
    header {visibility: hidden;}
    footer {visibility: hidden;}
    /* Custom footer with logo */
    .custom-footer {
        position: fixed;
        bottom: 10px;
        right: 10px;
        z-index: 100;
    }
    /* Styling the top bar */
    .top-bar {
        background-color: #0A3D91;
        color: white;
        padding: 15px;
        font-size: 32px;
        font-weight: bold;
        position: fixed;
        top: 0;
        left: 0;
        width: 100%;
        z-index: 1000;
        display: flex;
        justify-content: space-between;
        align-items: center;
    }
    /* Contact button styling to match the other buttons */
    .top-bar .contact-button {
        background-color: #0A3D91;
        border-radius: 0px;
        color: white;
        border: none;
        padding: 10px 20px;
        font-size: 16px;
        cursor: pointer;
        text-decoration: none; /* Remove underline */
    }
    .top-bar .contact-button:hover {
        background-color: #045D8C;
    }
    /* Footer image */
    .footer img {
        height: 40px;
        width: auto;
    }
    .stButton > button {
        background-color: #0A3D91;
        border-radius: 0px;
        color: white;
        border: none;
    }
    .stButton > button:hover {
        background-color: #045D8C;
    }
    </style>
    """,
    unsafe_allow_html=True,
)
def get_image_base64(path):
    """Read the file at *path* and return its bytes base64-encoded as text."""
    with open(path, "rb") as fh:
        payload = fh.read()
    return base64.b64encode(payload).decode("utf-8")
# The footer logo must be inlined as a base64 data URI: the raw HTML injected
# below has no access to a static file route for <img> sources.
logo_base64 = get_image_base64("logo.png")

# Add the logo in the footer using base64
st.markdown(
    f"""
    <div class="custom-footer">
        <img src="data:image/png;base64,{logo_base64}" width="200">
    </div>
    """,
    unsafe_allow_html=True,
)

# Top bar content: app title plus a mailto contact link styled as a button
# (classes defined in the CSS injected above).
st.markdown(
    """
    <div class="top-bar">
        <div class="title">Planning AI</div>
        <a href="mailto:[email protected]" class="contact-button">Contact</a>
    </div>
    """,
    unsafe_allow_html=True,
)
# Load authentication secrets from Streamlit's secrets store (.streamlit/secrets.toml).
auth = st.secrets.to_dict()
if "credentials" not in auth:
    # Fallback when no credentials are configured (e.g. local development):
    # a single "admin" account sourced from the EMAIL / PASSWORD environment
    # variables. NOTE(review): getenv may return None here if those variables
    # are unset — confirm they are exported in that environment.
    auth["credentials"] = {
        "usernames": {
            "admin": {
                "email": getenv("EMAIL"),
                "password": getenv("PASSWORD"),
                "first_name": "Admin",
                "last_name": "Admin",
                "logged_in": False,
                "roles": ["viewer"],
            }
        }
    }
    # NOTE(review): hard-coded placeholder cookie name and signature key —
    # the signing key should come from secrets, not source, before any
    # production deployment.
    auth["cookie"] = {
        "name": "some_cookie_name",
        "key": "some_signature_key",
        "expiry_days": 30,
    }

# Initialize the authenticator that drives login/logout in main().
authenticator = stauth.Authenticate(
    auth["credentials"],
    auth["cookie"]["name"],
    auth["cookie"]["key"],
    auth["cookie"]["expiry_days"],
)

# Destination directory for the uploaded JDi .json files.
UPLOAD_DIR = Paths.RAW / "gcpt3"
def initialize_session_state():
    """Seed the workflow-state keys in ``st.session_state`` if absent.

    Keys already present (e.g. after a rerun) are left untouched so the
    user's progress through the steps survives Streamlit reruns.
    """
    defaults = {
        "chapters": False,
        "files_extracted": False,
        "completed": False,
        "start_time": None,
        "end_time": None,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
def get_chapters(consultation_url: str):
    """Scrape a consultation page for its title and chapter headings.

    The page is expected to contain at least two ``<h2>`` headers; the first
    is the document title, and the ``<a>`` link texts between the first and
    second ``<h2>`` are the chapter headings.

    Returns:
        ``(title, chapters)``. An empty URL yields the placeholder
        ``("None", ["None"])``; any fetch or parse failure reports a
        Streamlit error and yields ``("", [])``.
    """
    if not consultation_url:
        return "None", ["None"]

    try:
        # Bounded timeout so a dead host cannot hang the Streamlit script
        # run indefinitely (requests has no default timeout).
        response = requests.get(consultation_url, timeout=30)
    except requests.exceptions.RequestException:
        st.error("Use a valid URL.")
        return "", []
    if not response.ok:
        st.error("Failed to fetch consultation document")
        return "", []

    soup = BeautifulSoup(response.text, "html.parser")
    h2_tags = soup.find_all("h2")
    if len(h2_tags) < 2:
        st.error("Invalid page format - not enough <h2> headers")
        return "", []

    first_h2 = h2_tags[0]
    second_h2 = h2_tags[1]

    # Collect link texts between the first and second <h2>; these are the
    # chapter headings.
    links_between = []
    for sibling in first_h2.find_all_next():
        if sibling == second_h2:  # Stop when reaching the second <h2>
            break
        if sibling.name == "a":  # If it's a link
            link_text = sibling.text.strip()
            if link_text:
                links_between.append(link_text)

    # Strip trailing parenthesised annotations, e.g. "Chapter 1 (PDF)".
    cleaned_links = [re.sub(r"\s*\(.*?\)$", "", link) for link in links_between]
    cleaned_title = first_h2.text.strip()
    return cleaned_title, cleaned_links
def specify_chapters():
    """Render the chapter-specification step.

    Scrapes the title and chapter headings from a consultation URL, lets the
    user override them manually, and persists both to ``chapters.txt`` and
    ``title.txt`` under ``Paths.RAW``. A "Save Chapters" button marks the
    step complete via ``st.session_state["chapters"]``.
    """
    st.title("Specify Chapters")
    st.write(
        "Please specify the Consultation Document URL from the Consultation Hub. This will autopopulate the chapter headings for the final document. \n\n**Please ensure that the final chapter headings are correct.**"
    )
    doc_url = st.text_input(
        "Consultation Document URL",
        key="consultation_url",
        placeholder="https://oc2.greatercambridgeplanning.org/document/1314",
    )
    title, headings = get_chapters(doc_url)
    st.write(f"**Title:** {title}")
    st.write("**Chapters:**", "\n- " + "\n- ".join(headings))
    st.write(
        "**If the chapter headings are incorrect, please add them manually below, separated by commas.**"
    )
    manual_entry = st.text_input(
        "Chapter Headings",
        key="chapter_headings",
        placeholder=", ".join(headings),
        value=", ".join(headings),
    )
    final_headings = [heading.strip() for heading in manual_entry.split(",")]
    with open(Paths.RAW / "chapters.txt", "w") as f:
        f.write("\n".join(final_headings))
    with open(Paths.RAW / "title.txt", "w") as f:
        f.write(title)
    st.button(
        "Save Chapters", on_click=lambda: st.session_state.update({"chapters": True})
    )
def upload_and_extract_files():
    """Handle file upload and extraction.

    Renders the introduction alongside a ``.7z`` uploader. On upload, any
    previously extracted ``.json`` files are removed from ``UPLOAD_DIR``
    before the archive is unpacked there, so the reported file count only
    reflects the new archive. Success sets
    ``st.session_state["files_extracted"]``.
    """
    main1, main2 = st.columns(2)
    with main1:
        st.title("Introduction")
        st.write(
            """
            This program allows you to process JDi `.json` files automatically, to extract detailed information using AI, and produce comprehensive reports. For each _'representation document'_ two AI generated documents are produced.
            1. **Representation Summary** documents contain automatically generated summaries of each representation, these representations are numbered sequentially, and by their unique ID.
            2. **Executive Report** documents contain first an executive summary of the key points extracted from response documents, following this, a **Profile of Submissions** plots the demographic and geographic distribution of responses. Finally this document details **Themes and Policies**, where key themes and policies by response are highlighted, with notable information from responses bullet-pointed. This document contains inline citations, which relate back to the numbers associated with responses in the **Representation Summary Documents**. Citations are included to allow readers to manually verify the claims and points made by the AI model.
            """
        )
    with main2:
        st.title("Upload JDi files")
        st.write(
            """
            1. Upload your `.json` files here as a `7zip` file.
            2. Please ensure that the `.json` files follow the correct format:"""
        )
        with st.expander("**File Format example**"):
            st.write(
                r"""
                ```json
                {
                    "id": 10008,
                    "method": "Paper",
                    "respondentpostcode": "CB2 9NE",
                    "text": "",
                    "attachments": [
                        {
                            "id": 3803,
                            "url": "http://www.cambridge.gov.uk/public/ldf/localplan2031/15417.pdf",
                            "published": false
                        }
                    ],
                    "representations": [
                        {
                            "id": 15417,
                            "support/object": "Object",
                            "document": "Issues and Options Report",
                            "documentelementid": 29785,
                            "documentelementtitle": "3 - Spatial Strategy, Question 3.10",
                            "summary": "No more green belt taken away, which is prime agricultural land. Noise pollution & light pollution for surrounding villages and new houses being built, no bus services either!"
                        },
                    ]
                }
                ```
                """
            )
        if uploaded_file := st.file_uploader("Choose a `.7z` file:", type="7z"):
            with st.spinner("Extracting files...", show_time=True):
                try:
                    # Remove leftovers from a previous upload so the count
                    # reported below only reflects this archive.
                    for old_file in UPLOAD_DIR.glob("*.json"):
                        old_file.unlink()
                    # Extract new files
                    with py7zr.SevenZipFile(uploaded_file, mode="r") as archive:
                        archive.extractall(path=UPLOAD_DIR)
                    st.session_state["files_extracted"] = True
                    st.success(
                        f"Extracted `{len(list(UPLOAD_DIR.glob('*.json')))}` files."
                    )
                except Exception as e:
                    # Broad catch is deliberate: any extraction failure is
                    # surfaced to the user rather than crashing the app.
                    st.error(f"Failed to extract files {e}")
def build_report():
    """Run the report pipeline: preprocess, extract PDF text, build report.

    Progress is shown via spinners; start/end times are recorded in session
    state. Bug fix vs. the original: a failing stage now aborts the pipeline
    (previously the error was displayed but the remaining stages still ran on
    bad data) and clears ``start_time`` so the build button reappears for a
    retry.
    """
    # Remove stale summary PDFs from any previous run.
    for stale_pdf in (Paths.OUT / "summaries").rglob("*.pdf"):
        stale_pdf.unlink()

    st.title("Build Report")
    st.write(
        "Once the files are extracted, click the button below to build the report.\n\n"
        "Do **not** close this page while the report is being built."
    )

    # A build already started (or finished) in this session — don't re-offer.
    if st.session_state["start_time"]:
        return
    if not st.button("Build Report", type="primary"):
        return

    st.session_state["start_time"] = time.time()

    with st.spinner("Preprocessing files...", show_time=True):
        try:
            preprocess_main()
            time_taken = time.time() - st.session_state["start_time"]
            st.success(
                f"Preprocessing completed successfully in {time_taken:.1f} seconds!"
            )
        except Exception as e:
            st.error(f"An error occurred during preprocessing: {e}")
            st.session_state["start_time"] = None  # allow the user to retry
            return

    with st.spinner("Extracting text from PDFs...", show_time=True):
        try:
            azure_process_pdfs()
            time_taken = time.time() - st.session_state["start_time"]
            st.success(
                f"Text extraction completed successfully in {time_taken:.1f} seconds!"
            )
        except Exception as e:
            st.error(f"An error occurred during PDF text extraction: {e}")
            st.session_state["start_time"] = None  # allow the user to retry
            return

    with st.spinner("Building report...", show_time=True):
        report_main()
    st.session_state["end_time"] = time.time()
    st.session_state["completed"] = True
    total_time = st.session_state["end_time"] - st.session_state["start_time"]
    st.success(f"Report building completed in {total_time:.1f} seconds!")
def _render_download_pair(pdf_path, docx_path, pdf_key, docx_key):
    """Render side-by-side PDF / DOCX download buttons for one report pair.

    Args:
        pdf_path / docx_path: ``Path`` objects for the two report files.
        pdf_key / docx_key: unique Streamlit widget keys for each button.
    """
    col1, col2 = st.columns(2)
    with col1:
        with open(pdf_path, "rb") as pdf_file:
            st.download_button(
                label="Download PDF Version",
                data=pdf_file,
                file_name=pdf_path.name,
                mime="application/pdf",
                use_container_width=True,
                key=pdf_key,
            )
    with col2:
        with open(docx_path, "rb") as docx_file:
            st.download_button(
                label="Download DOCX Version",
                data=docx_file,
                file_name=docx_path.name,
                mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                use_container_width=True,
                key=docx_key,
            )


def display_download_buttons(rep):
    """Display download buttons for the generated reports.

    Offers PDF and DOCX versions of both the Executive Report and the
    Representation Summaries for *rep*, and cleans up intermediate files
    left over from the build.
    """
    # Remove intermediate Azure-processed PDFs and reset the failure log.
    for stale_pdf in (Paths.STAGING / "pdfs_azure").glob("*.pdf"):
        stale_pdf.unlink()
    with open(Paths.RAW / "failed_downloads.txt", "w") as f:
        f.write("")

    st.success("Reports built successfully! Please click download buttons below.")
    st.write("---")
    st.header("Download Reports")
    st.markdown(
        """
        The following download buttons provide links to the final report,
        alongside summaries of the representations used to build this report.
        """
    )
    st.markdown("---")

    with st.expander("**Executive Reports**"):
        st.subheader(f"Executive Report for {rep}")
        _render_download_pair(
            Paths.SUMMARY / f"Overview_of_Public_Submissions-{rep}.pdf",
            Paths.SUMMARY / f"Overview_of_Public_Submissions-{rep}.docx",
            pdf_key=f"exec_pdf_{hash(rep)}",
            docx_key=f"exec_docx_{hash(rep)}",
        )
        st.markdown("---")

    with st.expander("**Representation Summaries**"):
        st.subheader(f"Representation Summary for {rep}")
        _render_download_pair(
            Paths.SUMMARY / f"Summaries_of_Public_Submissions-{rep}.pdf",
            Paths.SUMMARY / f"Summaries_of_Public_Submissions-{rep}.docx",
            pdf_key=f"rep_pdf_{hash(rep)}",
            docx_key=f"rep_docx_{hash(rep)}",
        )
        st.markdown("---")
def reset_session():
    """Clear all workflow progress flags and timers in session state."""
    st.session_state.update(
        {
            "chapters": False,
            "files_extracted": False,
            "completed": False,
            "start_time": None,
            "end_time": None,
        }
    )
def main():
    """Main function to run the Streamlit app.

    Authenticates the user, then walks through the workflow steps in order:
    specify chapters, upload files, build the report, download results.
    """
    authenticator.login()
    initialize_session_state()

    # Guard clause: report the authentication state and bail out when the
    # user is not (yet) logged in.
    status = st.session_state["authentication_status"]
    if status is False:
        st.error("Username/password is incorrect")
    elif status is None:
        st.warning("Please enter your username and password")
    if not status:
        reset_session()
        return

    authenticator.logout()  # show logout button

    # Step 1: Specify chapters
    if not st.session_state["chapters"]:
        specify_chapters()
    # Step 2: Upload and extract files
    if st.session_state["chapters"] and not st.session_state["files_extracted"]:
        upload_and_extract_files()
    # Step 3: Build report if files are ready
    if st.session_state["files_extracted"]:
        build_report()
    # Step 4: Show download buttons when complete
    with open(Paths.RAW / "title.txt", "r") as f:
        rep = f.read().strip()
    if st.session_state["completed"]:
        display_download_buttons(rep)
# Script entry point (run via `streamlit run <this file>`).
if __name__ == "__main__":
    main()