File size: 3,241 Bytes
3d2142b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# ------------------------------------------------------------------------
# Copyright (c) 2023-present, BAAI. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ------------------------------------------------------------------------
"""Engine for testing."""

import time

from tokenize_anything.build_model import model_registry


class InferenceCommand(object):
    """Command to run batched inference."""

    def __init__(self, input_queue, output_queue, kwargs):
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.kwargs = kwargs

    def build_env(self):
        """Build the environment."""
        self.batch_size = self.kwargs.get("batch_size", 1)
        self.batch_timeout = self.kwargs.get("batch_timeout", None)

    def build_model(self):
        """Build and return the model."""
        builder = model_registry[self.kwargs["model_type"]]
        return builder(device=self.kwargs["device"], checkpoint=self.kwargs["weights"])

    def build_predictor(self, model):
        """Build and return the predictor."""
        return self.kwargs["predictor_type"](model, self.kwargs)

    def send_results(self, predictor, indices, examples):
        """Send the inference results."""
        results = predictor.get_results(examples)
        if hasattr(predictor, "timers"):
            time_diffs = dict((k, v.average_time) for k, v in predictor.timers.items())
            for i, outputs in enumerate(results):
                self.output_queue.put((indices[i], time_diffs, outputs))
        else:
            for i, outputs in enumerate(results):
                self.output_queue.put((indices[i], outputs))

    def run(self):
        """Main loop to make the inference outputs."""
        self.build_env()
        model = self.build_model()
        predictor = self.build_predictor(model)
        must_stop = False
        while not must_stop:
            indices, examples = [], []
            deadline, timeout = None, None
            for i in range(self.batch_size):
                if self.batch_timeout and i == 1:
                    deadline = time.monotonic() + self.batch_timeout
                if self.batch_timeout and i >= 1:
                    timeout = deadline - time.monotonic()
                try:
                    index, example = self.input_queue.get(timeout=timeout)
                    if index < 0:
                        must_stop = True
                        break
                    indices.append(index)
                    examples.append(example)
                except Exception:
                    pass
            if len(examples) == 0:
                continue
            self.send_results(predictor, indices, examples)