from time import sleep
from typing import Optional, Any, Dict, Union, List

from phi.docker.api_client import DockerApiClient
from phi.docker.resource.base import DockerResource
from phi.cli.console import print_info
from phi.utils.log import logger


class DockerContainerMount(DockerResource):
    """Specification of a single mount to attach to a DockerContainer.

    The fields mirror the leading arguments of `docker.types.Mount`;
    DockerContainer converts each instance to a `Mount` before running.
    """

    resource_type: str = "ContainerMount"

    # Path inside the container where the mount is attached.
    target: str
    # Mount source, e.g. a volume name or a host path.
    source: str
    # Mount type passed to docker.types.Mount.
    type: str = "volume"
    # Mount the source read-only when True.
    read_only: bool = False
    # User-defined name-value labels for the mount.
    labels: Optional[Dict[str, Any]] = None


class DockerContainer(DockerResource):
    """A docker container resource that is run, read and deleted through a DockerApiClient.

    Most fields are forwarded unchanged as keyword arguments to
    `DockerClient.containers.run()`.
    """

    resource_type: str = "Container"

    # image (str) – The image to run.
    image: Optional[str] = None
    # command (str or list) – The command to run in the container.
    command: Optional[Union[str, List]] = None
    # auto_remove (bool) – enable auto-removal of the container when the container’s process exits.
    auto_remove: bool = True
    # detach (bool) – Run container in the background and return a Container object.
    detach: bool = True
    # entrypoint (str or list) – The entrypoint for the container.
    entrypoint: Optional[Union[str, List]] = None
    # environment (dict or list) – Environment variables to set inside the container
    environment: Optional[Union[Dict[str, Any], List]] = None
    # group_add (list) – List of additional group names and/or IDs that the container process will run as.
    group_add: Optional[List[Any]] = None
    # healthcheck (dict) – Specify a test to perform to check that the container is healthy.
    healthcheck: Optional[Dict[str, Any]] = None
    # hostname (str) – Optional hostname for the container.
    hostname: Optional[str] = None
    # labels (dict or list) – A dictionary of name-value labels
    # e.g. {"label1": "value1", "label2": "value2"})
    # or a list of names of labels to set with empty values (e.g. ["label1", "label2"])
    labels: Optional[Dict[str, Any]] = None
    # mounts (list) – Specification for mounts to be added to the container.
    # More powerful alternative to volumes.
    # Each item in the list is a DockerContainerMount object which is
    # then converted to a docker.types.Mount object.
    mounts: Optional[List[DockerContainerMount]] = None
    # network (str) – Name of the network this container will be connected to at creation time
    network: Optional[str] = None
    # network_disabled (bool) – Disable networking.
    network_disabled: Optional[bool] = None
    # network_mode (str) One of:
    # bridge - Create a new network stack for the container on the bridge network.
    # none - No networking for this container.
    # container:<name|id> - Reuse another container’s network stack.
    # host - Use the host network stack. This mode is incompatible with ports.
    # network_mode is incompatible with network.
    network_mode: Optional[str] = None
    # Platform in the format os[/arch[/variant]].
    platform: Optional[str] = None
    # ports (dict) – Ports to bind inside the container.
    # The keys of the dictionary are the ports to bind inside the container,
    # either as an integer or a string in the form port/protocol, where the protocol is either tcp, udp.
    #
    # The values of the dictionary are the corresponding ports to open on the host, which can be either:
    #   - The port number, as an integer.
    #       For example, {'2222/tcp': 3333} will expose port 2222 inside the container
    #       as port 3333 on the host.
    #   - None, to assign a random host port. For example, {'2222/tcp': None}.
    #   - A tuple of (address, port) if you want to specify the host interface.
    #       For example, {'1111/tcp': ('127.0.0.1', 1111)}.
    #   - A list of integers, if you want to bind multiple host ports to a single container port.
    #       For example, {'1111/tcp': [1234, 4567]}.
    ports: Optional[Dict[str, Any]] = None
    # remove (bool) – Remove the container when it has finished running. Default: False.
    remove: Optional[bool] = None
    # Restart the container when it exits. Configured as a dictionary with keys:
    # Name: One of on-failure, or always.
    # MaximumRetryCount: Number of times to restart the container on failure.
    # For example: {"Name": "on-failure", "MaximumRetryCount": 5}
    restart_policy: Optional[Dict[str, Any]] = None
    # stdin_open (bool) – Keep STDIN open even if not attached.
    stdin_open: Optional[bool] = None
    # stdout (bool) – Return logs from STDOUT when detach=False. Default: True.
    stdout: Optional[bool] = None
    # stderr (bool) – Return logs from STDERR when detach=False. Default: False.
    stderr: Optional[bool] = None
    # tty (bool) – Allocate a pseudo-TTY.
    tty: Optional[bool] = None
    # user (str or int) – Username or UID to run commands as inside the container.
    user: Optional[Union[str, int]] = None
    # volumes (dict or list) –
    # A dictionary to configure volumes mounted inside the container.
    # The key is either the host path or a volume name, and the value is a dictionary with the keys:
    #   bind - The path to mount the volume inside the container
    #   mode - Either rw to mount the volume read/write, or ro to mount it read-only.
    # For example:
    # {
    #   '/home/user1/': {'bind': '/mnt/vol2', 'mode': 'rw'},
    #   '/var/www': {'bind': '/mnt/vol1', 'mode': 'ro'}
    # }
    volumes: Optional[Union[Dict[str, Any], List]] = None
    # working_dir (str) – Path to the working directory.
    working_dir: Optional[str] = None
    devices: Optional[list] = None

    # Data provided by the resource running on the docker client
    container_status: Optional[str] = None

    def _get_docker_mounts(self) -> Optional[List[Any]]:
        """Convert `self.mounts` into objects accepted by `containers.run()`.

        Returns:
            None when no mounts are configured, otherwise a list in which every
            DockerContainerMount has been converted to a `docker.types.Mount`.
            Items of any other type are passed through untouched.
        """
        if self.mounts is None:
            return None

        from docker.types import Mount

        docker_mounts: List[Any] = []
        for mount in self.mounts:
            if isinstance(mount, DockerContainerMount):
                docker_mounts.append(
                    Mount(
                        target=mount.target,
                        source=mount.source,
                        type=mount.type,
                        read_only=mount.read_only,
                        labels=mount.labels,
                    )
                )
            else:
                docker_mounts.append(mount)
        return docker_mounts

    def run_container(self, docker_client: DockerApiClient) -> Optional[Any]:
        """Optionally pull the image, then run the container.

        Args:
            docker_client: The DockerApiClient for the current cluster

        Returns:
            Whatever `DockerClient.containers.run()` returns for the configured arguments.

        Raises:
            ImageNotFound: logged and re-raised when the image does not exist.
            APIError: logged and re-raised when the docker server returns an error.
        """
        from docker import DockerClient
        from docker.errors import ImageNotFound, APIError
        from rich.progress import Progress, SpinnerColumn, TextColumn

        print_info("Starting container: {}".format(self.name))
        try:
            _api_client: DockerClient = docker_client.api_client
            with Progress(
                SpinnerColumn(spinner_name="dots"), TextColumn("{task.description}"), transient=True
            ) as progress:
                if self.pull:
                    # Pulling is best-effort: the image may already be available locally.
                    try:
                        pull_image_task = progress.add_task("Downloading Image...")
                        _api_client.images.pull(self.image, platform=self.platform)
                        progress.update(pull_image_task, completed=True)
                    except Exception as pull_exc:
                        logger.debug(f"Could not pull image: {self.image}: {pull_exc}")
                run_container_task = progress.add_task("Running Container...")  # noqa: F841
                container_object = _api_client.containers.run(
                    name=self.name,
                    image=self.image,
                    command=self.command,
                    auto_remove=self.auto_remove,
                    detach=self.detach,
                    entrypoint=self.entrypoint,
                    environment=self.environment,
                    group_add=self.group_add,
                    healthcheck=self.healthcheck,
                    hostname=self.hostname,
                    labels=self.labels,
                    # DockerContainerMount models must be converted to docker.types.Mount
                    mounts=self._get_docker_mounts(),
                    network=self.network,
                    network_disabled=self.network_disabled,
                    network_mode=self.network_mode,
                    platform=self.platform,
                    ports=self.ports,
                    remove=self.remove,
                    restart_policy=self.restart_policy,
                    stdin_open=self.stdin_open,
                    stdout=self.stdout,
                    stderr=self.stderr,
                    tty=self.tty,
                    user=self.user,
                    volumes=self.volumes,
                    working_dir=self.working_dir,
                    devices=self.devices,
                )
                return container_object
        except ImageNotFound as img_error:
            logger.error(f"Image {self.image} not found. Explanation: {img_error.explanation}")
            raise
        except APIError as api_err:
            logger.error(f"APIError: {api_err.explanation}")
            raise

    def _create(self, docker_client: DockerApiClient) -> bool:
        """Creates the Container

        Any existing container with the same name is deleted first.

        Args:
            docker_client: The DockerApiClient for the current cluster

        Returns:
            True if the container ends up in the "running" or "created" state, False otherwise.
        """
        from docker.models.containers import Container

        logger.debug("Creating: {}".format(self.get_resource_name()))
        container_object: Optional[Container] = self._read(docker_client)

        # Delete the container if it exists
        if container_object is not None:
            print_info(f"Deleting container {container_object.name}")
            self._delete(docker_client)

        container_object = self.run_container(docker_client)
        if container_object is not None:
            logger.debug("Container Created: {}".format(container_object.name))
        else:
            logger.debug("Container could not be created")

        # By this step the container should be created
        # Validate that the container is running
        logger.debug("Validating container is created...")
        if container_object is None:
            logger.debug("Container not found")
            return False

        container_object.reload()
        self.container_status = container_object.status
        print_info("Container Status: {}".format(self.container_status))

        if self.container_status == "running":
            logger.debug("Container is running")
            self.active_resource = container_object
            return True

        if self.container_status == "created":
            from rich.progress import Progress, SpinnerColumn, TextColumn

            with Progress(
                SpinnerColumn(spinner_name="dots"), TextColumn("{task.description}"), transient=True
            ) as progress:
                progress.add_task("Waiting for container to start", total=None)
                # Poll while the container is still only "created"; the wait is bounded
                # so a container that never starts cannot block forever.
                for _ in range(10):
                    if self.container_status != "created":
                        break
                    logger.debug(f"Container Status: {self.container_status}, trying again in 1 seconds")
                    sleep(1)
                    container_object.reload()
                    self.container_status = container_object.status
            logger.debug(f"Container Status: {self.container_status}")

        if self.container_status in ("running", "created"):
            logger.debug("Container Created")
            self.active_resource = container_object
            return True

        logger.debug("Container not found")
        return False

    def _read(self, docker_client: DockerApiClient) -> Optional[Any]:
        """Returns a Container object if the container is active

        Containers in any state are listed (all=True). The name filter is followed by
        an exact name comparison before a container is accepted.

        Args:
            docker_client: The DockerApiClient for the current cluster

        Returns:
            The matching Container, or None if there is no match or the lookup failed.
        """
        from docker import DockerClient
        from docker.models.containers import Container

        logger.debug("Reading: {}".format(self.get_resource_name()))
        container_name: Optional[str] = self.name
        try:
            _api_client: DockerClient = docker_client.api_client
            container_list: Optional[List[Container]] = _api_client.containers.list(
                all=True, filters={"name": container_name}
            )
            for container in container_list or []:
                if container.name == container_name:
                    logger.debug(f"Container {container_name} exists")
                    self.active_resource = container
                    return container
        except Exception:
            # Best-effort lookup: any failure is treated as "no such container".
            logger.debug(f"Container {container_name} not found")
        return None

    def _update(self, docker_client: DockerApiClient) -> bool:
        """Updates the Container

        An update is implemented as a re-create.

        Args:
            docker_client: The DockerApiClient for the current cluster
        """
        logger.debug("Updating: {}".format(self.get_resource_name()))
        return self._create(docker_client=docker_client)

    def _delete(self, docker_client: DockerApiClient) -> bool:
        """Deletes the Container

        Args:
            docker_client: The DockerApiClient for the current cluster

        Returns:
            True if there was nothing to delete or the container is confirmed gone,
            False if the container still exists after the validation attempts.
        """
        from docker.models.containers import Container
        from docker.errors import NotFound

        logger.debug("Deleting: {}".format(self.get_resource_name()))
        container_name: Optional[str] = self.name
        container_object: Optional[Container] = self._read(docker_client)
        # Return True if there is no Container to delete
        if container_object is None:
            return True

        # Delete Container
        try:
            self.active_resource = None
            self.container_status = container_object.status
            logger.debug("Container Status: {}".format(self.container_status))
            logger.debug("Stopping Container: {}".format(container_name))
            container_object.stop()
            # If self.remove is set, then the container would be auto removed after being stopped
            # If self.remove is not set, we need to manually remove the container
            if not self.remove:
                logger.debug("Removing Container: {}".format(container_name))
                try:
                    container_object.remove()
                except Exception as remove_exc:
                    logger.debug(f"Could not remove container: {remove_exc}")
        except Exception as e:
            logger.exception("Error while deleting container: {}".format(e))

        # Validate that the Container is deleted
        logger.debug("Validating Container is deleted")
        try:
            logger.debug("Reloading container_object: {}".format(container_object))
            # reload() raises NotFound once the container no longer exists
            for _ in range(10):
                container_object.reload()
                logger.debug("Waiting for NotFound Exception...")
                sleep(1)
        except NotFound:
            logger.debug("Got NotFound Exception, container is deleted")
            return True

        # Every reload succeeded, so the container is still present.
        logger.warning("Container {} still exists after delete".format(container_name))
        return False

    def is_active(self, docker_client: DockerApiClient) -> bool:
        """Returns True if the container is running on the docker cluster

        A container that cannot be read, or whose status is "exited" or "paused",
        is reported as not active.
        """
        from docker.models.containers import Container

        container_object: Optional[Container] = self.read(docker_client=docker_client)
        if container_object is None:
            return False

        # Check if container is stopped/paused
        status: str = container_object.status
        if status in ["exited", "paused"]:
            logger.debug(f"Container status: {status}")
            return False
        return True

    def create(self, docker_client: DockerApiClient) -> bool:
        """Create the container unless a cached, active one can be reused.

        Args:
            docker_client: The DockerApiClient for the current cluster

        Returns:
            True if the container already exists (cache hit) or was created, False otherwise.
        """
        # If self.force then always create container.
        # Otherwise, if use_cache is True and container is active then return True
        if not self.force and self.use_cache and self.is_active(docker_client=docker_client):
            print_info(f"{self.get_resource_type()}: {self.get_resource_name()} already exists")
            return True

        resource_created = self._create(docker_client=docker_client)
        if resource_created:
            print_info(f"{self.get_resource_type()}: {self.get_resource_name()} created")
            return True
        logger.error(f"Failed to create {self.get_resource_type()}: {self.get_resource_name()}")
        return False