Spaces:
Running
Running
import gc | |
import threading | |
import time | |
import uuid | |
from typing import Optional | |
from loguru import logger | |
from carvekit.api.interface import Interface | |
from carvekit.web.schemas.config import WebAPIConfig | |
from carvekit.web.utils.init_utils import init_interface | |
from carvekit.web.other.removebg import process_remove_bg | |
class MLProcessor(threading.Thread):
    """Simple ML task queue processor.

    A single worker thread that takes queued jobs one at a time, runs each
    through ``process_remove_bg`` and keeps the result in ``completed_jobs``
    until it is fetched with :meth:`job_result` or removed by
    :meth:`clear_old_completed_jobs`.

    Attributes:
        CLEANUP_INTERVAL_SECONDS: Minimum time between two sweeps of
            ``completed_jobs`` triggered from :meth:`run`.
        COMPLETED_JOB_TTL_SECONDS: Age after which an unfetched result is
            dropped by :meth:`clear_old_completed_jobs`.
    """

    CLEANUP_INTERVAL_SECONDS = 60
    COMPLETED_JOB_TTL_SECONDS = 3600

    def __init__(self, api_config: "WebAPIConfig"):
        """
        Creates the processor without starting the worker thread.

        Args:
            api_config: configuration handed to ``init_interface`` when the
                worker thread starts.
        """
        super().__init__()
        self.api_config = api_config
        # Built lazily at the top of run().
        self.interface: Optional[Interface] = None
        # job id -> job data (the first four items are passed to process_remove_bg)
        self.jobs = {}
        # job id -> [response, completion timestamp]
        self.completed_jobs = {}

    def run(self):
        """
        Starts listening for new jobs.

        Loops forever: periodically sweeps expired results, then processes
        the oldest queued job, or sleeps for one second when the queue is
        empty.
        """
        last_cleanup = time.time()
        if self.interface is None:
            self.interface = init_interface(self.api_config)
        while True:
            # Sweep at most once per CLEANUP_INTERVAL_SECONDS; the sweep itself
            # only removes results older than COMPLETED_JOB_TTL_SECONDS.
            if time.time() - last_cleanup > self.CLEANUP_INTERVAL_SECONDS:
                self.clear_old_completed_jobs()
                last_cleanup = time.time()

            if not self.jobs:
                time.sleep(1)
                continue

            # Dicts keep insertion order, so the first key is the oldest job.
            job_id = list(self.jobs)[0]
            data = self.jobs[job_id]
            # TODO add pydantic scheme here
            # NOTE: an exception raised here is not caught, so it propagates
            # out of run() and ends the worker thread.
            response = process_remove_bg(
                self.interface, data[0], data[1], data[2], data[3]
            )
            self.completed_jobs[job_id] = [response, time.time()]
            try:
                del self.jobs[job_id]
            except KeyError as e:
                logger.error(f"Something went wrong with Task Queue: {str(e)}")
            gc.collect()

    def clear_old_completed_jobs(self):
        """
        Clears old completed jobs.

        Removes every entry of ``completed_jobs`` whose completion timestamp
        is more than ``COMPLETED_JOB_TTL_SECONDS`` in the past.
        """
        if not self.completed_jobs:
            return
        # Iterate over a snapshot: removing entries while iterating the dict
        # itself raises "RuntimeError: dictionary changed size during iteration".
        for job_id, entry in list(self.completed_jobs.items()):
            job_finished_time = entry[1]
            if time.time() - job_finished_time > self.COMPLETED_JOB_TTL_SECONDS:
                # pop() with a default tolerates the entry having been taken
                # by job_result() since the snapshot was made.
                self.completed_jobs.pop(job_id, None)
        gc.collect()

    def job_status(self, id: str) -> str:
        """
        Returns current job status

        Args:
            id: id of the job

        Returns:
            Current job status for specified id. Job status can be [finished, wait, not_found]
        """
        if id in self.completed_jobs:
            return "finished"
        if id in self.jobs:
            return "wait"
        return "not_found"

    def job_result(self, id: str):
        """
        Returns job processing result.

        The stored result is removed when it is returned, so a second call
        with the same id returns False.

        Args:
            id: id of the job

        Returns:
            job processing result, or False when there is no completed job
            with this id.
        """
        # Single pop instead of check-then-delete; entries are always
        # [response, timestamp] lists, so None safely marks "missing".
        entry = self.completed_jobs.pop(id, None)
        if entry is None:
            return False
        return entry[0]

    def job_create(self, data: list):
        """
        Send job to ML Processor

        Starts the worker thread first if it is not alive.

        Args:
            data: data object; run() passes its first four items to
                ``process_remove_bg``.

        Returns:
            id of the created job (hex form of a random UUID).
        """
        # NOTE: Thread.start() raises RuntimeError when called more than once
        # on the same thread object, so this fails if the worker has already
        # terminated.
        if not self.is_alive():
            self.start()
        job_id = uuid.uuid4().hex
        self.jobs[job_id] = data
        return job_id