metric / standard.py
Elron's picture
Upload folder using huggingface_hub
9245edf verified
raw
history blame
20.9 kB
from typing import List, Optional, Union
from .artifact import fetch_artifact
from .augmentors import Augmentor, NullAugmentor
from .card import TaskCard
from .collections_operators import GetLength
from .dataclass import Field, InternalField, NonPositionalField, OptionalField
from .error_utils import UnitxtError
from .formats import Format, SystemFormat
from .logging_utils import get_logger
from .operator import SequentialOperator, SourceSequentialOperator, StreamingOperator
from .operators import Set, StreamRefiner
from .recipe import Recipe
from .schema import FinalizeDataset
from .serializers import SingleTypeSerializer
from .settings_utils import get_constants, get_settings
from .splitters import ConstantSizeSample, RandomSizeSample, Sampler, SeparateSplit
from .stream import MultiStream
from .system_prompts import EmptySystemPrompt, SystemPrompt
from .task import Task
from .templates import ApplyRandomTemplate, ApplySingleTemplate, Template, TemplatesList
from .type_utils import isoftype
from .utils import LRUCache
constants = get_constants()
settings = get_settings()
logger = get_logger()
# Used to give meaningful name to recipe steps
class CreateDemosPool(SeparateSplit):
pass
class BaseRecipe(Recipe, SourceSequentialOperator):
# Base parameters
card: TaskCard = None
task: Task = None
template: Union[Template, List[Template], TemplatesList] = None
system_prompt: SystemPrompt = Field(default_factory=EmptySystemPrompt)
format: Format = None
serializer: Union[SingleTypeSerializer, List[SingleTypeSerializer]] = None
# Additional parameters
template_card_index: int = NonPositionalField(default=None)
metrics: List[str] = NonPositionalField(default=None)
postprocessors: List[str] = NonPositionalField(default=None)
group_by: List[Union[str, List[str]]] = []
loader_limit: int = None
max_train_instances: int = None
max_validation_instances: int = None
max_test_instances: int = None
train_refiner: StreamRefiner = OptionalField(default_factory=StreamRefiner)
validation_refiner: StreamRefiner = OptionalField(default_factory=StreamRefiner)
test_refiner: StreamRefiner = OptionalField(default_factory=StreamRefiner)
demos_pool_size: int = None
num_demos: Optional[Union[int, List[int]]] = 0
demos_removed_from_data: bool = True
demos_pool_name: str = "demos_pool"
demos_taken_from: str = "train"
demos_field: str = "demos"
sampler: Sampler = None
augmentor: Union[Augmentor, List[Augmentor]] = OptionalField(default=None)
steps: List[StreamingOperator] = InternalField(default_factory=list)
# shared class cache
_demos_pool_cache = LRUCache(max_size=10)
def before_process_multi_stream(self):
super().before_process_multi_stream()
@property
def max_demos_size(self):
if isinstance(self.num_demos, list):
return max(self.num_demos)
return self.num_demos
def verify(self):
super().verify()
if self.task is None and self.card is None:
raise ValueError("Set card or task in the recipe")
if self.card is None and (
self.num_demos > 0 or self.demos_pool_size is not None
):
raise ValueError(
"To use num_demos and demos_pool_size in recipe set a card."
)
if self.use_demos:
if self.demos_pool_size is None or self.demos_pool_size < 1:
raise ValueError(
"When using demonstrations both num_demos and demos_pool_size should be assigned with positive integers."
)
if self.demos_pool_size < self.max_demos_size:
raise ValueError(
f"num_demos (got: {self.max_demos_size}) should not exceed demos_pool_size (got: {self.demos_pool_size})"
)
if self.loader_limit and self.demos_pool_size > self.loader_limit:
raise ValueError(
f"demos_pool_size should not exceed loader_limit ({self.loader_limit}), Got demos_pool_size={self.demos_pool_size}"
)
if self.loader_limit:
if self.max_test_instances and self.max_test_instances > self.loader_limit:
raise ValueError(
f"max_test_instances should not exceed loader_limit ({self.loader_limit}), Got max_test_instances={self.max_test_instances}"
)
if (
self.max_validation_instances
and self.max_validation_instances > self.loader_limit
):
raise ValueError(
f"max_validation_instances should not exceed loader_limit ({self.loader_limit}), Got max_validation_instances={self.max_validation_instances}"
)
if (
self.max_train_instances
and self.max_train_instances > self.loader_limit
):
raise ValueError(
f"max_train_instances should not exceed loader_limit ({self.loader_limit}), Got max_train_instances={self.max_train_instances}"
)
if self.metrics is not None and not isinstance(self.metrics, List):
raise ValueError(
f"metrics must be a list of metrics. Got metrics = {self.metrics}"
)
if self.postprocessors is not None and not isinstance(
self.postprocessors, List
):
raise ValueError(
f"post processors must be a list of post processor. Got postprocessors = {self.postprocessors}"
)
if self.format is not None and not isinstance(self.format, Format):
raise ValueError(
f"format parameter must be a list of of class derived from Format. Got format = {self.format}"
)
if self.template is None:
raise ValueError(
"You must set in the recipe either `template`, `template_card_index`."
)
if isinstance(self.template, list):
for template in self.template:
self.verify_template(template)
else:
self.verify_template(self.template)
if self.serializer is not None:
if not isinstance(self.serializer, list):
self.serializer = [self.serializer]
self.template.serializer.add_serializers(self.serializer)
def prepare_refiners(self):
self.train_refiner.max_instances = self.max_train_instances
self.train_refiner.apply_to_streams = ["train"]
self.processing.steps.append(self.train_refiner)
self.validation_refiner.max_instances = self.max_validation_instances
self.validation_refiner.apply_to_streams = ["validation"]
self.processing.steps.append(self.validation_refiner)
self.test_refiner.max_instances = self.max_test_instances
self.test_refiner.apply_to_streams = ["test"]
self.processing.steps.append(self.test_refiner)
def verify_template(self, template):
if not isinstance(template, Template):
raise ValueError(
f"template argument must be an object of type Template. Got template = {template}"
)
def set_pipelines(self):
self.loading = SequentialOperator(
__description__="Loading the data from the data source."
)
self.metadata = SequentialOperator(
__description__="Adding metadata (e.g. format, system prompt, template) "
)
self.standardization = SequentialOperator(
__description__="Standardizing the raw dataset fields to task field definition."
)
self.processing = SequentialOperator(
__description__="Setting task fields (and selecting demos per sample if needed)."
)
self.verbalization = SequentialOperator()
self.verbalization.__description__ = "Verbalizing the input to the model and gold references to the 'source', 'target' and 'references' fields."
self.finalize = SequentialOperator()
self.finalize.__description__ = "Adding post processors. Removing intermediate fields. Creating the final output dataset."
self.steps = [
self.loading,
self.metadata,
self.standardization,
self.processing,
self.metadata,
self.verbalization,
self.finalize,
]
self.inference_instance = SequentialOperator()
self.inference_instance.steps = [
self.metadata,
self.processing,
self.metadata,
]
self.inference_demos = SourceSequentialOperator()
self.inference_demos.steps = [
self.loading,
self.metadata,
self.standardization,
self.processing,
self.metadata,
]
self.inference = SequentialOperator()
self.inference.steps = [self.metadata, self.verbalization, self.finalize]
def production_preprocess(self, task_instances):
ms = MultiStream.from_iterables({constants.inference_stream: task_instances})
return list(self.inference_instance(ms)[constants.inference_stream])
def production_demos_pool(self):
if self.use_demos:
demos_pool = self.__class__._demos_pool_cache.get(str(self), None)
if demos_pool is None:
demos_pool = list(self.inference_demos()[self.demos_pool_name])
self.__class__._demos_pool_cache[str(self)] = demos_pool
return demos_pool
return []
@property
def has_custom_demos_pool(self):
return self.demos_pool_size is not None and self.demos_pool_size > 0
@property
def use_demos(self):
return self.num_demos is not None and self.max_demos_size > 0
def produce(self, task_instances):
"""Use the recipe in production to produce model ready query from standard task instance."""
self.before_process_multi_stream()
streams = {
constants.inference_stream: self.production_preprocess(task_instances),
}
if self.use_demos:
streams[self.demos_pool_name] = self.production_demos_pool()
multi_stream = MultiStream.from_iterables(streams)
multi_stream = self.inference(multi_stream)
return list(multi_stream[constants.inference_stream])
def reset(self):
self.reset_pipeline()
def reset_pipeline(self):
if self.format is None:
if settings.default_format is not None:
self.format, _ = fetch_artifact(settings.default_format)
else:
self.format = SystemFormat()
if self.card and self.card.preprocess_steps is None:
self.card.preprocess_steps = []
if self.task is None:
self.task = self.card.task
self.set_pipelines()
if self.card is not None:
loader = self.card.loader
if self.loader_limit:
loader.loader_limit = self.loader_limit
logger.info(f"Loader line limit was set to {self.loader_limit}")
self.loading.steps.append(loader)
# This is required in case loader_limit is not enforced by the loader
if self.loader_limit:
self.loading.steps.append(
StreamRefiner(max_instances=self.loader_limit)
)
self.metadata.steps.append(
Set(
fields={
"recipe_metadata/system_prompt": self.system_prompt,
"recipe_metadata/format": self.format,
}
)
)
if self.card:
self.standardization.steps.extend(self.card.preprocess_steps)
self.processing.steps.append(self.task)
if self.augmentor is not None and not isoftype(self.augmentor, NullAugmentor):
if (
self.card.task.augmentable_inputs is None
or len(self.task.augmentable_inputs) == 0
):
raise UnitxtError(
f"You specified augmentor in the recipe but the got task without augmentable_inputs: {self.task}"
)
if not isinstance(self.augmentor, list):
self.augmentor = [self.augmentor]
for augmentor in self.augmentor:
augmentor.set_fields(self.card.task.augmentable_inputs)
self.processing.steps.append(augmentor)
if self.has_custom_demos_pool:
self.processing.steps.append(
CreateDemosPool(
from_split=self.demos_taken_from,
to_split_names=[self.demos_pool_name, self.demos_taken_from],
to_split_sizes=[int(self.demos_pool_size)],
remove_targets_from_source_split=self.demos_removed_from_data,
)
)
if self.use_demos:
if self.sampler is None:
if self.card.sampler is None:
raise ValueError(
"Unexpected None value for card.sampler. "
"To use num_demos > 0, please set a sampler on the TaskCard."
)
self.sampler = self.card.sampler
self.prepare_refiners()
if self.use_demos:
if isinstance(self.num_demos, int):
self.verbalization.steps.append(
ConstantSizeSample(
from_stream=self.demos_pool_name,
to_field=self.demos_field,
sampler=self.sampler,
sample_size=self.num_demos,
)
)
self.verbalization.steps.append(
Set(fields={"recipe_metadata/num_demos": self.num_demos})
)
elif isinstance(self.num_demos, list):
self.verbalization.steps.append(
RandomSizeSample(
from_stream=self.demos_pool_name,
to_field=self.demos_field,
sampler=self.sampler,
sample_sizes=self.num_demos,
)
)
self.verbalization.steps.append(
GetLength(field="demos", to_field="recipe_metadata/num_demos")
)
else:
raise ValueError("num_demos must be int or List[int]")
if isinstance(self.template, list):
self.verbalization.steps.append(
ApplyRandomTemplate(
templates=self.template, demos_field=self.demos_field
)
)
else:
self.verbalization.steps.append(
ApplySingleTemplate(
template=self.template, demos_field=self.demos_field
)
)
else:
self.verbalization.steps.append(
Set(fields={"recipe_metadata/num_demos": 0})
)
if isinstance(self.template, list):
self.verbalization.steps.append(
ApplyRandomTemplate(templates=self.template)
)
else:
self.verbalization.steps.append(
ApplySingleTemplate(template=self.template)
)
self.verbalization.steps.append(self.system_prompt)
self.verbalization.steps.append(self.format)
if self.postprocessors is not None:
self.finalize.steps.append(
Set(fields={"postprocessors": self.postprocessors})
)
if self.metrics is not None:
self.finalize.steps.append(Set(fields={"metrics": self.metrics}))
self.finalize.steps.append(FinalizeDataset(group_by=self.group_by))
def prepare(self):
if isinstance(self.template, TemplatesList):
self.template = self.template.items
self.reset_pipeline()
class StandardRecipeWithIndexes(BaseRecipe):
template_card_index: int = None
def prepare(self):
assert (
self.template_card_index is None or self.template is None
), f"Specify either template ({self.template}) or template_card_index ({self.template_card_index}) but not both"
if self.template_card_index is None and self.template is None:
if self.card is not None:
self.template_card_index = (
0
if isinstance(self.card.templates, list)
else next(iter(self.card.templates.keys()))
)
logger.warning(
"Template was not specified in recipe, using the first template from the card by default."
)
else:
raise ValueError(
"Specify a template or template_card_index, or a card to get a default template from."
)
if self.template_card_index is not None:
try:
self.template = self.card.templates[self.template_card_index]
except Exception as e:
if isinstance(self.card.templates, dict):
options = list(self.card.templates.keys())
else:
options = list(range(0, len(self.card.templates)))
raise ValueError(
f"card_template_index '{self.template_card_index}' is not defined in card. Possible card_template_index options: {options}"
) from e
super().prepare()
class StandardRecipe(StandardRecipeWithIndexes):
"""This class represents a standard recipe for data processing and preparation.
This class can be used to prepare a recipe.
with all necessary steps, refiners and renderers included. It allows to set various
parameters and steps in a sequential manner for preparing the recipe.
Attributes:
card (TaskCard): TaskCard object associated with the recipe.
template (Template, optional): Template object to be used for the recipe.
system_prompt (SystemPrompt, optional): SystemPrompt object to be used for the recipe.
loader_limit (int, optional): Specifies the maximum number of instances per stream to be returned from the loader (used to reduce loading time in large datasets)
format (SystemFormat, optional): SystemFormat object to be used for the recipe.
metrics (List[str]): list of catalog metrics to use with this recipe.
postprocessors (List[str]): list of catalog processors to apply at post processing. (Not recommended to use from here)
group_by (List[Union[str, List[str]]]): list of task_data or metadata keys to group global scores by.
train_refiner (StreamRefiner, optional): Train refiner to be used in the recipe.
max_train_instances (int, optional): Maximum training instances for the refiner.
validation_refiner (StreamRefiner, optional): Validation refiner to be used in the recipe.
max_validation_instances (int, optional): Maximum validation instances for the refiner.
test_refiner (StreamRefiner, optional): Test refiner to be used in the recipe.
max_test_instances (int, optional): Maximum test instances for the refiner.
demos_pool_size (int, optional): Size of the demos pool.
num_demos (int, optional): Number of demos to be used.
demos_pool_name (str, optional): Name of the demos pool. Default is "demos_pool".
demos_taken_from (str, optional): Specifies from where the demos are taken. Default is "train".
demos_field (str, optional): Field name for demos. Default is "demos".
demos_removed_from_data (bool, optional): whether to remove the demos from the source data, Default is True
sampler (Sampler, optional): The Sampler used to select the demonstrations when num_demos > 0.
steps (List[StreamingOperator], optional): List of StreamingOperator objects to be used in the recipe.
augmentor (Augmentor) : Augmentor to be used to pseudo randomly augment the source text
instruction_card_index (int, optional): Index of instruction card to be used for preparing the recipe.
template_card_index (int, optional): Index of template card to be used for preparing the recipe.
Methods:
prepare(): This overridden method is used for preparing the recipe
by arranging all the steps, refiners, and renderers in a sequential manner.
Raises:
AssertionError: If both template and template_card_index are specified at the same time.
"""
pass