Spaces:
paola1
/
Sleeping

paola1 commited on
Commit
63fb485
·
verified ·
1 Parent(s): 3821a22

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +108 -65
app.py CHANGED
@@ -1,23 +1,23 @@
1
  from typing import List, Dict, Optional
2
  from datetime import datetime, timedelta
3
  from fastapi import FastAPI, HTTPException, Query, Body, Request
4
- from pydantic import BaseModel, validator
5
  import json
6
  import os
 
7
 
8
  app = FastAPI()
9
 
10
  # Data storage (in-memory)
11
  user_data: Dict[str, dict] = {} # Key: IP address, Value: User entry
12
 
13
- # Data storage file
14
- DATA_FILE = "user_data.json"
15
-
16
  # --- Data Models ---
17
  class UserEntry(BaseModel):
18
  ip_address: str
19
- device_type: str
20
- timestamp: datetime
 
 
21
 
22
  @validator("ip_address")
23
  def validate_ip_address(cls, value):
@@ -33,36 +33,19 @@ class UserEntry(BaseModel):
33
  raise ValueError("Invalid IP address value")
34
  return value
35
 
36
- # --- Helper Functions ---
37
- def load_data():
38
- """Loads data from the JSON file into the in-memory store."""
39
- global user_data
40
- if os.path.exists(DATA_FILE):
41
- try:
42
- with open(DATA_FILE, "r") as f:
43
- data = json.load(f)
44
- # Convert timestamp strings back to datetime objects
45
- user_data = {
46
- ip: {**entry, "timestamp": datetime.fromisoformat(entry["timestamp"])}
47
- for ip, entry in data.items()
48
- }
49
- except Exception as e:
50
- print(f"Error loading data from {DATA_FILE}: {e}")
51
- # Handle the error gracefully, maybe start with an empty dataset
52
- user_data = {}
53
-
54
- def save_data():
55
- """Saves the in-memory data to the JSON file."""
56
- try:
57
- with open(DATA_FILE, "w") as f:
58
- # Convert datetime objects to ISO format for JSON serialization
59
- serializable_data = {
60
- ip: {**entry, "timestamp": entry["timestamp"].isoformat()}
61
- for ip, entry in user_data.items()
62
- }
63
- json.dump(serializable_data, f, indent=4)
64
- except Exception as e:
65
- print(f"Error saving data to {DATA_FILE}: {e}")
66
 
67
  def clean_old_data():
68
  """Deletes data older than 7 days."""
@@ -71,18 +54,11 @@ def clean_old_data():
71
  ips_to_delete = [ip for ip, entry in user_data.items() if entry["timestamp"] < cutoff_time]
72
  for ip in ips_to_delete:
73
  del user_data[ip]
74
- if ips_to_delete:
75
- save_data() # Save changes after deleting old data
76
-
77
- # Load data on startup
78
- load_data()
79
 
80
  # --- API Endpoints ---
81
  @app.post("/auto_entry/", response_model=UserEntry, status_code=201)
82
  async def create_auto_user_entry(
83
- request: Request,
84
- device_type: str = Body(...),
85
- timestamp: Optional[datetime] = Body(None)
86
  ):
87
  """
88
  Endpoint to automatically record user entry by extracting the IP address
@@ -94,20 +70,26 @@ async def create_auto_user_entry(
94
  if "x-forwarded-for" in request.headers:
95
  ip_address = request.headers["x-forwarded-for"].split(",")[0]
96
 
97
- # If timestamp is not provided, use the current time
98
- if not timestamp:
99
- timestamp = datetime.now()
 
 
 
 
 
100
 
101
  # Create a UserEntry object
102
  entry_data = UserEntry(
103
  ip_address=ip_address,
104
  device_type=device_type,
105
- timestamp=timestamp
 
 
106
  )
107
 
108
  # Save the entry
109
  user_data[ip_address] = entry_data.dict()
110
- save_data()
111
 
112
  return entry_data
113
 
@@ -116,19 +98,6 @@ async def create_auto_user_entry(
116
  except Exception as e:
117
  raise HTTPException(status_code=500, detail=f"Internal server error: {e}")
118
 
119
- @app.post("/entry/", response_model=UserEntry, status_code=201)
120
- async def create_user_entry(entry_data: UserEntry):
121
- """Endpoint to record user entry."""
122
- try:
123
- entry_data.timestamp = datetime.now()
124
- user_data[entry_data.ip_address] = entry_data.dict()
125
- save_data()
126
- return entry_data
127
- except ValueError as ve:
128
- raise HTTPException(status_code=400, detail=str(ve))
129
- except Exception as e:
130
- raise HTTPException(status_code=500, detail=f"Internal server error: {e}")
131
-
132
  @app.get("/analytics/")
133
  async def get_user_analytics(period: str = Query(..., enum=["last_hour", "last_day", "last_7_day"])):
134
  """Endpoint to get user analytics."""
@@ -178,19 +147,92 @@ async def import_user_data(data: Dict[str, dict] = Body(...)):
178
  try:
179
  # Validate the imported entry
180
  entry = UserEntry(**entry_dict)
181
- entry.timestamp = datetime.fromisoformat(entry_dict["timestamp"]) # Ensure timestamp is datetime
182
  user_data[ip] = entry.dict()
183
  imported_count += 1
184
  except Exception as e:
185
  print(f"Error importing entry for IP {ip}: {e}") # Log individual import errors
186
-
187
- save_data()
188
  return {"message": f"Successfully imported {imported_count} user entries."}
189
  except json.JSONDecodeError:
190
  raise HTTPException(status_code=400, detail="Invalid JSON format")
191
  except Exception as e:
192
  raise HTTPException(status_code=500, detail=f"Error importing data: {e}")
193
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
194
  # --- Background Task (Optional, for regular cleanup) ---
195
  async def scheduled_cleanup():
196
  """Periodically clean up old data."""
@@ -206,6 +248,7 @@ from fastapi import BackgroundTasks
206
  async def startup_event():
207
  # You can uncomment this to run the background task
208
  asyncio.create_task(scheduled_cleanup())
 
209
  pass
210
 
211
  # --- Error Handling (Advanced - using exception handlers) ---
 
1
  from typing import List, Dict, Optional
2
  from datetime import datetime, timedelta
3
  from fastapi import FastAPI, HTTPException, Query, Body, Request
4
+ from pydantic import BaseModel, validator, root_validator
5
  import json
6
  import os
7
+ from user_agents import parse
8
 
9
  app = FastAPI()
10
 
11
  # Data storage (in-memory)
12
  user_data: Dict[str, dict] = {} # Key: IP address, Value: User entry
13
 
 
 
 
14
  # --- Data Models ---
15
  class UserEntry(BaseModel):
16
  ip_address: str
17
+ device_type: str = "N/A"
18
+ timestamp: datetime = datetime.now()
19
+ browser: str = "N/A"
20
+ OS: str = "N/A"
21
 
22
  @validator("ip_address")
23
  def validate_ip_address(cls, value):
 
33
  raise ValueError("Invalid IP address value")
34
  return value
35
 
36
+ @root_validator(pre=True)
37
+ def set_default_values(cls, values):
38
+ """Set default values for missing fields."""
39
+ defaults = {
40
+ "device_type": "N/A",
41
+ "browser": "N/A",
42
+ "OS": "N/A",
43
+ "timestamp": datetime.now()
44
+ }
45
+ for key, default_value in defaults.items():
46
+ if key not in values or values[key] is None:
47
+ values[key] = default_value
48
+ return values
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
 
50
  def clean_old_data():
51
  """Deletes data older than 7 days."""
 
54
  ips_to_delete = [ip for ip, entry in user_data.items() if entry["timestamp"] < cutoff_time]
55
  for ip in ips_to_delete:
56
  del user_data[ip]
 
 
 
 
 
57
 
58
  # --- API Endpoints ---
59
  @app.post("/auto_entry/", response_model=UserEntry, status_code=201)
60
  async def create_auto_user_entry(
61
+ request: Request
 
 
62
  ):
63
  """
64
  Endpoint to automatically record user entry by extracting the IP address
 
70
  if "x-forwarded-for" in request.headers:
71
  ip_address = request.headers["x-forwarded-for"].split(",")[0]
72
 
73
+ user_agent = request.headers.get("User-Agent", "N/A")
74
+ user_agent_parsed = parse(user_agent)
75
+
76
+ device_type = "Mobile" if user_agent_parsed.is_mobile else "Tablet" if user_agent_parsed.is_tablet else "Desktop"
77
+ browser_name = user_agent_parsed.browser.family if user_agent_parsed else "N/A"
78
+ os_name = user_agent_parsed.os.family if user_agent_parsed else "N/A"
79
+
80
+ timestamp = datetime.now()
81
 
82
  # Create a UserEntry object
83
  entry_data = UserEntry(
84
  ip_address=ip_address,
85
  device_type=device_type,
86
+ timestamp=timestamp,
87
+ browser=browser_name,
88
+ OS=os_name
89
  )
90
 
91
  # Save the entry
92
  user_data[ip_address] = entry_data.dict()
 
93
 
94
  return entry_data
95
 
 
98
  except Exception as e:
99
  raise HTTPException(status_code=500, detail=f"Internal server error: {e}")
100
 
 
 
 
 
 
 
 
 
 
 
 
 
 
101
  @app.get("/analytics/")
102
  async def get_user_analytics(period: str = Query(..., enum=["last_hour", "last_day", "last_7_day"])):
103
  """Endpoint to get user analytics."""
 
147
  try:
148
  # Validate the imported entry
149
  entry = UserEntry(**entry_dict)
150
+ entry.timestamp = datetime.fromisoformat(entry_dict.get("timestamp", datetime.now().isoformat())) # Ensure timestamp is datetime
151
  user_data[ip] = entry.dict()
152
  imported_count += 1
153
  except Exception as e:
154
  print(f"Error importing entry for IP {ip}: {e}") # Log individual import errors
 
 
155
  return {"message": f"Successfully imported {imported_count} user entries."}
156
  except json.JSONDecodeError:
157
  raise HTTPException(status_code=400, detail="Invalid JSON format")
158
  except Exception as e:
159
  raise HTTPException(status_code=500, detail=f"Error importing data: {e}")
160
 
161
+ # Data storage (in-memory)
162
+ user_data: Dict[str, dict] = {} # Key: IP address, Value: User entry
163
+ poll_data: Dict[str, dict] = {} # Key: Poll name, Value: Poll details
164
+ poll_responses: Dict[str, Dict[str, int]] = {} # Key: Poll name, Value: {IP: response}
165
+
166
+ # --- Data Models ---
167
+ class PollCreate(BaseModel):
168
+ poll_name: str
169
+ question: str
170
+ options: List[str]
171
+
172
+ class PollEntry(BaseModel):
173
+ poll_name: str
174
+ response: int
175
+
176
+ # --- API Endpoints ---
177
+ @app.post("/poll/create/", status_code=201)
178
+ async def create_poll(poll: PollCreate):
179
+ """Endpoint to create a new poll."""
180
+ if poll.poll_name in poll_data:
181
+ raise HTTPException(status_code=400, detail="Poll with this name already exists.")
182
+ poll_data[poll.poll_name] = {
183
+ "question": poll.question,
184
+ "options": poll.options,
185
+ "created_at": datetime.now()
186
+ }
187
+ poll_responses[poll.poll_name] = {}
188
+ return {"message": "Poll created successfully.", "poll_name": poll.poll_name}
189
+
190
+ @app.post("/poll/entry/", status_code=201)
191
+ async def create_poll_entry(request: Request, poll_entry: PollEntry):
192
+ """Endpoint to record a user's response to a poll."""
193
+ ip_address = request.client.host
194
+ if "x-forwarded-for" in request.headers:
195
+ ip_address = request.headers["x-forwarded-for"].split(",")[0]
196
+
197
+ if poll_entry.poll_name not in poll_data:
198
+ raise HTTPException(status_code=404, detail="Poll not found.")
199
+
200
+ if poll_entry.response < 1 or poll_entry.response > len(poll_data[poll_entry.poll_name]["options"]):
201
+ raise HTTPException(status_code=400, detail="Invalid response option.")
202
+
203
+ poll_responses[poll_entry.poll_name][ip_address] = poll_entry.response
204
+ return {"message": "Poll entry recorded successfully."}
205
+
206
+ @app.get("/poll/analytics/")
207
+ async def get_poll_analytics(poll_name: str = Query(..., description="Name of the poll")):
208
+ """Endpoint to get analytics for a specific poll."""
209
+ if poll_name not in poll_data:
210
+ raise HTTPException(status_code=404, detail="Poll not found.")
211
+
212
+ responses = poll_responses[poll_name]
213
+ response_counts = {option: 0 for option in range(1, len(poll_data[poll_name]["options"]) + 1)}
214
+ for response in responses.values():
215
+ response_counts[response] += 1
216
+
217
+ return {
218
+ "poll_name": poll_name,
219
+ "question": poll_data[poll_name]["question"],
220
+ "options": poll_data[poll_name]["options"],
221
+ "response_counts": response_counts,
222
+ "total_responses": len(responses)
223
+ }
224
+
225
+ # --- Background Task for Poll Deletion ---
226
+ async def scheduled_poll_cleanup():
227
+ """Periodically clean up old polls."""
228
+ while True:
229
+ now = datetime.now()
230
+ polls_to_delete = [poll_name for poll_name, poll in poll_data.items() if (now - poll["created_at"]).days >= 7]
231
+ for poll_name in polls_to_delete:
232
+ del poll_data[poll_name]
233
+ del poll_responses[poll_name]
234
+ await asyncio.sleep(60 * 60 * 24) # Clean every 24 hours
235
+
236
  # --- Background Task (Optional, for regular cleanup) ---
237
  async def scheduled_cleanup():
238
  """Periodically clean up old data."""
 
248
  async def startup_event():
249
  # You can uncomment this to run the background task
250
  asyncio.create_task(scheduled_cleanup())
251
+ asyncio.create_task(scheduled_poll_cleanup())
252
  pass
253
 
254
  # --- Error Handling (Advanced - using exception handlers) ---