Spaces:
Running
Running
from datetime import datetime | |
from db.schema import Feedback, Response | |
from db.setup import db_setup | |
# Create operation (Ingest data into Firebase with unique user_id check) | |
def ingest(data: Feedback): | |
ref = db_setup() # Ensure Firebase is initialized | |
print(f"Attempting to ingest feedback data from user '{data.user_id}'...") | |
feedback_data = data.model_dump() | |
feedback_data["time_stamp"] = feedback_data['time_stamp'].isoformat() | |
ref.child('feedback').push(feedback_data) | |
print(f"Feedback data from user '{data.user_id}' pushed to Firebase!") | |
def save_feedback(data: Feedback): | |
ref = db_setup() # Ensure Firebase is initialized | |
print(f"Processing feedback for user '{data.user_id}'...") | |
# Convert Pydantic model to dictionary | |
feedback_data = data.model_dump() | |
feedback_data["time_stamp"] = feedback_data["time_stamp"].isoformat() | |
# Check if the user already has feedback | |
existing_feedback = ref.child("feedback").order_by_child("user_id").equal_to(data.user_id).get() | |
if existing_feedback: | |
# Update existing feedback | |
for key in existing_feedback: | |
ref.child("feedback").child(key).update(feedback_data) | |
print(f"Feedback updated for user '{data.user_id}'!") | |
else: | |
# Insert new feedback | |
ref.child("feedback").push(feedback_data) | |
print(f"(Ingestion) New feedback added for user '{data.user_id}'!") | |
# Read operation (Fetch feedback data from Firebase) | |
def read(user_id: str): | |
"""Fetch feedback data for a specific user from Firebase.""" | |
ref = db_setup() | |
feedback_ref = ref.child('feedback').order_by_child('user_id').equal_to(user_id).get() | |
if feedback_ref: | |
for key, value in feedback_ref.items(): | |
return value # Return the first found entry | |
return None # No progress found | |
# Update operation (Update feedback data in Firebase) | |
def update(feedback_id: int, updated_data: Feedback): | |
ref = db_setup() | |
feedback_ref = ref.child('feedback').order_by_child('id').equal_to(feedback_id).get() | |
if feedback_ref: | |
# Assuming we're updating the first entry found | |
for key in feedback_ref: | |
ref.child('feedback').child(key).update(updated_data.dict()) | |
print("Feedback data updated in Firebase!") | |
else: | |
print("Feedback not found to update!") | |
# Delete operation (Remove feedback data from Firebase) | |
def delete(feedback_id: int): | |
ref = db_setup() | |
feedback_ref = ref.child('feedback').order_by_child('id').equal_to(feedback_id).get() | |
if feedback_ref: | |
# Assuming we're deleting the first entry found | |
for key in feedback_ref: | |
ref.child('feedback').child(key).delete() | |
print("Feedback data deleted from Firebase!") | |
else: | |
print("Feedback not found to delete!") | |
def reset_feedback_in_db(user_id: str): | |
"""Deletes previous feedback for a user, resetting progress in Firebase.""" | |
ref = db_setup() | |
feedback_ref = ref.child('feedback').order_by_child('user_id').equal_to(user_id).get() | |
if feedback_ref: | |
for key in feedback_ref: | |
ref.child('feedback').child(key).delete() | |
print(f"Feedback data for user '{user_id}' reset in Firebase.") | |