import gradio as gr
from openai import OpenAI
import requests
import json
import httpx
import os
import logging
from fake_useragent import UserAgent
from typing import Optional, List, Dict, Tuple
from itertools import cycle
from datetime import datetime
from bs4 import BeautifulSoup
from googlesearch import search
from newsapi import NewsApiClient
import markdown
import re
import time
import random
from tenacity import retry, wait_exponential, stop_after_attempt

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class RaindropSearchBot:
    def __init__(self):
        self.openai_api_key = os.getenv('openaikey')
        self.raindrop_api_token = os.getenv('raindroptoken')
        self.newsapi_key = os.getenv('newsapikey')

        if not all([self.openai_api_key, self.raindrop_api_token, self.newsapi_key]):
            raise EnvironmentError(
                "Missing required environment variables. Please ensure all API keys are set."
            )

        # OpenAI client with an explicit httpx client (timeout and redirect handling)
        self.client = OpenAI(
            api_key=self.openai_api_key,
            http_client=httpx.Client(
                timeout=60.0,
                follow_redirects=True
            )
        )

        self.newsapi = NewsApiClient(api_key=self.newsapi_key)
        self.min_delay = 5   # minimum delay between outbound requests (seconds)
        self.max_delay = 15  # maximum delay between outbound requests (seconds)
        self.ua = UserAgent()
        self.setup_proxies()

    def get_next_proxy(self) -> dict:
        """Get the next proxy from the rotation."""
        try:
            proxy = next(self.proxy_cycle)
            return {
                'http': proxy,
                'https': proxy
            }
        except StopIteration:
            logger.warning("No proxies available, returning empty proxy dict")
            return {}

    def get_alternative_search_results(self, query: str) -> List[Dict]:
        """Fall back to DuckDuckGo if Google search fails."""
        try:
            from duckduckgo_search import DDGS

            self.random_delay()
            with DDGS() as ddgs:
                results = list(ddgs.text(query, max_results=5))
                # DDGS.text() returns the result URL under 'href', not 'link'
                return [{
                    'title': result.get('title', ''),
                    'link': result.get('href', ''),
                    'snippet': result.get('body', '')
                } for result in results]
        except Exception as e:
            logger.error(f"Alternative search failed: {e}")
            return []

    def search_with_fallback(self, query: str) -> List[Dict]:
        """Search Google first, then fall back to an alternative engine."""
        try:
            return self.get_google_results(query)
        except Exception as e:
            logger.warning(f"Google search failed: {e}")
            try:
                # Fall back to DuckDuckGo (see get_alternative_search_results)
                return self.get_alternative_search_results(query)
            except Exception as e:
                logger.error(f"All search attempts failed: {e}")
                return []

    def setup_proxies(self):
        """Set up proxy rotation."""
        # Placeholder proxy list - replace with your own (ideally paid) proxy
        # service for better reliability; these example hosts will not work as-is.
        self.proxies = [
            'http://proxy1.example.com:8080',
            'http://proxy2.example.com:8080',
            # Add more proxies here
        ]
        self.proxy_cycle = cycle(self.proxies)

    def random_delay(self):
        """Sleep for a random interval with jitter to avoid rate limiting."""
        base_delay = random.uniform(self.min_delay, self.max_delay)
        jitter = random.uniform(-1, 1)  # add/subtract up to 1 second
        delay = max(0, base_delay + jitter)
        time.sleep(delay)

    def get_google_results(self, query: str, num_results: int = 5) -> List[Dict]:
        """Get Google search results in small chunks with long delays."""
        try:
            search_results = []
            session = self.create_session()

            # Break the search into smaller chunks. Note: without an offset
            # parameter each chunk starts from the top of the result list,
            # so chunks may overlap when num_results is large.
            chunk_size = 3
            for i in range(0, num_results, chunk_size):
                # Substantial random delay between chunks
                self.random_delay()
                try:
                    chunk_results = list(search(
                        query,
                        num_results=min(chunk_size, num_results - i),
                        advanced=True,
                        lang="en",
                        sleep_interval=random.uniform(5, 10),  # random delay between requests
                        timeout=30
                    ))
                    for result in chunk_results:
                        search_results.append({
                            'title': result.title,
                            'link': result.url,
                            'snippet': result.description
                        })
                    # Additional random delay between chunks
                    time.sleep(random.uniform(8, 15))
                except Exception as e:
                    logger.warning(f"Error in search chunk {i}: {e}")
                    continue

            return search_results
        except Exception as e:
            logger.error(f"Google search error: {e}")
            raise

    def get_news_results(self, query: str, num_results: int = 5) -> List[Dict]:
        """Get news articles from NewsAPI with a rate-limit delay."""
        try:
            # Random delay before making the request
            self.random_delay()
            news_results = self.newsapi.get_everything(
                q=query,
                language='en',
                sort_by='relevancy',
                page_size=num_results
            )
            return news_results.get('articles', [])
        except Exception as e:
            logger.error(f"News API error: {e}")
            return []

    def extract_content_from_url(self, url: str) -> Optional[str]:
        """Extract the main content from a URL using BeautifulSoup."""
        try:
            # Random delay before making the request
            self.random_delay()

            headers = {
                'User-Agent': self.get_random_user_agent(),
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            # Remove unwanted elements
            for element in soup(['script', 'style', 'nav', 'header', 'footer', 'iframe']):
                element.decompose()

            # Get title
            title = soup.title.string if soup.title else ''

            # Get main content: try common content containers first
            content_containers = soup.select('article, main, .content, .post-content, .entry-content')
            if content_containers:
                content = content_containers[0].get_text(separator='\n', strip=True)
            else:
                # Fall back to all paragraphs
                paragraphs = soup.find_all('p')
                content = '\n'.join(p.get_text(strip=True) for p in paragraphs)

            # Combine and clean up the text
            full_content = f"{title}\n\n{content}"
            full_content = re.sub(r'\n\s*\n', '\n\n', full_content)  # remove extra newlines
            full_content = re.sub(r'\s+', ' ', full_content)         # normalize whitespace

            return full_content if full_content.strip() else None
        except Exception as e:
            logger.error(f"Error extracting content from {url}: {e}")
            return None

    def get_random_user_agent(self) -> str:
        """Get a random user agent string via fake-useragent."""
        return self.ua.random

    def create_session(self) -> requests.Session:
        """Create a requests session with a random user agent and proxy."""
        session = requests.Session()
        session.headers.update({
            'User-Agent': self.get_random_user_agent(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'max-age=0'
        })
        session.proxies = self.get_next_proxy()
        return session

    def get_content_and_summary(self, request: str, item: Dict, source_type: str) -> Dict:
        """Get content and generate a summary for a single item."""
        try:
            # Get the URL regardless of which field name the source uses
            url = item.get('link') or item.get('url')
            if not url:
                logger.warning(f"No URL found in item from {source_type}")
                return item

            # For Raindrop items, use the existing excerpt if available
            if source_type == 'raindrop' and item.get('excerpt'):
                content = item['excerpt']
            else:
                content = self.extract_content_from_url(url)

            if not content:
                logger.warning(f"No content extracted from {url}")
                item['detailed_summary'] = "Content extraction failed."
                return item

            # Generate a summary focused on the query topic.
            # Content is truncated to stay within token limits.
            try:
                prompt = f"""
                Analyze this content and provide a detailed summary focusing on key points related to the user request:
                {request}

                Content: {content[:4000]}

                Requirements:
                1. Focus on the most important facts and findings related to the topic
                2. Include specific data points and quotes if relevant
                3. Organize the information logically
                4. Keep the summary to 2-3 paragraphs
                5. Highlight any unique insights from this source
                6. No need to add a conclusion
                """

                response = self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0.3,
                    max_tokens=300
                )

                item['detailed_summary'] = response.choices[0].message.content
                item['processed_content'] = content[:1000]  # keep truncated content for later use
            except Exception as e:
                logger.error(f"Error generating summary: {e}")
                item['detailed_summary'] = "Summary generation failed."

            return item
        except Exception as e:
            logger.error(f"Error processing item from {source_type}: {e}")
            return item

    def search_raindrop(self, search_query: str) -> List[Dict]:
        """Search Raindrop.io with enhanced error handling and logging."""
        logger.info(f"Searching Raindrop with query: {search_query}")

        headers = {
            "Authorization": f"Bearer {self.raindrop_api_token}"
        }

        # Test the API connection first
        try:
            test_response = requests.get(
                "https://api.raindrop.io/rest/v1/user",
                headers=headers
            )
            if test_response.status_code != 200:
                logger.error(f"API test failed: {test_response.status_code}")
                return []
        except Exception as e:
            logger.error(f"API connection error: {e}")
            return []

        # Perform the search
        try:
            params = {
                "search": search_query,
                "perpage": 50,
                "sort": "-created",
                "page": 0
            }
            response = requests.get(
                "https://api.raindrop.io/rest/v1/raindrops/0",
                headers=headers,
                params=params
            )
            if response.status_code == 200:
                data = response.json()
                items = data.get("items", [])
                logger.info(f"Found {len(items)} results")
                return items
            else:
                logger.error(f"Search failed: {response.status_code}")
                return []
        except Exception as e:
            logger.error(f"Search error: {e}")
            return []

    def process_all_results(self, request, raindrop_results: List[Dict],
                            google_results: List[Dict],
                            news_results: List[Dict]) -> Tuple[List[Dict], List[Dict], List[Dict]]:
        """Process and enrich all results with content and summaries."""
        processed_raindrop = []
        for item in raindrop_results:
            processed_item = self.get_content_and_summary(request, item, 'raindrop')
            if processed_item.get('detailed_summary'):
                processed_raindrop.append(processed_item)
            # Delay between items
            self.random_delay()

        processed_google = []
        for item in google_results:
            processed_item = self.get_content_and_summary(request, item, 'google')
            if processed_item.get('detailed_summary'):
                processed_google.append(processed_item)
            # Delay between items
            self.random_delay()

        processed_news = []
        for item in news_results:
            processed_item = self.get_content_and_summary(request, item, 'news')
            if processed_item.get('detailed_summary'):
                processed_news.append(processed_item)
            # Delay between items
            self.random_delay()

        return processed_raindrop, processed_google, processed_news

    def generate_essay_response(self, results: Tuple[List[Dict], List[Dict], List[Dict]],
                                user_query: str) -> str:
        """Generate a structured essay-style response with references."""
        raindrop_results, google_results, news_results = results

        # Collect all content for analysis, labelling each source with its
        # reference number so the model can cite it as [n].
        all_content = ""
        reference_map = {}
        ref_counter = 1

        def get_url(item):
            """Get the URL from an item regardless of field name."""
            if 'link' in item:
                return item['link']
            elif 'url' in item:
                return item['url']
            return None

        # Process Raindrop, Google, and News results in the same order used by
        # format_results so the reference numbers line up.
        for source_list in (raindrop_results, google_results, news_results):
            for item in source_list:
                url = get_url(item)
                if url and item.get('detailed_summary'):
                    all_content += f"\nSource [{ref_counter}] ({url}):\n{item['detailed_summary']}\n"
                    reference_map[url] = ref_counter
                    ref_counter += 1

        try:
            prompt = f"""
            Create a comprehensive essay-style analysis about: {user_query}

            Use this content as your reference source material:
            {all_content}

            Requirements:
            1. Structure the response in clear sections with markdown headers
            2. Include an introduction and conclusion
            3. Use the reference numbers [n] given above to cite sources
            4. Make connections between different sources
            5. Highlight key findings and trends
            6. Address any contradictions or gaps
            7. Use markdown formatting for better readability

            Format the response as a proper academic essay with sections and sources.
            """

            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.5,
                max_tokens=1500
            )

            essay = response.choices[0].message.content

            # Replace any [URL:...] placeholders the model may emit with reference numbers
            for url, ref_num in reference_map.items():
                essay = essay.replace(f'[URL:{url}]', f'[{ref_num}]')

            return essay
        except Exception as e:
            logger.error(f"Error generating essay: {e}")
            return "Error generating analysis."

    def format_results(self, results: Tuple[List[Dict], List[Dict], List[Dict]],
                       essay: str) -> str:
        """Format the essay and results with detailed summaries."""
        raindrop_results, google_results, news_results = results

        output = f"{essay}\n\n"
        output += "---\n\n"
        output += "# References and Detailed Summaries\n\n"
        ref_counter = 1

        # Format Raindrop results
        if raindrop_results:
            output += "## 📚 Bookmarked Sources\n\n"
            for item in raindrop_results:
                output += f"### [{ref_counter}] {item.get('title', 'No Title')}\n"
                output += f"**Link**: {item.get('link')}\n"
                if item.get('tags'):
                    output += f"**Tags**: {', '.join(item['tags'])}\n"
                if item.get('created'):
                    output += f"**Created**: {item['created'][:10]}\n"
                output += "\n**Summary**:\n"
                output += f"{item.get('detailed_summary', 'No summary available.')}\n\n"
                ref_counter += 1

        # Format Google results
        if google_results:
            output += "## 🌐 Web Sources\n\n"
            for item in google_results:
                output += f"### [{ref_counter}] {item.get('title', 'No Title')}\n"
                output += f"**Link**: {item.get('link')}\n"
                output += "\n**Summary**:\n"
                output += f"{item.get('detailed_summary', 'No summary available.')}\n\n"
                ref_counter += 1

        # Format News results
        if news_results:
            output += "## 📰 Recent News\n\n"
            for item in news_results:
                output += f"### [{ref_counter}] {item.get('title', 'No Title')}\n"
                output += f"**Link**: {item.get('url')}\n"
                if item.get('source', {}).get('name'):
                    output += f"**Source**: {item['source']['name']}\n"
                if item.get('publishedAt'):
                    output += f"**Published**: {item['publishedAt'][:10]}\n"
                output += "\n**Summary**:\n"
                output += f"{item.get('detailed_summary', 'No summary available.')}\n\n"
                ref_counter += 1

        return output

    def process_request(self, user_request: str) -> str:
        """Process a user request with improved error handling and query generation."""
        try:
            # Generate an optimized search query
            search_query = self.generate_search_queries(user_request)
            logger.info(f"Processing request: {search_query}")

            # Search the user's Raindrop.io bookmarks
            raindrop_results = self.search_raindrop(search_query)

            # Get web search results with fallback
            google_results = self.search_with_fallback(search_query)

            # Delay before the news API call
            self.random_delay()

            # Get news results
            news_results = self.get_news_results(search_query)

            # Process all results, passing the original user request for summarization
            processed_results = self.process_all_results(
                request=user_request,
                raindrop_results=raindrop_results,
                google_results=google_results,
                news_results=news_results
            )

            # Generate the essay-style response
            essay = self.generate_essay_response(processed_results, user_request)

            # Format and return the results
            return self.format_results(processed_results, essay)
        except Exception as e:
            logger.error(f"Error processing request: {e}")
            return f"""
            An error occurred while processing your request: {str(e)}
            Please try again with a different search query or contact support if the problem persists.
            """

    def generate_search_queries(self, user_request: str) -> str:
        """
        Generate an optimized search query from the user request.

        Args:
            user_request (str): The original user query

        Returns:
            str: Optimized search query
        """
        try:
            # Clean and preprocess the user request
            cleaned_request = self.preprocess_query(user_request)

            # Generate the search query using GPT
            prompt = f"""
            Convert this search request into an optimized search query using proper search operators.
            Request: {cleaned_request}

            Guidelines:
            - Focus on key concepts and synonyms
            - Use a combination of keywords that would appear in titles or descriptions
            - Return only the search terms, no explanation
            - Include alternative phrasings
            - Keep it concise (max 6-8 key terms/phrases)
            - Use the formatting authorised in Raindrop search:
              o use " for exact search (ex: "artificial intelligence")
              o use - to exclude some terms (ex: -math) // Do not exclude terms that are potentially relevant
              o use match:OR for alternatives (ex: apple match:OR banana)
              o use match:AND to require both terms (ex: apple match:AND banana)
              o use parentheses for combinations (ex: sugar match:AND (banana match:OR apple))

            Example elaborate request: ("artificial intelligence" match:OR AI) -"machine learning"
            Use your judgement, think step by step.
            Return only the search query terms.
            """

            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
                max_tokens=100
            )

            optimized_query = response.choices[0].message.content.strip()
            logger.info(f"Generated search query: {optimized_query}")
            return optimized_query
        except Exception as e:
            logger.error(f"Error generating search queries: {e}")
            # Fall back to the original request if query generation fails
            return user_request

    def preprocess_query(self, query: str) -> str:
        """
        Preprocess the user query to remove unnecessary elements and standardize the format.

        Args:
            query (str): Original query string

        Returns:
            str: Cleaned query string
        """
        try:
            # Convert to lowercase
            query = query.lower()

            # Remove extra whitespace
            query = ' '.join(query.split())

            # Remove special characters except basic punctuation
            query = re.sub(r'[^a-z0-9\s\'".,?!-]', '', query)

            # Collapse repeated punctuation marks
            query = re.sub(r'([.,?!])\1+', r'\1', query)

            # Ensure proper spacing around quotes
            query = re.sub(r'(?<=[^\s])"', ' "', query)
            query = re.sub(r'"(?=[^\s])', '" ', query)

            return query
        except Exception as e:
            logger.error(f"Error preprocessing query: {e}")
            return query


# Initialize the bot
bot = RaindropSearchBot()


# Gradio interface callbacks
def chatbot_interface(user_input: str) -> str:
    return bot.process_request(user_input)


def convert_to_markdown(output_text: str) -> gr.Markdown:
    try:
        # Render the output text as a Gradio Markdown component
        return gr.Markdown(
            value=output_text,
            render=True,
            visible=True
        )
    except Exception as e:
        logger.error(f"Error converting to markdown: {e}")
        # Return an error message as markdown if conversion fails
        return gr.Markdown(
            value="Error converting content to markdown format. Please try again.",
            visible=True
        )


# Define and launch the interface
with gr.Blocks(title="Enhanced Search Assistant", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🔍 Enhanced Search Assistant

    Enter your search request in natural language, and I'll find and analyze information from multiple sources:
    - Your bookmarked content
    - Web search results
    - Recent news articles
    """)

    with gr.Row():
        input_text = gr.Textbox(
            label="What would you like to search for?",
            placeholder="Enter your search query here...",
            lines=2
        )

    with gr.Row():
        searchbutton = gr.Button("🔍 Search", variant="primary")

    with gr.Column():
        with gr.Accordion("Editable version", open=False):
            with gr.Column():
                output_text = gr.Textbox(
                    label="Analysis and Results - editable",
                    lines=20,
                    interactive=True
                )
                refreshbutton = gr.Button("Refresh", variant="primary")
        output_textMarkdown = gr.Markdown(
            label="Analysis and Results",
            height=600,
            max_height=800
        )

    searchbutton.click(
        fn=chatbot_interface,
        inputs=input_text,
        outputs=output_text
    ).then(
        fn=convert_to_markdown,
        inputs=output_text,
        outputs=output_textMarkdown
    )

    refreshbutton.click(
        fn=convert_to_markdown,
        inputs=output_text,
        outputs=output_textMarkdown
    )

# Launch the interface
if __name__ == "__main__":
    demo.launch(share=True)