Elron committed on
Commit
d292ceb
·
1 Parent(s): 7e6fc99

Upload operators.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operators.py +112 -16
operators.py CHANGED
@@ -1,6 +1,7 @@
1
  from dataclasses import field
2
  from typing import Any, Dict, Generator, Iterable, List, Optional, Union
3
 
 
4
  from .artifact import Artifact, fetch_artifact
5
  from .operator import (
6
  MultiStream,
@@ -10,16 +11,33 @@ from .operator import (
10
  Stream,
11
  StreamInitializerOperator,
12
  StreamInstanceOperator,
 
13
  )
14
  from .stream import MultiStream, Stream
 
 
 
15
 
16
 
17
  class FromIterables(StreamInitializerOperator):
 
 
 
 
 
 
18
  def process(self, iterables: Dict[str, Iterable]) -> MultiStream:
19
  return MultiStream.from_iterables(iterables)
20
 
21
 
22
  class MapInstanceValues(StreamInstanceOperator):
 
 
 
 
 
 
 
23
  mappers: Dict[str, Dict[str, str]]
24
  strict: bool = True
25
 
@@ -45,30 +63,53 @@ class MapInstanceValues(StreamInstanceOperator):
45
  return result
46
 
47
 
48
- def flatten_dict(d: Dict[str, Any], parent_key: str = "", sep: str = "_") -> Dict[str, Any]:
49
- items = []
50
- for k, v in d.items():
51
- new_key = parent_key + sep + k if parent_key else k
52
- if isinstance(v, dict):
53
- items.extend(flatten_dict(v, new_key, sep=sep).items())
54
- else:
55
- items.append((new_key, v))
56
- return dict(items)
57
-
58
-
59
  class FlattenInstances(StreamInstanceOperator):
 
 
 
 
 
 
 
 
 
 
60
  def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
61
- return flatten_dict(instance)
62
 
63
 
64
  class AddFields(StreamInstanceOperator):
 
 
 
 
 
 
65
  fields: Dict[str, object]
66
 
67
  def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
68
- return {**instance, **self.fields}
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
 
71
  class ArtifactFetcherMixin:
 
 
 
 
 
 
72
  cache: Dict[str, Artifact] = {}
73
 
74
  @classmethod
@@ -80,6 +121,14 @@ class ArtifactFetcherMixin:
80
 
81
 
82
  class ApplyValueOperatorsField(StreamInstanceOperator, ArtifactFetcherMixin):
 
 
 
 
 
 
 
 
83
  value_field: str
84
  operators_field: str
85
  default_operators: List[str] = None
@@ -103,6 +152,12 @@ class ApplyValueOperatorsField(StreamInstanceOperator, ArtifactFetcherMixin):
103
 
104
 
105
  class FilterByValues(SingleStreamOperator):
 
 
 
 
 
 
106
  values: Dict[str, Any]
107
 
108
  def process(self, stream: Stream, stream_name: str = None) -> Generator:
@@ -112,6 +167,12 @@ class FilterByValues(SingleStreamOperator):
112
 
113
 
114
  class Unique(SingleStreamReducer):
 
 
 
 
 
 
115
  fields: List[str] = field(default_factory=list)
116
 
117
  @staticmethod
@@ -133,10 +194,13 @@ class Unique(SingleStreamReducer):
133
  return list(seen)
134
 
135
 
136
- from .text_utils import nested_tuple_to_string
137
-
138
-
139
  class SplitByValue(MultiStreamOperator):
 
 
 
 
 
 
140
  fields: List[str] = field(default_factory=list)
141
 
142
  def process(self, multi_stream: MultiStream) -> MultiStream:
@@ -156,6 +220,13 @@ class SplitByValue(MultiStreamOperator):
156
 
157
 
158
  class ApplyStreamOperatorsField(SingleStreamOperator, ArtifactFetcherMixin):
 
 
 
 
 
 
 
159
  field: str
160
  reversed: bool = False
161
 
@@ -180,6 +251,12 @@ class ApplyStreamOperatorsField(SingleStreamOperator, ArtifactFetcherMixin):
180
 
181
 
182
  class AddFieldNamePrefix(StreamInstanceOperator):
 
 
 
 
 
 
183
  prefix_dict: Dict[str, str]
184
 
185
  def prepare(self):
@@ -190,6 +267,14 @@ class AddFieldNamePrefix(StreamInstanceOperator):
190
 
191
 
192
  class MergeStreams(MultiStreamOperator):
 
 
 
 
 
 
 
 
193
  new_stream_name: str = "all"
194
  add_origin_stream_name: bool = True
195
  origin_stream_name_field_name: str = "origin"
@@ -203,3 +288,14 @@ class MergeStreams(MultiStreamOperator):
203
 
204
  def process(self, multi_stream: MultiStream) -> MultiStream:
205
  return MultiStream({self.new_stream_name: Stream(self.merge, gen_kwargs={"multi_stream": multi_stream})})
 
 
 
 
 
 
 
 
 
 
 
 
1
  from dataclasses import field
2
  from typing import Any, Dict, Generator, Iterable, List, Optional, Union
3
 
4
+ from .text_utils import nested_tuple_to_string
5
  from .artifact import Artifact, fetch_artifact
6
  from .operator import (
7
  MultiStream,
 
11
  Stream,
12
  StreamInitializerOperator,
13
  StreamInstanceOperator,
14
+ PagedStreamOperator,
15
  )
16
  from .stream import MultiStream, Stream
17
+ from .utils import flatten_dict
18
+ import random
19
+ from .utils import dict_query
20
 
21
 
22
  class FromIterables(StreamInitializerOperator):
23
+ """
24
+ Creates a MultiStream from iterables.
25
+
26
+ Args:
27
+ iterables (Dict[str, Iterable]): A dictionary where each key-value pair represents a stream name and its corresponding iterable.
28
+ """
29
  def process(self, iterables: Dict[str, Iterable]) -> MultiStream:
30
  return MultiStream.from_iterables(iterables)
31
 
32
 
33
  class MapInstanceValues(StreamInstanceOperator):
34
+ """
35
+ Maps values in each instance of a stream based on the provided mappers.
36
+
37
+ Args:
38
+ mappers (Dict[str, Dict[str, str]]): A dictionary where each key-value pair represents a field in the instance and a mapper for that field.
39
+ strict (bool): If True, the operator will raise a KeyError if a value is not in its corresponding mapper. If False, unmapped values will be left unchanged. Defaults to True.
40
+ """
41
  mappers: Dict[str, Dict[str, str]]
42
  strict: bool = True
43
 
 
63
  return result
64
 
65
 
 
 
 
 
 
 
 
 
 
 
 
66
  class FlattenInstances(StreamInstanceOperator):
67
+ """
68
+ Flattens each instance in a stream, turning nested dictionary entries into top-level entries.
69
+
70
+ Args:
71
+ parent_key (str): A prefix to use for the flattened keys. Defaults to an empty string.
72
+ sep (str): The separator to use when concatenating nested keys. Defaults to "_".
73
+ """
74
+ parent_key: str = ""
75
+ sep: str = "_"
76
+
77
  def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
78
+ return flatten_dict(instance, parent_key=self.parent_key, sep=self.sep)
79
 
80
 
81
  class AddFields(StreamInstanceOperator):
82
+ """
83
+ Adds specified fields to each instance in a stream.
84
+
85
+ Args:
86
+ fields (Dict[str, object]): The fields to add to each instance.
87
+ """
88
  fields: Dict[str, object]
89
 
90
  def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
91
+ instance.update(self.fields)
92
+ return instance
93
+
94
+
95
+ class MapNestedDictValuesByQueries(StreamInstanceOperator):
96
+ field_to_query: Dict[str, str]
97
+
98
+ def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
99
+ updates = {}
100
+ for field, query in self.field_to_query.items():
101
+ updates[field] = dict_query(instance, query)
102
+ instance.update(updates)
103
+ return instance
104
 
105
 
106
  class ArtifactFetcherMixin:
107
+ """
108
+ Provides a way to fetch and cache artifacts in the system.
109
+
110
+ Args:
111
+ cache (Dict[str, Artifact]): A cache for storing fetched artifacts.
112
+ """
113
  cache: Dict[str, Artifact] = {}
114
 
115
  @classmethod
 
121
 
122
 
123
  class ApplyValueOperatorsField(StreamInstanceOperator, ArtifactFetcherMixin):
124
+ """
125
+ Applies value operators to each instance in a stream based on specified fields.
126
+
127
+ Args:
128
+ value_field (str): The field containing the value to be operated on.
129
+ operators_field (str): The field containing the operators to be applied.
130
+ default_operators (List[str]): A list of default operators to be used if no operators are found in the instance.
131
+ """
132
  value_field: str
133
  operators_field: str
134
  default_operators: List[str] = None
 
152
 
153
 
154
  class FilterByValues(SingleStreamOperator):
155
+ """
156
+ Filters a stream, yielding only instances that match specified values.
157
+
158
+ Args:
159
+ values (Dict[str, Any]): The values that instances should match to be included in the output.
160
+ """
161
  values: Dict[str, Any]
162
 
163
  def process(self, stream: Stream, stream_name: str = None) -> Generator:
 
167
 
168
 
169
  class Unique(SingleStreamReducer):
170
+ """
171
+ Reduces a stream to unique instances based on specified fields.
172
+
173
+ Args:
174
+ fields (List[str]): The fields that should be unique in each instance.
175
+ """
176
  fields: List[str] = field(default_factory=list)
177
 
178
  @staticmethod
 
194
  return list(seen)
195
 
196
 
 
 
 
197
  class SplitByValue(MultiStreamOperator):
198
+ """
199
+ Splits a MultiStream into multiple streams based on unique values in specified fields.
200
+
201
+ Args:
202
+ fields (List[str]): The fields to use when splitting the MultiStream.
203
+ """
204
  fields: List[str] = field(default_factory=list)
205
 
206
  def process(self, multi_stream: MultiStream) -> MultiStream:
 
220
 
221
 
222
  class ApplyStreamOperatorsField(SingleStreamOperator, ArtifactFetcherMixin):
223
+ """
224
+ Applies stream operators to a stream based on a specified field in each instance.
225
+
226
+ Args:
227
+ field (str): The field containing the operators to be applied.
228
+ reversed (bool): Whether to apply the operators in reverse order.
229
+ """
230
  field: str
231
  reversed: bool = False
232
 
 
251
 
252
 
253
  class AddFieldNamePrefix(StreamInstanceOperator):
254
+ """
255
+ Adds a prefix to each field name in each instance of a stream.
256
+
257
+ Args:
258
+ prefix_dict (Dict[str, str]): A dictionary mapping stream names to prefixes.
259
+ """
260
  prefix_dict: Dict[str, str]
261
 
262
  def prepare(self):
 
267
 
268
 
269
  class MergeStreams(MultiStreamOperator):
270
+ """
271
+ Merges multiple streams into a single stream.
272
+
273
+ Args:
274
+ new_stream_name (str): The name of the new stream resulting from the merge.
275
+ add_origin_stream_name (bool): Whether to add the origin stream name to each instance.
276
+ origin_stream_name_field_name (str): The field name for the origin stream name.
277
+ """
278
  new_stream_name: str = "all"
279
  add_origin_stream_name: bool = True
280
  origin_stream_name_field_name: str = "origin"
 
288
 
289
  def process(self, multi_stream: MultiStream) -> MultiStream:
290
  return MultiStream({self.new_stream_name: Stream(self.merge, gen_kwargs={"multi_stream": multi_stream})})
291
+
292
+ class Shuffle(PagedStreamOperator):
293
+ """
294
+ Shuffles the order of instances in each page of a stream.
295
+
296
+ Args:
297
+ page_size (int): The size of each page in the stream. Defaults to 1000.
298
+ """
299
+ def process(self, page: List[Dict], stream_name: str = None) -> Generator:
300
+ random.shuffle(page)
301
+ yield from page