DuckDB-SQL-Eval / evaluation_logic.py
cfahlgren1's picture
cfahlgren1 HF staff
flatten results for dataset
ee5875c
raw
history blame
9.82 kB
import os
import sys
from pathlib import Path
from datetime import datetime
import json
import traceback
import uuid
from huggingface_hub import CommitScheduler
current_dir = Path(__file__).resolve().parent
duckdb_nsql_dir = current_dir / 'duckdb-nsql'
eval_dir = duckdb_nsql_dir / 'eval'
sys.path.extend([str(current_dir), str(duckdb_nsql_dir), str(eval_dir)])
from eval.predict import get_manifest, DefaultLoader, PROMPT_FORMATTERS, generate_sql
from eval.evaluate import evaluate, compute_metrics, get_to_print
from eval.evaluate import test_suite_evaluation, read_tables_json
from eval.schema import TextToSQLParams, Table
# Names of every registered prompt formatter, exposed for callers that need
# to present or validate a prompt-format choice.
AVAILABLE_PROMPT_FORMATS = [*PROMPT_FORMATTERS.keys()]

# One identifier per process; save_prediction / save_evaluation embed it in
# their file names so each run appends to its own pair of files.
file_uuid = uuid.uuid4()

# Local folders mirrored to the Hugging Face Hub by the schedulers below.
prediction_folder = Path("prediction_results/")
evaluation_folder = Path("evaluation_results/")

# Background uploader for per-question predictions.
# `every` is the commit interval passed to huggingface_hub.CommitScheduler.
prediction_scheduler = CommitScheduler(
    folder_path=prediction_folder,
    repo_id="sql-console/duckdb-nsql-predictions",
    repo_type="dataset",
    path_in_repo="data",
    every=10,
)

# Background uploader for flattened evaluation scores.
evaluation_scheduler = CommitScheduler(
    folder_path=evaluation_folder,
    repo_id="sql-console/duckdb-nsql-scores",
    repo_type="dataset",
    path_in_repo="data",
    every=10,
)
def save_prediction(inference_api, model_name, prompt_format, question, generated_sql):
    """Append one prediction record to this process's prediction file.

    The record is written as a single JSON object followed by a newline, so
    the file is valid JSON Lines (one object per line), the same layout
    ``save_evaluation`` uses. Without the terminator, consecutive appends
    would be concatenated on one line and the file could not be parsed.

    Args:
        inference_api: Identifier of the inference backend used.
        model_name: Name of the model that produced the SQL.
        prompt_format: Key of the prompt formatter that was used.
        question: The natural-language question that was asked.
        generated_sql: The SQL text returned by the model.
    """
    prediction_folder.mkdir(parents=True, exist_ok=True)
    prediction_file = prediction_folder / f"prediction_{file_uuid}.json"

    record = {
        "inference_api": inference_api,
        "model_name": model_name,
        "prompt_format": prompt_format,
        "question": question,
        "generated_sql": generated_sql,
        "timestamp": datetime.now().isoformat()
    }

    # Hold the scheduler lock while writing so a background commit never
    # uploads a half-written record.
    with prediction_scheduler.lock:
        with prediction_file.open("a") as f:
            json.dump(record, f)
            f.write('\n')
def save_evaluation(inference_api, model_name, prompt_format, metrics):
    """Append one flattened evaluation record to this process's score file.

    For each known category, ``metrics['exec'][category]`` is flattened into
    two columns, ``<category>_count`` and ``<category>_execution_accuracy``.
    Categories absent from ``metrics['exec']`` are recorded as ``0`` / ``0.0``.
    The record is written as one JSON object per line.

    Args:
        inference_api: Identifier of the inference backend used.
        model_name: Name of the evaluated model.
        prompt_format: Key of the prompt formatter that was used.
        metrics: Mapping with an ``'exec'`` entry keyed by category, each
            value providing ``'count'`` and ``'exec'``.
    """
    evaluation_folder.mkdir(parents=True, exist_ok=True)
    evaluation_file = evaluation_folder / f"evaluation_{file_uuid}.json"

    # Run-level columns come first, followed by the per-category columns.
    row = {
        "inference_api": inference_api,
        "model_name": model_name,
        "prompt_format": prompt_format,
        "timestamp": datetime.now().isoformat()
    }

    exec_by_category = metrics['exec']
    for name in ('easy', 'medium', 'hard', 'duckdb', 'ddl', 'all'):
        present = name in exec_by_category
        row[f"{name}_count"] = exec_by_category[name]['count'] if present else 0
        row[f"{name}_execution_accuracy"] = exec_by_category[name]['exec'] if present else 0.0

    # The scheduler lock keeps a background commit from seeing a partial line.
    with evaluation_scheduler.lock, evaluation_file.open("a") as f:
        json.dump(row, f)
        f.write('\n')
def run_prediction(inference_api, model_name, prompt_format, output_file):
    """Generate SQL for every question in the dev set and stream progress.

    This is a generator: it yields human-readable status strings. Errors are
    not raised to the caller; they are reported as yielded messages.

    Predictions are written to a temporary sibling of ``output_file`` and
    renamed into place only after every record has been written. A failure
    midway therefore never leaves a truncated ``output_file`` behind, which
    matters because ``run_evaluation`` treats an existing output file as a
    complete, reusable prediction.

    Args:
        inference_api: Manifest client name passed to ``get_manifest``.
        model_name: Manifest engine (model) name passed to ``get_manifest``.
        prompt_format: Key into ``PROMPT_FORMATTERS``.
        output_file: ``pathlib.Path`` of the JSON Lines file to produce; each
            line is the original example plus a ``"pred"`` field.

    Yields:
        str: Progress, completion, or error messages.
    """
    dataset_path = str(eval_dir / "data/dev.json")
    table_meta_path = str(eval_dir / "data/tables.json")

    yield "Starting prediction..."

    # Written first, renamed to output_file on success, removed on failure.
    partial_file = output_file.with_name(output_file.name + ".partial")

    try:
        # Initialize necessary components
        data_formatter = DefaultLoader()
        prompt_formatter = PROMPT_FORMATTERS[prompt_format]()

        # Load manifest
        manifest = get_manifest(
            manifest_client=inference_api,
            manifest_connection="http://localhost:5000",
            manifest_engine=model_name,
        )

        # Load data
        data = data_formatter.load_data(dataset_path)
        db_to_tables = data_formatter.load_table_metadata(table_meta_path)

        # Prepare input for generate_sql
        text_to_sql_inputs = []
        for input_question in data:
            db_id = input_question.get("db_id", "none")
            # Examples without a database get an empty table list.
            if db_id != "none":
                table_params = list(db_to_tables.get(db_id, {}).values())
            else:
                table_params = []
            text_to_sql_inputs.append(TextToSQLParams(
                instruction=input_question["question"],
                database=db_id,
                tables=table_params,
            ))

        # Generate SQL
        generated_sqls = generate_sql(
            manifest=manifest,
            text_to_sql_in=text_to_sql_inputs,
            retrieved_docs=[[] for _ in text_to_sql_inputs],
            prompt_formatter=prompt_formatter,
            stop_tokens=[';'],
            overwrite_manifest=True,
            max_tokens=30000,
            temperature=0.1,
            num_beams=-1,
            parallel=False
        )

        # Save results
        output_file.parent.mkdir(parents=True, exist_ok=True)
        with partial_file.open('w') as f:
            for original_data, (sql, _) in zip(data, generated_sqls):
                json.dump({**original_data, "pred": sql}, f)
                f.write('\n')

                # Save prediction to dataset
                save_prediction(inference_api, model_name, prompt_format, original_data["question"], sql)

        # Publish the finished file in one step.
        partial_file.replace(output_file)

        yield f"Prediction completed. Results saved to {output_file}"
    except Exception as e:
        yield f"Prediction failed with error: {str(e)}"
        yield f"Error traceback: {traceback.format_exc()}"
    finally:
        # Only present if the run failed before the rename.
        if partial_file.exists():
            partial_file.unlink()
def run_evaluation(inference_api, model_name, prompt_format="duckdbinstgraniteshort"):
    """Run prediction (if needed) and evaluation, streaming progress messages.

    This is a generator: it yields human-readable status strings. Errors are
    not raised to the caller; they are reported as yielded messages.

    The prediction file name includes the prompt format, the model name and
    today's date. If that file already exists, the prediction step is skipped
    and the file is evaluated as-is.

    Args:
        inference_api: Inference backend identifier, forwarded to
            ``run_prediction`` and recorded with the scores.
        model_name: Name of the model to evaluate.
        prompt_format: Key of the prompt formatter to use.

    Yields:
        str: Progress, result, or error messages.
    """
    if "OPENROUTER_API_KEY" not in os.environ:
        yield "Error: OPENROUTER_API_KEY not found in environment variables."
        return
    if "HF_TOKEN" not in os.environ:
        yield "Error: HF_TOKEN not found in environment variables."
        return

    try:
        # Set up the arguments
        dataset_path = str(eval_dir / "data/dev.json")
        table_meta_path = str(eval_dir / "data/tables.json")
        output_dir = eval_dir / "output"

        yield f"Using model: {model_name}"
        yield f"Using prompt format: {prompt_format}"

        output_file = output_dir / f"{prompt_format}_0docs_{model_name.replace('/', '_')}_dev_{datetime.now().strftime('%y-%m-%d')}.json"

        # Ensure the output directory exists
        output_dir.mkdir(parents=True, exist_ok=True)

        if output_file.exists():
            yield f"Prediction file already exists: {output_file}"
            yield "Skipping prediction step and proceeding to evaluation."
        else:
            # Run prediction
            yield from run_prediction(inference_api, model_name, prompt_format, output_file)
            # run_prediction reports failures as messages instead of raising,
            # so check for its output before trying to evaluate it.
            if not output_file.exists():
                yield "Prediction did not produce an output file; skipping evaluation."
                return

        # Run evaluation
        yield "Starting evaluation..."

        # Set up evaluation arguments
        gold_path = Path(dataset_path)
        db_dir = str(eval_dir / "data/databases/")
        tables_path = Path(table_meta_path)

        kmaps = test_suite_evaluation.build_foreign_key_map_from_json(str(tables_path))
        db_schemas = read_tables_json(str(tables_path))

        # Context managers so both files are closed promptly.
        with gold_path.open("r", encoding="utf-8") as gold_f:
            gold_sqls_dict = json.load(gold_f)
        with output_file.open("r") as pred_f:
            # One JSON object per line; ignore blank lines.
            pred_sqls_dict = [json.loads(line) for line in pred_f if line.strip()]

        gold_sqls = [p.get("query", p.get("sql", "")) for p in gold_sqls_dict]
        setup_sqls = [p["setup_sql"] for p in gold_sqls_dict]
        validate_sqls = [p["validation_sql"] for p in gold_sqls_dict]
        gold_dbs = [p.get("db_id", p.get("db", "")) for p in gold_sqls_dict]
        pred_sqls = [p["pred"] for p in pred_sqls_dict]
        example_categories = [p.get("category", "") for p in gold_sqls_dict]

        yield "Computing metrics..."
        metrics = compute_metrics(
            gold_sqls=gold_sqls,
            pred_sqls=pred_sqls,
            gold_dbs=gold_dbs,
            setup_sqls=setup_sqls,
            validate_sqls=validate_sqls,
            kmaps=kmaps,
            db_schemas=db_schemas,
            database_dir=db_dir,
            lowercase_schema_match=False,
            model_name=model_name,
            categories=example_categories,
        )

        if not metrics:
            # Nothing to persist; save_evaluation requires metrics['exec'].
            yield "Evaluation completed."
            yield "No evaluation metrics returned."
            return

        # Save evaluation results to dataset
        save_evaluation(inference_api, model_name, prompt_format, metrics)

        yield "Evaluation completed."

        yield "Overall Results:"
        overall_metrics = metrics['exec']['all']
        yield f"All (n={overall_metrics['count']}) - Execution Accuracy: {overall_metrics['exec']:.3f}"
        yield f"All (n={overall_metrics['count']}) - Edit Distance: {metrics['edit_distance']['edit_distance']:.3f}"

        for category in ['easy', 'medium', 'hard', 'duckdb', 'ddl', 'all']:
            if category in metrics['exec']:
                category_metrics = metrics['exec'][category]
                yield f"{category} (n={category_metrics['count']}) - Execution Accuracy: {category_metrics['exec']:.3f}"
            else:
                yield f"{category}: No data available"

    except Exception as e:
        yield f"An unexpected error occurred: {str(e)}"
        yield f"Error traceback: {traceback.format_exc()}"
if __name__ == "__main__":
    # run_evaluation takes (inference_api, model_name, prompt_format). All
    # three are collected and passed in that order so the model name is not
    # taken as the inference API and the prompt format as the model name.
    inference_api = input("Enter the inference API: ")
    model_name = input("Enter the model name: ")
    prompt_format = input("Enter the prompt format (default is duckdbinstgraniteshort): ") or "duckdbinstgraniteshort"

    # Flush each message so progress appears as it is produced.
    for result in run_evaluation(inference_api, model_name, prompt_format):
        print(result, flush=True)