Elron commited on
Commit
1fbdf4c
·
1 Parent(s): 64458da

Upload operator.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operator.py +123 -32
operator.py CHANGED
@@ -30,18 +30,56 @@ class OperatorError(Exception):
30
 
31
 
32
  class StreamingOperator(Artifact):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
  @abstractmethod
34
  def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
35
- pass
 
 
 
 
 
 
 
 
36
 
37
 
38
  class StreamSource(StreamingOperator):
 
 
 
 
 
 
 
 
39
  @abstractmethod
40
  def __call__(self) -> MultiStream:
41
  pass
42
 
43
 
44
  class SourceOperator(StreamSource):
 
 
 
 
 
 
 
 
45
  def __call__(self) -> MultiStream:
46
  return self.process()
47
 
@@ -51,6 +89,15 @@ class SourceOperator(StreamSource):
51
 
52
 
53
  class StreamInitializerOperator(StreamSource):
 
 
 
 
 
 
 
 
 
54
  def __call__(self, *args, **kwargs) -> MultiStream:
55
  return self.process(*args, **kwargs)
56
 
@@ -60,6 +107,11 @@ class StreamInitializerOperator(StreamSource):
60
 
61
 
62
  class MultiStreamOperator(StreamingOperator):
 
 
 
 
 
63
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
64
  return self._process_multi_stream(multi_stream)
65
 
@@ -74,6 +126,11 @@ class MultiStreamOperator(StreamingOperator):
74
 
75
 
76
  class SingleStreamOperator(MultiStreamOperator):
 
 
 
 
 
77
  def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
78
  result = {}
79
  for stream_name, stream in multi_stream.items():
@@ -94,17 +151,34 @@ class SingleStreamOperator(MultiStreamOperator):
94
  pass
95
 
96
 
97
- # class StreamGeneratorOperator(SingleStreamOperator):
 
 
98
 
99
- # def stream(self, stream):
100
- # return Stream(self.process, gen_kwargs={'stream': stream})
 
 
 
 
 
 
 
 
 
 
101
 
102
- # @abstractmethod
103
- # def process(self, stream: Stream) -> Generator:
104
- # yield None
105
 
106
 
107
  class SingleStreamReducer(StreamingOperator):
 
 
 
 
 
108
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> Dict[str, Any]:
109
  result = {}
110
  for stream_name, stream in multi_stream.items():
@@ -119,6 +193,11 @@ class SingleStreamReducer(StreamingOperator):
119
 
120
 
121
  class StreamInstanceOperator(SingleStreamOperator):
 
 
 
 
 
122
  def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
123
  for instance in stream:
124
  yield self._process_instance(instance, stream_name)
@@ -132,6 +211,11 @@ class StreamInstanceOperator(SingleStreamOperator):
132
 
133
 
134
  class StreamInstanceOperatorValidator(StreamInstanceOperator):
 
 
 
 
 
135
  @abstractmethod
136
  def validate(self, instance):
137
  pass
@@ -146,6 +230,11 @@ class StreamInstanceOperatorValidator(StreamInstanceOperator):
146
 
147
 
148
  class InstanceOperator(Artifact):
 
 
 
 
 
149
  def __call__(self, data: dict) -> dict:
150
  return self.process(data)
151
 
@@ -155,6 +244,11 @@ class InstanceOperator(Artifact):
155
 
156
 
157
  class FieldOperator(Artifact):
 
 
 
 
 
158
  def __call__(self, data: Dict[str, Any], field: str) -> dict:
159
  value = self.process(data[field])
160
  data[field] = value
@@ -165,32 +259,14 @@ class FieldOperator(Artifact):
165
  pass
166
 
167
 
168
- # class NamedStreamInstanceOperator(StreamingOperator):
169
-
170
- # def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
171
- # result = {}
172
- # for stream_name, stream in multi_stream.items():
173
- # stream = Stream(self.generator, gen_kwargs={'stream': stream, 'stream_name': stream_name})
174
- # result[stream_name] = stream
175
- # return MultiStream(result)
176
-
177
- # def verify_first_instance(self, instance):
178
- # pass
179
-
180
- # def generator(self, stream, stream_name):
181
- # iterator = iter(stream)
182
- # first_instance = next(iterator)
183
- # result = self.process(first_instance, stream_name)
184
- # self.verify_first_instance(result)
185
- # yield result
186
- # yield from (self.process(instance) for instance in iterator)
187
-
188
- # @abstractmethod
189
- # def process(self, instance: dict, stream_name: str) -> dict:
190
- # pass
191
-
192
-
193
  class InstanceOperatorWithGlobalAccess(StreamingOperator):
 
 
 
 
 
 
 
194
  accessible_streams: Union[MultiStream, List[str]] = None
195
  cache_accessible_streams: bool = True
196
 
@@ -222,6 +298,11 @@ class InstanceOperatorWithGlobalAccess(StreamingOperator):
222
 
223
 
224
  class SequntialOperator(MultiStreamOperator):
 
 
 
 
 
225
  steps: List[StreamingOperator] = field(default_factory=list)
226
 
227
  def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
@@ -231,6 +312,11 @@ class SequntialOperator(MultiStreamOperator):
231
 
232
 
233
  class SourceSequntialOperator(SequntialOperator):
 
 
 
 
 
234
  def __call__(self) -> MultiStream:
235
  return super().__call__()
236
 
@@ -242,6 +328,11 @@ class SourceSequntialOperator(SequntialOperator):
242
 
243
 
244
  class SequntialOperatorInitilizer(SequntialOperator):
 
 
 
 
 
245
  def __call__(self, *args, **kwargs) -> MultiStream:
246
  return self.process(*args, **kwargs)
247
 
 
30
 
31
 
32
  class StreamingOperator(Artifact):
33
+ """
34
+ Base class for all stream operators in the streaming model.
35
+
36
+ Stream operators are a key component of the streaming model and are responsible for processing continuous data streams.
37
+ They perform operations such as transformations, aggregations, joins, windowing and more on these streams.
38
+ There are several types of stream operators, including source operators, processing operators, etc.
39
+
40
+ As a `StreamingOperator`, this class is responsible for performing operations on a stream, and must be implemented by all other specific types of stream operators in the system.
41
+ When called, a `StreamingOperator` must return a MultiStream.
42
+
43
+ As a subclass of `Artifact`, every `StreamingOperator` can be saved in a catalog for further usage or reference.
44
+
45
+ """
46
+
47
  @abstractmethod
48
  def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
49
+ """
50
+ Abstract method that performs operations on the stream.
51
+
52
+ Args:
53
+ streams (Optional[MultiStream]): The input MultiStream, which can be None.
54
+
55
+ Returns:
56
+ MultiStream: The output MultiStream resulting from the operations performed on the input.
57
+ """
58
 
59
 
60
  class StreamSource(StreamingOperator):
61
+ """
62
+ A class representing a stream source operator in the streaming system.
63
+
64
+ A stream source operator is a special type of `StreamingOperator` that generates a data stream without taking any input streams. It serves as the starting point in a stream processing pipeline, providing the initial data that other operators in the pipeline can process.
65
+
66
+ When called, a `StreamSource` should generate a `MultiStream`. This behavior must be implemented by any classes that inherit from `StreamSource`.
67
+
68
+ """
69
  @abstractmethod
70
  def __call__(self) -> MultiStream:
71
  pass
72
 
73
 
74
  class SourceOperator(StreamSource):
75
+ """
76
+ A class representing a source operator in the streaming system.
77
+
78
+ A source operator is responsible for generating the data stream from some source, such as a database or a file. This is the starting point of a stream processing pipeline. The `SourceOperator` class is a type of `StreamSource`, which is a special type of `StreamingOperator` that generates an output stream but does not take any input streams.
79
+
80
+ When called, a `SourceOperator` invokes its `process` method, which should be implemented by all subclasses to generate the required `MultiStream`.
81
+
82
+ """
83
  def __call__(self) -> MultiStream:
84
  return self.process()
85
 
 
89
 
90
 
91
  class StreamInitializerOperator(StreamSource):
92
+ """
93
+ A class representing a stream initializer operator in the streaming system.
94
+
95
+ A stream initializer operator is a special type of `StreamSource` that is capable of taking parameters during the stream generation process. This can be useful in situations where the stream generation process needs to be customized or configured based on certain parameters.
96
+
97
+ When called, a `StreamInitializerOperator` invokes its `process` method, passing any supplied arguments and keyword arguments. The `process` method should be implemented by all subclasses to generate the required `MultiStream` based on the given arguments and keyword arguments.
98
+
99
+ """
100
+
101
  def __call__(self, *args, **kwargs) -> MultiStream:
102
  return self.process(*args, **kwargs)
103
 
 
107
 
108
 
109
  class MultiStreamOperator(StreamingOperator):
110
+ """
111
+ A class representing a multi-stream operator in the streaming system.
112
+
113
+ A multi-stream operator is a type of `StreamingOperator` that operates on an entire MultiStream object at once. It takes a `MultiStream` as input and produces a `MultiStream` as output. The `process` method should be implemented by subclasses to define the specific operations to be performed on the input `MultiStream`.
114
+ """
115
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
116
  return self._process_multi_stream(multi_stream)
117
 
 
126
 
127
 
128
  class SingleStreamOperator(MultiStreamOperator):
129
+ """
130
+ A class representing a single-stream operator in the streaming system.
131
+
132
+ A single-stream operator is a type of `MultiStreamOperator` that operates on individual `Stream` objects within a `MultiStream`. It iterates through each `Stream` in the `MultiStream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each `Stream`.
133
+ """
134
  def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
135
  result = {}
136
  for stream_name, stream in multi_stream.items():
 
151
  pass
152
 
153
 
154
+ class PagedStreamOperator(SingleStreamOperator):
155
+ """
156
+ A class representing a paged-stream operator in the streaming system.
157
 
158
+ A paged-stream operator is a type of `SingleStreamOperator` that operates on a page of instances in a `Stream` at a time, where a page is a subset of instances. The `process` method should be implemented by subclasses to define the specific operations to be performed on each page.
159
+ """
160
+ page_size: int = 1000
161
+
162
+ def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
163
+ page = []
164
+ for instance in stream:
165
+ page.append(instance)
166
+ if len(page) >= self.page_size:
167
+ yield from self.process(page, stream_name)
168
+ page = []
169
+ yield from self.process(page, stream_name)
170
 
171
+ @abstractmethod
172
+ def process(self, page: List[Dict], stream_name: str = None) -> Generator:
173
+ pass
174
 
175
 
176
  class SingleStreamReducer(StreamingOperator):
177
+ """
178
+ A class representing a single-stream reducer in the streaming system.
179
+
180
+ A single-stream reducer is a type of `StreamingOperator` that operates on individual `Stream` objects within a `MultiStream` and reduces each `Stream` to a single output value. The `process` method should be implemented by subclasses to define the specific reduction operation to be performed on each `Stream`.
181
+ """
182
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> Dict[str, Any]:
183
  result = {}
184
  for stream_name, stream in multi_stream.items():
 
193
 
194
 
195
  class StreamInstanceOperator(SingleStreamOperator):
196
+ """
197
+ A class representing a stream instance operator in the streaming system.
198
+
199
+ A stream instance operator is a type of `SingleStreamOperator` that operates on individual instances within a `Stream`. It iterates through each instance in the `Stream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each instance.
200
+ """
201
  def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
202
  for instance in stream:
203
  yield self._process_instance(instance, stream_name)
 
211
 
212
 
213
  class StreamInstanceOperatorValidator(StreamInstanceOperator):
214
+ """
215
+ A class representing a stream instance operator validator in the streaming system.
216
+
217
+ A stream instance operator validator is a type of `StreamInstanceOperator` that includes a validation step. It operates on individual instances within a `Stream` and validates the result of processing each instance.
218
+ """
219
  @abstractmethod
220
  def validate(self, instance):
221
  pass
 
230
 
231
 
232
  class InstanceOperator(Artifact):
233
+ """
234
+ A class representing an instance operator in the streaming system.
235
+
236
+ An instance operator is a type of `Artifact` that operates on a single instance (represented as a dict) at a time. It takes an instance as input and produces a transformed instance as output.
237
+ """
238
  def __call__(self, data: dict) -> dict:
239
  return self.process(data)
240
 
 
244
 
245
 
246
  class FieldOperator(Artifact):
247
+ """
248
+ A class representing a field operator in the streaming system.
249
+
250
+ A field operator is a type of `Artifact` that operates on a single field within an instance. It takes an instance and a field name as input, processes the field, and updates the field in the instance with the processed value.
251
+ """
252
  def __call__(self, data: Dict[str, Any], field: str) -> dict:
253
  value = self.process(data[field])
254
  data[field] = value
 
259
  pass
260
 
261
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
262
  class InstanceOperatorWithGlobalAccess(StreamingOperator):
263
+ """
264
+ A class representing an instance operator with global access in the streaming system.
265
+
266
+ An instance operator with global access is a type of `StreamingOperator` that operates on individual instances within a `Stream` and can also access other streams.
267
+ It uses the `accessible_streams` attribute to determine which other streams it has access to.
268
+ In order to make this efficient and to avoid qudratic complexity, it caches the accessible streams by default.
269
+ """
270
  accessible_streams: Union[MultiStream, List[str]] = None
271
  cache_accessible_streams: bool = True
272
 
 
298
 
299
 
300
  class SequntialOperator(MultiStreamOperator):
301
+ """
302
+ A class representing a sequential operator in the streaming system.
303
+
304
+ A sequential operator is a type of `MultiStreamOperator` that applies a sequence of other operators to a `MultiStream`. It maintains a list of `StreamingOperator`s and applies them in order to the `MultiStream`.
305
+ """
306
  steps: List[StreamingOperator] = field(default_factory=list)
307
 
308
  def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
 
312
 
313
 
314
  class SourceSequntialOperator(SequntialOperator):
315
+ """
316
+ A class representing a source sequential operator in the streaming system.
317
+
318
+ A source sequential operator is a type of `SequntialOperator` that starts with a source operator. The first operator in its list of steps is a `StreamSource`, which generates the initial `MultiStream` that the other operators then process.
319
+ """
320
  def __call__(self) -> MultiStream:
321
  return super().__call__()
322
 
 
328
 
329
 
330
  class SequntialOperatorInitilizer(SequntialOperator):
331
+ """
332
+ A class representing a sequential operator initializer in the streaming system.
333
+
334
+ A sequential operator initializer is a type of `SequntialOperator` that starts with a stream initializer operator. The first operator in its list of steps is a `StreamInitializerOperator`, which generates the initial `MultiStream` based on the provided arguments and keyword arguments.
335
+ """
336
  def __call__(self, *args, **kwargs) -> MultiStream:
337
  return self.process(*args, **kwargs)
338