Spaces:
Sleeping
Sleeping
import gradio as gr | |
import json | |
import os | |
from datetime import datetime | |
from dotenv import load_dotenv | |
from supabase import create_client, Client | |
from pinecone import Pinecone | |
from sentence_transformers import SentenceTransformer | |
from typing import List, Dict | |
load_dotenv() | |
SUPABASE_URL = os.getenv("DB_URL") | |
SUPABASE_KEY = os.getenv("DB_KEY") | |
pinecone_api_key = os.getenv("PINECONE") | |
supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY) | |
pc = Pinecone(api_key=pinecone_api_key) | |
index = pc.Index("focus-guru") | |
model = SentenceTransformer("all-MiniLM-L6-v2") | |
def ingest_user_progress( | |
supabase_client: Client, | |
user_id: int, | |
video_id: str, | |
rating: float, | |
time_spent: int, | |
play_count: int, | |
completed: bool | |
): | |
data = { | |
"user_id": user_id, | |
"video_id": video_id, | |
"rating": rating, | |
"time_spent": time_spent, | |
"play_count": play_count, | |
"completed": completed, | |
"updated_at": datetime.now().isoformat() | |
} | |
response = supabase_client.table("user_progress").insert(data, upsert=True).execute() | |
return response.data | |
def gradio_ingest(user_input): | |
try: | |
data = json.loads(user_input) | |
user_id = int(data.get("user_id", 0)) | |
video_id = data.get("video_id", "") | |
rating = float(data.get("rating", 0)) | |
time_spent = int(data.get("time_spent", 0)) | |
play_count = int(data.get("play_count", 0)) | |
completed = bool(data.get("completed", False)) | |
except Exception as e: | |
return f"<p style='color: red;'>Error parsing input: {e}</p>" | |
res = ingest_user_progress(supabase_client, user_id, video_id, rating, time_spent, play_count, completed) | |
return f"<p style='color: green;'>Ingested data: {res}</p>" | |
def recommend_playlists_by_package_and_module(assessment_output, index, model): | |
report_text = assessment_output.get("report", "") | |
packages = assessment_output.get("package", []) | |
modules = ["Nutrition", "Exercise", "Meditation"] | |
recommendations = {} | |
if not report_text: | |
for pkg in packages: | |
recommendations[pkg] = {mod: {"title": "No playlist found", "description": ""} for mod in modules} | |
return recommendations | |
query_embedding = model.encode(report_text, convert_to_numpy=True).tolist() | |
for pkg in packages: | |
recommendations[pkg] = {} | |
for mod in modules: | |
filter_dict = {"type": "playlist", "Package": pkg, "Module": mod} | |
results = index.query(vector=query_embedding, top_k=1, include_metadata=True, filter=filter_dict) | |
if results["matches"]: | |
match = results["matches"][0] | |
metadata = match["metadata"] | |
title = metadata.get("Playlist Name", "Unknown Playlist") | |
description = metadata.get("Description", "") | |
recommendations[pkg][mod] = {"title": title, "description": description} | |
else: | |
recommendations[pkg][mod] = {"title": "No playlist found", "description": ""} | |
return recommendations | |
def gradio_recommend_playlist(input_json): | |
try: | |
assessment_data = json.loads(input_json) | |
except json.JSONDecodeError: | |
return "<p style='color: red;'>Error: Invalid JSON format</p>" | |
if "package" not in assessment_data or "report" not in assessment_data: | |
return "<p style='color: red;'>Error: Missing 'package' or 'report' field</p>" | |
recs = recommend_playlists_by_package_and_module(assessment_data, index, model) | |
html_output = "<div style='padding: 20px; font-family: Arial, sans-serif;'>" | |
for pkg, mod_recs in recs.items(): | |
html_output += f"<h2>{pkg} Package</h2><div style='display: flex; flex-wrap: wrap; gap: 20px;'>" | |
for mod, rec in mod_recs.items(): | |
html_output += f""" | |
<div style="border: 1px solid #ccc; border-radius: 8px; padding: 15px; width: 260px;"> | |
<h3>{mod} Module</h3> | |
<strong>{rec['title']}</strong> | |
<p>{rec['description']}</p> | |
</div> | |
""" | |
html_output += "</div>" | |
html_output += "</div>" | |
return html_output | |
def recommend_videos(user_id: int, K: int = 5, M: int = 10, N: int = 5) -> Dict: | |
response = supabase_client.table("user_progress").select("video_id, rating, completed, play_count, videos!inner(playlist_id)").eq("user_id", user_id).execute() | |
interactions = response.data | |
if not interactions: | |
return { | |
"note": "No interactions recorded for this user yet. Please watch or rate some videos.", | |
"recommendations": [] | |
} | |
for inter in interactions: | |
rating = inter["rating"] if inter["rating"] is not None else 0 | |
completed_val = 1 if inter["completed"] else 0 | |
play_count = inter["play_count"] | |
inter["engagement"] = rating + 2 * completed_val + play_count | |
top_videos = sorted(interactions, key=lambda x: x["engagement"], reverse=True)[:K] | |
watched_completed_videos = {i["video_id"] for i in interactions if i["completed"]} | |
watched_incomplete_videos = {i["video_id"] for i in interactions if not i["completed"]} | |
candidates = {} | |
for top_video in top_videos: | |
query_id = f"video_{top_video['video_id']}" | |
response = index.query(id=query_id, top_k=M + 1, include_metadata=True) | |
for match in response.get("matches", []): | |
if match["id"] == query_id: | |
continue | |
metadata = match.get("metadata", {}) | |
vid = metadata.get("vid") | |
if not vid: | |
continue | |
if vid in watched_completed_videos: | |
continue | |
similarity = match["score"] | |
pid = metadata.get("PID") | |
boost = 1.1 if pid == top_video["videos"]["playlist_id"] else 1.0 | |
partial_score = top_video["engagement"] * similarity * boost | |
if vid in candidates: | |
candidates[vid]["total_score"] += partial_score | |
else: | |
candidates[vid] = {"total_score": partial_score, "metadata": metadata} | |
sorted_candidates = sorted(candidates.items(), key=lambda x: x[1]["total_score"], reverse=True)[:N] | |
recommendations = [] | |
for vid, data in sorted_candidates: | |
metadata = data["metadata"] | |
video_title = metadata.get("video_title", "Untitled Video") | |
if vid in watched_incomplete_videos: | |
video_title += " (Incomplete)" | |
recommendations.append({ | |
"video_id": vid, | |
"title": video_title, | |
"description": metadata.get("video_description", ""), | |
"score": data["total_score"] | |
}) | |
note_text = "Based on your engagement, here are some recommended videos from the same playlist." | |
return {"note": note_text, "recommendations": recommendations} | |
def gradio_recommend_videos(user_id_input): | |
try: | |
user_id = int(user_id_input) | |
except Exception as e: | |
return f"Error: {e}", "" | |
result = recommend_videos(user_id) | |
note_text = result["note"] | |
recs = result["recommendations"] | |
if not recs: | |
return note_text, "" | |
html_output = "<div>" | |
# Use black cards with white text and orange border for visibility | |
for rec in recs: | |
html_output += f""" | |
<div style="background: #000; color: #fff; border: 2px solid orange; border-radius: 8px; margin-bottom: 10px; padding: 15px;"> | |
<h3 style="margin-top: 0;">{rec['title']}</h3> | |
<p style="margin: 0;">{rec['description']}</p> | |
<p style="margin: 0;"><strong>Score:</strong> {rec['score']:.2f}</p> | |
</div> | |
""" | |
html_output += "</div>" | |
return note_text, html_output | |
with gr.Blocks() as demo: | |
with gr.Tabs(): | |
with gr.TabItem("Playlist Recommendation"): | |
playlist_input = gr.Textbox( | |
lines=10, | |
label="Assessment Data (JSON)", | |
placeholder='''{ | |
"package": ["Focus", "Insomnia"], | |
"report": "Based on your responses, you may struggle with focus, anxiety, and burnout..." | |
}''' | |
) | |
playlist_output = gr.HTML(label="Recommended Playlists") | |
playlist_btn = gr.Button("Get Playlist Recommendations") | |
playlist_btn.click(fn=gradio_recommend_playlist, inputs=playlist_input, outputs=playlist_output) | |
with gr.TabItem("Video Recommendation"): | |
user_id_input = gr.Textbox(lines=1, label="User ID", placeholder="1") | |
note_output = gr.Textbox(label="Recommendation Note", interactive=False) | |
videos_output = gr.HTML(label="Recommended Videos") | |
videos_btn = gr.Button("Get Video Recommendations") | |
videos_btn.click(fn=gradio_recommend_videos, inputs=user_id_input, outputs=[note_output, videos_output]) | |
with gr.TabItem("User Interaction Ingestion"): | |
ingest_input = gr.Textbox( | |
lines=10, | |
label="User Progress Data (JSON)", | |
placeholder='''{ | |
"user_id": 1, | |
"video_id": "abc123", | |
"rating": 4.5, | |
"time_spent": 300, | |
"play_count": 1, | |
"completed": false | |
}''' | |
) | |
ingest_output = gr.HTML(label="Ingestion Result") | |
ingest_btn = gr.Button("Ingest Data") | |
ingest_btn.click(fn=gradio_ingest, inputs=ingest_input, outputs=ingest_output) | |
demo.launch() | |