Add samples for Headless PrPr (single node) #156

Merged · 5 commits · Jan 29, 2025
271 changes: 271 additions & 0 deletions samples/ml/ml_jobs/README.md
@@ -0,0 +1,271 @@
# ML Jobs (PrPr)

Snowflake ML Jobs enables you to run machine learning workloads inside Snowflake
[ML Container Runtimes](https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-runtime-ml)
from any environment. This solution allows you to:

- Leverage GPU and high-memory CPU instances for resource-intensive tasks
- Use your preferred development environment (VS Code, external notebooks, etc.)
- Maintain flexibility with custom dependencies and packages
- (Coming soon) Scale workloads across multiple nodes effortlessly

Whether you're looking to productionize your ML workflows or prefer working in
your own development environment, Snowflake ML Jobs provides the same powerful
capabilities available in Snowflake Notebooks in a more flexible, integration-friendly
format.

## Setup

**Collaborator:** Let's add an intro section here so we put these steps into context? Like "The remote ML execution framework relies on SPCS jobs (link to doc) to execute the user's code inside the Container Runtime environment..."

**Collaborator:** Also, let's inform the user that the steps below get your environment in place to use our remote execution framework.

**Collaborator (Author):** Let's do that in a separate document. I'm envisioning this README as a super concise "getting started" guide with the assumption that the user already wants to use headless. Overall we'd probably want each of these as separate documents:

  1. Overview - provide background, motivations, and introduce concepts. Similar to https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-runtime-ml
  2. API Reference - e.g. https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/index
  3. Quick start (i.e. this README) - short technical guide to get users up and running in <5 minutes, with pointers to additional resources for more advanced usage
  4. Tutorials (e.g. pytorch-cifar10/README.md and xgb-loan-apps/README.md) - full end-to-end walkthroughs

The intro sections would go in No. 1 (Overview). WDYT?

**Collaborator:** Yes, we need proper docs for this regardless. We could provide a lightweight overview just so users aren't dropped in cold?

The Runtime Job API (`snowflake.ml.jobs`) is available in
`snowflake-ml-python>=1.7.4`.

```bash
pip install "snowflake-ml-python>=1.7.4"
```

> NOTE: The Runtime Job API currently only supports Python 3.10.
Attempting to use the API with a different Python version may yield
unexpected errors.
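
If you want to fail fast locally, a minimal version check (an illustrative snippet, not part of the API) might look like:

```python
import sys

# The Runtime Job API currently targets Python 3.10 (see the note above).
if sys.version_info[:2] != (3, 10):
    raise RuntimeError(f"Expected Python 3.10, found {sys.version.split()[0]}")
```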

The Runtime Job API requires the `ENABLE_SNOWSERVICES_ASYNC_JOBS` parameter
to be enabled in your Snowflake account or session.


```sql
-- Enable for session
ALTER SESSION SET ENABLE_SNOWSERVICES_ASYNC_JOBS = TRUE;

-- Enable for account (requires ACCOUNTADMIN)
ALTER ACCOUNT SET ENABLE_SNOWSERVICES_ASYNC_JOBS = TRUE;
```

## Getting Started

### Prerequisites

Create a compute pool if you don't already have one ready to use.

```sql
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL -- Customize as desired
  MIN_NODES = 1
  MAX_NODES = 1                 -- Increase if more concurrency desired
  INSTANCE_FAMILY = CPU_X64_S;  -- See https://docs.snowflake.com/en/sql-reference/sql/create-compute-pool
```

### Function Dispatch
**Collaborator:** I'd also include an intro here. Maybe you can pull from the PRD and say something like "Users who are looking to gain the benefits offered by Snowflake's Container Runtime for ML (link to docs), including flexibility with packages, choice of CPU vs. GPU, and the ability to use distributed APIs to scale your workloads, but want to do so from their own IDE, can instrument their code to execute remotely..."


Python functions can be executed as Runtime Jobs using the `snowflake.ml.jobs.remote`
decorator.

```python
from snowflake.ml.jobs import remote

compute_pool = "MY_COMPUTE_POOL"

@remote(compute_pool, stage_name="payload_stage")
def hello_world(name: str = "world"):
    # We recommend importing any needed modules *inside* the function definition
    from datetime import datetime

    print(f"{datetime.now()} Hello {name}!")

# Function invocation returns a job handle (snowflake.ml.jobs.MLJob)
job = hello_world("developer")

print(job.id)                # Jobs are given unique IDs
print(job.status)            # Check job status
print(job.get_logs())        # Check job's console logs
print(job.wait(timeout=10))  # Block until job completion with optional timeout
```

> NOTE: Compute pool startup can take several minutes and can cause job execution
to be delayed; subsequent job executions should start much faster.
Consider manually starting the compute pool using
`ALTER COMPUTE POOL <POOL_NAME> RESUME` prior to job execution.
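
If you prefer to warm the pool from Python before submitting jobs, here is a hedged sketch using a Snowpark session (it assumes you already have a configured Snowflake connection and that `MY_COMPUTE_POOL` is the pool created above):

```python
from snowflake.snowpark import Session

# Reuse an existing session if one is available, otherwise create one from
# your configured connection settings.
session = Session.builder.getOrCreate()

# Resume the pool ahead of time so the first job doesn't wait for node startup.
session.sql("ALTER COMPUTE POOL MY_COMPUTE_POOL RESUME").collect()
```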

### File-based Dispatch

The API also supports submitting entire Python files for execution for more
flexibility.

```python
# /path/to/repo/my_script.py

def main(*args):
    print("Hello world", *args)

if __name__ == '__main__':
    import sys
    main(*sys.argv[1:])
```

```python
from snowflake.ml.jobs import submit_file, submit_directory

compute_pool = "MY_COMPUTE_POOL"

# Upload and run a single script
job1 = submit_file(
    "/path/to/repo/my_script.py",
    compute_pool,
    stage_name="payload_stage",
    args=["arg1", "--arg2_key", "arg2_value"],  # (Optional) args are passed to script as-is
)

# Upload an entire directory and run a contained entrypoint
# This is useful if your code is organized into multiple modules/files
job2 = submit_directory(
    "/path/to/repo/",
    compute_pool,
    entrypoint="my_script.py",
    stage_name="payload_stage",
    args=["arg1", "arg2"],  # (Optional) args are passed to script as-is
)
```

`job1` and `job2` are job handles; see [Function Dispatch](#function-dispatch)
for usage examples.
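
For example, the same `MLJob` methods shown above apply to these handles:

```python
# Inspect and block on the directory-based job, just like a decorated function's job
print(job2.id)          # Unique job ID
print(job2.status)      # Current job status
job2.wait()             # Block until the job completes
print(job2.get_logs())  # Retrieve console logs from the run
```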

## Advanced Usage

### Custom Python Dependencies

The Runtime Job API runs payloads inside the Snowflake
[Container Runtime for ML](https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-runtime-ml)
environment which comes pre-installed with most commonly used Python packages
for machine learning and data science. Most use cases should work "out of the box"
with no additional Python packages needed. If custom dependencies are required,
the API supports specifying `pip_requirements`, which are installed on runtime
startup.

Installing packages into the runtime environment requires an
[External Access Integration](https://docs.snowflake.com/en/developer-guide/external-network-access/creating-using-external-network-access):

```sql
-- Requires ACCOUNTADMIN
-- Snowflake provides a pre-configured network rule for PyPI access
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
ENABLED = true;
GRANT USAGE ON INTEGRATION PYPI_EAI TO ROLE <role_name>;
```

```python
from snowflake.ml.jobs import remote, submit_file, submit_directory

compute_pool = "MY_COMPUTE_POOL"

# Example only; numpy is already installed in the runtime environment by default
@remote(
    compute_pool,
    stage_name="payload_stage",
    pip_requirements=["numpy"],
    external_access_integrations=["pypi_eai"],
)
def hello_world(name: str = "world"):
    # We recommend importing any needed modules *inside* the function definition
    from datetime import datetime

    print(f"{datetime.now()} Hello {name}!")

job1 = hello_world("developer")

# Can use standard pip/requirements syntax to specify versions
job2 = submit_file(
    "/path/to/repo/my_script.py",
    compute_pool,
    stage_name="payload_stage",
    pip_requirements=["numpy==2.2.*"],
    external_access_integrations=["pypi_eai"],
)

# Can provide PIP_INDEX_URL to install packages from private source(s)
job3 = submit_directory(
    "/path/to/repo/",
    compute_pool,
    entrypoint="my_script.py",
    stage_name="payload_stage",
    pip_requirements=["custom-package"],
    external_access_integrations=["custom_feed_eai"],  # Configure EAI as needed
    env_vars={'PIP_INDEX_URL': 'https://my-private-pypi-server.com/simple'},
)
```

### Airflow Integration
**Collaborator:** Here as well. Let's provide an intro for why this is different from any other Airflow integration, i.e. building an ML pipeline so that steps in the workflow can execute in the Container Runtime () with benefits such as ...


The Runtime Job API can be used in Airflow using the
[SnowparkOperator](https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/operators/snowpark.html).

> NOTE: Make sure `snowflake-ml-python>=1.7.4` is installed in your Airflow worker environment(s)

```python
import datetime
from airflow.decorators import dag, task
from snowflake.ml.jobs import remote, submit_file

@dag(start_date=datetime.datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task.snowpark()
    def task_from_function():
        # SnowparkOperator automatically creates a Snowpark Session
        # which the Runtime Job API can infer from context
        @remote("HEADLESS_JOB_POOL", stage_name="payload_stage")
        def my_function():
            print("Hello world")
        job = my_function()
        print("Job %s submitted" % job.id)
        print("Job %s ended with status %s. Logs:\n%s" % (job.id, job.wait(), job.get_logs()))

    @task.snowpark()
    def task_from_file():
        # SnowparkOperator automatically creates a Snowpark Session
        # which the Runtime Job API can infer from context
        job = submit_file(
            "./my_script.py",
            "HEADLESS_JOB_POOL",
            stage_name="payload_stage",
        )
        print("Job %s submitted" % job.id)
        print("Job %s ended with status %s. Logs:\n%s" % (job.id, job.wait(), job.get_logs()))

    task_from_function()
    task_from_file()

my_dag()
```

## Next Steps

- See the [XGBoost Classifier Example](./single-node/xgb-loan-apps/) for a full
walkthrough of training and deploying an XGBoost model.
- See the [PyTorch Classifier Example](./single-node/pytorch-cifar10/) for a full
walkthrough of training a PyTorch model with Weights and Biases integration.

## Known Limitations

1. The Headless Runtime currently only supports Python 3.10. Attempting to use
   other Python versions may throw errors like `UnpicklingError`.
1. Running a large number of jobs can result in service start failure due to
   `Number of services limit exceeded for the account`. This will be fixed in an upcoming release.
   - This prevents any kind of SPCS service from starting on the account, including Notebooks and Model Inference.
   - As a temporary workaround, please avoid launching more than 200 concurrent
     jobs and manually delete completed and failed jobs.
     ```sql
     SHOW JOB SERVICES LIKE 'MLJOB%';
     DROP SERVICE <service_name>;
     ```
     ```python
     from snowflake.ml.jobs import list_jobs, delete_job

     for row in list_jobs(limit=-1).collect():
         if row["status"] in {"DONE", "FAILED"}:
             delete_job(row["id"])
     ```
1. Job logs are lost upon compute pool suspension even if the job entity itself has not been deleted.
   This may happen due to either manual suspension (`ALTER COMPUTE POOL MY_POOL SUSPEND`)
   or auto suspension on idle timeout.
   - Compute pool auto suspension can be disabled using `ALTER COMPUTE POOL MY_POOL SET AUTO_SUSPEND_SECS = 0`.
   - For more information, see
     [Compute pool privileges](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/working-with-compute-pool#compute-pool-privileges)
     and [Compute pool cost](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/accounts-orgs-usage-views#compute-pool-cost).
1. Job payload stages (specified via the `stage_name` param) are not automatically
   cleaned up. Please manually clean up the payload stage(s) to prevent
   excessive storage costs; see the sketch below.
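
   A minimal cleanup sketch (it assumes `payload_stage` is the stage you passed as `stage_name` and that it contains only payloads you no longer need):

   ```python
   from snowflake.snowpark import Session

   session = Session.builder.getOrCreate()  # Assumes a configured Snowflake connection

   # Remove all uploaded payload files from the stage; adjust the name/path as needed.
   session.sql("REMOVE @payload_stage").collect()
   ```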
115 changes: 115 additions & 0 deletions samples/ml/ml_jobs/single-node/pytorch-cifar10/README.md
@@ -0,0 +1,115 @@
# PyTorch Classifier Example

## Setup

Install Python requirements using `pip install -r requirements.txt` from the sample directory.

## Training Script

The model training code is located in the [src directory](./src/), which contains multiple files:
- A pip `requirements.txt` file
- `model.py` contains the model definition
- `train.py` contains basic training logic
- `train_wandb.py` is a variant of `train.py` with added [Weights and Biases integration](#weights-and-biases-integration)

### Upload data to stage

Manually download the CIFAR-10 datasets and upload them to a stage.

Python:
```python
import torchvision
import torchvision.transforms as transforms

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

trainset = torchvision.datasets.CIFAR10(root="./data/cifar10", train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR10(root="./data/cifar10", train=False, download=True, transform=transform)
```

Shell:
```bash
snow stage create cifar10_data
snow stage copy ./data/cifar10 @cifar10_data --recursive
```

## Launch Job

Scripts needed to launch model training as a job are included in the
[scripts/](./scripts) directory:
- `submit_job.py` facilitates deploying the training scripts as a job.
- `check_job.py` can be used to inspect running and recently completed jobs.
- `job_spec.yaml` includes additional service specification values to demonstrate
  using the `spec_override` parameter of the ML Job API.

```bash
python scripts/submit_job.py -h # View available options
python scripts/submit_job.py -p DEMO_POOL_CPU -e pypi_eai # Basic run
python scripts/submit_job.py -p DEMO_POOL_CPU -e pypi_eai -c preprod8 --epochs 1 # Quick run on preprod8 for debugging
```

The API uploads the payload from `./src/` into a Snowflake stage and generates a
[service specification](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/specification-reference)
configured to mount the stage and run the entrypoint `train.py`.
In this example, we additionally provide a `job_spec.yaml` used to mount the `@cifar10_data`
stage created in the [Upload Data to Stage](#upload-data-to-stage) step.
Finally, the job is executed as an SPCS [JOB SERVICE](https://docs.snowflake.com/en/sql-reference/sql/execute-job-service) on `DEMO_POOL_CPU`.
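
For orientation, here is a rough sketch of the kind of call `submit_job.py` makes under the hood; the argument values are illustrative assumptions, and the real script also wires in the `job_spec.yaml` override and the command-line flags:

```python
from snowflake.ml.jobs import submit_directory

# Illustrative only -- see scripts/submit_job.py for the actual implementation.
job = submit_directory(
    "./src/",                                   # Training code + requirements.txt
    "DEMO_POOL_CPU",                            # Compute pool (-p flag)
    entrypoint="train.py",
    stage_name="payload_stage",                 # Placeholder stage name
    external_access_integrations=["pypi_eai"],  # -e flag, needed for pip installs
)
print(f"Started job with ID: {job.id}")
```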

> NOTE: There is currently no automatic cleanup of the uploaded payloads. Be sure to
manually clean up the payload stage(s) to prevent unintended storage costs.

The above command(s) will print a message like `Started job with ID: MLJOB_(uuid)`.
You can check on the progress of the submitted job using [scripts/check_job.py](./scripts/check_job.py):

```bash
python scripts/check_job.py MLJOB_2CECA414_F52E_4F45_840A_9623A52DA5C4
python scripts/check_job.py MLJOB_2CECA414_F52E_4F45_840A_9623A52DA5C4 -c preprod8 # Use preprod8 connection
python scripts/check_job.py MLJOB_2CECA414_F52E_4F45_840A_9623A52DA5C4 --show-logs # Print any job execution logs
python scripts/check_job.py MLJOB_2CECA414_F52E_4F45_840A_9623A52DA5C4 --block # Block until job completion
```

## Weights and Biases Integration

Integrating with Weights and Biases (W&B) requires adding just a
[few lines of code](https://docs.wandb.ai/quickstart/#putting-it-all-together) to the training script.
[train_wandb.py](./src/train_wandb.py) is an updated version of the training script with W&B integration included.
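
For context, the additions look roughly like the following (a hedged sketch based on the W&B quickstart; the project name and logged values are placeholders, and the API key is read from an environment variable rather than hard-coded):

```python
import os

import wandb

# Authenticate using an API key supplied via the environment; inside SPCS this
# value would come from the injected Snowflake secret (placeholder variable name).
wandb.login(key=os.environ["WANDB_API_KEY"])

run = wandb.init(project="cifar10-demo")  # Placeholder project name
for epoch in range(3):
    # In train_wandb.py these metrics come from the real training loop.
    wandb.log({"epoch": epoch, "train_loss": 1.0 / (epoch + 1)})
run.finish()
```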

Configure an External Access Integration to allow your SPCS service to connect to the W&B servers.

SQL:

```sql
CREATE OR REPLACE NETWORK RULE WANDB_RULE
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('api.wandb.ai:443');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION WANDB_EAI
ALLOWED_NETWORK_RULES = (WANDB_RULE)
ENABLED = true;
GRANT USAGE ON INTEGRATION WANDB_EAI TO ROLE <role_name>;
```

Configure W&B authentication by securely injecting your W&B API key as a [Snowflake Secret](https://docs.snowflake.com/en/user-guide/api-authentication#managing-secrets).

SQL:

```sql
CREATE OR REPLACE SECRET WANDB_SECRET
TYPE = GENERIC_STRING
SECRET_STRING = '<api_key>';
```

Include the newly configured External Access Integration and Secret when running
[submit_job.py](./scripts/submit_job.py):

```bash
python scripts/submit_job.py \
-p DEMO_POOL_CPU \
-e pypi_eai wandb_eai \
-c preprod8 \
--wandb-secret-name wandb_secret
```
@@ -0,0 +1 @@
snowflake-ml-python>=1.7.4