from typing import List, Dict, Optional
from datetime import datetime, timedelta
from fastapi import FastAPI, HTTPException, Query, Body, Request
from pydantic import BaseModel, validator, root_validator
import json
import os
from user_agents import parse

app = FastAPI()

# Data storage (in-memory)
user_data: Dict[str, dict] = {}  # Key: IP address, Value: User entry

# --- Data Models ---
class UserEntry(BaseModel):
    ip_address: str
    device_type: str = "N/A"
    timestamp: datetime =
    browser: str = "N/A"
    OS: str = "N/A"

    def validate_ip_address(cls, value):
        parts = value.split('.')
        if len(parts) != 4:
            raise ValueError("Invalid IP address format")
        for part in parts:
                num = int(part)
                if not 0 <= num <= 255:
                    raise ValueError("Invalid IP address value")
            except ValueError:
                raise ValueError("Invalid IP address value")
        return value

    def set_default_values(cls, values):
        """Set default values for missing fields."""
        defaults = {
            "device_type": "N/A",
            "browser": "N/A",
            "OS": "N/A",
        for key, default_value in defaults.items():
            if key not in values or values[key] is None:
                values[key] = default_value
        return values

def clean_old_data():
    """Deletes data older than 7 days."""
    global user_data
    cutoff_time = - timedelta(days=7)
    ips_to_delete = [ip for ip, entry in user_data.items() if entry["timestamp"] < cutoff_time]
    for ip in ips_to_delete:
        del user_data[ip]

# --- API Endpoints ---"/auto_entry/", response_model=UserEntry, status_code=201)
async def create_auto_user_entry(
    request: Request
    Endpoint to automatically record user entry by extracting the IP address
    from the request and taking device_type and optional timestamp as input.
        # Automatically extract the client's IP address
        ip_address =
        if "x-forwarded-for" in request.headers:
            ip_address = request.headers["x-forwarded-for"].split(",")[0]

        user_agent = request.headers.get("User-Agent", "N/A")
        user_agent_parsed = parse(user_agent)
        device_type = "Mobile" if user_agent_parsed.is_mobile else "Tablet" if user_agent_parsed.is_tablet else "Desktop"
        browser_name = if user_agent_parsed else "N/A"
        os_name = if user_agent_parsed else "N/A"

        timestamp =

        # Create a UserEntry object
        entry_data = UserEntry(

        # Save the entry
        user_data[ip_address] = entry_data.dict()

        return entry_data

    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}")

async def get_user_analytics(period: str = Query(..., enum=["last_hour", "last_day", "last_7_day"])):
    """Endpoint to get advanced user analytics."""
        clean_old_data()  # Clean data before processing

        now =
        if period == "last_hour":
            cutoff = now - timedelta(hours=1)
        elif period == "last_day":
            cutoff = now - timedelta(days=1)
        elif period == "last_7_day":
            cutoff = now - timedelta(days=7)
            raise HTTPException(status_code=400, detail="Invalid period specified")

        filtered_data = [entry for entry in user_data.values() if entry["timestamp"] >= cutoff]
        unique_users = len(set(entry["ip_address"] for entry in filtered_data))
        device_counts: Dict[str, int] = {}
        browser_counts: Dict[str, int] = {}
        os_counts: Dict[str, int] = {}

        for entry in filtered_data:
            device_counts[entry["device_type"]] = device_counts.get(entry["device_type"], 0) + 1
            browser_counts[entry["browser"]] = browser_counts.get(entry["browser"], 0) + 1
            os_counts[entry["OS"]] = os_counts.get(entry["OS"], 0) + 1

        # Calculate percentages
        total_entries = len(filtered_data)
        device_percentages = {device: (count / total_entries) * 100 for device, count in device_counts.items()}
        browser_percentages = {browser: (count / total_entries) * 100 for browser, count in browser_counts.items()}
        os_percentages = {os: (count / total_entries) * 100 for os, count in os_counts.items()}

        return {
            "total_unique_users": unique_users,
            "device_type_info": {
                "counts": device_counts,
                "percentages": device_percentages
            "browser_info": {
                "counts": browser_counts,
                "percentages": browser_percentages
            "os_info": {
                "counts": os_counts,
                "percentages": os_percentages
            "total_entries": total_entries
    except HTTPException as he:
        raise he
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error generating analytics: {e}")

@app.get("/export/", response_model=Dict[str, UserEntry])
async def export_user_data():
    """Endpoint to export all user data in JSON format."""
        clean_old_data()  # Ensure no old data is exported
        return user_data
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error exporting data: {e}")"/import/")
async def import_user_data(data: Dict[str, dict] = Body(...)):
    """Endpoint to import user data from JSON."""
        imported_count = 0
        for ip, entry_dict in data.items():
                # Validate the imported entry
                entry = UserEntry(**entry_dict)
                entry.timestamp = datetime.fromisoformat(entry_dict.get("timestamp", # Ensure timestamp is datetime
                user_data[ip] = entry.dict()
                imported_count += 1
            except Exception as e:
                print(f"Error importing entry for IP {ip}: {e}")  # Log individual import errors
        return {"message": f"Successfully imported {imported_count} user entries."}
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON format")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error importing data: {e}")

class PollCreate(BaseModel):
    poll_name: str
    question: str
    options: List[str]

class PollEntry(BaseModel):
    poll_name: str
    response: int

# --- API Endpoints ---"/poll/create/", status_code=201)
async def create_poll(poll: PollCreate):
    """Endpoint to create a new poll."""
    if poll.poll_name in poll_data:
        raise HTTPException(status_code=400, detail="Poll with this name already exists.")
    poll_data[poll.poll_name] = {
        "question": poll.question,
        "options": poll.options,
    poll_responses[poll.poll_name] = {}
    return {"message": "Poll created successfully.", "poll_name": poll.poll_name}"/poll/entry/", status_code=201)
async def create_poll_entry(request: Request, poll_entry: PollEntry):
    """Endpoint to record a user's response to a poll."""
    ip_address =
    if "x-forwarded-for" in request.headers:
        ip_address = request.headers["x-forwarded-for"].split(",")[0]

    if poll_entry.poll_name not in poll_data:
        raise HTTPException(status_code=404, detail="Poll not found.")

    if poll_entry.response < 1 or poll_entry.response > len(poll_data[poll_entry.poll_name]["options"]):
        raise HTTPException(status_code=400, detail="Invalid response option.")

    poll_responses[poll_entry.poll_name][ip_address] = poll_entry.response
    return {"message": "Poll entry recorded successfully."}

async def get_poll_analytics(poll_name: str = Query(..., description="Name of the poll")):
    """Endpoint to get analytics for a specific poll."""
    if poll_name not in poll_data:
        raise HTTPException(status_code=404, detail="Poll not found.")

    responses = poll_responses[poll_name]
    response_counts = {option: 0 for option in range(1, len(poll_data[poll_name]["options"]) + 1)}
    for response in responses.values():
        response_counts[response] += 1

    return {
        "poll_name": poll_name,
        "question": poll_data[poll_name]["question"],
        "options": poll_data[poll_name]["options"],
        "response_counts": response_counts,
        "total_responses": len(responses)

# --- Background Task for Poll Deletion ---
async def scheduled_poll_cleanup():
    """Periodically clean up old polls."""
    while True:
        now =
        polls_to_delete = [poll_name for poll_name, poll in poll_data.items() if (now - poll["created_at"]).days >= 7]
        for poll_name in polls_to_delete:
            del poll_data[poll_name]
            del poll_responses[poll_name]
        await asyncio.sleep(60 * 60 * 24)  # Clean every 24 hours
# --- Background Task (Optional, for regular cleanup) ---
async def scheduled_cleanup():
    """Periodically clean up old data."""
    while True:
        await asyncio.sleep(60 * 60)  # Clean every hour

# Import asyncio if you use the background task
import asyncio
from fastapi import BackgroundTasks

async def startup_event():
    # You can uncomment this to run the background task

# --- Error Handling (Advanced - using exception handlers) ---
from fastapi import Request
from fastapi.responses import JSONResponse

async def http_exception_handler(request: Request, exc: HTTPException):
    return JSONResponse(
        content={"message": exc.detail},

async def general_exception_handler(request: Request, exc: Exception):
    return JSONResponse(
        content={"message": f"An unexpected error occurred: {exc}"},

if __name__ == "__main__":
    import uvicorn, host="", port=8083, debug=True)