Spaces:
Build error
Build error
from queue import Queue | |
from typing import Any | |
from langchain.callbacks.base import BaseCallbackHandler | |
class QueueCallback(BaseCallbackHandler):
    """Callback handler that forwards streamed LLM tokens to a queue.

    Each token passed to ``on_llm_new_token`` is put on the queue supplied at
    construction time, so that other code holding the same queue can read
    the tokens from it.
    """

    def __init__(self, queue: Queue) -> None:
        """Store the queue that will receive tokens.

        Args:
            queue: Destination for every token passed to
                ``on_llm_new_token``.
        """
        super().__init__()
        self.queue = queue

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Put ``token`` on the queue; extra keyword arguments are ignored."""
        self.queue.put(token)

    def on_llm_end(self, *args: Any, **kwargs: Any) -> bool:
        """Report whether the queue is empty; all arguments are ignored.

        ``Queue.empty()`` is a query only: this method removes nothing from
        the queue and does not enqueue any end-of-stream marker.

        Returns:
            ``True`` if the queue holds no items at the time of the call,
            ``False`` otherwise.
        """
        return self.queue.empty()