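"""Scrape a list of web pages with Playwright, embed their text with OpenAI's
text-embedding-3-large model, and upsert the vectors into a Pinecone index."""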
import argparse
import os
import time
import uuid

from bs4 import BeautifulSoup
from dotenv import load_dotenv
from openai import OpenAI
from pinecone import Pinecone, ServerlessSpec
from playwright.sync_api import sync_playwright

load_dotenv()
# Set up the OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Set up Pinecone
pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
index_name = "lyca"  # Your index name
def ensure_index_exists():
    """Return a handle to the Pinecone index, creating it first if necessary."""
    # Check the index list explicitly; pc.Index(name) alone does not reliably
    # raise for a missing index, so a try/except around it can pass silently.
    if index_name in pc.list_indexes().names():
        print(f"Index '{index_name}' already exists.")
    else:
        print(f"Index '{index_name}' does not exist. Creating it now...")
        pc.create_index(
            name=index_name,
            dimension=3072,  # Output dimension of text-embedding-3-large
            metric="cosine",
            spec=ServerlessSpec(
                cloud="aws",
                region="us-west-2",
            ),
        )
        print(f"Index '{index_name}' created successfully.")
    return pc.Index(index_name)
def get_embedding(text):
    """Embed a single piece of text with OpenAI's text-embedding-3-large."""
    response = client.embeddings.create(input=text, model="text-embedding-3-large")
    return response.data[0].embedding
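# The embeddings endpoint also accepts a list of strings, so several texts can
# be embedded in one request. A minimal batched variant (get_embeddings is an
# added helper, not part of the original script):
def get_embeddings(texts):
    """Embed several texts in a single API call."""
    response = client.embeddings.create(input=texts, model="text-embedding-3-large")
    return [item.embedding for item in response.data]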
def process_web_link(url):
    """Render a page with Playwright and return its visible text, or None on failure."""
    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page()
            page.goto(url)
            # Wait for the content to load; a fixed sleep is simple, but
            # page.wait_for_load_state("networkidle") is a more robust option.
            time.sleep(5)  # Adjust this value if needed
            # Get the fully rendered page HTML
            content = page.content()
            browser.close()

        # Parse the page content using BeautifulSoup
        soup = BeautifulSoup(content, 'lxml')

        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()

        # Get the visible text
        text = soup.get_text()

        # Clean up the text: strip each line, split on runs of double spaces,
        # and drop empty fragments
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = '\n'.join(chunk for chunk in chunks if chunk)

        return text
    except Exception as e:
        print(f"Error processing web link {url}: {str(e)}")
        # Return None rather than an error string, so the error message is
        # never embedded and upserted as if it were page content
        return None
def process_and_upsert_link(url, index):
    print(f"Processing {url}")
    content = process_web_link(url)
    if not content:
        print(f"Skipping {url}: no content extracted.")
        return

    doc_id = str(uuid.uuid4())
    # Cap the stored text so the embedding input and the vector's metadata
    # stay bounded
    content = content[:5000]
    print(f"Content extracted, length: {len(content)}")

    embedding = get_embedding(content)
    vector = (doc_id, embedding, {
        "text": content,
        "type": "Web Link",
        "doc_id": doc_id,
        "doc_name": url,
        "chunk_index": 0,
    })
    print(f"Generated vector for {url}")

    index.upsert(vectors=[vector])
    print(f"Vector upserted to Pinecone for {url}")
def clean_database(index):
    """Delete all vectors from the index."""
    try:
        print("Cleaning the database...")
        index.delete(delete_all=True)
        print("Database cleaned.")
    except Exception as e:
        print(f"Error cleaning database: {str(e)}")
        print("Continuing with the script...")
def main():
    parser = argparse.ArgumentParser(description="Process web links and upsert to Pinecone.")
    parser.add_argument("--clean", action="store_true", help="Clean the database before upserting")
    args = parser.parse_args()

    index = ensure_index_exists()

    if args.clean:
        clean_database(index)

    # links.txt holds one URL per line; blank lines are skipped
    with open('links.txt', 'r') as file:
        links = [line.strip() for line in file if line.strip()]

    for link in links:
        process_and_upsert_link(link, index)


if __name__ == "__main__":
    main()
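# Example usage, assuming the script is saved as scrape_and_upsert.py (a name
# chosen here for illustration) next to a links.txt with one URL per line:
#
#   python scrape_and_upsert.py           # upsert on top of existing vectors
#   python scrape_and_upsert.py --clean   # wipe the index first, then upsert
#
# Once populated, the index can be queried with the same embedding model; a
# minimal sketch with an illustrative query string:
#
#   results = pc.Index("lyca").query(
#       vector=get_embedding("How do I top up my SIM?"),
#       top_k=3,
#       include_metadata=True,
#   )
#   for match in results.matches:
#       print(match.score, match.metadata["doc_name"])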