Elron commited on
Commit
9e30502
·
verified ·
1 Parent(s): 5ab6d60

Upload operator.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operator.py +31 -4
operator.py CHANGED
@@ -4,14 +4,41 @@ from dataclasses import field
4
  from typing import Any, Dict, Generator, List, Optional
5
 
6
  from .artifact import Artifact
7
- from .dataclass import NonPositionalField
8
  from .stream import MultiStream, Stream
 
9
 
10
 
11
  class Operator(Artifact):
12
  pass
13
 
14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  class OperatorError(Exception):
16
  def __init__(self, exception: Exception, operators: List[Operator]):
17
  super().__init__(
@@ -31,7 +58,7 @@ class OperatorError(Exception):
31
  return cls(exception, [operator])
32
 
33
 
34
- class StreamingOperator(Artifact):
35
  """Base class for all stream operators in the streaming model.
36
 
37
  Stream operators are a key component of the streaming model and are responsible for processing continuous data streams.
@@ -149,7 +176,7 @@ class MultiStreamOperator(StreamingOperator):
149
  pass
150
 
151
  def process_instance(self, instance, stream_name="tmp"):
152
- multi_stream = {stream_name: [instance]}
153
  processed_multi_stream = self(multi_stream)
154
  return next(iter(processed_multi_stream[stream_name]))
155
 
@@ -168,7 +195,7 @@ class SingleStreamOperator(MultiStreamOperator):
168
  apply_to_streams: List[str] = NonPositionalField(
169
  default=None
170
  ) # None apply to all streams
171
- dont_apply_to_streams: List[str] = NonPositionalField(default_factory=None)
172
 
173
  def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
174
  result = {}
 
4
  from typing import Any, Dict, Generator, List, Optional
5
 
6
  from .artifact import Artifact
7
+ from .dataclass import InternalField, NonPositionalField
8
  from .stream import MultiStream, Stream
9
+ from .utils import is_module_available
10
 
11
 
12
  class Operator(Artifact):
13
  pass
14
 
15
 
16
+ class PackageRequirementsMixin(Artifact):
17
+ _requirements_list: List[str] = InternalField(default_factory=list)
18
+
19
+ def verify(self):
20
+ super().verify()
21
+ self.check_missing_requirements()
22
+
23
+ def check_missing_requirements(self, requirements=None):
24
+ if requirements is None:
25
+ requirements = self._requirements_list
26
+ missing_packages = []
27
+ for package in requirements:
28
+ if not is_module_available(package):
29
+ missing_packages.append(package)
30
+ if missing_packages:
31
+ raise MissingRequirementsError(self.__class__.__name__, missing_packages)
32
+
33
+
34
+ class MissingRequirementsError(Exception):
35
+ def __init__(self, class_name, missing_packages):
36
+ self.class_name = class_name
37
+ self.missing_packages = missing_packages
38
+ self.message = f"{self.class_name} requires the following missing package(s): {', '.join(self.missing_packages)}"
39
+ super().__init__(self.message)
40
+
41
+
42
  class OperatorError(Exception):
43
  def __init__(self, exception: Exception, operators: List[Operator]):
44
  super().__init__(
 
58
  return cls(exception, [operator])
59
 
60
 
61
+ class StreamingOperator(Operator, PackageRequirementsMixin):
62
  """Base class for all stream operators in the streaming model.
63
 
64
  Stream operators are a key component of the streaming model and are responsible for processing continuous data streams.
 
176
  pass
177
 
178
  def process_instance(self, instance, stream_name="tmp"):
179
+ multi_stream = MultiStream({stream_name: [instance]})
180
  processed_multi_stream = self(multi_stream)
181
  return next(iter(processed_multi_stream[stream_name]))
182
 
 
195
  apply_to_streams: List[str] = NonPositionalField(
196
  default=None
197
  ) # None apply to all streams
198
+ dont_apply_to_streams: List[str] = NonPositionalField(default=None)
199
 
200
  def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
201
  result = {}