File size: 5,720 Bytes
4a51346
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
from typing import Union, Tuple

from clickhouse_connect.driver.common import unescape_identifier


# pylint: disable=too-many-branches
def parse_callable(expr) -> Tuple[str, Tuple[Union[str, int], ...], str]:
    """
    Parses a single level ClickHouse optionally 'callable' function/identifier.  The identifier is returned as the
    first value in the response tuple.  If the expression is callable -- i.e. an identifier followed by 0 or more
    arguments in parentheses, the second returned value is a tuple of the comma separated arguments.  The third and
    final tuple value is any text remaining after the initial expression for further parsing/processing.

    Examples:
      "Tuple(String, Enum('one' = 1, 'two' = 2))" will return "Tuple", ("String", "Enum('one' = 1,'two' = 2)"), ""
      "MergeTree() PARTITION BY key" will return "MergeTree", (), "PARTITION BY key"

    :param expr:  ClickHouse DDL or Column Name expression
    :return: Tuple of the identifier, a tuple of arguments, and remaining text
    """
    expr = expr.strip()
    pos = expr.find('(')
    space = expr.find(' ')
    if pos == -1 and space == -1:
        return expr, (), ''
    if space != -1 and (pos == -1 or space < pos):
        return expr[:space], (), expr[space:].strip()
    name = expr[:pos]
    pos += 1  # Skip first paren
    values = []
    value = ''
    in_str = False
    level = 0

    def add_value():
        try:
            values.append(int(value))
        except ValueError:
            values.append(value)

    while True:
        char = expr[pos]
        pos += 1
        if in_str:
            value += char
            if char == "'":
                in_str = False
            elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:pos + 2] != "')":
                value += expr[pos]
                pos += 1
        else:
            if level == 0:
                if char == ' ':
                    space = pos
                    temp_char = expr[space]
                    while temp_char == ' ':
                        space += 1
                        temp_char = expr[space]
                    if not value or temp_char in "()',=><0":
                        char = temp_char
                        pos = space + 1
                if char == ',':
                    add_value()
                    value = ''
                    continue
                if char == ')':
                    break
            if char == "'" and (not value or 'Enum' in value):
                in_str = True
            elif char == '(':
                level += 1
            elif char == ')' and level:
                level -= 1
            value += char
    if value != '':
        add_value()
    return name, tuple(values), expr[pos:].strip()


def parse_enum(expr) -> Tuple[Tuple[str], Tuple[int]]:
    """
    Parse a ClickHouse enum definition expression of the form ('key1' = 1, 'key2' = 2)
    :param expr: ClickHouse enum expression/arguments
    :return: Parallel tuples of string enum keys and integer enum values
    """
    keys = []
    values = []
    pos = expr.find('(') + 1
    in_key = False
    key = []
    value = []
    while True:
        char = expr[pos]
        pos += 1
        if in_key:
            if char == "'":
                keys.append(''.join(key))
                key = []
                in_key = False
            elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:] != "')":
                key.append(expr[pos])
                pos += 1
            else:
                key.append(char)
        elif char not in (' ', '='):
            if char == ',':
                values.append(int(''.join(value)))
                value = []
            elif char == ')':
                values.append(int(''.join(value)))
                break
            elif char == "'" and not value:
                in_key = True
            else:
                value.append(char)
    values, keys = zip(*sorted(zip(values, keys)))
    return tuple(keys), tuple(values)


def parse_columns(expr: str):
    """
    Parse a ClickHouse column list of the form (col1 String, col2 Array(Tuple(String, Int32))).  This also handles
    unnamed columns (such as Tuple definitions).  Mixed named and unnamed columns are not currently supported.
    :param expr: ClickHouse enum expression/arguments
    :return: Parallel tuples of column types and column types (strings)
    """
    names = []
    columns = []
    pos = 1
    named = False
    level = 0
    label = ''
    in_str = False
    while True:
        char = expr[pos]
        pos += 1
        if in_str:
            if "'" == char:
                in_str = False
            elif char == '\\' and expr[pos] == "'" and expr[pos:pos + 4] != "' = " and expr[pos:pos + 2] != "')":
                label += expr[pos]
                pos += 1
        else:
            if level == 0:
                if char == ' ':
                    if label and not named:
                        names.append(unescape_identifier(label))
                        label = ''
                        named = True
                    char = ''
                elif char == ',':
                    columns.append(label)
                    named = False
                    label = ''
                    continue
                elif char == ')':
                    columns.append(label)
                    break
            if char == "'" and (not label or 'Enum' in label):
                in_str = True
            elif char == '(':
                level += 1
            elif char == ')':
                level -= 1
        label += char
    return tuple(names), tuple(columns)