Elron commited on
Commit
e7f788e
·
1 Parent(s): 4868000

Upload operator.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operator.py +255 -0
operator.py ADDED
@@ -0,0 +1,255 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from .stream import MultiStream, Stream
2
+ from .artifact import Artifact
3
+
4
+ from abc import abstractmethod
5
+ from typing import Optional, List, Dict, Generator, Union, Any
6
+ from dataclasses import field
7
+
8
+
9
+ class Operator(Artifact):
10
+ pass
11
+
12
+
13
+ class OperatorError(Exception):
14
+ def __init__(self, exception: Exception, operators: List[Operator]):
15
+ super().__init__(
16
+ "This error was raised by the following operators: "
17
+ + ",\n".join([str(operator) for operator in operators])
18
+ + "."
19
+ )
20
+ self.exception = exception
21
+ self.operators = operators
22
+
23
+ @classmethod
24
+ def from_operator_error(cls, exception: Exception, operator: Operator):
25
+ return cls(exception.exception, [*exception.operators, operator])
26
+
27
+ @classmethod
28
+ def from_exception(cls, exception: Exception, operator: Operator):
29
+ return cls(exception, [operator])
30
+
31
+
32
+ class StreamingOperator(Artifact):
33
+ @abstractmethod
34
+ def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
35
+ pass
36
+
37
+
38
+ class StreamSource(StreamingOperator):
39
+ @abstractmethod
40
+ def __call__(self) -> MultiStream:
41
+ pass
42
+
43
+
44
+ class SourceOperator(StreamSource):
45
+ def __call__(self) -> MultiStream:
46
+ return self.process()
47
+
48
+ @abstractmethod
49
+ def process(self) -> MultiStream:
50
+ pass
51
+
52
+
53
+ class StreamInitializerOperator(StreamSource):
54
+ def __call__(self, *args, **kwargs) -> MultiStream:
55
+ return self.process(*args, **kwargs)
56
+
57
+ @abstractmethod
58
+ def process(self, *args, **kwargs) -> MultiStream:
59
+ pass
60
+
61
+
62
+ class MultiStreamOperator(StreamingOperator):
63
+ def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
64
+ return self._process_multi_stream(multi_stream)
65
+
66
+ def _process_multi_stream(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
67
+ result = self.process(multi_stream)
68
+ assert isinstance(result, MultiStream), "MultiStreamOperator must return a MultiStream"
69
+ return result
70
+
71
+ @abstractmethod
72
+ def process(self, multi_stream: MultiStream) -> MultiStream:
73
+ pass
74
+
75
+
76
+ class SingleStreamOperator(MultiStreamOperator):
77
+ def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
78
+ result = {}
79
+ for stream_name, stream in multi_stream.items():
80
+ stream = self._process_single_stream(stream, stream_name)
81
+ assert isinstance(stream, Stream), "SingleStreamOperator must return a Stream"
82
+ result[stream_name] = stream
83
+
84
+ return MultiStream(result)
85
+
86
+ def _process_single_stream(self, stream: Stream, stream_name: str = None) -> Stream:
87
+ return Stream(self._process_stream, gen_kwargs={"stream": stream, "stream_name": stream_name})
88
+
89
+ def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
90
+ yield from self.process(stream, stream_name)
91
+
92
+ @abstractmethod
93
+ def process(self, stream: Stream, stream_name: str = None) -> Generator:
94
+ pass
95
+
96
+
97
+ # class StreamGeneratorOperator(SingleStreamOperator):
98
+
99
+ # def stream(self, stream):
100
+ # return Stream(self.process, gen_kwargs={'stream': stream})
101
+
102
+ # @abstractmethod
103
+ # def process(self, stream: Stream) -> Generator:
104
+ # yield None
105
+
106
+
107
+ class SingleStreamReducer(StreamingOperator):
108
+ def __call__(self, multi_stream: Optional[MultiStream] = None) -> Dict[str, Any]:
109
+ result = {}
110
+ for stream_name, stream in multi_stream.items():
111
+ stream = self.process(stream)
112
+ result[stream_name] = stream
113
+
114
+ return result
115
+
116
+ @abstractmethod
117
+ def process(self, stream: Stream) -> Any:
118
+ pass
119
+
120
+
121
+ class StreamInstanceOperator(SingleStreamOperator):
122
+ def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
123
+ for instance in stream:
124
+ yield self._process_instance(instance, stream_name)
125
+
126
+ def _process_instance(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
127
+ return self.process(instance, stream_name)
128
+
129
+ @abstractmethod
130
+ def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
131
+ pass
132
+
133
+
134
+ class StreamInstanceOperatorValidator(StreamInstanceOperator):
135
+ @abstractmethod
136
+ def validate(self, instance):
137
+ pass
138
+
139
+ def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
140
+ iterator = iter(stream)
141
+ first_instance = next(iterator)
142
+ result = self._process_instance(first_instance, stream_name)
143
+ self.validate(result)
144
+ yield result
145
+ yield from (self._process_instance(instance, stream_name) for instance in iterator)
146
+
147
+
148
+ class InstanceOperator(Artifact):
149
+ def __call__(self, data: dict) -> dict:
150
+ return self.process(data)
151
+
152
+ @abstractmethod
153
+ def process(self, data: dict) -> dict:
154
+ pass
155
+
156
+
157
+ class FieldOperator(Artifact):
158
+ def __call__(self, data: Dict[str, Any], field: str) -> dict:
159
+ value = self.process(data[field])
160
+ data[field] = value
161
+ return data
162
+
163
+ @abstractmethod
164
+ def process(self, value: Any) -> Any:
165
+ pass
166
+
167
+
168
+ # class NamedStreamInstanceOperator(StreamingOperator):
169
+
170
+ # def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
171
+ # result = {}
172
+ # for stream_name, stream in multi_stream.items():
173
+ # stream = Stream(self.generator, gen_kwargs={'stream': stream, 'stream_name': stream_name})
174
+ # result[stream_name] = stream
175
+ # return MultiStream(result)
176
+
177
+ # def verify_first_instance(self, instance):
178
+ # pass
179
+
180
+ # def generator(self, stream, stream_name):
181
+ # iterator = iter(stream)
182
+ # first_instance = next(iterator)
183
+ # result = self.process(first_instance, stream_name)
184
+ # self.verify_first_instance(result)
185
+ # yield result
186
+ # yield from (self.process(instance) for instance in iterator)
187
+
188
+ # @abstractmethod
189
+ # def process(self, instance: dict, stream_name: str) -> dict:
190
+ # pass
191
+
192
+
193
+ class InstanceOperatorWithGlobalAccess(StreamingOperator):
194
+ accessible_streams: Union[MultiStream, List[str]] = None
195
+ cache_accessible_streams: bool = True
196
+
197
+ def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
198
+ result = {}
199
+
200
+ if isinstance(self.accessible_streams, list):
201
+ # cache the accessible streams:
202
+ self.accessible_streams = MultiStream(
203
+ {stream_name: multi_stream[stream_name] for stream_name in self.accessible_streams}
204
+ )
205
+
206
+ if self.cache_accessible_streams:
207
+ for stream in self.accessible_streams.values():
208
+ stream.set_caching(True)
209
+
210
+ for stream_name, stream in multi_stream.items():
211
+ stream = Stream(self.generator, gen_kwargs={"stream": stream, "multi_stream": self.accessible_streams})
212
+ result[stream_name] = stream
213
+
214
+ return MultiStream(result)
215
+
216
+ def generator(self, stream, multi_stream):
217
+ yield from (self.process(instance, multi_stream) for instance in stream)
218
+
219
+ @abstractmethod
220
+ def process(self, instance: dict, multi_stream: MultiStream) -> dict:
221
+ pass
222
+
223
+
224
+ class SequntialOperator(MultiStreamOperator):
225
+ steps: List[StreamingOperator] = field(default_factory=list)
226
+
227
+ def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
228
+ for operator in self.steps:
229
+ multi_stream = operator(multi_stream)
230
+ return multi_stream
231
+
232
+
233
+ class SourceSequntialOperator(SequntialOperator):
234
+ def __call__(self) -> MultiStream:
235
+ return super().__call__()
236
+
237
+ def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
238
+ multi_stream = self.steps[0]()
239
+ for operator in self.steps[1:]:
240
+ multi_stream = operator(multi_stream)
241
+ return multi_stream
242
+
243
+
244
+ class SequntialOperatorInitilizer(SequntialOperator):
245
+ def __call__(self, *args, **kwargs) -> MultiStream:
246
+ return self.process(*args, **kwargs)
247
+
248
+ def process(self, *args, **kwargs) -> MultiStream:
249
+ assert isinstance(
250
+ self.steps[0], StreamInitializerOperator
251
+ ), "The first step in a SequntialOperatorInitilizer must be a StreamInitializerOperator"
252
+ multi_stream = self.steps[0](*args, **kwargs)
253
+ for operator in self.steps[1:]:
254
+ multi_stream = operator(multi_stream)
255
+ return multi_stream