|
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from contextlib import ExitStack, contextmanager
|
|
from copy import deepcopy
|
|
from unittest import mock
|
|
import torch
|
|
from torch import nn
|
|
|
|
|
|
import detectron2
|
|
from detectron2.structures import Boxes, Instances
|
|
from detectron2.utils.env import _import_file
|
|
|
|
# Module-level counter. `_gen_instance_class` increments it for every class it
# generates and embeds the value in the class name; `_import` embeds the same
# value in the name of the module it registers.
_counter = 0
|
|
|
|
|
|
def _clear_jit_cache():
|
|
from torch.jit._recursive import concrete_type_store
|
|
from torch.jit._state import _jit_caching_layer
|
|
|
|
concrete_type_store.type_store.clear()
|
|
_jit_caching_layer.clear()
|
|
|
|
|
|
def _add_instances_conversion_methods(newInstances):
    """
    Attach a `from_instances` converter to the scripted Instances class.
    """
    cls_name = newInstances.__name__

    @torch.jit.unused
    def from_instances(instances: Instances):
        """
        Build a scripted Instances object from an original Instances object.

        Every field is deep-copied and assigned through the attribute of the
        same name on the new object.
        """
        source_fields = instances.get_fields()
        converted = newInstances(instances.image_size)
        for field_name in source_fields:
            # The generated class stores each field under a "_<name>" attribute.
            has_slot = hasattr(converted, f"_{field_name}")
            assert has_slot, f"No attribute named {field_name} in {cls_name}"
            setattr(converted, field_name, deepcopy(source_fields[field_name]))
        return converted

    newInstances.from_instances = from_instances
|
|
|
|
|
|
@contextmanager
def patch_instances(fields):
    """
    A contextmanager, under which the Instances class in detectron2 is replaced
    by a statically-typed scriptable class, defined by `fields`.
    See more in `scripting_with_instances`.

    Args:
        fields (dict[name: type]): forwarded to `_gen_instance_module` to
            generate the source code of the replacement class.

    Yields:
        the generated class, after it has been passed to `torch.jit.script`
        and given a `from_instances` converter.
    """
    with tempfile.TemporaryDirectory(prefix="detectron2") as tmp_dir, tempfile.NamedTemporaryFile(
        mode="w", encoding="utf-8", suffix=".py", dir=tmp_dir, delete=False
    ) as f:
        # Bound up-front so the cleanup below is safe even when a failure
        # happens before the generated module has been imported.
        module = None
        try:
            # Drop previously cached TorchScript results before a new class is generated.
            _clear_jit_cache()

            cls_name, s = _gen_instance_module(fields)
            f.write(s)
            f.flush()
            # Close the file before importing it.
            f.close()

            module = _import(f.name)
            new_instances = getattr(module, cls_name)
            _ = torch.jit.script(new_instances)

            # Mark Instances as a script class and point its qualified name at
            # the generated class.
            Instances.__torch_script_class__ = True
            Instances._jit_override_qualname = torch._jit_internal._qualified_name(new_instances)

            _add_instances_conversion_methods(new_instances)
            yield new_instances
        finally:
            # Remove each marker independently; either may be missing if the
            # setup above failed part-way.
            for attr in ("__torch_script_class__", "_jit_override_qualname"):
                try:
                    delattr(Instances, attr)
                except AttributeError:
                    pass
            if module is not None:
                sys.modules.pop(module.__name__, None)
|
|
|
|
|
|
def _gen_instance_class(fields):
    """
    Generate the source code of a statically-typed class that mirrors the
    `Instances` interface for a fixed set of fields.

    The generated class has an `__init__`, one property/setter pair per field,
    and the methods `__len__`, `has`, `to`, `__getitem__`, `cat` and
    `get_fields`. Each call increments the module-level `_counter` and uses
    the new value in the class name.

    Args:
        fields (dict[name: type])

    Returns:
        tuple[str, str]: the generated class name and its source code. The
        source refers to `torch`, `Tensor`, `Optional`, `Tuple`, `List` and
        `Dict`, and to the fully-qualified module path of every field type,
        so those names must be importable where the code is executed.
    """

    class _FieldType:
        def __init__(self, name, type_):
            assert isinstance(name, str), f"Field name must be str, got {name}"
            self.name = name
            self.type_ = type_
            # Fully-qualified type name, used verbatim as an annotation in the generated code.
            self.annotation = f"{type_.__module__}.{type_.__name__}"

    fields = [_FieldType(k, v) for k, v in fields.items()]

    def indent(level, s):
        return " " * 4 * level + s

    lines = []

    global _counter
    _counter += 1

    cls_name = "ScriptedInstances{}".format(_counter)

    field_names = tuple(x.name for x in fields)
    extra_args = ", ".join([f"{f.name}: Optional[{f.annotation}] = None" for f in fields])
    lines.append(
        f"""
class {cls_name}:
    def __init__(self, image_size: Tuple[int, int], {extra_args}):
        self.image_size = image_size
        self._field_names = {field_names}
"""
    )

    # Body of __init__: one Optional-typed "_<name>" attribute per field.
    for f in fields:
        lines.append(
            indent(2, f"self._{f.name} = torch.jit.annotate(Optional[{f.annotation}], {f.name})")
        )

    # One property + setter per field.
    for f in fields:
        lines.append(
            f"""
    @property
    def {f.name}(self) -> {f.annotation}:
        # has to use a local for type refinement
        # https://pytorch.org/docs/stable/jit_language_reference.html#optional-type-refinement
        t = self._{f.name}
        assert t is not None, "{f.name} is None and cannot be accessed!"
        return t

    @{f.name}.setter
    def {f.name}(self, value: {f.annotation}) -> None:
        self._{f.name} = value
"""
        )

    # Method `__len__`: length of the first field that is set.
    lines.append(
        """
    def __len__(self) -> int:
"""
    )
    for f in fields:
        lines.append(
            f"""
        t = self._{f.name}
        if t is not None:
            return len(t)
"""
        )
    lines.append(
        """
        raise NotImplementedError("Empty Instances does not support __len__!")
"""
    )

    # Method `has`
    lines.append(
        """
    def has(self, name: str) -> bool:
"""
    )
    for f in fields:
        lines.append(
            f"""
        if name == "{f.name}":
            return self._{f.name} is not None
"""
        )
    lines.append(
        """
        return False
"""
    )

    # `to`, `__getitem__` and `cat` all start from an instance with every field unset.
    none_args = ", None" * len(fields)

    # Method `to`
    lines.append(
        f"""
    def to(self, device: torch.device) -> "{cls_name}":
        ret = {cls_name}(self.image_size{none_args})
"""
    )
    for f in fields:
        # Fields whose type has no `to` attribute are skipped: they are left
        # unset on the returned object.
        if hasattr(f.type_, "to"):
            lines.append(
                f"""
        t = self._{f.name}
        if t is not None:
            ret._{f.name} = t.to(device)
"""
            )
    lines.append(
        """
        return ret
"""
    )

    # Method `__getitem__`
    lines.append(
        f"""
    def __getitem__(self, item) -> "{cls_name}":
        ret = {cls_name}(self.image_size{none_args})
"""
    )
    for f in fields:
        lines.append(
            f"""
        t = self._{f.name}
        if t is not None:
            ret._{f.name} = t[item]
"""
        )
    lines.append(
        """
        return ret
"""
    )

    # Method `cat`. The generated code performs no check that all instances
    # share the same image size or the same set of fields.
    lines.append(
        f"""
    def cat(self, instances: List["{cls_name}"]) -> "{cls_name}":
        ret = {cls_name}(self.image_size{none_args})
"""
    )
    for f in fields:
        lines.append(
            f"""
        t = self._{f.name}
        if t is not None:
            values: List[{f.annotation}] = [x.{f.name} for x in instances]
            if torch.jit.isinstance(t, torch.Tensor):
                ret._{f.name} = torch.cat(values, dim=0)
            else:
                ret._{f.name} = t.cat(values)
"""
        )
    lines.append(
        """
        return ret"""
    )

    # Method `get_fields`
    lines.append(
        """
    def get_fields(self) -> Dict[str, Tensor]:
        ret = {}
"""
    )
    for f in fields:
        # Build a complete statement per field. Splicing `assert ...` into the
        # right-hand side of an assignment would make the generated module
        # fail to import with a SyntaxError.
        if f.type_ == Boxes:
            stmt = f'ret["{f.name}"] = t.tensor'
        elif f.type_ == torch.Tensor:
            stmt = f'ret["{f.name}"] = t'
        else:
            stmt = f'assert False, "unsupported type {str(f.type_)}"'
        lines.append(
            f"""
        t = self._{f.name}
        if t is not None:
            {stmt}
"""
        )
    lines.append(
        """
        return ret"""
    )
    return cls_name, os.linesep.join(lines)
|
|
|
|
|
|
def _gen_instance_module(fields):
    """
    Produce the source of a module that defines the generated Instances class.

    The import header below is followed by the class definition returned by
    `_gen_instance_class(fields)`.

    Returns:
        tuple[str, str]: the generated class name and the module source.
    """
    header = """
from copy import deepcopy
import torch
from torch import Tensor
import typing
from typing import *

import detectron2
from detectron2.structures import Boxes, Instances

"""

    cls_name, cls_def = _gen_instance_class(fields)
    module_source = header + cls_def
    return cls_name, module_source
|
|
|
|
|
|
def _import(path):
    """
    Import the file at `path` through `_import_file`, under a module name made
    of this module's name followed by the current value of `_counter`.
    """
    this_module_name = sys.modules[__name__].__name__
    generated_name = "{}{}".format(this_module_name, _counter)
    return _import_file(generated_name, path, make_importable=True)
|
|
|
|
|
|
@contextmanager
def patch_builtin_len(modules=()):
    """
    Patch the builtin len() function of a few detectron2 modules
    to use __len__ instead, because __len__ does not convert values to
    integers and therefore is friendly to tracing.

    Args:
        modules (list[str]): names of extra modules to patch len(), in
            addition to those in detectron2.
    """

    def _call_dunder_len(obj):
        return obj.__len__()

    targets = [
        "detectron2.modeling.roi_heads.fast_rcnn",
        "detectron2.modeling.roi_heads.mask_head",
        "detectron2.modeling.roi_heads.keypoint_head",
        *modules,
    ]

    with ExitStack() as stack:
        # Enter every patch first, then route each mocked `len` to `__len__`.
        patched = [stack.enter_context(mock.patch(target + ".len")) for target in targets]
        for len_mock in patched:
            len_mock.side_effect = _call_dunder_len
        yield
|
|
|
|
|
|
def patch_nonscriptable_classes():
    """
    Apply patches on a few nonscriptable detectron2 classes.
    Should not have side-effects on eager usage.

    `ResNet` and `FPN` each receive a `__prepare_scriptable__` method that
    returns a modified deep copy of the module. `StandardROIHeads` gets
    `mask_on` and `keypoint_on` annotated as `torch.jit.Final[bool]`.
    """
    from detectron2.modeling.backbone import ResNet, FPN

    def _scriptable_resnet(self):
        # Work on a copy: wrap the stages in a ModuleList and drop the
        # per-stage attributes named in `stage_names`.
        clone = deepcopy(self)
        clone.stages = nn.ModuleList(clone.stages)
        for stage_name in self.stage_names:
            delattr(clone, stage_name)
        return clone

    ResNet.__prepare_scriptable__ = _scriptable_resnet

    def _scriptable_fpn(self):
        # Work on a copy: wrap both conv lists in ModuleLists and drop every
        # child module whose name starts with "fpn_".
        clone = deepcopy(self)
        clone.lateral_convs = nn.ModuleList(clone.lateral_convs)
        clone.output_convs = nn.ModuleList(clone.output_convs)
        fpn_children = [name for name, _ in self.named_children() if name.startswith("fpn_")]
        for child_name in fpn_children:
            delattr(clone, child_name)
        return clone

    FPN.__prepare_scriptable__ = _scriptable_fpn

    from detectron2.modeling.roi_heads import StandardROIHeads

    if hasattr(StandardROIHeads, "__annotations__"):
        # Edit a private copy so that an annotations dict shared with another
        # class is not mutated.
        annotations = deepcopy(StandardROIHeads.__annotations__)
        StandardROIHeads.__annotations__ = annotations
        for flag in ("mask_on", "keypoint_on"):
            annotations[flag] = torch.jit.Final[bool]
|
|
|
|
|
|
|
|
# Executed at module level, i.e. the patches are installed when this module is imported.
patch_nonscriptable_classes()
|
|
|
|
|
|
@contextmanager
def freeze_training_mode(model):
    """
    A context manager that annotates the "training" attribute of every submodule
    to constant, so that the training codepath in these modules can be
    meta-compiled away. Upon exiting, the annotations are reverted, also when
    the body of the `with` statement raises.

    Args:
        model: an object whose `modules()` method yields the (sub)modules whose
            classes should be annotated.
    """
    classes = {type(x) for x in model.modules()}
    # Classes that declare `__constants__` are left untouched.
    # NOTE(review): presumably because that legacy mechanism does not combine
    # with annotation-based constants - confirm.
    classes = {x for x in classes if not hasattr(x, "__constants__")}
    for cls in classes:
        cls.__annotations__["training"] = torch.jit.Final[bool]
    try:
        yield
    finally:
        # Always restore, otherwise an exception inside the context would leave
        # `training` annotated as Final on these classes.
        for cls in classes:
            cls.__annotations__["training"] = bool
|
|
|