Spaces:
Runtime error
Runtime error
File size: 5,216 Bytes
ed4d993 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
from typing import Optional, Type
from langchain.callbacks.manager import (
AsyncCallbackManagerForToolRun,
CallbackManagerForToolRun,
)
from langchain.pydantic_v1 import BaseModel, Field
from langchain.tools import BaseTool
from neo4j_semantic_ollama.utils import get_candidates, get_user_id, graph
recommendation_query_db_history = """
MERGE (u:User {userId:$user_id})
WITH u
// get recommendation candidates
OPTIONAL MATCH (u)-[r1:RATED]->()<-[r2:RATED]-()-[r3:RATED]->(recommendation)
WHERE r1.rating > 3.5 AND r2.rating > 3.5 AND r3.rating > 3.5
AND NOT EXISTS {(u)-[:RATED]->(recommendation)}
// rank and limit recommendations
WITH u, recommendation, count(*) AS count
ORDER BY count DESC LIMIT 3
RETURN 'title:' + recommendation.title + '\nactors:' +
apoc.text.join([(recommendation)<-[:ACTED_IN]-(a) | a.name], ',') +
'\ngenre:' + apoc.text.join([(recommendation)-[:IN_GENRE]->(a) | a.name], ',')
AS movie
"""
recommendation_query_genre = """
MATCH (m:Movie)-[:IN_GENRE]->(g:Genre {name:$genre})
// filter out already seen movies by the user
WHERE NOT EXISTS {
(m)<-[:RATED]-(:User {userId:$user_id})
}
// rank and limit recommendations
WITH m AS recommendation
ORDER BY recommendation.imdbRating DESC LIMIT 3
RETURN 'title:' + recommendation.title + '\nactors:' +
apoc.text.join([(recommendation)<-[:ACTED_IN]-(a) | a.name], ',') +
'\ngenre:' + apoc.text.join([(recommendation)-[:IN_GENRE]->(a) | a.name], ',')
AS movie
"""
def recommendation_query_movie(genre: bool) -> str:
return f"""
MATCH (m1:Movie)<-[r1:RATED]-()-[r2:RATED]->(m2:Movie)
WHERE r1.rating > 3.5 AND r2.rating > 3.5 and m1.title IN $movieTitles
// filter out already seen movies by the user
AND NOT EXISTS {{
(m2)<-[:RATED]-(:User {{userId:$user_id}})
}}
{'AND EXISTS {(m2)-[:IN_GENRE]->(:Genre {name:$genre})}' if genre else ''}
// rank and limit recommendations
WITH m2 AS recommendation, count(*) AS count
ORDER BY count DESC LIMIT 3
RETURN 'title:' + recommendation.title + '\nactors:' +
apoc.text.join([(recommendation)<-[:ACTED_IN]-(a) | a.name], ',') +
'\ngenre:' + apoc.text.join([(recommendation)-[:IN_GENRE]->(a) | a.name], ',')
AS movie
"""
nl = "\n"
def recommend_movie(movie: Optional[str] = None, genre: Optional[str] = None) -> str:
"""
Recommends movies based on user's history and preference
for a specific movie and/or genre.
Returns:
str: A string containing a list of recommended movies, or an error message.
"""
user_id = get_user_id()
params = {"user_id": user_id, "genre": genre}
if not movie and not genre:
# Try to recommend a movie based on the information in the db
response = graph.query(recommendation_query_db_history, params)
try:
return (
'Recommended movies are: '
f'{f"###Movie {nl}".join([el["movie"] for el in response])}'
)
except Exception:
return "Can you tell us about some of the movies you liked?"
if not movie and genre:
# Recommend top voted movies in the genre the user haven't seen before
response = graph.query(recommendation_query_genre, params)
try:
return (
'Recommended movies are: '
f'{f"###Movie {nl}".join([el["movie"] for el in response])}'
)
except Exception:
return "Something went wrong"
candidates = get_candidates(movie, "movie")
if not candidates:
return "The movie you mentioned wasn't found in the database"
params["movieTitles"] = [el["candidate"] for el in candidates]
query = recommendation_query_movie(bool(genre))
response = graph.query(query, params)
try:
return (
'Recommended movies are: '
f'{f"###Movie {nl}".join([el["movie"] for el in response])}'
)
except Exception:
return "Something went wrong"
all_genres = [
"Action",
"Adventure",
"Animation",
"Children",
"Comedy",
"Crime",
"Documentary",
"Drama",
"Fantasy",
"Film-Noir",
"Horror",
"IMAX",
"Musical",
"Mystery",
"Romance",
"Sci-Fi",
"Thriller",
"War",
"Western",
]
class RecommenderInput(BaseModel):
movie: Optional[str] = Field(description="movie used for recommendation")
genre: Optional[str] = Field(
description=(
"genre used for recommendation. Available options are:" f"{all_genres}"
)
)
class RecommenderTool(BaseTool):
name = "Recommender"
description = "useful for when you need to recommend a movie"
args_schema: Type[BaseModel] = RecommenderInput
def _run(
self,
movie: Optional[str] = None,
genre: Optional[str] = None,
run_manager: Optional[CallbackManagerForToolRun] = None,
) -> str:
"""Use the tool."""
return recommend_movie(movie, genre)
async def _arun(
self,
movie: Optional[str] = None,
genre: Optional[str] = None,
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
) -> str:
"""Use the tool asynchronously."""
return recommend_movie(movie, genre)
|