File size: 13,767 Bytes
e8faf30
 
 
 
 
 
 
 
 
 
 
54d363d
39adffa
 
e8faf30
 
0dbfcb8
374a5b3
e8faf30
 
 
8ed3901
 
e8faf30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0dbfcb8
e8faf30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ccc4e99
e8faf30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ccc4e99
 
 
 
39adffa
 
ccc4e99
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e8faf30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cd14c77
e8faf30
 
 
cd14c77
e8faf30
 
073334d
cd14c77
073334d
 
dae7ab5
cd14c77
dae7ab5
073334d
cd14c77
dae7ab5
0f7ecda
cd14c77
e8faf30
 
cd14c77
e8faf30
 
cd14c77
e8faf30
 
374a5b3
 
 
 
 
e8faf30
 
cec58ea
ab10ce8
e8faf30
ab10ce8
ade4ac9
 
 
cec58ea
ab10ce8
e8faf30
 
 
ab10ce8
 
ade4ac9
 
cec58ea
ab10ce8
e8faf30
ab10ce8
 
ade4ac9
 
0dbfcb8
 
 
1dfe4c5
 
 
8ed3901
1dfe4c5
374a5b3
0dbfcb8
374a5b3
1dfe4c5
374a5b3
 
 
0dbfcb8
 
 
 
 
e8faf30
cec58ea
e8faf30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
281ccac
e8faf30
 
 
 
 
 
 
 
 
 
54d363d
e8faf30
 
 
 
 
 
 
 
b26ff1b
e8faf30
 
 
 
 
 
 
b26ff1b
cec58ea
e8faf30
cec58ea
e8faf30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
import io
from typing import Any, Dict, List

import cv2
import tempfile
import numpy as np
import requests
import torch
from PIL import Image
from transformers import AutoTokenizer, XCLIPModel, XCLIPProcessor
from huggingface_hub import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from decord import VideoReader
from decord import cpu

import timeit
import easyocr
import json

# Torch device chosen once at import time: CUDA when available, CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Stride used when picking sampled frames for OCR (frames 0, 6, 12, ...).
EXTRACT_EVERY_X_FRAMES = 6


class EndpointHandler:
    def __init__(self, path=""):
        """
        Load the X-CLIP model, its processor and tokenizer, and the OCR reader.

        Everything needed at inference time is created once here so that
        individual requests do not pay the loading cost.

        Parameters:
        - path (str): Location passed to `from_pretrained` for the processor,
          model and tokenizer. When empty, the checkpoint
          "microsoft/xclip-base-patch16-zero-shot" is loaded instead.
        """
        # An empty path is not a usable model location, so fall back to the
        # checkpoint this handler was written for.
        model_id = path or "microsoft/xclip-base-patch16-zero-shot"

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.processor = XCLIPProcessor.from_pretrained(model_id)
        self.model = XCLIPModel.from_pretrained(model_id).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)

        logging.set_verbosity_debug()
        self.logger = logging.get_logger(__name__)
        self.reader = easyocr.Reader(["de", "en"])  # Add more languages if needed

        # Report which device inference will run on.
        if torch.cuda.is_available():
            self.logger.info("GPU is available for inference.")
            self.logger.info(f"Using {torch.cuda.get_device_name(0)}")
        else:
            self.logger.info("GPU is not available, using CPU for inference.")

    def download_video_as_bytes(
        self, url: str, timeout: float = 60.0
    ) -> "tuple[bytes | None, dict | None]":
        """
        Download a video from a given URL, load it in RAM, and return it as bytes.

        Parameters:
        - url (str): The URL of the video to download.
        - timeout (float): Seconds to wait for the server to connect or to send
          data before giving up. Passed straight to `requests.get`.

        Returns:
        - bytes or None: The video content as bytes if successful, None otherwise.
        - dict or None: The video download headers if successful, None otherwise.
        """
        try:
            # Without a timeout a stalled server would block this worker forever.
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()  # Raise an exception for HTTP errors
            return response.content, response.headers
        except requests.RequestException as e:
            # Timeouts are RequestException subclasses, so they land here too.
            print(f"Error downloading the video: {e}")
            return None, None

    def extract_evenly_spaced_frames_from_bytes_cv2(
        self, video_bytes: bytes, num_frames: int = 32
    ) -> list:
        """
        Sample frames at a regular interval from an in-memory video using OpenCV.

        The bytes are written to a temporary ".mp4" file because
        `cv2.VideoCapture` is opened by file name here.

        Parameters:
        - video_bytes (bytes): The encoded video.
        - num_frames (int): How many frames to sample.

        Returns:
        - list: The frames that could be read, in order. May hold fewer than
          `num_frames` entries when some reads fail, and is empty when
          `num_frames` is not positive.
        """
        if num_frames <= 0:
            return []

        # Write bytes to a temporary file
        with tempfile.NamedTemporaryFile(delete=True, suffix=".mp4") as temp_video:
            temp_video.write(video_bytes)
            temp_video.flush()

            vidcap = cv2.VideoCapture(temp_video.name)
            try:
                total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
                interval = total_frames // num_frames

                frames = []
                for i in range(num_frames):
                    if interval > 0:
                        frame_index = i * interval
                    else:
                        # Fewer frames than requested: spread the samples over
                        # the whole clip (repeating frames) instead of reading
                        # the first frame every time. Clamped at 0 in case the
                        # reported frame count is zero or negative.
                        frame_index = max(i * total_frames // num_frames, 0)

                    vidcap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
                    success, image = vidcap.read()

                    # Frames that fail to decode are skipped.
                    if success:
                        frames.append(image)

                return frames
            finally:
                # Release the capture so the temporary file can be cleaned up.
                vidcap.release()

    def extract_evenly_spaced_frames_from_bytes(
        self, video_bytes: bytes, num_frames: int = 32
    ) -> list:
        """
        Sample frames at a regular interval from an in-memory video using decord.

        Parameters:
        - video_bytes (bytes): The encoded video.
        - num_frames (int): How many frames to sample.

        Returns:
        - list: `num_frames` frames as numpy arrays (empty when `num_frames`
          is not positive). Clips with fewer frames than requested repeat
          frames so the requested count is still returned.

        Raises:
        - ValueError: If the video reports zero frames.
        """
        if num_frames <= 0:
            return []

        # Create a VideoReader object from bytes
        vr = VideoReader(io.BytesIO(video_bytes), ctx=cpu(0))

        total_frames = len(vr)
        if total_frames == 0:
            raise ValueError("Video contains no frames.")

        interval = total_frames // num_frames
        if interval > 0:
            indices = [i * interval for i in range(num_frames)]
        else:
            # Fewer frames than requested: spread the samples over the whole
            # clip instead of returning the first frame `num_frames` times.
            indices = [i * total_frames // num_frames for i in range(num_frames)]

        # Every index is below total_frames, so no clamping is needed.
        return [vr[index].asnumpy() for index in indices]

    def preprocess_frames(self, video_frames):
        """
        Define a preprocessing function to convert video frames into a format suitable for the model
        """
        frames = np.array(video_frames)
        # Use the XCLIP Processor to preprocess the frames
        inputs = self.processor(
            text=None, videos=list(frames), return_tensors="pt", padding=True
        ).to(self.device)

        return inputs

    def embed_frames_with_xclip_processing(self, frames):
        # Initialize an empty list to store the frame embeddings

        # self.logger.info("Preprocessing frames.")
        frame_preprocessed = self.preprocess_frames(frames)

        # Pass the preprocessed frame through the model to get the frame embeddings
        # self.logger.info("Getting video features.")
        frame_embedding = self.model.get_video_features(**frame_preprocessed)

        # Check the shape of the tensor
        # self.logger.info(f"Shape of the batch_emb tensor: {frame_embedding.shape}")

        # Normalize the embeddings if it's a 2D tensor
        if frame_embedding.dim() == 2:
            # self.logger.info("Normalizing embeddings")
            batch_emb = torch.nn.functional.normalize(frame_embedding, p=2, dim=1)
        else:
            # self.logger.info("Skipping normalization due to tensor shape")
            batch_emb = frame_embedding.squeeze(0)

        # self.logger.info("Converting into numpy array")
        batch_emb = batch_emb.cpu().detach().numpy()

        # self.logger.info("Converting to list")
        batch_emb = batch_emb.tolist()

        # self.logger.info("Returning batch_emb list")
        return batch_emb

    def set_default(self, obj):
        """
        `default=` hook for `json.dumps`: turn a set into a list.

        Any other type is rejected with a bare TypeError.
        """
        if not isinstance(obj, set):
            raise TypeError
        return list(obj)

    def process_video(self, video_url, video_metadata):
        try:
            self.logger.info("Downloading video as bytes.")
            download_start_time = timeit.default_timer()
            video_bytes, video_headers = self.download_video_as_bytes(video_url)
            download_end_time = timeit.default_timer()
            self.logger.info(
                f"Video downloading took {download_end_time - download_start_time} seconds"
            )
            self.logger.info("Extracting frames.")
            processing_start_time = timeit.default_timer()
            frames = self.extract_evenly_spaced_frames_from_bytes(
                video_bytes, num_frames=32
            )
            processing_end_time = timeit.default_timer()
            self.logger.info(
                f"Extracting video frames took {processing_end_time - processing_start_time} seconds"
            )
            self.logger.info("Embedding frames with Xclip.")
            embedding_start_time = timeit.default_timer()
            frame_embeddings = self.embed_frames_with_xclip_processing(frames)
            embedding_end_time = timeit.default_timer()
            self.logger.info(
                f"Embedding calculation took {embedding_end_time - embedding_start_time} seconds"
            )
            # Extract text from each frame using EasyOCR
            self.logger.info("Extracting text from frames.")
            text_extraction_start_time = timeit.default_timer()
            # frame_texts = [self.reader.readtext(frame, detail=0) for frame in frames]
            frame_texts = [
                self.reader.readtext(frames[i], detail=0)
                for i in range(0, len(frames), EXTRACT_EVERY_X_FRAMES)
            ]
            texts_set = set()
            for text_list in frame_texts:
                [texts_set.add(text) for text in text_list]

            video_metadata["extracted_text"] = json.dumps(
                texts_set, default=self.set_default
            )
            text_extraction_end_time = timeit.default_timer()
            self.logger.info(
                f"Text extraction took {text_extraction_end_time - text_extraction_start_time} seconds"
            )

            video_metadata["url"] = video_url
            self.logger.info("Returning embeddings and metadata.")
            return frame_embeddings, video_metadata
        except Exception as e:
            print(e)
            return None, None, None

    def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Process the input data based on its type and return the embeddings.

        This method accepts a dictionary with a 'process_type' key that can be either 'images' or 'text'.
        If 'process_type' is 'images', the method expects a list of image URLs under the 'images_urls' key.
        It downloads and processes these images, and returns their embeddings.
        If 'process_type' is 'text', the method expects a string query under the 'query' key.
        It processes this text and returns its embedding.

        Parameters:
        - data: Dict[str, Any]
            A dictionary containing the data to be processed.
            It must include a 'process_type' key with value either 'images' or 'text'.
            If 'process_type' is 'images', data should also include 'images_urls' key with a list of image URLs.
            If 'process_type' is 'text', data should also include 'query' key with a string query.

        Returns:
        - List[Dict[str, Any]]
            A list of dictionaries, each containing the embeddings of the processed data.
            If an error occurs during processing, the dictionary will include an 'error' key with the error message.

        Raises:
        - ValueError: If the 'process_type' key is not present in data, or if the required keys for 'images' or 'text' are not present or are of the wrong type.
        """

        if data["process_type"] == "videos":
            try:
                if "videos_urls" not in data or not isinstance(
                    data["videos_urls"], list
                ):
                    raise ValueError(
                        "Data must contain 'videos_urls' key with a list of videos urls."
                    )

                batch_size = 4
                if "batch_size" in data:
                    batch_size = int(data["batch_size"])
                # Download and process the videos
                processed_video_embeddings = []
                processed_videos_metadata = []

                for i in range(0, len(data["videos_urls"]), batch_size):
                    videos_urls = data["videos_urls"][i : i + batch_size]
                    videos_metadata = data["videos_metadata"][i : i + batch_size]

                    with ThreadPoolExecutor() as executor:
                        futures = [
                            executor.submit(self.process_video, url, metadata)
                            for url, metadata in zip(videos_urls, videos_metadata)
                        ]
                        for future in as_completed(futures):
                            frame_embeddings, video_metadata = future.result()
                            if frame_embeddings is not None:
                                processed_video_embeddings.append(frame_embeddings)
                                self.logger.info("Finished appending video embedding")
                                processed_metadata = {
                                    "text": video_metadata["caption"],
                                    "source": video_metadata["url"],
                                    "source_type": "video_frames",
                                    **video_metadata,
                                }
                                processed_videos_metadata.append(processed_metadata)
                                self.logger.info("Finished appending video metadata")
                    self.logger.info(f"Finished processing batch {i}")
                # Return the embeddings
                self.logger.info("Returning embeddings and metadata of all batches")
                return {
                    "embeddings": processed_video_embeddings,
                    "metadata": processed_videos_metadata,
                }

            except Exception as e:
                print(f"Error during videos processing: {str(e)}")
                return {"embeddings": [], "error": str(e)}

        elif data["process_type"] == "text":
            if "query" not in data or not isinstance(data["query"], str):
                raise ValueError("Data must contain 'query' key which is a str.")
            query = data["query"]
            inputs = self.tokenizer(query, return_tensors="pt").to(self.device)
            text_emb = self.model.get_text_features(**inputs)
            # detach text emb from graph, move to CPU, and convert to numpy array
            text_emb = text_emb.detach().cpu().numpy()

            # calculate value to normalize each vector by and normalize them
            norm_factor = np.linalg.norm(text_emb, axis=1)

            text_emb = text_emb.T / norm_factor
            # transpose back to (21, 512)
            text_emb = text_emb.T

            # Converting tensor to list for JSON response
            text_emb_list = text_emb.tolist()

            return {"embeddings": text_emb_list}

        else:
            print(
                f"Error during CLIP endpoint processing: data['process_type']: {data['process_type']} neither 'images' or 'text'"
            )
            return {"embeddings": [], "error": str(e)}

        # pseudo
        # self.model(input)