Spaces:
Runtime error
Runtime error
File size: 1,897 Bytes
105b369 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
from typing import Optional, Union, List, Dict
from phi.docker.app.airflow.base import AirflowBase, ContainerContext
class AirflowWorker(AirflowBase):
# -*- App Name
name: str = "airflow-worker"
# Command for the container
command: Optional[Union[str, List[str]]] = "worker"
# Queue name for the worker
queue_name: str = "default"
# Open the worker_log_port if open_worker_log_port=True
# When you start an airflow worker, airflow starts a tiny web server subprocess to serve the workers
# local log files to the airflow main web server, which then builds pages and sends them to users.
# This defines the port on which the logs are served. It needs to be unused, and open visible from
# the main web server to connect into the workers.
open_worker_log_port: bool = True
# Worker log port number on the container
worker_log_port: int = 8793
# Worker log port number on the container
worker_log_host_port: Optional[int] = None
def get_container_env(self, container_context: ContainerContext) -> Dict[str, str]:
container_env: Dict[str, str] = super().get_container_env(container_context=container_context)
# Set the queue name
container_env["QUEUE_NAME"] = self.queue_name
# Set the worker log port
if self.open_worker_log_port:
container_env["AIRFLOW__LOGGING__WORKER_LOG_SERVER_PORT"] = str(self.worker_log_port)
return container_env
def get_container_ports(self) -> Dict[str, int]:
container_ports: Dict[str, int] = super().get_container_ports()
# if open_worker_log_port = True, open the worker_log_port_number
if self.open_worker_log_port and self.worker_log_host_port is not None:
# Open the port
container_ports[str(self.worker_log_port)] = self.worker_log_host_port
return container_ports
|