Spaces:
Paused
Paused
| from collections.abc import Generator, Mapping | |
| from typing import Any, Union | |
| from openai._exceptions import RateLimitError | |
| from configs import dify_config | |
| from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator | |
| from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator | |
| from core.app.apps.chat.app_generator import ChatAppGenerator | |
| from core.app.apps.completion.app_generator import CompletionAppGenerator | |
| from core.app.apps.workflow.app_generator import WorkflowAppGenerator | |
| from core.app.entities.app_invoke_entities import InvokeFrom | |
| from core.app.features.rate_limiting import RateLimit | |
| from models.model import Account, App, AppMode, EndUser | |
| from models.workflow import Workflow | |
| from services.errors.llm import InvokeRateLimitError | |
| from services.workflow_service import WorkflowService | |
class AppGenerateService:
    """Service-layer entry points that run an app and return its response.

    Every public method dispatches on ``app_model.mode`` to the matching
    ``*AppGenerator`` class. All methods are class/static methods, so the
    service is used without being instantiated.
    """

    @classmethod
    def generate(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ):
        """
        App Content Generate

        Registers the request with the app's ``RateLimit``, runs the generator
        that matches ``app_model.mode`` and returns its response wrapped by
        ``rate_limit.generate``.

        :param app_model: app model
        :param user: account or end user the request is made for
        :param args: generation arguments, forwarded unchanged to the generator
        :param invoke_from: invoke from
        :param streaming: forwarded to the generator as ``stream``
        :return: whatever ``rate_limit.generate`` returns for the response
        :raises ValueError: if the app mode is not supported, or the required
            workflow is missing (see ``_get_workflow``)
        :raises InvokeRateLimitError: if a ``RateLimitError`` is raised while
            entering the rate limit or generating
        """
        max_active_request = AppGenerateService._get_max_active_requests(app_model)
        rate_limit = RateLimit(app_model.id, max_active_request)
        request_id = RateLimit.gen_request_key()

        # The request slot is released in ``finally`` unless a streaming
        # response was successfully handed back to the caller. In that case
        # the wrapped response is presumably responsible for releasing it
        # once consumed -- NOTE(review): confirm against RateLimit.generate.
        release_on_exit = True
        try:
            request_id = rate_limit.enter(request_id)

            mode = app_model.mode
            if mode == AppMode.COMPLETION.value:
                response = CompletionAppGenerator().generate(
                    app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
                )
            elif mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
                # Checked before plain CHAT so apps flagged ``is_agent`` take
                # the agent path regardless of their (non-completion) mode.
                response = AgentChatAppGenerator().generate(
                    app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
                )
            elif mode == AppMode.CHAT.value:
                response = ChatAppGenerator().generate(
                    app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
                )
            elif mode == AppMode.ADVANCED_CHAT.value:
                workflow = cls._get_workflow(app_model, invoke_from)
                response = AdvancedChatAppGenerator().generate(
                    app_model=app_model,
                    workflow=workflow,
                    user=user,
                    args=args,
                    invoke_from=invoke_from,
                    stream=streaming,
                )
            elif mode == AppMode.WORKFLOW.value:
                workflow = cls._get_workflow(app_model, invoke_from)
                response = WorkflowAppGenerator().generate(
                    app_model=app_model,
                    workflow=workflow,
                    user=user,
                    args=args,
                    invoke_from=invoke_from,
                    stream=streaming,
                )
            else:
                raise ValueError(f"Invalid app mode {app_model.mode}")

            wrapped = rate_limit.generate(response, request_id)
            # Only now is it safe to skip the release for streaming calls:
            # any failure above must still free the slot.
            release_on_exit = not streaming
            return wrapped
        except RateLimitError as e:
            raise InvokeRateLimitError(str(e)) from e
        finally:
            if release_on_exit:
                rate_limit.exit(request_id)

    @staticmethod
    def _get_max_active_requests(app_model: App) -> int:
        """
        Get the max active requests limit for an app

        :param app_model: app model
        :return: ``app_model.max_active_requests`` when it is set, otherwise
            ``dify_config.APP_MAX_ACTIVE_REQUESTS`` converted to ``int``
        """
        configured = app_model.max_active_requests
        if configured is None:
            return int(dify_config.APP_MAX_ACTIVE_REQUESTS)
        return configured

    @classmethod
    def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True):
        """
        Generate a single iteration of a workflow node

        Always runs against the workflow fetched for ``InvokeFrom.DEBUGGER``.

        :param app_model: app model (advanced-chat or workflow mode)
        :param user: account running the iteration
        :param node_id: id of the node to run
        :param args: arguments forwarded unchanged to the generator
        :param streaming: forwarded to the generator as ``stream``
        :return: result of the generator's ``single_iteration_generate``
        :raises ValueError: if the app mode is neither advanced-chat nor
            workflow, or the workflow is missing (see ``_get_workflow``)
        """
        if app_model.mode == AppMode.ADVANCED_CHAT.value:
            generator_cls = AdvancedChatAppGenerator
        elif app_model.mode == AppMode.WORKFLOW.value:
            generator_cls = WorkflowAppGenerator
        else:
            raise ValueError(f"Invalid app mode {app_model.mode}")

        workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
        return generator_cls().single_iteration_generate(
            app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming
        )

    @classmethod
    def generate_more_like_this(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        message_id: str,
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ) -> Union[dict, Generator]:
        """
        Generate more like this

        Delegates to ``CompletionAppGenerator.generate_more_like_this``.

        :param app_model: app model
        :param user: user
        :param message_id: message id
        :param invoke_from: invoke from
        :param streaming: forwarded to the generator as ``stream``
        :return: the generator's result
        """
        return CompletionAppGenerator().generate_more_like_this(
            app_model=app_model, message_id=message_id, user=user, invoke_from=invoke_from, stream=streaming
        )

    @classmethod
    def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom) -> Workflow:
        """
        Get workflow

        Returns the draft workflow when invoked from the debugger and the
        published workflow otherwise.

        :param app_model: app model
        :param invoke_from: invoke from
        :return: the selected workflow
        :raises ValueError: if the selected workflow does not exist
        """
        workflow_service = WorkflowService()
        if invoke_from == InvokeFrom.DEBUGGER:
            # fetch draft workflow by app_model
            workflow = workflow_service.get_draft_workflow(app_model=app_model)
            if not workflow:
                raise ValueError("Workflow not initialized")
        else:
            # fetch published workflow by app_model
            workflow = workflow_service.get_published_workflow(app_model=app_model)
            if not workflow:
                raise ValueError("Workflow not published")

        return workflow