Spaces:
Running
Running
import os | |
import json | |
import hashlib | |
import logging | |
import tempfile | |
from datetime import datetime | |
import pygsheets | |
import pandas as pd | |
from typing import List | |
from pydantic import BaseModel | |
from fastapi import FastAPI, HTTPException, status, Request | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import JSONResponse | |
# Set up logging | |
logging.basicConfig( | |
level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
app = FastAPI(title="Data Samples API") | |
# Add CORS middleware to allow web requests | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], # Allows all origins | |
allow_credentials=True, | |
allow_methods=["*"], # Allows all methods | |
allow_headers=["*"], # Allows all headers | |
) | |
# Input model for samples (contains only text) | |
class Sample(BaseModel): | |
text: str | |
# Response model (includes the generated ID and timestamps) | |
class SampleResponse(BaseModel): | |
id: str | |
text: str | |
created_datetime: str | |
modified_datetime: str | |
# Exception handler for Google Sheets issues | |
async def general_exception_handler(request: Request, exc: Exception): | |
logger.error(f"Unhandled exception: {exc}") | |
return JSONResponse( | |
status_code=500, | |
content={"detail": f"Internal server error: {str(exc)}"} | |
) | |
# Google Sheets setup - using environment variables for credentials | |
def get_worksheet(): | |
try: | |
# Check if running in HuggingFace Spaces or local environment | |
if "GOOGLE_SERVICE_JSON_CONTENT" in os.environ: | |
# Get credentials from environment variable | |
service_account_info = json.loads(os.environ["GOOGLE_SERVICE_JSON_CONTENT"]) | |
# Create a temporary JSON file with the credentials | |
with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.json') as temp_file: | |
json.dump(service_account_info, temp_file) | |
temp_filepath = temp_file.name | |
try: | |
# Authorize with the temporary file | |
gc = pygsheets.authorize(service_file=temp_filepath) | |
# Open the spreadsheet and return the first worksheet | |
sh = gc.open('drivelology') | |
return sh[0] | |
finally: | |
# Clean up the temporary file | |
if os.path.exists(temp_filepath): | |
os.unlink(temp_filepath) | |
else: | |
# Local development - use the JSON file directly | |
gc = pygsheets.authorize(service_file='drivelology-1b65510988e8.json') | |
sh = gc.open('drivelology') | |
return sh[0] # Return the first worksheet | |
except Exception as e: | |
logger.error(f"Error accessing Google Sheets: {e}") | |
raise HTTPException(status_code=500, detail=f"Failed to access Google Sheets: {str(e)}") | |
# Helper function to generate ID from text | |
def generate_id_from_text(text: str) -> str: | |
# Create SHA-256 hash of the text | |
return hashlib.sha256(text.encode()).hexdigest()[:16] # Using first 16 chars of hash for ID | |
# Helper function to get current datetime as string | |
def get_current_datetime() -> str: | |
return datetime.now().isoformat() | |
# Helper functions for Google Sheets operations | |
def load_samples_from_sheets(): | |
try: | |
worksheet = get_worksheet() | |
records = worksheet.get_all_records() | |
# Debug log | |
logger.info(f"Retrieved {len(records)} records from Google Sheets") | |
samples = {} | |
for record in records: | |
if record and 'id' in record and 'text' in record: | |
# Handle potential missing fields with defaults | |
sample_id = record['id'] | |
created_time = record.get('created_datetime', get_current_datetime()) | |
modified_time = record.get('modified_datetime', created_time) | |
samples[sample_id] = SampleResponse( | |
id=sample_id, | |
text=record['text'], | |
created_datetime=created_time, | |
modified_datetime=modified_time | |
) | |
logger.info(f"Processed {len(samples)} valid samples") | |
return samples | |
except Exception as e: | |
logger.error(f"Error loading samples from sheets: {e}") | |
# Return empty dict on error to avoid crashing | |
return {} | |
def save_samples_to_sheets(samples): | |
try: | |
worksheet = get_worksheet() | |
# Convert samples to DataFrame | |
data = [] | |
for sample in samples.values(): | |
data.append({ | |
'id': sample.id, | |
'text': sample.text, | |
'created_datetime': sample.created_datetime, | |
'modified_datetime': sample.modified_datetime | |
}) | |
# Create DataFrame and update the worksheet | |
df = pd.DataFrame(data) | |
if not data: | |
# If no samples, create an empty dataframe with the right columns | |
df = pd.DataFrame(columns=['id', 'text', 'created_datetime', 'modified_datetime']) | |
# Clear the worksheet and update with new data | |
worksheet.clear() | |
worksheet.set_dataframe(df, (0, 0)) | |
logger.info(f"Successfully saved {len(samples)} samples to Google Sheets") | |
return True | |
except Exception as e: | |
logger.error(f"Error saving samples to sheets: {e}") | |
raise HTTPException(status_code=500, detail=f"Failed to save data: {str(e)}") | |
# Get samples from Google Sheets | |
def get_samples(): | |
return load_samples_from_sheets() | |
async def add_sample(sample: Sample): | |
"""Add a new data sample""" | |
try: | |
# Load current samples | |
samples = get_samples() | |
# Generate ID based on text content | |
sample_id = generate_id_from_text(sample.text) | |
# Get current datetime | |
current_time = get_current_datetime() | |
# Create a sample response with ID, text, and timestamps | |
sample_response = SampleResponse( | |
id=sample_id, | |
text=sample.text, | |
created_datetime=current_time, | |
modified_datetime=current_time | |
) | |
# Store the sample | |
samples[sample_id] = sample_response | |
# Save to Google Sheets | |
save_samples_to_sheets(samples) | |
logger.info(f"Added new sample with ID: {sample_id}") | |
return sample_response | |
except Exception as e: | |
logger.error(f"Error adding sample: {e}") | |
raise HTTPException(status_code=500, detail=f"Failed to add sample: {str(e)}") | |
async def get_all_samples(): | |
"""Get all data samples""" | |
try: | |
samples = get_samples() | |
result = list(samples.values()) | |
logger.info(f"Returning {len(result)} samples") | |
return result | |
except Exception as e: | |
logger.error(f"Error retrieving samples: {e}") | |
raise HTTPException(status_code=500, detail=f"Failed to retrieve samples: {str(e)}") | |
async def get_sample(sample_id: str): | |
"""Get a specific data sample by ID""" | |
try: | |
samples = get_samples() | |
if sample_id not in samples: | |
logger.warning(f"Sample ID not found: {sample_id}") | |
raise HTTPException(status_code=404, detail="Sample not found") | |
return samples[sample_id] | |
except HTTPException: | |
raise | |
except Exception as e: | |
logger.error(f"Error retrieving sample {sample_id}: {e}") | |
raise HTTPException(status_code=500, detail=f"Failed to retrieve sample: {str(e)}") | |
async def update_sample(sample_id: str, updated_sample: Sample): | |
"""Update an existing data sample""" | |
try: | |
samples = get_samples() | |
if sample_id not in samples: | |
logger.warning(f"Attempted to update non-existent sample: {sample_id}") | |
raise HTTPException(status_code=404, detail="Sample not found") | |
# Get the existing sample to preserve created_datetime | |
existing_sample = samples[sample_id] | |
# Get current datetime for modified_datetime | |
current_time = get_current_datetime() | |
# Create updated sample with original ID and updated timestamps | |
updated_sample_response = SampleResponse( | |
id=sample_id, | |
text=updated_sample.text, | |
created_datetime=existing_sample.created_datetime, | |
modified_datetime=current_time | |
) | |
# Store the updated sample | |
samples[sample_id] = updated_sample_response | |
# Save to Google Sheets | |
save_samples_to_sheets(samples) | |
logger.info(f"Updated sample with ID: {sample_id}") | |
return updated_sample_response | |
except HTTPException: | |
raise | |
except Exception as e: | |
logger.error(f"Error updating sample {sample_id}: {e}") | |
raise HTTPException(status_code=500, detail=f"Failed to update sample: {str(e)}") | |
async def delete_sample(sample_id: str): | |
"""Delete a data sample""" | |
try: | |
samples = get_samples() | |
if sample_id not in samples: | |
logger.warning(f"Attempted to delete non-existent sample: {sample_id}") | |
raise HTTPException(status_code=404, detail="Sample not found") | |
# Remove the sample | |
del samples[sample_id] | |
# Save to Google Sheets | |
save_samples_to_sheets(samples) | |
logger.info(f"Deleted sample with ID: {sample_id}") | |
return None | |
except HTTPException: | |
raise | |
except Exception as e: | |
logger.error(f"Error deleting sample {sample_id}: {e}") | |
raise HTTPException(status_code=500, detail=f"Failed to delete sample: {str(e)}") | |
# Initialize the Google Sheet with headers if needed | |
def initialize_sheet(): | |
try: | |
worksheet = get_worksheet() | |
# Check if the worksheet has headers | |
records = worksheet.get_all_records() | |
# If empty, add headers | |
if not records: | |
logger.info("Initializing empty Google Sheet with headers") | |
df = pd.DataFrame(columns=['id', 'text', 'created_datetime', 'modified_datetime']) | |
worksheet.set_dataframe(df, (0, 0)) | |
logger.info("Google Sheet initialized successfully") | |
except Exception as e: | |
logger.error(f"Error initializing sheet: {e}") | |
# Don't raise exception here to allow the app to start even if sheet init fails | |
pass | |
# Initialize the sheet when the application starts | |
initialize_sheet() | |
# Mount the static files directory at the end after defining all API routes | |
app.mount("/", StaticFiles(directory="static", html=True), name="static") |