# Hugging Face Space status when this page was captured: Runtime error
import pandas as pd | |
import numpy as np | |
from zipfile import ZipFile | |
import tensorflow as tf | |
from tensorflow import keras | |
from pathlib import Path | |
import matplotlib.pyplot as plt | |
import gradio as gr | |
from huggingface_hub import from_pretrained_keras | |
from collections import defaultdict | |
import math | |
import networkx as nx | |
# Load the pretrained node2vec model through huggingface_hub.
model = from_pretrained_keras("bpHigh/Node2Vec_MovieLens")

# Download the actual data from
# http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
movielens_data_file_url = "http://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
movielens_zipped_file = keras.utils.get_file(
    "ml-latest-small.zip", movielens_data_file_url, extract=False
)
# The archive is unpacked into the same directory that holds the downloaded zip.
keras_datasets_path = Path(movielens_zipped_file).parents[0]
movielens_dir = keras_datasets_path / "ml-latest-small"

# Only extract the data the first time the script is run.
if not movielens_dir.exists():
    # Bound to `archive` rather than `zip` so the builtin zip() is not
    # overwritten for the rest of the module.
    with ZipFile(movielens_zipped_file, "r") as archive:
        print("Extracting all the files now...")
        archive.extractall(path=keras_datasets_path)
        print("Done!")
# Movie metadata: read the CSV and turn each numeric id into a "movie_<id>" string.
movies = pd.read_csv(f"{movielens_dir}/movies.csv")
movies["movieId"] = movies["movieId"].map("movie_{}".format)

# Ratings: read the CSV, store the scores as floats, and give the ids the
# same "movie_<id>" string form.
ratings = pd.read_csv(f"{movielens_dir}/ratings.csv")
ratings["rating"] = ratings["rating"].map(float)
ratings["movieId"] = ratings["movieId"].map("movie_{}".format)
# Implement two utility functions for the movies DataFrame. | |
def get_movie_title_by_id(movieId):
    """Return the title of the first `movies` row whose movieId equals `movieId`.

    Raises:
        IndexError: if no row matches.
    """
    matching_titles = movies.loc[movies["movieId"] == movieId, "title"]
    return matching_titles.tolist()[0]
def get_movie_id_by_title(title):
    """Return the movieId of the first `movies` row whose title equals `title`.

    Raises:
        IndexError: if no row matches.
    """
    matching_ids = movies.loc[movies["title"] == title, "movieId"]
    return matching_ids.tolist()[0]
# Count how often each movie, and each pair of movies, appears among the
# ratings kept below; the counts feed the edge weights of the movie graph.
min_rating = 5
pair_frequency = defaultdict(int)
item_frequency = defaultdict(int)

# Keep only the ratings that are at least min_rating.
rated_movies = ratings[ratings["rating"] >= min_rating]

# One (userId, rows) entry per user.
movies_grouped_by_users = list(rated_movies.groupby("userId"))
for _, user_rows in movies_grouped_by_users:
    # Movies this user rated, in row order.
    current_movies = user_rows["movieId"].tolist()
    for position, first_movie in enumerate(current_movies):
        item_frequency[first_movie] += 1
        # Pair the movie with every later entry in the same user's list.
        for second_movie in current_movies[position + 1:]:
            # Store the smaller id first so (a, b) and (b, a) share one key.
            if second_movie < first_movie:
                pair_key = (second_movie, first_movie)
            else:
                pair_key = (first_movie, second_movie)
            pair_frequency[pair_key] += 1
# Build the movie graph from the co-occurrence counts.
min_weight = 10
# Log of the total number of counted movie occurrences; added to every score below.
D = math.log(sum(item_frequency.values()))

# Undirected graph. Nodes are created implicitly when an edge touches them.
movies_graph = nx.Graph()

for (x, y), xy_frequency in pair_frequency.items():
    x_frequency = item_frequency[x]
    y_frequency = item_frequency[y]
    # Pointwise-mutual-information style score, scaled by the pair count.
    pmi = math.log(xy_frequency) - math.log(x_frequency) - math.log(y_frequency) + D
    weight = pmi * xy_frequency
    # Pairs scoring below min_weight get no edge.
    if weight < min_weight:
        continue
    movies_graph.add_edge(x, y, weight=weight)
# Create vocabulary and a mapping from tokens to integer indices.
# Index 0 is the "NA" placeholder; the graph's nodes follow in node order.
vocabulary = ["NA"] + list(movies_graph.nodes)
vocabulary_lookup = {token: idx for idx, token in enumerate(vocabulary)}

# Weight matrix of the model's "item_embeddings" layer.
movie_embeddings = model.get_layer("item_embeddings").get_weights()[0]

# Titles offered to the user. Only movies that are nodes of the graph have a
# vocabulary entry, so the list is limited to those: offering every title in
# movies.csv lets users pick movies whose vocabulary lookup fails.
movie_titles = list(movies.loc[movies["movieId"].isin(vocabulary), "title"])
def find_related_movies(movie_title, k):
    """Return the movies whose embeddings are most similar to `movie_title`.

    Similarity is the cosine between rows of `movie_embeddings`. The queried
    movie is not excluded from the ranking, so it can appear in the result.

    Args:
        movie_title: Title to look up in the `movies` table.
        k: Number of results wanted; anything `int()` accepts.

    Returns:
        A DataFrame with the single column 'Related Movies' holding up to `k`
        titles, most similar first. The frame is empty when `k` is missing,
        not a number or not positive, and when the title is unknown or has no
        vocabulary entry.
    """
    no_results = pd.DataFrame({'Related Movies': []})

    # A cleared numeric input arrives as None; NaN/inf cannot be cast either.
    try:
        k = int(k)
    except (TypeError, ValueError, OverflowError):
        return no_results

    # IndexError: title not in `movies`. KeyError: movie not in the vocabulary.
    try:
        movieId = get_movie_id_by_title(movie_title)
        token_id = vocabulary_lookup[movieId]
    except (IndexError, KeyError):
        return no_results

    # Token 0 is the "NA" placeholder, so one row is never a real movie.
    num_tokens = len(movie_embeddings)
    k = min(k, num_tokens - 1)
    if k <= 0:
        return no_results

    query_embeddings = np.array([movie_embeddings[token_id]])
    # axis=1 normalizes every embedding on its own. Without an axis the whole
    # matrix is divided by one global norm and the scores are plain dot
    # products rather than cosine similarities.
    similarities = tf.linalg.matmul(
        tf.math.l2_normalize(query_embeddings, axis=1),
        tf.math.l2_normalize(movie_embeddings, axis=1),
        transpose_b=True,
    )

    # Fetch one extra hit so dropping the placeholder still leaves k results.
    _, indices = tf.math.top_k(similarities, min(k + 1, num_tokens))
    similar_tokens = [token for token in indices.numpy()[0].tolist() if token != 0][:k]

    related_movies = [get_movie_title_by_id(vocabulary[token]) for token in similar_tokens]
    return pd.DataFrame({'Related Movies': related_movies})
# Markdown/HTML snippets rendered by the interface, in display order.
PAGE_HEADER_MD = """
<div>
<h1 style='text-align: center'>Find Related Movies</h1>
<h2>Choose the specific movie from the dropdown and see the top k related Movies</h2>
Note: The dropdown menu provides movie options from the Movielens dataset.
</div>
"""

INPUT_HEADING_MD = """
### Input
#### Select a movie to find other related movies.
"""

SPACER_MD = """
<br>
"""

COUNT_HEADING_MD = """
#### Number of related movies you wanna find?
"""

OUTPUT_HEADING_MD = """
### Output
#### Top K related movies.
"""

CREDITS_MD = """
<h4>Credits</h4>
Author: <a href="https://www.linkedin.com/in/khalid-salama-24403144/"> Khalid Salama</a>.<br>
Based on the following Keras example <a href="https://keras.io/examples/graph/node2vec_movielens/"> Graph representation learning with node2vec</a> by Khalid Salama<br>
Check out the model <a href="https://huggingface.co/keras-io/Node2Vec_MovieLens">here</a>
"""

# Assemble the page. Components appear in the order they are created.
# NOTE(review): the source's indentation was lost, so the nesting of the
# Box/Row containers below is reconstructed - confirm against the deployed app.
with gr.Blocks() as demo:
    gr.Markdown(PAGE_HEADER_MD)

    # Input panel: movie selector, result count, and the trigger button.
    with gr.Box():
        gr.Markdown(INPUT_HEADING_MD)
        inp1 = gr.Dropdown(movie_titles)
        gr.Markdown(SPACER_MD)
        gr.Markdown(COUNT_HEADING_MD)
        inp2 = gr.Number()
        btn = gr.Button("Run")

    # Output panel: read-only table filled by find_related_movies.
    with gr.Box():
        gr.Markdown(OUTPUT_HEADING_MD)
        df1 = gr.DataFrame(headers=["title"], datatype=["str"], interactive=False)

    with gr.Row():
        gr.Markdown(CREDITS_MD)

    # Clicking "Run" sends (movie title, count) to find_related_movies and
    # shows the returned frame in the output table.
    btn.click(fn=find_related_movies, inputs=[inp1, inp2], outputs=df1)

demo.launch(debug=True)