improvement: updates to custom metric runner
whoseoyster authored and stainless-app[bot] committed Aug 21, 2024
1 parent b5bec3a commit 71ecc84
Showing 2 changed files with 58 additions and 24 deletions.
12 changes: 9 additions & 3 deletions src/openlayer/lib/core/base_model.py
@@ -42,7 +42,9 @@ class OpenlayerModel(abc.ABC):
     def run_from_cli(self) -> None:
         """Run the model from the command line."""
         parser = argparse.ArgumentParser(description="Run data through a model.")
-        parser.add_argument("--dataset-path", type=str, required=True, help="Path to the dataset")
+        parser.add_argument(
+            "--dataset-path", type=str, required=True, help="Path to the dataset"
+        )
         parser.add_argument(
             "--output-dir",
             type=str,
@@ -61,14 +63,16 @@ def run_from_cli(self) -> None:
     def batch(self, dataset_path: str, output_dir: str) -> None:
         """Reads the dataset from a file and runs the model on it."""
         # Load the dataset into a pandas DataFrame
+        fmt = "csv"
         if dataset_path.endswith(".csv"):
             df = pd.read_csv(dataset_path)
         elif dataset_path.endswith(".json"):
             df = pd.read_json(dataset_path, orient="records")
+            fmt = "json"
 
         # Call the model's run_batch method, passing in the DataFrame
         output_df, config = self.run_batch_from_df(df)
-        self.write_output_to_directory(output_df, config, output_dir)
+        self.write_output_to_directory(output_df, config, output_dir, fmt)
 
     def run_batch_from_df(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, dict]:
         """Function that runs the model and returns the result."""
@@ -83,7 +87,9 @@ def run_batch_from_df(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, dict]:
             # Filter row_dict to only include keys that are valid parameters
             # for the 'run' method
             row_dict = row.to_dict()
-            filtered_kwargs = {k: v for k, v in row_dict.items() if k in run_signature.parameters}
+            filtered_kwargs = {
+                k: v for k, v in row_dict.items() if k in run_signature.parameters
+            }
 
             # Call the run method with filtered kwargs
             output = self.run(**filtered_kwargs)
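For context, a minimal sketch of how a model consumes the updated runner in base_model.py: subclass OpenlayerModel, implement run, and call run_from_cli(). The class name and CLI flags come from the diff above; the question column and the dict return shape are illustrative assumptions, not part of this commit.

```python
from openlayer.lib.core.base_model import OpenlayerModel


class MyModel(OpenlayerModel):
    def run(self, question: str) -> dict:
        # batch() -> run_batch_from_df() passes each row's columns to run(),
        # filtered to the parameters declared in its signature.
        return {"output": question.upper()}


if __name__ == "__main__":
    # e.g. python run.py --dataset-path dataset.json --output-dir output/
    # The detected dataset format ("csv" or "json") is now forwarded to
    # write_output_to_directory.
    MyModel().run_from_cli()
```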
70 changes: 49 additions & 21 deletions src/openlayer/lib/core/metrics.py
@@ -59,7 +59,7 @@ def __init__(self):
         self.config_path: str = ""
         self.config: Dict[str, Any] = {}
         self.datasets: List[Dataset] = []
-        self.selected_metrics: Optional[List[str]] = None
+        self.likely_dir: str = ""
 
     def run_metrics(self, metrics: List[BaseMetric]) -> None:
         """Run a list of metrics."""
@@ -87,30 +87,28 @@ def _parse_args(self) -> None:
             type=str,
             required=False,
             default="",
-            help="The path to your openlayer.json. Uses working dir if not provided.",
+            help=(
+                "The path to your openlayer.json. Uses parent parent dir if not "
+                "provided (assuming location is metrics/metric_name/run.py)."
+            ),
         )
 
         # Parse the arguments
         args = parser.parse_args()
         self.config_path = args.config_path
+        self.likely_dir = os.path.dirname(os.path.dirname(os.getcwd()))
 
     def _load_openlayer_json(self) -> None:
         """Load the openlayer.json file."""
 
         if not self.config_path:
-            openlayer_json_path = os.path.join(os.getcwd(), "openlayer.json")
+            openlayer_json_path = os.path.join(self.likely_dir, "openlayer.json")
         else:
             openlayer_json_path = self.config_path
 
         with open(openlayer_json_path, "r", encoding="utf-8") as f:
             self.config = json.load(f)
 
-        # Extract selected metrics
-        if "metrics" in self.config and "settings" in self.config["metrics"]:
-            self.selected_metrics = [
-                metric["key"] for metric in self.config["metrics"]["settings"] if metric["selected"]
-            ]
-
     def _load_datasets(self) -> None:
         """Compute the metric from the command line."""

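A small sketch of the directory convention the new likely_dir default encodes, per the help text above. The project and folder names are hypothetical; only the metrics/metric_name/run.py placement and the two dirname() calls come from the diff.

```python
# Assumed layout (names are examples, not part of the commit):
#
#   my-project/
#   ├── openlayer.json
#   └── metrics/
#       └── my_metric/
#           └── run.py   <- executed with cwd == my-project/metrics/my_metric
import os

likely_dir = os.path.dirname(os.path.dirname(os.getcwd()))   # -> my-project/
default_config = os.path.join(likely_dir, "openlayer.json")  # used when --config-path is omitted
```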
@@ -125,20 +123,34 @@ def _load_datasets(self) -> None:
             # Read the outputs directory for dataset folders. For each, load
             # the config.json and the dataset.json files into a dict and a dataframe
 
-            for dataset_folder in os.listdir(output_directory):
+            full_output_dir = os.path.join(self.likely_dir, output_directory)
+
+            for dataset_folder in os.listdir(full_output_dir):
                 if dataset_folder not in dataset_names:
                     continue
-                dataset_path = os.path.join(output_directory, dataset_folder)
+                dataset_path = os.path.join(full_output_dir, dataset_folder)
                 config_path = os.path.join(dataset_path, "config.json")
                 with open(config_path, "r", encoding="utf-8") as f:
                     dataset_config = json.load(f)
+                # Merge with the dataset fields from the openlayer.json
+                dataset_dict = next(
+                    (
+                        item
+                        for item in datasets_list
+                        if item["name"] == dataset_folder
+                    ),
+                    None,
+                )
+                dataset_config = {**dataset_dict, **dataset_config}
 
                 # Load the dataset into a pandas DataFrame
                 if os.path.exists(os.path.join(dataset_path, "dataset.csv")):
                     dataset_df = pd.read_csv(os.path.join(dataset_path, "dataset.csv"))
                     data_format = "csv"
                 elif os.path.exists(os.path.join(dataset_path, "dataset.json")):
-                    dataset_df = pd.read_json(os.path.join(dataset_path, "dataset.json"), orient="records")
+                    dataset_df = pd.read_json(
+                        os.path.join(dataset_path, "dataset.json"), orient="records"
+                    )
                     data_format = "json"
                 else:
                     raise ValueError(f"No dataset found in {dataset_folder}.")
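To make the merge above concrete, a toy illustration of the precedence it implies: the matching entry from openlayer.json's datasets list supplies defaults, and keys in the per-dataset output config.json win on collision. The field names below are made up for the example.

```python
datasets_list = [{"name": "validation", "outputColumnName": "output"}]  # from openlayer.json
dataset_config = {"outputColumnName": "model_output"}  # from the dataset's output config.json

dataset_dict = next(
    (item for item in datasets_list if item["name"] == "validation"), None
)
merged = {**dataset_dict, **dataset_config}
assert merged == {"name": "validation", "outputColumnName": "model_output"}
```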
@@ -153,19 +165,20 @@ def _load_datasets(self) -> None:
                     )
                 )
         else:
-            raise ValueError("No model found in the openlayer.json file. Cannot compute metric.")
+            raise ValueError(
+                "No model found in the openlayer.json file. Cannot compute metric."
+            )
 
         if not datasets:
-            raise ValueError("No datasets found in the openlayer.json file. Cannot compute metric.")
+            raise ValueError(
+                "No datasets found in the openlayer.json file. Cannot compute metric."
+            )
 
         self.datasets = datasets
 
     def _compute_metrics(self, metrics: List[BaseMetric]) -> None:
         """Compute the metrics."""
         for metric in metrics:
-            if self.selected_metrics and metric.key not in self.selected_metrics:
-                print(f"Skipping metric {metric.key} as it is not a selected metric.")
-                continue
             metric.compute(self.datasets)
 
     def _write_updated_datasets_to_output(self) -> None:
@@ -200,10 +213,14 @@ class BaseMetric(abc.ABC):
     Your metric's class should inherit from this class and implement the compute method.
     """
 
+    @abc.abstractmethod
+    def get_key(self) -> str:
+        """Return the key of the metric. This should correspond to the folder name."""
+        pass
+
     @property
     def key(self) -> str:
         """Return the key of the metric."""
-        return self.__class__.__name__
+        return self.get_key()
 
     def compute(self, datasets: List[Dataset]) -> None:
         """Compute the metric on the model outputs."""
@@ -226,15 +243,26 @@ def compute_on_dataset(self, dataset: Dataset) -> MetricReturn:
         """Compute the metric on a specific dataset."""
         pass
 
-    def _write_metric_return_to_file(self, metric_return: MetricReturn, output_dir: str) -> None:
+    def _write_metric_return_to_file(
+        self, metric_return: MetricReturn, output_dir: str
+    ) -> None:
         """Write the metric return to a file."""
 
         # Create the directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)
 
         # Turn the metric return to a dict
         metric_return_dict = asdict(metric_return)
+        # Convert the set to a list
+        metric_return_dict["added_cols"] = list(metric_return.added_cols)
 
-        with open(os.path.join(output_dir, f"{self.key}.json"), "w", encoding="utf-8") as f:
+        with open(
+            os.path.join(output_dir, f"{self.key}.json"), "w", encoding="utf-8"
+        ) as f:
             json.dump(metric_return_dict, f, indent=4)
         print(f"Metric ({self.key}) value written to {output_dir}/{self.key}.json")
+
+    def run(self) -> None:
+        """Run the metric."""
+        metric_runner = MetricRunner()
+        metric_runner.run_metrics([self])
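Putting the new BaseMetric contract together, a hedged sketch of a custom metric script under the layout assumed above (metrics/<key>/run.py). The get_key() requirement and run() entry point come from this diff; the dataset.df attribute, the output column, and any MetricReturn field other than added_cols (value here) are assumptions for illustration only.

```python
from openlayer.lib.core.metrics import BaseMetric, Dataset, MetricReturn


class MeanOutputLength(BaseMetric):
    def get_key(self) -> str:
        # Should match the folder name, e.g. metrics/mean_output_length/run.py
        return "mean_output_length"

    def compute_on_dataset(self, dataset: Dataset) -> MetricReturn:
        # dataset.df and the "output" column are assumed names for this sketch.
        lengths = dataset.df["output"].astype(str).str.len()
        return MetricReturn(value=float(lengths.mean()), added_cols=set())


if __name__ == "__main__":
    # run() spins up a MetricRunner, which loads openlayer.json from the
    # parent-parent directory and writes a <key>.json per dataset
    # (see _write_metric_return_to_file above).
    MeanOutputLength().run()
```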
