#!/usr/bin/env python # -*- coding: utf-8 -*- import falcon from engine import evaluate_solution from threading import Lock from typing import Dict import logging class RootResource: def __init__(self): self.main_lock = Lock() self.task_locks: Dict[str, Lock] = {} self.max_tasks = 5 def on_get(self, request, response): response.text = 'Human Eval for Solidity Server v1.2410.0' def on_post(self, request, response): payload = request.media if 'task_id' not in payload or 'solution' not in payload: response.status = falcon.HTTP_400 response.media = { 'error': 'task_id or solution are missing', } return task_id = payload['task_id'] solution = payload['solution'] try: with self.main_lock: if len(self.task_locks) >= self.max_tasks: # Remove the oldest task lock if we've reached the limit oldest_task = next(iter(self.task_locks)) del self.task_locks[oldest_task] if task_id not in self.task_locks: self.task_locks[task_id] = Lock() with self.task_locks[task_id]: passed = evaluate_solution(task_id, solution) response.media = {'passed': passed} except FileNotFoundError as e: logging.error('Task not found: {}'.format(str(e))) response.status = falcon.HTTP_404 response.media = {'error': 'Task not found'} except Exception as e: logging.error('Error processing request: {}'.format(str(e))) response.status = falcon.HTTP_500 response.media = {'error': 'Internal server error'}