Spaces:
Running
Running
File size: 1,606 Bytes
f59253c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# tasks.py
import asyncio
from typing import Dict
from uuid import uuid4
# Module-level registry of tasks that are still being tracked.
# Keys are the string IDs generated in `create_task` (str(uuid4())); values are
# the corresponding asyncio.Task objects. Entries are removed by `cleanup_task`
# (wired up as each task's done-callback) and by `stop_task`.
tasks: Dict[str, asyncio.Task] = {}
def cleanup_task(task_id: str):
    """
    Drop the entry for `task_id` from the global `tasks` dictionary.

    An ID that is not (or no longer) registered is silently ignored, so the
    function can safely be called more than once for the same task.
    """
    if task_id in tasks:
        del tasks[task_id]
def create_task(coroutine):
    """
    Schedule `coroutine` as an asyncio task and register it in `tasks`.

    The task is stored under a freshly generated UUID4 string and is removed
    from the registry again once it finishes, whether it completed, failed or
    was cancelled.

    Returns:
        A `(task_id, task)` tuple: the generated ID and the asyncio.Task.
    """
    new_id = str(uuid4())
    scheduled = asyncio.create_task(coroutine)

    def _forget(_finished_task):
        # Done-callbacks receive the finished task; only the ID is needed here.
        cleanup_task(new_id)

    scheduled.add_done_callback(_forget)
    tasks[new_id] = scheduled
    return new_id, scheduled
def get_task(task_id: str):
    """
    Look up the task registered under `task_id`.

    Returns:
        The matching asyncio.Task, or None when no task is registered
        under that ID.
    """
    try:
        return tasks[task_id]
    except KeyError:
        return None
def list_tasks():
    """
    Return the IDs of all tasks currently held in the `tasks` registry.

    The result is a new list, so later changes to the registry do not
    affect it.
    """
    return list(tasks)
async def stop_task(task_id: str):
    """
    Request cancellation of a registered task and wait for it to finish.

    Returns:
        A dict with a boolean `status` and a human-readable `message`.
        `status` is True when awaiting the task raised CancelledError, and
        False when the task finished without raising it.

    Raises:
        ValueError: if no task is registered under `task_id`.

    Only CancelledError is caught; any other exception raised by the task
    while it is being awaited propagates to the caller.
    """
    target = tasks.get(task_id)
    if not target:
        raise ValueError(f"Task with ID {task_id} not found.")

    target.cancel()  # Only a request; the task may still finish on its own

    stopped = False
    try:
        await target  # Give the task the chance to process the cancellation
    except asyncio.CancelledError:
        # NOTE(review): a CancelledError delivered to the caller of stop_task
        # while it waits here would be caught as well - confirm that is intended.
        stopped = True
        tasks.pop(task_id, None)  # Unregister right away

    if stopped:
        message = f"Task {task_id} successfully stopped."
    else:
        message = f"Failed to stop task {task_id}."
    return {"status": stopped, "message": message}
|