From 3de2d8e6e5850ee662b11b6cf46519877f1c13fb Mon Sep 17 00:00:00 2001 From: Sierra Guequierre Date: Tue, 2 Jul 2024 14:04:47 -0400 Subject: [PATCH] linter --- docs/services/ml/upload-training-script.md | 133 +++++++++++++++------ 1 file changed, 98 insertions(+), 35 deletions(-) diff --git a/docs/services/ml/upload-training-script.md b/docs/services/ml/upload-training-script.md index c4762bb5f7..0ff1f9e3f1 100644 --- a/docs/services/ml/upload-training-script.md +++ b/docs/services/ml/upload-training-script.md @@ -23,7 +23,8 @@ Create a Python source distribution `tar.gz` file that contains all of the train - The `dataset_file` is a file containing the GCS blob paths and annotations associated with the dataset ID on which the user is training. - The `model_output_directory` is the location where the user saves the model artifacts (the TFLite, PyTorch, TF, ONNX files) they produce from training. - Parse these within your training script (see line 364). + Parse these within your training script. + See line 364 in the example script for an example of this. Then, read in the dataset and prepare the data for training, build and compile the model, train the model on the data, and save the model artifact to Google Container Storage (GCS). For example: @@ -57,12 +58,17 @@ ROUNDING_DIGITS = 5 _INPUT_NORM_MEAN = 127.5 _INPUT_NORM_STD = 127.5 -# IMPORTANT: One of the following two helper functions -# must be included in your training script depending on the type of model you're training. -# This is used for parsing the dataset file produced and stored in Viam. +# IMPORTANT: One of the following two helper functions +# must be included in your training script depending on the +# type of model you're training. +# This is used for parsing the dataset file produced and stored in Viam. -def parse_filenames_and_labels_from_json(filename: str, all_labels: ty.List[str]) -> ty.Tuple[ty.List[str], ty.List[str]]: - """Load and parse the dataset JSON file to return image filenames and corresponding labels. + +def parse_filenames_and_labels_from_json( + filename: str, all_labels: ty.List[str] +) -> ty.Tuple[ty.List[str], ty.List[str]]: + """Load and parse the dataset JSON file to return + image filenames and corresponding labels. Args: filename: JSONLines file containing filenames and labels model_type: either 'single_label' or 'multi_label' @@ -83,11 +89,13 @@ def parse_filenames_and_labels_from_json(filename: str, all_labels: ty.List[str] image_labels.append(labels) return image_filenames, image_labels + def parse_filenames_and_bboxes_from_json( filename: str, all_labels: ty.List[str], ) -> ty.Tuple[ty.List[str], ty.List[str], ty.List[ty.List[float]]]: - """Load and parse the dataset JSON file to return image filenames and corresponding labels with bboxes. + """Load and parse the dataset JSON file to return image filenames + and corresponding labels with bboxes. 
Args: filename: JSONLines file containing filenames and bboxes """ @@ -105,7 +113,8 @@ def parse_filenames_and_bboxes_from_json( for annotation in annotations: if annotation["annotation_label"] in all_labels: labels.append(annotation["annotation_label"]) - # Store coordinates in rel_yxyx format so that we can use the keras_cv function + # Store coordinates in rel_yxyx format + # so that we can use the keras_cv function coords.append( [ annotation["y_min_normalized"], @@ -118,11 +127,13 @@ def parse_filenames_and_bboxes_from_json( bbox_coords.append(coords) return image_filenames, bbox_labels, bbox_coords + def get_neural_network_params( num_classes: int, model_type: str ) -> ty.Tuple[str, str, str, str]: """Function that returns units and activation used for the last layer - and loss function for the model, based on number of classes and model type. + and loss function for the model, + based on number of classes and model type. Args: labels: list of labels corresponding to images model_type: string single-label or multi-label for desired output @@ -149,6 +160,7 @@ def get_neural_network_params( ) return units, activation, loss, metrics + def preprocessing_layers_classification( img_size: ty.Tuple[int, int] = (256, 256) ) -> ty.Tuple[tf.Tensor, tf.Tensor]: @@ -165,8 +177,9 @@ def preprocessing_layers_classification( ) return preprocessing + def decode_image(image): - """Decodes the image as an uint8 dense vector + """Decode the image as an uint8 dense vector. Args: image: the image file contents as a tensor """ @@ -177,8 +190,10 @@ def decode_image(image): dtype=tf.dtypes.uint8, ) + def check_type_and_decode_image(image_string_tensor): - """Parse an image from gcs and decode it. Ungzip the image from gcs if zipped + """Parse an image from GCS and decode it. + Ungzip the image from GCS if zipped. Args: image_string_tensor: the tensored form of an image gcs string """ @@ -186,6 +201,7 @@ def check_type_and_decode_image(image_string_tensor): image_string = tf.io.read_file(image_string_tensor) return decode_image(image_string) + def encoded_labels( image_labels: ty.List[str], all_labels: ty.List[str], model_type: str ) -> tf.Tensor: @@ -199,6 +215,7 @@ def encoded_labels( ) return encoder(image_labels) + def parse_image_and_encode_labels( filename: str, labels: ty.List[str], @@ -237,7 +254,8 @@ def create_dataset_classification( """Load and parse dataset from Tensorflow datasets. Args: filenames: string list of image paths - labels: list of string lists, where each string list contains up to N_LABEL labels associated with an image + labels: list of string lists, where each string list contains up to + N_LABEL labels associated with an image. all_labels: string list of all N_LABELS model_type: string single_label or multi_label """ @@ -250,13 +268,20 @@ def create_dataset_classification( ) def mapping_fnc(x, y): - return parse_image_and_encode_labels(x, y, all_labels, model_type, img_size) + return parse_image_and_encode_labels( + x, + y, + all_labels, + model_type, + img_size + ) # Parse and preprocess observations in parallel dataset = dataset.map(mapping_fnc, num_parallel_calls=num_parallel_calls) # Shuffle the data for each buffer size - # Disabling reshuffling ensures items from the training and test set will not get shuffled into each other + # Disabling reshuffling ensures items from the training + # and test set will not get shuffled into each other. 
dataset = dataset.shuffle( buffer_size=shuffle_buffer_size, reshuffle_each_iteration=False ) @@ -311,7 +336,11 @@ def build_and_compile_classification( # Add custom layers global_pooling = tf.keras.layers.GlobalAveragePooling2D() # Output layer - classification = tf.keras.layers.Dense(units, activation=activation, name="output") + classification = tf.keras.layers.Dense( + units, + activation=activation, + name="output" + ) y = tf.keras.Sequential( [ @@ -332,6 +361,7 @@ def build_and_compile_classification( ) return model + def save_labels(labels: ty.List[str], model_dir: str) -> None: filename = os.path.join(model_dir, labels_filename) with open(filename, "w") as f: @@ -339,12 +369,14 @@ def save_labels(labels: ty.List[str], model_dir: str) -> None: f.write(label + "\n") f.write(labels[-1]) + def get_rounded_number(val: tf.Tensor, rounding_digits: int) -> tf.Tensor: if np.isnan(val) or np.isinf(val): return -1 else: return float(round(val, rounding_digits)) + def save_model_metrics_classification( loss_history: callbacks.History, monitored_val: ty.List[str], @@ -358,21 +390,36 @@ def save_model_metrics_classification( test_metrics = model.evaluate(test_images, test_labels) metrics = {} - # Since there could be potentially many occurences of the maximum value being monitored, - # we reverse the list storing the tracked values and take the last occurence. - monitored_metric_max_idx = len(monitored_val) - np.argmax(monitored_val[::-1]) - 1 + # Since there could be many occurences of the + # maximum value being monitored, + # we reverse the list storing the tracked values + # and take the last occurence. + monitored_metric_max_idx = ( + len(monitored_val) + - np.argmax(monitored_val[::-1]) + - 1 + ) for i, key in enumerate(model.metrics_names): metrics["train_" + key] = get_rounded_number( - loss_history.history[key][monitored_metric_max_idx], ROUNDING_DIGITS + loss_history.history[key][monitored_metric_max_idx], + ROUNDING_DIGITS + ) + metrics["test_" + key] = get_rounded_number( + test_metrics[i], + ROUNDING_DIGITS ) - metrics["test_" + key] = get_rounded_number(test_metrics[i], ROUNDING_DIGITS) # Save the loss and test metrics as model metrics filename = os.path.join(model_dir, metrics_filename) with open(filename, "w") as f: json.dump(metrics, f, ensure_ascii=False) -# IMPORTANT: You must include a helper function like the following for your framework type that allows you to save the model artifact to Viam, which will be viewable as a registry item for the ML model name and version specified. + +# IMPORTANT: You must include a helper function like the following +# for your framework type that allows you to save +# the model artifact to Viam, which will be viewable as a registry item +# for the ML model name and version specified. + def save_tflite_classification( model: Model, @@ -380,7 +427,8 @@ def save_tflite_classification( model_name: str, target_shape: ty.Tuple[int, int, int], ) -> None: - # Convert the model to tflite, with batch size 1 so the graph does not have dynamic-sized tensors. + # Convert the model to tflite, with batch size 1 + # so the graph does not have dynamic-sized tensors. input = tf.keras.Input(target_shape, batch_size=1, dtype=tf.uint8) output = model(input, training=False) wrapped_model = tf.keras.Model(inputs=input, outputs=output) @@ -393,14 +441,15 @@ def save_tflite_classification( with open(filename, "wb") as f: f.write(tflite_model) + if __name__ == "__main__": # This parses the required args for running the training script. 
parser = argparse.ArgumentParser() parser.add_argument("--dataset_file", dest="data_json", type=str) parser.add_argument("--model_output_directory", dest="model_dir", type=str) args = parser.parse_args() - MODEL_DIR = args.model_dir # Use the model directory for saving model artifacts associated with the model name and version. - DATA_JSON = args.data_json # Use the data JSON filename to parse the images and their annotations for the specified dataset. + MODEL_DIR = args.model_dir + DATA_JSON = args.data_json # Set up compute device strategy if len(tf.config.list_physical_devices("GPU")) > 0: @@ -422,7 +471,10 @@ if __name__ == "__main__": # Read dataset file LABELS = ["orange_triangle", "blue_star"] - image_filenames, image_labels = parse_filenames_and_labels_from_json(DATA_JSON, LABELS) + image_filenames, image_labels = parse_filenames_and_labels_from_json( + DATA_JSON, + LABELS + ) # Generate 80/20 split for train and test data train_dataset, test_dataset = create_dataset_classification( filenames=image_filenames, @@ -445,21 +497,26 @@ if __name__ == "__main__": # Get callbacks for training classification callbackEarlyStopping = tf.keras.callbacks.EarlyStopping( - # Stop training when `monitor` value is no longer improving - monitor="binary_accuracy", - # "no longer improving" being defined as "no better than 'min_delta' less" + # Stop training when `monitor` value is no longer improving + monitor="binary_accuracy", + # "no longer improving" being defined + # as "no better than 'min_delta' less" min_delta=1e-3, - # "no longer improving" being further defined as "for at least 'patience' epochs" + # "no longer improving" being further defined + # as "for at least 'patience' epochs" patience=5, - # Restore weights from the best performing model, requires keeping track of model weights and performance. + # Restore weights from the best performing model, + # requires keeping track of model weights and performance. restore_best_weights=True, ) callbackReduceLROnPlateau = tf.keras.callbacks.ReduceLROnPlateau( # Reduce learning rate when `loss` is no longer improving monitor="loss", - # "no longer improving" being defined as "no better than 'min_delta' less" + # "no longer improving" being defined as + # "no better than 'min_delta' less" min_delta=1e-3, - # "no longer improving" being further defined as "for at least 'patience' epochs" + # "no longer improving" being further defined + # as "for at least 'patience' epochs" patience=5, # Default lower bound on learning rate min_lr=0, @@ -469,10 +526,17 @@ if __name__ == "__main__": # Train model on data loss_history = model.fit( - x=train_dataset, epochs=EPOCHS, callbacks=[callbackEarlyStopping, callbackReduceLROnPlateau] + x=train_dataset, + epochs=EPOCHS, + callbacks=[ + callbackEarlyStopping, + callbackReduceLROnPlateau + ] ) + # Get the values of what is being monitored in the early stopping policy, - # since this is what is used to restore best weights for the resulting model. + # since this is what is used to restore + # best weights for the resulting model. monitored_val = callbackEarlyStopping.get_monitor_value( loss_history.history ) @@ -490,7 +554,6 @@ if __name__ == "__main__": save_tflite_classification( model, MODEL_DIR, "beepboop", IMG_SIZE + (3,) ) - ``` {{% /expand%}}.
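
The section above opens by asking for a Python source distribution (`tar.gz`) containing the training script. As a minimal sketch of one way to produce that archive (the distribution name, version, package layout, and dependency list below are placeholder assumptions, not part of the patch above), a `setup.py` along these lines could be used:

```python
# setup.py -- minimal sketch only; the name, version, and dependencies are
# placeholders and should be adapted to your own training package layout.
from setuptools import find_packages, setup

setup(
    name="example-training",     # hypothetical distribution name
    version="0.1.0",             # hypothetical version
    packages=find_packages(),    # picks up your training module if it lives in a package
    install_requires=[
        # list whatever your script actually imports, for example:
        "tensorflow",
        "numpy",
    ],
)
```

Running `python -m build --sdist` (or the older `python setup.py sdist`) from the project root would then write the `tar.gz` archive to `dist/`.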
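
The `if __name__ == "__main__":` block above reads exactly two flags, `--dataset_file` and `--model_output_directory`, which are supplied for you when the uploaded script runs. As a hedged sketch (the script filename and both paths are placeholders, not values from the patch above), you could smoke-test that argument contract locally before uploading:

```python
# Hypothetical local smoke test; "training.py" and both paths are placeholders.
# When training runs in the cloud, real values are passed for these flags.
import subprocess

subprocess.run(
    [
        "python3",
        "training.py",
        "--dataset_file", "/tmp/dataset.jsonl",           # a small sample dataset file
        "--model_output_directory", "/tmp/model_output",  # where artifacts get written
    ],
    check=True,  # raise CalledProcessError if the script exits non-zero
)
```

If the run completes and the expected artifact (for example the `.tflite` file written by `save_tflite_classification`) appears in the output directory, the script is wired up correctly for upload.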