from adaptors.db import get_story, save_story
from models.story import Story
from loguru import logger
import json
from pprint import pprint
import helpers
import prompts
import random
from adaptors.llm import answer
from adaptors.transcriber import transcribe
from adaptors.voice import say_new

DEFAULT_WORLD = random.choice([
    "Tolkien's Middle Earth",
    "Future Space Exploration world of Asimov's Foundation Trilogy",
    "The world of Lewis Carroll's Alice in Wonderland"
])
DEFAULT_HERO = "I don't know, please choose something interesting and surprising"
DEFAULT_PLOT = "I don't know, please come up with something unexpected"
DEFAULT_ENDING = random.choice(["happy", "tragic", "funny", "unexpected"])
DEFAULT_STYLE = random.choice(["Epic", "Funny", "Poetic"])

'''
Here we manage the flow and state of the story
'''
def do_homeros(user_input, story_data, settings):

    # if we are in voice input mode: engage whisper to transcribe the audio
    if user_input and not settings["use_text_input"]:
        logger.debug('initiating transcription')
        user_input = transcribe(user_input)
        logger.debug(user_input)

    # TODO refactor. naming in the function is old
    story = story_data

    # story hasn't started
    if story["status"] == "not_started":
        logger.debug("status: initiating a new story")
        next_message = helpers.get_fixed_msg("welcome")
        story["status"] = "checking_magic_word"

    # we are checking the magic word or it is wrong and we need to ask for it again
    elif story["status"] == "checking_magic_word" or story["status"] == "wrong_magic_word":
        logger.debug("status: checking magic word")
        magic_word_correct = helpers.check_magic_word(user_input)
        if magic_word_correct:
            story = init_story(story)
            # if default settings is true - skip the metadata questions and just start the story
            if settings["default_settings"] and len(story["chunks"]) == 0:
                story = define_metadata(DEFAULT_WORLD, "world", story)
                story = define_metadata(DEFAULT_HERO, "hero", story)
                story = define_metadata(DEFAULT_PLOT, "plot", story)
                story = define_metadata(DEFAULT_ENDING, "ending", story)
                story = define_metadata(DEFAULT_STYLE, "style", story)
                story["status"] = "ongoing"
                story = start_story(story)
                next_message = story["chunks"][-1]["audio_url"]
                return next_message, story, settings
            else:
                story["status"] = "defining_metadata_world"
                next_message = helpers.get_fixed_msg("ask_world")
        else:
            story["status"] = "wrong_magic_word"
            next_message = helpers.get_fixed_msg("wrong_magic_word")

    # defining the world
    elif story["status"] == "defining_metadata_world":
        logger.debug("status: defining the world")
        story = define_metadata(user_input, "world", story)
        story["status"] = "defining_metadata_hero"
        next_message = helpers.get_fixed_msg("ask_hero")

    # defining the hero
    elif story["status"] == "defining_metadata_hero":
        logger.debug("status: defining the hero")
        story = define_metadata(user_input, "hero", story)
        story["status"] = "defining_metadata_plot"
        next_message = helpers.get_fixed_msg("ask_plot")

    # defining the plot
    elif story["status"] == "defining_metadata_plot":
        logger.debug("status: defining the plot")
        story = define_metadata(user_input, "plot", story)
        story["status"] = "defining_metadata_ending"
        next_message = helpers.get_fixed_msg("ask_ending")

    # defining the ending
    elif story["status"] == "defining_metadata_ending":
        logger.debug("status: defining the ending")
        story = define_metadata(user_input, "ending", story)
        story["status"] = "defining_metadata_style"
        next_message = helpers.get_fixed_msg("ask_style")

    # defining the style and starting the story with the first chunk
    elif story["status"] == "defining_metadata_style":
        logger.debug("status: defining the style")
        story = define_metadata(user_input, "style", story)
        story["status"] = "ongoing"
        story = start_story(story)
        next_message = story["chunks"][-1]["audio_url"]

    # we are in the middle of the story - evaluate whether it is time to end, or continue
    elif story["status"] == "ongoing":
        if evaluate_story(story, settings)["is_time_to_end"]:
            logger.debug("status: activating story finish")
            story = finish_story(user_input, story)
            story["status"] = "finished"
        else:
            story = continue_story(user_input, story)
            story["status"] = "ongoing"
        next_message = story["chunks"][-1]["audio_url"]

    # story has ended, but the user is still inputting. tell them it's over
    elif story["status"] == "finished":
        next_message = helpers.get_fixed_msg("no_more_story")
        story["status"] = "finished"

    else:
        logger.error(f"we have a story status {story['status']} we didn't catch...")
        raise Exception("strange story status")

    logger.debug(story)
    return next_message, story, settings
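
'''
For reference, the status flow handled above (derived from the branches in do_homeros):
not_started -> checking_magic_word (-> wrong_magic_word) -> defining_metadata_world
-> defining_metadata_hero -> defining_metadata_plot -> defining_metadata_ending
-> defining_metadata_style -> ongoing -> finished
(with default_settings enabled, the defining_metadata_* steps are skipped)
'''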

'''
initiates a new story and saves in DB
'''
def init_story(story_data):
    story_data["uuid"] = helpers.gen_unique_id()
    story = Story(
        uuid=story_data['uuid'],
        status=story_data['status']
    )
    save_story(story)
    return story.to_dict()


'''
defines a field of metadata (world, style, plot etc)
'''
def define_metadata(user_input, field, story_data):
    story = get_story(story_data["uuid"])
    setattr(story, field, user_input)
    save_story(story)
    return story.to_dict()


'''
creates the first chunk of the story
'''
def start_story(story_data):
    return continue_story("Please begin", story_data)


'''
main function that manages adding the next chunk to a story (first text, then audio)
'''
def continue_story(user_input, story_data):
    story = get_story(story_data["uuid"])
    chunks = json.loads(story.chunks)
    messages = json.loads(story.messages)

    messages.append({
        "role": "user",
        "content": user_input
    })

    next_chunk_text = create_next_chunk_text(user_input, story)
    next_chunk_audio = create_next_chunk_audio(next_chunk_text)

    messages.append({
        "role": "assistant",
        "content": next_chunk_text
    })

    chunks.append({
        "text": next_chunk_text,
        "audio_url": next_chunk_audio
    })

    pprint(chunks)
    pprint(messages)

    story.chunks = json.dumps(chunks)
    story.messages = json.dumps(messages)
    story.status = "ongoing"
    save_story(story)

    return story.to_dict()


'''
generates the last part of the story and changes status to "finished"
'''
def finish_story(user_input, story_data):
    story = get_story(story_data["uuid"])
    chunks = json.loads(story.chunks)
    messages = json.loads(story.messages)

    user_input = user_input + "\n\nPlease finish the story now."

    messages.append({
        "role": "user",
        "content": user_input
    })

    next_chunk_text = create_next_chunk_text(user_input, story)
    next_chunk_audio = create_next_chunk_audio(next_chunk_text)

    chunks.append({
        "text": next_chunk_text,
        "audio_url": next_chunk_audio
    })

    story.chunks = json.dumps(chunks)
    story.status = "finished"
    save_story(story)

    return story.to_dict()
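
'''
Hedged illustration of the persisted shapes (the sample values are invented for illustration):
continue_story / finish_story above store
  story.chunks   as a JSON list of {"text": ..., "audio_url": ...}
  story.messages as a JSON list of {"role": "user" | "assistant", "content": ...}
e.g. chunks   = '[{"text": "Once upon a time...", "audio_url": "https://example.com/chunk1.mp3"}]'
     messages = '[{"role": "user", "content": "Please begin"},
                  {"role": "assistant", "content": "Once upon a time..."}]'
'''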

'''
generates the next chunk of the story
'''
def create_next_chunk_text(user_input, story):
    # get system message from prompts
    system_message = prompts.get(
        prompt_name="storyteller_general",
        substitutions={
            "WORLD": story.world,
            "HERO": story.hero,
            "PLOT": story.plot,
            "ENDING": story.ending,
            "STYLE": story.style,
        }
    )

    # get history of messages
    messages = json.loads(story.messages)

    # add user message to history of messages
    messages.append({
        "role": "user",
        "content": user_input
    })

    # get llm to answer and return the next chunk
    next_chunk = answer(system_message, messages)
    return next_chunk


'''
evaluates the story up until now and returns a dict with the result of the evaluation
'''
def evaluate_story(story, settings):
    evaluation = {}
    story_len = len(story["chunks"])
    logger.debug(story_len)
    evaluation["is_time_to_end"] = story_len >= settings["max_len"]
    return evaluation


'''
turns next story chunk into audio and returns a URL
'''
def create_next_chunk_audio(text):
    return say_new(text)
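
'''
Hedged usage sketch, not part of the app flow: it exercises evaluate_story in isolation,
since that is the one function here that needs neither the DB nor the LLM/voice adaptors.
The dict shapes and values below are assumptions inferred from how evaluate_story reads
story["chunks"] and settings["max_len"] above.
'''
if __name__ == "__main__":
    demo_story = {
        "chunks": [
            {"text": "Once upon a time...", "audio_url": "https://example.com/chunk1.mp3"}
        ]
    }
    demo_settings = {"max_len": 5}
    # only one chunk so far, so this should print {'is_time_to_end': False}
    print(evaluate_story(demo_story, demo_settings))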