linter
sguequierre committed Jul 2, 2024
1 parent 2b200ae commit 3de2d8e
Showing 1 changed file with 98 additions and 35 deletions.
133 changes: 98 additions & 35 deletions docs/services/ml/upload-training-script.md
@@ -23,7 +23,8 @@ Create a Python source distribution `tar.gz` file that contains all of the train
- The `dataset_file` is a file containing the GCS blob paths and annotations associated with the dataset ID on which the user is training.
- The `model_output_directory` is the location where the user saves the model artifacts (the TFLite, PyTorch, TF, ONNX files) they produce from training.

Parse these within your training script (see line 364).
Parse these within your training script.
See line 364 in the example script for an example of this.
Then, read in the dataset and prepare the data for training, build and compile the model, train the model on the data, and save the model artifact to Google Cloud Storage (GCS).
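
As a quick, hedged illustration of just the argument-parsing step, a minimal sketch using Python's standard `argparse` module (with the same flag and destination names as the full example below) looks like this:

```python
# Minimal sketch: parse the two arguments Viam passes to every training script.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--dataset_file", dest="data_json", type=str)
parser.add_argument("--model_output_directory", dest="model_dir", type=str)
args = parser.parse_args()

DATA_JSON = args.data_json  # dataset file with image paths and annotations
MODEL_DIR = args.model_dir  # directory where model artifacts are saved
```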

For example:
@@ -57,12 +58,17 @@ ROUNDING_DIGITS = 5
_INPUT_NORM_MEAN = 127.5
_INPUT_NORM_STD = 127.5

# IMPORTANT: One of the following two helper functions
# must be included in your training script depending on the type of model you're training.
# This is used for parsing the dataset file produced and stored in Viam.
# IMPORTANT: One of the following two helper functions
# must be included in your training script depending on the
# type of model you're training.
# This is used for parsing the dataset file produced and stored in Viam.

def parse_filenames_and_labels_from_json(filename: str, all_labels: ty.List[str]) -> ty.Tuple[ty.List[str], ty.List[str]]:
"""Load and parse the dataset JSON file to return image filenames and corresponding labels.

def parse_filenames_and_labels_from_json(
filename: str, all_labels: ty.List[str]
) -> ty.Tuple[ty.List[str], ty.List[str]]:
"""Load and parse the dataset JSON file to return
image filenames and corresponding labels.
Args:
filename: JSONLines file containing filenames and labels
all_labels: string list of all N_LABELS
@@ -83,11 +89,13 @@ def parse_filenames_and_labels_from_json(filename: str, all_labels: ty.List[str]
image_labels.append(labels)
return image_filenames, image_labels


def parse_filenames_and_bboxes_from_json(
filename: str,
all_labels: ty.List[str],
) -> ty.Tuple[ty.List[str], ty.List[str], ty.List[ty.List[float]]]:
"""Load and parse the dataset JSON file to return image filenames and corresponding labels with bboxes.
"""Load and parse the dataset JSON file to return image filenames
and corresponding labels with bboxes.
Args:
filename: JSONLines file containing filenames and bboxes
"""
@@ -105,7 +113,8 @@ def parse_filenames_and_bboxes_from_json(
for annotation in annotations:
if annotation["annotation_label"] in all_labels:
labels.append(annotation["annotation_label"])
# Store coordinates in rel_yxyx format so that we can use the keras_cv function
# Store coordinates in rel_yxyx format
# so that we can use the keras_cv function
coords.append(
[
annotation["y_min_normalized"],
@@ -118,11 +127,13 @@ def parse_filenames_and_bboxes_from_json(
bbox_coords.append(coords)
return image_filenames, bbox_labels, bbox_coords


def get_neural_network_params(
num_classes: int, model_type: str
) -> ty.Tuple[str, str, str, str]:
"""Function that returns units and activation used for the last layer
and loss function for the model, based on number of classes and model type.
and loss function for the model,
based on number of classes and model type.
Args:
num_classes: number of classes (labels) for the desired output
model_type: string single-label or multi-label for desired output
@@ -149,6 +160,7 @@ def get_neural_network_params(
)
return units, activation, loss, metrics


def preprocessing_layers_classification(
img_size: ty.Tuple[int, int] = (256, 256)
) -> ty.Tuple[tf.Tensor, tf.Tensor]:
@@ -165,8 +177,9 @@ def preprocessing_layers_classification(
)
return preprocessing


def decode_image(image):
"""Decodes the image as an uint8 dense vector
"""Decode the image as an uint8 dense vector.
Args:
image: the image file contents as a tensor
"""
@@ -177,15 +190,18 @@ def decode_image(image):
dtype=tf.dtypes.uint8,
)


def check_type_and_decode_image(image_string_tensor):
"""Parse an image from gcs and decode it. Ungzip the image from gcs if zipped
"""Parse an image from GCS and decode it.
Ungzip the image from GCS if zipped.
Args:
image_string_tensor: the tensored form of an image gcs string
"""
# Read an image from gcs
image_string = tf.io.read_file(image_string_tensor)
return decode_image(image_string)


def encoded_labels(
image_labels: ty.List[str], all_labels: ty.List[str], model_type: str
) -> tf.Tensor:
@@ -199,6 +215,7 @@ def encoded_labels(
)
return encoder(image_labels)


def parse_image_and_encode_labels(
filename: str,
labels: ty.List[str],
@@ -237,7 +254,8 @@ def create_dataset_classification(
"""Load and parse dataset from Tensorflow datasets.
Args:
filenames: string list of image paths
labels: list of string lists, where each string list contains up to N_LABEL labels associated with an image
labels: list of string lists, where each string list contains up to
N_LABEL labels associated with an image.
all_labels: string list of all N_LABELS
model_type: string single_label or multi_label
"""
@@ -250,13 +268,20 @@ def create_dataset_classification(
)

def mapping_fnc(x, y):
return parse_image_and_encode_labels(x, y, all_labels, model_type, img_size)
return parse_image_and_encode_labels(
x,
y,
all_labels,
model_type,
img_size
)

# Parse and preprocess observations in parallel
dataset = dataset.map(mapping_fnc, num_parallel_calls=num_parallel_calls)

# Shuffle the data for each buffer size
# Disabling reshuffling ensures items from the training and test set will not get shuffled into each other
# Disabling reshuffling ensures items from the training
# and test set will not get shuffled into each other.
dataset = dataset.shuffle(
buffer_size=shuffle_buffer_size, reshuffle_each_iteration=False
)
@@ -311,7 +336,11 @@ def build_and_compile_classification(
# Add custom layers
global_pooling = tf.keras.layers.GlobalAveragePooling2D()
# Output layer
classification = tf.keras.layers.Dense(units, activation=activation, name="output")
classification = tf.keras.layers.Dense(
units,
activation=activation,
name="output"
)

y = tf.keras.Sequential(
[
@@ -332,19 +361,22 @@ def build_and_compile_classification(
)
return model


def save_labels(labels: ty.List[str], model_dir: str) -> None:
filename = os.path.join(model_dir, labels_filename)
with open(filename, "w") as f:
for label in labels[:-1]:
f.write(label + "\n")
f.write(labels[-1])


def get_rounded_number(val: tf.Tensor, rounding_digits: int) -> tf.Tensor:
if np.isnan(val) or np.isinf(val):
return -1
else:
return float(round(val, rounding_digits))


def save_model_metrics_classification(
loss_history: callbacks.History,
monitored_val: ty.List[str],
@@ -358,29 +390,45 @@ def save_model_metrics_classification(
test_metrics = model.evaluate(test_images, test_labels)

metrics = {}
# Since there could be potentially many occurrences of the maximum value being monitored,
# we reverse the list storing the tracked values and take the last occurrence.
monitored_metric_max_idx = len(monitored_val) - np.argmax(monitored_val[::-1]) - 1
# Since there could be many occurrences of the
# maximum value being monitored,
# we reverse the list storing the tracked values
# and take the last occurrence.
monitored_metric_max_idx = (
len(monitored_val)
- np.argmax(monitored_val[::-1])
- 1
)
for i, key in enumerate(model.metrics_names):
metrics["train_" + key] = get_rounded_number(
loss_history.history[key][monitored_metric_max_idx], ROUNDING_DIGITS
loss_history.history[key][monitored_metric_max_idx],
ROUNDING_DIGITS
)
metrics["test_" + key] = get_rounded_number(
test_metrics[i],
ROUNDING_DIGITS
)
metrics["test_" + key] = get_rounded_number(test_metrics[i], ROUNDING_DIGITS)

# Save the loss and test metrics as model metrics
filename = os.path.join(model_dir, metrics_filename)
with open(filename, "w") as f:
json.dump(metrics, f, ensure_ascii=False)

# IMPORTANT: You must include a helper function like the following for your framework type that allows you to save the model artifact to Viam, which will be viewable as a registry item for the ML model name and version specified.

# IMPORTANT: You must include a helper function like the following
# for your framework type that allows you to save
# the model artifact to Viam, which will be viewable as a registry item
# for the ML model name and version specified.


def save_tflite_classification(
model: Model,
model_dir: str,
model_name: str,
target_shape: ty.Tuple[int, int, int],
) -> None:
# Convert the model to tflite, with batch size 1 so the graph does not have dynamic-sized tensors.
# Convert the model to tflite, with batch size 1
# so the graph does not have dynamic-sized tensors.
input = tf.keras.Input(target_shape, batch_size=1, dtype=tf.uint8)
output = model(input, training=False)
wrapped_model = tf.keras.Model(inputs=input, outputs=output)
@@ -393,14 +441,15 @@ def save_tflite_classification(
with open(filename, "wb") as f:
f.write(tflite_model)


if __name__ == "__main__":
# This parses the required args for running the training script.
parser = argparse.ArgumentParser()
parser.add_argument("--dataset_file", dest="data_json", type=str)
parser.add_argument("--model_output_directory", dest="model_dir", type=str)
args = parser.parse_args()
MODEL_DIR = args.model_dir # Use the model directory for saving model artifacts associated with the model name and version.
DATA_JSON = args.data_json # Use the data JSON filename to parse the images and their annotations for the specified dataset.
MODEL_DIR = args.model_dir
DATA_JSON = args.data_json

# Set up compute device strategy
if len(tf.config.list_physical_devices("GPU")) > 0:
@@ -422,7 +471,10 @@ if __name__ == "__main__":

# Read dataset file
LABELS = ["orange_triangle", "blue_star"]
image_filenames, image_labels = parse_filenames_and_labels_from_json(DATA_JSON, LABELS)
image_filenames, image_labels = parse_filenames_and_labels_from_json(
DATA_JSON,
LABELS
)
# Generate 80/20 split for train and test data
train_dataset, test_dataset = create_dataset_classification(
filenames=image_filenames,
@@ -445,21 +497,26 @@ if __name__ == "__main__":

# Get callbacks for training classification
callbackEarlyStopping = tf.keras.callbacks.EarlyStopping(
# Stop training when `monitor` value is no longer improving
monitor="binary_accuracy",
# "no longer improving" being defined as "no better than 'min_delta' less"
# Stop training when `monitor` value is no longer improving
monitor="binary_accuracy",
# "no longer improving" being defined
# as "no better than 'min_delta' less"
min_delta=1e-3,
# "no longer improving" being further defined as "for at least 'patience' epochs"
# "no longer improving" being further defined
# as "for at least 'patience' epochs"
patience=5,
# Restore weights from the best performing model, requires keeping track of model weights and performance.
# Restore weights from the best performing model,
# requires keeping track of model weights and performance.
restore_best_weights=True,
)
callbackReduceLROnPlateau = tf.keras.callbacks.ReduceLROnPlateau(
# Reduce learning rate when `loss` is no longer improving
monitor="loss",
# "no longer improving" being defined as "no better than 'min_delta' less"
# "no longer improving" being defined as
# "no better than 'min_delta' less"
min_delta=1e-3,
# "no longer improving" being further defined as "for at least 'patience' epochs"
# "no longer improving" being further defined
# as "for at least 'patience' epochs"
patience=5,
# Default lower bound on learning rate
min_lr=0,
@@ -469,10 +526,17 @@ if __name__ == "__main__":

# Train model on data
loss_history = model.fit(
x=train_dataset, epochs=EPOCHS, callbacks=[callbackEarlyStopping, callbackReduceLROnPlateau]
x=train_dataset,
epochs=EPOCHS,
callbacks=[
callbackEarlyStopping,
callbackReduceLROnPlateau
]
)

# Get the values of what is being monitored in the early stopping policy,
# since this is what is used to restore best weights for the resulting model.
# since this is what is used to restore
# best weights for the resulting model.
monitored_val = callbackEarlyStopping.get_monitor_value(
loss_history.history
)
@@ -490,7 +554,6 @@ if __name__ == "__main__":
save_tflite_classification(
model, MODEL_DIR, "beepboop", IMG_SIZE + (3,)
)

```

{{% /expand%}}
