Spaces:
Runtime error
Runtime error
File size: 15,840 Bytes
105b369 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 |
from time import sleep
from typing import Optional, Any, Dict, Union, List
from phi.docker.api_client import DockerApiClient
from phi.docker.resource.base import DockerResource
from phi.cli.console import print_info
from phi.utils.log import logger
class DockerContainerMount(DockerResource):
resource_type: str = "ContainerMount"
target: str
source: str
type: str = "volume"
read_only: bool = False
labels: Optional[Dict[str, Any]] = None
class DockerContainer(DockerResource):
resource_type: str = "Container"
# image (str) β The image to run.
image: Optional[str] = None
# command (str or list) β The command to run in the container.
command: Optional[Union[str, List]] = None
# auto_remove (bool) β enable auto-removal of the container when the containerβs process exits.
auto_remove: bool = True
# detach (bool) β Run container in the background and return a Container object.
detach: bool = True
# entrypoint (str or list) β The entrypoint for the container.
entrypoint: Optional[Union[str, List]] = None
# environment (dict or list) β Environment variables to set inside the container
environment: Optional[Union[Dict[str, Any], List]] = None
# group_add (list) β List of additional group names and/or IDs that the container process will run as.
group_add: Optional[List[Any]] = None
# healthcheck (dict) β Specify a test to perform to check that the container is healthy.
healthcheck: Optional[Dict[str, Any]] = None
# hostname (str) β Optional hostname for the container.
hostname: Optional[str] = None
# labels (dict or list) β A dictionary of name-value labels
# e.g. {"label1": "value1", "label2": "value2"})
# or a list of names of labels to set with empty values (e.g. ["label1", "label2"])
labels: Optional[Dict[str, Any]] = None
# mounts (list) β Specification for mounts to be added to the container.
# More powerful alternative to volumes.
# Each item in the list is a DockerContainerMount object which is
# then converted to a docker.types.Mount object.
mounts: Optional[List[DockerContainerMount]] = None
# network (str) β Name of the network this container will be connected to at creation time
network: Optional[str] = None
# network_disabled (bool) β Disable networking.
network_disabled: Optional[str] = None
# network_mode (str) One of:
# bridge - Create a new network stack for the container on on the bridge network.
# none - No networking for this container.
# container:<name|id> - Reuse another containerβs network stack.
# host - Use the host network stack. This mode is incompatible with ports.
# network_mode is incompatible with network.
network_mode: Optional[str] = None
# Platform in the format os[/arch[/variant]].
platform: Optional[str] = None
# ports (dict) β Ports to bind inside the container.
# The keys of the dictionary are the ports to bind inside the container,
# either as an integer or a string in the form port/protocol, where the protocol is either tcp, udp.
#
# The values of the dictionary are the corresponding ports to open on the host, which can be either:
# - The port number, as an integer.
# For example, {'2222/tcp': 3333} will expose port 2222 inside the container
# as port 3333 on the host.
# - None, to assign a random host port. For example, {'2222/tcp': None}.
# - A tuple of (address, port) if you want to specify the host interface.
# For example, {'1111/tcp': ('127.0.0.1', 1111)}.
# - A list of integers, if you want to bind multiple host ports to a single container port.
# For example, {'1111/tcp': [1234, 4567]}.
ports: Optional[Dict[str, Any]] = None
# remove (bool) β Remove the container when it has finished running. Default: False.
remove: Optional[bool] = None
# Restart the container when it exits. Configured as a dictionary with keys:
# Name: One of on-failure, or always.
# MaximumRetryCount: Number of times to restart the container on failure.
# For example: {"Name": "on-failure", "MaximumRetryCount": 5}
restart_policy: Optional[Dict[str, Any]] = None
# stdin_open (bool) β Keep STDIN open even if not attached.
stdin_open: Optional[bool] = None
# stdout (bool) β Return logs from STDOUT when detach=False. Default: True.
stdout: Optional[bool] = None
# stderr (bool) β Return logs from STDERR when detach=False. Default: False.
stderr: Optional[bool] = None
# tty (bool) β Allocate a pseudo-TTY.
tty: Optional[bool] = None
# user (str or int) β Username or UID to run commands as inside the container.
user: Optional[Union[str, int]] = None
# volumes (dict or list) β
# A dictionary to configure volumes mounted inside the container.
# The key is either the host path or a volume name, and the value is a dictionary with the keys:
# bind - The path to mount the volume inside the container
# mode - Either rw to mount the volume read/write, or ro to mount it read-only.
# For example:
# {
# '/home/user1/': {'bind': '/mnt/vol2', 'mode': 'rw'},
# '/var/www': {'bind': '/mnt/vol1', 'mode': 'ro'}
# }
volumes: Optional[Union[Dict[str, Any], List]] = None
# working_dir (str) β Path to the working directory.
working_dir: Optional[str] = None
devices: Optional[list] = None
# Data provided by the resource running on the docker client
container_status: Optional[str] = None
def run_container(self, docker_client: DockerApiClient) -> Optional[Any]:
from docker import DockerClient
from docker.errors import ImageNotFound, APIError
from rich.progress import Progress, SpinnerColumn, TextColumn
print_info("Starting container: {}".format(self.name))
# logger.debug()(
# "Args: {}".format(
# self.json(indent=2, exclude_unset=True, exclude_none=True)
# )
# )
try:
_api_client: DockerClient = docker_client.api_client
with Progress(
SpinnerColumn(spinner_name="dots"), TextColumn("{task.description}"), transient=True
) as progress:
if self.pull:
try:
pull_image_task = progress.add_task("Downloading Image...") # noqa: F841
_api_client.images.pull(self.image, platform=self.platform)
progress.update(pull_image_task, completed=True)
except Exception as pull_exc:
logger.debug(f"Could not pull image: {self.image}: {pull_exc}")
run_container_task = progress.add_task("Running Container...") # noqa: F841
container_object = _api_client.containers.run(
name=self.name,
image=self.image,
command=self.command,
auto_remove=self.auto_remove,
detach=self.detach,
entrypoint=self.entrypoint,
environment=self.environment,
group_add=self.group_add,
healthcheck=self.healthcheck,
hostname=self.hostname,
labels=self.labels,
mounts=self.mounts,
network=self.network,
network_disabled=self.network_disabled,
network_mode=self.network_mode,
platform=self.platform,
ports=self.ports,
remove=self.remove,
restart_policy=self.restart_policy,
stdin_open=self.stdin_open,
stdout=self.stdout,
stderr=self.stderr,
tty=self.tty,
user=self.user,
volumes=self.volumes,
working_dir=self.working_dir,
devices=self.devices,
)
return container_object
except ImageNotFound as img_error:
logger.error(f"Image {self.image} not found. Explanation: {img_error.explanation}")
raise
except APIError as api_err:
logger.error(f"APIError: {api_err.explanation}")
raise
except Exception:
raise
def _create(self, docker_client: DockerApiClient) -> bool:
"""Creates the Container
Args:
docker_client: The DockerApiClient for the current cluster
"""
from docker.models.containers import Container
logger.debug("Creating: {}".format(self.get_resource_name()))
container_object: Optional[Container] = self._read(docker_client)
# Delete the container if it exists
if container_object is not None:
print_info(f"Deleting container {container_object.name}")
self._delete(docker_client)
try:
container_object = self.run_container(docker_client)
if container_object is not None:
logger.debug("Container Created: {}".format(container_object.name))
else:
logger.debug("Container could not be created")
except Exception:
raise
# By this step the container should be created
# Validate that the container is running
logger.debug("Validating container is created...")
if container_object is not None:
container_object.reload()
self.container_status: str = container_object.status
print_info("Container Status: {}".format(self.container_status))
if self.container_status == "running":
logger.debug("Container is running")
return True
elif self.container_status == "created":
from rich.progress import Progress, SpinnerColumn, TextColumn
with Progress(
SpinnerColumn(spinner_name="dots"), TextColumn("{task.description}"), transient=True
) as progress:
task = progress.add_task("Waiting for container to start", total=None) # noqa: F841
while self.container_status != "created":
logger.debug(f"Container Status: {self.container_status}, trying again in 1 seconds")
sleep(1)
container_object.reload()
self.container_status = container_object.status
logger.debug(f"Container Status: {self.container_status}")
if self.container_status in ("running", "created"):
logger.debug("Container Created")
self.active_resource = container_object
return True
logger.debug("Container not found")
return False
def _read(self, docker_client: DockerApiClient) -> Optional[Any]:
"""Returns a Container object if the container is active
Args:
docker_client: The DockerApiClient for the current cluster
"""
from docker import DockerClient
from docker.models.containers import Container
logger.debug("Reading: {}".format(self.get_resource_name()))
container_name: Optional[str] = self.name
try:
_api_client: DockerClient = docker_client.api_client
container_list: Optional[List[Container]] = _api_client.containers.list(
all=True, filters={"name": container_name}
)
if container_list is not None:
for container in container_list:
if container.name == container_name:
logger.debug(f"Container {container_name} exists")
self.active_resource = container
return container
except Exception:
logger.debug(f"Container {container_name} not found")
return None
def _update(self, docker_client: DockerApiClient) -> bool:
"""Updates the Container
Args:
docker_client: The DockerApiClient for the current cluster
"""
logger.debug("Updating: {}".format(self.get_resource_name()))
return self._create(docker_client=docker_client)
def _delete(self, docker_client: DockerApiClient) -> bool:
"""Deletes the Container
Args:
docker_client: The DockerApiClient for the current cluster
"""
from docker.models.containers import Container
from docker.errors import NotFound
logger.debug("Deleting: {}".format(self.get_resource_name()))
container_name: Optional[str] = self.name
container_object: Optional[Container] = self._read(docker_client)
# Return True if there is no Container to delete
if container_object is None:
return True
# Delete Container
try:
self.active_resource = None
self.container_status = container_object.status
logger.debug("Container Status: {}".format(self.container_status))
logger.debug("Stopping Container: {}".format(container_name))
container_object.stop()
# If self.remove is set, then the container would be auto removed after being stopped
# If self.remove is not set, we need to manually remove the container
if not self.remove:
logger.debug("Removing Container: {}".format(container_name))
try:
container_object.remove()
except Exception as remove_exc:
logger.debug(f"Could not remove container: {remove_exc}")
except Exception as e:
logger.exception("Error while deleting container: {}".format(e))
# Validate that the Container is deleted
logger.debug("Validating Container is deleted")
try:
logger.debug("Reloading container_object: {}".format(container_object))
for i in range(10):
container_object.reload()
logger.debug("Waiting for NotFound Exception...")
sleep(1)
except NotFound:
logger.debug("Got NotFound Exception, container is deleted")
return True
def is_active(self, docker_client: DockerApiClient) -> bool:
"""Returns True if the container is running on the docker cluster"""
from docker.models.containers import Container
container_object: Optional[Container] = self.read(docker_client=docker_client)
if container_object is not None:
# Check if container is stopped/paused
status: str = container_object.status
if status in ["exited", "paused"]:
logger.debug(f"Container status: {status}")
return False
return True
return False
def create(self, docker_client: DockerApiClient) -> bool:
# If self.force then always create container
if not self.force:
# If use_cache is True and container is active then return True
if self.use_cache and self.is_active(docker_client=docker_client):
print_info(f"{self.get_resource_type()}: {self.get_resource_name()} already exists")
return True
resource_created = self._create(docker_client=docker_client)
if resource_created:
print_info(f"{self.get_resource_type()}: {self.get_resource_name()} created")
return True
logger.error(f"Failed to create {self.get_resource_type()}: {self.get_resource_name()}")
return False
|