From c60a582951f5ad8e73a2c234d3fe01ee2b85b6cb Mon Sep 17 00:00:00 2001 From: David Hung Date: Tue, 21 Jan 2025 12:10:57 -0800 Subject: [PATCH 1/5] Add sample for Headless PrPr (single node) --- samples/ml/ml_jobs/README.md | 260 +++++++ .../single-node/pytorch-cifar10/README.md | 115 +++ .../pytorch-cifar10/requirements.txt | 1 + .../pytorch-cifar10/scripts/check_job.py | 25 + .../pytorch-cifar10/scripts/job_spec.yaml | 9 + .../pytorch-cifar10/scripts/submit_job.py | 75 ++ .../single-node/pytorch-cifar10/src/model.py | 20 + .../pytorch-cifar10/src/requirements.txt | 4 + .../single-node/pytorch-cifar10/src/train.py | 78 ++ .../pytorch-cifar10/src/train_wandb.py | 88 +++ .../single-node/xgb-loan-apps/README.md | 55 ++ .../airflow/single_node_xgb_dag_example.py | 86 +++ .../jupyter/Headless_Runtime_Demo.ipynb | 686 ++++++++++++++++++ .../xgb-loan-apps/requirements.txt | 3 + .../single-node/xgb-loan-apps/src/evaluate.py | 65 ++ .../single-node/xgb-loan-apps/src/main.py | 143 ++++ .../xgb-loan-apps/src/model_utils.py | 123 ++++ .../xgb-loan-apps/src/prepare_data.py | 44 ++ .../single-node/xgb-loan-apps/src/train.py | 107 +++ 19 files changed, 1987 insertions(+) create mode 100644 samples/ml/ml_jobs/README.md create mode 100644 samples/ml/ml_jobs/single-node/pytorch-cifar10/README.md create mode 100644 samples/ml/ml_jobs/single-node/pytorch-cifar10/requirements.txt create mode 100644 samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/check_job.py create mode 100644 samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/job_spec.yaml create mode 100644 samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/submit_job.py create mode 100644 samples/ml/ml_jobs/single-node/pytorch-cifar10/src/model.py create mode 100644 samples/ml/ml_jobs/single-node/pytorch-cifar10/src/requirements.txt create mode 100644 samples/ml/ml_jobs/single-node/pytorch-cifar10/src/train.py create mode 100644 samples/ml/ml_jobs/single-node/pytorch-cifar10/src/train_wandb.py create mode 100644 samples/ml/ml_jobs/single-node/xgb-loan-apps/README.md create mode 100644 samples/ml/ml_jobs/single-node/xgb-loan-apps/airflow/single_node_xgb_dag_example.py create mode 100644 samples/ml/ml_jobs/single-node/xgb-loan-apps/jupyter/Headless_Runtime_Demo.ipynb create mode 100644 samples/ml/ml_jobs/single-node/xgb-loan-apps/requirements.txt create mode 100644 samples/ml/ml_jobs/single-node/xgb-loan-apps/src/evaluate.py create mode 100644 samples/ml/ml_jobs/single-node/xgb-loan-apps/src/main.py create mode 100644 samples/ml/ml_jobs/single-node/xgb-loan-apps/src/model_utils.py create mode 100644 samples/ml/ml_jobs/single-node/xgb-loan-apps/src/prepare_data.py create mode 100644 samples/ml/ml_jobs/single-node/xgb-loan-apps/src/train.py diff --git a/samples/ml/ml_jobs/README.md b/samples/ml/ml_jobs/README.md new file mode 100644 index 0000000..5f6ba37 --- /dev/null +++ b/samples/ml/ml_jobs/README.md @@ -0,0 +1,260 @@ +# Headless Container Runtime Jobs + +## Setup + +The Runtime Job API (`snowflake.ml.jobs`) API is available in +`snowflake-ml-python>=1.7.4`. + +```bash +pip install snowflake-ml-python>=1.7.4 + +# Alternative: Install from private build/wheel file +pip install snowflake_ml_python-1.7.4a20240117-py3-none-any.whl +``` + +> NOTE: The Runtime Job API currently only supports Python 3.10. + Attempting to use the API with a different Python version may yield + unexpected errors. + +The Runtime Job API jobs requires the `ENABLE_SNOWSERVICES_ASYNC_JOBS` +to be enabled in your Snowflake account or session. 
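If you already have a Snowpark `Session` object in Python, the parameter can also be set programmatically; this is equivalent to the session-level SQL statement shown below.

```python
# Equivalent to the ALTER SESSION statement below.
# Assumes `session` is an existing snowflake.snowpark.Session with sufficient privileges.
session.sql("ALTER SESSION SET ENABLE_SNOWSERVICES_ASYNC_JOBS = TRUE").collect()
```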
+ + +```sql +-- Enable for session +ALTER SESSION SET ENABLE_SNOWSERVICES_ASYNC_JOBS = TRUE; + +-- Enable for account (requires ACCOUNTADMIN) +ALTER ACCOUNT SET ENABLE_SNOWSERVICES_ASYNC_JOBS = TRUE; +``` + +## Getting Started + +### Prerequisites + +Create a compute pool if you don't already have one ready to use. + +```sql +CREATE COMPUTE POOL IF NOT EXISTS HEADLESS_JOB_POOL -- Customize as desired + MIN_NODES = 1 + MAX_NODES = 1 -- Increase if more concurrency desired + INSTANCE_FAMILY = CPU_X64_S -- See https://docs.snowflake.com/en/sql-reference/sql/create-compute-pool +``` + +### Function Dispatch + +Python functions can be executed as Runtime Jobs using the `snowflake.ml.jobs.remote` +decorator. + +```python +from snowflake.ml.jobs import remote + +compute_pool = "MY_COMPUTE_POOL" +@remote(compute_pool, stage_name="payload_stage") +def hello_world(name: str = "world"): + # We recommend importing any needed modules *inside* the function definition + import datetime + + print(f"{datetime.now()} Hello {name}!") + +# Function invocation returns a job handle (snowflake.ml.jobs.MLJob) +job = hello_world("developer") + +print(job.id) # Jobs are given unique IDs +print(job.status) # Check job status +print(job.get_logs()) # Check job's console logs +print(job.wait(timeout=10)) # Block until job completion with optional timeout +``` + +> NOTE: Compute pool startup can take several minutes and can cause job execution + to be delayed; subsequent job executions should start much faster. + Consider manually starting the compute pool using + `ALTER COMPUTE POOL RESUME` prior to job execution. + +### File-based Dispatch + +The API also supports submitting entire Python files for execution for more +flexibility. + +```python +# /path/to/repo/my_script.py + +def main(*args): + print("Hello world", *args) + +if __name__ == '__main__': + import sys + main(*sys.argv[1:]) +``` + +```python +from snowflake.ml.jobs import submit_file, submit_directory + +compute_pool = "MY_COMPUTE_POOL" + +# Upload and run a single script +job1 = submit_file( + "/path/to/repo/my_script.py", + compute_pool, + stage_name="payload_stage", + args=["arg1", "--arg2_key", "arg2_value"], # (Optional) args are passed to script as-is +) + +# Upload an entire directory and run a contained entrypoint +# This is useful if your code is organized into multiple modules/files +job2 = submit_directory( + "/path/to/repo/", + compute_pool, + entrypoint="my_script.py", + stage_name="payload_stage", + args=["arg1", "arg2"], # (Optional) args are passed to script as-is +) +``` + +`job1` and `job2` are job handles, see [Function Dispatch](#function-dispatch) +for usage examples. + +## Advanced Usage + +### Custom Python Dependencies + +The Runtime Job API runs payloads inside the Snowflake +[Container Runtime for ML](https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-runtime-ml) +environment which comes pre-installed with most commonly used Python packages +for machine learning and data science. Most use cases should work "out of the box" +with no additional Python packages needed. If custom dependencies are required, +the API supports specifying `pip_requirements` which will be installed on runtime +startup. 
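Before adding `pip_requirements`, it can be worth confirming whether a package is already part of the runtime image. Below is a minimal sketch of such a check; the compute pool and stage names are placeholders, and it relies only on the APIs shown above plus the Python standard library.

```python
from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")  # placeholder pool/stage names
def check_package(package_name: str = "numpy"):
    # importlib.metadata is part of the Python 3.10 standard library,
    # so this check needs no extra dependencies or network access.
    from importlib.metadata import PackageNotFoundError, version

    try:
        print(f"{package_name}=={version(package_name)} is preinstalled")
    except PackageNotFoundError:
        print(f"{package_name} is not preinstalled; specify it via pip_requirements")

job = check_package("xgboost")
job.wait()
print(job.get_logs())
```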
+ +Installing packages to the runtime environment requires an +[External Access Integration](https://docs.snowflake.com/en/developer-guide/external-network-access/creating-using-external-network-access) + +```sql +-- Requires ACCOUNTADMIN +-- Snowflake provides a pre-configured network rule for PyPI access +CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI + ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule) + ENABLED = true; +GRANT USAGE ON INTEGRATION PYPI_EAI TO ROLE ; +``` + +```python +from snowflake.ml.jobs import remote, submit_file, submit_directory + +compute_pool = "MY_COMPUTE_POOL" + +# Example only; numpy is already installed in the runtime environment by default +@remote( + compute_pool, + stage_name="payload_stage", + pip_requirements=["numpy"], + external_access_integrations=["pypi_eai"], +) +def hello_world(name: str = "world"): + # We recommend importing any needed modules *inside* the function definition + import datetime + + print(f"{datetime.now()} Hello {name}!") + +job1 = hello_world("developer") + +# Can use standard pip/requirements syntax to specify versions +job2 = submit_file( + "/path/to/repo/my_script.py", + compute_pool, + stage_name="payload_stage", + pip_requirements=["numpy==2.2.*"], + external_access_integrations=["pypi_eai"], +) + +# Can provide PIP_INDEX_URL to install packages from private source(s) +job3 = submit_directory( + "/path/to/repo/", + compute_pool, + entrypoint="my_script.py", + stage_name="payload_stage", + pip_requirements=["custom-package"], + external_access_integrations=["custom_feed_eai"], # Configure EAI as needed + env_vars={'PIP_INDEX_URL': 'https://my-private-pypi-server.com/simple'}, +) +``` + +### Airflow Integration + +The Runtime Job API can be used in Airflow using the +[SnowparkOperator](https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/operators/snowpark.html). + +> NOTE: Make sure `snowflake-ml-python>=1.7.4` is installed in your Airflow worker environment(s) + +```python +import datetime +from airflow.decorators import dag, task +from snowflake.ml import remote, submit_file + +@dag(start_date=datetime.datetime(2025, 1, 1), schedule="@daily") +def my_dag(): + @task.snowpark() + def task_from_function(): + # SnowparkOperator automatically creates a Snowpark Session + # which the Runtime Job API can infer from context + @remote("HEADLESS_JOB_POOL", stage_name="payload_stage") + def my_function(): + print("Hello world") + job = my_function() + print("Job %s submitted" % job.id) + print("Job %s ended with status %s. Logs:\n" % (job.id, job.wait(), job.get_logs())) + + @task.snowpark() + def task_from_file(): + # SnowparkOperator automatically creates a Snowpark Session + # which the Runtime Job API can infer from context + job = submit_file( + "./my_script.py", + "HEADLESS_JOB_POOL", + stage_name="payload_stage", + ) + print("Job %s submitted" % job.id) + print("Job %s ended with status %s. Logs:\n" % (job.id, job.wait(), job.get_logs())) + + task_from_function() + task_from_file() + +my_dag() +``` + +## Next Steps + +- See the [XGBoost Classifier Example](./single-node/xgb-loan-apps/) for a full + walkthrough of training and deploying an XGBoost model. +- See the [PyTorch Classifier Example](./single-node/pytorch-cifar10/) for a full + walkthrough of training a PyTorch model with Weights and Biases integration + +## Known Limitations + +1. The Headless Runtime currently only supports Python 3.10. Attempting to use +other Python versions may throw errors like `UnpicklingError`. +1. 
Running a large number of jobs can result in service start failure due to +`Number of services limit exceeded for the account`. This will be fixed in an upcoming release. + - This prevents any kind of SPCS service from starting on the account, including Notebooks and Model Inference + - As a temporary workaround, please avoid launching more than 200 concurrent + jobs and manually delete completed and failed jobs. + ```sql + SHOW JOB SERVICES LIKE 'MLJOB%'; + DROP SERVICE ; + ``` + ```python + from snowflake.ml.jobs import list_jobs, delete_job + for row in list_jobs(limit=-1).collect(): + if row["status"] in {"DONE", "FAILED"}: + delete_job(row["id"]) + ``` +1. Job logs are lost upon compute pool suspension even if the job entity itself has not been deleted. +This may happen either due to manual suspension `ALTER COMPUTE POOL MY_POOL SUSPEND` +or auto suspension on idle timeout. + - Compute pool auto suspension can be disabled using `ALTER COMPUTE POOL MY_POOL SET AUTO_SUSPEND_SECS = 0` + - For more information, see + [Compute pool privileges](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/working-with-compute-pool#compute-pool-privileges) + and [Compute pool cost](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/accounts-orgs-usage-views#compute-pool-cost) +1. Job payload stages (specified via `stage_name` param) are not automatically + cleaned up. Please manually clean up the payload stage(s) to prevent + excessive storage costs. \ No newline at end of file diff --git a/samples/ml/ml_jobs/single-node/pytorch-cifar10/README.md b/samples/ml/ml_jobs/single-node/pytorch-cifar10/README.md new file mode 100644 index 0000000..028490b --- /dev/null +++ b/samples/ml/ml_jobs/single-node/pytorch-cifar10/README.md @@ -0,0 +1,115 @@ +# PyTorch Classifier Example + +## Setup + +Install Python requirements using `pip install -r requirements.txt` from the sample directory. + +## Training Script + +The model training code is located in the [src directory](./src/) and contains multiple files: +- A pip `requirements.txt` file +- `model.py` contains the model definition +- `train.py` contains basic training logic +- `train_wandb.py` is a variant of `train.py` with added [Weights and Biases integration](#weights-and-biases-integration) + +### Upload data to stage + +Manually download the CIFAR-10 datasets and upload them to a stage. + +Python: +```python +import torchvision +import torchvision.transforms as transforms + +transform = transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) +]) + +trainset = torchvision.datasets.CIFAR10(root="./data/cifar10", train=True, download=True, transform=transform) +testset = torchvision.datasets.CIFAR10(root="./data/cifar10", train=False, download=True, transform=transform) +``` + +Shell: +```bash +snow stage create cifar10_data +snow stage copy ./data/cifar10 @cifar10_data --recursive +``` + +## Launch Job + +Scripts needed to launch model training as a job are included in the +[scripts/](./scripts) directory. It includes: +- `submit_job.py` facilitates deploying the training scripts as a job +- `check_job.py` can be used to inspect running and recently completed jobs. +- `job_spec.yaml` includes additional service specification values to demonstrate + using the `spec_override` parameter of the ML Job API. 
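At its core, `submit_job.py` wraps a single `submit_directory` call. The sketch below shows the essential shape of that call; the compute pool, payload stage, and External Access Integration names are illustrative and are supplied via CLI flags in the actual script.

```python
import yaml
from snowflake.ml.jobs import submit_directory

# job_spec.yaml mounts the @cifar10_data stage at /data inside the container
with open("scripts/job_spec.yaml") as f:
    spec_overrides = yaml.safe_load(f)

job = submit_directory(
    "./src",                        # payload: train.py, model.py, requirements.txt
    "DEMO_POOL_CPU",                # illustrative compute pool (use -p in practice)
    entrypoint="train.py",
    stage_name="payload_stage",     # illustrative stage name (use -s in practice)
    args=["--data-dir", "/data", "--model-dir", "/opt/app", "--tensorboard-dir", "/logs"],
    spec_overrides=spec_overrides,
    external_access_integrations=["pypi_eai"],  # illustrative EAI (use -e in practice)
)
print("Started job with ID:", job.id)
```

In practice, run the script itself and pass these values as flags: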
+ +```bash +python scripts/submit_job.py -h # View available options +python scripts/submit_job.py -p DEMO_POOL_CPU -e pypi_eai # Basic run +python scripts/submit_job.py -p DEMO_POOL_CPU -e pypi_eai -c preprod8 --epochs 1 # Quick run on preprod8 for debugging +``` + +The API uploads the payload from `./src/` into a Snowflake stage and generates a +[service specification](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/specification-reference) +configured to mount the stage and run the entrypoint `train.py`. +In this example, we additionally provide a `job_spec.yml` used to mount the `@cifar10_data` +stage created in the [Upload Data to Stage](#upload-data-to-stage) step. +Finally, the job is executed as an SPCS [JOB SERVICE](https://docs.snowflake.com/en/sql-reference/sql/execute-job-service) on `DEMO_POOL_CPU`. + +> NOTE: There is currently no automatic cleanup of the uploaded payloads. Be sure to + manually clean up the payload stage(s) to prevent unintended storage costs. + +The above command(s) will print a message like `Started job with ID: MLJOB_(uuid)`. +You can check on the progress of the submitted job using [scripts/check_job.py](./scripts/check_job.py) + +```bash +python scripts/check_job.py MLJOB_2CECA414_F52E_4F45_840A_9623A52DA5C4 +python scripts/check_job.py MLJOB_2CECA414_F52E_4F45_840A_9623A52DA5C4 -c preprod8 # Use preprod8 connection +python scripts/check_job.py MLJOB_2CECA414_F52E_4F45_840A_9623A52DA5C4 --show-logs # Print any job execution logs +python scripts/check_job.py MLJOB_2CECA414_F52E_4F45_840A_9623A52DA5C4 --block # Block until job completion +``` + +## Weights and Biases Integration + +Integrating with Weights and Biases (W&B) requires adding just a +[few lines of code](https://docs.wandb.ai/quickstart/#putting-it-all-together) to the training script. +[train_wandb.py](./src/train_wandb.py) is an updated version of the training script with W&B integration included. + +Configure an External Access Integration to allow your SPCS service to connect to the W&B servers. + +SQL: + +```sql +CREATE OR REPLACE NETWORK RULE WANDB_RULE + MODE = EGRESS + TYPE = HOST_PORT + VALUE_LIST = ('api.wandb.ai:443'); + +CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION WANDB_EAI + ALLOWED_NETWORK_RULES = (WANDB_RULE) + ENABLED = true; +GRANT USAGE ON INTEGRATION WANDB_EAI TO ROLE ; +``` + +Configure W&B authentication by securely injecting your W&B API key as a [Snowflake Secrets](https://docs.snowflake.com/en/user-guide/api-authentication#managing-secrets). 
+ +SQL: + +```sql +CREATE OR REPLACE SECRET WANDB_SECRET + TYPE = GENERIC_STRING + SECRET_STRING = ''; +``` + +Include the newly configured External Access Integration and Secret when running +[submit_job.py](./scripts/submit_job.py): + +```bash +python scripts/submit_job.py \ + -p DEMO_POOL_CPU \ + -e pypi_eai wandb_eai \ + -c preprod8 \ + --wandb-secret-name wandb_secret +``` diff --git a/samples/ml/ml_jobs/single-node/pytorch-cifar10/requirements.txt b/samples/ml/ml_jobs/single-node/pytorch-cifar10/requirements.txt new file mode 100644 index 0000000..62780fe --- /dev/null +++ b/samples/ml/ml_jobs/single-node/pytorch-cifar10/requirements.txt @@ -0,0 +1 @@ +snowflake-ml-python>=1.7.4 \ No newline at end of file diff --git a/samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/check_job.py b/samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/check_job.py new file mode 100644 index 0000000..0d51b66 --- /dev/null +++ b/samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/check_job.py @@ -0,0 +1,25 @@ +import argparse +import snowflake.ml.jobs as jobs + +from snowflake.snowpark import Session +from snowflake.ml.utils.connection_params import SnowflakeLoginOptions + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("job_id", type=str, help="Job ID to check") + parser.add_argument("--show-logs", action="store_true", help="Show job logs") + parser.add_argument("--block", action="store_true", help="Block until job completes") + parser.add_argument("-c", "--snowflake-config", type=str, required=False) + args = parser.parse_args() + + session = Session.builder.configs(SnowflakeLoginOptions(args.snowflake_config)).create() + + job = jobs.get_job(args.job_id, session=session) + if args.block: + job.wait() + print("Job status:", job.status) + if args.show_logs: + job.show_logs() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/job_spec.yaml b/samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/job_spec.yaml new file mode 100644 index 0000000..51382c0 --- /dev/null +++ b/samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/job_spec.yaml @@ -0,0 +1,9 @@ +spec: + containers: + - name: "main" + volumeMounts: + - name: cifar10-data + mountPath: /data + volumes: + - name: cifar10-data + source: "@cifar10_data" diff --git a/samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/submit_job.py b/samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/submit_job.py new file mode 100644 index 0000000..5cb5a00 --- /dev/null +++ b/samples/ml/ml_jobs/single-node/pytorch-cifar10/scripts/submit_job.py @@ -0,0 +1,75 @@ +import os +import yaml +import snowflake.ml.jobs as jobs +import logging +from snowflake.snowpark import Session + +def load_spec(filepath: str) -> dict: + with open(filepath, "r") as f: + return yaml.safe_load(f) + + +def run_job(session: Session, compute_pool: str, external_access_integrations: list, payload_stage: str = "payload_stage", wandb_secret_name: str = None, args_list: list = None, block: bool = False) -> None: + spec = load_spec(os.path.join(os.path.dirname(__file__), "job_spec.yaml")) + payload_source = os.path.realpath(os.path.join(os.path.dirname(__file__), "..", "src")) + entrypoint = "train.py" + args = [ + "--data-dir", + "/data", + "--model-dir", + "/opt/app", + "--tensorboard-dir", + "/logs", + ] + if args_list: + args = args + args_list + + if wandb_secret_name: + spec["spec"]["containers"][0]["secrets"] = [ + { + "snowflakeSecret": wandb_secret_name, + 
"secretKeyRef": "secret_string", + "envVarName": "WANDB_API_KEY", + }, + ] + entrypoint = "train_wandb.py" + + logging.info("Payload will be uploaded to stage: %s", payload_stage) + job = jobs.submit_directory( + payload_source, + entrypoint=entrypoint, + args=args, + compute_pool=compute_pool, + stage_name=payload_stage, + spec_overrides=spec, + external_access_integrations=external_access_integrations, + session=session, + ) + print("Started job with ID:", job.id) + if block: + job.wait() + job.show_logs() + + +if __name__ == '__main__': + import argparse + from snowflake.ml.utils.connection_params import SnowflakeLoginOptions + + parser = argparse.ArgumentParser() + parser.add_argument("-p", "--compute-pool", type=str, required=True) + parser.add_argument("-e", "--external-access-integrations", type=str, nargs="*", required=True, help="PyPI EAI and any additional EAIs (e.g. W&B)") + parser.add_argument("--wandb-secret-name", type=str, required=False, help="Name of the secret containing the W&B API key") + parser.add_argument("-c", "--snowflake-config", type=str, required=False) + parser.add_argument("-s", "--payload_stage", type=str, required=False, default="payload_stage") + parser.add_argument("--block", action="store_true", help="Block until job completes") + args, unparsed_args = parser.parse_known_args() + + session = Session.builder.configs(SnowflakeLoginOptions(args.snowflake_config)).create() + + run_job( + session=session, + compute_pool=args.compute_pool, + external_access_integrations=args.external_access_integrations, + wandb_secret_name=args.wandb_secret_name, + args_list=unparsed_args, + ) \ No newline at end of file diff --git a/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/model.py b/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/model.py new file mode 100644 index 0000000..31b2b6b --- /dev/null +++ b/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/model.py @@ -0,0 +1,20 @@ +import torch.nn as nn + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.pool = nn.MaxPool2d(2, 2) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x): + x = self.pool(nn.functional.relu(self.conv1(x))) + x = self.pool(nn.functional.relu(self.conv2(x))) + x = x.view(-1, 16 * 5 * 5) + x = nn.functional.relu(self.fc1(x)) + x = nn.functional.relu(self.fc2(x)) + x = self.fc3(x) + return x \ No newline at end of file diff --git a/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/requirements.txt b/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/requirements.txt new file mode 100644 index 0000000..5b28126 --- /dev/null +++ b/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/requirements.txt @@ -0,0 +1,4 @@ +torch +torchvision +tensorboard +wandb diff --git a/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/train.py b/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/train.py new file mode 100644 index 0000000..b112601 --- /dev/null +++ b/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/train.py @@ -0,0 +1,78 @@ +import os +import argparse +import torch +import torch.nn as nn +import torch.optim as optim +import torchvision +import torchvision.transforms as transforms +from torch.utils.data import DataLoader +from torch.utils.tensorboard import SummaryWriter +from model import Net + +def train(args): + device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + + transform = transforms.Compose([ + 
transforms.ToTensor(), + transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) + ]) + + trainset = torchvision.datasets.CIFAR10(root=args.data_dir, train=True, download=False, transform=transform) + trainloader = DataLoader(trainset, batch_size=args.batch_size, shuffle=True, num_workers=2) + + testset = torchvision.datasets.CIFAR10(root=args.data_dir, train=False, download=False, transform=transform) + testloader = DataLoader(testset, batch_size=args.batch_size, shuffle=False, num_workers=2) + + net = Net().to(device) + criterion = nn.CrossEntropyLoss() + optimizer = optim.SGD(net.parameters(), lr=args.learning_rate, momentum=args.momentum) + + writer = SummaryWriter(log_dir=args.tensorboard_dir) + + for epoch in range(args.epochs): + running_loss = 0.0 + for i, data in enumerate(trainloader, 0): + inputs, labels = data[0].to(device), data[1].to(device) + + optimizer.zero_grad() + outputs = net(inputs) + loss = criterion(outputs, labels) + loss.backward() + optimizer.step() + + running_loss += loss.item() + if i % 200 == 199: + avg_loss = running_loss / 200 + writer.add_scalar('training loss', avg_loss, epoch * len(trainloader) + i) + running_loss = 0.0 + + correct = 0 + total = 0 + with torch.no_grad(): + for data in testloader: + images, labels = data[0].to(device), data[1].to(device) + outputs = net(images) + _, predicted = torch.max(outputs.data, 1) + total += labels.size(0) + correct += (predicted == labels).sum().item() + + accuracy = 100 * correct / total + writer.add_scalar('test accuracy', accuracy, epoch) + print(f'Epoch {epoch+1}, Test Accuracy: {accuracy}%') + + print('Finished Training') + model_path = os.path.join(args.model_dir, 'cifar_net.pth') + torch.save(net.state_dict(), model_path) + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='CIFAR-10 training script') + parser.add_argument('--batch-size', type=int, default=64, help='input batch size for training (default: 64)') + parser.add_argument('--epochs', type=int, default=10, help='number of epochs to train (default: 10)') + parser.add_argument('--learning-rate', type=float, default=0.001, help='learning rate (default: 0.001)') + parser.add_argument('--momentum', type=float, default=0.9, help='SGD momentum (default: 0.9)') + parser.add_argument('--data-dir', type=str, default='/data', help='Data directory') + parser.add_argument('--model-dir', type=str, default='/model', help='Model/checkpoint save directory') + parser.add_argument('--tensorboard-dir', type=str, default='/tensorboard', help='Tensorboard log directory') + args = parser.parse_args() + + train(args) \ No newline at end of file diff --git a/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/train_wandb.py b/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/train_wandb.py new file mode 100644 index 0000000..4f5d936 --- /dev/null +++ b/samples/ml/ml_jobs/single-node/pytorch-cifar10/src/train_wandb.py @@ -0,0 +1,88 @@ +import os +import argparse +import torch +import torch.nn as nn +import torch.optim as optim +import torchvision +import torchvision.transforms as transforms +from torch.utils.data import DataLoader +from torch.utils.tensorboard import SummaryWriter +import wandb +from model import Net + +def train(args): + device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + + transform = transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) + ]) + + trainset = torchvision.datasets.CIFAR10(root=args.data_dir, train=True, download=False, transform=transform) + 
trainloader = DataLoader(trainset, batch_size=args.batch_size, shuffle=True, num_workers=2) + + testset = torchvision.datasets.CIFAR10(root=args.data_dir, train=False, download=False, transform=transform) + testloader = DataLoader(testset, batch_size=args.batch_size, shuffle=False, num_workers=2) + + wandb.login(key=os.environ['WANDB_API_KEY']) + service_name = os.environ.get('SNOWFLAKE_SERVICE_NAME', 'test') + wandb.init(project=f"experiment-{service_name}") + + net = Net().to(device) + wandb.watch(net) + criterion = nn.CrossEntropyLoss() + optimizer = optim.SGD(net.parameters(), lr=args.learning_rate, momentum=args.momentum) + + writer = SummaryWriter(log_dir=args.tensorboard_dir) + + for epoch in range(args.epochs): + running_loss = 0.0 + for i, data in enumerate(trainloader, 0): + inputs, labels = data[0].to(device), data[1].to(device) + + optimizer.zero_grad() + outputs = net(inputs) + loss = criterion(outputs, labels) + loss.backward() + optimizer.step() + + running_loss += loss.item() + if i % 200 == 199: + avg_loss = running_loss / 200 + writer.add_scalar('training loss', avg_loss, epoch * len(trainloader) + i) + wandb.log({"training loss": avg_loss}, step=epoch * len(trainloader) + i) + running_loss = 0.0 + + correct = 0 + total = 0 + with torch.no_grad(): + for data in testloader: + images, labels = data[0].to(device), data[1].to(device) + outputs = net(images) + _, predicted = torch.max(outputs.data, 1) + total += labels.size(0) + correct += (predicted == labels).sum().item() + + accuracy = 100 * correct / total + writer.add_scalar('test accuracy', accuracy, epoch) + wandb.log({"test accuracy": accuracy}, step=epoch) + print(f'Epoch {epoch+1}, Test Accuracy: {accuracy}%') + + print('Finished Training') + model_path = os.path.join(args.model_dir, 'cifar_net.pth') + torch.save(net.state_dict(), model_path) + wandb.save(model_path) + wandb.finish() + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='CIFAR-10 training script') + parser.add_argument('--batch-size', type=int, default=64, help='input batch size for training (default: 64)') + parser.add_argument('--epochs', type=int, default=10, help='number of epochs to train (default: 10)') + parser.add_argument('--learning-rate', type=float, default=0.001, help='learning rate (default: 0.001)') + parser.add_argument('--momentum', type=float, default=0.9, help='SGD momentum (default: 0.9)') + parser.add_argument('--data-dir', type=str, default='/data', help='Data directory') + parser.add_argument('--model-dir', type=str, default='/model', help='Model/checkpoint save directory') + parser.add_argument('--tensorboard-dir', type=str, default='/tensorboard', help='Tensorboard log directory') + args = parser.parse_args() + + train(args) \ No newline at end of file diff --git a/samples/ml/ml_jobs/single-node/xgb-loan-apps/README.md b/samples/ml/ml_jobs/single-node/xgb-loan-apps/README.md new file mode 100644 index 0000000..6cb4e4e --- /dev/null +++ b/samples/ml/ml_jobs/single-node/xgb-loan-apps/README.md @@ -0,0 +1,55 @@ +# XGBoost Classifier Example + +## Setup + +> NOTE: The MLJob API currently only supports Python 3.10 clients. + +Install Python requirements using `pip install -r requirements.txt` from the sample directory. + +This sample uses synthetic data for training and evaluation. Be sure to run data +generation using the [Jupyter notebook](#jupyter) or [prepare_data.py](src/prepare_data.py) +before attempting to run the [VSCode](#vscode) scripts. 
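For reference, the data-generation step can itself be dispatched as an ML job, mirroring how the [Airflow DAG](airflow/single_node_xgb_dag_example.py) in this sample invokes `prepare_data.py`. The compute pool and stage names below are placeholders, and `session` is a Snowpark session created as described in the next section.

```python
from snowflake.ml import jobs

job = jobs.submit_directory(
    "./src",
    compute_pool="DEMO_POOL_CPU",   # placeholder compute pool
    entrypoint="prepare_data.py",
    args=["--table_name", "loan_applications", "--num_rows", "100000"],
    stage_name="payload_stage",     # placeholder payload stage
    session=session,                # see "Connecting to Snowflake in Python" below
)
job.wait()
job.show_logs()
```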
+ +### Connecting to Snowflake in Python + +The scripts included in this example use the `SnowflakeLoginOptions` utility API +from `snowflake-ml-python` to retrieve Snowflake connection settings from config +files must be authored before use. See [Configure Connections](https://docs.snowflake.com/developer-guide/snowflake-cli/connecting/configure-connections#define-connections) +for information on how to define default Snowflake connection(s) in a config.toml +file. + +```python +from snowflake.ml.utils.connection_params import SnowflakeLoginOptions + +# Requires valid ~/.snowflake/config.toml file +session = Session.builder.configs(SnowflakeLoginOptions()).create() +``` + +## Jupyter + +[Headless_Runtime_Demo.ipynb](jupyter/Headless_Runtime_Demo.ipynb) +shows an example of using Headless Container Runtimes to push function execution +into a Container Runtime instance from a Jupyter Notebook + +```bash +jupyter notebook jupyter/Headless_Runtime_Demo.ipynb +``` + +## VSCode + +Payloads can also be dispatched from VSCode or any other IDE. [main.py](src/main.py) +demonstrates how payloads can be dispatched using either a function decorator or +via the `submit_job` API. + +### Script Parameters + +- `--source_table` (OPTIONAL) Training data location. Defaults to `loan_applications` + which is created in the [setup step](#setup) +- `--save_mode` (OPTIONAL) Controls whether to save model to a local path or into Model Registry. Defaults to local +- `--output_dir` (OPTIONAL) Local save path. Only used if `save_mode=local` + +### Example + +```bash +python headless/single-node/xgb-loan-apps/src/main.py +``` \ No newline at end of file diff --git a/samples/ml/ml_jobs/single-node/xgb-loan-apps/airflow/single_node_xgb_dag_example.py b/samples/ml/ml_jobs/single-node/xgb-loan-apps/airflow/single_node_xgb_dag_example.py new file mode 100644 index 0000000..2f5ab8d --- /dev/null +++ b/samples/ml/ml_jobs/single-node/xgb-loan-apps/airflow/single_node_xgb_dag_example.py @@ -0,0 +1,86 @@ +import os +from datetime import datetime, timedelta, UTC + +from airflow.decorators import dag, task +from airflow.exceptions import AirflowException +from snowflake.ml import jobs + +_PAYLOAD_SOURCE = os.path.realpath(os.path.join(os.path.dirname(__file__), "..", "src")) + +@dag( + dag_id="single_node_xgb_dag_example", + dag_display_name="Single Node XGBoost Example DAG", + schedule=timedelta(weeks=1), + start_date=datetime(2024, 12, 15, tzinfo=UTC), + catchup=False, +) +def single_node_xgb_dag_example(): + """ + Basic sample of a DAG that trains an XGBoost model on a single node + using the preview MLJob APIs in Snowflake. 
+ """ + + @task.snowpark() + def prepare_data(): + # Kick off preprocessing job on SPCS + job = jobs.submit_directory( + _PAYLOAD_SOURCE, + entrypoint="prepare_data.py", + args=["--table_name", "HEADLESS_DEMO_DB.DAG_DEMO.DATA_TABLE", "--num_rows", "100000"], + compute_pool="SYSTEM_COMPUTE_POOL_CPU", + stage_name="HEADLESS_DEMO_DB.DAG_DEMO.PAYLOAD_STAGE", + ) + + # Block until job completes + job.wait() + + # Print logs for observability + job.show_logs() + + return job.id + + @task.snowpark() + def start_training_job(run_id: str): + job = jobs.submit_directory( + _PAYLOAD_SOURCE, + entrypoint="train.py", + args=[ + "--source_data", "HEADLESS_DEMO_DB.DAG_DEMO.DATA_TABLE", + "--output_dir", f"@HEADLESS_DEMO_DB.DAG_DEMO.MODELS/{run_id}", + ], + compute_pool="SYSTEM_COMPUTE_POOL_GPU", + stage_name="HEADLESS_DEMO_DB.DAG_DEMO.PAYLOAD_STAGE", + # num_instances=4, # Multi-node not supported in PrPr + ) + + return job.id + + @task.snowpark() + def wait_for_completion(job_id: str): + job = jobs.get_job(job_id) + job.wait() + if job.status == "DONE": + print("Job completed. Logs:\n%s" % job.get_logs()) + elif job.status == "FAILED": + raise AirflowException("Job failed. Logs:\n%s" % job.get_logs()) + raise AirflowException("Invalid job status %s. Logs:\n%s" % (job.status, job.get_logs())) + + @task.snowpark() + def evaluate_model(run_id: str): + # Run eval job to completion and retrieve result + job = jobs.submit_directory( + _PAYLOAD_SOURCE, + entrypoint="evaluate.py", + args=["--model_path", f"@HEADLESS_DEMO_DB.DAG_DEMO.MODELS/{run_id}", "--source_data", "HEADLESS_DEMO_DB.DAG_DEMO.DATA_TABLE"], + compute_pool="SYSTEM_COMPUTE_POOL_GPU", + stage_name="HEADLESS_DEMO_DB.DAG_DEMO.PAYLOAD_STAGE", + ) + + job.wait() + job.show_logs() + + run_id = prepare_data() + job_id = start_training_job(run_id) + wait_for_completion(job_id) >> evaluate_model(run_id) + +single_node_xgb_dag_example() diff --git a/samples/ml/ml_jobs/single-node/xgb-loan-apps/jupyter/Headless_Runtime_Demo.ipynb b/samples/ml/ml_jobs/single-node/xgb-loan-apps/jupyter/Headless_Runtime_Demo.ipynb new file mode 100644 index 0000000..0dccb3d --- /dev/null +++ b/samples/ml/ml_jobs/single-node/xgb-loan-apps/jupyter/Headless_Runtime_Demo.ipynb @@ -0,0 +1,686 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "7a6496bf-b928-4069-adfa-083299a21a13", + "metadata": {}, + "source": [ + "### Set up Snowpark Session\n", + "\n", + "See [Configure Connections](https://docs.snowflake.com/developer-guide/snowflake-cli/connecting/configure-connections#define-connections)\n", + "for information on how to define default Snowflake connection(s) in a config.toml\n", + "file." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f9880be-f3c2-4100-a35c-0e72595f3bdd", + "metadata": {}, + "outputs": [], + "source": [ + "from snowflake.snowpark import Session, Row\n", + "\n", + "# Requires valid ~/.snowflake/config.toml file\n", + "session = Session.builder.getOrCreate()\n", + "print(session)" + ] + }, + { + "cell_type": "markdown", + "id": "7aa9a253-38c5-4641-a01a-eaf899d24199", + "metadata": {}, + "source": [ + "#### Set up Snowflake resources" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "46e5fad4-748b-4390-b4fa-748a10547835", + "metadata": {}, + "outputs": [], + "source": [ + "schema_name = \"HEADLESS_DEMO\"\n", + "session.sql(f\"CREATE SCHEMA IF NOT EXISTS {schema_name}\").collect()\n", + "session.use_schema(schema_name)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "7fb47093-9b23-492d-a135-5722729a0c7a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[Row(status='DEMO_POOL_CPU already exists, statement succeeded.')]" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Create compute pool\n", + "def create_compute_pool(name: str, instance_family: str, min_nodes: int = 1, max_nodes: int = 10) -> list[Row]:\n", + " query = f\"\"\"\n", + " CREATE COMPUTE POOL IF NOT EXISTS {name}\n", + " MIN_NODES = {min_nodes}\n", + " MAX_NODES = {max_nodes}\n", + " INSTANCE_FAMILY = {instance_family}\n", + " \"\"\"\n", + " return session.sql(query).collect()\n", + "\n", + "compute_pool = \"DEMO_POOL_CPU\"\n", + "create_compute_pool(compute_pool, \"CPU_X64_S\")" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "84804c16-a359-4e5b-9037-207cc9675b75", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[Row(status='LOAN_APPLICATIONS already exists, statement succeeded.')]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Generate synthetic data\n", + "def generate_data(table_name: str, num_rows: int, replace: bool = False) -> list[Row]:\n", + " query = f\"\"\"\n", + " CREATE{\" OR REPLACE\" if replace else \"\"} TABLE{\"\" if replace else \" IF NOT EXISTS\"} {table_name} AS\n", + " SELECT \n", + " ROW_NUMBER() OVER (ORDER BY RANDOM()) as application_id,\n", + " ROUND(NORMAL(40, 10, RANDOM())) as age,\n", + " ROUND(NORMAL(65000, 20000, RANDOM())) as income,\n", + " ROUND(NORMAL(680, 50, RANDOM())) as credit_score,\n", + " ROUND(NORMAL(5, 2, RANDOM())) as employment_length,\n", + " ROUND(NORMAL(25000, 8000, RANDOM())) as loan_amount,\n", + " ROUND(NORMAL(35, 10, RANDOM()), 2) as debt_to_income,\n", + " ROUND(NORMAL(5, 2, RANDOM())) as number_of_credit_lines,\n", + " GREATEST(0, ROUND(NORMAL(1, 1, RANDOM()))) as previous_defaults,\n", + " ARRAY_CONSTRUCT(\n", + " 'home_improvement', 'debt_consolidation', 'business', 'education',\n", + " 'major_purchase', 'medical', 'vehicle', 'other'\n", + " )[UNIFORM(1, 8, RANDOM())] as loan_purpose,\n", + " RANDOM() < 0.15 as is_default,\n", + " TIMEADD(\"MINUTE\", UNIFORM(-525600, 0, RANDOM()), CURRENT_TIMESTAMP()) as created_at\n", + " FROM TABLE(GENERATOR(rowcount => {num_rows}))\n", + " ORDER BY created_at;\n", + " \"\"\"\n", + " return session.sql(query).collect()\n", + "\n", + "table_name = \"loan_applications\"\n", + "generate_data(table_name, 1e5)" + ] + }, + { + "cell_type": "markdown", + "id": "a6783667-50b2-4f41-a72c-3969e431f858", + "metadata": {}, + "source": [ + "### Prepare Model Script" 
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "f6ba7a8b-a944-4149-a46e-48196350b70e", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import os\n", + "import pickle\n", + "from time import perf_counter\n", + "from typing import Literal, Optional\n", + "\n", + "import pandas as pd\n", + "import xgboost as xgb\n", + "from sklearn.compose import ColumnTransformer\n", + "from sklearn.impute import SimpleImputer\n", + "from sklearn.metrics import accuracy_score, classification_report, roc_auc_score\n", + "from sklearn.model_selection import train_test_split\n", + "from sklearn.pipeline import Pipeline\n", + "from sklearn.preprocessing import OneHotEncoder, StandardScaler\n", + "\n", + "\n", + "from snowflake.ml.data.data_connector import DataConnector\n", + "from snowflake.ml.registry import Registry as ModelRegistry\n", + "from snowflake.ml.utils.connection_params import SnowflakeLoginOptions\n", + "from snowflake.snowpark import Session\n", + "\n", + "\n", + "def create_data_connector(session, table_name: str) -> DataConnector:\n", + " \"\"\"Load data from Snowflake table\"\"\"\n", + " # Example query - modify according to your schema\n", + " query = f\"\"\"\n", + " SELECT\n", + " age,\n", + " income,\n", + " credit_score,\n", + " employment_length,\n", + " loan_amount,\n", + " debt_to_income,\n", + " number_of_credit_lines,\n", + " previous_defaults,\n", + " loan_purpose,\n", + " is_default\n", + " FROM {table_name}\n", + " \"\"\"\n", + " sp_df = session.sql(query)\n", + " return DataConnector.from_dataframe(sp_df)\n", + "\n", + "\n", + "def build_pipeline(**model_params) -> Pipeline:\n", + " \"\"\"Create pipeline with preprocessors and model\"\"\"\n", + " # Define column types\n", + " categorical_cols = [\"LOAN_PURPOSE\"]\n", + " numerical_cols = [\n", + " \"AGE\",\n", + " \"INCOME\",\n", + " \"CREDIT_SCORE\",\n", + " \"EMPLOYMENT_LENGTH\",\n", + " \"LOAN_AMOUNT\",\n", + " \"DEBT_TO_INCOME\",\n", + " \"NUMBER_OF_CREDIT_LINES\",\n", + " \"PREVIOUS_DEFAULTS\",\n", + " ]\n", + "\n", + " # Numerical preprocessing pipeline\n", + " numeric_transformer = Pipeline(\n", + " steps=[\n", + " (\"imputer\", SimpleImputer(strategy=\"median\")),\n", + " (\"scaler\", StandardScaler()),\n", + " ]\n", + " )\n", + "\n", + " # Categorical preprocessing pipeline\n", + " categorical_transformer = Pipeline(\n", + " steps=[\n", + " (\"imputer\", SimpleImputer(strategy=\"constant\", fill_value=\"missing\")),\n", + " (\"onehot\", OneHotEncoder(handle_unknown=\"ignore\", sparse_output=False)),\n", + " ]\n", + " )\n", + "\n", + " # Combine transformers\n", + " preprocessor = ColumnTransformer(\n", + " transformers=[\n", + " (\"num\", numeric_transformer, numerical_cols),\n", + " (\"cat\", categorical_transformer, categorical_cols),\n", + " ]\n", + " )\n", + "\n", + " # Define model parameters\n", + " default_params = {\n", + " \"objective\": \"binary:logistic\",\n", + " \"eval_metric\": \"auc\",\n", + " \"max_depth\": 6,\n", + " \"learning_rate\": 0.1,\n", + " \"n_estimators\": 100,\n", + " \"subsample\": 0.8,\n", + " \"colsample_bytree\": 0.8,\n", + " \"random_state\": 42,\n", + " }\n", + " model = xgb.XGBClassifier(**(model_params or default_params))\n", + "\n", + " return Pipeline([(\"preprocessor\", preprocessor), (\"classifier\", model)])\n", + "\n", + "\n", + "def evaluate_model(model: Pipeline, X_test: pd.DataFrame, y_test: pd.DataFrame):\n", + " \"\"\"Evaluate model performance\"\"\"\n", + " # Make predictions\n", + " y_pred = model.predict(X_test)\n", + " 
y_pred_proba = model.predict_proba(X_test)[:, 1]\n", + "\n", + " # Calculate metrics\n", + " metrics = {\n", + " \"accuracy\": accuracy_score(y_test, y_pred),\n", + " \"roc_auc\": roc_auc_score(y_test, y_pred_proba),\n", + " \"classification_report\": classification_report(y_test, y_pred),\n", + " }\n", + "\n", + " return metrics\n", + "\n", + "\n", + "def save_to_registry(\n", + " session: Session,\n", + " model: Pipeline,\n", + " model_name: str,\n", + " metrics: dict,\n", + " sample_input_data: pd.DataFrame,\n", + "):\n", + " \"\"\"Save model and artifacts to Snowflake Model Registry\"\"\"\n", + " # Initialize model registry\n", + " registry = ModelRegistry(session)\n", + "\n", + " # Save to registry\n", + " registry.log_model(\n", + " model=model,\n", + " model_name=model_name,\n", + " metrics=metrics,\n", + " sample_input_data=sample_input_data[:5],\n", + " conda_dependencies=[\"xgboost\"],\n", + " )\n", + "\n", + "\n", + "def train(session: Session, source_data: str, save_mode: Literal[\"local\", \"registry\"] = \"local\", output_dir: Optional[str] = None, **kwargs):\n", + " # Load data\n", + " dc = create_data_connector(session, table_name=source_data)\n", + " print(\"Loading data...\", end=\"\", flush=True)\n", + " start = perf_counter()\n", + " df = dc.to_pandas()\n", + " elapsed = perf_counter() - start\n", + " print(f\" done! Loaded {len(df)} rows, elapsed={elapsed:.3f}s\")\n", + "\n", + " # Split data\n", + " X = df.drop(\"IS_DEFAULT\", axis=1)\n", + " y = df[\"IS_DEFAULT\"]\n", + " X_train, X_test, y_train, y_test = train_test_split(\n", + " X, y, test_size=0.2, random_state=42\n", + " )\n", + "\n", + " # Train model\n", + " model = build_pipeline()\n", + " print(\"Training model...\", end=\"\")\n", + " start = perf_counter()\n", + " model.fit(X_train, y_train)\n", + " elapsed = perf_counter() - start\n", + " print(f\" done! Elapsed={elapsed:.3f}s\")\n", + "\n", + " # Evaluate model\n", + " print(\"Evaluating model...\", end=\"\")\n", + " start = perf_counter()\n", + " metrics = evaluate_model(\n", + " model,\n", + " X_test,\n", + " y_test,\n", + " )\n", + " elapsed = perf_counter() - start\n", + " print(f\" done! 
Elapsed={elapsed:.3f}s\")\n", + "\n", + " # Print evaluation results\n", + " print(\"\\nModel Performance Metrics:\")\n", + " print(f\"Accuracy: {metrics['accuracy']:.4f}\")\n", + " print(f\"ROC AUC: {metrics['roc_auc']:.4f}\")\n", + " # Uncomment below for full classification report\n", + " # print(\"\\nClassification Report:\")\n", + " # print(metrics[\"classification_report\"])\n", + "\n", + " start = perf_counter()\n", + " if save_mode == \"local\":\n", + " # Save model locally\n", + " print(\"Saving model to disk...\", end=\"\")\n", + " output_dir = output_dir or '.'\n", + " model_subdir = os.environ.get(\"SNOWFLAKE_SERVICE_NAME\", \"output\")\n", + " model_dir = os.path.join(output_dir, model_subdir) if not output_dir.endswith(model_subdir) else output_dir\n", + " os.makedirs(model_dir, exist_ok=True)\n", + " with open(os.path.join(model_dir, \"model.pkl\"), \"wb\") as f:\n", + " pickle.dump(model, f)\n", + " with open(os.path.join(model_dir, \"metrics.json\"), \"w\") as f:\n", + " json.dump(metrics, f, indent=2)\n", + " elif save_mode == \"registry\":\n", + " # Save model to registry\n", + " print(\"Logging model to Model Registry...\", end=\"\")\n", + " save_to_registry(\n", + " session,\n", + " model=model,\n", + " model_name=\"loan_default_predictor\",\n", + " metrics=metrics,\n", + " sample_input_data=X_train,\n", + " )\n", + " elapsed = perf_counter() - start\n", + " print(f\" done! Elapsed={elapsed:.3f}s\")" + ] + }, + { + "cell_type": "markdown", + "id": "897e6b37-0cbc-4878-8e06-a6dae60b6249", + "metadata": {}, + "source": [ + "### Run training locally" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "ef1c3106-7e4a-4c7c-82e1-0fd1f6de417f", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "DataConnector.from_dataframe() is in private preview since 1.6.0. Do not use it in production. \n", + "DataConnector.from_sql() is in private preview since 1.7.3. Do not use it in production. \n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Loading data... done! Loaded 100000 rows, elapsed=0.651s\n", + " done! Elapsed=0.374s\n", + "Evaluating model... done! Elapsed=0.042s\n", + "\n", + "Model Performance Metrics:\n", + "Accuracy: 0.4940\n", + "ROC AUC: 0.4957\n", + "Saving model to disk... done! Elapsed=0.002s\n" + ] + } + ], + "source": [ + "train(session, table_name)" + ] + }, + { + "cell_type": "markdown", + "id": "e84a9f41-1113-47f5-9041-a8cf1f904a9f", + "metadata": {}, + "source": [ + "### Train with GPU on remote SPCS instance" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "c2940e5a-4f85-46d9-9b49-aed6ac79fb53", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "remote() is in private preview since 1.7.4. Do not use it in production. 
\n" + ] + } + ], + "source": [ + "from snowflake.ml.jobs import remote\n", + "\n", + "@remote(compute_pool, stage_name=\"payload_stage\")\n", + "def train_remote(source_data: str, save_mode: str = \"local\", output_dir: str = None):\n", + " # Retrieve session from SPCS service context\n", + " from snowflake.ml.utils import connection_params\n", + " session = Session.builder.configs(connection_params.SnowflakeLoginOptions()).create()\n", + "\n", + " # Run training script\n", + " train(session, source_data, save_mode, output_dir)\n", + "\n", + "train_job = train_remote(table_name)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "3eb1ad66-851f-4fdb-887c-4dcaaf5bf2c5", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "ML_JOB_3D5AA40C_52DF_47AA_A996_C6D5649F1047\n", + "PENDING\n" + ] + } + ], + "source": [ + "print(train_job.id)\n", + "print(train_job.status)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "1909e552-d289-4bf4-8840-926d99295acb", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "MLJob.wait() is in private preview since 1.7.4. Do not use it in production. \n", + "MLJob.show_logs() is in private preview since 1.7.4. Do not use it in production. \n", + "MLJob.get_logs() is in private preview since 1.7.4. Do not use it in production. \n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "'micromamba' is running as a subprocess and can't modify the parent shell.\n", + "Thus you must initialize your shell before using activate and deactivate.\n", + "\n", + "To initialize the current bash shell, run:\n", + " $ eval \"$(micromamba shell hook --shell bash)\"\n", + "and then activate or deactivate with:\n", + " $ micromamba activate\n", + "To automatically initialize all future (bash) shells, run:\n", + " $ micromamba shell init --shell bash --root-prefix=~/micromamba\n", + "If your shell was already initialized, reinitialize your shell with:\n", + " $ micromamba shell reinit --shell bash\n", + "Otherwise, this may be an issue. In the meantime you can run commands. 
See:\n", + " $ micromamba run --help\n", + "\n", + "Supported shells are {bash, zsh, csh, xonsh, cmd.exe, powershell, fish}.\n", + "Creating log directories...\n", + "+ set -e\n", + "+ echo 'Creating log directories...'\n", + "+ mkdir -p /var/log/managedservices/user/mlrs\n", + "+ mkdir -p /var/log/managedservices/system/mlrs\n", + "+ mkdir -p /var/log/managedservices/system/ray\n", + "+ echo '*/1 * * * * root /etc/ray_copy_cron.sh'\n", + "+ echo ''\n", + "+ chmod 744 /etc/cron.d/ray_copy_cron\n", + "+ service cron start\n", + " * Starting periodic command scheduler cron\n", + " ...done.\n", + "+ mkdir -p /tmp/prometheus-multi-dir\n", + "+ '[' -n /opt/app ']'\n", + "+ cd /opt/app\n", + "+ export PYTHONPATH=/opt/env/site-packages/\n", + "+ PYTHONPATH=/opt/env/site-packages/\n", + "+ MLRS_REQUIREMENTS_FILE=requirements.txt\n", + "+ '[' -f requirements.txt ']'\n", + "+ MLRS_CONDA_ENV_FILE=environment.yml\n", + "+ '[' -f environment.yml ']'\n", + "++ df --output=size --block-size=1 /dev/shm\n", + "++ tail -n 1\n", + "+ shm_size=4294967296\n", + "++ ifconfig eth0\n", + "++ sed -En -e 's/.*inet ([0-9.]+).*/\u0001/p'\n", + "+ eth0Ip=$'\\001'\n", + "+ log_dir=/tmp/ray\n", + "+ '[' -z $'\\001' ']'\n", + "+ common_params=(\"--node-ip-address=$eth0Ip\" \"--object-manager-port=${RAY_OBJECT_MANAGER_PORT:-12011}\" \"--node-manager-port=${RAY_NODE_MANAGER_PORT:-12012}\" \"--runtime-env-agent-port=${RAY_RUNTIME_ENV_AGENT_PORT:-12013}\" \"--dashboard-agent-grpc-port=${RAY_DASHBOARD_AGENT_GRPC_PORT:-12014}\" \"--dashboard-agent-listen-port=${RAY_DASHBOARD_AGENT_LISTEN_PORT:-12015}\" \"--min-worker-port=${RAY_MIN_WORKER_PORT:-12031}\" \"--max-worker-port=${RAY_MAX_WORKER_PORT:-13000}\" \"--metrics-export-port=11502\" \"--temp-dir=$log_dir\" \"--disable-usage-stats\")\n", + "+ head_params=(\"--head\" \"--port=${RAY_HEAD_GCS_PORT:-12001}\" \"--ray-client-server-port=${RAY_HEAD_CLIENT_SERVER_PORT:-10001}\" \"--dashboard-host=${NODE_IP_ADDRESS}\" \"--dashboard-grpc-port=${RAY_HEAD_DASHBOARD_GRPC_PORT:-12002}\" \"--dashboard-port=${DASHBOARD_PORT}\" \"--resources={\\\"node_tag:head\\\":1}\")\n", + "+ echo Running command: python /opt/app/func.py loan_applications\n", + "+ python /opt/app/func.py loan_applications\n", + "Running command: python /opt/app/func.py loan_applications\n", + "+ ray start $'--node-ip-address=\\001' --object-manager-port=12011 --node-manager-port=12012 --runtime-env-agent-port=12013 --dashboard-agent-grpc-port=12014 --dashboard-agent-listen-port=12015 --min-worker-port=12031 --max-worker-port=13000 --metrics-export-port=11502 --temp-dir=/tmp/ray --disable-usage-stats --head --port=12001 --ray-client-server-port=10001 --dashboard-host=0.0.0.0 --dashboard-grpc-port=12002 --dashboard-port=12003 '--resources={\"node_tag:head\":1}'\n", + "+ python -m web.ml_runtime_grpc_server\n", + "SnowflakeLoginOptions() is in private preview since 0.2.0. Do not use it in production. \n", + "DataConnector.from_dataframe() is in private preview since 1.6.0. Do not use it in production. \n", + "[2025-01-13 23:25:19,059 E 44 44] gcs_rpc_client.h:179: Failed to connect to GCS at address \u0001:12001 within 5 seconds.\n", + "2025-01-13 23:25:19,317\tINFO worker.py:1777 -- Started a local Ray instance. View the dashboard at \u001b[1m\u001b[32m127.0.0.1:8265 \u001b[39m\u001b[22m\n", + "2025-01-13 23:25:21,147\tINFO streaming_executor.py:108 -- Starting execution of Dataset. 
Full logs are in /tmp/ray/session_2025-01-13_23-25-18_052965_46/logs/ray-data\n", + "2025-01-13 23:25:21,147\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadResultSetDataSource]\n", + "Running 0: 0.00 row [00:00, ? row/s]\n", + "Running. Resources: 3/3 CPU, 0/0 GPU, 768.0MB/1.9GB object_store_memory (pending: 0 CPU, 0 GPU): : 0.00 row [00:01, ? row/s]\n", + "- ReadResultSetDataSource: 3 active, 197 queued 🚧, [cpu: 3.0, objects: 768.0MB]: : 0.00 row [00:01, ? row/s]\u001b[A\n", + " \n", + "\u001b[A\u001b[36m(ReadResultSetDataSource pid=236)\u001b[0m SnowflakeLoginOptions() is in private preview since 0.2.0. Do not use it in production. \n", + "Running. Resources: 3/3 CPU, 0/0 GPU, 768.0MB/1.9GB object_store_memory (pending: 0 CPU, 0 GPU): : 0.00 row [00:01, ? row/s]\n", + "Running. Resources: 2/3 CPU, 0/0 GPU, 256.0MB/1.9GB object_store_memory (pending: 0 CPU, 0 GPU): : 0.00 row [00:02, ? row/s]\n", + "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:02, ? row/s]\u001b[A\n", + "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:02, ? row/s]\u001b[A\n", + " \n", + "✔️ Dataset execution finished in 2.16 seconds: : 0.00 row [00:02, ? row/s] \n", + "\n", + "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:02, ? row/s]\u001b[A\n", + "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:02, ? row/s]\u001b[A\n", + "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:02, ? row/s]\n", + "2025-01-13 23:25:23,316\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-13_23-25-18_052965_46/logs/ray-data\n", + "2025-01-13 23:25:23,316\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadResultSetDataSource]\n", + "Running 0: 0.00 row [00:00, ? row/s]\n", + "Running. Resources: 1/3 CPU, 0/0 GPU, 269.3KB/1.9GB object_store_memory (pending: 0 CPU, 0 GPU): 7%|▋ | 98.7k/1.32M [00:01<00:13, 93.1k row/s]\n", + "- ReadResultSetDataSource: 3 active, 180 queued 🚧, [cpu: 3.0, objects: 808.0KB]: : 0.00 row [00:01, ? row/s]\u001b[A\n", + "- ReadResultSetDataSource: 3 active, 180 queued 🚧, [cpu: 3.0, objects: 808.0KB]: 0%| | 0.00/1.04M [00:01=1.7.4 +jupyter +xgboost \ No newline at end of file diff --git a/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/evaluate.py b/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/evaluate.py new file mode 100644 index 0000000..1f359f3 --- /dev/null +++ b/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/evaluate.py @@ -0,0 +1,65 @@ +import json +import os +import pickle +from typing import Any, Dict +from time import perf_counter +import fsspec + +from snowflake.snowpark import Session + +from model_utils import create_data_connector, evaluate_model + +def load_model(session: Session, model_path: str) -> Any: + if model_path.startswith("@"): + model_path = "sfc://" + model_path + with fsspec.open(model_path, snowpark_session=session) as f: + model = pickle.load(f) + return model + +def do_eval(session: Session, source_data: str, model_path: str) -> Dict[str, Any]: + # Load data + dc = create_data_connector(session, table_name=source_data) + print("Loading data...", end="", flush=True) + start = perf_counter() + df = dc.to_pandas() + elapsed = perf_counter() - start + print(f" done! 
Loaded {len(df)} rows, elapsed={elapsed:.3f}s") + + # Split data + X = df.drop("IS_DEFAULT", axis=1) + y = df["IS_DEFAULT"] + + # Load model + print("Loading model...", end="", flush=True) + start = perf_counter() + with open(os.path.join(model_path, "model.pkl"), "rb") as f: + model = pickle.load(f) + elapsed = perf_counter() - start + print(f" done! Elapsed={elapsed:.3f}s") + + # Run evaluation + print("Running evaluation...", end="", flush=True) + start = perf_counter() + metrics = evaluate_model(model, X, y) + elapsed = perf_counter() - start + print(f" done! Elapsed={elapsed:.3f}s") + + print(json.dumps(metrics, indent=2)) + return metrics + + +if __name__ == "__main__": + import argparse + from snowflake.ml.utils.connection_params import SnowflakeLoginOptions + + parser = argparse.ArgumentParser() + parser.add_argument( + "--source_data", default="loan_applications", help="Name of input data table" + ) + parser.add_argument( + "--model_path", type=str, help="Path to model file(s)", + ) + args = parser.parse_args() + + session = Session.builder.configs(SnowflakeLoginOptions()).create() + do_eval(session, **vars(args)) diff --git a/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/main.py b/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/main.py new file mode 100644 index 0000000..d2de95d --- /dev/null +++ b/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/main.py @@ -0,0 +1,143 @@ +import json +import os +import pickle +import cloudpickle as cp +from time import perf_counter + +from sklearn.model_selection import train_test_split +from snowflake.ml.utils.connection_params import SnowflakeLoginOptions +from snowflake.ml import jobs +from snowflake.snowpark import Session + +import model_utils +cp.register_pickle_by_value(model_utils) + +COMPUTE_POOL = "DEMO_POOL_CPU" + +@jobs.remote(COMPUTE_POOL, stage_name="payload_stage") +def train_model(source_data: str, save_mode: str = "local", output_dir: str = None): + # Initialize Snowflake session + # See https://docs.snowflake.com/developer-guide/snowflake-cli/connecting/configure-connections#define-connections + # for how to define default connections in a config.toml file + session = Session.builder.configs(SnowflakeLoginOptions()).create() + + # Load data + dc = model_utils.create_data_connector(session, table_name=source_data) + print("Loading data...", end="", flush=True) + start = perf_counter() + df = dc.to_pandas() + elapsed = perf_counter() - start + print(f" done! Loaded {len(df)} rows, elapsed={elapsed:.3f}s") + + # Split data + X = df.drop("IS_DEFAULT", axis=1) + y = df["IS_DEFAULT"] + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=42 + ) + + # Train model + model = model_utils.build_pipeline() + print("Training model...", end="") + start = perf_counter() + model.fit(X_train, y_train) + elapsed = perf_counter() - start + print(f" done! Elapsed={elapsed:.3f}s") + + # Evaluate model + print("Evaluating model...", end="") + start = perf_counter() + metrics = model_utils.evaluate_model( + model, + X_test, + y_test, + ) + elapsed = perf_counter() - start + print(f" done! 
Elapsed={elapsed:.3f}s")
+
+    # Print evaluation results
+    print("\nModel Performance Metrics:")
+    print(f"Accuracy: {metrics['accuracy']:.4f}")
+    print(f"ROC AUC: {metrics['roc_auc']:.4f}")
+    # Uncomment below for full classification report
+    # print("\nClassification Report:")
+    # print(metrics["classification_report"])
+
+    start = perf_counter()
+    if save_mode == "local":
+        # Save model locally
+        print("Saving model to disk...", end="")
+        output_dir = output_dir or os.path.dirname(__file__)
+        model_subdir = os.environ.get("SNOWFLAKE_SERVICE_NAME", "output")
+        model_dir = os.path.join(output_dir, model_subdir) if not output_dir.endswith(model_subdir) else output_dir
+        os.makedirs(model_dir, exist_ok=True)
+        with open(os.path.join(model_dir, "model.pkl"), "wb") as f:
+            pickle.dump(model, f)
+        with open(os.path.join(model_dir, "metrics.json"), "w") as f:
+            json.dump(metrics, f, indent=2)
+    elif save_mode == "registry":
+        # Save model to registry
+        print("Logging model to Model Registry...", end="")
+        model_utils.save_to_registry(
+            session,
+            model=model,
+            model_name="loan_default_predictor",
+            metrics=metrics,
+            sample_input_data=X_train,
+        )
+    elapsed = perf_counter() - start
+    print(f" done! Elapsed={elapsed:.3f}s")
+
+    # Close Snowflake session
+    session.close()
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--source_data", default="loan_applications", help="Name of input data table"
+    )
+    parser.add_argument(
+        "--save_mode",
+        choices=["local", "registry"],
+        default="local",
+        help="Model save mode",
+    )
+    parser.add_argument(
+        "--output_dir", type=str, help="Local save path. Only relevant if save_mode=local"
+    )
+    args = parser.parse_args()
+
+    # We need a Snowflake session to submit jobs
+    session = Session.builder.configs(SnowflakeLoginOptions()).create()
+
+    # Kick off training using decorated function
+    job1 = train_model(**vars(args))
+    print("Submitted job from decorated function:", job1.id)
+
+    # Alternatively, submit job from files using job manager
+    arg_list = ["--source_data", args.source_data, "--save_mode", args.save_mode]
+    if args.output_dir:
+        arg_list.extend(["--output_dir", args.output_dir])
+    job2 = jobs.submit_directory(
+        os.path.realpath(os.path.dirname(__file__)),
+        compute_pool=COMPUTE_POOL,
+        stage_name="payload_stage",
+        entrypoint="train.py",
+        args=arg_list,
+        session=session,
+    )
+    print("Submitted job from files using job manager:", job2.id)
+
+    # Show job results on completion
+    job1.wait()
+    print("============= Job 1 =============")
+    job1.show_logs()
+    print("============= End job 1 =============")
+
+    job2.wait()
+    print("============= Job 2 =============")
+    job2.show_logs()
+    print("============= End job 2 =============")
diff --git a/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/model_utils.py b/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/model_utils.py
new file mode 100644
index 0000000..1b60c07
--- /dev/null
+++ b/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/model_utils.py
@@ -0,0 +1,123 @@
+import pandas as pd
+import xgboost as xgb
+from sklearn.compose import ColumnTransformer
+from sklearn.impute import SimpleImputer
+from sklearn.metrics import accuracy_score, classification_report, roc_auc_score
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import OneHotEncoder, StandardScaler
+from snowflake.ml.data.data_connector import DataConnector
+from snowflake.ml.registry import Registry as ModelRegistry
+from snowflake.snowpark import 
Session + + +def create_data_connector(session, table_name: str) -> DataConnector: + """Load data from Snowflake table""" + # Example query - modify according to your schema + query = f""" + SELECT + age, + income, + credit_score, + employment_length, + loan_amount, + debt_to_income, + number_of_credit_lines, + previous_defaults, + loan_purpose, + is_default + FROM {table_name} + """ + sp_df = session.sql(query) + return DataConnector.from_dataframe(sp_df) + + +def build_pipeline(model_params: dict = None) -> Pipeline: + """Create pipeline with preprocessors and model""" + # Define column types + categorical_cols = ["LOAN_PURPOSE"] + numerical_cols = [ + "AGE", + "INCOME", + "CREDIT_SCORE", + "EMPLOYMENT_LENGTH", + "LOAN_AMOUNT", + "DEBT_TO_INCOME", + "NUMBER_OF_CREDIT_LINES", + "PREVIOUS_DEFAULTS", + ] + + # Numerical preprocessing pipeline + numeric_transformer = Pipeline( + steps=[ + ("imputer", SimpleImputer(strategy="median")), + ("scaler", StandardScaler()), + ] + ) + + # Categorical preprocessing pipeline + categorical_transformer = Pipeline( + steps=[ + ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), + ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), + ] + ) + + # Combine transformers + preprocessor = ColumnTransformer( + transformers=[ + ("num", numeric_transformer, numerical_cols), + ("cat", categorical_transformer, categorical_cols), + ] + ) + + # Define model parameters + default_params = { + "objective": "binary:logistic", + "eval_metric": "auc", + "max_depth": 6, + "learning_rate": 0.1, + "n_estimators": 100, + "subsample": 0.8, + "colsample_bytree": 0.8, + "random_state": 42, + } + model = xgb.XGBClassifier(**(model_params or default_params)) + + return Pipeline([("preprocessor", preprocessor), ("classifier", model)]) + + +def evaluate_model(model: Pipeline, X_test: pd.DataFrame, y_test: pd.DataFrame): + """Evaluate model performance""" + # Make predictions + y_pred = model.predict(X_test) + y_pred_proba = model.predict_proba(X_test)[:, 1] + + # Calculate metrics + metrics = { + "accuracy": accuracy_score(y_test, y_pred), + "roc_auc": roc_auc_score(y_test, y_pred_proba), + "classification_report": classification_report(y_test, y_pred), + } + + return metrics + + +def save_to_registry( + session: Session, + model: Pipeline, + model_name: str, + metrics: dict, + sample_input_data: pd.DataFrame, +): + """Save model and artifacts to Snowflake Model Registry""" + # Initialize model registry + registry = ModelRegistry(session) + + # Save to registry + registry.log_model( + model=model, + model_name=model_name, + metrics=metrics, + sample_input_data=sample_input_data[:5], + conda_dependencies=["xgboost"], + ) \ No newline at end of file diff --git a/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/prepare_data.py b/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/prepare_data.py new file mode 100644 index 0000000..da0fd1e --- /dev/null +++ b/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/prepare_data.py @@ -0,0 +1,44 @@ +from snowflake import snowpark + +def generate_data(session: snowpark.Session, table_name: str, num_rows: int, overwrite: bool = False) -> list[snowpark.Row]: + query = f""" + CREATE{" OR REPLACE" if overwrite else ""} TABLE{"" if overwrite else " IF NOT EXISTS"} {table_name} AS + SELECT + ROW_NUMBER() OVER (ORDER BY RANDOM()) as application_id, + ROUND(NORMAL(40, 10, RANDOM())) as age, + ROUND(NORMAL(65000, 20000, RANDOM())) as income, + ROUND(NORMAL(680, 50, RANDOM())) as credit_score, + ROUND(NORMAL(5, 2, 
RANDOM())) as employment_length,
+        ROUND(NORMAL(25000, 8000, RANDOM())) as loan_amount,
+        ROUND(NORMAL(35, 10, RANDOM()), 2) as debt_to_income,
+        ROUND(NORMAL(5, 2, RANDOM())) as number_of_credit_lines,
+        GREATEST(0, ROUND(NORMAL(1, 1, RANDOM()))) as previous_defaults,
+        ARRAY_CONSTRUCT(
+            'home_improvement', 'debt_consolidation', 'business', 'education',
+            'major_purchase', 'medical', 'vehicle', 'other'
+        )[UNIFORM(0, 7, RANDOM())] as loan_purpose,
+        UNIFORM(0::FLOAT, 1::FLOAT, RANDOM()) < 0.15 as is_default,
+        TIMEADD("MINUTE", UNIFORM(-525600, 0, RANDOM()), CURRENT_TIMESTAMP()) as created_at
+    FROM TABLE(GENERATOR(rowcount => {num_rows}))
+    ORDER BY created_at;
+    """
+    return session.sql(query).collect()
+
+if __name__ == "__main__":
+    import argparse
+    from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--table_name", default="loan_applications", help="Name of input data table"
+    )
+    parser.add_argument(
+        "-n", "--num_rows", type=int, default=10000, help="Number of rows to generate"
+    )
+    parser.add_argument(
+        "--overwrite", action="store_true", help="Overwrite table if it already exists"
+    )
+    args = parser.parse_args()
+
+    session = snowpark.Session.builder.configs(SnowflakeLoginOptions()).create()
+    generate_data(session, **vars(args))
\ No newline at end of file
diff --git a/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/train.py b/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/train.py
new file mode 100644
index 0000000..5bb328e
--- /dev/null
+++ b/samples/ml/ml_jobs/single-node/xgb-loan-apps/src/train.py
@@ -0,0 +1,107 @@
+import json
+import os
+import pickle
+from time import perf_counter
+
+from sklearn.model_selection import train_test_split
+from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
+from snowflake.snowpark import Session
+
+from model_utils import create_data_connector, build_pipeline, evaluate_model, save_to_registry
+
+def do_train(source_data: str, save_mode: str = "local", output_dir: str = None):
+    # Initialize Snowflake session
+    # See https://docs.snowflake.com/developer-guide/snowflake-cli/connecting/configure-connections#define-connections
+    # for how to define default connections in a config.toml file
+    session = Session.builder.configs(SnowflakeLoginOptions()).create()
+
+    # Load data
+    dc = create_data_connector(session, table_name=source_data)
+    print("Loading data...", end="", flush=True)
+    start = perf_counter()
+    df = dc.to_pandas()
+    elapsed = perf_counter() - start
+    print(f" done! Loaded {len(df)} rows, elapsed={elapsed:.3f}s")
+
+    # Split data
+    X = df.drop("IS_DEFAULT", axis=1)
+    y = df["IS_DEFAULT"]
+    X_train, X_test, y_train, y_test = train_test_split(
+        X, y, test_size=0.2, random_state=42
+    )
+
+    # Train model
+    model = build_pipeline()
+    print("Training model...", end="")
+    start = perf_counter()
+    model.fit(X_train, y_train)
+    elapsed = perf_counter() - start
+    print(f" done! Elapsed={elapsed:.3f}s")
+
+    # Evaluate model
+    print("Evaluating model...", end="")
+    start = perf_counter()
+    metrics = evaluate_model(
+        model,
+        X_test,
+        y_test,
+    )
+    elapsed = perf_counter() - start
+    print(f" done! 
Elapsed={elapsed:.3f}s") + + # Print evaluation results + print("\nModel Performance Metrics:") + print(f"Accuracy: {metrics['accuracy']:.4f}") + print(f"ROC AUC: {metrics['roc_auc']:.4f}") + # Uncomment below for full classification report + # print("\nClassification Report:") + # print(metrics["classification_report"]) + + start = perf_counter() + if save_mode == "local": + # Save model locally + print("Saving model to disk...", end="") + output_dir = output_dir or os.path.dirname(__file__) + model_subdir = os.environ.get("SNOWFLAKE_SERVICE_NAME", "output") + model_dir = os.path.join(output_dir, model_subdir) if not output_dir.endswith(model_subdir) else output_dir + os.makedirs(model_dir, exist_ok=True) + with open(os.path.join(model_dir, "model.pkl"), "wb") as f: + pickle.dump(model, f) + with open(os.path.join(model_dir, "metrics.json"), "w") as f: + json.dump(metrics, f, indent=2) + elif save_mode == "registry": + # Save model to registry + print("Logging model to Model Registry...", end="") + save_to_registry( + session, + model=model, + model_name="loan_default_predictor", + metrics=metrics, + sample_input_data=X_train, + ) + elapsed = perf_counter() - start + print(f" done! Elapsed={elapsed:.3f}s") + + # Close Snowflake session + session.close() + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument( + "--source_data", default="loan_applications", help="Name of input data table" + ) + parser.add_argument( + "--save_mode", + choices=["local", "registry"], + default="local", + help="Model save mode", + ) + parser.add_argument( + "--output_dir", type=str, help="Local save path. Only relevant if save_mode=local" + ) + args = parser.parse_args() + + do_train(**vars(args)) From ab2d0a4762a33fcacb416d73e51ddfdce0962ef8 Mon Sep 17 00:00:00 2001 From: David Hung Date: Tue, 28 Jan 2025 11:12:13 -0800 Subject: [PATCH 2/5] Update Airflow sample --- .../airflow/single_node_xgb_dag_example.py | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/samples/ml/ml_jobs/single-node/xgb-loan-apps/airflow/single_node_xgb_dag_example.py b/samples/ml/ml_jobs/single-node/xgb-loan-apps/airflow/single_node_xgb_dag_example.py index 2f5ab8d..d420db6 100644 --- a/samples/ml/ml_jobs/single-node/xgb-loan-apps/airflow/single_node_xgb_dag_example.py +++ b/samples/ml/ml_jobs/single-node/xgb-loan-apps/airflow/single_node_xgb_dag_example.py @@ -1,5 +1,5 @@ import os -from datetime import datetime, timedelta, UTC +from datetime import datetime, timedelta, timezone from airflow.decorators import dag, task from airflow.exceptions import AirflowException @@ -11,7 +11,7 @@ dag_id="single_node_xgb_dag_example", dag_display_name="Single Node XGBoost Example DAG", schedule=timedelta(weeks=1), - start_date=datetime(2024, 12, 15, tzinfo=UTC), + start_date=datetime(2024, 12, 15, tzinfo=timezone.utc), catchup=False, ) def single_node_xgb_dag_example(): @@ -27,7 +27,7 @@ def prepare_data(): _PAYLOAD_SOURCE, entrypoint="prepare_data.py", args=["--table_name", "HEADLESS_DEMO_DB.DAG_DEMO.DATA_TABLE", "--num_rows", "100000"], - compute_pool="SYSTEM_COMPUTE_POOL_CPU", + compute_pool="DEMO_POOL_CPU", stage_name="HEADLESS_DEMO_DB.DAG_DEMO.PAYLOAD_STAGE", ) @@ -40,15 +40,15 @@ def prepare_data(): return job.id @task.snowpark() - def start_training_job(run_id: str): + def start_training_job(model_id: str): job = jobs.submit_directory( _PAYLOAD_SOURCE, entrypoint="train.py", args=[ "--source_data", "HEADLESS_DEMO_DB.DAG_DEMO.DATA_TABLE", - 
"--output_dir", f"@HEADLESS_DEMO_DB.DAG_DEMO.MODELS/{run_id}", + "--output_dir", f"@HEADLESS_DEMO_DB.DAG_DEMO.MODELS/{model_id}", ], - compute_pool="SYSTEM_COMPUTE_POOL_GPU", + compute_pool="DEMO_POOL_CPU", stage_name="HEADLESS_DEMO_DB.DAG_DEMO.PAYLOAD_STAGE", # num_instances=4, # Multi-node not supported in PrPr ) @@ -58,29 +58,30 @@ def start_training_job(run_id: str): @task.snowpark() def wait_for_completion(job_id: str): job = jobs.get_job(job_id) - job.wait() - if job.status == "DONE": + status = job.wait() + if status == "DONE": print("Job completed. Logs:\n%s" % job.get_logs()) - elif job.status == "FAILED": + return + elif status == "FAILED": raise AirflowException("Job failed. Logs:\n%s" % job.get_logs()) - raise AirflowException("Invalid job status %s. Logs:\n%s" % (job.status, job.get_logs())) + raise AirflowException("Invalid job status %s. Logs:\n%s" % (status, job.get_logs())) @task.snowpark() - def evaluate_model(run_id: str): + def evaluate_model(model_id: str): # Run eval job to completion and retrieve result job = jobs.submit_directory( _PAYLOAD_SOURCE, entrypoint="evaluate.py", - args=["--model_path", f"@HEADLESS_DEMO_DB.DAG_DEMO.MODELS/{run_id}", "--source_data", "HEADLESS_DEMO_DB.DAG_DEMO.DATA_TABLE"], - compute_pool="SYSTEM_COMPUTE_POOL_GPU", + args=["--model_path", f"@HEADLESS_DEMO_DB.DAG_DEMO.MODELS/{model_id}", "--source_data", "HEADLESS_DEMO_DB.DAG_DEMO.DATA_TABLE"], + compute_pool="DEMO_POOL_CPU", stage_name="HEADLESS_DEMO_DB.DAG_DEMO.PAYLOAD_STAGE", ) job.wait() job.show_logs() - run_id = prepare_data() - job_id = start_training_job(run_id) - wait_for_completion(job_id) >> evaluate_model(run_id) + model_id = prepare_data() + job_id = start_training_job(model_id) + wait_for_completion(job_id) >> evaluate_model(model_id) single_node_xgb_dag_example() From cf2a9b3e707be441e9d256357712d6f04c249b1e Mon Sep 17 00:00:00 2001 From: David Hung Date: Tue, 28 Jan 2025 11:12:27 -0800 Subject: [PATCH 3/5] Make sample pool name consistent --- samples/ml/ml_jobs/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/ml/ml_jobs/README.md b/samples/ml/ml_jobs/README.md index 5f6ba37..e179527 100644 --- a/samples/ml/ml_jobs/README.md +++ b/samples/ml/ml_jobs/README.md @@ -35,7 +35,7 @@ ALTER ACCOUNT SET ENABLE_SNOWSERVICES_ASYNC_JOBS = TRUE; Create a compute pool if you don't already have one ready to use. ```sql -CREATE COMPUTE POOL IF NOT EXISTS HEADLESS_JOB_POOL -- Customize as desired +CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL -- Customize as desired MIN_NODES = 1 MAX_NODES = 1 -- Increase if more concurrency desired INSTANCE_FAMILY = CPU_X64_S -- See https://docs.snowflake.com/en/sql-reference/sql/create-compute-pool From 2cf6e1608159f817fad4c46a8d565b3066f9a587 Mon Sep 17 00:00:00 2001 From: David Hung Date: Tue, 28 Jan 2025 15:40:54 -0800 Subject: [PATCH 4/5] Update README --- samples/ml/ml_jobs/README.md | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/samples/ml/ml_jobs/README.md b/samples/ml/ml_jobs/README.md index e179527..7951e57 100644 --- a/samples/ml/ml_jobs/README.md +++ b/samples/ml/ml_jobs/README.md @@ -1,4 +1,18 @@ -# Headless Container Runtime Jobs +# ML Jobs (PrPr) + +Snowflake ML Jobs enables you to run machine learning workloads inside Snowflake +[ML Container Runtimes](https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-runtime-ml) +from any environment. 
This solution allows you to: + +- Leverage GPU and high-memory CPU instances for resource-intensive tasks +- Use your preferred development environment (VS Code, external notebooks, etc.) +- Maintain flexibility with custom dependencies and packages +- (Coming soon) Scale workloads across multiple nodes effortlessly + +Whether you're looking to productionize your ML workflows or prefer working in +your own development environment, Snowflake ML Jobs provides the same powerful +capabilities available in Snowflake Notebooks in a more flexible, integration-friendly +format. ## Setup @@ -7,9 +21,6 @@ The Runtime Job API (`snowflake.ml.jobs`) API is available in ```bash pip install snowflake-ml-python>=1.7.4 - -# Alternative: Install from private build/wheel file -pip install snowflake_ml_python-1.7.4a20240117-py3-none-any.whl ``` > NOTE: The Runtime Job API currently only supports Python 3.10. From 408e185da0d2e9812fc2cbb03880e9fd00c0ffe5 Mon Sep 17 00:00:00 2001 From: David Hung Date: Tue, 28 Jan 2025 15:41:40 -0800 Subject: [PATCH 5/5] Update notebook --- .../jupyter/Headless_Runtime_Demo.ipynb | 137 +++++++----------- 1 file changed, 52 insertions(+), 85 deletions(-) diff --git a/samples/ml/ml_jobs/single-node/xgb-loan-apps/jupyter/Headless_Runtime_Demo.ipynb b/samples/ml/ml_jobs/single-node/xgb-loan-apps/jupyter/Headless_Runtime_Demo.ipynb index 0dccb3d..7d747d0 100644 --- a/samples/ml/ml_jobs/single-node/xgb-loan-apps/jupyter/Headless_Runtime_Demo.ipynb +++ b/samples/ml/ml_jobs/single-node/xgb-loan-apps/jupyter/Headless_Runtime_Demo.ipynb @@ -370,13 +370,13 @@ "name": "stdout", "output_type": "stream", "text": [ - "Loading data... done! Loaded 100000 rows, elapsed=0.651s\n", - " done! Elapsed=0.374s\n", - "Evaluating model... done! Elapsed=0.042s\n", + "Loading data... done! Loaded 100000 rows, elapsed=0.670s\n", + "Training model... done! Elapsed=0.364s\n", + "Evaluating model... done! Elapsed=0.043s\n", "\n", "Model Performance Metrics:\n", - "Accuracy: 0.4940\n", - "ROC AUC: 0.4957\n", + "Accuracy: 0.5033\n", + "ROC AUC: 0.5002\n", "Saving model to disk... done! 
Elapsed=0.002s\n" ] } @@ -390,7 +390,7 @@ "id": "e84a9f41-1113-47f5-9041-a8cf1f904a9f", "metadata": {}, "source": [ - "### Train with GPU on remote SPCS instance" + "### Train with remote SPCS instance\n" ] }, { @@ -432,7 +432,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "ML_JOB_3D5AA40C_52DF_47AA_A996_C6D5649F1047\n", + "MLJOB_CFECAC29_2E68_4754_B4F0_B0B35452A642\n", "PENDING\n" ] } @@ -478,86 +478,53 @@ "\n", "Supported shells are {bash, zsh, csh, xonsh, cmd.exe, powershell, fish}.\n", "Creating log directories...\n", - "+ set -e\n", - "+ echo 'Creating log directories...'\n", - "+ mkdir -p /var/log/managedservices/user/mlrs\n", - "+ mkdir -p /var/log/managedservices/system/mlrs\n", - "+ mkdir -p /var/log/managedservices/system/ray\n", - "+ echo '*/1 * * * * root /etc/ray_copy_cron.sh'\n", - "+ echo ''\n", - "+ chmod 744 /etc/cron.d/ray_copy_cron\n", - "+ service cron start\n", " * Starting periodic command scheduler cron\n", " ...done.\n", - "+ mkdir -p /tmp/prometheus-multi-dir\n", - "+ '[' -n /opt/app ']'\n", - "+ cd /opt/app\n", - "+ export PYTHONPATH=/opt/env/site-packages/\n", - "+ PYTHONPATH=/opt/env/site-packages/\n", - "+ MLRS_REQUIREMENTS_FILE=requirements.txt\n", - "+ '[' -f requirements.txt ']'\n", - "+ MLRS_CONDA_ENV_FILE=environment.yml\n", - "+ '[' -f environment.yml ']'\n", - "++ df --output=size --block-size=1 /dev/shm\n", - "++ tail -n 1\n", - "+ shm_size=4294967296\n", - "++ ifconfig eth0\n", - "++ sed -En -e 's/.*inet ([0-9.]+).*/\u0001/p'\n", - "+ eth0Ip=$'\\001'\n", - "+ log_dir=/tmp/ray\n", - "+ '[' -z $'\\001' ']'\n", - "+ common_params=(\"--node-ip-address=$eth0Ip\" \"--object-manager-port=${RAY_OBJECT_MANAGER_PORT:-12011}\" \"--node-manager-port=${RAY_NODE_MANAGER_PORT:-12012}\" \"--runtime-env-agent-port=${RAY_RUNTIME_ENV_AGENT_PORT:-12013}\" \"--dashboard-agent-grpc-port=${RAY_DASHBOARD_AGENT_GRPC_PORT:-12014}\" \"--dashboard-agent-listen-port=${RAY_DASHBOARD_AGENT_LISTEN_PORT:-12015}\" \"--min-worker-port=${RAY_MIN_WORKER_PORT:-12031}\" \"--max-worker-port=${RAY_MAX_WORKER_PORT:-13000}\" \"--metrics-export-port=11502\" \"--temp-dir=$log_dir\" \"--disable-usage-stats\")\n", - "+ head_params=(\"--head\" \"--port=${RAY_HEAD_GCS_PORT:-12001}\" \"--ray-client-server-port=${RAY_HEAD_CLIENT_SERVER_PORT:-10001}\" \"--dashboard-host=${NODE_IP_ADDRESS}\" \"--dashboard-grpc-port=${RAY_HEAD_DASHBOARD_GRPC_PORT:-12002}\" \"--dashboard-port=${DASHBOARD_PORT}\" \"--resources={\\\"node_tag:head\\\":1}\")\n", - "+ echo Running command: python /opt/app/func.py loan_applications\n", - "+ python /opt/app/func.py loan_applications\n", "Running command: python /opt/app/func.py loan_applications\n", - "+ ray start $'--node-ip-address=\\001' --object-manager-port=12011 --node-manager-port=12012 --runtime-env-agent-port=12013 --dashboard-agent-grpc-port=12014 --dashboard-agent-listen-port=12015 --min-worker-port=12031 --max-worker-port=13000 --metrics-export-port=11502 --temp-dir=/tmp/ray --disable-usage-stats --head --port=12001 --ray-client-server-port=10001 --dashboard-host=0.0.0.0 --dashboard-grpc-port=12002 --dashboard-port=12003 '--resources={\"node_tag:head\":1}'\n", - "+ python -m web.ml_runtime_grpc_server\n", "SnowflakeLoginOptions() is in private preview since 0.2.0. Do not use it in production. \n", "DataConnector.from_dataframe() is in private preview since 1.6.0. Do not use it in production. 
\n", - "[2025-01-13 23:25:19,059 E 44 44] gcs_rpc_client.h:179: Failed to connect to GCS at address \u0001:12001 within 5 seconds.\n", - "2025-01-13 23:25:19,317\tINFO worker.py:1777 -- Started a local Ray instance. View the dashboard at \u001b[1m\u001b[32m127.0.0.1:8265 \u001b[39m\u001b[22m\n", - "2025-01-13 23:25:21,147\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-13_23-25-18_052965_46/logs/ray-data\n", - "2025-01-13 23:25:21,147\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadResultSetDataSource]\n", + "2025-01-28 19:05:11,114\tINFO worker.py:1777 -- Started a local Ray instance. View the dashboard at \u001b[1m\u001b[32m127.0.0.1:8265 \u001b[39m\u001b[22m\n", + "[2025-01-28 19:05:11,719 E 44 44] gcs_rpc_client.h:179: Failed to connect to GCS at address \u0001:12001 within 5 seconds.\n", + "2025-01-28 19:05:12,988\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-28_19-05-09_910308_46/logs/ray-data\n", + "2025-01-28 19:05:12,988\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadResultSetDataSource]\n", "Running 0: 0.00 row [00:00, ? row/s]\n", "Running. Resources: 3/3 CPU, 0/0 GPU, 768.0MB/1.9GB object_store_memory (pending: 0 CPU, 0 GPU): : 0.00 row [00:01, ? row/s]\n", "- ReadResultSetDataSource: 3 active, 197 queued 🚧, [cpu: 3.0, objects: 768.0MB]: : 0.00 row [00:01, ? row/s]\u001b[A\n", " \n", - "\u001b[A\u001b[36m(ReadResultSetDataSource pid=236)\u001b[0m SnowflakeLoginOptions() is in private preview since 0.2.0. Do not use it in production. \n", + "\u001b[A\u001b[36m(ReadResultSetDataSource pid=234)\u001b[0m SnowflakeLoginOptions() is in private preview since 0.2.0. Do not use it in production. \n", "Running. Resources: 3/3 CPU, 0/0 GPU, 768.0MB/1.9GB object_store_memory (pending: 0 CPU, 0 GPU): : 0.00 row [00:01, ? row/s]\n", - "Running. Resources: 2/3 CPU, 0/0 GPU, 256.0MB/1.9GB object_store_memory (pending: 0 CPU, 0 GPU): : 0.00 row [00:02, ? row/s]\n", - "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:02, ? row/s]\u001b[A\n", - "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:02, ? row/s]\u001b[A\n", " \n", - "✔️ Dataset execution finished in 2.16 seconds: : 0.00 row [00:02, ? row/s] \n", + "✔️ Dataset execution finished in 2.07 seconds: : 0.00 row [00:02, ? row/s] \n", "\n", - "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:02, ? row/s]\u001b[A\n", - "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:02, ? row/s]\u001b[A\n", - "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:02, ? row/s]\n", - "2025-01-13 23:25:23,316\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-13_23-25-18_052965_46/logs/ray-data\n", - "2025-01-13 23:25:23,316\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadResultSetDataSource]\n", + "- ReadResultSetDataSource: 3 active, 197 queued 🚧, [cpu: 3.0, objects: 768.0MB]: : 0.00 row [00:01, ? row/s]\u001b[A\n", + "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:01, ? 
row/s]\u001b[A\n", + "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:01, ? row/s]\u001b[A\n", + "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:01, ? row/s]\u001b[A\n", + "- ReadResultSetDataSource: 3 active, 196 queued 🚧, [cpu: 3.0, objects: 384.0MB]: : 0.00 row [00:01, ? row/s]\n", + "2025-01-28 19:05:15,063\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-28_19-05-09_910308_46/logs/ray-data\n", + "2025-01-28 19:05:15,063\tINFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadResultSetDataSource]\n", "Running 0: 0.00 row [00:00, ? row/s]\n", - "Running. Resources: 1/3 CPU, 0/0 GPU, 269.3KB/1.9GB object_store_memory (pending: 0 CPU, 0 GPU): 7%|▋ | 98.7k/1.32M [00:01<00:13, 93.1k row/s]\n", - "- ReadResultSetDataSource: 3 active, 180 queued 🚧, [cpu: 3.0, objects: 808.0KB]: : 0.00 row [00:01, ? row/s]\u001b[A\n", - "- ReadResultSetDataSource: 3 active, 180 queued 🚧, [cpu: 3.0, objects: 808.0KB]: 0%| | 0.00/1.04M [00:01