Elron commited on
Commit
a66b8be
·
verified ·
1 Parent(s): 89c6547

Upload operator.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operator.py +41 -24
operator.py CHANGED
@@ -5,7 +5,6 @@ from typing import Any, Dict, Generator, List, Optional
5
 
6
  from .artifact import Artifact
7
  from .dataclass import NonPositionalField
8
- from .random_utils import nested_seed
9
  from .stream import MultiStream, Stream
10
 
11
 
@@ -75,20 +74,23 @@ class StreamSource(StreamingOperator):
75
  class SourceOperator(StreamSource):
76
  """A class representing a source operator in the streaming system.
77
 
78
- A source operator is responsible for generating the data stream from some source, such as a database or a file. This is the starting point of a stream processing pipeline. The `SourceOperator` class is a type of `StreamSource`, which is a special type of `StreamingOperator` that generates an output stream but does not take any input streams.
 
 
 
79
 
80
- When called, a `SourceOperator` invokes its `process` method, which should be implemented by all subclasses to generate the required `MultiStream`.
 
81
 
82
  """
83
 
84
  caching: bool = NonPositionalField(default=None)
85
 
86
  def __call__(self) -> MultiStream:
87
- with nested_seed():
88
- multi_stream = self.process()
89
- if self.caching is not None:
90
- multi_stream.set_caching(self.caching)
91
- return multi_stream
92
 
93
  @abstractmethod
94
  def process(self) -> MultiStream:
@@ -107,11 +109,10 @@ class StreamInitializerOperator(StreamSource):
107
  caching: bool = NonPositionalField(default=None)
108
 
109
  def __call__(self, *args, **kwargs) -> MultiStream:
110
- with nested_seed():
111
- multi_stream = self.process(*args, **kwargs)
112
- if self.caching is not None:
113
- multi_stream.set_caching(self.caching)
114
- return self.process(*args, **kwargs)
115
 
116
  @abstractmethod
117
  def process(self, *args, **kwargs) -> MultiStream:
@@ -127,11 +128,14 @@ class MultiStreamOperator(StreamingOperator):
127
  caching: bool = NonPositionalField(default=None)
128
 
129
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
130
- with nested_seed():
131
- result = self._process_multi_stream(multi_stream)
132
- if self.caching is not None:
133
- result.set_caching(self.caching)
134
- return result
 
 
 
135
 
136
  def _process_multi_stream(
137
  self, multi_stream: Optional[MultiStream] = None
@@ -150,7 +154,12 @@ class MultiStreamOperator(StreamingOperator):
150
  class SingleStreamOperator(MultiStreamOperator):
151
  """A class representing a single-stream operator in the streaming system.
152
 
153
- A single-stream operator is a type of `MultiStreamOperator` that operates on individual `Stream` objects within a `MultiStream`. It iterates through each `Stream` in the `MultiStream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each `Stream`.
 
 
 
 
 
154
  """
155
 
156
  apply_to_streams: List[str] = NonPositionalField(
@@ -211,7 +220,13 @@ class SingleStreamOperator(MultiStreamOperator):
211
  class PagedStreamOperator(SingleStreamOperator):
212
  """A class representing a paged-stream operator in the streaming system.
213
 
214
- A paged-stream operator is a type of `SingleStreamOperator` that operates on a page of instances in a `Stream` at a time, where a page is a subset of instances. The `process` method should be implemented by subclasses to define the specific operations to be performed on each page.
 
 
 
 
 
 
215
  """
216
 
217
  page_size: int = 1000
@@ -371,7 +386,8 @@ class InstanceOperatorWithMultiStreamAccess(StreamingOperator):
371
  class SequentialOperator(MultiStreamOperator):
372
  """A class representing a sequential operator in the streaming system.
373
 
374
- A sequential operator is a type of `MultiStreamOperator` that applies a sequence of other operators to a `MultiStream`. It maintains a list of `StreamingOperator`s and applies them in order to the `MultiStream`.
 
375
  """
376
 
377
  max_steps = None
@@ -407,7 +423,9 @@ class SequentialOperator(MultiStreamOperator):
407
  class SourceSequentialOperator(SequentialOperator):
408
  """A class representing a source sequential operator in the streaming system.
409
 
410
- A source sequential operator is a type of `SequntialOperator` that starts with a source operator. The first operator in its list of steps is a `StreamSource`, which generates the initial `MultiStream` that the other operators then process.
 
 
411
  """
412
 
413
  def __call__(self) -> MultiStream:
@@ -430,8 +448,7 @@ class SequentialOperatorInitilizer(SequentialOperator):
430
  """
431
 
432
  def __call__(self, *args, **kwargs) -> MultiStream:
433
- with nested_seed():
434
- return self.process(*args, **kwargs)
435
 
436
  def process(self, *args, **kwargs) -> MultiStream:
437
  assert (
 
5
 
6
  from .artifact import Artifact
7
  from .dataclass import NonPositionalField
 
8
  from .stream import MultiStream, Stream
9
 
10
 
 
74
  class SourceOperator(StreamSource):
75
  """A class representing a source operator in the streaming system.
76
 
77
+ A source operator is responsible for generating the data stream from some source, such as a database or a file.
78
+ This is the starting point of a stream processing pipeline.
79
+ The `SourceOperator` class is a type of `StreamSource`, which is a special type of `StreamingOperator`
80
+ that generates an output stream but does not take any input streams.
81
 
82
+ When called, a `SourceOperator` invokes its `process` method, which should be implemented by all subclasses
83
+ to generate the required `MultiStream`.
84
 
85
  """
86
 
87
  caching: bool = NonPositionalField(default=None)
88
 
89
  def __call__(self) -> MultiStream:
90
+ multi_stream = self.process()
91
+ if self.caching is not None:
92
+ multi_stream.set_caching(self.caching)
93
+ return multi_stream
 
94
 
95
  @abstractmethod
96
  def process(self) -> MultiStream:
 
109
  caching: bool = NonPositionalField(default=None)
110
 
111
  def __call__(self, *args, **kwargs) -> MultiStream:
112
+ multi_stream = self.process(*args, **kwargs)
113
+ if self.caching is not None:
114
+ multi_stream.set_caching(self.caching)
115
+ return self.process(*args, **kwargs)
 
116
 
117
  @abstractmethod
118
  def process(self, *args, **kwargs) -> MultiStream:
 
128
  caching: bool = NonPositionalField(default=None)
129
 
130
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
131
+ self.before_process_multi_stream()
132
+ result = self._process_multi_stream(multi_stream)
133
+ if self.caching is not None:
134
+ result.set_caching(self.caching)
135
+ return result
136
+
137
+ def before_process_multi_stream(self):
138
+ pass
139
 
140
  def _process_multi_stream(
141
  self, multi_stream: Optional[MultiStream] = None
 
154
  class SingleStreamOperator(MultiStreamOperator):
155
  """A class representing a single-stream operator in the streaming system.
156
 
157
+ A single-stream operator is a type of `MultiStreamOperator` that operates on individual
158
+ `Stream` objects within a `MultiStream`. It iterates through each `Stream` in the `MultiStream`
159
+ and applies the `process` method.
160
+ The `process` method should be implemented by subclasses to define the specific operations
161
+ to be performed on each `Stream`.
162
+
163
  """
164
 
165
  apply_to_streams: List[str] = NonPositionalField(
 
220
  class PagedStreamOperator(SingleStreamOperator):
221
  """A class representing a paged-stream operator in the streaming system.
222
 
223
+ A paged-stream operator is a type of `SingleStreamOperator` that operates on a page of instances
224
+ in a `Stream` at a time, where a page is a subset of instances.
225
+ The `process` method should be implemented by subclasses to define the specific operations
226
+ to be performed on each page.
227
+
228
+ Args:
229
+ page_size (int): The size of each page in the stream. Defaults to 1000.
230
  """
231
 
232
  page_size: int = 1000
 
386
  class SequentialOperator(MultiStreamOperator):
387
  """A class representing a sequential operator in the streaming system.
388
 
389
+ A sequential operator is a type of `MultiStreamOperator` that applies a sequence of other operators to a
390
+ `MultiStream`. It maintains a list of `StreamingOperator`s and applies them in order to the `MultiStream`.
391
  """
392
 
393
  max_steps = None
 
423
  class SourceSequentialOperator(SequentialOperator):
424
  """A class representing a source sequential operator in the streaming system.
425
 
426
+ A source sequential operator is a type of `SequentialOperator` that starts with a source operator.
427
+ The first operator in its list of steps is a `StreamSource`, which generates the initial `MultiStream`
428
+ that the other operators then process.
429
  """
430
 
431
  def __call__(self) -> MultiStream:
 
448
  """
449
 
450
  def __call__(self, *args, **kwargs) -> MultiStream:
451
+ return self.process(*args, **kwargs)
 
452
 
453
  def process(self, *args, **kwargs) -> MultiStream:
454
  assert (