Real-Time-SD-Turbo / user_queue.py
radames's picture
queue
3207814
raw
history blame
1.51 kB
from typing import Dict
from uuid import UUID
import asyncio
from fastapi import WebSocket
from types import SimpleNamespace
from typing import Dict
from typing import Union
# Type of the per-user session registry: each user id maps to a dict keyed by
# string whose values are either a WebSocket or an asyncio.Queue.
UserDataContent = Dict[
    UUID,
    Dict[str, Union[WebSocket, asyncio.Queue]]
]
class UserData:
    """In-memory registry of user sessions keyed by user id.

    Every session is a dict with two entries: ``"websocket"`` (the object
    handed to :meth:`create_user`) and ``"queue"`` (an ``asyncio.Queue``).
    :meth:`update_data` empties the queue before enqueuing, so after each
    update the queue holds only the newest payload.
    """

    def __init__(self):
        # user id -> {"websocket": ..., "queue": asyncio.Queue()}
        self.data_content: UserDataContent = {}

    async def create_user(self, user_id: UUID, websocket: WebSocket):
        """Create (or overwrite) the session for *user_id*.

        Args:
            user_id: Key under which the session is stored.
            websocket: Connection object stored under ``"websocket"``.
        """
        self.data_content[user_id] = {
            "websocket": websocket,
            "queue": asyncio.Queue(),
        }
        # NOTE(review): fixed one-second pause kept from the original; its
        # purpose is not visible in this file -- confirm before removing.
        await asyncio.sleep(1)

    def check_user(self, user_id: UUID) -> bool:
        """Return True if a session exists for *user_id*."""
        return user_id in self.data_content

    async def update_data(self, user_id: UUID, new_data: SimpleNamespace):
        """Replace whatever is pending for *user_id* with *new_data*.

        Raises:
            KeyError: If no session exists for *user_id*.
        """
        queue = self.data_content[user_id]["queue"]
        # Discard every stale payload so the consumer only ever sees the
        # most recent one.
        while True:
            try:
                queue.get_nowait()
            except asyncio.QueueEmpty:
                break
        await queue.put(new_data)

    async def get_latest_data(self, user_id: UUID) -> SimpleNamespace:
        """Wait for and return the next payload queued for *user_id*.

        ``asyncio.Queue.get`` suspends until an item is available and never
        raises ``QueueEmpty``, so this coroutine always yields a payload
        rather than ``None``.

        Raises:
            KeyError: If no session exists for *user_id*.
        """
        queue = self.data_content[user_id]["queue"]
        return await queue.get()

    def delete_user(self, user_id: UUID):
        """Remove the session for *user_id*; a no-op if none exists."""
        self.data_content.pop(user_id, None)

    def get_user_count(self) -> int:
        """Return the number of sessions currently stored."""
        return len(self.data_content)
# Module-level UserData instance, created when this module is imported.
# NOTE(review): presumably the single registry shared by the rest of the
# app -- confirm against the importing modules.
user_data = UserData()