File size: 3,440 Bytes
8cb7e14
 
 
 
463dbd4
8cb7e14
463dbd4
 
 
8cb7e14
 
 
 
 
 
 
 
 
 
 
 
 
463dbd4
8cb7e14
 
 
 
 
 
463dbd4
8cb7e14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from copy import deepcopy
from typing import Dict, Any

import hydra
from flows.prompt_template import JinjaPrompt

from flows.base_flows import AtomicFlow
from flows.messages import UpdateMessage_Generic

from flows.utils import logging

# logging.set_verbosity_debug()  # ToDo: Has no effect on the logger for __name__. Level is warn, and info is not printed
log = logging.get_logger(f"flows.{__name__}")  # ToDo: Is there a better fix?


class HumanStandardInputFlow(AtomicFlow):
    """An atomic flow that shows the user a rendered query message and reads
    their reply from standard input.

    The query message is produced by formatting ``query_message_prompt_template``
    with the run's input data. Depending on the required config key
    ``request_multi_line_input_flag``, the flow reads either a single line or
    multiple lines terminated by a sentinel string (config key
    ``end_of_input_string``, default ``"EOI"``).
    """

    REQUIRED_KEYS_CONFIG = ["request_multi_line_input_flag"]

    # Prompt template rendered with the run's input_data to build the message
    # shown to the user before input is read.
    query_message_prompt_template: JinjaPrompt = None

    __default_flow_config = {
        "end_of_input_string": "EOI",
        "input_keys": [],
        "description": "Reads input from the user's standard input.",
        "query_message_prompt_template": {
            "_target_": "flows.prompt_template.JinjaPrompt",
            "template": "",
            "input_variables": [],
            "partial_variables": {},
        }
    }

    def __init__(self, query_message_prompt_template, **kwargs):
        """Initialize the flow.

        :param query_message_prompt_template: the (already instantiated) prompt
            template used to render the query message shown to the user.
        :param kwargs: forwarded to ``AtomicFlow.__init__`` (e.g. ``flow_config``).
        """
        super().__init__(**kwargs)
        self.query_message_prompt_template = query_message_prompt_template

    @classmethod
    def _set_up_prompts(cls, config):
        """Instantiate the prompt template declared in ``config`` via hydra.

        :param config: flow config containing a ``query_message_prompt_template``
            entry with a hydra ``_target_`` spec.
        :return: dict with the instantiated ``query_message_prompt_template``,
            suitable for passing to ``cls(**kwargs)``.
        """
        kwargs = {}

        kwargs["query_message_prompt_template"] = \
            hydra.utils.instantiate(config['query_message_prompt_template'], _convert_="partial")
        return kwargs

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow instance from a config dict.

        The config is deep-copied so the caller's dict is never mutated.
        """
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}

        # ~~~ Set up prompts ~~~
        kwargs.update(cls._set_up_prompts(flow_config))

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    @staticmethod
    def _get_message(prompt_template, input_data: Dict[str, Any]):
        """Render ``prompt_template`` using the matching entries of ``input_data``.

        Only the template's declared ``input_variables`` are pulled from
        ``input_data``; a missing key raises ``KeyError``.
        """
        template_kwargs = {}
        for input_variable in prompt_template.input_variables:
            template_kwargs[input_variable] = input_data[input_variable]

        msg_content = prompt_template.format(**template_kwargs)
        return msg_content

    def _read_input(self):
        """Read the user's reply from stdin.

        Single-line mode returns the first line entered. Multi-line mode
        collects lines until the sentinel ``end_of_input_string`` is entered
        on a line by itself (the sentinel is not included in the result), and
        returns them joined with newlines.
        """
        if not self.flow_config["request_multi_line_input_flag"]:
            # Fix: "enter you" -> "enter your" in the user-facing message.
            log.info("Please enter your single-line response and press enter.")
            human_input = input()
            return human_input

        end_of_input_string = self.flow_config["end_of_input_string"]
        log.info(f"Please enter your multi-line response below. "
                 f"To submit the response, write `{end_of_input_string}` on a new line and press enter.")

        content = []
        while True:
            line = input()
            # Use the hoisted local rather than re-reading the config each iteration.
            if line == end_of_input_string:
                break
            content.append(line)
        human_input = "\n".join(content)
        return human_input

    def run(self,
            input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Render and log the query message, then return the user's input.

        :param input_data: values for the prompt template's input variables.
        :return: ``{"human_input": <text read from stdin>}``.
        """
        query_message = self._get_message(self.query_message_prompt_template, input_data)
        state_update_message = UpdateMessage_Generic(
            created_by=self.flow_config['name'],
            updated_flow=self.flow_config["name"],
            data={"query_message": query_message},
        )
        self._log_message(state_update_message)

        log.info(query_message)
        human_input = self._read_input()

        return {"human_input": human_input}