File size: 3,798 Bytes
d10b65b
 
d08fbc6
0a1b314
d10b65b
cc5f321
d10b65b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c5f8a6a
 
 
 
 
 
 
 
 
d10b65b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0a1b314
d10b65b
 
 
 
 
 
 
d08fbc6
d10b65b
 
cc5f321
d08fbc6
d10b65b
cc5f321
d08fbc6
d10b65b
 
 
d08fbc6
 
 
 
0a1b314
d10b65b
 
 
 
 
 
 
 
 
 
cc5f321
d10b65b
 
 
 
 
 
 
 
9d5b4c0
 
 
 
 
cc5f321
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from typing import Any, Generator, List, Optional

from .dict_utils import dict_get, dict_set
from .operators import FieldOperator, StreamOperator
from .stream import Stream
from .utils import recursive_shallow_copy


class Dictify(FieldOperator):
    """Turn a sequence of values into a dict keyed by ``with_keys``.

    Attributes:
        with_keys: Keys paired positionally with the elements of the value.
    """

    with_keys: List[str]

    def process_value(self, tup: Any) -> Any:
        """Pair each key in ``with_keys`` with the element of ``tup`` at the same position.

        Pairing is done with ``zip``, so it stops at the shorter of the two.
        """
        return {key: element for key, element in zip(self.with_keys, tup)}


class Wrap(FieldOperator):
    """Wrap a value inside a single-element list, tuple, or set.

    Attributes:
        inside: Name of the container to wrap with: "list", "tuple" or "set".
    """

    inside: str

    def verify(self):
        """Check that ``inside`` names one of the supported containers.

        Raises:
            ValueError: If ``inside`` is not "list", "tuple" or "set".
        """
        super().verify()
        if self.inside in ("list", "tuple", "set"):
            return
        raise ValueError(
            f"Wrap.inside support only types: [list, tuple, set], got {self.inside}"
        )

    def process_value(self, value: Any) -> Any:
        """Return ``value`` as the only member of the configured container.

        Any ``inside`` other than "list" or "tuple" produces a set.
        """
        if self.inside == "list":
            container = list
        elif self.inside == "tuple":
            container = tuple
        else:
            container = set
        # Build from a one-element tuple so the value itself is never iterated.
        return container((value,))


class Chunk(FieldOperator):
    """Split a sliceable collection into consecutive chunks of at most ``size`` items.

    Attributes:
        size: Maximum number of items per chunk; must be a positive integer.
    """

    size: int

    def verify(self):
        """Reject chunk sizes that cannot produce a meaningful split.

        Without this check a zero size fails later inside ``range`` with an
        unrelated-looking message, and a negative size silently yields an
        empty list, dropping every item of the collection.

        Raises:
            ValueError: If ``size`` is not a positive integer.
        """
        super().verify()
        if not isinstance(self.size, int) or self.size <= 0:
            raise ValueError(
                f"Chunk.size must be a positive integer, got {self.size!r}"
            )

    def process_value(self, collection: Any) -> Any:
        """Return a list of slices of ``collection``, each ``size`` long.

        The last chunk is shorter when ``len(collection)`` is not a multiple
        of ``size``; an empty collection gives an empty list.
        """
        return [
            collection[i : i + self.size] for i in range(0, len(collection), self.size)
        ]


class Slice(FieldOperator):
    """Apply a ``start:stop:step`` slice to a collection.

    Attributes:
        start: Slice start, or None for the default.
        stop: Slice stop, or None for the default.
        step: Slice step, or None for the default.
    """

    start: Optional[int] = None
    stop: Optional[int] = None
    step: Optional[int] = None

    def process_value(self, collection: Any) -> Any:
        """Return ``collection`` sliced with the configured bounds."""
        return collection[self.start : self.stop : self.step]


class Get(FieldOperator):
    """Look up a single item in a collection by subscription.

    Attributes:
        item: The index or key used to subscript the collection.
    """

    item: Any

    def process_value(self, collection: Any) -> Any:
        """Return ``collection[item]``; lookup errors propagate unchanged."""
        key = self.item
        return collection[key]


class DuplicateByList(StreamOperator):
    """Yield one copy of each instance per element found in ``field``.

    Attributes:
        field: Location (read with ``dict_get``) of the iterable to expand.
        to_field: Location (written with ``dict_set``) that receives each
            element; defaults to ``field`` when None.
        use_deep_copy: When true, copy each instance with
            ``recursive_shallow_copy``; otherwise use ``instance.copy()``.
    """

    field: str
    to_field: Optional[str] = None
    use_deep_copy: bool = False

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        """Lazily emit, for every instance, one copy per element of ``field``.

        An instance whose ``field`` is empty produces no output.
        """
        target = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            for element in dict_get(instance, self.field):
                duplicate = (
                    recursive_shallow_copy(instance)
                    if self.use_deep_copy
                    else instance.copy()
                )
                dict_set(duplicate, target, element)
                yield duplicate


class Explode(DuplicateByList):
    """Alternative name for ``DuplicateByList``; adds no behavior of its own."""


class DuplicateBySubLists(StreamOperator):
    """Yield one copy of each instance per non-empty prefix of ``field``.

    For a value of length n, n copies are produced whose ``to_field`` holds
    the first 1, 2, ..., n elements respectively.

    Attributes:
        field: Key of the sliceable value to take prefixes from.
        to_field: Key that receives each prefix; defaults to ``field`` when None.
        use_deep_copy: When true, copy each instance with
            ``recursive_shallow_copy``; otherwise build a new top-level dict.
    """

    field: str
    to_field: Optional[str] = None
    use_deep_copy: bool = False

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        """Lazily emit, for every instance, one copy per growing prefix of ``field``."""
        target = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            elements = instance[self.field]
            for end in range(1, len(elements) + 1):
                if self.use_deep_copy:
                    duplicate = recursive_shallow_copy(instance)
                else:
                    # New top-level dict; ``field`` is re-bound to the original value.
                    duplicate = {**instance, self.field: elements}
                # The prefix is always sliced from the original, un-copied value.
                duplicate[target] = elements[:end]
                yield duplicate


class GetLength(FieldOperator):
    """Replace a collection with its length."""

    def process_value(self, collection: Any) -> Any:
        """Return ``len(collection)``."""
        length = len(collection)
        return length


class Filter(FieldOperator):
    """Drop every entry of a collection that appears in ``values``.

    Attributes:
        values: Items (or, for dicts, keys) to remove.
    """

    values: List[Any]

    def process_value(self, collection: Any) -> Any:
        """Return a filtered collection of the same type as the input.

        Lists, tuples and sets lose the items found in ``values``; dicts lose
        the entries whose key is found in ``values``.

        Raises:
            TypeError: If ``collection`` is not a list, set, tuple or dict.
        """
        excluded = self.values

        # Mappings are filtered on their keys; values are carried over untouched.
        if isinstance(collection, dict):
            return {
                key: value for key, value in collection.items() if key not in excluded
            }

        # Sequences and sets are rebuilt through their own type's constructor.
        if isinstance(collection, (list, set, tuple)):
            kept = (item for item in collection if item not in excluded)
            return type(collection)(kept)

        raise TypeError(f"Unsupported collection type: {type(collection)}")