Cid/prompt #160

Merged 3 commits on Dec 13, 2023

24 changes: 3 additions & 21 deletions CHANGELOG.md
@@ -5,26 +5,8 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed

* Fix warnings related to not closing requests sessions
* Loading bar no longer jumps from 0% to 100% for large uploads

## [0.1.0a20]

### Added

* Added the project's `export` method, which exports the resources in the project's staging area to a specified location.
* Added `llm` as a supported `architectureType` for models.
* Added `protobuf<3.20` to requirements to fix compatibility issue with Tensorflow.
* Warnings if the dependencies from the `requirement_txt_file` and current environment are inconsistent.
* Paths to custom SSL certificates can now be modified by altering `openlayer.api.VERIFY_REQUESTS`. The value can be True (default), False, or a path to a certificate.
* Ability to check for goal statuses through the API.

### Changed

* Renamed conda environment created by the model runner from `new-openlayer` to `model-runner-env-%m-%d-%H-%M-%S-%f`.
* Modified the zero-index integer checks for `predictionsColumnName` and `labelColumnName` to support dataset uploads with only a sample of the classes.
* Renamed `predictionsColumnName` argument from the datasets' configuration YAML to `predictionScoresColumnName`.
* Migrated package name from [unboxapi](https://pypi.org/project/unboxapi/) to [openlayer](https://pypi.org/project/openlayer/) due to a company name change.
* Required Python version `>=3.7` and `<3.9`.
* Added `prompt` as an optional field in the config for LLM production data.
* `llm_monitor` for OpenAI ChatCompletion models records the `prompt` used and uploads it.
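The last two entries are what this PR implements. As a minimal usage sketch (not part of the diff: the `publish` flag and credential setup are assumptions, and the class name follows the `llm_monitors` module shown below):

```python
import openai

from openlayer import llm_monitors

# Assumes OpenAI and Openlayer credentials are already configured.
# `publish=True` is assumed to be the switch that streams each row to
# Openlayer (the diff's `_handle_data_publishing` only publishes when
# `self.publish` is set).
monitor = llm_monitors.OpenAIMonitor(publish=True)
monitor.start_monitoring()

# With monitoring on, the user turn below is recorded as the input
# variable `message_1` (variables are indexed by position in the message
# list), and the prompt template `{{ message_1 }}` is uploaded with it.
openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What does Openlayer monitor?"},
    ],
)
```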
31 changes: 15 additions & 16 deletions openlayer/__init__.py
@@ -21,7 +21,6 @@
project.status()
project.push()
"""
import copy
import os
import shutil
import tarfile
@@ -39,7 +38,7 @@
from .inference_pipelines import InferencePipeline
from .project_versions import ProjectVersion
from .projects import Project
from .schemas import BaselineModelSchema, DatasetSchema, ModelSchema
from .schemas import dataset_schemas, model_schemas
from .tasks import TaskType
from .validators import (
baseline_model_validators,
@@ -334,7 +333,9 @@ def add_model(
# Load model config and augment with defaults
if model_config_file_path is not None:
model_config = utils.read_yaml(model_config_file_path)
model_data = ModelSchema().load({"task_type": task_type.value, **model_config})
model_data = model_schemas.ModelSchema().load(
{"task_type": task_type.value, **model_config}
)

# Copy relevant resources to temp directory
with tempfile.TemporaryDirectory() as temp_dir:
@@ -432,7 +433,7 @@ def add_baseline_model(
if model_config_file_path is not None:
model_config = utils.read_yaml(model_config_file_path)
model_config["modelType"] = "baseline"
model_data = BaselineModelSchema().load(
model_data = model_schemas.BaselineModelSchema().load(
{"task_type": task_type.value, **model_config}
)

@@ -481,7 +482,7 @@ def add_dataset(
# Load dataset config and augment with defaults
if dataset_config_file_path is not None:
dataset_config = utils.read_yaml(dataset_config_file_path)
dataset_data = DatasetSchema().load(
dataset_data = dataset_schemas.DatasetSchema().load(
{"task_type": task_type.value, **dataset_config}
)
if dataset_data.get("columnNames") is None:
@@ -930,7 +931,7 @@ def create_inference_pipeline(
" upload.",
) from None

reference_dataset_data = DatasetSchema().load(
reference_dataset_data = dataset_schemas.ReferenceDatasetSchema().load(
{"task_type": task_type.value, **reference_dataset_config}
)

@@ -1034,7 +1035,7 @@ def upload_reference_dataset(
) from None

# Load dataset config and augment with defaults
dataset_data = DatasetSchema().load(
dataset_data = dataset_schemas.ReferenceDatasetSchema().load(
{"task_type": task_type.value, **dataset_config}
)
# Add default columns if not present
@@ -1115,7 +1116,10 @@ def stream_data(
stream_config, stream_df = self._add_default_columns(
config=stream_config, df=stream_df
)
stream_config = self._strip_read_only_fields(stream_config)

# Remove the `label` for the upload
stream_config.pop("label", None)

body = {
"config": stream_config,
"rows": stream_df.to_dict(orient="records"),
@@ -1128,13 +1132,6 @@ def stream_data(
if self.verbose:
print("Stream published!")

def _strip_read_only_fields(self, config: Dict[str, any]) -> Dict[str, any]:
"""Strips read-only fields from the config."""
stripped_config = copy.deepcopy(config)
for field in ["columnNames", "label"]:
stripped_config.pop(field, None)
return stripped_config

def publish_batch_data(
self,
inference_pipeline_id: str,
@@ -1244,7 +1241,9 @@ def _validate_production_data_and_load_config(
"Make sure to fix all of the issues listed above before the upload.",
) from None

config = DatasetSchema().load({"task_type": task_type.value, **config})
config = dataset_schemas.ProductionDataSchema().load(
{"task_type": task_type.value, **config}
)

return config

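The recurring change in this file swaps the catch-all `DatasetSchema` for purpose-specific schemas resolved through `schemas.dataset_schemas` (`DatasetSchema`, `ReferenceDatasetSchema`, `ProductionDataSchema`). A sketch of the new call pattern, with an invented config (the exact required fields live in the schema definitions, which are outside this diff):

```python
from openlayer.schemas import dataset_schemas
from openlayer.tasks import TaskType

# Illustrative production-data config; real configs carry more fields.
config = {
    "outputColumnName": "output",
    "timestampColumnName": "timestamp",
}

# Validation and defaulting now go through the schema that matches the
# data's role, instead of one DatasetSchema for everything.
loaded = dataset_schemas.ProductionDataSchema().load(
    {"task_type": TaskType.TextClassification.value, **config}
)
```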
22 changes: 22 additions & 0 deletions openlayer/constants.py
@@ -2,6 +2,8 @@
"""
import os

import marshmallow as ma

# ---------------------------- Commit/staging flow --------------------------- #
VALID_RESOURCE_NAMES = {"model", "training", "validation", "fine-tuning"}
OPENLAYER_DIR = os.path.join(os.path.expanduser("~"), ".openlayer")
@@ -12,3 +14,23 @@

# ----------------------------------- APIs ----------------------------------- #
REQUESTS_TIMEOUT = 60 * 60 * 3 # 3 hours

# ---------------------------- Validation patterns --------------------------- #
COLUMN_NAME_REGEX = ma.validate.Regexp(
r"^(?!openlayer)[a-zA-Z0-9_-]+$",
error="strings that are not alphanumeric with underscores or hyphens."
+ " Spaces and special characters are not allowed."
+ " The string cannot start with `openlayer`.",
)
LANGUAGE_CODE_REGEX = ma.validate.Regexp(
r"^[a-z]{2}(-[A-Z]{2})?$",
error="`language` of the dataset is not in the ISO 639-1 (alpha-2 code) format.",
)

COLUMN_NAME_VALIDATION_LIST = [
ma.validate.Length(
min=1,
max=60,
),
COLUMN_NAME_REGEX,
]
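The new constants are ordinary marshmallow validators, so any schema field can reuse them. A self-contained check of the regex's intent (the schema and field here are hypothetical, not part of the package):

```python
import marshmallow as ma

from openlayer.constants import COLUMN_NAME_VALIDATION_LIST

class ColumnSchema(ma.Schema):
    # Hypothetical field; marshmallow accepts a list of validators.
    name = ma.fields.Str(validate=COLUMN_NAME_VALIDATION_LIST)

ColumnSchema().load({"name": "user_query"})  # passes

try:
    # Rejected: column names may not start with `openlayer`.
    ColumnSchema().load({"name": "openlayer_reserved"})
except ma.ValidationError as err:
    print(err.messages)
```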
10 changes: 6 additions & 4 deletions openlayer/datasets.py
@@ -22,14 +22,16 @@ class DatasetType(Enum):
Used by the ``dataset_type`` argument of the :meth:`openlayer.OpenlayerClient.add_dataset` and
:meth:`openlayer.OpenlayerClient.add_dataframe` methods."""

#: For validation sets.
Validation = "validation"
#: For training sets.
Training = "training"
#: For fine-tuning data.
FineTuning = "fine-tuning"
#: For production data.
Production = "production"
#: For reference datasets.
Reference = "reference"
#: For training sets.
Training = "training"
#: For validation sets.
Validation = "validation"


class Dataset:
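The enum members are now alphabetical, with the new `Reference` member added. Their string values are what dataset configs carry (compare the `"label": "production"` entry in `data_config` later in this PR); a quick sanity check:

```python
from openlayer.datasets import DatasetType

# Enum values are the strings used in dataset configs.
assert DatasetType.Reference.value == "reference"
assert DatasetType.FineTuning.value == "fine-tuning"
```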
91 changes: 68 additions & 23 deletions openlayer/llm_monitors.py
@@ -2,7 +2,7 @@

import logging
import time
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple

import openai
import pandas as pd
@@ -204,9 +204,12 @@ def modified_create_chat_completion(*args, **kwargs) -> str:
latency = (time.time() - start_time) * 1000

try:
input_data = self._format_user_messages(kwargs["messages"])
prompt, input_data = self.format_input(kwargs["messages"])
output_data = response.choices[0].message.content.strip()
num_of_tokens = response.usage.total_tokens
config = self.data_config.copy()
config["prompt"] = prompt
config.update({"inputVariableNames": list(input_data.keys())})

self._append_row_to_df(
input_data=input_data,
@@ -215,10 +218,10 @@ def modified_create_chat_completion(*args, **kwargs) -> str:
latency=latency,
)

self._handle_data_publishing()
self._handle_data_publishing(config=config)
# pylint: disable=broad-except
except Exception as e:
logger.error("Failed to track chat request. %s", e)
logger.error("Failed to monitor chat request. %s", e)

return response

@@ -242,7 +245,7 @@ def modified_create_completion(*args, **kwargs):
num_of_tokens = int(response.usage.total_tokens / len(prompts))

self._append_row_to_df(
input_data=input_data,
input_data={"message": input_data},
output_data=output_data,
num_of_tokens=num_of_tokens,
latency=latency,
@@ -251,19 +254,52 @@ def modified_create_completion(*args, **kwargs):
self._handle_data_publishing()
# pylint: disable=broad-except
except Exception as e:
logger.error("Failed to track completion request. %s", e)
logger.error("Failed to monitor completion request. %s", e)

return response

return modified_create_completion

@staticmethod
def _format_user_messages(conversation_list: List[Dict[str, str]]) -> str:
"""Extracts the 'user' messages from the conversation list and returns them
as a single string."""
return "\n".join(
item["content"] for item in conversation_list if item["role"] == "user"
).strip()
def format_input(
messages: List[Dict[str, str]]
) -> Tuple[List[Dict[str, str]], Dict[str, str]]:
"""Formats the input messages.

Returns messages (prompt) replacing the user messages with input variables
in brackets (e.g., ``{{ message_0 }}``) and a dictionary mapping the input variable
names to the original user messages.

Parameters
----------
messages : List[Dict[str, str]]
List of messages that were sent to the chat completion model. Each message
is a dictionary with the following keys:

- ``role``: The role of the message. Can be ``"user"``, ``"system"``, or ``"assistant"``.
- ``content``: The content of the message.

Returns
-------
Tuple[List[Dict[str, str]], Dict[str, str]]
The formatted messages and the mapping from input variable names to the
original user messages.
"""
input_messages = []
input_variables = {}
for i, message in enumerate(messages):
if message["role"] == "user":
input_variable_name = f"message_{i}"
input_messages.append(
{
"role": message["role"],
"content": f"{{{{ {input_variable_name} }}}}",
}
)
input_variables[input_variable_name] = message["content"]
else:
input_messages.append(message)
return input_messages, input_variables

@staticmethod
def _split_list(lst: List, n_parts: int) -> List[List]:
@@ -288,37 +324,46 @@ def _split_list(lst: List, n_parts: int) -> List[List]:
return result

def _append_row_to_df(
self, input_data: str, output_data: str, num_of_tokens: int, latency: float
self,
input_data: Dict[str, str],
output_data: str,
num_of_tokens: int,
latency: float,
) -> None:
"""Appends a row with input/output, number of tokens, and latency to the
df."""
row = pd.DataFrame(
[
{
"input": input_data,
"output": output_data,
"tokens": num_of_tokens,
"latency": latency,
**input_data,
**{
"output": output_data,
"tokens": num_of_tokens,
"latency": latency,
},
}
]
)
if self.accumulate_data:
self.df = pd.concat([self.df, row], ignore_index=True)
else:
self.df = row
self.df = self.df.astype(
{"input": object, "output": object, "tokens": int, "latency": float}
)

def _handle_data_publishing(self) -> None:
# Perform casting
input_columns = [col for col in self.df.columns if col.startswith("message")]
casting_dict = {col: object for col in input_columns}
casting_dict.update({"output": object, "tokens": int, "latency": float})
self.df = self.df.astype(casting_dict)

def _handle_data_publishing(self, config: Optional[Dict[str, any]] = None) -> None:
"""Handle data publishing.

If `publish` is set to True, publish the latest row to Openlayer.
"""
if self.publish:
self.inference_pipeline.stream_data(
stream_data=self.df.tail(1).to_dict(orient="records"),
stream_config=self.data_config,
stream_config=config or self.data_config,
)

def start_monitoring(self) -> None:
@@ -411,7 +456,7 @@ def publish_batch_data(self):
def data_config(self) -> Dict[str, any]:
"""Data config for the df. Used for publishing data to Openlayer."""
return {
"inputVariableNames": ["input"],
"inputVariableNames": ["message"],
"label": "production",
"outputColumnName": "output",
"numOfTokenColumnName": "tokens",
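To see what `format_input` produces, a hypothetical round trip (message contents invented; `OpenAIMonitor` as the class name is an assumption based on the module's OpenAI wrappers):

```python
from openlayer.llm_monitors import OpenAIMonitor

messages = [
    {"role": "system", "content": "You are a terse assistant."},
    {"role": "user", "content": "Summarize the PR."},
    {"role": "user", "content": "Now in one sentence."},
]

prompt, input_variables = OpenAIMonitor.format_input(messages)

# `prompt` keeps the conversation shape but templates the user turns:
#   [{'role': 'system', 'content': 'You are a terse assistant.'},
#    {'role': 'user', 'content': '{{ message_1 }}'},
#    {'role': 'user', 'content': '{{ message_2 }}'}]
print(prompt)

# `input_variables` maps each template variable back to the original text:
#   {'message_1': 'Summarize the PR.', 'message_2': 'Now in one sentence.'}
print(input_variables)
```

This is the `prompt` that ends up in the stream config, and the keys of `input_variables` become the `inputVariableNames` sent with each row.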