File size: 15,840 Bytes
105b369
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
from time import sleep
from typing import Optional, Any, Dict, Union, List

from phi.docker.api_client import DockerApiClient
from phi.docker.resource.base import DockerResource
from phi.cli.console import print_info
from phi.utils.log import logger


class DockerContainerMount(DockerResource):
    resource_type: str = "ContainerMount"

    target: str
    source: str
    type: str = "volume"
    read_only: bool = False
    labels: Optional[Dict[str, Any]] = None


class DockerContainer(DockerResource):
    resource_type: str = "Container"

    # image (str) – The image to run.
    image: Optional[str] = None
    # command (str or list) – The command to run in the container.
    command: Optional[Union[str, List]] = None
    # auto_remove (bool) – enable auto-removal of the container when the container’s process exits.
    auto_remove: bool = True
    # detach (bool) – Run container in the background and return a Container object.
    detach: bool = True
    # entrypoint (str or list) – The entrypoint for the container.
    entrypoint: Optional[Union[str, List]] = None
    # environment (dict or list) – Environment variables to set inside the container
    environment: Optional[Union[Dict[str, Any], List]] = None
    # group_add (list) – List of additional group names and/or IDs that the container process will run as.
    group_add: Optional[List[Any]] = None
    # healthcheck (dict) – Specify a test to perform to check that the container is healthy.
    healthcheck: Optional[Dict[str, Any]] = None
    # hostname (str) – Optional hostname for the container.
    hostname: Optional[str] = None
    # labels (dict or list) – A dictionary of name-value labels
    # e.g. {"label1": "value1", "label2": "value2"})
    # or a list of names of labels to set with empty values (e.g. ["label1", "label2"])
    labels: Optional[Dict[str, Any]] = None
    # mounts (list) – Specification for mounts to be added to the container.
    # More powerful alternative to volumes.
    # Each item in the list is a DockerContainerMount object which is
    # then converted to a docker.types.Mount object.
    mounts: Optional[List[DockerContainerMount]] = None
    # network (str) – Name of the network this container will be connected to at creation time
    network: Optional[str] = None
    # network_disabled (bool) – Disable networking.
    network_disabled: Optional[str] = None
    # network_mode (str) One of:
    # bridge - Create a new network stack for the container on on the bridge network.
    # none - No networking for this container.
    # container:<name|id> - Reuse another container’s network stack.
    # host - Use the host network stack. This mode is incompatible with ports.
    # network_mode is incompatible with network.
    network_mode: Optional[str] = None
    # Platform in the format os[/arch[/variant]].
    platform: Optional[str] = None
    # ports (dict) – Ports to bind inside the container.
    # The keys of the dictionary are the ports to bind inside the container,
    # either as an integer or a string in the form port/protocol, where the protocol is either tcp, udp.
    #
    # The values of the dictionary are the corresponding ports to open on the host, which can be either:
    #   - The port number, as an integer.
    #       For example, {'2222/tcp': 3333} will expose port 2222 inside the container
    #       as port 3333 on the host.
    #   - None, to assign a random host port. For example, {'2222/tcp': None}.
    #   - A tuple of (address, port) if you want to specify the host interface.
    #       For example, {'1111/tcp': ('127.0.0.1', 1111)}.
    #   - A list of integers, if you want to bind multiple host ports to a single container port.
    #       For example, {'1111/tcp': [1234, 4567]}.
    ports: Optional[Dict[str, Any]] = None
    # remove (bool) – Remove the container when it has finished running. Default: False.
    remove: Optional[bool] = None
    # Restart the container when it exits. Configured as a dictionary with keys:
    # Name: One of on-failure, or always.
    # MaximumRetryCount: Number of times to restart the container on failure.
    # For example: {"Name": "on-failure", "MaximumRetryCount": 5}
    restart_policy: Optional[Dict[str, Any]] = None
    # stdin_open (bool) – Keep STDIN open even if not attached.
    stdin_open: Optional[bool] = None
    # stdout (bool) – Return logs from STDOUT when detach=False. Default: True.
    stdout: Optional[bool] = None
    # stderr (bool) – Return logs from STDERR when detach=False. Default: False.
    stderr: Optional[bool] = None
    # tty (bool) – Allocate a pseudo-TTY.
    tty: Optional[bool] = None
    # user (str or int) – Username or UID to run commands as inside the container.
    user: Optional[Union[str, int]] = None
    # volumes (dict or list) –
    # A dictionary to configure volumes mounted inside the container.
    # The key is either the host path or a volume name, and the value is a dictionary with the keys:
    # bind - The path to mount the volume inside the container
    # mode - Either rw to mount the volume read/write, or ro to mount it read-only.
    # For example:
    # {
    #   '/home/user1/': {'bind': '/mnt/vol2', 'mode': 'rw'},
    #   '/var/www': {'bind': '/mnt/vol1', 'mode': 'ro'}
    # }
    volumes: Optional[Union[Dict[str, Any], List]] = None
    # working_dir (str) – Path to the working directory.
    working_dir: Optional[str] = None
    devices: Optional[list] = None

    # Data provided by the resource running on the docker client
    container_status: Optional[str] = None

    def run_container(self, docker_client: DockerApiClient) -> Optional[Any]:
        from docker import DockerClient
        from docker.errors import ImageNotFound, APIError
        from rich.progress import Progress, SpinnerColumn, TextColumn

        print_info("Starting container: {}".format(self.name))
        # logger.debug()(
        #     "Args: {}".format(
        #         self.json(indent=2, exclude_unset=True, exclude_none=True)
        #     )
        # )
        try:
            _api_client: DockerClient = docker_client.api_client
            with Progress(
                SpinnerColumn(spinner_name="dots"), TextColumn("{task.description}"), transient=True
            ) as progress:
                if self.pull:
                    try:
                        pull_image_task = progress.add_task("Downloading Image...")  # noqa: F841
                        _api_client.images.pull(self.image, platform=self.platform)
                        progress.update(pull_image_task, completed=True)
                    except Exception as pull_exc:
                        logger.debug(f"Could not pull image: {self.image}: {pull_exc}")
                run_container_task = progress.add_task("Running Container...")  # noqa: F841
                container_object = _api_client.containers.run(
                    name=self.name,
                    image=self.image,
                    command=self.command,
                    auto_remove=self.auto_remove,
                    detach=self.detach,
                    entrypoint=self.entrypoint,
                    environment=self.environment,
                    group_add=self.group_add,
                    healthcheck=self.healthcheck,
                    hostname=self.hostname,
                    labels=self.labels,
                    mounts=self.mounts,
                    network=self.network,
                    network_disabled=self.network_disabled,
                    network_mode=self.network_mode,
                    platform=self.platform,
                    ports=self.ports,
                    remove=self.remove,
                    restart_policy=self.restart_policy,
                    stdin_open=self.stdin_open,
                    stdout=self.stdout,
                    stderr=self.stderr,
                    tty=self.tty,
                    user=self.user,
                    volumes=self.volumes,
                    working_dir=self.working_dir,
                    devices=self.devices,
                )
                return container_object
        except ImageNotFound as img_error:
            logger.error(f"Image {self.image} not found. Explanation: {img_error.explanation}")
            raise
        except APIError as api_err:
            logger.error(f"APIError: {api_err.explanation}")
            raise
        except Exception:
            raise

    def _create(self, docker_client: DockerApiClient) -> bool:
        """Creates the Container

        Args:
            docker_client: The DockerApiClient for the current cluster
        """
        from docker.models.containers import Container

        logger.debug("Creating: {}".format(self.get_resource_name()))
        container_object: Optional[Container] = self._read(docker_client)

        # Delete the container if it exists
        if container_object is not None:
            print_info(f"Deleting container {container_object.name}")
            self._delete(docker_client)

        try:
            container_object = self.run_container(docker_client)
            if container_object is not None:
                logger.debug("Container Created: {}".format(container_object.name))
            else:
                logger.debug("Container could not be created")
        except Exception:
            raise

        # By this step the container should be created
        # Validate that the container is running
        logger.debug("Validating container is created...")
        if container_object is not None:
            container_object.reload()
            self.container_status: str = container_object.status
            print_info("Container Status: {}".format(self.container_status))

            if self.container_status == "running":
                logger.debug("Container is running")
                return True
            elif self.container_status == "created":
                from rich.progress import Progress, SpinnerColumn, TextColumn

                with Progress(
                    SpinnerColumn(spinner_name="dots"), TextColumn("{task.description}"), transient=True
                ) as progress:
                    task = progress.add_task("Waiting for container to start", total=None)  # noqa: F841
                    while self.container_status != "created":
                        logger.debug(f"Container Status: {self.container_status}, trying again in 1 seconds")
                        sleep(1)
                        container_object.reload()
                        self.container_status = container_object.status
                    logger.debug(f"Container Status: {self.container_status}")

            if self.container_status in ("running", "created"):
                logger.debug("Container Created")
                self.active_resource = container_object
                return True

        logger.debug("Container not found")
        return False

    def _read(self, docker_client: DockerApiClient) -> Optional[Any]:
        """Returns a Container object if the container is active

        Args:
            docker_client: The DockerApiClient for the current cluster
        """
        from docker import DockerClient
        from docker.models.containers import Container

        logger.debug("Reading: {}".format(self.get_resource_name()))
        container_name: Optional[str] = self.name
        try:
            _api_client: DockerClient = docker_client.api_client
            container_list: Optional[List[Container]] = _api_client.containers.list(
                all=True, filters={"name": container_name}
            )
            if container_list is not None:
                for container in container_list:
                    if container.name == container_name:
                        logger.debug(f"Container {container_name} exists")
                        self.active_resource = container
                        return container
        except Exception:
            logger.debug(f"Container {container_name} not found")
        return None

    def _update(self, docker_client: DockerApiClient) -> bool:
        """Updates the Container

        Args:
            docker_client: The DockerApiClient for the current cluster
        """
        logger.debug("Updating: {}".format(self.get_resource_name()))
        return self._create(docker_client=docker_client)

    def _delete(self, docker_client: DockerApiClient) -> bool:
        """Deletes the Container

        Args:
            docker_client: The DockerApiClient for the current cluster
        """
        from docker.models.containers import Container
        from docker.errors import NotFound

        logger.debug("Deleting: {}".format(self.get_resource_name()))
        container_name: Optional[str] = self.name
        container_object: Optional[Container] = self._read(docker_client)
        # Return True if there is no Container to delete
        if container_object is None:
            return True

        # Delete Container
        try:
            self.active_resource = None
            self.container_status = container_object.status
            logger.debug("Container Status: {}".format(self.container_status))
            logger.debug("Stopping Container: {}".format(container_name))
            container_object.stop()
            # If self.remove is set, then the container would be auto removed after being stopped
            # If self.remove is not set, we need to manually remove the container
            if not self.remove:
                logger.debug("Removing Container: {}".format(container_name))
                try:
                    container_object.remove()
                except Exception as remove_exc:
                    logger.debug(f"Could not remove container: {remove_exc}")
        except Exception as e:
            logger.exception("Error while deleting container: {}".format(e))

        # Validate that the Container is deleted
        logger.debug("Validating Container is deleted")
        try:
            logger.debug("Reloading container_object: {}".format(container_object))
            for i in range(10):
                container_object.reload()
                logger.debug("Waiting for NotFound Exception...")
                sleep(1)
        except NotFound:
            logger.debug("Got NotFound Exception, container is deleted")

        return True

    def is_active(self, docker_client: DockerApiClient) -> bool:
        """Returns True if the container is running on the docker cluster"""
        from docker.models.containers import Container

        container_object: Optional[Container] = self.read(docker_client=docker_client)
        if container_object is not None:
            # Check if container is stopped/paused
            status: str = container_object.status
            if status in ["exited", "paused"]:
                logger.debug(f"Container status: {status}")
                return False
            return True
        return False

    def create(self, docker_client: DockerApiClient) -> bool:
        # If self.force then always create container
        if not self.force:
            # If use_cache is True and container is active then return True
            if self.use_cache and self.is_active(docker_client=docker_client):
                print_info(f"{self.get_resource_type()}: {self.get_resource_name()} already exists")
                return True

        resource_created = self._create(docker_client=docker_client)
        if resource_created:
            print_info(f"{self.get_resource_type()}: {self.get_resource_name()} created")
            return True
        logger.error(f"Failed to create {self.get_resource_type()}: {self.get_resource_name()}")
        return False