Upload operators.py with huggingface_hub
operators.py CHANGED (+32 -22)
@@ -180,7 +180,7 @@ class MapInstanceValues(StreamInstanceOperator):
             if value is not None:
                 if (self.process_every_value is True) and (not isinstance(value, list)):
                     raise ValueError(
-                        f"'process_every_field' == True is allowed only when all fields which have mappers, i.e., {list(self.mappers.keys())} are lists."
+                        f"'process_every_field' == True is allowed only when all fields which have mappers, i.e., {list(self.mappers.keys())} are lists. Instance = {instance}"
                     )
                 if isinstance(value, list) and self.process_every_value:
                     for i, val in enumerate(value):
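Note on this hunk: the expanded error message reflects the constraint that process_every_value=True is only valid when every field listed in 'mappers' holds a list. A minimal standalone sketch of that check (plain Python, not the unitxt operator itself; 'mappers' and 'instance' are made-up examples):

    # Standalone illustration of the check above, not the unitxt API.
    mappers = {"label": {"0": "negative", "1": "positive"}}
    process_every_value = True

    instance = {"label": ["0", "1"]}   # valid: the mapped field is a list
    for key, mapper in mappers.items():
        value = instance[key]
        if process_every_value and not isinstance(value, list):
            raise ValueError(
                f"'process_every_field' == True is allowed only when all fields "
                f"which have mappers, i.e., {list(mappers.keys())} are lists. "
                f"Instance = {instance}"
            )
        print([mapper[v] for v in value])  # -> ['negative', 'positive']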
@@ -285,7 +285,7 @@ class RemoveFields(StreamInstanceOperator):
         return instance
 
 
-class FieldOperator(StreamInstanceOperator):
+class InstanceFieldOperator(StreamInstanceOperator):
     """A general stream instance operator that processes the values of a field (or multiple ones).
 
     Args:
@@ -336,7 +336,7 @@ class FieldOperator(StreamInstanceOperator):
         # self._field_to_field is built explicitly by pairs, or copied from argument 'field_to_field'
         if self.field_to_field is None:
             return
-        # for backward compatibility also allow list of
+        # for backward compatibility also allow list of tuples of two strings
         if isoftype(self.field_to_field, List[List[str]]) or isoftype(
             self.field_to_field, List[Tuple[str, str]]
         ):
@@ -365,7 +365,7 @@ class FieldOperator(StreamInstanceOperator):
         )
 
     @abstractmethod
-    def process_value(self, value: Any) -> Any:
+    def process_instance_value(self, value: Any, instance: Dict[str, Any]):
         pass
 
     def prepare(self):
@@ -408,9 +408,12 @@ class FieldOperator(StreamInstanceOperator):
                 ) from e
             try:
                 if self.process_every_value:
-                    new_value = [
+                    new_value = [
+                        self.process_instance_value(value, instance)
+                        for value in old_value
+                    ]
                 else:
-                    new_value = self.
+                    new_value = self.process_instance_value(old_value, instance)
             except Exception as e:
                 raise ValueError(
                     f"Failed to process '{from_field}' from {instance} due to : {e}"
@@ -427,6 +430,15 @@ class FieldOperator(StreamInstanceOperator):
         return instance
 
 
+class FieldOperator(InstanceFieldOperator):
+    def process_instance_value(self, value: Any, instance: Dict[str, Any]):
+        return self.process_value(value)
+
+    @abstractmethod
+    def process_value(self, value: Any) -> Any:
+        pass
+
+
 class RenameFields(FieldOperator):
     """Renames fields.
 
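Note on this hunk: the refactor splits the old FieldOperator into a more general InstanceFieldOperator, whose hook also receives the whole instance, while FieldOperator keeps the old value-only contract by delegating. A standalone sketch of the same delegation pattern (plain Python mirroring the class names above, not the unitxt code itself):

    # Standalone sketch of the delegation pattern introduced above.
    from abc import ABC, abstractmethod
    from typing import Any, Dict

    class InstanceFieldOperatorSketch(ABC):
        # subclasses see both the field value and the whole instance
        @abstractmethod
        def process_instance_value(self, value: Any, instance: Dict[str, Any]):
            ...

    class FieldOperatorSketch(InstanceFieldOperatorSketch):
        # keeps the old value-only contract by delegating
        def process_instance_value(self, value: Any, instance: Dict[str, Any]):
            return self.process_value(value)

        @abstractmethod
        def process_value(self, value: Any) -> Any:
            ...

    class Upper(FieldOperatorSketch):
        def process_value(self, value: Any) -> Any:
            return str(value).upper()

    print(Upper().process_instance_value("hello", {"text": "hello"}))  # HELLO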
@@ -773,11 +785,11 @@ class Apply(StreamInstanceOperator):
         return ".".join(parts)
 
     def str_to_function(self, function_str: str) -> Callable:
-
-        if len(
-            return __builtins__[
+        parts = function_str.split(".", 1)
+        if len(parts) == 1:
+            return __builtins__[parts[0]]
 
-        module_name, function_name =
+        module_name, function_name = parts
         if module_name in __builtins__:
             obj = __builtins__[module_name]
         elif module_name in globals():
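Note on this hunk: the rewritten str_to_function splits the dotted name on the first '.', returning a builtin directly for bare names and otherwise resolving module_name.function_name. A rough standalone sketch of that resolution order (it uses the builtins module and importlib instead of the __builtins__/globals() lookups in the real method, which also walks attribute chains; str_to_function_sketch is a hypothetical name):

    # Standalone sketch of the dotted-name resolution above, not the actual Apply operator.
    import builtins
    import importlib
    from typing import Callable

    def str_to_function_sketch(function_str: str) -> Callable:
        parts = function_str.split(".", 1)
        if len(parts) == 1:
            return getattr(builtins, parts[0])       # e.g. "len" -> built-in len
        module_name, function_name = parts
        module = importlib.import_module(module_name)
        return getattr(module, function_name)        # e.g. "os.getcwd"

    print(str_to_function_sketch("len")([1, 2, 3]))  # 3
    print(str_to_function_sketch("os.getcwd")())     # current working directory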
@@ -892,34 +904,32 @@ class TakeByField(StreamInstanceOperator):
         return instance
 
 
-class
-    """Slightly
+class Perturb(FieldOperator):
+    """Slightly perturbs the contents of 'field'. Could be Handy for imitating prediction from given target.
 
     When task was classification, argument 'select_from' can be used to list the other potential classes, as a
     relevant perturbation
     """
 
     select_from: List[Any] = []
-
+    percentage_to_perturb: int = 1  # 1 percent
 
     def verify(self):
         assert (
-            0 <= self.
-        ), f"'
+            0 <= self.percentage_to_perturb and self.percentage_to_perturb <= 100
+        ), f"'percentage_to_perturb' should be in the range 0..100. Received {self.percentage_to_perturb}"
 
     def prepare(self):
         super().prepare()
         self.random_generator = new_random_generator(sub_seed="CopyWithPerturbation")
 
     def process_value(self, value: Any) -> Any:
-
-
-        )
-        if not perturbate:
+        perturb = self.random_generator.randint(1, 100) <= self.percentage_to_perturb
+        if not perturb:
             return value
 
         if value in self.select_from:
-            # 80% of cases, return a decent class, otherwise,
+            # 80% of cases, return a decent class, otherwise, perturb the value itself as follows
         if self.random_generator.random() < 0.8:
             return self.random_generator.choice(self.select_from)
 
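Note on this hunk: Perturb leaves the value untouched in (100 - percentage_to_perturb)% of calls; when it does perturb a value listed in select_from, 80% of those cases swap in another candidate from select_from, and the remaining cases alter the value itself (handled further down, outside this hunk). A standalone sketch of that probability flow (plain Python with made-up labels, not the unitxt operator):

    # Standalone sketch of the Perturb probability flow above.
    import random

    def perturb_sketch(value, select_from, percentage_to_perturb=1, rng=random.Random(17)):
        if rng.randint(1, 100) > percentage_to_perturb:
            return value                      # most calls: no perturbation
        if value in select_from and rng.random() < 0.8:
            return rng.choice(select_from)    # swap in another candidate class
        return value                          # placeholder for the value-level perturbation

    labels = ["positive", "negative", "neutral"]
    print([perturb_sketch("positive", labels, percentage_to_perturb=50) for _ in range(6)])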
@@ -1397,7 +1407,7 @@ class ExtractMostCommonFieldValues(MultiStreamOperator):
             else:
                 # content of 'field' is a list and process_every_value == True: add one occurrence on behalf of each individual value
                 counter.update(instance[self.field])
-        # here counter counts occurrences of individual values, or
+        # here counter counts occurrences of individual values, or tuples.
        values_and_counts = counter.most_common()
         if self.overall_top_frequency_percent < 100:
             top_frequency = (
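Note on this hunk: the clarified comment covers both counting modes of ExtractMostCommonFieldValues: with process_every_value each individual value is counted, and without it a whole list-valued field is counted once, as a tuple. A tiny standalone illustration of the resulting counter contents:

    # Standalone illustration: a Counter holding both individual values and tuples.
    from collections import Counter

    counter = Counter()
    counter.update(["cat", "dog", "cat"])   # process_every_value: each value counted
    counter.update([("cat", "dog")])        # whole list counted once, as a tuple
    print(counter.most_common())            # [('cat', 2), ('dog', 1), (('cat', 'dog'), 1)]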
@@ -1606,7 +1616,7 @@ class ApplyMetric(SingleStreamOperator, ArtifactFetcherMixin):
         # by the first listed metric (as desired).
         metric_names = list(reversed(metric_names))
 
-        # Workaround: The metric/MetricPipeline modifies the stream itself,
+        # Workaround: The metric/MetricPipeline modifies the stream itself, sometimes making it incompatible
         # for further metrics' processing, instead of just modifying the score field.
         # Here we keep all the fields besides the score, and restore them after the metric finishes.
         first_instance = stream.peek()