import os
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from urllib.parse import urlparse, parse_qs

import streamlit as st
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from openai import OpenAI
from pinecone import Pinecone
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
from PyPDF2 import PdfReader
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from youtube_transcript_api import YouTubeTranscriptApi

st.set_page_config(layout="wide")

load_dotenv()

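# API clients. Keys are read from the environment via .env. The Pinecone
# index "lyca" is assumed to already exist with a dimension matching
# text-embedding-3-large (3072).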
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
index_name = "lyca"
index = pc.Index(index_name)

mongo_uri = os.getenv("MONGODB_URI")
if not mongo_uri:
    st.error("MONGODB_URI is not set. Please check your .env file.")
else:
    print(f"MONGODB_URI loaded: {mongo_uri[:10]}...")

try:
    # Use a dedicated name so the OpenAI client above is not shadowed.
    mongo_client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)
    mongo_client.server_info()  # forces a round trip to verify the connection
    db = mongo_client['lyca']
    sim_swap_collection = db['sim_swap_requests']
except ConnectionFailure:
    st.error("Failed to connect to MongoDB. Please check your connection and try again later.")
    sim_swap_collection = None

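# Embed text with OpenAI. Both document chunks and user queries go
# through this helper, so they share a single vector space.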
def get_embedding(text):
    response = client.embeddings.create(input=text, model="text-embedding-3-large")
    return response.data[0].embedding

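# Extract raw text from every page of an uploaded PDF.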
def process_pdf(file):
    reader = PdfReader(file)
    text = ""
    for page in reader.pages:
        # extract_text() can return None for image-only pages
        text += (page.extract_text() or "") + "\n"
    return text

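# Render the page in headless Chrome so JavaScript-heavy pages still
# yield content, then strip scripts/styles and collapse the visible
# text into clean lines.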
def process_web_link(url):
    try:
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")

        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
        try:
            driver.get(url)
            time.sleep(3)  # crude wait for JavaScript-rendered content
            page_source = driver.page_source
        finally:
            driver.quit()

        soup = BeautifulSoup(page_source, 'lxml')
        for script in soup(["script", "style"]):
            script.decompose()
        text = soup.get_text()

        # Normalize whitespace: strip each line, split on double spaces,
        # and drop empty fragments.
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = '\n'.join(chunk for chunk in chunks if chunk)

        return text
    except Exception as e:
        print(f"Error processing web link {url}: {str(e)}")
        return f"Error processing {url}: {str(e)}"

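# Pull the transcript for a YouTube video and flatten it into one string.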
def process_youtube_link(url):
    video_id = extract_video_id(url)
    transcript = YouTubeTranscriptApi.get_transcript(video_id)
    return " ".join([entry['text'] for entry in transcript])

def extract_video_id(url):
    # Handles youtu.be/<id>, youtube.com/watch?v=<id>,
    # youtube.com/embed/<id>, and youtube.com/v/<id> URLs.
    parsed_url = urlparse(url)
    if parsed_url.hostname == 'youtu.be':
        return parsed_url.path[1:]
    if parsed_url.hostname in ('www.youtube.com', 'youtube.com'):
        if parsed_url.path == '/watch':
            return parse_qs(parsed_url.query).get('v', [None])[0]
        if parsed_url.path.startswith('/embed/'):
            return parsed_url.path.split('/')[2]
        if parsed_url.path.startswith('/v/'):
            return parsed_url.path.split('/')[2]
    return None

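# Ingestion pipeline: extract text from the source, split it into
# size-adaptive chunks, embed the chunks in parallel, and upsert the
# resulting vectors into Pinecone under a shared doc_id.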
def process_upload(upload_type, file_or_link, file_name=None):
    print(f"Starting process_upload for {upload_type}")
    doc_id = str(uuid.uuid4())
    print(f"Generated doc_id: {doc_id}")

    if upload_type == "PDF":
        content = process_pdf(file_or_link)
        doc_name = file_name or "Uploaded PDF"
    elif upload_type == "Web Link":
        content = process_web_link(file_or_link)
        doc_name = file_or_link
    elif upload_type == "YouTube Link":
        content = process_youtube_link(file_or_link)
        doc_name = f"YouTube: {file_or_link}"
    else:
        print("Invalid upload type")
        return "Invalid upload type"

    content_length = len(content)
    print(f"Content extracted, length: {content_length}")

    # Larger documents get larger chunks to keep the chunk count manageable.
    if content_length < 10000:
        chunk_size = 1000
    elif content_length < 100000:
        chunk_size = 2000
    else:
        chunk_size = 4000
    print(f"Using chunk size: {chunk_size}")

    chunks = [content[i:i+chunk_size] for i in range(0, content_length, chunk_size)]

    vectors = []
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_chunk, chunk, doc_id, i, upload_type, doc_name) for i, chunk in enumerate(chunks)]

        for future in as_completed(futures):
            vectors.append(future.result())

            # The progress bar only exists when the upload was started from
            # the UI, not when data is bulk-loaded via load_lyca_mobile_data.
            if 'upload_progress' in st.session_state:
                st.session_state.upload_progress.progress(len(vectors) / len(chunks))

    print(f"Generated {len(vectors)} vectors")

    index.upsert(vectors=vectors)
    print("Vectors upserted to Pinecone")

    return f"Processing complete for {upload_type}. Document Name: {doc_name}"

def process_chunk(chunk, doc_id, i, upload_type, doc_name):
    # Returns a Pinecone vector tuple: (id, embedding, metadata).
    embedding = get_embedding(chunk)
    return (f"{doc_id}_{i}", embedding, {
        "text": chunk,
        "type": upload_type,
        "doc_id": doc_id,
        "doc_name": doc_name,
        "chunk_index": i
    })

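# Retrieval: embed the query, fetch the top-k nearest chunks from
# Pinecone, and reassemble them in (document, chunk) order so the
# context reads coherently.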
def get_relevant_context(query, top_k=5):
    print(f"Getting relevant context for query: {query}")
    query_embedding = get_embedding(query)

    search_results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True)
    print(f"Found {len(search_results['matches'])} relevant results")

    sorted_results = sorted(search_results['matches'], key=lambda x: (x['metadata']['doc_id'], x['metadata']['chunk_index']))

    context = "\n".join([result['metadata']['text'] for result in sorted_results])
    return context, sorted_results

def check_lyca_data_loaded():
    stats = index.describe_index_stats()
    return stats['total_vector_count'] > 0

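# Bulk-load the Lyca Mobile pages listed in links.txt, skipping the work
# when the index already holds vectors.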
def load_lyca_mobile_data():
    if check_lyca_data_loaded():
        return "Lyca Mobile data is already loaded."

    with open('links.txt', 'r') as f:
        lyca_links = [line.strip() for line in f if line.strip()]
    for link in lyca_links:
        process_upload("Web Link", link)
    return "Lyca Mobile data loaded into vector database"

def general_conversation(message):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant for Lyca Mobile customers. If you don't know the answer, politely say so."},
            {"role": "user", "content": message}
        ]
    )
    return response.choices[0].message.content

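# Lightweight intent detection: the message counts as a SIM swap request
# when it contains at least two of the keywords below. Cheap and fast,
# but it can misfire (e.g. "I want a new plan change" also matches).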
def is_sim_swap_request(message):
    sim_swap_keywords = {'sim', 'swap', 'change', 'new', 'replace'}

    # Strip a trailing question mark so e.g. "change my sim?" still matches.
    message = message.rstrip('?')
    message_words = set(message.lower().split())
    return len(sim_swap_keywords.intersection(message_words)) >= 2

# Example: is_sim_swap_request('how to change my sim?') -> True

def trigger_sim_swap_workflow():
    st.session_state.workflow = 'sim_swap'
    st.session_state.workflow_step = 0

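# Structured SIM swap intake: a Streamlit form collects the customer's
# details and writes them to MongoDB, with a graceful error path when
# the database is unavailable.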
def process_sim_swap_workflow():
    st.subheader("SIM Swap Request Form")

    with st.form("sim_swap_form"):
        full_name = st.text_input("Please enter your full name:")
        phone_number = st.text_input("Please enter your phone number:")
        email = st.text_input("Please enter your email address:")
        current_sim = st.text_input("Please enter your current SIM card number:")
        reason = st.text_area("Please enter the reason for SIM swap:")

        submitted = st.form_submit_button("Submit")

        if submitted:
            if sim_swap_collection is None:
                st.error("Unable to process your request due to a database connection issue. Please try again later.")
            else:
                user_data = {
                    "full_name": full_name,
                    "phone_number": phone_number,
                    "email": email,
                    "current_sim": current_sim,
                    "reason": reason,
                    "timestamp": datetime.now()
                }

                try:
                    sim_swap_collection.insert_one(user_data)
                    st.success("Thank you for providing your information. Your SIM swap request has been submitted and stored successfully.")
                    st.session_state.workflow = None
                except Exception as e:
                    st.error(f"An error occurred while storing your information: {str(e)}")
                    st.warning("Please try submitting your request again. If the problem persists, please contact support.")

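# Main answer path: retrieve context and answer from it when the best
# match clears a 0.4 similarity threshold; otherwise fall back to
# general conversation with no sources.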
def chat_with_ai(message):
    try:
        context, results = get_relevant_context(message)

        # results are sorted by document order, not score, so take the
        # maximum score explicitly when deciding whether to use the context.
        best_score = max((r['score'] for r in results), default=0.0)
        if best_score >= 0.4:
            messages = [
                {"role": "system", "content": "You are a helpful assistant for Lyca Mobile. Use the following information to answer the user's question, but don't mention the context directly in your response. If the information isn't in the context, say you don't know."},
                {"role": "system", "content": f"Context: {context}"},
                {"role": "user", "content": message}
            ]

            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )

            ai_response = response.choices[0].message.content

            sources = [
                {
                    "doc_id": result['metadata']['doc_id'],
                    "doc_name": result['metadata']['doc_name'],
                    "chunk_index": result['metadata']['chunk_index'],
                    "text": result['metadata']['text'],
                    "type": result['metadata']['type'],
                    "score": result['score']
                }
                for result in results
            ]
        else:
            ai_response = general_conversation(message)
            sources = []

        return ai_response, sources
    except Exception as e:
        print(f"Error in chat_with_ai: {str(e)}")
        return "I'm sorry, but I encountered an error while processing your request. Please try again later.", []

def clear_database():
    print("Clearing database...")
    index.delete(delete_all=True)
    print("Database cleared")
    return "Database cleared successfully."

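# Streamlit layout: chat in the left column, retrieved source details
# in the right column.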
st.title("Lyca Mobile Assistant")

if 'workflow' not in st.session_state:
    st.session_state.workflow = None
    st.session_state.workflow_data = []
    st.session_state.workflow_step = 0

if 'chat_history' not in st.session_state:
    st.session_state.chat_history = []

col1, col2 = st.columns([2, 1])

with col1:
    st.header("Chat")

    if st.session_state.workflow == 'sim_swap':
        process_sim_swap_workflow()
    else:
        for message in st.session_state.chat_history:
            st.markdown(f"**{'You' if message['role'] == 'user' else 'AI'}:** {message['content']}")

        user_input = st.text_input("How can I assist you with Lyca Mobile today?")
        if st.button("Send"):
            if user_input:
                print(f"User input: {user_input}")
                is_swap_request = is_sim_swap_request(user_input)
                print(f"Is sim swap request: {is_swap_request}")

                if is_swap_request:
                    print("Triggering SIM swap workflow")
                    st.session_state.chat_history.append({"role": "user", "content": user_input})
                    st.session_state.chat_history.append({"role": "assistant", "content": "Certainly! I can help you with changing your SIM. Please fill out the following form to start the SIM swap process."})
                    st.session_state.workflow = 'sim_swap'
                else:
                    print("Proceeding with regular chat flow")

                    st.session_state.chat_progress = st.progress(0)
                    response, sources = chat_with_ai(user_input)
                    st.session_state.chat_progress.progress(1.0)

                    st.session_state.chat_history.append({"role": "user", "content": user_input})
                    st.session_state.chat_history.append({"role": "assistant", "content": response})

                    # Render the new exchange immediately; the history loop
                    # above ran before these messages were appended.
                    st.markdown("**You:** " + user_input)
                    st.markdown("**AI:** " + response)

                    st.session_state.sources = sources
                    st.session_state.chat_progress.empty()
            else:
                st.warning("Please enter a question.")

with col2:
    st.header("Source Information")
    if 'sources' in st.session_state and st.session_state.sources:
        for i, source in enumerate(st.session_state.sources, 1):
            with st.expander(f"Source {i} - {source['type']} ({source['doc_name']})"):
                st.markdown(f"**Chunk Index:** {source['chunk_index']}")
                st.text(source['text'])
    else:
        st.info("Ask a question to see source information here.")