|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import datetime
|
|
from datetime import datetime
|
|
import gradio as gr
|
|
import json
|
|
import os
|
|
import logging
|
|
import requests
|
|
|
|
from tqdm import tqdm
|
|
|
|
from App_Function_Libraries.Utils import sanitize_filename
|
|
|
|
from Article_Extractor_Lib import scrape_article
|
|
from Local_Summarization_Lib import summarize_with_llama, summarize_with_oobabooga, summarize_with_tabbyapi, \
|
|
summarize_with_vllm, summarize_with_kobold, save_summary_to_file, summarize_with_local_llm
|
|
from Summarization_General_Lib import summarize_with_openai, summarize_with_anthropic, summarize_with_cohere, summarize_with_groq, summarize_with_openrouter, summarize_with_deepseek, summarize_with_huggingface
|
|
from SQLite_DB import Database, create_tables, add_media_with_keywords
|
|
|
|
|
|
|
|
|
|
|
|
def ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date, custom_prompt):
    """Validate an article's fields and store it through ``add_media_with_keywords``.

    Args:
        url: Source address; 'Unknown' is stored when falsy.
        title: Article title; 'Unknown' is stored when falsy.
        author: Article author; 'Unknown' is stored when falsy.
        content: Article body. Must contain non-whitespace text.
        keywords: Comma-separated keyword string; "default" is stored when
            it is falsy or contains no non-blank entries.
        summary: Summary text; 'No summary available' is stored when falsy.
        ingestion_date: Date string; today's date (``%Y-%m-%d``) when falsy.
        custom_prompt: Prompt stored with the article. Must be non-empty.

    Returns:
        Whatever ``add_media_with_keywords`` returns on success. On any
        failure the exception is logged and its message is returned as a
        string instead of being raised.
    """
    try:
        # Validate the two fields that have no fallback before touching the
        # database, so invalid input never triggers table creation.
        if not content or not content.strip():
            raise ValueError("Content is empty.")
        if not custom_prompt:
            logging.error("Custom prompt is missing.")
            raise ValueError("Custom prompt is missing.")

        # Fall back to placeholder values for the optional fields.
        url = url or 'Unknown'
        title = title or 'Unknown'
        author = author or 'Unknown'
        summary = summary or 'No summary available'
        # ``datetime`` is the class here (the module-level
        # ``from datetime import datetime`` is the binding in effect), so
        # ``datetime.datetime.now()`` would raise AttributeError.
        ingestion_date = ingestion_date or datetime.now().strftime('%Y-%m-%d')

        # Normalize the keyword string: trim each entry and drop blanks so
        # "a, b" is stored as "a, b" rather than "a,  b".
        keyword_list = [kw.strip() for kw in keywords.split(',') if kw.strip()] if keywords else []
        keyword_str = ', '.join(keyword_list) or "default"

        logging.debug(f"URL: {url}")
        logging.debug(f"Title: {title}")
        logging.debug(f"Author: {author}")
        logging.debug(f"Content: {content[:50]}... (length: {len(content)})")
        logging.debug(f"Keywords: {keyword_str}")
        logging.debug(f"Summary: {summary}")
        logging.debug(f"Ingestion Date: {ingestion_date}")
        logging.debug(f"Custom Prompt: {custom_prompt}")

        # The instance is never used directly; it is still constructed and
        # kept bound for the call, as before, in case construction has
        # side effects -- TODO confirm and drop if it does not.
        _db = Database()
        create_tables()

        return add_media_with_keywords(
            url=url,
            title=title,
            media_type='article',
            content=content,
            keywords=keyword_str,
            prompt=custom_prompt,
            summary=summary,
            transcription_model=None,
            author=author,
            ingestion_date=ingestion_date,
        )
    except Exception as e:
        logging.error(f"Failed to ingest article to the database: {e}")
        return str(e)
|
|
|
|
|
|
def scrape_and_summarize_multiple(urls, custom_prompt_arg, api_name, api_key, keywords, custom_article_titles):
    """Run ``scrape_and_summarize`` over a newline-separated list of URLs.

    Args:
        urls: Newline-separated URLs; blank lines are ignored.
        custom_prompt_arg: Prompt forwarded to ``scrape_and_summarize``.
        api_name: Summarization backend name, forwarded unchanged.
        api_key: Backend credential, forwarded unchanged.
        keywords: Keywords forwarded unchanged.
        custom_article_titles: Optional newline-separated titles, matched to
            the URLs by position.

    Returns:
        One string with a section per URL, followed by an
        "Errors encountered" section when any URL raised.
    """
    url_list = []
    for raw_line in urls.split('\n'):
        cleaned = raw_line.strip()
        if cleaned:
            url_list.append(cleaned)

    if custom_article_titles:
        title_overrides = custom_article_titles.split('\n')
    else:
        title_overrides = []

    report_sections = []
    failures = []
    total = len(url_list)

    # Gradio progress tracker, updated once per processed URL.
    tracker = gr.Progress()

    for position, address in tqdm(enumerate(url_list), total=total, desc="Processing URLs"):
        ordinal = position + 1

        # Titles are paired with URLs by index; URLs past the end get none.
        title_override = None
        if position < len(title_overrides):
            title_override = title_overrides[position]

        try:
            outcome = scrape_and_summarize(address, custom_prompt_arg, api_name, api_key, keywords, title_override)
            report_sections.append(f"Results for URL {ordinal}:\n{outcome}")
        except Exception as exc:
            failures.append(f"Error processing URL {ordinal} ({address}): {str(exc)}")
            report_sections.append(f"Failed to process URL {ordinal}: {address}")

        tracker(ordinal / total, desc=f"Processed {ordinal}/{total} URLs")

    report = "\n".join(report_sections)
    if failures:
        report = report + "\n\nErrors encountered:\n" + "\n".join(failures)

    return report
|
|
|
|
|
|
def _run_article_summarizer(api_name, api_key, json_file_path, prompt):
    """Call the summarizer matching ``api_name`` and return its result.

    Returns ``None`` when ``api_name`` matches no known backend.
    """
    provider = api_name.lower()

    if provider == 'openai':
        return summarize_with_openai(api_key, json_file_path, prompt)
    if provider == "anthropic":
        return summarize_with_anthropic(api_key, json_file_path, prompt)
    if provider == "cohere":
        return summarize_with_cohere(api_key, json_file_path, prompt)
    if provider == "groq":
        logging.debug("MAIN: Trying to summarize with groq")
        return summarize_with_groq(api_key, json_file_path, prompt)
    if provider == "openrouter":
        logging.debug("MAIN: Trying to summarize with OpenRouter")
        return summarize_with_openrouter(api_key, json_file_path, prompt)
    if provider == "deepseek":
        logging.debug("MAIN: Trying to summarize with DeepSeek")
        return summarize_with_deepseek(api_key, json_file_path, prompt)
    if provider == "llama.cpp":
        logging.debug("MAIN: Trying to summarize with Llama.cpp")
        return summarize_with_llama(json_file_path, prompt)
    if provider == "kobold":
        logging.debug("MAIN: Trying to summarize with Kobold.cpp")
        return summarize_with_kobold(json_file_path, api_key, prompt)
    if provider == "ooba":
        return summarize_with_oobabooga(json_file_path, api_key, prompt)
    if provider == "tabbyapi":
        return summarize_with_tabbyapi(json_file_path, prompt)
    if provider == "vllm":
        logging.debug("MAIN: Trying to summarize with VLLM")
        return summarize_with_vllm(json_file_path, prompt)
    if provider == "local-llm":
        logging.debug("MAIN: Trying to summarize with Local LLM")
        return summarize_with_local_llm(json_file_path, prompt)
    if provider == "huggingface":
        logging.debug("MAIN: Trying to summarize with huggingface")
        # The result must be returned; it was previously discarded, so this
        # backend always reported "Summary not available".
        return summarize_with_huggingface(api_key, json_file_path, prompt)

    logging.warning(f"Article_Summarizer: Unsupported API name '{api_name}'")
    return None


def scrape_and_summarize(url, custom_prompt_arg, api_name, api_key, keywords, custom_article_title):
    """Scrape one article, optionally summarize it, and store it in the database.

    Args:
        url: Address handed to ``scrape_article``.
        custom_prompt_arg: Summarization prompt; "Summarize this article."
            is used when falsy.
        api_name: Case-insensitive summarization backend name; falsy skips
            summarization.
        api_key: Credential forwarded to the backends that accept one.
        keywords: Keywords forwarded to ``ingest_article_to_db``.
        custom_article_title: Optional title overriding the scraped one.

    Returns:
        A report string containing title, author, ingestion result, summary
        and article content. When scraping yields nothing or any exception
        occurs, a "Failed to ..." message is returned instead of raising.
    """
    try:
        article_data = scrape_article(url)
        print(f"Scraped Article Data: {article_data}")
        if not article_data:
            return "Failed to scrape the article."

        # A blank or whitespace-only custom title falls back to the scraped
        # title instead of producing an empty one.
        title = (custom_article_title or "").strip() or article_data.get('title', 'Untitled')
        author = article_data.get('author', 'Unknown')
        content = article_data.get('content', '')
        ingestion_date = datetime.now().strftime('%Y-%m-%d')

        print(f"Title: {title}, Author: {author}, Content Length: {len(content)}")

        article_custom_prompt = custom_prompt_arg or "Summarize this article."

        summary = None
        if api_name:
            logging.debug(f"Article_Summarizer: Summarization being performed by {api_name}")

            # The summarizers take a file path, so the content is written
            # to a JSON segments file first.
            sanitized_title = sanitize_filename(title)
            os.makedirs("Results", exist_ok=True)
            json_file_path = os.path.join("Results", f"{sanitized_title}_segments.json")

            with open(json_file_path, 'w', encoding='utf-8') as json_file:
                json.dump([{'text': content}], json_file, indent=2)

            try:
                summary = _run_article_summarizer(api_name, api_key, json_file_path, article_custom_prompt)
            except requests.exceptions.ConnectionError as e:
                # A connection failure leaves the summary unset; the article
                # is still ingested below with a placeholder summary.
                logging.error(f"Connection error while trying to summarize with {api_name}: {str(e)}")

            if summary:
                logging.info(f"Article_Summarizer: Summary generated using {api_name} API")
                save_summary_to_file(summary, json_file_path)
            else:
                summary = "Summary not available"
                logging.warning(f"Failed to generate summary using {api_name} API")
        else:
            summary = "Article Summarization: No API provided for summarization."

        print(f"Summary: {summary}")

        ingestion_result = ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date,
                                                article_custom_prompt)

        return f"Title: {title}\nAuthor: {author}\nIngestion Result: {ingestion_result}\n\nSummary: {summary}\n\nArticle Contents: {content}"
    except Exception as e:
        logging.error(f"Error processing URL {url}: {str(e)}")
        return f"Failed to process URL {url}: {str(e)}"
|
|
|
|
|
|
def ingest_unstructured_text(text, custom_prompt, api_name, api_key, keywords, custom_article_title):
    """Optionally summarize a block of raw text and store it in the database.

    Args:
        text: The text to ingest.
        custom_prompt: Prompt passed to the summarizer and stored with the
            entry.
        api_name: Summarization backend name. Only 'openai'
            (case-insensitive) is handled; any other truthy value yields
            "Unsupported API." and a falsy value skips summarization.
        api_key: Credential passed to ``summarize_with_openai``.
        keywords: Keywords forwarded to ``ingest_article_to_db``.
        custom_article_title: Optional title; "Unstructured Text" is used
            when it is empty or whitespace-only.

    Returns:
        A string reporting the title, the summary and the ingestion result.
    """
    # A whitespace-only title falls back to the default instead of
    # producing an empty title and a file named "_segments.json".
    title = (custom_article_title or "").strip() or "Unstructured Text"
    author = "Unknown"
    ingestion_date = datetime.now().strftime('%Y-%m-%d')

    if api_name:
        # Build the path the same way scrape_and_summarize does: the title
        # goes through sanitize_filename rather than only having spaces
        # replaced, and the output directory is created when missing.
        os.makedirs("Results", exist_ok=True)
        json_file_path = os.path.join("Results", f"{sanitize_filename(title)}_segments.json")
        with open(json_file_path, 'w', encoding='utf-8') as json_file:
            json.dump([{'text': text}], json_file, indent=2)

        if api_name.lower() == 'openai':
            summary = summarize_with_openai(api_key, json_file_path, custom_prompt)
        else:
            summary = "Unsupported API."
    else:
        summary = "No API provided for summarization."

    ingestion_result = ingest_article_to_db('Unstructured Text', title, author, text, keywords, summary, ingestion_date,
                                            custom_prompt)
    return f"Title: {title}\nSummary: {summary}\nIngestion Result: {ingestion_result}"
|
|
|
|
|
|
|
|
|
|
|
|
|