AmmarFahmy
adding all files
105b369
from time import sleep
from typing import Optional, Any, Dict, Union, List
from phi.docker.api_client import DockerApiClient
from phi.docker.resource.base import DockerResource
from phi.cli.console import print_info
from phi.utils.log import logger
class DockerContainerMount(DockerResource):
resource_type: str = "ContainerMount"
target: str
source: str
type: str = "volume"
read_only: bool = False
labels: Optional[Dict[str, Any]] = None
class DockerContainer(DockerResource):
resource_type: str = "Container"
# image (str) – The image to run.
image: Optional[str] = None
# command (str or list) – The command to run in the container.
command: Optional[Union[str, List]] = None
# auto_remove (bool) – enable auto-removal of the container when the container’s process exits.
auto_remove: bool = True
# detach (bool) – Run container in the background and return a Container object.
detach: bool = True
# entrypoint (str or list) – The entrypoint for the container.
entrypoint: Optional[Union[str, List]] = None
# environment (dict or list) – Environment variables to set inside the container
environment: Optional[Union[Dict[str, Any], List]] = None
# group_add (list) – List of additional group names and/or IDs that the container process will run as.
group_add: Optional[List[Any]] = None
# healthcheck (dict) – Specify a test to perform to check that the container is healthy.
healthcheck: Optional[Dict[str, Any]] = None
# hostname (str) – Optional hostname for the container.
hostname: Optional[str] = None
# labels (dict or list) – A dictionary of name-value labels
# e.g. {"label1": "value1", "label2": "value2"})
# or a list of names of labels to set with empty values (e.g. ["label1", "label2"])
labels: Optional[Dict[str, Any]] = None
# mounts (list) – Specification for mounts to be added to the container.
# More powerful alternative to volumes.
# Each item in the list is a DockerContainerMount object which is
# then converted to a docker.types.Mount object.
mounts: Optional[List[DockerContainerMount]] = None
# network (str) – Name of the network this container will be connected to at creation time
network: Optional[str] = None
# network_disabled (bool) – Disable networking.
network_disabled: Optional[str] = None
# network_mode (str) One of:
# bridge - Create a new network stack for the container on on the bridge network.
# none - No networking for this container.
# container:<name|id> - Reuse another container’s network stack.
# host - Use the host network stack. This mode is incompatible with ports.
# network_mode is incompatible with network.
network_mode: Optional[str] = None
# Platform in the format os[/arch[/variant]].
platform: Optional[str] = None
# ports (dict) – Ports to bind inside the container.
# The keys of the dictionary are the ports to bind inside the container,
# either as an integer or a string in the form port/protocol, where the protocol is either tcp, udp.
#
# The values of the dictionary are the corresponding ports to open on the host, which can be either:
# - The port number, as an integer.
# For example, {'2222/tcp': 3333} will expose port 2222 inside the container
# as port 3333 on the host.
# - None, to assign a random host port. For example, {'2222/tcp': None}.
# - A tuple of (address, port) if you want to specify the host interface.
# For example, {'1111/tcp': ('127.0.0.1', 1111)}.
# - A list of integers, if you want to bind multiple host ports to a single container port.
# For example, {'1111/tcp': [1234, 4567]}.
ports: Optional[Dict[str, Any]] = None
# remove (bool) – Remove the container when it has finished running. Default: False.
remove: Optional[bool] = None
# Restart the container when it exits. Configured as a dictionary with keys:
# Name: One of on-failure, or always.
# MaximumRetryCount: Number of times to restart the container on failure.
# For example: {"Name": "on-failure", "MaximumRetryCount": 5}
restart_policy: Optional[Dict[str, Any]] = None
# stdin_open (bool) – Keep STDIN open even if not attached.
stdin_open: Optional[bool] = None
# stdout (bool) – Return logs from STDOUT when detach=False. Default: True.
stdout: Optional[bool] = None
# stderr (bool) – Return logs from STDERR when detach=False. Default: False.
stderr: Optional[bool] = None
# tty (bool) – Allocate a pseudo-TTY.
tty: Optional[bool] = None
# user (str or int) – Username or UID to run commands as inside the container.
user: Optional[Union[str, int]] = None
# volumes (dict or list) –
# A dictionary to configure volumes mounted inside the container.
# The key is either the host path or a volume name, and the value is a dictionary with the keys:
# bind - The path to mount the volume inside the container
# mode - Either rw to mount the volume read/write, or ro to mount it read-only.
# For example:
# {
# '/home/user1/': {'bind': '/mnt/vol2', 'mode': 'rw'},
# '/var/www': {'bind': '/mnt/vol1', 'mode': 'ro'}
# }
volumes: Optional[Union[Dict[str, Any], List]] = None
# working_dir (str) – Path to the working directory.
working_dir: Optional[str] = None
devices: Optional[list] = None
# Data provided by the resource running on the docker client
container_status: Optional[str] = None
def run_container(self, docker_client: DockerApiClient) -> Optional[Any]:
from docker import DockerClient
from docker.errors import ImageNotFound, APIError
from rich.progress import Progress, SpinnerColumn, TextColumn
print_info("Starting container: {}".format(self.name))
# logger.debug()(
# "Args: {}".format(
# self.json(indent=2, exclude_unset=True, exclude_none=True)
# )
# )
try:
_api_client: DockerClient = docker_client.api_client
with Progress(
SpinnerColumn(spinner_name="dots"), TextColumn("{task.description}"), transient=True
) as progress:
if self.pull:
try:
pull_image_task = progress.add_task("Downloading Image...") # noqa: F841
_api_client.images.pull(self.image, platform=self.platform)
progress.update(pull_image_task, completed=True)
except Exception as pull_exc:
logger.debug(f"Could not pull image: {self.image}: {pull_exc}")
run_container_task = progress.add_task("Running Container...") # noqa: F841
container_object = _api_client.containers.run(
name=self.name,
image=self.image,
command=self.command,
auto_remove=self.auto_remove,
detach=self.detach,
entrypoint=self.entrypoint,
environment=self.environment,
group_add=self.group_add,
healthcheck=self.healthcheck,
hostname=self.hostname,
labels=self.labels,
mounts=self.mounts,
network=self.network,
network_disabled=self.network_disabled,
network_mode=self.network_mode,
platform=self.platform,
ports=self.ports,
remove=self.remove,
restart_policy=self.restart_policy,
stdin_open=self.stdin_open,
stdout=self.stdout,
stderr=self.stderr,
tty=self.tty,
user=self.user,
volumes=self.volumes,
working_dir=self.working_dir,
devices=self.devices,
)
return container_object
except ImageNotFound as img_error:
logger.error(f"Image {self.image} not found. Explanation: {img_error.explanation}")
raise
except APIError as api_err:
logger.error(f"APIError: {api_err.explanation}")
raise
except Exception:
raise
def _create(self, docker_client: DockerApiClient) -> bool:
"""Creates the Container
Args:
docker_client: The DockerApiClient for the current cluster
"""
from docker.models.containers import Container
logger.debug("Creating: {}".format(self.get_resource_name()))
container_object: Optional[Container] = self._read(docker_client)
# Delete the container if it exists
if container_object is not None:
print_info(f"Deleting container {container_object.name}")
self._delete(docker_client)
try:
container_object = self.run_container(docker_client)
if container_object is not None:
logger.debug("Container Created: {}".format(container_object.name))
else:
logger.debug("Container could not be created")
except Exception:
raise
# By this step the container should be created
# Validate that the container is running
logger.debug("Validating container is created...")
if container_object is not None:
container_object.reload()
self.container_status: str = container_object.status
print_info("Container Status: {}".format(self.container_status))
if self.container_status == "running":
logger.debug("Container is running")
return True
elif self.container_status == "created":
from rich.progress import Progress, SpinnerColumn, TextColumn
with Progress(
SpinnerColumn(spinner_name="dots"), TextColumn("{task.description}"), transient=True
) as progress:
task = progress.add_task("Waiting for container to start", total=None) # noqa: F841
while self.container_status != "created":
logger.debug(f"Container Status: {self.container_status}, trying again in 1 seconds")
sleep(1)
container_object.reload()
self.container_status = container_object.status
logger.debug(f"Container Status: {self.container_status}")
if self.container_status in ("running", "created"):
logger.debug("Container Created")
self.active_resource = container_object
return True
logger.debug("Container not found")
return False
def _read(self, docker_client: DockerApiClient) -> Optional[Any]:
"""Returns a Container object if the container is active
Args:
docker_client: The DockerApiClient for the current cluster
"""
from docker import DockerClient
from docker.models.containers import Container
logger.debug("Reading: {}".format(self.get_resource_name()))
container_name: Optional[str] = self.name
try:
_api_client: DockerClient = docker_client.api_client
container_list: Optional[List[Container]] = _api_client.containers.list(
all=True, filters={"name": container_name}
)
if container_list is not None:
for container in container_list:
if container.name == container_name:
logger.debug(f"Container {container_name} exists")
self.active_resource = container
return container
except Exception:
logger.debug(f"Container {container_name} not found")
return None
def _update(self, docker_client: DockerApiClient) -> bool:
"""Updates the Container
Args:
docker_client: The DockerApiClient for the current cluster
"""
logger.debug("Updating: {}".format(self.get_resource_name()))
return self._create(docker_client=docker_client)
def _delete(self, docker_client: DockerApiClient) -> bool:
"""Deletes the Container
Args:
docker_client: The DockerApiClient for the current cluster
"""
from docker.models.containers import Container
from docker.errors import NotFound
logger.debug("Deleting: {}".format(self.get_resource_name()))
container_name: Optional[str] = self.name
container_object: Optional[Container] = self._read(docker_client)
# Return True if there is no Container to delete
if container_object is None:
return True
# Delete Container
try:
self.active_resource = None
self.container_status = container_object.status
logger.debug("Container Status: {}".format(self.container_status))
logger.debug("Stopping Container: {}".format(container_name))
container_object.stop()
# If self.remove is set, then the container would be auto removed after being stopped
# If self.remove is not set, we need to manually remove the container
if not self.remove:
logger.debug("Removing Container: {}".format(container_name))
try:
container_object.remove()
except Exception as remove_exc:
logger.debug(f"Could not remove container: {remove_exc}")
except Exception as e:
logger.exception("Error while deleting container: {}".format(e))
# Validate that the Container is deleted
logger.debug("Validating Container is deleted")
try:
logger.debug("Reloading container_object: {}".format(container_object))
for i in range(10):
container_object.reload()
logger.debug("Waiting for NotFound Exception...")
sleep(1)
except NotFound:
logger.debug("Got NotFound Exception, container is deleted")
return True
def is_active(self, docker_client: DockerApiClient) -> bool:
"""Returns True if the container is running on the docker cluster"""
from docker.models.containers import Container
container_object: Optional[Container] = self.read(docker_client=docker_client)
if container_object is not None:
# Check if container is stopped/paused
status: str = container_object.status
if status in ["exited", "paused"]:
logger.debug(f"Container status: {status}")
return False
return True
return False
def create(self, docker_client: DockerApiClient) -> bool:
# If self.force then always create container
if not self.force:
# If use_cache is True and container is active then return True
if self.use_cache and self.is_active(docker_client=docker_client):
print_info(f"{self.get_resource_type()}: {self.get_resource_name()} already exists")
return True
resource_created = self._create(docker_client=docker_client)
if resource_created:
print_info(f"{self.get_resource_type()}: {self.get_resource_name()} created")
return True
logger.error(f"Failed to create {self.get_resource_type()}: {self.get_resource_name()}")
return False