diff --git a/luxonis_train/__main__.py b/luxonis_train/__main__.py index 24cfd69b..b1fd3971 100644 --- a/luxonis_train/__main__.py +++ b/luxonis_train/__main__.py @@ -200,6 +200,20 @@ def inspect( exit() +@app.command() +def archive( + executable: Annotated[ + Optional[Path], typer.Option(help="Path to the model file.", show_default=False) + ], + config: ConfigType = None, + opts: OptsType = None, +): + """Generate NN archive.""" + from luxonis_train.core import Archiver + + Archiver(str(config), opts).archive(executable) + + def version_callback(value: bool): if value: typer.echo(f"LuxonisTrain Version: {version(__package__)}") diff --git a/luxonis_train/callbacks/__init__.py b/luxonis_train/callbacks/__init__.py index cec9e000..ae1fe86e 100644 --- a/luxonis_train/callbacks/__init__.py +++ b/luxonis_train/callbacks/__init__.py @@ -8,6 +8,7 @@ from luxonis_train.utils.registry import CALLBACKS +from .archive_on_train_end import ArchiveOnTrainEnd from .export_on_train_end import ExportOnTrainEnd from .luxonis_progress_bar import LuxonisProgressBar from .metadata_logger import MetadataLogger @@ -23,6 +24,7 @@ __all__ = [ + "ArchiveOnTrainEnd", "ExportOnTrainEnd", "LuxonisProgressBar", "MetadataLogger", diff --git a/luxonis_train/callbacks/archive_on_train_end.py b/luxonis_train/callbacks/archive_on_train_end.py new file mode 100644 index 00000000..4f5b6bc2 --- /dev/null +++ b/luxonis_train/callbacks/archive_on_train_end.py @@ -0,0 +1,72 @@ +import logging +import os +from pathlib import Path +from typing import cast + +import lightning.pytorch as pl + +from luxonis_train.utils.config import Config +from luxonis_train.utils.registry import CALLBACKS +from luxonis_train.utils.tracker import LuxonisTrackerPL + + +@CALLBACKS.register_module() +class ArchiveOnTrainEnd(pl.Callback): + def __init__(self, upload_to_mlflow: bool = False): + """Callback that performs archiving of onnx or exported model at the end of + training/export. TODO: description. + + @type upload_to_mlflow: bool + @param upload_to_mlflow: If set to True, overrides the upload url in Archiver + with currently active MLFlow run (if present). + """ + super().__init__() + self.upload_to_mlflow = upload_to_mlflow + + def on_train_end(self, trainer: pl.Trainer, pl_module: pl.LightningModule) -> None: + """Archives the model on train end. + + @type trainer: L{pl.Trainer} + @param trainer: Pytorch Lightning trainer. + @type pl_module: L{pl.LightningModule} + @param pl_module: Pytorch Lightning module. + @raises RuntimeError: If no best model path is found. + """ + from luxonis_train.core.archiver import Archiver + + model_checkpoint_callbacks = [ + c + for c in trainer.callbacks # type: ignore + if isinstance(c, pl.callbacks.ModelCheckpoint) # type: ignore + ] + + # NOTE: assume that first checkpoint callback is based on val loss + best_model_path = model_checkpoint_callbacks[0].best_model_path + if not best_model_path: + raise RuntimeError( + "No best model path found. " + "Please make sure that ModelCheckpoint callback is present " + "and at least one validation epoch has been performed." + ) + cfg: Config = pl_module.cfg + cfg.model.weights = best_model_path + if self.upload_to_mlflow: + if cfg.tracker.is_mlflow: + tracker = cast(LuxonisTrackerPL, trainer.logger) + new_upload_url = f"mlflow://{tracker.project_id}/{tracker.run_id}" + cfg.archiver.upload_url = new_upload_url + else: + logging.getLogger(__name__).warning( + "`upload_to_mlflow` is set to True, " + "but there is no MLFlow active run, skipping." + ) + + onnx_path = str(Path(best_model_path).parent.with_suffix(".onnx")) + if not os.path.exists(onnx_path): + raise FileNotFoundError( + "Model executable not found. Make sure to run exporter callback before archiver callback" + ) + + archiver = Archiver(cfg=cfg) + + archiver.archive(onnx_path) diff --git a/luxonis_train/core/__init__.py b/luxonis_train/core/__init__.py index 6264473b..d3e89663 100644 --- a/luxonis_train/core/__init__.py +++ b/luxonis_train/core/__init__.py @@ -1,6 +1,7 @@ +from .archiver import Archiver from .exporter import Exporter from .inferer import Inferer from .trainer import Trainer from .tuner import Tuner -__all__ = ["Exporter", "Trainer", "Tuner", "Inferer"] +__all__ = ["Exporter", "Trainer", "Tuner", "Inferer", "Archiver"] diff --git a/luxonis_train/core/archiver.py b/luxonis_train/core/archiver.py new file mode 100644 index 00000000..58fc231f --- /dev/null +++ b/luxonis_train/core/archiver.py @@ -0,0 +1,371 @@ +import os +from logging import getLogger +from pathlib import Path +from typing import Any + +import onnx +from luxonis_ml.nn_archive.archive_generator import ArchiveGenerator +from luxonis_ml.nn_archive.config import CONFIG_VERSION +from luxonis_ml.nn_archive.config_building_blocks import ObjectDetectionSubtypeYOLO +from luxonis_ml.utils import LuxonisFileSystem + +from luxonis_train.models import LuxonisModel +from luxonis_train.nodes.enums.head_categorization import ( + ImplementedHeads, + ImplementedHeadsIsSoxtmaxed, +) +from luxonis_train.utils.config import Config + +from .core import Core + +logger = getLogger(__name__) + + +class Archiver(Core): + """Main API which is used to construct the NN archive out of a trainig config and + model executables.""" + + def __init__( + self, + cfg: str | dict[str, Any] | Config, + opts: list[str] | tuple[str, ...] | dict[str, Any] | None = None, + ): + """Constructs a new Archiver instance. + + @type cfg: str | dict[str, Any] | Config + @param cfg: Path to config file or config dict used to setup training. + @type opts: list[str] | tuple[str, ...] | dict[str, Any] | None + @param opts: Argument dict provided through command line, + used for config overriding. + """ + + super().__init__(cfg, opts) + + self.lightning_module = LuxonisModel( + cfg=self.cfg, + dataset_metadata=self.dataset_metadata, + save_dir=self.run_save_dir, + input_shape=self.loader_train.input_shape, + ) + + self.model_name = self.cfg.model.name + + self.archive_name = self.cfg.archiver.archive_name + archive_save_directory = Path(self.cfg.archiver.archive_save_directory) + if not archive_save_directory.exists(): + logger.info(f"Creating archive directory {archive_save_directory}") + archive_save_directory.mkdir(parents=True, exist_ok=True) + self.archive_save_directory = str(archive_save_directory) + + self.inputs = [] + self.outputs = [] + self.heads = [] + + def archive(self, executable_path: str): + """Runs archiving. + + @type executable_path: str + @param executable_path: Path to model executable file (e.g. ONNX model). + """ + + executable_fname = os.path.split(executable_path)[1] + _, executable_suffix = os.path.splitext(executable_fname) + self.archive_name += f"_{executable_suffix[1:]}" + + preprocessing = { # TODO: keep preprocessing same for each input? + "mean": self.cfg.trainer.preprocessing.normalize.params["mean"], + "scale": self.cfg.trainer.preprocessing.normalize.params["std"], + "reverse_channels": self.cfg.trainer.preprocessing.train_rgb, + "interleaved_to_planar": False, # TODO: make it modifiable? + } + + inputs_dict = self._get_inputs(executable_path) + for input_name in inputs_dict: + self._add_input( + name=input_name, + dtype=inputs_dict[input_name]["dtype"], + shape=inputs_dict[input_name]["shape"], + preprocessing=preprocessing, + ) + + outputs_dict = self._get_outputs(executable_path) + for output_name in outputs_dict: + self._add_output(name=output_name, dtype=outputs_dict[output_name]["dtype"]) + + heads_dict = self._get_heads(executable_path) + for head_name in heads_dict: + self._add_head(heads_dict[head_name]) + + model = { + "metadata": { + "name": self.model_name, + "path": executable_fname, + }, + "inputs": self.inputs, + "outputs": self.outputs, + "heads": self.heads, + } + + cfg_dict = { + "config_version": CONFIG_VERSION.__args__[0], + "model": model, + } + + self.archive_path = ArchiveGenerator( + archive_name=self.archive_name, + save_path=self.archive_save_directory, + cfg_dict=cfg_dict, + executables_paths=[executable_path], # TODO: what if more executables? + ).make_archive() + + logger.info(f"archive saved to {self.archive_path}") + + if self.cfg.archiver.upload_url is not None: + self._upload() + + return self.archive_path + + def _get_inputs(self, executable_path: str): + """Get inputs of a model executable. + + @type executable_path: str + @param executable_path: Path to model executable file. + """ + + _, executable_suffix = os.path.splitext(executable_path) + if executable_suffix == ".onnx": + return self._get_onnx_inputs(executable_path) + else: + raise NotImplementedError( + f"Missing input reading function for {executable_suffix} models." + ) + + def _get_onnx_inputs(self, executable_path: str): + """Get inputs of an ONNX model executable. + + @type executable_path: str + @param executable_path: Path to model executable file. + """ + + inputs_dict = {} + model = onnx.load(executable_path) + for input in model.graph.input: + tensor_type = input.type.tensor_type + dtype_idx = tensor_type.elem_type + dtype = str(onnx.helper.tensor_dtype_to_np_dtype(dtype_idx)) + shape = [] + for d in tensor_type.shape.dim: + if d.HasField("dim_value"): + shape.append(d.dim_value) + else: + raise ValueError("Unsupported input dimension identifier type") + inputs_dict[input.name] = {"dtype": dtype, "shape": shape} + return inputs_dict + + def _add_input( + self, + name: str, + dtype: str, + shape: list, + preprocessing: dict, + input_type: str = "image", + ) -> None: + """Add input to self.inputs. + + @type name: str + @param name: Name of the input layer. + @type dtype: str + @param dtype: Data type of the input data (e.g., 'float32'). + @type shape: list + @param shape: Shape of the input data as a list of integers (e.g. [H,W], [H,W,C], [BS,H,W,C], ...). + @type preprocessing: dict + @param preprocessing: Preprocessing steps applied to the input data. + @type input_type: str + @param input_type: Type of input data (e.g., 'image'). + """ + + self.inputs.append( + { + "name": name, + "dtype": dtype, + "input_type": input_type, + "shape": shape, + "preprocessing": preprocessing, + } + ) + + def _get_outputs(self, executable_path): + """Get outputs of a model executable. + + @type executable_path: str + @param executable_path: Path to model executable file. + """ + + _, executable_suffix = os.path.splitext(executable_path) + if executable_suffix == ".onnx": + return self._get_onnx_outputs(executable_path) + else: + raise NotImplementedError( + f"Missing input reading function for {executable_suffix} models." + ) + + def _get_onnx_outputs(self, executable_path): + """Get outputs of an ONNX model executable. + + @type executable_path: str + @param executable_path: Path to model executable file. + """ + + outputs_dict = {} + model = onnx.load(executable_path) + for output in model.graph.output: + tensor_type = output.type.tensor_type + dtype_idx = tensor_type.elem_type + dtype = str(onnx.helper.tensor_dtype_to_np_dtype(dtype_idx)) + outputs_dict[output.name] = {"dtype": dtype} + return outputs_dict + + def _add_output(self, name: str, dtype: str) -> None: + """Add output to self.outputs. + + @type name: str + @param name: Name of the output layer. + @type dtype: str + @param dtype: Data type of the output data (e.g., 'float32'). + """ + + self.outputs.append({"name": name, "dtype": dtype}) + + def _get_classes(self, head_family): + if head_family.startswith("Classification"): + return self.dataset_metadata._classes["class"] + elif head_family.startswith("Object"): + return self.dataset_metadata._classes["boxes"] + elif head_family.startswith("Segmentation"): + return self.dataset_metadata._classes["segmentation"] + elif head_family.startswith("Keypoint"): + return self.dataset_metadata._classes["keypoints"] + else: + raise ValueError( + f"No classes found for the specified head family ({head_family})" + ) + + def _get_head_specific_parameters( + self, head_name, head_alias, executable_path + ) -> dict: + """Get parameters specific to head. + + @type head_name: str + @param head_name: Name of the head (e.g. 'EfficientBBoxHead'). + @type head_alias: str + @param head_alias: Alias of the head (e.g. 'detection_head'). + @type executable_path: str + @param executable_path: Path to model executable file. + """ + + parameters = {} + if head_name == "ClassificationHead": + parameters["is_softmax"] = getattr( + ImplementedHeadsIsSoxtmaxed, head_name + ).value + elif head_name == "EfficientBBoxHead": + parameters["subtype"] = ObjectDetectionSubtypeYOLO.YOLOv6.value + head_node = self.lightning_module._modules["nodes"][head_alias] + parameters["iou_threshold"] = head_node.iou_thres + parameters["conf_threshold"] = head_node.conf_thres + parameters["max_det"] = head_node.max_det + elif head_name in ["SegmentationHead", "BiSeNetHead"]: + parameters["is_softmax"] = getattr( + ImplementedHeadsIsSoxtmaxed, head_name + ).value + elif head_name == "ImplicitKeypointBBoxHead": + parameters["subtype"] = ObjectDetectionSubtypeYOLO.YOLOv7.value + head_node = self.lightning_module._modules["nodes"][head_alias] + parameters["iou_threshold"] = head_node.iou_thres + parameters["conf_threshold"] = head_node.conf_thres + parameters["max_det"] = head_node.max_det + parameters["n_keypoints"] = head_node.n_keypoints + parameters["anchors"] = head_node.anchors.tolist() + + else: + raise ValueError("Unknown head name") + return parameters + + def _get_head_outputs(self, head_name) -> dict: + """Get model outputs in a head-specific format. + + @type head_name: str + @param head_name: Name of the head (e.g. 'EfficientBBoxHead'). + """ + + head_outputs = {} + if head_name == "ClassificationHead": + head_outputs["predictions"] = self.outputs[0]["name"] + elif head_name == "EfficientBBoxHead": + head_outputs["yolo_outputs"] = [output["name"] for output in self.outputs] + elif head_name in ["SegmentationHead", "BiSeNetHead"]: + head_outputs["predictions"] = self.outputs[0]["name"] + elif head_name == "ImplicitKeypointBBoxHead": + head_outputs["predictions"] = self.outputs[0]["name"] + else: + raise ValueError("Unknown head name") + return head_outputs + + def _get_heads(self, executable_path): + """Get model heads. + + @type executable_path: str + @param executable_path: Path to model executable file. + """ + heads_dict = {} + + for node in self.cfg.model.nodes: + node_name = node.name + node_alias = node.alias + # node_inputs = node.inputs + if node_alias in self.lightning_module.outputs: + if node_name in ImplementedHeads.__members__: + head_family = getattr(ImplementedHeads, node_name).value + classes = self._get_classes(head_family) + head_outputs = self._get_head_outputs(node_name) + head_dict = { + "family": head_family, + "outputs": head_outputs, + "classes": classes, + "n_classes": len(classes), + } + head_dict.update( + self._get_head_specific_parameters( + node_name, node_alias, executable_path + ) + ) + heads_dict[node_name] = head_dict + return heads_dict + + def _add_head(self, head_metadata: dict) -> str: + """Add head to self.heads. + + @type metadata: dict + @param metadata: Parameters required by head to run postprocessing. + """ + + self.heads.append(head_metadata) + + def _upload(self): + """Uploads the archive file to specified s3 bucket. + + @raises ValueError: If upload url was not specified in config file. + """ + + if self.cfg.archiver.upload_url is None: + raise ValueError("Upload url must be specified in config file.") + + fs = LuxonisFileSystem(self.cfg.archiver.upload_url, allow_local=False) + logger.info(f"Started Archive upload to {fs.full_path}...") + + fs.put_file( + local_path=self.archive_path, + remote_path=self.archive_name, + ) + + logger.info("Files upload finished") diff --git a/luxonis_train/core/core.py b/luxonis_train/core/core.py index 86b63600..761bc26f 100644 --- a/luxonis_train/core/core.py +++ b/luxonis_train/core/core.py @@ -234,3 +234,7 @@ def get_best_metric_checkpoint_path(self) -> str: @return: Path to best checkpoint with respect to best validation metric """ return self.pl_trainer.checkpoint_callbacks[1].best_model_path # type: ignore + + def reset_logging(self) -> None: + """Close file handlers to release the log file.""" + reset_logging() diff --git a/luxonis_train/nodes/efficient_bbox_head.py b/luxonis_train/nodes/efficient_bbox_head.py index 9f500cd4..a4f3bc93 100644 --- a/luxonis_train/nodes/efficient_bbox_head.py +++ b/luxonis_train/nodes/efficient_bbox_head.py @@ -30,6 +30,7 @@ def __init__( n_heads: Literal[2, 3, 4] = 3, conf_thres: float = 0.25, iou_thres: float = 0.45, + max_det: int = 300, **kwargs, ): """Head for object detection. @@ -45,6 +46,9 @@ def __init__( @type iou_thres: float @param iou_thres: Threshold for IoU. Defaults to C{0.45}. + + @type max_det: int + @param max_det: Maximum number of detections retained after NMS. Defaults to C{300}. """ super().__init__(task_type=LabelType.BOUNDINGBOX, **kwargs) @@ -52,6 +56,7 @@ def __init__( self.conf_thres = conf_thres self.iou_thres = iou_thres + self.max_det = max_det self.stride = self._fit_stride_to_num_heads() self.grid_cell_offset = 0.5 @@ -163,5 +168,6 @@ def _process_to_bbox( conf_thres=self.conf_thres, iou_thres=self.iou_thres, bbox_format="xyxy", + max_det=self.max_det, predicts_objectness=False, ) diff --git a/luxonis_train/nodes/enums/head_categorization.py b/luxonis_train/nodes/enums/head_categorization.py new file mode 100644 index 00000000..56f98ff3 --- /dev/null +++ b/luxonis_train/nodes/enums/head_categorization.py @@ -0,0 +1,21 @@ +from enum import Enum + + +class ImplementedHeads(Enum): + """Task categorization for the implemented heads.""" + + ClassificationHead = "Classification" + EfficientBBoxHead = "ObjectDetectionYOLO" + ImplicitKeypointBBoxHead = "KeypointDetectionYOLO" + SegmentationHead = "Segmentation" + BiSeNetHead = "Segmentation" + + +class ImplementedHeadsIsSoxtmaxed(Enum): + """Softmaxed output categorization for the implemented heads.""" + + ClassificationHead = False + EfficientBBoxHead = None + ImplicitKeypointBBoxHead = None + SegmentationHead = False + BiSeNetHead = False diff --git a/luxonis_train/nodes/implicit_keypoint_bbox_head.py b/luxonis_train/nodes/implicit_keypoint_bbox_head.py index aff2b5a6..7f0c3d61 100644 --- a/luxonis_train/nodes/implicit_keypoint_bbox_head.py +++ b/luxonis_train/nodes/implicit_keypoint_bbox_head.py @@ -30,6 +30,7 @@ def __init__( init_coco_biases: bool = True, conf_thres: float = 0.25, iou_thres: float = 0.45, + max_det: int = 300, **kwargs, ): """Head for object and keypoint detection. @@ -53,6 +54,8 @@ def __init__( @param conf_thres: Threshold for confidence. Defaults to C{0.25}. @type iou_thres: float @param iou_thres: Threshold for IoU. Defaults to C{0.45}. + @type max_det: int + @param max_det: Maximum number of detections retained after NMS. Defaults to C{300}. """ super().__init__(task_type=LabelType.KEYPOINT, **kwargs) @@ -63,6 +66,7 @@ def __init__( self.conf_thres = conf_thres self.iou_thres = iou_thres + self.max_det = max_det n_keypoints = n_keypoints or self.dataset_metadata._n_keypoints @@ -164,6 +168,7 @@ def wrap(self, outputs: tuple[list[Tensor], Tensor]) -> Packet[Tensor]: conf_thres=self.conf_thres, iou_thres=self.iou_thres, bbox_format="cxcywh", + max_det=self.max_det, ) return { diff --git a/luxonis_train/utils/config.py b/luxonis_train/utils/config.py index 591376f8..a2d4f332 100644 --- a/luxonis_train/utils/config.py +++ b/luxonis_train/utils/config.py @@ -269,6 +269,12 @@ def pad_values(values: float | list[float] | None): return self +class ArchiveConfig(BaseModel): + archive_name: str = "nn_archive" + archive_save_directory: str = "output_archive" + upload_url: str | None = None + + class StorageConfig(CustomBaseModel): active: bool = True storage_type: Literal["local", "remote"] = "local" @@ -292,6 +298,7 @@ class Config(LuxonisConfig): tracker: TrackerConfig = TrackerConfig() trainer: TrainerConfig = TrainerConfig() exporter: ExportConfig = ExportConfig() + archiver: ArchiveConfig = ArchiveConfig() tuner: TunerConfig | None = None ENVIRON: Environ = Field(Environ(), exclude=True) diff --git a/media/coverage_badge.svg b/media/coverage_badge.svg index 7a18c7f4..4033e89e 100644 --- a/media/coverage_badge.svg +++ b/media/coverage_badge.svg @@ -15,7 +15,7 @@ coverage coverage - 80% - 80% + 79% + 79% diff --git a/requirements.txt b/requirements.txt index eecf828e..3a884284 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,12 @@ blobconverter>=1.4.2 lightning>=2.0.0 -luxonis-ml[all]>=0.0.1 +#luxonis-ml[all]>=0.0.1 +luxonis-ml[all]@git+https://github.com/luxonis/luxonis-ml.git@dev onnx>=1.12.0 onnxruntime>=1.13.1 onnxsim>=0.4.10 optuna>=3.2.0 +optuna_integration>=3.6.0 psycopg2-binary>=2.9.1 pycocotools>=2.0.7 rich>=13.0.0 @@ -12,3 +14,4 @@ s3fs>=2023.0.0 tensorboard>=2.10.1 torchvision>=0.16.0 typer>=0.9.0 +mlflow>=2.10.0 diff --git a/tests/unittests/test_core/__init__.py b/tests/unittests/test_core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unittests/test_core/test_archiver.py b/tests/unittests/test_core/test_archiver.py new file mode 100644 index 00000000..bdbaa5b9 --- /dev/null +++ b/tests/unittests/test_core/test_archiver.py @@ -0,0 +1,158 @@ +import io +import json +import os +import random +import shutil +import tarfile + +import cv2 +import lightning.pytorch as pl +import numpy as np +import onnx +import yaml +from luxonis_ml.data import LuxonisDataset + +import luxonis_train +from luxonis_train.core import Archiver +from luxonis_train.core.exporter import Exporter +from luxonis_train.core.trainer import Trainer +from luxonis_train.utils.config import Config + + +class TestArchiver: + @classmethod + def setup_class(cls): + """Create and load all files required for testing.""" + + luxonis_train_parent_dir = os.path.dirname( + os.path.dirname(luxonis_train.__file__) + ) + cls.tmp_path = os.path.join( + luxonis_train_parent_dir, "tests", "unittests", "test_core", "tmp" + ) + os.mkdir(cls.tmp_path) + + # make LDF + os.mkdir(os.path.join(cls.tmp_path, "images")) + cls.ldf_name = "dummyLDF" + labels = ["label1", "label2", "label3"] + + def classification_dataset_generator(): + for i in range(10): + img = np.random.randint(0, 256, (10, 10, 3), dtype=np.uint8) + img_file_path = os.path.join(cls.tmp_path, "images", f"img{i}.png") + cv2.imwrite(img_file_path, img) + yield { + "file": img_file_path, + "type": "classification", + "value": True, + "class": random.choice(labels), + } + + if LuxonisDataset.exists(cls.ldf_name): + print("Deleting existing dataset") + LuxonisDataset(cls.ldf_name).delete_dataset() + dataset = LuxonisDataset(cls.ldf_name) + dataset.add(classification_dataset_generator) + dataset.set_classes(list(labels)) + dataset.make_splits() + + # make config + config_dict = { + "model": { + "name": "test_model", + "predefined_model": {"name": "ClassificationModel"}, + }, + "dataset": {"name": cls.ldf_name}, + "tracker": {"save_directory": cls.tmp_path}, + } + cls.config_path = os.path.join(cls.tmp_path, "config.yaml") + with open(cls.config_path, "w") as yaml_file: + yaml_str = yaml.dump(config_dict) + yaml_file.write(yaml_str) + cfg = Config.get_config(config_dict) + + # train model + cfg.trainer.epochs = 1 + cfg.trainer.validation_interval = 1 + cfg.trainer.batch_size = 4 + trainer = Trainer(cfg=cfg) + trainer.train() + callbacks = [ + c + for c in trainer.pl_trainer.callbacks + if isinstance(c, pl.callbacks.ModelCheckpoint) + ] + model_checkpoint_path = callbacks[0].best_model_path + model_ckpt = os.path.join(trainer.run_save_dir, model_checkpoint_path) + trainer.reset_logging() + + # export model to ONNX + cfg.model.weights = model_ckpt + exporter = Exporter(cfg=cfg) + cls.onnx_model_path = os.path.join(cls.tmp_path, "model.onnx") + exporter.export(onnx_path=cls.onnx_model_path) + exporter.reset_logging() + + # make archive + cfg.archiver.archive_save_directory = cls.tmp_path + archiver = Archiver(cls.config_path) + cls.archive_path = archiver.archive(cls.onnx_model_path) + archiver.reset_logging() + + # load archive files into memory + with tarfile.open(cls.archive_path, mode="r") as tar: + cls.archive_fnames = tar.getnames() + for fname in cls.archive_fnames: + f = tar.extractfile(fname) + if fname.endswith(".json"): + cls.json_dict = json.load(f) + elif fname.endswith(".onnx"): + model_bytes = f.read() + model_io = io.BytesIO(model_bytes) + cls.onnx_model = onnx.load(model_io) + + @classmethod + def teardown_class(cls): + """Remove all created files.""" + LuxonisDataset(cls.ldf_name).delete_dataset() + shutil.rmtree(cls.tmp_path) + + def test_archive_creation(self): + """Test if nn_archive was created.""" + assert os.path.exists(self.archive_path) + + def test_archive_suffix(self): + """Test if nn_archive is compressed using xz option (should be the default + option).""" + assert self.archive_path.endswith("tar.xz") + + def test_archive_contents(self): + """Test if nn_archive consists of config.json and model.onnx.""" + assert ( + len(self.archive_fnames) == 2 + and any([fname == "config.json" for fname in self.archive_fnames]) + and any([fname == "model.onnx" for fname in self.archive_fnames]) + ) + + def test_onnx(self): + """Test if archived ONNX model is valid.""" + assert onnx.checker.check_model(self.onnx_model, full_check=True) is None + + def test_config_inputs(self): + """Test if archived config inputs are valid.""" + config_input_names = [] + for input in self.json_dict["model"]["inputs"]: + config_input_names.append(input["name"]) + assert set([input.name for input in self.onnx_model.graph.input]) == set( + config_input_names + ) + + def test_config_outputs(self): + """Test if archived config outputs are valid.""" + config_output_names = [] + for input in self.json_dict["model"]["outputs"]: + config_output_names.append(input["name"]) + assert set([output.name for output in self.onnx_model.graph.output]) == set( + config_output_names + )