# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license """ Export a YOLO PyTorch model to other formats. TensorFlow exports authored by https://github.com/zldrobit. Format | `format=argument` | Model --- | --- | --- PyTorch | - | yolo11n.pt TorchScript | `torchscript` | yolo11n.torchscript ONNX | `onnx` | yolo11n.onnx OpenVINO | `openvino` | yolo11n_openvino_model/ TensorRT | `engine` | yolo11n.engine CoreML | `coreml` | yolo11n.mlpackage TensorFlow SavedModel | `saved_model` | yolo11n_saved_model/ TensorFlow GraphDef | `pb` | yolo11n.pb TensorFlow Lite | `tflite` | yolo11n.tflite TensorFlow Edge TPU | `edgetpu` | yolo11n_edgetpu.tflite TensorFlow.js | `tfjs` | yolo11n_web_model/ PaddlePaddle | `paddle` | yolo11n_paddle_model/ MNN | `mnn` | yolo11n.mnn NCNN | `ncnn` | yolo11n_ncnn_model/ IMX | `imx` | yolo11n_imx_model/ Requirements: $ pip install "ultralytics[export]" Python: from ultralytics import YOLO model = YOLO('yolo11n.pt') results = model.export(format='onnx') CLI: $ yolo mode=export model=yolo11n.pt format=onnx Inference: $ yolo predict model=yolo11n.pt # PyTorch yolo11n.torchscript # TorchScript yolo11n.onnx # ONNX Runtime or OpenCV DNN with dnn=True yolo11n_openvino_model # OpenVINO yolo11n.engine # TensorRT yolo11n.mlpackage # CoreML (macOS-only) yolo11n_saved_model # TensorFlow SavedModel yolo11n.pb # TensorFlow GraphDef yolo11n.tflite # TensorFlow Lite yolo11n_edgetpu.tflite # TensorFlow Edge TPU yolo11n_paddle_model # PaddlePaddle yolo11n.mnn # MNN yolo11n_ncnn_model # NCNN yolo11n_imx_model # IMX TensorFlow.js: $ cd .. && git clone https://github.com/zldrobit/tfjs-yolov5-example.git && cd tfjs-yolov5-example $ npm install $ ln -s ../../yolo11n_web_model public/yolo11n_web_model $ npm start """ import gc import json import os import shutil import subprocess import time import warnings from copy import deepcopy from datetime import datetime from pathlib import Path import numpy as np import torch from ultralytics.cfg import TASK2DATA, get_cfg from ultralytics.data import build_dataloader from ultralytics.data.dataset import YOLODataset from ultralytics.data.utils import check_cls_dataset, check_det_dataset from ultralytics.nn.autobackend import check_class_names, default_class_names from ultralytics.nn.modules import C2f, Classify, Detect, RTDETRDecoder from ultralytics.nn.tasks import DetectionModel, SegmentationModel, WorldModel from ultralytics.utils import ( ARM64, DEFAULT_CFG, IS_JETSON, LINUX, LOGGER, MACOS, PYTHON_VERSION, ROOT, WINDOWS, __version__, callbacks, colorstr, get_default_args, yaml_save, ) from ultralytics.utils.checks import ( check_imgsz, check_is_path_safe, check_requirements, check_version, is_sudo_available, ) from ultralytics.utils.downloads import attempt_download_asset, get_github_assets, safe_download from ultralytics.utils.files import file_size, spaces_in_path from ultralytics.utils.ops import Profile from ultralytics.utils.torch_utils import TORCH_1_13, get_latest_opset, select_device def export_formats(): """Ultralytics YOLO export formats.""" x = [ ["PyTorch", "-", ".pt", True, True, []], ["TorchScript", "torchscript", ".torchscript", True, True, ["batch", "optimize"]], ["ONNX", "onnx", ".onnx", True, True, ["batch", "dynamic", "half", "opset", "simplify"]], ["OpenVINO", "openvino", "_openvino_model", True, False, ["batch", "dynamic", "half", "int8"]], ["TensorRT", "engine", ".engine", False, True, ["batch", "dynamic", "half", "int8", "simplify"]], ["CoreML", "coreml", ".mlpackage", True, False, ["batch", "half", "int8", "nms"]], ["TensorFlow SavedModel", "saved_model", "_saved_model", True, True, ["batch", "int8", "keras"]], ["TensorFlow GraphDef", "pb", ".pb", True, True, ["batch"]], ["TensorFlow Lite", "tflite", ".tflite", True, False, ["batch", "half", "int8"]], ["TensorFlow Edge TPU", "edgetpu", "_edgetpu.tflite", True, False, []], ["TensorFlow.js", "tfjs", "_web_model", True, False, ["batch", "half", "int8"]], ["PaddlePaddle", "paddle", "_paddle_model", True, True, ["batch"]], ["MNN", "mnn", ".mnn", True, True, ["batch", "half", "int8"]], ["NCNN", "ncnn", "_ncnn_model", True, True, ["batch", "half"]], ["IMX", "imx", "_imx_model", True, True, ["int8"]], ] return dict(zip(["Format", "Argument", "Suffix", "CPU", "GPU", "Arguments"], zip(*x))) def validate_args(format, passed_args, valid_args): """ Validates arguments based on format. Args: format (str): The export format. passed_args (Namespace): The arguments used during export. valid_args (dict): List of valid arguments for the format. Raises: AssertionError: If an argument that's not supported by the export format is used, or if format doesn't have the supported arguments listed. """ # Only check valid usage of these args export_args = ["half", "int8", "dynamic", "keras", "nms", "batch"] assert valid_args is not None, f"ERROR ❌️ valid arguments for '{format}' not listed." custom = {"batch": 1, "data": None, "device": None} # exporter defaults default_args = get_cfg(DEFAULT_CFG, custom) for arg in export_args: not_default = getattr(passed_args, arg, None) != getattr(default_args, arg, None) if not_default: assert arg in valid_args, f"ERROR ❌️ argument '{arg}' is not supported for format='{format}'" def gd_outputs(gd): """TensorFlow GraphDef model output node names.""" name_list, input_list = [], [] for node in gd.node: # tensorflow.core.framework.node_def_pb2.NodeDef name_list.append(node.name) input_list.extend(node.input) return sorted(f"{x}:0" for x in list(set(name_list) - set(input_list)) if not x.startswith("NoOp")) def try_export(inner_func): """YOLO export decorator, i.e. @try_export.""" inner_args = get_default_args(inner_func) def outer_func(*args, **kwargs): """Export a model.""" prefix = inner_args["prefix"] try: with Profile() as dt: f, model = inner_func(*args, **kwargs) LOGGER.info(f"{prefix} export success ✅ {dt.t:.1f}s, saved as '{f}' ({file_size(f):.1f} MB)") return f, model except Exception as e: LOGGER.error(f"{prefix} export failure ❌ {dt.t:.1f}s: {e}") raise e return outer_func class Exporter: """ A class for exporting a model. Attributes: args (SimpleNamespace): Configuration for the exporter. callbacks (list, optional): List of callback functions. Defaults to None. """ def __init__(self, cfg=DEFAULT_CFG, overrides=None, _callbacks=None): """ Initializes the Exporter class. Args: cfg (str, optional): Path to a configuration file. Defaults to DEFAULT_CFG. overrides (dict, optional): Configuration overrides. Defaults to None. _callbacks (dict, optional): Dictionary of callback functions. Defaults to None. """ self.args = get_cfg(cfg, overrides) if self.args.format.lower() in {"coreml", "mlmodel"}: # fix attempt for protobuf<3.20.x errors os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python" # must run before TensorBoard callback self.callbacks = _callbacks or callbacks.get_default_callbacks() callbacks.add_integration_callbacks(self) def __call__(self, model=None) -> str: """Returns list of exported files/dirs after running callbacks.""" self.run_callbacks("on_export_start") t = time.time() fmt = self.args.format.lower() # to lowercase if fmt in {"tensorrt", "trt"}: # 'engine' aliases fmt = "engine" if fmt in {"mlmodel", "mlpackage", "mlprogram", "apple", "ios", "coreml"}: # 'coreml' aliases fmt = "coreml" fmts_dict = export_formats() fmts = tuple(fmts_dict["Argument"][1:]) # available export formats if fmt not in fmts: import difflib # Get the closest match if format is invalid matches = difflib.get_close_matches(fmt, fmts, n=1, cutoff=0.6) # 60% similarity required to match if not matches: raise ValueError(f"Invalid export format='{fmt}'. Valid formats are {fmts}") LOGGER.warning(f"WARNING ⚠️ Invalid export format='{fmt}', updating to format='{matches[0]}'") fmt = matches[0] flags = [x == fmt for x in fmts] if sum(flags) != 1: raise ValueError(f"Invalid export format='{fmt}'. Valid formats are {fmts}") ( jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, paddle, mnn, ncnn, imx, ) = flags # export booleans is_tf_format = any((saved_model, pb, tflite, edgetpu, tfjs)) # Device dla = None if fmt == "engine" and self.args.device is None: LOGGER.warning("WARNING ⚠️ TensorRT requires GPU export, automatically assigning device=0") self.args.device = "0" if fmt == "engine" and "dla" in str(self.args.device): # convert int/list to str first dla = self.args.device.split(":")[-1] self.args.device = "0" # update device to "0" assert dla in {"0", "1"}, f"Expected self.args.device='dla:0' or 'dla:1, but got {self.args.device}." self.device = select_device("cpu" if self.args.device is None else self.args.device) # Argument compatibility checks fmt_keys = fmts_dict["Arguments"][flags.index(True) + 1] validate_args(fmt, self.args, fmt_keys) if imx and not self.args.int8: LOGGER.warning("WARNING ⚠️ IMX only supports int8 export, setting int8=True.") self.args.int8 = True if not hasattr(model, "names"): model.names = default_class_names() model.names = check_class_names(model.names) if self.args.half and self.args.int8: LOGGER.warning("WARNING ⚠️ half=True and int8=True are mutually exclusive, setting half=False.") self.args.half = False if self.args.half and onnx and self.device.type == "cpu": LOGGER.warning("WARNING ⚠️ half=True only compatible with GPU export, i.e. use device=0") self.args.half = False assert not self.args.dynamic, "half=True not compatible with dynamic=True, i.e. use only one." self.imgsz = check_imgsz(self.args.imgsz, stride=model.stride, min_dim=2) # check image size if self.args.int8 and engine: self.args.dynamic = True # enforce dynamic to export TensorRT INT8 if self.args.optimize: assert not ncnn, "optimize=True not compatible with format='ncnn', i.e. use optimize=False" assert self.device.type == "cpu", "optimize=True not compatible with cuda devices, i.e. use device='cpu'" if self.args.int8 and tflite: assert not getattr(model, "end2end", False), "TFLite INT8 export not supported for end2end models." if edgetpu: if not LINUX: raise SystemError("Edge TPU export only supported on Linux. See https://coral.ai/docs/edgetpu/compiler") elif self.args.batch != 1: # see github.com/ultralytics/ultralytics/pull/13420 LOGGER.warning("WARNING ⚠️ Edge TPU export requires batch size 1, setting batch=1.") self.args.batch = 1 if isinstance(model, WorldModel): LOGGER.warning( "WARNING ⚠️ YOLOWorld (original version) export is not supported to any format.\n" "WARNING ⚠️ YOLOWorldv2 models (i.e. 'yolov8s-worldv2.pt') only support export to " "(torchscript, onnx, openvino, engine, coreml) formats. " "See https://docs.ultralytics.com/models/yolo-world for details." ) model.clip_model = None # openvino int8 export error: https://github.com/ultralytics/ultralytics/pull/18445 if self.args.int8 and not self.args.data: self.args.data = DEFAULT_CFG.data or TASK2DATA[getattr(model, "task", "detect")] # assign default data LOGGER.warning( "WARNING ⚠️ INT8 export requires a missing 'data' arg for calibration. " f"Using default 'data={self.args.data}'." ) # Input im = torch.zeros(self.args.batch, 3, *self.imgsz).to(self.device) file = Path( getattr(model, "pt_path", None) or getattr(model, "yaml_file", None) or model.yaml.get("yaml_file", "") ) if file.suffix in {".yaml", ".yml"}: file = Path(file.name) # Update model model = deepcopy(model).to(self.device) for p in model.parameters(): p.requires_grad = False model.eval() model.float() model = model.fuse() if imx: from ultralytics.utils.torch_utils import FXModel model = FXModel(model) for m in model.modules(): if isinstance(m, Classify): m.export = True if isinstance(m, (Detect, RTDETRDecoder)): # includes all Detect subclasses like Segment, Pose, OBB m.dynamic = self.args.dynamic m.export = True m.format = self.args.format m.max_det = self.args.max_det elif isinstance(m, C2f) and not is_tf_format: # EdgeTPU does not support FlexSplitV while split provides cleaner ONNX graph m.forward = m.forward_split if isinstance(m, Detect) and imx: from ultralytics.utils.tal import make_anchors m.anchors, m.strides = ( x.transpose(0, 1) for x in make_anchors( torch.cat([s / m.stride.unsqueeze(-1) for s in self.imgsz], dim=1), m.stride, 0.5 ) ) y = None for _ in range(2): y = model(im) # dry runs if self.args.half and onnx and self.device.type != "cpu": im, model = im.half(), model.half() # to FP16 # Filter warnings warnings.filterwarnings("ignore", category=torch.jit.TracerWarning) # suppress TracerWarning warnings.filterwarnings("ignore", category=UserWarning) # suppress shape prim::Constant missing ONNX warning warnings.filterwarnings("ignore", category=DeprecationWarning) # suppress CoreML np.bool deprecation warning # Assign self.im = im self.model = model self.file = file self.output_shape = ( tuple(y.shape) if isinstance(y, torch.Tensor) else tuple(tuple(x.shape if isinstance(x, torch.Tensor) else []) for x in y) ) self.pretty_name = Path(self.model.yaml.get("yaml_file", self.file)).stem.replace("yolo", "YOLO") data = model.args["data"] if hasattr(model, "args") and isinstance(model.args, dict) else "" description = f"Ultralytics {self.pretty_name} model {f'trained on {data}' if data else ''}" self.metadata = { "description": description, "author": "Ultralytics", "date": datetime.now().isoformat(), "version": __version__, "license": "AGPL-3.0 License (https://ultralytics.com/license)", "docs": "https://docs.ultralytics.com", "stride": int(max(model.stride)), "task": model.task, "batch": self.args.batch, "imgsz": self.imgsz, "names": model.names, "args": {k: v for k, v in self.args if k in fmt_keys}, } # model metadata if model.task == "pose": self.metadata["kpt_shape"] = model.model[-1].kpt_shape LOGGER.info( f"\n{colorstr('PyTorch:')} starting from '{file}' with input shape {tuple(im.shape)} BCHW and " f"output shape(s) {self.output_shape} ({file_size(file):.1f} MB)" ) # Exports f = [""] * len(fmts) # exported filenames if jit or ncnn: # TorchScript f[0], _ = self.export_torchscript() if engine: # TensorRT required before ONNX f[1], _ = self.export_engine(dla=dla) if onnx: # ONNX f[2], _ = self.export_onnx() if xml: # OpenVINO f[3], _ = self.export_openvino() if coreml: # CoreML f[4], _ = self.export_coreml() if is_tf_format: # TensorFlow formats self.args.int8 |= edgetpu f[5], keras_model = self.export_saved_model() if pb or tfjs: # pb prerequisite to tfjs f[6], _ = self.export_pb(keras_model=keras_model) if tflite: f[7], _ = self.export_tflite(keras_model=keras_model, nms=False, agnostic_nms=self.args.agnostic_nms) if edgetpu: f[8], _ = self.export_edgetpu(tflite_model=Path(f[5]) / f"{self.file.stem}_full_integer_quant.tflite") if tfjs: f[9], _ = self.export_tfjs() if paddle: # PaddlePaddle f[10], _ = self.export_paddle() if mnn: # MNN f[11], _ = self.export_mnn() if ncnn: # NCNN f[12], _ = self.export_ncnn() if imx: f[13], _ = self.export_imx() # Finish f = [str(x) for x in f if x] # filter out '' and None if any(f): f = str(Path(f[-1])) square = self.imgsz[0] == self.imgsz[1] s = ( "" if square else f"WARNING ⚠️ non-PyTorch val requires square images, 'imgsz={self.imgsz}' will not " f"work. Use export 'imgsz={max(self.imgsz)}' if val is required." ) imgsz = self.imgsz[0] if square else str(self.imgsz)[1:-1].replace(" ", "") predict_data = f"data={data}" if model.task == "segment" and fmt == "pb" else "" q = "int8" if self.args.int8 else "half" if self.args.half else "" # quantization LOGGER.info( f"\nExport complete ({time.time() - t:.1f}s)" f"\nResults saved to {colorstr('bold', file.parent.resolve())}" f"\nPredict: yolo predict task={model.task} model={f} imgsz={imgsz} {q} {predict_data}" f"\nValidate: yolo val task={model.task} model={f} imgsz={imgsz} data={data} {q} {s}" f"\nVisualize: https://netron.app" ) self.run_callbacks("on_export_end") return f # return list of exported files/dirs def get_int8_calibration_dataloader(self, prefix=""): """Build and return a dataloader suitable for calibration of INT8 models.""" LOGGER.info(f"{prefix} collecting INT8 calibration images from 'data={self.args.data}'") data = (check_cls_dataset if self.model.task == "classify" else check_det_dataset)(self.args.data) # TensorRT INT8 calibration should use 2x batch size batch = self.args.batch * (2 if self.args.format == "engine" else 1) dataset = YOLODataset( data[self.args.split or "val"], data=data, task=self.model.task, imgsz=self.imgsz[0], augment=False, batch_size=batch, ) n = len(dataset) if n < self.args.batch: raise ValueError( f"The calibration dataset ({n} images) must have at least as many images as the batch size ('batch={self.args.batch}')." ) elif n < 300: LOGGER.warning(f"{prefix} WARNING ⚠️ >300 images recommended for INT8 calibration, found {n} images.") return build_dataloader(dataset, batch=batch, workers=0) # required for batch loading @try_export def export_torchscript(self, prefix=colorstr("TorchScript:")): """YOLO TorchScript model export.""" LOGGER.info(f"\n{prefix} starting export with torch {torch.__version__}...") f = self.file.with_suffix(".torchscript") ts = torch.jit.trace(self.model, self.im, strict=False) extra_files = {"config.txt": json.dumps(self.metadata)} # torch._C.ExtraFilesMap() if self.args.optimize: # https://pytorch.org/tutorials/recipes/mobile_interpreter.html LOGGER.info(f"{prefix} optimizing for mobile...") from torch.utils.mobile_optimizer import optimize_for_mobile optimize_for_mobile(ts)._save_for_lite_interpreter(str(f), _extra_files=extra_files) else: ts.save(str(f), _extra_files=extra_files) return f, None @try_export def export_onnx(self, prefix=colorstr("ONNX:")): """YOLO ONNX export.""" requirements = ["onnx>=1.12.0"] if self.args.simplify: requirements += ["onnxslim", "onnxruntime" + ("-gpu" if torch.cuda.is_available() else "")] check_requirements(requirements) import onnx # noqa opset_version = self.args.opset or get_latest_opset() LOGGER.info(f"\n{prefix} starting export with onnx {onnx.__version__} opset {opset_version}...") f = str(self.file.with_suffix(".onnx")) output_names = ["output0", "output1"] if isinstance(self.model, SegmentationModel) else ["output0"] dynamic = self.args.dynamic if dynamic: dynamic = {"images": {0: "batch", 2: "height", 3: "width"}} # shape(1,3,640,640) if isinstance(self.model, SegmentationModel): dynamic["output0"] = {0: "batch", 2: "anchors"} # shape(1, 116, 8400) dynamic["output1"] = {0: "batch", 2: "mask_height", 3: "mask_width"} # shape(1,32,160,160) elif isinstance(self.model, DetectionModel): dynamic["output0"] = {0: "batch", 2: "anchors"} # shape(1, 84, 8400) torch.onnx.export( self.model.cpu() if dynamic else self.model, # dynamic=True only compatible with cpu self.im.cpu() if dynamic else self.im, f, verbose=False, opset_version=opset_version, do_constant_folding=True, # WARNING: DNN inference with torch>=1.12 may require do_constant_folding=False input_names=["images"], output_names=output_names, dynamic_axes=dynamic or None, ) # Checks model_onnx = onnx.load(f) # load onnx model # Simplify if self.args.simplify: try: import onnxslim LOGGER.info(f"{prefix} slimming with onnxslim {onnxslim.__version__}...") model_onnx = onnxslim.slim(model_onnx) except Exception as e: LOGGER.warning(f"{prefix} simplifier failure: {e}") # Metadata for k, v in self.metadata.items(): meta = model_onnx.metadata_props.add() meta.key, meta.value = k, str(v) onnx.save(model_onnx, f) return f, model_onnx @try_export def export_openvino(self, prefix=colorstr("OpenVINO:")): """YOLO OpenVINO export.""" check_requirements("openvino>=2024.5.0") import openvino as ov LOGGER.info(f"\n{prefix} starting export with openvino {ov.__version__}...") assert TORCH_1_13, f"OpenVINO export requires torch>=1.13.0 but torch=={torch.__version__} is installed" ov_model = ov.convert_model( self.model, input=None if self.args.dynamic else [self.im.shape], example_input=self.im, ) def serialize(ov_model, file): """Set RT info, serialize and save metadata YAML.""" ov_model.set_rt_info("YOLO", ["model_info", "model_type"]) ov_model.set_rt_info(True, ["model_info", "reverse_input_channels"]) ov_model.set_rt_info(114, ["model_info", "pad_value"]) ov_model.set_rt_info([255.0], ["model_info", "scale_values"]) ov_model.set_rt_info(self.args.iou, ["model_info", "iou_threshold"]) ov_model.set_rt_info([v.replace(" ", "_") for v in self.model.names.values()], ["model_info", "labels"]) if self.model.task != "classify": ov_model.set_rt_info("fit_to_window_letterbox", ["model_info", "resize_type"]) ov.runtime.save_model(ov_model, file, compress_to_fp16=self.args.half) yaml_save(Path(file).parent / "metadata.yaml", self.metadata) # add metadata.yaml if self.args.int8: fq = str(self.file).replace(self.file.suffix, f"_int8_openvino_model{os.sep}") fq_ov = str(Path(fq) / self.file.with_suffix(".xml").name) check_requirements("nncf>=2.14.0") import nncf def transform_fn(data_item) -> np.ndarray: """Quantization transform function.""" data_item: torch.Tensor = data_item["img"] if isinstance(data_item, dict) else data_item assert data_item.dtype == torch.uint8, "Input image must be uint8 for the quantization preprocessing" im = data_item.numpy().astype(np.float32) / 255.0 # uint8 to fp16/32 and 0 - 255 to 0.0 - 1.0 return np.expand_dims(im, 0) if im.ndim == 3 else im # Generate calibration data for integer quantization ignored_scope = None if isinstance(self.model.model[-1], Detect): # Includes all Detect subclasses like Segment, Pose, OBB, WorldDetect head_module_name = ".".join(list(self.model.named_modules())[-1][0].split(".")[:2]) ignored_scope = nncf.IgnoredScope( # ignore operations patterns=[ f".*{head_module_name}/.*/Add", f".*{head_module_name}/.*/Sub*", f".*{head_module_name}/.*/Mul*", f".*{head_module_name}/.*/Div*", f".*{head_module_name}\\.dfl.*", ], types=["Sigmoid"], ) quantized_ov_model = nncf.quantize( model=ov_model, calibration_dataset=nncf.Dataset(self.get_int8_calibration_dataloader(prefix), transform_fn), preset=nncf.QuantizationPreset.MIXED, ignored_scope=ignored_scope, ) serialize(quantized_ov_model, fq_ov) return fq, None f = str(self.file).replace(self.file.suffix, f"_openvino_model{os.sep}") f_ov = str(Path(f) / self.file.with_suffix(".xml").name) serialize(ov_model, f_ov) return f, None @try_export def export_paddle(self, prefix=colorstr("PaddlePaddle:")): """YOLO Paddle export.""" check_requirements(("paddlepaddle-gpu" if torch.cuda.is_available() else "paddlepaddle", "x2paddle")) import x2paddle # noqa from x2paddle.convert import pytorch2paddle # noqa LOGGER.info(f"\n{prefix} starting export with X2Paddle {x2paddle.__version__}...") f = str(self.file).replace(self.file.suffix, f"_paddle_model{os.sep}") pytorch2paddle(module=self.model, save_dir=f, jit_type="trace", input_examples=[self.im]) # export yaml_save(Path(f) / "metadata.yaml", self.metadata) # add metadata.yaml return f, None @try_export def export_mnn(self, prefix=colorstr("MNN:")): """YOLOv8 MNN export using MNN https://github.com/alibaba/MNN.""" f_onnx, _ = self.export_onnx() # get onnx model first check_requirements("MNN>=2.9.6") import MNN # noqa from MNN.tools import mnnconvert # Setup and checks LOGGER.info(f"\n{prefix} starting export with MNN {MNN.version()}...") assert Path(f_onnx).exists(), f"failed to export ONNX file: {f_onnx}" f = str(self.file.with_suffix(".mnn")) # MNN model file args = ["", "-f", "ONNX", "--modelFile", f_onnx, "--MNNModel", f, "--bizCode", json.dumps(self.metadata)] if self.args.int8: args.extend(("--weightQuantBits", "8")) if self.args.half: args.append("--fp16") mnnconvert.convert(args) # remove scratch file for model convert optimize convert_scratch = Path(self.file.parent / ".__convert_external_data.bin") if convert_scratch.exists(): convert_scratch.unlink() return f, None @try_export def export_ncnn(self, prefix=colorstr("NCNN:")): """YOLO NCNN export using PNNX https://github.com/pnnx/pnnx.""" check_requirements("ncnn") import ncnn # noqa LOGGER.info(f"\n{prefix} starting export with NCNN {ncnn.__version__}...") f = Path(str(self.file).replace(self.file.suffix, f"_ncnn_model{os.sep}")) f_ts = self.file.with_suffix(".torchscript") name = Path("pnnx.exe" if WINDOWS else "pnnx") # PNNX filename pnnx = name if name.is_file() else (ROOT / name) if not pnnx.is_file(): LOGGER.warning( f"{prefix} WARNING ⚠️ PNNX not found. Attempting to download binary file from " "https://github.com/pnnx/pnnx/.\nNote PNNX Binary file must be placed in current working directory " f"or in {ROOT}. See PNNX repo for full installation instructions." ) system = "macos" if MACOS else "windows" if WINDOWS else "linux-aarch64" if ARM64 else "linux" try: release, assets = get_github_assets(repo="pnnx/pnnx") asset = [x for x in assets if f"{system}.zip" in x][0] assert isinstance(asset, str), "Unable to retrieve PNNX repo assets" # i.e. pnnx-20240410-macos.zip LOGGER.info(f"{prefix} successfully found latest PNNX asset file {asset}") except Exception as e: release = "20240410" asset = f"pnnx-{release}-{system}.zip" LOGGER.warning(f"{prefix} WARNING ⚠️ PNNX GitHub assets not found: {e}, using default {asset}") unzip_dir = safe_download(f"https://github.com/pnnx/pnnx/releases/download/{release}/{asset}", delete=True) if check_is_path_safe(Path.cwd(), unzip_dir): # avoid path traversal security vulnerability shutil.move(src=unzip_dir / name, dst=pnnx) # move binary to ROOT pnnx.chmod(0o777) # set read, write, and execute permissions for everyone shutil.rmtree(unzip_dir) # delete unzip dir ncnn_args = [ f"ncnnparam={f / 'model.ncnn.param'}", f"ncnnbin={f / 'model.ncnn.bin'}", f"ncnnpy={f / 'model_ncnn.py'}", ] pnnx_args = [ f"pnnxparam={f / 'model.pnnx.param'}", f"pnnxbin={f / 'model.pnnx.bin'}", f"pnnxpy={f / 'model_pnnx.py'}", f"pnnxonnx={f / 'model.pnnx.onnx'}", ] cmd = [ str(pnnx), str(f_ts), *ncnn_args, *pnnx_args, f"fp16={int(self.args.half)}", f"device={self.device.type}", f'inputshape="{[self.args.batch, 3, *self.imgsz]}"', ] f.mkdir(exist_ok=True) # make ncnn_model directory LOGGER.info(f"{prefix} running '{' '.join(cmd)}'") subprocess.run(cmd, check=True) # Remove debug files pnnx_files = [x.split("=")[-1] for x in pnnx_args] for f_debug in ("debug.bin", "debug.param", "debug2.bin", "debug2.param", *pnnx_files): Path(f_debug).unlink(missing_ok=True) yaml_save(f / "metadata.yaml", self.metadata) # add metadata.yaml return str(f), None @try_export def export_coreml(self, prefix=colorstr("CoreML:")): """YOLO CoreML export.""" mlmodel = self.args.format.lower() == "mlmodel" # legacy *.mlmodel export format requested check_requirements("coremltools>=6.0,<=6.2" if mlmodel else "coremltools>=7.0") import coremltools as ct # noqa LOGGER.info(f"\n{prefix} starting export with coremltools {ct.__version__}...") assert not WINDOWS, "CoreML export is not supported on Windows, please run on macOS or Linux." assert self.args.batch == 1, "CoreML batch sizes > 1 are not supported. Please retry at 'batch=1'." f = self.file.with_suffix(".mlmodel" if mlmodel else ".mlpackage") if f.is_dir(): shutil.rmtree(f) if self.args.nms and getattr(self.model, "end2end", False): LOGGER.warning(f"{prefix} WARNING ⚠️ 'nms=True' is not available for end2end models. Forcing 'nms=False'.") self.args.nms = False bias = [0.0, 0.0, 0.0] scale = 1 / 255 classifier_config = None if self.model.task == "classify": classifier_config = ct.ClassifierConfig(list(self.model.names.values())) if self.args.nms else None model = self.model elif self.model.task == "detect": model = IOSDetectModel(self.model, self.im) if self.args.nms else self.model else: if self.args.nms: LOGGER.warning(f"{prefix} WARNING ⚠️ 'nms=True' is only available for Detect models like 'yolov8n.pt'.") # TODO CoreML Segment and Pose model pipelining model = self.model ts = torch.jit.trace(model.eval(), self.im, strict=False) # TorchScript model ct_model = ct.convert( ts, inputs=[ct.ImageType("image", shape=self.im.shape, scale=scale, bias=bias)], classifier_config=classifier_config, convert_to="neuralnetwork" if mlmodel else "mlprogram", ) bits, mode = (8, "kmeans") if self.args.int8 else (16, "linear") if self.args.half else (32, None) if bits < 32: if "kmeans" in mode: check_requirements("scikit-learn") # scikit-learn package required for k-means quantization if mlmodel: ct_model = ct.models.neural_network.quantization_utils.quantize_weights(ct_model, bits, mode) elif bits == 8: # mlprogram already quantized to FP16 import coremltools.optimize.coreml as cto op_config = cto.OpPalettizerConfig(mode="kmeans", nbits=bits, weight_threshold=512) config = cto.OptimizationConfig(global_config=op_config) ct_model = cto.palettize_weights(ct_model, config=config) if self.args.nms and self.model.task == "detect": if mlmodel: # coremltools<=6.2 NMS export requires Python<3.11 check_version(PYTHON_VERSION, "<3.11", name="Python ", hard=True) weights_dir = None else: ct_model.save(str(f)) # save otherwise weights_dir does not exist weights_dir = str(f / "Data/com.apple.CoreML/weights") ct_model = self._pipeline_coreml(ct_model, weights_dir=weights_dir) m = self.metadata # metadata dict ct_model.short_description = m.pop("description") ct_model.author = m.pop("author") ct_model.license = m.pop("license") ct_model.version = m.pop("version") ct_model.user_defined_metadata.update({k: str(v) for k, v in m.items()}) try: ct_model.save(str(f)) # save *.mlpackage except Exception as e: LOGGER.warning( f"{prefix} WARNING ⚠️ CoreML export to *.mlpackage failed ({e}), reverting to *.mlmodel export. " f"Known coremltools Python 3.11 and Windows bugs https://github.com/apple/coremltools/issues/1928." ) f = f.with_suffix(".mlmodel") ct_model.save(str(f)) return f, ct_model @try_export def export_engine(self, dla=None, prefix=colorstr("TensorRT:")): """YOLO TensorRT export https://developer.nvidia.com/tensorrt.""" assert self.im.device.type != "cpu", "export running on CPU but must be on GPU, i.e. use 'device=0'" f_onnx, _ = self.export_onnx() # run before TRT import https://github.com/ultralytics/ultralytics/issues/7016 try: import tensorrt as trt # noqa except ImportError: if LINUX: check_requirements("tensorrt>7.0.0,!=10.1.0") import tensorrt as trt # noqa check_version(trt.__version__, ">=7.0.0", hard=True) check_version(trt.__version__, "!=10.1.0", msg="https://github.com/ultralytics/ultralytics/pull/14239") # Setup and checks LOGGER.info(f"\n{prefix} starting export with TensorRT {trt.__version__}...") is_trt10 = int(trt.__version__.split(".")[0]) >= 10 # is TensorRT >= 10 assert Path(f_onnx).exists(), f"failed to export ONNX file: {f_onnx}" f = self.file.with_suffix(".engine") # TensorRT engine file logger = trt.Logger(trt.Logger.INFO) if self.args.verbose: logger.min_severity = trt.Logger.Severity.VERBOSE # Engine builder builder = trt.Builder(logger) config = builder.create_builder_config() workspace = int(self.args.workspace * (1 << 30)) if self.args.workspace is not None else 0 if is_trt10 and workspace > 0: config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, workspace) elif workspace > 0: # TensorRT versions 7, 8 config.max_workspace_size = workspace flag = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) network = builder.create_network(flag) half = builder.platform_has_fast_fp16 and self.args.half int8 = builder.platform_has_fast_int8 and self.args.int8 # Optionally switch to DLA if enabled if dla is not None: if not IS_JETSON: raise ValueError("DLA is only available on NVIDIA Jetson devices") LOGGER.info(f"{prefix} enabling DLA on core {dla}...") if not self.args.half and not self.args.int8: raise ValueError( "DLA requires either 'half=True' (FP16) or 'int8=True' (INT8) to be enabled. Please enable one of them and try again." ) config.default_device_type = trt.DeviceType.DLA config.DLA_core = int(dla) config.set_flag(trt.BuilderFlag.GPU_FALLBACK) # Read ONNX file parser = trt.OnnxParser(network, logger) if not parser.parse_from_file(f_onnx): raise RuntimeError(f"failed to load ONNX file: {f_onnx}") # Network inputs inputs = [network.get_input(i) for i in range(network.num_inputs)] outputs = [network.get_output(i) for i in range(network.num_outputs)] for inp in inputs: LOGGER.info(f'{prefix} input "{inp.name}" with shape{inp.shape} {inp.dtype}') for out in outputs: LOGGER.info(f'{prefix} output "{out.name}" with shape{out.shape} {out.dtype}') if self.args.dynamic: shape = self.im.shape if shape[0] <= 1: LOGGER.warning(f"{prefix} WARNING ⚠️ 'dynamic=True' model requires max batch size, i.e. 'batch=16'") profile = builder.create_optimization_profile() min_shape = (1, shape[1], 32, 32) # minimum input shape max_shape = (*shape[:2], *(int(max(1, workspace) * d) for d in shape[2:])) # max input shape for inp in inputs: profile.set_shape(inp.name, min=min_shape, opt=shape, max=max_shape) config.add_optimization_profile(profile) LOGGER.info(f"{prefix} building {'INT8' if int8 else 'FP' + ('16' if half else '32')} engine as {f}") if int8: config.set_flag(trt.BuilderFlag.INT8) config.set_calibration_profile(profile) config.profiling_verbosity = trt.ProfilingVerbosity.DETAILED class EngineCalibrator(trt.IInt8Calibrator): def __init__( self, dataset, # ultralytics.data.build.InfiniteDataLoader batch: int, cache: str = "", ) -> None: trt.IInt8Calibrator.__init__(self) self.dataset = dataset self.data_iter = iter(dataset) self.algo = trt.CalibrationAlgoType.ENTROPY_CALIBRATION_2 self.batch = batch self.cache = Path(cache) def get_algorithm(self) -> trt.CalibrationAlgoType: """Get the calibration algorithm to use.""" return self.algo def get_batch_size(self) -> int: """Get the batch size to use for calibration.""" return self.batch or 1 def get_batch(self, names) -> list: """Get the next batch to use for calibration, as a list of device memory pointers.""" try: im0s = next(self.data_iter)["img"] / 255.0 im0s = im0s.to("cuda") if im0s.device.type == "cpu" else im0s return [int(im0s.data_ptr())] except StopIteration: # Return [] or None, signal to TensorRT there is no calibration data remaining return None def read_calibration_cache(self) -> bytes: """Use existing cache instead of calibrating again, otherwise, implicitly return None.""" if self.cache.exists() and self.cache.suffix == ".cache": return self.cache.read_bytes() def write_calibration_cache(self, cache) -> None: """Write calibration cache to disk.""" _ = self.cache.write_bytes(cache) # Load dataset w/ builder (for batching) and calibrate config.int8_calibrator = EngineCalibrator( dataset=self.get_int8_calibration_dataloader(prefix), batch=2 * self.args.batch, # TensorRT INT8 calibration should use 2x batch size cache=str(self.file.with_suffix(".cache")), ) elif half: config.set_flag(trt.BuilderFlag.FP16) # Free CUDA memory del self.model gc.collect() torch.cuda.empty_cache() # Write file build = builder.build_serialized_network if is_trt10 else builder.build_engine with build(network, config) as engine, open(f, "wb") as t: # Metadata meta = json.dumps(self.metadata) t.write(len(meta).to_bytes(4, byteorder="little", signed=True)) t.write(meta.encode()) # Model t.write(engine if is_trt10 else engine.serialize()) return f, None @try_export def export_saved_model(self, prefix=colorstr("TensorFlow SavedModel:")): """YOLO TensorFlow SavedModel export.""" cuda = torch.cuda.is_available() try: import tensorflow as tf # noqa except ImportError: suffix = "-macos" if MACOS else "-aarch64" if ARM64 else "" if cuda else "-cpu" version = ">=2.0.0" check_requirements(f"tensorflow{suffix}{version}") import tensorflow as tf # noqa check_requirements( ( "keras", # required by 'onnx2tf' package "tf_keras", # required by 'onnx2tf' package "sng4onnx>=1.0.1", # required by 'onnx2tf' package "onnx_graphsurgeon>=0.3.26", # required by 'onnx2tf' package "onnx>=1.12.0", "onnx2tf>1.17.5,<=1.26.3", "onnxslim>=0.1.31", "tflite_support<=0.4.3" if IS_JETSON else "tflite_support", # fix ImportError 'GLIBCXX_3.4.29' "flatbuffers>=23.5.26,<100", # update old 'flatbuffers' included inside tensorflow package "onnxruntime-gpu" if cuda else "onnxruntime", ), cmds="--extra-index-url https://pypi.ngc.nvidia.com", # onnx_graphsurgeon only on NVIDIA ) LOGGER.info(f"\n{prefix} starting export with tensorflow {tf.__version__}...") check_version( tf.__version__, ">=2.0.0", name="tensorflow", verbose=True, msg="https://github.com/ultralytics/ultralytics/issues/5161", ) import onnx2tf f = Path(str(self.file).replace(self.file.suffix, "_saved_model")) if f.is_dir(): shutil.rmtree(f) # delete output folder # Pre-download calibration file to fix https://github.com/PINTO0309/onnx2tf/issues/545 onnx2tf_file = Path("calibration_image_sample_data_20x128x128x3_float32.npy") if not onnx2tf_file.exists(): attempt_download_asset(f"{onnx2tf_file}.zip", unzip=True, delete=True) # Export to ONNX self.args.simplify = True f_onnx, _ = self.export_onnx() # Export to TF np_data = None if self.args.int8: tmp_file = f / "tmp_tflite_int8_calibration_images.npy" # int8 calibration images file if self.args.data: f.mkdir() images = [batch["img"] for batch in self.get_int8_calibration_dataloader(prefix)] images = torch.nn.functional.interpolate(torch.cat(images, 0).float(), size=self.imgsz).permute( 0, 2, 3, 1 ) np.save(str(tmp_file), images.numpy().astype(np.float32)) # BHWC np_data = [["images", tmp_file, [[[[0, 0, 0]]]], [[[[255, 255, 255]]]]]] LOGGER.info(f"{prefix} starting TFLite export with onnx2tf {onnx2tf.__version__}...") keras_model = onnx2tf.convert( input_onnx_file_path=f_onnx, output_folder_path=str(f), not_use_onnxsim=True, verbosity="error", # note INT8-FP16 activation bug https://github.com/ultralytics/ultralytics/issues/15873 output_integer_quantized_tflite=self.args.int8, quant_type="per-tensor", # "per-tensor" (faster) or "per-channel" (slower but more accurate) custom_input_op_name_np_data_path=np_data, disable_group_convolution=True, # for end-to-end model compatibility enable_batchmatmul_unfold=True, # for end-to-end model compatibility ) yaml_save(f / "metadata.yaml", self.metadata) # add metadata.yaml # Remove/rename TFLite models if self.args.int8: tmp_file.unlink(missing_ok=True) for file in f.rglob("*_dynamic_range_quant.tflite"): file.rename(file.with_name(file.stem.replace("_dynamic_range_quant", "_int8") + file.suffix)) for file in f.rglob("*_integer_quant_with_int16_act.tflite"): file.unlink() # delete extra fp16 activation TFLite files # Add TFLite metadata for file in f.rglob("*.tflite"): f.unlink() if "quant_with_int16_act.tflite" in str(f) else self._add_tflite_metadata(file) return str(f), keras_model # or keras_model = tf.saved_model.load(f, tags=None, options=None) @try_export def export_pb(self, keras_model, prefix=colorstr("TensorFlow GraphDef:")): """YOLO TensorFlow GraphDef *.pb export https://github.com/leimao/Frozen_Graph_TensorFlow.""" import tensorflow as tf # noqa from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2 # noqa LOGGER.info(f"\n{prefix} starting export with tensorflow {tf.__version__}...") f = self.file.with_suffix(".pb") m = tf.function(lambda x: keras_model(x)) # full model m = m.get_concrete_function(tf.TensorSpec(keras_model.inputs[0].shape, keras_model.inputs[0].dtype)) frozen_func = convert_variables_to_constants_v2(m) frozen_func.graph.as_graph_def() tf.io.write_graph(graph_or_graph_def=frozen_func.graph, logdir=str(f.parent), name=f.name, as_text=False) return f, None @try_export def export_tflite(self, keras_model, nms, agnostic_nms, prefix=colorstr("TensorFlow Lite:")): """YOLO TensorFlow Lite export.""" # BUG https://github.com/ultralytics/ultralytics/issues/13436 import tensorflow as tf # noqa LOGGER.info(f"\n{prefix} starting export with tensorflow {tf.__version__}...") saved_model = Path(str(self.file).replace(self.file.suffix, "_saved_model")) if self.args.int8: f = saved_model / f"{self.file.stem}_int8.tflite" # fp32 in/out elif self.args.half: f = saved_model / f"{self.file.stem}_float16.tflite" # fp32 in/out else: f = saved_model / f"{self.file.stem}_float32.tflite" return str(f), None @try_export def export_edgetpu(self, tflite_model="", prefix=colorstr("Edge TPU:")): """YOLO Edge TPU export https://coral.ai/docs/edgetpu/models-intro/.""" LOGGER.warning(f"{prefix} WARNING ⚠️ Edge TPU known bug https://github.com/ultralytics/ultralytics/issues/1185") cmd = "edgetpu_compiler --version" help_url = "https://coral.ai/docs/edgetpu/compiler/" assert LINUX, f"export only supported on Linux. See {help_url}" if subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True).returncode != 0: LOGGER.info(f"\n{prefix} export requires Edge TPU compiler. Attempting install from {help_url}") for c in ( "curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -", 'echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | ' "sudo tee /etc/apt/sources.list.d/coral-edgetpu.list", "sudo apt-get update", "sudo apt-get install edgetpu-compiler", ): subprocess.run(c if is_sudo_available() else c.replace("sudo ", ""), shell=True, check=True) ver = subprocess.run(cmd, shell=True, capture_output=True, check=True).stdout.decode().split()[-1] LOGGER.info(f"\n{prefix} starting export with Edge TPU compiler {ver}...") f = str(tflite_model).replace(".tflite", "_edgetpu.tflite") # Edge TPU model cmd = ( "edgetpu_compiler " f'--out_dir "{Path(f).parent}" ' "--show_operations " "--search_delegate " "--delegate_search_step 30 " "--timeout_sec 180 " f'"{tflite_model}"' ) LOGGER.info(f"{prefix} running '{cmd}'") subprocess.run(cmd, shell=True) self._add_tflite_metadata(f) return f, None @try_export def export_tfjs(self, prefix=colorstr("TensorFlow.js:")): """YOLO TensorFlow.js export.""" check_requirements("tensorflowjs") if ARM64: # Fix error: `np.object` was a deprecated alias for the builtin `object` when exporting to TF.js on ARM64 check_requirements("numpy==1.23.5") import tensorflow as tf import tensorflowjs as tfjs # noqa LOGGER.info(f"\n{prefix} starting export with tensorflowjs {tfjs.__version__}...") f = str(self.file).replace(self.file.suffix, "_web_model") # js dir f_pb = str(self.file.with_suffix(".pb")) # *.pb path gd = tf.Graph().as_graph_def() # TF GraphDef with open(f_pb, "rb") as file: gd.ParseFromString(file.read()) outputs = ",".join(gd_outputs(gd)) LOGGER.info(f"\n{prefix} output node names: {outputs}") quantization = "--quantize_float16" if self.args.half else "--quantize_uint8" if self.args.int8 else "" with spaces_in_path(f_pb) as fpb_, spaces_in_path(f) as f_: # exporter can not handle spaces in path cmd = ( "tensorflowjs_converter " f'--input_format=tf_frozen_model {quantization} --output_node_names={outputs} "{fpb_}" "{f_}"' ) LOGGER.info(f"{prefix} running '{cmd}'") subprocess.run(cmd, shell=True) if " " in f: LOGGER.warning(f"{prefix} WARNING ⚠️ your model may not work correctly with spaces in path '{f}'.") # Add metadata yaml_save(Path(f) / "metadata.yaml", self.metadata) # add metadata.yaml return f, None @try_export def export_imx(self, prefix=colorstr("IMX:")): """YOLO IMX export.""" gptq = False assert LINUX, ( "export only supported on Linux. See https://developer.aitrios.sony-semicon.com/en/raspberrypi-ai-camera/documentation/imx500-converter" ) if getattr(self.model, "end2end", False): raise ValueError("IMX export is not supported for end2end models.") if "C2f" not in self.model.__str__(): raise ValueError("IMX export is only supported for YOLOv8n detection models") check_requirements(("model-compression-toolkit==2.1.1", "sony-custom-layers==0.2.0", "tensorflow==2.12.0")) check_requirements("imx500-converter[pt]==3.14.3") # Separate requirements for imx500-converter import model_compression_toolkit as mct import onnx from sony_custom_layers.pytorch.object_detection.nms import multiclass_nms try: out = subprocess.run( ["java", "--version"], check=True, capture_output=True ) # Java 17 is required for imx500-converter if "openjdk 17" not in str(out.stdout): raise FileNotFoundError except FileNotFoundError: c = ["apt", "install", "-y", "openjdk-17-jdk", "openjdk-17-jre"] if is_sudo_available(): c.insert(0, "sudo") subprocess.run(c, check=True) def representative_dataset_gen(dataloader=self.get_int8_calibration_dataloader(prefix)): for batch in dataloader: img = batch["img"] img = img / 255.0 yield [img] tpc = mct.get_target_platform_capabilities( fw_name="pytorch", target_platform_name="imx500", target_platform_version="v1" ) config = mct.core.CoreConfig( mixed_precision_config=mct.core.MixedPrecisionQuantizationConfig(num_of_images=10), quantization_config=mct.core.QuantizationConfig(concat_threshold_update=True), ) resource_utilization = mct.core.ResourceUtilization(weights_memory=3146176 * 0.76) quant_model = ( mct.gptq.pytorch_gradient_post_training_quantization( # Perform Gradient-Based Post Training Quantization model=self.model, representative_data_gen=representative_dataset_gen, target_resource_utilization=resource_utilization, gptq_config=mct.gptq.get_pytorch_gptq_config(n_epochs=1000, use_hessian_based_weights=False), core_config=config, target_platform_capabilities=tpc, )[0] if gptq else mct.ptq.pytorch_post_training_quantization( # Perform post training quantization in_module=self.model, representative_data_gen=representative_dataset_gen, target_resource_utilization=resource_utilization, core_config=config, target_platform_capabilities=tpc, )[0] ) class NMSWrapper(torch.nn.Module): def __init__( self, model: torch.nn.Module, score_threshold: float = 0.001, iou_threshold: float = 0.7, max_detections: int = 300, ): """ Wrapping PyTorch Module with multiclass_nms layer from sony_custom_layers. Args: model (nn.Module): Model instance. score_threshold (float): Score threshold for non-maximum suppression. iou_threshold (float): Intersection over union threshold for non-maximum suppression. max_detections (float): The number of detections to return. """ super().__init__() self.model = model self.score_threshold = score_threshold self.iou_threshold = iou_threshold self.max_detections = max_detections def forward(self, images): # model inference outputs = self.model(images) boxes = outputs[0] scores = outputs[1] nms = multiclass_nms( boxes=boxes, scores=scores, score_threshold=self.score_threshold, iou_threshold=self.iou_threshold, max_detections=self.max_detections, ) return nms quant_model = NMSWrapper( model=quant_model, score_threshold=self.args.conf or 0.001, iou_threshold=self.args.iou, max_detections=self.args.max_det, ).to(self.device) f = Path(str(self.file).replace(self.file.suffix, "_imx_model")) f.mkdir(exist_ok=True) onnx_model = f / Path(str(self.file).replace(self.file.suffix, "_imx.onnx")) # js dir mct.exporter.pytorch_export_model( model=quant_model, save_model_path=onnx_model, repr_dataset=representative_dataset_gen ) model_onnx = onnx.load(onnx_model) # load onnx model for k, v in self.metadata.items(): meta = model_onnx.metadata_props.add() meta.key, meta.value = k, str(v) onnx.save(model_onnx, onnx_model) subprocess.run( ["imxconv-pt", "-i", str(onnx_model), "-o", str(f), "--no-input-persistency", "--overwrite-output"], check=True, ) # Needed for imx models. with open(f / "labels.txt", "w") as file: file.writelines([f"{name}\n" for _, name in self.model.names.items()]) return f, None def _add_tflite_metadata(self, file): """Add metadata to *.tflite models per https://www.tensorflow.org/lite/models/convert/metadata.""" import flatbuffers try: # TFLite Support bug https://github.com/tensorflow/tflite-support/issues/954#issuecomment-2108570845 from tensorflow_lite_support.metadata import metadata_schema_py_generated as schema # noqa from tensorflow_lite_support.metadata.python import metadata # noqa except ImportError: # ARM64 systems may not have the 'tensorflow_lite_support' package available from tflite_support import metadata # noqa from tflite_support import metadata_schema_py_generated as schema # noqa # Create model info model_meta = schema.ModelMetadataT() model_meta.name = self.metadata["description"] model_meta.version = self.metadata["version"] model_meta.author = self.metadata["author"] model_meta.license = self.metadata["license"] # Label file tmp_file = Path(file).parent / "temp_meta.txt" with open(tmp_file, "w") as f: f.write(str(self.metadata)) label_file = schema.AssociatedFileT() label_file.name = tmp_file.name label_file.type = schema.AssociatedFileType.TENSOR_AXIS_LABELS # Create input info input_meta = schema.TensorMetadataT() input_meta.name = "image" input_meta.description = "Input image to be detected." input_meta.content = schema.ContentT() input_meta.content.contentProperties = schema.ImagePropertiesT() input_meta.content.contentProperties.colorSpace = schema.ColorSpaceType.RGB input_meta.content.contentPropertiesType = schema.ContentProperties.ImageProperties # Create output info output1 = schema.TensorMetadataT() output1.name = "output" output1.description = "Coordinates of detected objects, class labels, and confidence score" output1.associatedFiles = [label_file] if self.model.task == "segment": output2 = schema.TensorMetadataT() output2.name = "output" output2.description = "Mask protos" output2.associatedFiles = [label_file] # Create subgraph info subgraph = schema.SubGraphMetadataT() subgraph.inputTensorMetadata = [input_meta] subgraph.outputTensorMetadata = [output1, output2] if self.model.task == "segment" else [output1] model_meta.subgraphMetadata = [subgraph] b = flatbuffers.Builder(0) b.Finish(model_meta.Pack(b), metadata.MetadataPopulator.METADATA_FILE_IDENTIFIER) metadata_buf = b.Output() populator = metadata.MetadataPopulator.with_model_file(str(file)) populator.load_metadata_buffer(metadata_buf) populator.load_associated_files([str(tmp_file)]) populator.populate() tmp_file.unlink() def _pipeline_coreml(self, model, weights_dir=None, prefix=colorstr("CoreML Pipeline:")): """YOLO CoreML pipeline.""" import coremltools as ct # noqa LOGGER.info(f"{prefix} starting pipeline with coremltools {ct.__version__}...") _, _, h, w = list(self.im.shape) # BCHW # Output shapes spec = model.get_spec() out0, out1 = iter(spec.description.output) if MACOS: from PIL import Image img = Image.new("RGB", (w, h)) # w=192, h=320 out = model.predict({"image": img}) out0_shape = out[out0.name].shape # (3780, 80) out1_shape = out[out1.name].shape # (3780, 4) else: # linux and windows can not run model.predict(), get sizes from PyTorch model output y out0_shape = self.output_shape[2], self.output_shape[1] - 4 # (3780, 80) out1_shape = self.output_shape[2], 4 # (3780, 4) # Checks names = self.metadata["names"] nx, ny = spec.description.input[0].type.imageType.width, spec.description.input[0].type.imageType.height _, nc = out0_shape # number of anchors, number of classes assert len(names) == nc, f"{len(names)} names found for nc={nc}" # check # Define output shapes (missing) out0.type.multiArrayType.shape[:] = out0_shape # (3780, 80) out1.type.multiArrayType.shape[:] = out1_shape # (3780, 4) # Model from spec model = ct.models.MLModel(spec, weights_dir=weights_dir) # 3. Create NMS protobuf nms_spec = ct.proto.Model_pb2.Model() nms_spec.specificationVersion = 5 for i in range(2): decoder_output = model._spec.description.output[i].SerializeToString() nms_spec.description.input.add() nms_spec.description.input[i].ParseFromString(decoder_output) nms_spec.description.output.add() nms_spec.description.output[i].ParseFromString(decoder_output) nms_spec.description.output[0].name = "confidence" nms_spec.description.output[1].name = "coordinates" output_sizes = [nc, 4] for i in range(2): ma_type = nms_spec.description.output[i].type.multiArrayType ma_type.shapeRange.sizeRanges.add() ma_type.shapeRange.sizeRanges[0].lowerBound = 0 ma_type.shapeRange.sizeRanges[0].upperBound = -1 ma_type.shapeRange.sizeRanges.add() ma_type.shapeRange.sizeRanges[1].lowerBound = output_sizes[i] ma_type.shapeRange.sizeRanges[1].upperBound = output_sizes[i] del ma_type.shape[:] nms = nms_spec.nonMaximumSuppression nms.confidenceInputFeatureName = out0.name # 1x507x80 nms.coordinatesInputFeatureName = out1.name # 1x507x4 nms.confidenceOutputFeatureName = "confidence" nms.coordinatesOutputFeatureName = "coordinates" nms.iouThresholdInputFeatureName = "iouThreshold" nms.confidenceThresholdInputFeatureName = "confidenceThreshold" nms.iouThreshold = 0.45 nms.confidenceThreshold = 0.25 nms.pickTop.perClass = True nms.stringClassLabels.vector.extend(names.values()) nms_model = ct.models.MLModel(nms_spec) # 4. Pipeline models together pipeline = ct.models.pipeline.Pipeline( input_features=[ ("image", ct.models.datatypes.Array(3, ny, nx)), ("iouThreshold", ct.models.datatypes.Double()), ("confidenceThreshold", ct.models.datatypes.Double()), ], output_features=["confidence", "coordinates"], ) pipeline.add_model(model) pipeline.add_model(nms_model) # Correct datatypes pipeline.spec.description.input[0].ParseFromString(model._spec.description.input[0].SerializeToString()) pipeline.spec.description.output[0].ParseFromString(nms_model._spec.description.output[0].SerializeToString()) pipeline.spec.description.output[1].ParseFromString(nms_model._spec.description.output[1].SerializeToString()) # Update metadata pipeline.spec.specificationVersion = 5 pipeline.spec.description.metadata.userDefined.update( {"IoU threshold": str(nms.iouThreshold), "Confidence threshold": str(nms.confidenceThreshold)} ) # Save the model model = ct.models.MLModel(pipeline.spec, weights_dir=weights_dir) model.input_description["image"] = "Input image" model.input_description["iouThreshold"] = f"(optional) IoU threshold override (default: {nms.iouThreshold})" model.input_description["confidenceThreshold"] = ( f"(optional) Confidence threshold override (default: {nms.confidenceThreshold})" ) model.output_description["confidence"] = 'Boxes × Class confidence (see user-defined metadata "classes")' model.output_description["coordinates"] = "Boxes × [x, y, width, height] (relative to image size)" LOGGER.info(f"{prefix} pipeline success") return model def add_callback(self, event: str, callback): """Appends the given callback.""" self.callbacks[event].append(callback) def run_callbacks(self, event: str): """Execute all callbacks for a given event.""" for callback in self.callbacks.get(event, []): callback(self) class IOSDetectModel(torch.nn.Module): """Wrap an Ultralytics YOLO model for Apple iOS CoreML export.""" def __init__(self, model, im): """Initialize the IOSDetectModel class with a YOLO model and example image.""" super().__init__() _, _, h, w = im.shape # batch, channel, height, width self.model = model self.nc = len(model.names) # number of classes if w == h: self.normalize = 1.0 / w # scalar else: self.normalize = torch.tensor([1.0 / w, 1.0 / h, 1.0 / w, 1.0 / h]) # broadcast (slower, smaller) def forward(self, x): """Normalize predictions of object detection model with input size-dependent factors.""" xywh, cls = self.model(x)[0].transpose(0, 1).split((4, self.nc), 1) return cls, xywh * self.normalize # confidence (3780, 80), coordinates (3780, 4)