File size: 3,798 Bytes
d10b65b d08fbc6 0a1b314 d10b65b cc5f321 d10b65b c5f8a6a d10b65b 0a1b314 d10b65b d08fbc6 d10b65b cc5f321 d08fbc6 d10b65b cc5f321 d08fbc6 d10b65b d08fbc6 0a1b314 d10b65b cc5f321 d10b65b 9d5b4c0 cc5f321 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
from typing import Any, Generator, List, Optional
from .dict_utils import dict_get, dict_set
from .operators import FieldOperator, StreamOperator
from .stream import Stream
from .utils import recursive_shallow_copy
class Dictify(FieldOperator):
    """Field operator that turns a sequence of values into a dictionary.

    The names in ``with_keys`` are paired positionally with the incoming
    values. Pairing is done with ``zip``, so it stops at the shorter of the
    two sequences and any surplus keys or values are silently dropped.
    """

    # Key names, in the same order as the values they will label.
    with_keys: List[str]

    def process_value(self, tup: Any) -> Any:
        """Return a dict mapping each name in ``with_keys`` to the value at the same position in ``tup``."""
        return {key: item for key, item in zip(self.with_keys, tup)}
class Wrap(FieldOperator):
    """Field operator that wraps a value in a single-element container.

    ``inside`` names the container kind to build: ``"list"``, ``"tuple"``
    or ``"set"``.
    """

    # Name of the container type the value is wrapped in.
    inside: str

    def verify(self):
        """Run the parent checks, then reject an unsupported ``inside``.

        Raises:
            ValueError: if ``inside`` is not ``"list"``, ``"tuple"`` or ``"set"``.
        """
        super().verify()
        supported = ("list", "tuple", "set")
        if self.inside in supported:
            return
        raise ValueError(
            f"Wrap.inside support only types: [list, tuple, set], got {self.inside}"
        )

    def process_value(self, value: Any) -> Any:
        """Return ``value`` as the only member of the configured container."""
        container = self.inside
        if container == "tuple":
            return (value,)
        if container == "list":
            return [value]
        # Any other setting falls through to a set; a set member must be
        # hashable, so an unhashable value raises TypeError here.
        return {value}
class Chunk(FieldOperator):
    """Field operator that splits a collection into consecutive chunks.

    The collection must support ``len()`` and slicing. Every chunk holds
    ``size`` items except possibly the last one, which holds the remainder.
    An empty collection produces an empty list.
    """

    # Maximum number of items per chunk; must be at least 1.
    size: int

    def verify(self):
        """Run the parent checks, then reject a non-positive ``size``.

        Without this check a ``size`` of 0 only fails later, inside
        ``range()``, with a message that does not mention ``Chunk``, and a
        negative ``size`` silently yields no chunks at all.

        Raises:
            ValueError: if ``size`` is smaller than 1.
        """
        super().verify()
        if self.size < 1:
            raise ValueError(
                f"Chunk.size must be a positive integer, got {self.size}"
            )

    def process_value(self, collection: Any) -> Any:
        """Return a list of slices of ``collection``, each at most ``size`` long."""
        return [
            collection[start : start + self.size]
            for start in range(0, len(collection), self.size)
        ]
class Slice(FieldOperator):
    """Field operator that returns a slice of a collection.

    ``start``, ``stop`` and ``step`` have the same meaning as in Python's
    ``collection[start:stop:step]``; each one left as ``None`` takes the
    usual slicing default.
    """

    # Slice bounds; None means "use the slicing default" for that position.
    start: Optional[int] = None
    stop: Optional[int] = None
    step: Optional[int] = None

    def process_value(self, collection: Any) -> Any:
        """Return ``collection[start:stop:step]``."""
        return collection[self.start : self.stop : self.step]
class Get(FieldOperator):
    """Field operator that looks up a single item in a collection.

    ``item`` is used as the subscript, so it may be an index, a dictionary
    key, or anything else the collection accepts in ``collection[item]``.
    """

    # Subscript passed to the collection.
    item: Any

    def process_value(self, collection: Any) -> Any:
        """Return ``collection[item]``; lookup errors from the collection propagate."""
        key = self.item
        return collection[key]
class DuplicateByList(StreamOperator):
    """Stream operator that emits one copy of an instance per element of a field.

    For every instance, the value read from ``field`` is iterated. Each
    element produces a copy of the instance in which ``to_field`` (or
    ``field`` itself when ``to_field`` is ``None``) is set to that single
    element. An instance whose field is empty produces no output.
    """

    # Field holding the elements to iterate over (read with dict_get).
    field: str
    # Field that receives each element (written with dict_set); defaults to `field`.
    to_field: Optional[str] = None
    # True: copy with recursive_shallow_copy; False: copy with instance.copy().
    use_deep_copy: bool = False

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        """Yield the per-element copies for every instance in ``stream``."""
        target = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            for element in dict_get(instance, self.field):
                # A fresh copy per element, so the yielded instances do not
                # share their top-level dictionary.
                instance_copy = (
                    recursive_shallow_copy(instance)
                    if self.use_deep_copy
                    else instance.copy()
                )
                dict_set(instance_copy, target, element)
                yield instance_copy
class Explode(DuplicateByList):
    """Alias of ``DuplicateByList``; adds no fields or behavior of its own."""
class DuplicateBySubLists(StreamOperator):
    """Stream operator that emits one copy of an instance per prefix of a field.

    For a field holding ``[a, b, c]`` the instance is emitted three times,
    with ``to_field`` (or ``field`` itself when ``to_field`` is ``None``)
    set to ``[a]``, ``[a, b]`` and ``[a, b, c]`` in turn. An instance whose
    field is empty produces no output.
    """

    # Top-level key holding the sequence whose prefixes are generated.
    field: str
    # Key that receives each prefix; defaults to `field`.
    to_field: Optional[str] = None
    # True: copy with recursive_shallow_copy; False: build a new top-level dict.
    use_deep_copy: bool = False

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        """Yield the per-prefix copies for every instance in ``stream``."""
        target = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            elements = instance[self.field]
            for last in range(len(elements)):
                if self.use_deep_copy:
                    instance_copy = recursive_shallow_copy(instance)
                else:
                    # New top-level dict that keeps the original sequence
                    # object under `field`.
                    instance_copy = {**instance}
                    instance_copy[self.field] = elements
                # The prefix is always sliced from the original sequence.
                instance_copy[target] = elements[: last + 1]
                yield instance_copy
class GetLength(FieldOperator):
    """Field operator that replaces a collection with its length."""

    def process_value(self, collection: Any) -> Any:
        """Return ``len(collection)``."""
        length = len(collection)
        return length
class Filter(FieldOperator):
    """Field operator that removes the listed values from a collection.

    Lists, tuples and sets are rebuilt as the same type without the items
    found in ``values``. Dictionaries are rebuilt without the entries whose
    *key* is found in ``values``.
    """

    # Items (or dictionary keys) to drop from the collection.
    values: List[Any]

    def process_value(self, collection: Any) -> Any:
        """Return a copy of ``collection`` without the entries listed in ``values``.

        Raises:
            TypeError: if ``collection`` is not a list, tuple, set or dict.
        """
        excluded = self.values

        # Dictionaries are filtered by key.
        if isinstance(collection, dict):
            return {
                key: val for key, val in collection.items() if key not in excluded
            }

        # Lists, tuples and sets are rebuilt with their own concrete type.
        if isinstance(collection, (list, set, tuple)):
            rebuild = type(collection)
            return rebuild(item for item in collection if item not in excluded)

        # Anything else is not supported.
        raise TypeError(f"Unsupported collection type: {type(collection)}")
|