File size: 4,150 Bytes
ed4d993
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import base64
import re
from typing import List, Union

from pydantic import BaseModel

from .core import Invoker, Prompty, SimpleModel


class PromptyChatParser(Invoker):
    def __init__(self, prompty: Prompty) -> None:
        self.prompty = prompty
        self.roles = ["assistant", "function", "system", "user", "human", "ai"]
        self.path = self.prompty.file.parent

    def inline_image(self, image_item: str) -> str:
        # pass through if it's a url or base64 encoded
        if image_item.startswith("http") or image_item.startswith("data"):
            return image_item
        # otherwise, it's a local file - need to base64 encode it
        else:
            image_path = self.path / image_item
            with open(image_path, "rb") as f:
                base64_image = base64.b64encode(f.read()).decode("utf-8")

            if image_path.suffix == ".png":
                return f"data:image/png;base64,{base64_image}"
            elif image_path.suffix == ".jpg":
                return f"data:image/jpeg;base64,{base64_image}"
            elif image_path.suffix == ".jpeg":
                return f"data:image/jpeg;base64,{base64_image}"
            else:
                raise ValueError(
                    f"Invalid image format {image_path.suffix} - currently only .png "
                    "and .jpg / .jpeg are supported."
                )

    def parse_content(self, content: str) -> Union[str, List]:
        """for parsing inline images"""
        # regular expression to parse markdown images
        image = r"(?P<alt>!\[[^\]]*\])\((?P<filename>.*?)(?=\"|\))\)"
        matches = re.findall(image, content, flags=re.MULTILINE)
        if len(matches) > 0:
            content_items = []
            content_chunks = re.split(image, content, flags=re.MULTILINE)
            current_chunk = 0
            for i in range(len(content_chunks)):
                # image entry
                if (
                    current_chunk < len(matches)
                    and content_chunks[i] == matches[current_chunk][0]
                ):
                    content_items.append(
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": self.inline_image(
                                    matches[current_chunk][1].split(" ")[0].strip()
                                )
                            },
                        }
                    )
                # second part of image entry
                elif (
                    current_chunk < len(matches)
                    and content_chunks[i] == matches[current_chunk][1]
                ):
                    current_chunk += 1
                # text entry
                else:
                    if len(content_chunks[i].strip()) > 0:
                        content_items.append(
                            {"type": "text", "text": content_chunks[i].strip()}
                        )
            return content_items
        else:
            return content

    def invoke(self, data: BaseModel) -> BaseModel:
        assert isinstance(data, SimpleModel)
        messages = []
        separator = r"(?i)^\s*#?\s*(" + "|".join(self.roles) + r")\s*:\s*\n"

        # get valid chunks - remove empty items
        chunks = [
            item
            for item in re.split(separator, data.item, flags=re.MULTILINE)
            if len(item.strip()) > 0
        ]

        # if no starter role, then inject system role
        if chunks[0].strip().lower() not in self.roles:
            chunks.insert(0, "system")

        # if last chunk is role entry, then remove (no content?)
        if chunks[-1].strip().lower() in self.roles:
            chunks.pop()

        if len(chunks) % 2 != 0:
            raise ValueError("Invalid prompt format")

        # create messages
        for i in range(0, len(chunks), 2):
            role = chunks[i].strip().lower()
            content = chunks[i + 1].strip()
            messages.append({"role": role, "content": self.parse_content(content)})

        return SimpleModel[list](item=messages)