import os import logging import re import requests import hashlib import PyPDF2 import numpy as np import pandas as pd from io import BytesIO from typing import List, Dict, Any, Tuple from urllib.parse import urlparse, urljoin from concurrent.futures import ThreadPoolExecutor, as_completed from bs4 import BeautifulSoup from pathlib import Path from datetime import datetime from sklearn.feature_extraction.text import TfidfVectorizer from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from transformers import pipeline from sentence_transformers import SentenceTransformer import torch import subprocess import sys import spacy import matplotlib.pyplot as plt from utils import sanitize_filename logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class SEOSpaceAnalyzer: def __init__(self, max_urls: int = 20, max_workers: int = 4) -> None: """ Inicializa la sesión HTTP, carga modelos NLP y prepara el directorio de almacenamiento. """ self.max_urls = max_urls self.max_workers = max_workers self.session = self._configure_session() self.models = self._load_models() self.base_dir = Path("content_storage") self.base_dir.mkdir(parents=True, exist_ok=True) self.current_analysis: Dict[str, Any] = {} def _load_models(self) -> Dict[str, Any]: """Carga los modelos NLP de Hugging Face y spaCy.""" try: device = 0 if torch.cuda.is_available() else -1 logger.info("Cargando modelos NLP...") models = { 'summarizer': pipeline("summarization", model="facebook/bart-large-cnn", device=device), 'ner': pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple", device=device), 'semantic': SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2'), 'spacy': spacy.load("es_core_news_lg") } logger.info("Modelos cargados correctamente.") return models except Exception as e: logger.error(f"Error cargando modelos: {e}") raise def _configure_session(self) -> requests.Session: """Configura una sesión HTTP con reintentos y headers personalizados.""" session = requests.Session() retry = Retry( total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504], allowed_methods=['GET', 'HEAD'] ) adapter = HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter) session.headers.update({ 'User-Agent': 'Mozilla/5.0 (compatible; SEOBot/1.0)', 'Accept-Language': 'es-ES,es;q=0.9' }) return session def analyze_sitemap(self, sitemap_url: str) -> Tuple[Dict, List[str], Dict, Dict, List[Dict]]: """ Procesa el sitemap: extrae URLs, analiza cada página individualmente y devuelve datos agregados. """ try: urls = self._parse_sitemap(sitemap_url) if not urls: return {"error": "No se pudieron extraer URLs del sitemap"}, [], {}, {}, [] results: List[Dict] = [] with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = {executor.submit(self._process_url, url): url for url in urls[:self.max_urls]} for future in as_completed(futures): url = futures[future] try: res = future.result() results.append(res) logger.info(f"Procesado: {url}") except Exception as e: logger.error(f"Error procesando {url}: {e}") results.append({'url': url, 'status': 'error', 'error': str(e)}) self.current_analysis = { 'stats': self._calculate_stats(results), 'content_analysis': self._analyze_content(results), 'links': self._analyze_links(results), 'recommendations': self._generate_seo_recommendations(results), 'details': results, 'timestamp': datetime.now().isoformat() } analysis = self.current_analysis return analysis['stats'], analysis['recommendations'], analysis['content_analysis'], analysis['links'], analysis['details'] except Exception as e: logger.error(f"Error en análisis: {e}") return {"error": str(e)}, [], {}, {}, [] def _process_url(self, url: str) -> Dict: """Procesa una URL individual extrayendo contenido, metadatos y enlaces.""" try: response = self.session.get(url, timeout=15) response.raise_for_status() content_type = response.headers.get('Content-Type', '') result: Dict[str, Any] = {'url': url, 'status': 'success'} if 'application/pdf' in content_type: result.update(self._process_pdf(response.content)) elif 'text/html' in content_type: result.update(self._process_html(response.text, url)) else: result.update({'type': 'unknown', 'content': '', 'word_count': 0}) self._save_content(url, response.content) return result except requests.exceptions.Timeout as e: logger.error(f"Timeout al procesar {url}: {e}") return {'url': url, 'status': 'error', 'error': "Timeout"} except requests.exceptions.HTTPError as e: logger.error(f"HTTPError al procesar {url}: {e}") return {'url': url, 'status': 'error', 'error': "HTTP Error"} except Exception as e: logger.error(f"Error inesperado en {url}: {e}") return {'url': url, 'status': 'error', 'error': str(e)} def _process_html(self, html: str, base_url: str) -> Dict: """Extrae y limpia el contenido HTML, metadatos y enlaces de la página.""" soup = BeautifulSoup(html, 'html.parser') clean_text = self._clean_text(soup.get_text()) return { 'type': 'html', 'content': clean_text, 'word_count': len(clean_text.split()), 'metadata': self._extract_metadata(soup), 'links': self._extract_links(soup, base_url) } def _process_pdf(self, content: bytes) -> Dict: """Extrae texto de un documento PDF y calcula estadísticas básicas.""" try: text = "" with BytesIO(content) as pdf_file: reader = PyPDF2.PdfReader(pdf_file) for page in reader.pages: extracted = page.extract_text() text += extracted if extracted else "" clean_text = self._clean_text(text) return { 'type': 'pdf', 'content': clean_text, 'word_count': len(clean_text.split()), 'page_count': len(reader.pages) } except PyPDF2.errors.PdfReadError as e: logger.error(f"Error leyendo PDF: {e}") return {'type': 'pdf', 'error': str(e)} except Exception as e: logger.error(f"Error procesando PDF: {e}") return {'type': 'pdf', 'error': str(e)} def _clean_text(self, text: str) -> str: """Limpia y normaliza el texto removiendo espacios y caracteres especiales.""" if not text: return "" text = re.sub(r'\s+', ' ', text) return re.sub(r'[^\w\sáéíóúñÁÉÍÓÚÑ]', ' ', text).strip() def _extract_metadata(self, soup: BeautifulSoup) -> Dict: """Extrae metadatos relevantes (título, descripción, keywords, Open Graph) de la página.""" metadata = {'title': '', 'description': '', 'keywords': [], 'og': {}} if soup.title and soup.title.string: metadata['title'] = soup.title.string.strip()[:200] for meta in soup.find_all('meta'): name = meta.get('name', '').lower() prop = meta.get('property', '').lower() content = meta.get('content', '') if name == 'description': metadata['description'] = content[:300] elif name == 'keywords': metadata['keywords'] = [kw.strip() for kw in content.split(',') if kw.strip()] elif prop.startswith('og:'): metadata['og'][prop[3:]] = content return metadata def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[Dict]: """Extrae enlaces de la página, distinguiendo entre internos y externos.""" links: List[Dict] = [] base_netloc = urlparse(base_url).netloc for tag in soup.find_all('a', href=True): try: href = tag['href'].strip() if not href or href.startswith('javascript:'): continue full_url = urljoin(base_url, href) parsed = urlparse(full_url) links.append({ 'url': full_url, 'type': 'internal' if parsed.netloc == base_netloc else 'external', 'anchor': self._clean_text(tag.get_text())[:100], 'file_type': self._get_file_type(parsed.path) }) except Exception as e: logger.warning(f"Error procesando enlace {tag.get('href')}: {e}") continue return links def _get_file_type(self, path: str) -> str: """Determina el tipo de archivo según la extensión.""" ext = Path(path).suffix.lower() return ext[1:] if ext else 'html' def _parse_sitemap(self, sitemap_url: str) -> List[str]: """Parsea un sitemap XML (y posibles índices de sitemaps) para extraer URLs.""" try: response = self.session.get(sitemap_url, timeout=10) response.raise_for_status() if 'xml' not in response.headers.get('Content-Type', ''): logger.warning(f"El sitemap no parece ser XML: {sitemap_url}") return [] soup = BeautifulSoup(response.text, 'lxml-xml') urls: List[str] = [] if soup.find('sitemapindex'): for sitemap in soup.find_all('loc'): url = sitemap.text.strip() if url.endswith('.xml'): urls.extend(self._parse_sitemap(url)) else: urls = [loc.text.strip() for loc in soup.find_all('loc')] filtered_urls = list({url for url in urls if url.startswith('http')}) return filtered_urls except Exception as e: logger.error(f"Error al parsear sitemap {sitemap_url}: {e}") return [] def _save_content(self, url: str, content: bytes) -> None: """ Guarda el contenido descargado en una estructura de directorios organizada por dominio, sanitizando el nombre del archivo y evitando sobrescribir archivos idénticos mediante hash. """ try: parsed = urlparse(url) domain_dir = self.base_dir / parsed.netloc raw_path = parsed.path.lstrip('/') # Si la ruta está vacía o termina en '/', asigna 'index.html' if not raw_path or raw_path.endswith('/'): raw_path = os.path.join(raw_path, 'index.html') if raw_path else 'index.html' safe_path = sanitize_filename(raw_path) save_path = domain_dir / safe_path save_path.parent.mkdir(parents=True, exist_ok=True) new_hash = hashlib.md5(content).hexdigest() if save_path.exists(): with open(save_path, 'rb') as f: existing_content = f.read() existing_hash = hashlib.md5(existing_content).hexdigest() if new_hash == existing_hash: logger.debug(f"El contenido de {url} ya está guardado.") return with open(save_path, 'wb') as f: f.write(content) logger.info(f"Guardado contenido en: {save_path}") except Exception as e: logger.error(f"Error guardando contenido para {url}: {e}") def _calculate_stats(self, results: List[Dict]) -> Dict: """Calcula estadísticas generales del análisis.""" successful = [r for r in results if r.get('status') == 'success'] content_types = [r.get('type', 'unknown') for r in successful] avg_word_count = round(np.mean([r.get('word_count', 0) for r in successful]) if successful else 0, 1) return { 'total_urls': len(results), 'successful': len(successful), 'failed': len(results) - len(successful), 'content_types': pd.Series(content_types).value_counts().to_dict(), 'avg_word_count': avg_word_count, 'failed_urls': [r['url'] for r in results if r.get('status') != 'success'] } def _analyze_content(self, results: List[Dict]) -> Dict: """ Genera un análisis de contenido agregado usando TF-IDF para extraer las palabras clave principales y muestras. """ successful = [r for r in results if r.get('status') == 'success' and r.get('content')] texts = [r['content'] for r in successful if len(r['content'].split()) > 10] if not texts: return {'top_keywords': [], 'content_samples': []} try: stop_words = list(self.models['spacy'].Defaults.stop_words) vectorizer = TfidfVectorizer(stop_words=stop_words, max_features=50, ngram_range=(1, 2)) tfidf = vectorizer.fit_transform(texts) feature_names = vectorizer.get_feature_names_out() sorted_indices = np.argsort(np.asarray(tfidf.sum(axis=0)).ravel())[-10:] top_keywords = feature_names[sorted_indices][::-1].tolist() except Exception as e: logger.error(f"Error en análisis TF-IDF: {e}") top_keywords = [] samples = [{'url': r['url'], 'sample': (r['content'][:500] + '...') if len(r['content']) > 500 else r['content']} for r in successful[:3]] return {'top_keywords': top_keywords, 'content_samples': samples} def _analyze_links(self, results: List[Dict]) -> Dict: """Genera un análisis de enlaces internos, dominios externos, anclas y tipos de archivos.""" all_links = [] for result in results: if result.get('links'): all_links.extend(result['links']) if not all_links: return {'internal_links': {}, 'external_domains': {}, 'common_anchors': {}, 'file_types': {}} df = pd.DataFrame(all_links) return { 'internal_links': df[df['type'] == 'internal']['url'].value_counts().head(20).to_dict(), 'external_domains': df[df['type'] == 'external']['url'].apply(lambda x: urlparse(x).netloc).value_counts().head(10).to_dict(), 'common_anchors': df['anchor'].value_counts().head(10).to_dict(), 'file_types': df['file_type'].value_counts().to_dict() } def _generate_seo_recommendations(self, results: List[Dict]) -> List[str]: """Genera recomendaciones SEO en base a las deficiencias encontradas en el análisis.""" successful = [r for r in results if r.get('status') == 'success'] if not successful: return ["No se pudo analizar ningún contenido exitosamente"] recs = [] missing_titles = sum(1 for r in successful if not r.get('metadata', {}).get('title')) if missing_titles: recs.append(f"📌 Añadir títulos a {missing_titles} páginas") short_descriptions = sum(1 for r in successful if not r.get('metadata', {}).get('description')) if short_descriptions: recs.append(f"📌 Añadir meta descripciones a {short_descriptions} páginas") short_content = sum(1 for r in successful if r.get('word_count', 0) < 300) if short_content: recs.append(f"📝 Ampliar contenido en {short_content} páginas (menos de 300 palabras)") all_links = [link for r in results for link in r.get('links', [])] if all_links: df_links = pd.DataFrame(all_links) internal_links = df_links[df_links['type'] == 'internal'] if len(internal_links) > 100: recs.append(f"🔗 Optimizar estructura de enlaces internos ({len(internal_links)} enlaces)") return recs if recs else ["✅ No se detectaron problemas críticos de SEO"] def plot_internal_links(self, links_data: Dict) -> Any: """ Genera un gráfico de barras horizontales mostrando los 20 principales enlaces internos. Si no existen datos, se muestra un mensaje en el gráfico. """ internal_links = links_data.get('internal_links', {}) fig, ax = plt.subplots() if not internal_links: ax.text(0.5, 0.5, 'No hay enlaces internos', horizontalalignment='center', verticalalignment='center', transform=ax.transAxes) ax.axis('off') else: names = list(internal_links.keys()) counts = list(internal_links.values()) ax.barh(names, counts) ax.set_xlabel("Cantidad de enlaces") ax.set_title("Top 20 Enlaces Internos") plt.tight_layout() return fig