SEO / seo_analyzer.py
Merlintxu's picture
Update seo_analyzer.py
c2b829f verified
import os
import re
import logging
import requests
import PyPDF2
import numpy as np
import pandas as pd
from io import BytesIO
from typing import List, Dict, Tuple, Optional
from urllib.parse import urlparse, urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
from pathlib import Path
from datetime import datetime
from sklearn.feature_extraction.text import TfidfVectorizer
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
import torch
import spacy
import matplotlib.pyplot as plt
from utils import sanitize_filename
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Términos prohibidos (ejemplo)
PROHIBITED_TERMS = [
"gratis", "garantizado", "rentabilidad asegurada", "sin compromiso",
"resultados inmediatos", "cero riesgo", "sin letra pequeña"
]
class SEOSpaceAnalyzer:
def __init__(self, max_urls: int = 20, max_workers: int = 4):
self.max_urls = max_urls
self.max_workers = max_workers
self.session = self._configure_session()
self.models = self._load_models()
self.base_dir = Path("content_storage")
self.base_dir.mkdir(parents=True, exist_ok=True)
self.current_analysis: Dict = {}
def _configure_session(self):
session = requests.Session()
retry = Retry(total=3, backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["GET"])
session.mount("http://", HTTPAdapter(max_retries=retry))
session.mount("https://", HTTPAdapter(max_retries=retry))
session.headers.update({
"User-Agent": "SEOAnalyzer/1.0",
"Accept-Language": "es-ES,es;q=0.9"
})
return session
def _load_models(self):
device = 0 if torch.cuda.is_available() else -1
return {
"spacy": spacy.load("es_core_news_lg"),
"summarizer": pipeline("summarization", model="facebook/bart-large-cnn", device=device),
"ner": pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple", device=device),
"semantic": SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2"),
"zeroshot": pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
}
def analyze_sitemap(
self,
sitemap_url: str,
progress_callback: Optional[callable] = None,
status_callback: Optional[callable] = None
) -> Tuple:
urls = self._parse_sitemap(sitemap_url)
if not urls:
return {"error": "No se pudieron extraer URLs"}, [], {}, {}, {}, {}, {}
results = []
batch_size = 5
num_urls = min(len(urls), self.max_urls)
total_batches = (num_urls + batch_size - 1) // batch_size
for batch_index in range(total_batches):
start = batch_index * batch_size
batch_urls = urls[start:start+batch_size]
if status_callback:
status_callback(f"Procesando batch {batch_index+1}/{total_batches}: {batch_urls}")
with ThreadPoolExecutor(max_workers=len(batch_urls)) as executor:
futures = {executor.submit(self._process_url, url): url for url in batch_urls}
for future in as_completed(futures):
try:
results.append(future.result())
except Exception as e:
results.append({"url": futures[future], "status": "error", "error": str(e)})
if progress_callback:
progress_callback(batch_index+1, total_batches)
# Aplicar procesos de NLP a los resultados
summaries, entities = self._apply_nlp(results)
similarities = self._compute_similarity(results)
flags = self._flag_prohibited_terms(results)
topics = self._classify_topics(results)
seo_tags = self._generate_seo_tags(results, summaries, topics, flags)
self.current_analysis = {
"stats": self._calculate_stats(results),
"content_analysis": self._analyze_content(results),
"links": self._analyze_links(results),
"recommendations": self._generate_recommendations(results),
"details": results,
"summaries": summaries,
"entities": entities,
"similarities": similarities,
"flags": flags,
"topics": topics,
"seo_tags": seo_tags,
"timestamp": datetime.now().isoformat()
}
a = self.current_analysis
# Retornamos 7 outputs (sin summaries, que no se muestran en la UI)
return (
a["stats"], a["recommendations"], a["content_analysis"],
a["links"], a["details"], a["similarities"],
a["seo_tags"]
)
def _process_url(self, url: str) -> Dict:
try:
response = self.session.get(url, timeout=10)
content_type = response.headers.get("Content-Type", "")
if "application/pdf" in content_type:
return self._process_pdf(url, response.content)
return self._process_html(url, response.text)
except Exception as e:
return {"url": url, "status": "error", "error": str(e)}
def _process_html(self, url: str, html: str) -> Dict:
soup = BeautifulSoup(html, "html.parser")
text = re.sub(r"\s+", " ", soup.get_text())
return {
"url": url,
"type": "html",
"status": "success",
"content": text,
"word_count": len(text.split()),
"metadata": self._extract_metadata(soup),
"links": self._extract_links(soup, url)
}
def _process_pdf(self, url: str, content: bytes) -> Dict:
try:
reader = PyPDF2.PdfReader(BytesIO(content))
text = "".join(p.extract_text() or "" for p in reader.pages)
return {
"url": url,
"type": "pdf",
"status": "success",
"content": text,
"word_count": len(text.split()),
"page_count": len(reader.pages)
}
except Exception as e:
return {"url": url, "status": "error", "error": str(e)}
def _extract_metadata(self, soup: BeautifulSoup) -> Dict:
meta = {"title": "", "description": ""}
if soup.title:
meta["title"] = soup.title.string.strip()
for tag in soup.find_all("meta"):
if tag.get("name") == "description":
meta["description"] = tag.get("content", "")
return meta
def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[Dict]:
links = []
base_domain = urlparse(base_url).netloc
for tag in soup.find_all("a", href=True):
href = tag["href"]
full_url = urljoin(base_url, href)
netloc = urlparse(full_url).netloc
links.append({
"url": full_url,
"type": "internal" if netloc == base_domain else "external",
"anchor": tag.get_text(strip=True)
})
return links
def _parse_sitemap(self, sitemap_url: str) -> List[str]:
try:
r = self.session.get(sitemap_url)
soup = BeautifulSoup(r.text, "lxml-xml")
return [loc.text for loc in soup.find_all("loc")]
except Exception as e:
logger.error(f"Error al parsear sitemap {sitemap_url}: {e}")
return []
def _save_content(self, url: str, content: bytes) -> None:
try:
parsed = urlparse(url)
domain_dir = self.base_dir / parsed.netloc
path = parsed.path.lstrip("/")
if not path or path.endswith("/"):
path = os.path.join(path, "index.html")
safe_path = sanitize_filename(path)
save_path = domain_dir / safe_path
save_path.parent.mkdir(parents=True, exist_ok=True)
new_hash = hash(content)
if save_path.exists():
with open(save_path, "rb") as f:
if hash(f.read()) == new_hash:
logger.debug(f"El contenido de {url} ya está guardado.")
return
with open(save_path, "wb") as f:
f.write(content)
logger.info(f"Guardado contenido en: {save_path}")
except Exception as e:
logger.error(f"Error guardando contenido para {url}: {e}")
def _calculate_stats(self, results: List[Dict]) -> Dict:
success = [r for r in results if r.get("status") == "success"]
return {
"total": len(results),
"success": len(success),
"failed": len(results) - len(success),
"avg_words": round(np.mean([r.get("word_count", 0) for r in success]) if success else 0, 1)
}
def _analyze_content(self, results: List[Dict]) -> Dict:
texts = [r["content"] for r in results if r.get("status") == "success" and r.get("content")]
if not texts:
return {}
tfidf = TfidfVectorizer(max_features=20, stop_words=list(self.models["spacy"].Defaults.stop_words))
tfidf.fit(texts)
top = tfidf.get_feature_names_out().tolist()
return {"top_keywords": top, "samples": texts[:3]}
def _analyze_links(self, results: List[Dict]) -> Dict:
all_links = []
for r in results:
all_links.extend(r.get("links", []))
if not all_links:
return {}
df = pd.DataFrame(all_links)
return {
"internal_links": df[df["type"] == "internal"]["url"].value_counts().head(10).to_dict(),
"external_links": df[df["type"] == "external"]["url"].value_counts().head(10).to_dict()
}
def _apply_nlp(self, results: List[Dict]) -> Tuple[Dict, Dict]:
summaries, entities = {}, {}
for r in results:
if r.get("status") != "success" or not r.get("content"):
continue
text = r["content"][:1024]
try:
summaries[r["url"]] = self.models["summarizer"](text, max_length=100, min_length=30)[0]["summary_text"]
ents = self.models["ner"](text)
entities[r["url"]] = list({e["word"] for e in ents if e["score"] > 0.8})
except Exception as e:
continue
return summaries, entities
def _compute_similarity(self, results: List[Dict]) -> Dict[str, List[Dict]]:
docs = [(r["url"], r["content"]) for r in results if r.get("status") == "success" and r.get("content")]
if len(docs) < 2:
return {}
urls, texts = zip(*docs)
emb = self.models["semantic"].encode(texts, convert_to_tensor=True)
sim = util.pytorch_cos_sim(emb, emb)
return {
urls[i]: [{"url": urls[j], "score": float(sim[i][j])}
for j in np.argsort(-sim[i]) if i != j][:3]
for i in range(len(urls))
}
def _flag_prohibited_terms(self, results: List[Dict]) -> Dict[str, List[str]]:
flags = {}
for r in results:
found = [term for term in PROHIBITED_TERMS if term in r.get("content", "").lower()]
if found:
flags[r["url"]] = found
return flags
def _classify_topics(self, results: List[Dict]) -> Dict[str, List[str]]:
labels = [
"hipotecas", "préstamos", "cuentas", "tarjetas",
"seguros", "inversión", "educación financiera"
]
topics = {}
for r in results:
if r.get("status") != "success":
continue
try:
res = self.models["zeroshot"](r["content"][:1000], candidate_labels=labels, multi_label=True)
topics[r["url"]] = [l for l, s in zip(res["labels"], res["scores"]) if s > 0.5]
except Exception as e:
continue
return topics
def _generate_seo_tags(self, results: List[Dict], summaries: Dict, topics: Dict, flags: Dict) -> Dict[str, Dict]:
seo_tags = {}
for r in results:
url = r["url"]
base = summaries.get(url, r.get("content", "")[:300])
topic = topics.get(url, ["contenido"])[0] if topics.get(url) else "contenido"
try:
prompt = f"Genera un título SEO formal y una meta descripción para contenido sobre {topic}: {base}"
output = self.models["summarizer"](prompt, max_length=60, min_length=20)[0]["summary_text"]
title, desc = output.split(".")[0], output
except Exception as e:
title, desc = "", ""
seo_tags[url] = {
"title": title,
"meta_description": desc,
"flags": flags.get(url, [])
}
return seo_tags
def _generate_recommendations(self, results: List[Dict]) -> List[str]:
recs = []
if any(r.get("word_count", 0) < 300 for r in results):
recs.append("✍️ Algunos contenidos son demasiado breves (<300 palabras)")
if any("gratis" in r.get("content", "").lower() for r in results):
recs.append("⚠️ Detectado uso de lenguaje no permitido")
return recs or ["✅ Todo parece correcto"]
def plot_internal_links(self, links: Dict) -> any:
if not links or not links.get("internal_links"):
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "No hay enlaces internos", ha="center")
return fig
top = links["internal_links"]
fig, ax = plt.subplots()
ax.barh(list(top.keys()), list(top.values()))
ax.set_title("Top Enlaces Internos")
plt.tight_layout()
return fig