# Validate analytics JSON

### ✅ Prerequisites

[Python 3.10](https://www.python.org/downloads/)


In [None]:
from typing import Literal
import json

def read_json(filename: str, encoding="utf-8"):
 with open(filename, mode="r", encoding=encoding) as fp:
 return json.load(fp)


def is_valid_model(model: dict) -> bool:
 if "model_id" not in model:
 raise ValueError(f"Missing mandatory 'model_id' field in {model}")
 if "name" not in model:
 raise ValueError(f"Missing mandatory 'model_id' field in {model}")
 if "owner" not in model:
 raise ValueError(f"Missing mandatory 'model_id' field in {model}")

 return True


def is_valid_metric(metric: dict) -> bool:
 def is_valid_metric_value(metric_value: dict) -> bool:
 # Validate "value" field
 if "value" not in metric_value or not metric_value["value"]:
 raise ValueError(f"Missing mandatory 'value' field in {metric_value}")

 if not (
 isinstance(metric_value["value"], str)
 or isinstance(metric_value["value"], float)
 or isinstance(metric_value["value"], int)
 ):
 raise ValueError(
 f"Invalid type: {type(metric_value['value'])} for 'value' field in {metric_value}"
 )

 return True

 # Validate "name" field
 if "name" not in metric:
 raise ValueError(f"Missing mandatory 'name' field in {metric}")

 if not isinstance(metric["name"], str):
 raise ValueError(
 f"Invalid type: {type(metric['name'])} for 'name' field in {metric}"
 )

 # Validate "author" field
 if "author" not in metric:
 raise ValueError(f"Missing mandatory 'name' field in {metric}")

 if not isinstance(metric["author"], str):
 raise ValueError(
 f"Invalid type: {type(metric['author'])} for 'author' field in {metric}"
 )

 if metric["author"] not in ["human", "algorithm"]:
 raise ValueError(f"Unsupported author: {metric['author']} in {metric}")

 # Validate "type" field
 if "type" not in metric:
 raise ValueError(f"Missing mandatory 'type' field in {metric}")

 if metric["type"] not in ["categorical", "numerical", "text"]:
 raise ValueError(f"Unsupported type: {metric['type']} in {metric}")

 # Validate "categorical" type metric
 if metric["type"] == "categorical" and (
 "values" not in metric or not metric["values"]
 ):
 raise ValueError(
 f"Missing mandatory 'values' field for 'categorical' type metric in {metric}"
 )

 if metric["type"] == "categorical" and not all(
 [
 is_valid_metric_value(metric_value=metric_value)
 for metric_value in metric["values"]
 ]
 ):
 raise ValueError(
 f"Invalid metric values for 'categorical' type of metric in {metric}"
 )

 # Validate "numerical" type metric
 if metric["type"] == "numerical" and not (
 "range" in metric or metric["range"] or 2 <= len(metric["range"]) > 3
 ):
 raise ValueError(
 f"Missing or invalid 'range' field for 'numerical' type of metric in {metric}"
 )

 # Validate "aggregator" field
 if metric["type"] != "text" and "aggregator" not in metric:
 raise ValueError(f"Missing mandatory 'aggregator' field in {metric}")

 if metric["type"] == "numerical" and metric["aggregator"] != "average":
 raise ValueError(
 f"Invalid 'aggregator' field for 'numerical' type of metric in {metric}"
 )

 # Validate 'display_name' field, if present
 if "display_name" in metric and not isinstance(metric["display_name"], str):
 raise ValueError(
 f"Invalid type: {type(metric['display_name'])} for 'display_name' field in {metric}"
 )

 return True


def is_valid_document(document: dict) -> bool:
 # Validate "document_id" field
 if "document_id" not in document:
 raise ValueError(f"Missing mandatory 'document_id' field in {document}")

 if not isinstance(document["document_id"], str):
 raise ValueError(
 f"Invalid type: {type(document['document_id'])} for 'document_id' field in {document}"
 )

 # Validate "text" field
 if "text" not in document:
 raise ValueError(f"Missing mandatory 'text' field in {document}")

 if not isinstance(document["text"], str):
 raise ValueError(
 f"Invalid type: {type(document['text'])} for 'text' field in {document}"
 )

 # Validate 'title' field, if present
 if "title" in document and not isinstance(document["title"], str):
 raise ValueError(
 f"Invalid type: {type(document['title'])} for 'title' field in {document}"
 )

 # Validate 'url' field, if present
 if "url" in document and not isinstance(document["url"], str):
 raise ValueError(
 f"Invalid type: {type(document['url'])} for 'url' field in {document}"
 )

 return True


def is_valid_task(task: dict) -> bool:
 def is_valid_context(context: dict) -> bool:
 # Validate "document_id" field
 if "document_id" not in context:
 raise ValueError(f"Missing mandatory 'document_id' field in {context}")

 if not isinstance(context["document_id"], str):
 raise ValueError(
 f"Invalid type: {type(context['document_id'])} for 'document_id' field in {context}"
 )

 return True

 # Validate "task_id" field
 if "task_id" not in task:
 raise ValueError(f"Missing mandatory 'task_id' field in {task}")

 if not isinstance(task["task_id"], str):
 raise ValueError(
 f"Invalid type: {type(task['task_id'])} for 'task_id' field in {task}"
 )

 # Validate "task_type" field
 if "task_type" not in task:
 raise ValueError(f"Missing mandatory 'task_type' field in {task}")

 if not isinstance(task["task_type"], str):
 raise ValueError(
 f"Invalid type: {type(task['task_type'])} for 'task_type' field in {task}"
 )

 if task["task_type"] not in ["question_answering", "conversation", "rag", "text_generation", "json_generation"]:
 raise ValueError(f"Invalid task_type: {task['task_type']} in {task}")

 # Validate `contexts` field
 if not all([is_valid_context(context=context) for context in task["contexts"]]):
 raise ValueError(f"Invalid context values in {task}")

 return True


def is_valid_evaluation(
 evaluation: dict, metrics: list[str], models: list[str]
) -> bool:
 def is_valid_annotations(annotations: dict, metric: str) -> bool:
 for annotator_id, rating in annotations.items():
 if not isinstance(annotator_id, str):
 raise ValueError(
 f"Invalid type: {type(annotator_id)} for 'annotator_id' in {annotations} for '{metric}' metric in evaluation with with task_id: {evaluation['task_id']} and model_id: {evaluation['model_id']}"
 )

 if not isinstance(rating, dict):
 raise ValueError(
 f"Invalid type: {type(rating)} for 'rating' in {annotations} for '{metric}' metric in evaluation with with task_id: {evaluation['task_id']} and model_id: {evaluation['model_id']}"
 )

 # Validate "task_id" field
 if "value" not in rating:
 raise ValueError(
 f"Missing mandatory 'value' field in {rating} for '{metric}' metric in evaluation with with task_id: {evaluation['task_id']} and model_id: {evaluation['model_id']}"
 )

 if not (
 isinstance(rating["value"], str)
 or isinstance(rating["value"], float)
 or isinstance(rating["value"], int)
 ):
 raise ValueError(
 f"Invalid type: {type(rating['value'])} for 'value' in {rating} for '{metric}' metric in evaluation with with task_id: {evaluation['task_id']} and model_id: {evaluation['model_id']}"
 )

 return True

 # Validate "task_id" field
 if "task_id" not in evaluation:
 raise ValueError(f"Missing mandatory 'task_id' field in {evaluation}")

 if not isinstance(evaluation["task_id"], str):
 raise ValueError(
 f"Invalid type: {type(evaluation['task_id'])} for 'task_id' field in {evaluation}"
 )

 # Validate "model_id" field
 if "model_id" not in evaluation:
 raise ValueError(f"Missing mandatory 'model_id' field in {evaluation}")

 if not isinstance(evaluation["model_id"], str):
 raise ValueError(
 f"Invalid type: {type(evaluation['model_id'])} for 'model_id' field in {evaluation}"
 )

 if evaluation["model_id"] not in models:
 raise ValueError(
 f"Invalid model with model_id: {evaluation['model_id']} for evaluation with task_id: {evaluation['task_id']}"
 )

 # Validate "model_response" field
 if "task_id" not in evaluation:
 raise ValueError(f"Missing mandatory 'model_response' field in {evaluation}")

 if not isinstance(evaluation["model_response"], str):
 raise ValueError(
 f"Invalid type: {type(evaluation['model_response'])} for 'model_response' field in {evaluation}"
 )

 # Validate "annotations" field
 if "annotations" not in evaluation:
 raise ValueError(f"Missing mandatory 'annotations' field in {evaluation}")

 if not all(
 is_valid_annotations(annotations=annotations, metric=metric)
 for metric, annotations in evaluation["annotations"].items()
 ):
 raise ValueError(
 f"Invalid annotations in evaluation with with task_id: {evaluation['task_id']} and model_id: {evaluation['model_id']}"
 )

 return True


def validate(data: dict, level: Literal["minimal", "aggresive"] = "minimal") -> None:
 # Validate "models" field
 if "models" not in data:
 raise ValueError(f"Missing mandatory 'models' field in {data}")

 if not all(is_valid_model(model) for model in data["models"]):
 raise ValueError(f"Invalid model in {data['models']}")

 # Validate "metrics" field
 if "metrics" not in data:
 raise ValueError(f"Missing mandatory 'metrics' field in {data}")

 if not all(is_valid_metric(metric) for metric in data["metrics"]):
 raise ValueError(f"Invalid metric in {data['metrics']}")

 # Validate "documents" field
 if "documents" not in data:
 raise ValueError(f"Missing mandatory 'documents' field in {data}")

 if not all(is_valid_document(document) for document in data["documents"]):
 raise ValueError(f"Invalid document in {data['documents']}")

 # Validate "tasks" field
 if "tasks" not in data:
 raise ValueError(f"Missing mandatory 'tasks' field in {data}")

 if not all(is_valid_task(task) for task in data["tasks"]):
 raise ValueError(f"Invalid task in {data['tasks']}")

 # Warn about duplicate task IDs
 task_ids = set()
 for task in data["tasks"]:
 task_id = task["task_id"]
 if task_id in task_ids:
 print(f"Duplicate task_id: {task_id} found in 'tasks' field")
 else:
 task_ids.add(task_id)

 # Validate "evaluations" field
 if "evaluations" not in data:
 raise ValueError(f"Missing mandatory 'evaluations' field in {data}")

 applicable_metrics = [metric["name"] for metric in data["metrics"]]
 applicable_models = [model["model_id"] for model in data["models"]]
 if not all(
 is_valid_evaluation(
 evaluation, metrics=applicable_metrics, models=applicable_models
 )
 for evaluation in data["evaluations"]
 ):
 raise ValueError(f"Invalid evaluation in {data['evaluations']}")

 # Validate evaluations exists for all task for all models with all metrics
 evaluated_models_per_task = {}
 evaluated_metrics_per_model_per_task = {}
 for evaluation in data["evaluations"]:
 task_id = evaluation["task_id"]
 model_id = evaluation["model_id"]
 try:
 evaluated_models_per_task[task_id].append(model_id)
 except KeyError:
 evaluated_models_per_task[task_id] = [model_id]

 for metric in evaluation["annotations"].keys():
 try:
 evaluated_metrics_per_model_per_task[f"{task_id}:++:{model_id}"].append(
 metric
 )
 except KeyError:
 evaluated_metrics_per_model_per_task[f"{task_id}:++:{model_id}"] = [
 metric
 ]

 evaluated_task_ids = set(evaluated_models_per_task.keys())
 if evaluated_task_ids != task_ids:
 if len(evaluated_task_ids) > len(task_ids):
 print(
 f"Evaluations found for following additional tasks: {evaluated_task_ids - task_ids}"
 )
 elif len(task_ids) > len(evaluated_task_ids):
 print(
 f"Missing evaluations following tasks: {task_ids - evaluated_task_ids}"
 )
 else:
 print(
 f"Missing evaluations following tasks: {task_ids - evaluated_task_ids}"
 )
 print(
 f"Evaluations found for following additional tasks: {evaluated_task_ids - task_ids}"
 )

 evaluations_with_missing_models = {}
 evaluations_with_additional_models = {}
 for task_id, models in evaluated_models_per_task.items():
 if set(models) != set(applicable_models):
 if set(applicable_models) - set(models):
 evaluations_with_missing_models[task_id] = set(applicable_models) - set(
 models
 )
 elif set(models) - set(applicable_models):
 evaluations_with_additional_models[task_id] = set(models) - set(
 applicable_models
 )

 if evaluations_with_missing_models:
 for task_id, missing_models in evaluations_with_missing_models.items():
 print(
 f"Missing following models: {missing_models} for task with task_id: {task_id}"
 )

 evaluations_per_model_with_missing_metrics = {}
 evaluations_per_model_with_additional_metrics = {}
 for key, metrics in evaluated_metrics_per_model_per_task.items():
 if set(metrics) != set(applicable_metrics):
 if set(applicable_metrics) - set(metrics):
 evaluations_per_model_with_missing_metrics[key] = set(
 applicable_metrics
 ) - set(metrics)
 elif set(metrics) - set(applicable_metrics):
 evaluations_per_model_with_additional_metrics[key] = set(metrics) - set(
 applicable_metrics
 )

 if evaluations_per_model_with_missing_metrics:
 for key, missing_metrics in evaluations_per_model_with_missing_metrics.items():
 segments = key.split(":++:")
 print(
 f"Missing following metrics: {missing_metrics} for task with task_id: {segments[0]} and model_id: {segments[1]}"
 )

 # Additional checks
 if level == "aggresive":
 if evaluations_with_additional_models:
 print("====================================================")
 print("Evaluations with additional models")
 print("====================================================")
 for (
 task_id,
 additional_models,
 ) in evaluations_with_additional_models.items():
 print(f"Task ID: {task_id}\tAdditional models: {additional_models}")

 if evaluations_per_model_with_additional_metrics:
 print("====================================================")
 print("Evaluations with additional metrics")
 print("====================================================")
 for (
 key,
 additional_metrics,
 ) in evaluations_per_model_with_additional_metrics.items():
 segments = key.split(":++:")
 print(
 f"Task ID: {segments[0]}\tModel: {segments[1]}\tAdditional metrics: {additional_metrics}"
 )

### Run validator


In [None]:
validate(
 data=read_json(
 filename=""
 ),
 level="aggresive",
)