from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, pipeline
import difflib
import spacy
import re
from nltk.sentiment import SentimentIntensityAnalyzer
import nltk
from collections import Counter
import uvicorn
import os
import torch

# Download NLTK resources
try:
    nltk.download('vader_lexicon', quiet=True)
    nltk.download('punkt', quiet=True)
    nltk.download('stopwords', quiet=True)
except Exception:
    print("Could not download NLTK resources. Some features may be limited.")

app = FastAPI()

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],      # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],      # Allows all methods
    allow_headers=["*"],      # Allows all headers
)

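# A permissive CORS policy like this is typical for a public demo Space; for a
# production deployment you would normally restrict allow_origins to the known
# frontend origin(s) instead of "*".
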
# Global variable for the pipeline
humanize_pipe = None

# Load NLP models
try:
    # Load spaCy model
    nlp = spacy.load("en_core_web_sm")
    # Initialize sentiment analyzer
    sentiment_analyzer = SentimentIntensityAnalyzer()
    print("NLP models loaded successfully!")
except Exception as e:
    print(f"Error loading NLP models: {e}")
    # Create fallback functions if models fail to load
    def mock_function(text):
        return "Model could not be loaded. This is a fallback response."

def get_humanize_pipeline():
    """
    Lazy-load the humanization pipeline on first use.
    Uses standard settings that don't require accelerate.
    """
    global humanize_pipe
    if humanize_pipe is None:
        try:
            print("Loading the humanizer model on CPU...")
            # Force CPU usage
            device = torch.device("cpu")
            # Load model with basic settings (no accelerate needed)
            model = AutoModelForSeq2SeqLM.from_pretrained(
                "danibor/flan-t5-base-humanizer",
                torch_dtype=torch.float32  # Use float32 instead of float16 for CPU
            )
            tokenizer = AutoTokenizer.from_pretrained("danibor/flan-t5-base-humanizer")
            # Create pipeline with basic settings
            humanize_pipe = pipeline(
                "text2text-generation",
                model=model,
                tokenizer=tokenizer,
                device=device  # Explicitly specify CPU
            )
            print("Humanizer model loaded successfully!")
            return humanize_pipe
        except Exception as e:
            print(f"Error loading humanizer model: {e}")
            # Create a simple pipeline-like function that just returns the input
            def simple_pipeline(text, **kwargs):
                return [{"generated_text": f"Could not process: {text} (Model failed to load)"}]
            humanize_pipe = simple_pipeline
            return humanize_pipe
    return humanize_pipe

# Define request models
class TextRequest(BaseModel):
    text: str

class HumanizeResponse(BaseModel):
    original_text: str
    humanized_text: str
    diff: list
    original_word_count: int
    humanized_word_count: int
    nlp_analysis: dict

class AnalyzeResponse(BaseModel):
    text: str
    word_count: int
    sentiment: dict
    entities: dict
    key_phrases: list
    readability: dict
    complexity: dict

# Humanization endpoint (the "/humanize" route path is an assumption)
@app.post("/humanize", response_model=HumanizeResponse)
async def humanize_text(request: TextRequest):
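    """
    Humanize the submitted text with the FLAN-T5 humanizer model and return the
    rewritten text alongside a word-level diff and NLP analysis of both versions.
    """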
    input_text = request.text
    try:
        # Get or initialize the humanizer pipeline
        humanizer = get_humanize_pipeline()
        # Generate humanized text with basic settings
        result = humanizer(
            input_text,
            max_length=min(500, len(input_text) * 2),  # Limit max length
            do_sample=True
        )
        humanized_text = result[0]['generated_text']
        # Get the differences
        diff = get_diff(input_text, humanized_text)
        # Process both texts with NLP
        nlp_analysis = perform_nlp_analysis(input_text, humanized_text)
        return {
            'original_text': input_text,
            'humanized_text': humanized_text,
            'diff': diff,
            'original_word_count': len(input_text.split()),
            'humanized_word_count': len(humanized_text.split()),
            'nlp_analysis': nlp_analysis
        }
    except Exception as e:
        print(f"Error in humanize endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error processing text: {str(e)}")

def get_diff(text1, text2):
    """
    Generate a list of word-level changes between two texts.
    Returns a list of dicts with keys 'operation' and 'text',
    where operation is '+' for addition, '-' for deletion, or ' ' for unchanged.
    """
    d = difflib.Differ()
    diff = list(d.compare(text1.split(), text2.split()))
    result = []
    for item in diff:
        operation = item[0]
        if operation in ['+', '-', ' ']:
            text = item[2:]
            result.append({'operation': operation, 'text': text})
    return result

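# For example (illustrative inputs), get_diff("the cat sat", "the dog sat") yields:
#   [{'operation': ' ', 'text': 'the'}, {'operation': '-', 'text': 'cat'},
#    {'operation': '+', 'text': 'dog'}, {'operation': ' ', 'text': 'sat'}]
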
def perform_nlp_analysis(original_text, humanized_text):
    """
    Perform comprehensive NLP analysis on both original and humanized text.
    """
    # Process both texts with spaCy
    original_doc = nlp(original_text)
    humanized_doc = nlp(humanized_text)
    # Sentiment analysis
    original_sentiment = sentiment_analyzer.polarity_scores(original_text)
    humanized_sentiment = sentiment_analyzer.polarity_scores(humanized_text)
    # Extract named entities
    original_entities = extract_entities(original_doc)
    humanized_entities = extract_entities(humanized_doc)
    # Extract key phrases using noun chunks
    original_phrases = extract_key_phrases(original_doc)
    humanized_phrases = extract_key_phrases(humanized_doc)
    # Readability metrics
    original_readability = calculate_readability(original_text)
    humanized_readability = calculate_readability(humanized_text)
    # Complexity metrics
    original_complexity = analyze_complexity(original_doc)
    humanized_complexity = analyze_complexity(humanized_doc)
    # Compile all results
    result = {
        'original': {
            'sentiment': original_sentiment,
            'entities': original_entities,
            'key_phrases': original_phrases,
            'readability': original_readability,
            'complexity': original_complexity
        },
        'humanized': {
            'sentiment': humanized_sentiment,
            'entities': humanized_entities,
            'key_phrases': humanized_phrases,
            'readability': humanized_readability,
            'complexity': humanized_complexity
        }
    }
    return result

def extract_entities(doc):
    """Extract and categorize named entities from a spaCy document."""
    entities = {}
    for ent in doc.ents:
        if ent.label_ not in entities:
            entities[ent.label_] = []
        if ent.text not in entities[ent.label_]:
            entities[ent.label_].append(ent.text)
    return entities

def extract_key_phrases(doc):
    """Extract key phrases using noun chunks."""
    return [chunk.text for chunk in doc.noun_chunks][:10]  # Limit to top 10

def calculate_readability(text):
    """Calculate basic readability metrics."""
    # Count sentences
    sentences = len(nltk.sent_tokenize(text))
    if sentences == 0:
        sentences = 1  # Avoid division by zero
    # Count words
    words = len(text.split())
    if words == 0:
        words = 1  # Avoid division by zero
    # Average words per sentence
    avg_words_per_sentence = words / sentences
    # Count syllables (simplified approach)
    syllables = count_syllables(text)
    # Calculate Flesch Reading Ease
    flesch = 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words)
    return {
        'sentence_count': sentences,
        'word_count': words,
        'avg_words_per_sentence': round(avg_words_per_sentence, 2),
        'syllable_count': syllables,
        'flesch_reading_ease': round(flesch, 2)
    }

def count_syllables(text):
    """Count syllables in text (simplified approach)."""
    # This is a simplified syllable counter
    text = text.lower()
    text = re.sub(r'[^a-zA-Z]', ' ', text)
    words = text.split()
    count = 0
    for word in words:
        word = word.strip()
        if not word:
            continue
        # Count vowel groups as syllables
        if word[-1] == 'e':
            word = word[:-1]
        vowel_count = len(re.findall(r'[aeiouy]+', word))
        if vowel_count == 0:
            vowel_count = 1
        count += vowel_count
    return count

def analyze_complexity(doc):
    """Analyze text complexity using POS tags and dependency parsing."""
    # Count POS tags
    pos_counts = Counter(token.pos_ for token in doc)
    # Calculate lexical diversity
    total_tokens = len(doc)
    unique_tokens = len({token.text.lower() for token in doc})
    lexical_diversity = unique_tokens / total_tokens if total_tokens > 0 else 0
    # Count dependency relationship types
    dep_counts = Counter(token.dep_ for token in doc)
    return {
        'pos_distribution': dict(pos_counts),
        'lexical_diversity': round(lexical_diversity, 4),
        'dependency_types': dict(dep_counts)
    }

# Analysis endpoint (the "/analyze" route path is an assumption)
@app.post("/analyze", response_model=AnalyzeResponse)
async def analyze_text(request: TextRequest):
    """Endpoint to just analyze text without humanizing it."""
    input_text = request.text
    try:
        # Process text with NLP
        doc = nlp(input_text)
        # Analyze text
        sentiment = sentiment_analyzer.polarity_scores(input_text)
        entities = extract_entities(doc)
        key_phrases = extract_key_phrases(doc)
        readability = calculate_readability(input_text)
        complexity = analyze_complexity(doc)
        return {
            'text': input_text,
            'word_count': len(input_text.split()),
            'sentiment': sentiment,
            'entities': entities,
            'key_phrases': key_phrases,
            'readability': readability,
            'complexity': complexity
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error analyzing text: {str(e)}")

# Root endpoint for the Hugging Face Spaces health check
@app.get("/")
async def root():
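    """Simple health-check response so the Space reports as running."""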
return {"message": "Text Analysis and Humanization API is running!"} | |
# For local development
if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)
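
# Example request once the server is running (a sketch; assumes the "/humanize"
# route registered above and that the `requests` package is installed):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/humanize",
#       json={"text": "The results demonstrate a statistically significant improvement."},
#   )
#   data = resp.json()
#   print(data["humanized_text"])
#   print(data["nlp_analysis"]["original"]["readability"]["flesch_reading_ease"])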