"""Streamlit front end for the Generative AI demo backend.

The page exposes three tabs -- Review Analyzer, Presentation Creator and
Semantic Search.  Each tab submits a job to the HTTP backend at ``API_URL``,
polls ``/status/{job_id}`` until the backend reports ``complete`` or
``error``, and then fetches the result from ``/download/{job_id}``.

Configuration is read from the environment:

* ``API_URL``   -- base URL of the backend (e.g. ``http://localhost:8000``).
* ``API_TOKEN`` -- bearer token sent in the ``Authorization`` header.
"""

import datetime
import hashlib
import json
import logging
import os
import sqlite3
import time
import uuid
from typing import Optional

import pandas as pd
import requests
import streamlit as st

logger = logging.getLogger(__name__)

# FastAPI base URL
# BASE_URL = "http://localhost:8000"
API_URL = os.getenv("API_URL")
API_TOKEN = os.getenv("API_TOKEN")
BASE_URL = API_URL
# API_URL = "https://api-inference.huggingface.co/models/your-username/your-private-model"
headers = {"Authorization": f"Bearer {API_TOKEN}"}

# Seconds to wait on any single HTTP call before giving up on it.
REQUEST_TIMEOUT = 60
# Upper bound on how long a tab keeps polling for one job.  Without it a job
# that never reaches a terminal state would block the session forever.
MAX_POLL_SECONDS = 15 * 60
# Status values after which the backend will not change the job any more.
_TERMINAL_STATUSES = ("complete", "error")
# Correct media type for .pptx files.
PPTX_MIME = (
    "application/vnd.openxmlformats-officedocument.presentationml.presentation"
)


def query(payload):
    """POST *payload* as JSON to ``API_URL`` and return the decoded JSON reply."""
    response = requests.post(
        API_URL, headers=headers, json=payload, timeout=REQUEST_TIMEOUT
    )
    return response.json()


# data = query({"inputs": "Hello, how are you?"})
# print(data)


def generate_unique_hash(filename: str, uuid: str) -> str:
    """Return a SHA-256 hex digest derived from a file name, the time and a UUID.

    Args:
        filename: Name of the uploaded file.
        uuid: Identifier of the current session or device.  (The parameter
            name shadows the ``uuid`` module inside this function; it is kept
            so existing keyword callers continue to work.)

    Returns:
        The hex digest of ``"{filename}-{current time}-{uuid}"``.  The time
        has one-second resolution, so two calls within the same second with
        the same arguments yield the same digest.
    """
    device_uuid = uuid
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    combined_string = f"{filename}-{current_time}-{device_uuid}"
    return hashlib.sha256(combined_string.encode()).hexdigest()


def _request(method: str, path: str, **kwargs) -> Optional[requests.Response]:
    """Send an authenticated request to the backend; return None on network failure."""
    try:
        return requests.request(
            method,
            f"{BASE_URL}{path}",
            headers=headers,
            timeout=REQUEST_TIMEOUT,
            **kwargs,
        )
    except requests.RequestException:
        logger.exception("%s %s failed", method, path)
        return None


def _submitted_job_id(response: Optional[requests.Response]) -> Optional[str]:
    """Return the ``filename`` job id from a submit response, or None if it failed."""
    if response is None or response.status_code != 200:
        return None
    try:
        return response.json()["filename"]
    except (ValueError, KeyError, TypeError):
        # Body was not JSON, or not an object carrying "filename".
        logger.warning("Submit response did not contain a job id")
        return None


def _fetch_status(job_id: str) -> Optional[str]:
    """Return the job's current ``status`` value, or None if it cannot be read."""
    response = _request("GET", f"/status/{job_id}")
    if response is None or response.status_code != 200:
        return None
    try:
        body = response.json()
    except ValueError:
        return None
    return body.get("status") if isinstance(body, dict) else None


def _wait_for_job(
    job_id: str, poll_interval: float, max_wait: float = MAX_POLL_SECONDS
) -> Optional[str]:
    """Poll the backend until *job_id* reaches a terminal status.

    Args:
        job_id: Identifier used in the ``/status/{job_id}`` URL.
        poll_interval: Seconds to sleep between polls.
        max_wait: Give up after roughly this many seconds.

    Returns:
        ``"complete"`` or ``"error"`` as reported by the backend, or None when
        no terminal status was seen before the deadline.  Failed or unreadable
        polls are treated as "not finished yet" and retried.
    """
    deadline = time.monotonic() + max_wait
    while True:
        status = _fetch_status(job_id)
        if status in _TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            return None
        time.sleep(poll_interval)


def _download_result(job_id: str) -> Optional[bytes]:
    """Return the raw bytes of a finished job's output, or None on failure."""
    response = _request("GET", f"/download/{job_id}")
    if response is None or response.status_code != 200:
        return None
    return response.content


def _results_to_dataframe(raw: bytes) -> pd.DataFrame:
    """Convert the semantic-search JSON payload into the table that is displayed.

    ``page_content`` is reduced to the text after the first ``#####`` marker
    (rows without the marker become NaN), and only the content and the two
    score columns are kept.

    Raises:
        ValueError: *raw* is not valid JSON or cannot form a DataFrame.
        KeyError: An expected column is missing.
        AttributeError: ``page_content`` does not hold strings.
    """
    df = pd.DataFrame(json.loads(raw))
    df["page_content"] = (
        df["page_content"].str.split("#####", n=1).str[1].str.strip()
    )
    return df[["page_content", "similarity_score", "reranking_score"]]


st.title("Generative AI Demos")

if not BASE_URL:
    # Without a base URL every request below would target "None/...".
    st.error("The API_URL environment variable is not set.")
    st.stop()

# Function to generate or retrieve a UUID from local storage
uuid_script = """ """
ga_script = """ """

# Add Google Analytics to the Streamlit app
st.components.v1.html(ga_script, height=0, width=0)
# Execute the JavaScript in the Streamlit app
st.components.v1.html(uuid_script, height=0, width=0)

# Store and display UUID in a non-editable text field using session state
if "uuid" not in st.session_state:
    st.session_state["uuid"] = str(uuid.uuid4())
uuid_from_js = st.session_state["uuid"]

if uuid_from_js is None:
    st.error("Unable to retrieve UUID from the browser.")
else:
    # Display UUID in a non-editable text field
    st.text_input("Your UUID", value=uuid_from_js, disabled=True)

# Define tabs
tab1, tab2, tab3 = st.tabs(
    ["Review Analyzer", "Presentation Creator", "Semantic Search"]
)

with tab1:
    st.header("Review Analyzer")
    uploaded_file = st.file_uploader(
        "Upload your reviews CSV file", type=["csv"], key=2
    )
    if uploaded_file is not None:
        # NOTE(review): interacting with the download button presumably
        # reruns this script and re-submits the upload under a new hash --
        # consider caching the result in st.session_state.
        en1 = generate_unique_hash(uploaded_file.name, uuid_from_js)
        files = {"file": (en1, uploaded_file.getvalue(), "text/csv")}
        st.info("Calling model inference. Please wait...")
        response = _request("POST", "/upload/", files=files)
        if response is not None and response.status_code == 200:
            st.info("Processing started. Please wait...")
            status = _wait_for_job(en1, poll_interval=10)
            if status == "complete":
                st.success("File processed successfully.")
                content = _download_result(en1)
                if content is not None:
                    st.download_button(
                        label="Download Processed File",
                        data=content,
                        file_name=f"processed_{en1}.csv",
                        mime="text/csv",
                    )
                else:
                    st.error("Failed to download the processed file.")
            elif status == "error":
                st.error("The server reported an error while processing the file.")
            else:
                st.error("Timed out waiting for the file to be processed.")
        else:
            st.error("Failed to upload file for processing.")

with tab2:
    st.header("Presentation Creator")
    # Input URL for presentation creation
    presentation_url = st.text_input("Enter the URL for the presentation content")
    if presentation_url:
        st.info("Creating presentation. Please wait...")
        payload = {"url": presentation_url, "id": uuid_from_js}
        response = _request("POST", "/presentation_creator", json=payload)
        unique_id = _submitted_job_id(response)
        if unique_id is not None:
            st.info("Processing started. Please wait...")
            logger.info("presentation job id: %s", unique_id)
            status = _wait_for_job(unique_id, poll_interval=10)
            if status == "complete":
                st.success("Presentation created successfully.")
                content = _download_result(unique_id)
                if content is not None:
                    st.download_button(
                        label="Download Presentation File",
                        data=content,
                        file_name=f"presentation_{unique_id}.pptx",
                        mime=PPTX_MIME,
                    )
                else:
                    st.error("error in downloading the presentation file ")
            elif status == "error":
                st.error("error in creating presentation")
            else:
                st.error("Timed out waiting for the presentation to be created.")
        else:
            st.error("Failed to create presentation.")

with tab3:
    st.header("Semantic Search")
    # Create a form for the inputs and submit button
    with st.form(key="semantic_search_form"):
        search_url = st.text_input("Enter the URL for the semantic search")
        search_query = st.text_input("Enter your query")
        # Submit button inside the form
        submit_button = st.form_submit_button(label="Submit")

    if submit_button:
        if search_url and search_query:
            st.info("Performing semantic search. Please wait...")
            payload = {
                "url": search_url,
                "id": uuid_from_js,
                "search_query": search_query,
                "rerank": True,
            }
            response = _request("POST", "/semantic_search", json=payload)
            unique_id = _submitted_job_id(response)
            if unique_id is not None:
                st.info("Processing started. Please wait...")
                logger.info("semantic search job id: %s", unique_id)
                status = _wait_for_job(unique_id, poll_interval=2)
                if status == "complete":
                    st.success("Semantic search completed successfully.")
                    content = _download_result(unique_id)
                    if content is None:
                        st.error("Error downloading the search results.")
                    else:
                        try:
                            results = _results_to_dataframe(content)
                        except (ValueError, KeyError, AttributeError):
                            logger.exception("Unexpected search result format")
                            st.error(
                                "The search results were not in the expected format."
                            )
                        else:
                            st.dataframe(results)
                elif status == "error":
                    st.error("Error performing the semantic search.")
                else:
                    st.error("Timed out waiting for the semantic search to finish.")
            else:
                st.error("Failed to start the semantic search.")
        else:
            st.error("Please enter both a URL and a query.")