Spaces:
Runtime error
Runtime error
import streamlit as st | |
import hopsworks | |
from mimesis import Generic | |
from mimesis.locales import Locale | |
import pandas as pd | |
import random | |
# Function to print a styled header | |
def print_header(text, font_size=22): | |
res = f'<span style=" font-size: {font_size}px;">{text}</span>' | |
st.markdown(res, unsafe_allow_html=True) | |
# Function to retrieve and start model deployments | |
def get_deployments(): | |
# Displaying a message indicating the process has started | |
st.write("๐ Retrieving and Starting Deployments...") | |
# Logging into the Hopsworks project | |
project = hopsworks.login() | |
fs = project.get_feature_store() | |
interactions_fg = fs.get_feature_group( | |
name="interactions", | |
version=1, | |
) | |
videos_fg = fs.get_feature_group( | |
name="videos", | |
version=1, | |
) | |
# Getting the model serving instance from the project | |
ms = project.get_model_serving() | |
# Retrieving deployments for the query model and ranking model | |
query_model_deployment = ms.get_deployment("querydeployment") | |
ranking_deployment = ms.get_deployment("rankingdeployment") | |
# Starting the ranking deployment with a maximum waiting time of 180 seconds | |
ranking_deployment.start(await_running=180) | |
# Starting the query model deployment with a maximum waiting time of 180 seconds | |
query_model_deployment.start(await_running=180) | |
# Displaying a message indicating that deployments are ready | |
st.write('โ Deployments are ready!') | |
# Returning deployment instances | |
return interactions_fg, videos_fg, ranking_deployment, query_model_deployment | |
def insert_interaction(user_id, video_id, interactions_fg): | |
generic = Generic(locale=Locale.EN) | |
interaction_id = generic.person.identifier(mask='####-##-####') | |
interaction_type = random.choices( | |
['like', 'dislike', 'view', 'comment', 'share', 'skip'], | |
weights=[1.5, 0.2, 3, 0.5, 0.8, 10], k=1 | |
)[0] | |
watch_time = random.randint(1, 50) | |
interaction_df = pd.DataFrame({ | |
'interaction_id': [interaction_id], | |
'interaction_type': [interaction_type], | |
'user_id': [user_id], | |
'video_id': [video_id], | |
'watch_time': [watch_time] | |
}) | |
interactions_fg.insert(interaction_df) | |
# Define function to fetch recommendations | |
def fetch_recommendations(user_id, query_model_deployment): | |
st.write('๐ฎ Getting recommendations...') | |
deployment_input = {"instances": {"user_id": user_id}} | |
prediction = query_model_deployment.predict(deployment_input)['predictions']['ranking'] | |
return prediction | |
# Function to insert interaction and fetch new recommendations | |
def handle_interaction(user_id, video_id, interactions_fg, query_model_deployment): | |
insert_interaction(user_id, video_id, interactions_fg) | |
return fetch_recommendations(user_id, query_model_deployment) | |
# Main Streamlit application logic | |
def main(): | |
st.title('๐ฌ Video Recommender') | |
# Initialize or re-use existing deployments | |
if 'deployments_initialized' not in st.session_state: | |
st.session_state.interactions_fg, st.session_state.videos_fg, st.session_state.ranking_deployment, st.session_state.query_model_deployment = get_deployments() | |
st.session_state['deployments_initialized'] = True | |
# User selection box | |
user_id_option = st.selectbox( | |
'For which user?', | |
('CJ730Y', 'AL707G', 'WS920J', 'YP322C', 'HE156H',), | |
key='user_select' | |
) | |
# Initialize or refresh recommendations | |
if 'recommendations' not in st.session_state or 'refresh' in st.session_state: | |
recommendations = fetch_recommendations(user_id_option, st.session_state.query_model_deployment) | |
random.shuffle(recommendations) | |
st.session_state.recommendations = recommendations | |
st.session_state.pop('refresh', None) | |
print_header('๐ Top 3 Recommendations:') | |
displayed_recommendations = st.session_state.recommendations[:3] | |
for recommendation in displayed_recommendations: | |
video_id = recommendation[1] | |
if st.button(f"๐ Video ID: {video_id}", key=video_id): | |
new_recommendations = handle_interaction( | |
user_id_option, | |
video_id, | |
st.session_state.interactions_fg, | |
st.session_state.query_model_deployment, | |
) | |
random.shuffle(new_recommendations) | |
st.session_state.recommendations = new_recommendations | |
st.experimental_rerun() | |
if st.button("Stop Streamlit"): | |
st.write('โ๏ธ Stopping Deployments...') | |
st.session_state.ranking_deployment.stop() | |
st.session_state.query_model_deployment.stop() | |
st.success('โ App finished successfully!') | |
st.stop() | |
if __name__ == '__main__': | |
main() | |