"""SEO analyzer: fetches the pages listed in a sitemap, extracts text from HTML and
PDF documents, and applies NLP models (summarization, NER, semantic similarity and
zero-shot classification) to produce stats, SEO tags and recommendations."""

import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from urllib.parse import urljoin, urlparse

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import PyPDF2
import requests
import spacy
import torch
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from sentence_transformers import SentenceTransformer, util
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import pipeline
from urllib3.util.retry import Retry

from utils import sanitize_filename

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Commercial wording that must not appear in published content.
PROHIBITED_TERMS = [
    "gratis", "garantizado", "rentabilidad asegurada", "sin compromiso",
    "resultados inmediatos", "cero riesgo", "sin letra pequeña",
]

class SEOSpaceAnalyzer:
    """Analyzes the pages of a sitemap and builds SEO metrics and NLP-based tags."""

    def __init__(self, max_urls: int = 20, max_workers: int = 4):
        self.max_urls = max_urls
        self.max_workers = max_workers
        self.session = self._configure_session()
        self.models = self._load_models()
        self.base_dir = Path("content_storage")
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.current_analysis: Dict = {}

    def _configure_session(self) -> requests.Session:
        """Builds a session with retries and Spanish-language request headers."""
        session = requests.Session()
        retry = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["GET"],
        )
        session.mount("http://", HTTPAdapter(max_retries=retry))
        session.mount("https://", HTTPAdapter(max_retries=retry))
        session.headers.update({
            "User-Agent": "SEOAnalyzer/1.0",
            "Accept-Language": "es-ES,es;q=0.9",
        })
        return session

    def _load_models(self) -> Dict:
        """Loads the spaCy pipeline and the transformer models, on GPU when available."""
        device = 0 if torch.cuda.is_available() else -1
        return {
            "spacy": spacy.load("es_core_news_lg"),
            "summarizer": pipeline("summarization", model="facebook/bart-large-cnn", device=device),
            "ner": pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple", device=device),
            "semantic": SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2"),
            "zeroshot": pipeline("zero-shot-classification", model="facebook/bart-large-mnli"),
        }

    def analyze_sitemap(
        self,
        sitemap_url: str,
        progress_callback: Optional[Callable] = None,
        status_callback: Optional[Callable] = None,
    ) -> Tuple:
        """Runs the full analysis over the sitemap URLs and returns the main result blocks."""
        urls = self._parse_sitemap(sitemap_url)
        if not urls:
            return {"error": "No se pudieron extraer URLs"}, [], {}, {}, {}, {}, {}

        # Limit the crawl up front so the last batch cannot overshoot max_urls.
        urls = urls[: self.max_urls]
        results: List[Dict] = []
        batch_size = 5
        total_batches = (len(urls) + batch_size - 1) // batch_size

        for batch_index in range(total_batches):
            start = batch_index * batch_size
            batch_urls = urls[start:start + batch_size]
            if status_callback:
                status_callback(f"Procesando batch {batch_index + 1}/{total_batches}: {batch_urls}")
            with ThreadPoolExecutor(max_workers=min(self.max_workers, len(batch_urls))) as executor:
                futures = {executor.submit(self._process_url, url): url for url in batch_urls}
                for future in as_completed(futures):
                    try:
                        results.append(future.result())
                    except Exception as e:
                        results.append({"url": futures[future], "status": "error", "error": str(e)})
            if progress_callback:
                progress_callback(batch_index + 1, total_batches)

        summaries, entities = self._apply_nlp(results)
        similarities = self._compute_similarity(results)
        flags = self._flag_prohibited_terms(results)
        topics = self._classify_topics(results)
        seo_tags = self._generate_seo_tags(results, summaries, topics, flags)

        self.current_analysis = {
            "stats": self._calculate_stats(results),
            "content_analysis": self._analyze_content(results),
            "links": self._analyze_links(results),
            "recommendations": self._generate_recommendations(results),
            "details": results,
            "summaries": summaries,
            "entities": entities,
            "similarities": similarities,
            "flags": flags,
            "topics": topics,
            "seo_tags": seo_tags,
            "timestamp": datetime.now().isoformat(),
        }

        a = self.current_analysis
        return (
            a["stats"], a["recommendations"], a["content_analysis"],
            a["links"], a["details"], a["similarities"],
            a["seo_tags"],
        )

    def _process_url(self, url: str) -> Dict:
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            content_type = response.headers.get("Content-Type", "")
            if "application/pdf" in content_type:
                return self._process_pdf(url, response.content)
            return self._process_html(url, response.text)
        except Exception as e:
            return {"url": url, "status": "error", "error": str(e)}

    def _process_html(self, url: str, html: str) -> Dict:
        soup = BeautifulSoup(html, "html.parser")
        text = re.sub(r"\s+", " ", soup.get_text())
        return {
            "url": url,
            "type": "html",
            "status": "success",
            "content": text,
            "word_count": len(text.split()),
            "metadata": self._extract_metadata(soup),
            "links": self._extract_links(soup, url),
        }

    def _process_pdf(self, url: str, content: bytes) -> Dict:
        try:
            reader = PyPDF2.PdfReader(BytesIO(content))
            text = "".join(page.extract_text() or "" for page in reader.pages)
            return {
                "url": url,
                "type": "pdf",
                "status": "success",
                "content": text,
                "word_count": len(text.split()),
                "page_count": len(reader.pages),
            }
        except Exception as e:
            return {"url": url, "status": "error", "error": str(e)}

    def _extract_metadata(self, soup: BeautifulSoup) -> Dict:
        meta = {"title": "", "description": ""}
        if soup.title and soup.title.string:
            meta["title"] = soup.title.string.strip()
        for tag in soup.find_all("meta"):
            if tag.get("name") == "description":
                meta["description"] = tag.get("content", "")
        return meta

    def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[Dict]:
        links = []
        base_domain = urlparse(base_url).netloc
        for tag in soup.find_all("a", href=True):
            full_url = urljoin(base_url, tag["href"])
            netloc = urlparse(full_url).netloc
            links.append({
                "url": full_url,
                "type": "internal" if netloc == base_domain else "external",
                "anchor": tag.get_text(strip=True),
            })
        return links

    def _parse_sitemap(self, sitemap_url: str) -> List[str]:
        try:
            r = self.session.get(sitemap_url, timeout=10)
            r.raise_for_status()
            soup = BeautifulSoup(r.text, "lxml-xml")
            return [loc.text for loc in soup.find_all("loc")]
        except Exception as e:
            logger.error(f"Error al parsear sitemap {sitemap_url}: {e}")
            return []

    def _save_content(self, url: str, content: bytes) -> None:
        try:
            parsed = urlparse(url)
            domain_dir = self.base_dir / parsed.netloc
            path = parsed.path.lstrip("/")
            if not path or path.endswith("/"):
                path = os.path.join(path, "index.html")
            safe_path = sanitize_filename(path)
            save_path = domain_dir / safe_path
            save_path.parent.mkdir(parents=True, exist_ok=True)
            # Skip the write if an identical copy is already on disk.
            if save_path.exists():
                with open(save_path, "rb") as f:
                    if f.read() == content:
                        logger.debug(f"El contenido de {url} ya está guardado.")
                        return
            with open(save_path, "wb") as f:
                f.write(content)
            logger.info(f"Guardado contenido en: {save_path}")
        except Exception as e:
            logger.error(f"Error guardando contenido para {url}: {e}")

    def _calculate_stats(self, results: List[Dict]) -> Dict:
        success = [r for r in results if r.get("status") == "success"]
        return {
            "total": len(results),
            "success": len(success),
            "failed": len(results) - len(success),
            "avg_words": round(float(np.mean([r.get("word_count", 0) for r in success])) if success else 0, 1),
        }

    def _analyze_content(self, results: List[Dict]) -> Dict:
        texts = [r["content"] for r in results if r.get("status") == "success" and r.get("content")]
        if not texts:
            return {}
        tfidf = TfidfVectorizer(max_features=20, stop_words=list(self.models["spacy"].Defaults.stop_words))
        tfidf.fit(texts)
        return {"top_keywords": tfidf.get_feature_names_out().tolist(), "samples": texts[:3]}

    def _analyze_links(self, results: List[Dict]) -> Dict:
        all_links = []
        for r in results:
            all_links.extend(r.get("links", []))
        if not all_links:
            return {}
        df = pd.DataFrame(all_links)
        return {
            "internal_links": df[df["type"] == "internal"]["url"].value_counts().head(10).to_dict(),
            "external_links": df[df["type"] == "external"]["url"].value_counts().head(10).to_dict(),
        }

    def _apply_nlp(self, results: List[Dict]) -> Tuple[Dict, Dict]:
        summaries, entities = {}, {}
        for r in results:
            if r.get("status") != "success" or not r.get("content"):
                continue
            text = r["content"][:1024]
            try:
                summaries[r["url"]] = self.models["summarizer"](text, max_length=100, min_length=30)[0]["summary_text"]
                ents = self.models["ner"](text)
                entities[r["url"]] = list({e["word"] for e in ents if e["score"] > 0.8})
            except Exception as e:
                logger.debug(f"NLP failed for {r['url']}: {e}")
                continue
        return summaries, entities

    def _compute_similarity(self, results: List[Dict]) -> Dict[str, List[Dict]]:
        docs = [(r["url"], r["content"]) for r in results if r.get("status") == "success" and r.get("content")]
        if len(docs) < 2:
            return {}
        urls, texts = zip(*docs)
        emb = self.models["semantic"].encode(texts, convert_to_tensor=True)
        # Move the similarity matrix to CPU/NumPy so argsort works even when the
        # embeddings were computed on GPU.
        sim = util.pytorch_cos_sim(emb, emb).cpu().numpy()
        return {
            urls[i]: [{"url": urls[j], "score": float(sim[i][j])}
                      for j in np.argsort(-sim[i]) if i != j][:3]
            for i in range(len(urls))
        }

    def _flag_prohibited_terms(self, results: List[Dict]) -> Dict[str, List[str]]:
        flags = {}
        for r in results:
            found = [term for term in PROHIBITED_TERMS if term in r.get("content", "").lower()]
            if found:
                flags[r["url"]] = found
        return flags

    def _classify_topics(self, results: List[Dict]) -> Dict[str, List[str]]:
        labels = [
            "hipotecas", "préstamos", "cuentas", "tarjetas",
            "seguros", "inversión", "educación financiera",
        ]
        topics = {}
        for r in results:
            if r.get("status") != "success":
                continue
            try:
                res = self.models["zeroshot"](r["content"][:1000], candidate_labels=labels, multi_label=True)
                topics[r["url"]] = [label for label, score in zip(res["labels"], res["scores"]) if score > 0.5]
            except Exception:
                continue
        return topics

    def _generate_seo_tags(self, results: List[Dict], summaries: Dict, topics: Dict, flags: Dict) -> Dict[str, Dict]:
        seo_tags = {}
        for r in results:
            url = r["url"]
            base = summaries.get(url, r.get("content", "")[:300])
            url_topics = topics.get(url)
            topic = url_topics[0] if url_topics else "contenido"
            try:
                prompt = f"Genera un título SEO formal y una meta descripción para contenido sobre {topic}: {base}"
                output = self.models["summarizer"](prompt, max_length=60, min_length=20)[0]["summary_text"]
                title, desc = output.split(".")[0], output
            except Exception:
                title, desc = "", ""
            seo_tags[url] = {
                "title": title,
                "meta_description": desc,
                "flags": flags.get(url, []),
            }
        return seo_tags

    def _generate_recommendations(self, results: List[Dict]) -> List[str]:
        recs = []
        if any(r.get("word_count", 0) < 300 for r in results):
            recs.append("✍️ Algunos contenidos son demasiado breves (<300 palabras)")
        if any("gratis" in r.get("content", "").lower() for r in results):
            recs.append("⚠️ Detectado uso de lenguaje no permitido")
        return recs or ["✅ Todo parece correcto"]

    def plot_internal_links(self, links: Dict) -> Any:
        if not links or not links.get("internal_links"):
            fig, ax = plt.subplots()
            ax.text(0.5, 0.5, "No hay enlaces internos", ha="center")
            return fig
        top = links["internal_links"]
        fig, ax = plt.subplots()
        ax.barh(list(top.keys()), list(top.values()))
        ax.set_title("Top Enlaces Internos")
        fig.tight_layout()
        return fig
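
# Usage sketch (illustrative only, not part of the module's API): assumes a reachable
# sitemap URL — "https://example.com/sitemap.xml" is a placeholder — and that the
# spaCy and transformer models loaded above are installed locally.
if __name__ == "__main__":
    analyzer = SEOSpaceAnalyzer(max_urls=10, max_workers=4)
    stats, recommendations, content_analysis, links, details, similarities, seo_tags = analyzer.analyze_sitemap(
        "https://example.com/sitemap.xml",  # placeholder sitemap
        status_callback=lambda msg: logger.info(msg),
    )
    logger.info(f"Stats: {stats}")
    logger.info(f"Recommendations: {recommendations}")
    fig = analyzer.plot_internal_links(links)
    fig.savefig("internal_links.png")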