{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": { "collapsed": true, "pycharm": { "name": "#%% md\n" } }, "source": [ "# Validate analytics JSON\n", "\n", "### ✅ Prerequisites\n", "\n", "[Python 3.10](https://www.python.org/downloads/)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from typing import Literal\n", "import json\n", "\n", "def read_json(filename: str, encoding=\"utf-8\"):\n", " with open(filename, mode=\"r\", encoding=encoding) as fp:\n", " return json.load(fp)\n", "\n", "\n", "def is_valid_model(model: dict) -> bool:\n", " if \"model_id\" not in model:\n", " raise ValueError(f\"Missing mandatory 'model_id' field in {model}\")\n", " if \"name\" not in model:\n", " raise ValueError(f\"Missing mandatory 'model_id' field in {model}\")\n", " if \"owner\" not in model:\n", " raise ValueError(f\"Missing mandatory 'model_id' field in {model}\")\n", "\n", " return True\n", "\n", "\n", "def is_valid_metric(metric: dict) -> bool:\n", " def is_valid_metric_value(metric_value: dict) -> bool:\n", " # Validate \"value\" field\n", " if \"value\" not in metric_value or not metric_value[\"value\"]:\n", " raise ValueError(f\"Missing mandatory 'value' field in {metric_value}\")\n", "\n", " if not (\n", " isinstance(metric_value[\"value\"], str)\n", " or isinstance(metric_value[\"value\"], float)\n", " or isinstance(metric_value[\"value\"], int)\n", " ):\n", " raise ValueError(\n", " f\"Invalid type: {type(metric_value['value'])} for 'value' field in {metric_value}\"\n", " )\n", "\n", " return True\n", "\n", " # Validate \"name\" field\n", " if \"name\" not in metric:\n", " raise ValueError(f\"Missing mandatory 'name' field in {metric}\")\n", "\n", " if not isinstance(metric[\"name\"], str):\n", " raise ValueError(\n", " f\"Invalid type: {type(metric['name'])} for 'name' field in {metric}\"\n", " )\n", "\n", " # Validate \"author\" field\n", " if \"author\" not in metric:\n", " raise ValueError(f\"Missing mandatory 'name' field in {metric}\")\n", "\n", " if not isinstance(metric[\"author\"], str):\n", " raise ValueError(\n", " f\"Invalid type: {type(metric['author'])} for 'author' field in {metric}\"\n", " )\n", "\n", " if metric[\"author\"] not in [\"human\", \"algorithm\"]:\n", " raise ValueError(f\"Unsupported author: {metric['author']} in {metric}\")\n", "\n", " # Validate \"type\" field\n", " if \"type\" not in metric:\n", " raise ValueError(f\"Missing mandatory 'type' field in {metric}\")\n", "\n", " if metric[\"type\"] not in [\"categorical\", \"numerical\", \"text\"]:\n", " raise ValueError(f\"Unsupported type: {metric['type']} in {metric}\")\n", "\n", " # Validate \"categorical\" type metric\n", " if metric[\"type\"] == \"categorical\" and (\n", " \"values\" not in metric or not metric[\"values\"]\n", " ):\n", " raise ValueError(\n", " f\"Missing mandatory 'values' field for 'categorical' type metric in {metric}\"\n", " )\n", "\n", " if metric[\"type\"] == \"categorical\" and not all(\n", " [\n", " is_valid_metric_value(metric_value=metric_value)\n", " for metric_value in metric[\"values\"]\n", " ]\n", " ):\n", " raise ValueError(\n", " f\"Invalid metric values for 'categorical' type of metric in {metric}\"\n", " )\n", "\n", " # Validate \"numerical\" type metric\n", " if metric[\"type\"] == \"numerical\" and not (\n", " \"range\" in metric or metric[\"range\"] or 2 <= len(metric[\"range\"]) > 3\n", " ):\n", " raise ValueError(\n", " f\"Missing or invalid 'range' field for 'numerical' type of metric in 
{metric}\"\n", " )\n", "\n", " # Validate \"aggregator\" field\n", " if metric[\"type\"] != \"text\" and \"aggregator\" not in metric:\n", " raise ValueError(f\"Missing mandatory 'aggregator' field in {metric}\")\n", "\n", " if metric[\"type\"] == \"numerical\" and metric[\"aggregator\"] != \"average\":\n", " raise ValueError(\n", " f\"Invalid 'aggregator' field for 'numerical' type of metric in {metric}\"\n", " )\n", "\n", " # Validate 'display_name' field, if present\n", " if \"display_name\" in metric and not isinstance(metric[\"display_name\"], str):\n", " raise ValueError(\n", " f\"Invalid type: {type(metric['display_name'])} for 'display_name' field in {metric}\"\n", " )\n", "\n", " return True\n", "\n", "\n", "def is_valid_document(document: dict) -> bool:\n", " # Validate \"document_id\" field\n", " if \"document_id\" not in document:\n", " raise ValueError(f\"Missing mandatory 'document_id' field in {document}\")\n", "\n", " if not isinstance(document[\"document_id\"], str):\n", " raise ValueError(\n", " f\"Invalid type: {type(document['document_id'])} for 'document_id' field in {document}\"\n", " )\n", "\n", " # Validate \"text\" field\n", " if \"text\" not in document:\n", " raise ValueError(f\"Missing mandatory 'text' field in {document}\")\n", "\n", " if not isinstance(document[\"text\"], str):\n", " raise ValueError(\n", " f\"Invalid type: {type(document['text'])} for 'text' field in {document}\"\n", " )\n", "\n", " # Validate 'title' field, if present\n", " if \"title\" in document and not isinstance(document[\"title\"], str):\n", " raise ValueError(\n", " f\"Invalid type: {type(document['title'])} for 'title' field in {document}\"\n", " )\n", "\n", " # Validate 'url' field, if present\n", " if \"url\" in document and not isinstance(document[\"url\"], str):\n", " raise ValueError(\n", " f\"Invalid type: {type(document['url'])} for 'url' field in {document}\"\n", " )\n", "\n", " return True\n", "\n", "\n", "def is_valid_task(task: dict) -> bool:\n", " def is_valid_context(context: dict) -> bool:\n", " # Validate \"document_id\" field\n", " if \"document_id\" not in context:\n", " raise ValueError(f\"Missing mandatory 'document_id' field in {context}\")\n", "\n", " if not isinstance(context[\"document_id\"], str):\n", " raise ValueError(\n", " f\"Invalid type: {type(context['document_id'])} for 'document_id' field in {context}\"\n", " )\n", "\n", " return True\n", "\n", " # Validate \"task_id\" field\n", " if \"task_id\" not in task:\n", " raise ValueError(f\"Missing mandatory 'task_id' field in {task}\")\n", "\n", " if not isinstance(task[\"task_id\"], str):\n", " raise ValueError(\n", " f\"Invalid type: {type(task['task_id'])} for 'task_id' field in {task}\"\n", " )\n", "\n", " # Validate \"task_type\" field\n", " if \"task_type\" not in task:\n", " raise ValueError(f\"Missing mandatory 'task_type' field in {task}\")\n", "\n", " if not isinstance(task[\"task_type\"], str):\n", " raise ValueError(\n", " f\"Invalid type: {type(task['task_type'])} for 'task_type' field in {task}\"\n", " )\n", "\n", " if task[\"task_type\"] not in [\"question_answering\", \"conversation\", \"rag\", \"text_generation\", \"json_generation\"]:\n", " raise ValueError(f\"Invalid task_type: {task['task_type']} in {task}\")\n", "\n", " # Validate `contexts` field\n", " if not all([is_valid_context(context=context) for context in task[\"contexts\"]]):\n", " raise ValueError(f\"Invalid context values in {task}\")\n", "\n", " return True\n", "\n", "\n", "def is_valid_evaluation(\n", " evaluation: dict, 
"    evaluation: dict, metrics: list[str], models: list[str]\n",
") -> bool:\n",
"    def is_valid_annotations(annotations: dict, metric: str) -> bool:\n",
"        for annotator_id, rating in annotations.items():\n",
"            if not isinstance(annotator_id, str):\n",
"                raise ValueError(\n",
"                    f\"Invalid type: {type(annotator_id)} for 'annotator_id' in {annotations} for '{metric}' metric in evaluation with task_id: {evaluation['task_id']} and model_id: {evaluation['model_id']}\"\n",
"                )\n",
"\n",
"            if not isinstance(rating, dict):\n",
"                raise ValueError(\n",
"                    f\"Invalid type: {type(rating)} for 'rating' in {annotations} for '{metric}' metric in evaluation with task_id: {evaluation['task_id']} and model_id: {evaluation['model_id']}\"\n",
"                )\n",
"\n",
"            # Validate \"value\" field\n",
"            if \"value\" not in rating:\n",
"                raise ValueError(\n",
"                    f\"Missing mandatory 'value' field in {rating} for '{metric}' metric in evaluation with task_id: {evaluation['task_id']} and model_id: {evaluation['model_id']}\"\n",
"                )\n",
"\n",
"            if not isinstance(rating[\"value\"], (str, float, int)):\n",
"                raise ValueError(\n",
"                    f\"Invalid type: {type(rating['value'])} for 'value' in {rating} for '{metric}' metric in evaluation with task_id: {evaluation['task_id']} and model_id: {evaluation['model_id']}\"\n",
"                )\n",
"\n",
"        return True\n",
"\n",
"    # Validate \"task_id\" field\n",
"    if \"task_id\" not in evaluation:\n",
"        raise ValueError(f\"Missing mandatory 'task_id' field in {evaluation}\")\n",
"\n",
"    if not isinstance(evaluation[\"task_id\"], str):\n",
"        raise ValueError(\n",
"            f\"Invalid type: {type(evaluation['task_id'])} for 'task_id' field in {evaluation}\"\n",
"        )\n",
"\n",
"    # Validate \"model_id\" field\n",
"    if \"model_id\" not in evaluation:\n",
"        raise ValueError(f\"Missing mandatory 'model_id' field in {evaluation}\")\n",
"\n",
"    if not isinstance(evaluation[\"model_id\"], str):\n",
"        raise ValueError(\n",
"            f\"Invalid type: {type(evaluation['model_id'])} for 'model_id' field in {evaluation}\"\n",
"        )\n",
"\n",
"    if evaluation[\"model_id\"] not in models:\n",
"        raise ValueError(\n",
"            f\"Invalid model with model_id: {evaluation['model_id']} for evaluation with task_id: {evaluation['task_id']}\"\n",
"        )\n",
"\n",
"    # Validate \"model_response\" field\n",
"    if \"model_response\" not in evaluation:\n",
"        raise ValueError(f\"Missing mandatory 'model_response' field in {evaluation}\")\n",
"\n",
"    if not isinstance(evaluation[\"model_response\"], str):\n",
"        raise ValueError(\n",
"            f\"Invalid type: {type(evaluation['model_response'])} for 'model_response' field in {evaluation}\"\n",
"        )\n",
"\n",
"    # Validate \"annotations\" field\n",
"    if \"annotations\" not in evaluation:\n",
"        raise ValueError(f\"Missing mandatory 'annotations' field in {evaluation}\")\n",
"\n",
"    if not all(\n",
"        is_valid_annotations(annotations=annotations, metric=metric)\n",
"        for metric, annotations in evaluation[\"annotations\"].items()\n",
"    ):\n",
"        raise ValueError(\n",
"            f\"Invalid annotations in evaluation with task_id: {evaluation['task_id']} and model_id: {evaluation['model_id']}\"\n",
"        )\n",
"\n",
"    return True\n",
"\n",
"\n",
"def validate(data: dict, level: Literal[\"minimal\", \"aggressive\"] = \"minimal\") -> None:\n",
"    # Validate \"models\" field\n",
"    if \"models\" not in data:\n",
"        raise ValueError(f\"Missing mandatory 'models' field in {data}\")\n",
"\n",
"    if not all(is_valid_model(model) for model in data[\"models\"]):\n",
ValueError(f\"Invalid model in {data['models']}\")\n", "\n", " # Validate \"metrics\" field\n", " if \"metrics\" not in data:\n", " raise ValueError(f\"Missing mandatory 'metrics' field in {data}\")\n", "\n", " if not all(is_valid_metric(metric) for metric in data[\"metrics\"]):\n", " raise ValueError(f\"Invalid metric in {data['metrics']}\")\n", "\n", " # Validate \"documents\" field\n", " if \"documents\" not in data:\n", " raise ValueError(f\"Missing mandatory 'documents' field in {data}\")\n", "\n", " if not all(is_valid_document(document) for document in data[\"documents\"]):\n", " raise ValueError(f\"Invalid document in {data['documents']}\")\n", "\n", " # Validate \"tasks\" field\n", " if \"tasks\" not in data:\n", " raise ValueError(f\"Missing mandatory 'tasks' field in {data}\")\n", "\n", " if not all(is_valid_task(task) for task in data[\"tasks\"]):\n", " raise ValueError(f\"Invalid task in {data['tasks']}\")\n", "\n", " # Warn about duplicate task IDs\n", " task_ids = set()\n", " for task in data[\"tasks\"]:\n", " task_id = task[\"task_id\"]\n", " if task_id in task_ids:\n", " print(f\"Duplicate task_id: {task_id} found in 'tasks' field\")\n", " else:\n", " task_ids.add(task_id)\n", "\n", " # Validate \"evaluations\" field\n", " if \"evaluations\" not in data:\n", " raise ValueError(f\"Missing mandatory 'evaluations' field in {data}\")\n", "\n", " applicable_metrics = [metric[\"name\"] for metric in data[\"metrics\"]]\n", " applicable_models = [model[\"model_id\"] for model in data[\"models\"]]\n", " if not all(\n", " is_valid_evaluation(\n", " evaluation, metrics=applicable_metrics, models=applicable_models\n", " )\n", " for evaluation in data[\"evaluations\"]\n", " ):\n", " raise ValueError(f\"Invalid evaluation in {data['evaluations']}\")\n", "\n", " # Validate evaluations exists for all task for all models with all metrics\n", " evaluated_models_per_task = {}\n", " evaluated_metrics_per_model_per_task = {}\n", " for evaluation in data[\"evaluations\"]:\n", " task_id = evaluation[\"task_id\"]\n", " model_id = evaluation[\"model_id\"]\n", " try:\n", " evaluated_models_per_task[task_id].append(model_id)\n", " except KeyError:\n", " evaluated_models_per_task[task_id] = [model_id]\n", "\n", " for metric in evaluation[\"annotations\"].keys():\n", " try:\n", " evaluated_metrics_per_model_per_task[f\"{task_id}:++:{model_id}\"].append(\n", " metric\n", " )\n", " except KeyError:\n", " evaluated_metrics_per_model_per_task[f\"{task_id}:++:{model_id}\"] = [\n", " metric\n", " ]\n", "\n", " evaluated_task_ids = set(evaluated_models_per_task.keys())\n", " if evaluated_task_ids != task_ids:\n", " if len(evaluated_task_ids) > len(task_ids):\n", " print(\n", " f\"Evaluations found for following additional tasks: {evaluated_task_ids - task_ids}\"\n", " )\n", " elif len(task_ids) > len(evaluated_task_ids):\n", " print(\n", " f\"Missing evaluations following tasks: {task_ids - evaluated_task_ids}\"\n", " )\n", " else:\n", " print(\n", " f\"Missing evaluations following tasks: {task_ids - evaluated_task_ids}\"\n", " )\n", " print(\n", " f\"Evaluations found for following additional tasks: {evaluated_task_ids - task_ids}\"\n", " )\n", "\n", " evaluations_with_missing_models = {}\n", " evaluations_with_additional_models = {}\n", " for task_id, models in evaluated_models_per_task.items():\n", " if set(models) != set(applicable_models):\n", " if set(applicable_models) - set(models):\n", " evaluations_with_missing_models[task_id] = set(applicable_models) - set(\n", " models\n", " )\n", " elif 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Run validator\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"validate(\n",
"    data=read_json(\n",
"        filename=\"\"\n",
"    ),\n",
"    level=\"aggressive\",\n",
")"
] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.13" } }, "nbformat": 4, "nbformat_minor": 1 }