from datetime import datetime as dt
from pathlib import Path

import click
import yaml
from clearml import PipelineDecorator
from loguru import logger

from llm_engineering import settings
from pipelines import (
    digital_data_etl,
    end_to_end_data,
    evaluating,
    export_artifact_to_json,
    feature_engineering,
    generate_datasets,
    training,
)
def parse_yaml_config(config_path: Path) -> dict:
    """Parse a YAML config file, returning an empty dict for an empty file."""
    with open(config_path, "r") as file:
        return yaml.safe_load(file) or {}
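# A minimal sketch of the config shape assumed by the pipeline blocks below:
# each YAML file carries a top-level `parameters` mapping that is splatted
# into the pipeline entrypoint. The keys shown here are hypothetical
# illustrations, not necessarily the project's actual parameter names:
#
#   parameters:
#     user_full_name: Jane Doe
#     links:
#       - https://example.com/blog/post-1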
@click.command(
    help="""
LLM Engineering project CLI v0.0.1.

Main entry point for pipeline execution. Runs the selected LLM Engineering
project pipelines, executing all of their steps in order. In this variant the
pipelines run locally through ClearML's PipelineDecorator.

Examples:

\b
# Run only the ETL pipeline
python run.py --run-etl

\b
# Run the ETL pipeline without caching
python run.py --no-cache --run-etl

\b
# Run the feature engineering pipeline
python run.py --run-feature-engineering
"""
)
@click.option(
"--no-cache",
is_flag=True,
default=False,
help="Disable caching for the pipeline run.",
)
@click.option(
"--run-end-to-end-data",
is_flag=True,
default=False,
help="Whether to run all the data pipelines in one go.",
)
@click.option(
"--run-etl",
is_flag=True,
default=False,
help="Whether to run the ETL pipeline.",
)
@click.option(
"--run-export-artifact-to-json",
is_flag=True,
default=False,
help="Whether to run the Artifact -> JSON pipeline",
)
@click.option(
"--etl-config-filename",
default="digital_data_etl_paul_iusztin.yaml",
help="Filename of the ETL config file.",
)
@click.option(
"--run-feature-engineering",
is_flag=True,
default=False,
help="Whether to run the FE pipeline.",
)
@click.option(
"--run-generate-instruct-datasets",
is_flag=True,
default=False,
help="Whether to run the instruct dataset generation pipeline.",
)
@click.option(
"--run-generate-preference-datasets",
is_flag=True,
default=False,
help="Whether to run the preference dataset generation pipeline.",
)
@click.option(
"--run-training",
is_flag=True,
default=False,
help="Whether to run the training pipeline.",
)
@click.option(
"--run-evaluation",
is_flag=True,
default=False,
help="Whether to run the evaluation pipeline.",
)
@click.option(
"--export-settings",
is_flag=True,
default=False,
help="Whether to export your settings to ZenML or not.",
)
def main(
no_cache: bool = False,
run_end_to_end_data: bool = False,
run_etl: bool = False,
etl_config_filename: str = "digital_data_etl_cs370.yaml",
run_export_artifact_to_json: bool = False,
run_feature_engineering: bool = False,
run_generate_instruct_datasets: bool = False,
run_generate_preference_datasets: bool = False,
run_training: bool = False,
run_evaluation: bool = False,
export_settings: bool = False,
) -> None:
assert (
run_end_to_end_data
or run_etl
or run_export_artifact_to_json
or run_feature_engineering
or run_generate_instruct_datasets
or run_generate_preference_datasets
or run_training
or run_evaluation
or export_settings
), "Please specify an action to run."
if export_settings:
logger.info("Exporting settings to ZenML secrets.")
settings.export()
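    # Shared run metadata. Note that in this ClearML-based variant only
    # `config_path` is consumed below; `enable_cache` and `run_name` are
    # collected but not forwarded to the pipelines.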
pipeline_args = {
"enable_cache": not no_cache,
}
root_dir = Path(__file__).resolve().parent.parent
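    # PipelineDecorator.run_locally() makes ClearML execute the pipelines in
    # the current environment instead of enqueuing them to a remote agent.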
PipelineDecorator.run_locally()
    if run_end_to_end_data:
        # Resolve the pipeline config, then splat its `parameters` into the
        # pipeline entrypoint. Defaulting to {} avoids a TypeError when the
        # config omits the `parameters` key.
        pipeline_args["config_path"] = root_dir / "configs" / "end_to_end_data.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"end_to_end_data_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_end_to_end = parse_yaml_config(pipeline_args["config_path"])
        end_to_end_data(**run_args_end_to_end.get("parameters", {}))
    if run_etl:
        pipeline_args["config_path"] = root_dir / "configs" / etl_config_filename
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"digital_data_etl_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_etl = parse_yaml_config(pipeline_args["config_path"])
        digital_data_etl(**run_args_etl.get("parameters", {}))
    if run_export_artifact_to_json:
        pipeline_args["config_path"] = root_dir / "configs" / "export_artifact_to_json.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"export_artifact_to_json_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_export = parse_yaml_config(pipeline_args["config_path"])
        export_artifact_to_json(**run_args_export.get("parameters", {}))
    if run_feature_engineering:
        pipeline_args["config_path"] = root_dir / "configs" / "feature_engineering.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"feature_engineering_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_fe = parse_yaml_config(pipeline_args["config_path"])
        logger.info(f"Pipeline args: {pipeline_args}")
        logger.info(f"Run args: {run_args_fe}")
        feature_engineering(**run_args_fe.get("parameters", {}))
    if run_generate_instruct_datasets:
        pipeline_args["config_path"] = root_dir / "configs" / "generate_instruct_datasets.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"generate_instruct_datasets_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_datasets = parse_yaml_config(pipeline_args["config_path"])
        generate_datasets(**run_args_datasets.get("parameters", {}))
    if run_generate_preference_datasets:
        pipeline_args["config_path"] = root_dir / "configs" / "generate_preference_datasets.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"generate_preference_datasets_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_datasets = parse_yaml_config(pipeline_args["config_path"])
        generate_datasets(**run_args_datasets.get("parameters", {}))
    if run_training:
        pipeline_args["config_path"] = root_dir / "configs" / "training.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_training = parse_yaml_config(pipeline_args["config_path"])
        training(**run_args_training.get("parameters", {}))
    if run_evaluation:
        pipeline_args["config_path"] = root_dir / "configs" / "evaluating.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"evaluation_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        run_args_eval = parse_yaml_config(pipeline_args["config_path"])
        evaluating(**run_args_eval.get("parameters", {}))
if __name__ == "__main__":
main()