change the defaults of the pool granularity back to op (#27839)
## Summary & Motivation
Because switching the enforcement granularity in cloud requires a coordinated host-cloud process, this is a big change. This walks back some of the concurrency defaults to keep them in line with current behavior.

## How I Tested These Changes
BK (Buildkite)
prha authored Feb 13, 2025
1 parent b40ba28 commit 64362fc
Showing 7 changed files with 28 additions and 31 deletions.
33 changes: 19 additions & 14 deletions docs/docs/guides/operate/managing-concurrency.md
Original file line number Diff line number Diff line change
@@ -25,9 +25,9 @@ concurrency:
  max_concurrent_runs: 15
```
-## Limit the number of runs that can be in progress for a set of ops
+## Limit the number of assets or ops actively executing across all runs
-You can assign assets and ops to concurrency pools which allow you to limit the number of in progress runs containing those assets or ops. You first assign your asset or op to a concurrency pool using the `pool` keyword argument.
+You can assign assets and ops to concurrency pools, which allow you to limit the number of in-progress op executions across all runs. You first assign your asset or op to a concurrency pool using the `pool` keyword argument.

<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/operate/concurrency-pool-api.py" language="python" title="Specifying pools on assets and ops" />

@@ -47,6 +47,23 @@ To specify a limit for the pool "database" using the CLI, use:
dagster instance concurrency set database 1
```
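As a mental model for what the limit above enforces, a pool can be pictured as a named set of slots that executing steps claim and release. This is a stdlib-only sketch for illustration (hypothetical class names; not Dagster's actual storage-backed implementation):

```python
class ConcurrencyPools:
    """Toy model of named concurrency pools with per-pool limits."""

    def __init__(self):
        self.limits = {}
        self.active = {}  # pool name -> set of step keys currently holding a slot

    def set_limit(self, pool, limit):
        # mirrors the effect of `dagster instance concurrency set <pool> <limit>`
        self.limits[pool] = limit
        self.active.setdefault(pool, set())

    def claim(self, pool, step_key):
        # a step may execute only if the pool still has a free slot
        slots = self.active.setdefault(pool, set())
        if len(slots) >= self.limits.get(pool, float("inf")):
            return False
        slots.add(step_key)
        return True

    def free(self, pool, step_key):
        # releasing a slot lets a blocked step from any run proceed
        self.active[pool].discard(step_key)


pools = ConcurrencyPools()
pools.set_limit("database", 1)
assert pools.claim("database", "run_1/op_a")      # first claim succeeds
assert not pools.claim("database", "run_2/op_b")  # pool is full
pools.free("database", "run_1/op_a")
assert pools.claim("database", "run_2/op_b")      # freed slot can be reclaimed
```

The real implementation persists these slots in the instance's event log storage so that runs launched by different processes share the same limits.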

## Limit the number of runs that can be in progress for a set of ops

You can also use concurrency pools to limit the number of in-progress runs containing those assets or ops. Follow the steps in the [Limit the number of assets or ops actively executing across all runs](#limit-the-number-of-assets-or-ops-actively-executing-across-all-runs) section to assign your assets and ops to pools and to configure the desired limit.

Once you have assigned your assets and ops to your pool, you can change your deployment settings to set the pool enforcement granularity. To limit the total number of runs containing a specific op at any given time (instead of the total number of ops actively executing), set the pool granularity to `run`.

* In Dagster Core, add the following to your [dagster.yaml](/guides/deploy/dagster-yaml)
* In Dagster+, add the following to your [deployment settings](/dagster-plus/deployment/management/deployments/deployment-settings-reference)

```yaml
concurrency:
  pools:
    granularity: 'run'
```

Without this setting, the default granularity is `op`. This means that for a pool `foo` with a limit of `1`, Dagster enforces that only one op in the pool executes at any given time across all runs, but the number of runs in progress is unaffected by the pool limit.
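The two granularities differ in *where* the limit is enforced. With `run` granularity, the check happens before a run launches, so the per-op claim inside an already-launched run short-circuits to success; with `op` granularity, ops across all runs compete for the pool's slots. A simplified stdlib sketch of that distinction (hypothetical helper, not the real `claim` method):

```python
from enum import Enum


class PoolGranularity(Enum):
    OP = "op"
    RUN = "run"


def claim_op_slot(granularity, active_ops, limit, step_key):
    """Decide whether an op inside an already-launched run may execute.

    With RUN granularity the pool was checked at launch time, so the
    op-level claim always succeeds. With OP granularity (the default),
    ops across all runs share the pool's slots.
    """
    if granularity == PoolGranularity.RUN:
        return True
    if len(active_ops) >= limit:
        return False
    active_ops.add(step_key)
    return True


active = set()
# op granularity, limit 1: only one op executes at a time across runs
assert claim_op_slot(PoolGranularity.OP, active, 1, "run_1/op_a")
assert not claim_op_slot(PoolGranularity.OP, active, 1, "run_2/op_a")
# run granularity: the op-level claim short-circuits to True
assert claim_op_slot(PoolGranularity.RUN, active, 1, "run_2/op_a")
```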

### Setting a default limit for concurrency pools

* Dagster+: Edit the `concurrency` config in deployment settings via the [Dagster+ UI](/guides/operate/webserver) or the [`dagster-cloud` CLI](/dagster-plus/deployment/management/dagster-cloud-cli/).
@@ -88,18 +105,6 @@ concurrency:
limit: 10
```
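The effect of a default pool limit can be sketched as a simple fallback lookup (hypothetical helper; the value `10` mirrors the fragment above):

```python
def effective_limit(pool, explicit_limits, default_limit=10):
    """A pool without an explicit limit falls back to the configured default."""
    return explicit_limits.get(pool, default_limit)


assert effective_limit("database", {"database": 1}) == 1
assert effective_limit("ml_training", {"database": 1}) == 10
```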

-## [Advanced] Limit the number of assets or ops actively in execution across a large set of runs
-
-For deployments with complex jobs containing many ops, blocking entire runs for a small number of concurrency-limited ops may be too coarse-grained for your requirements. Instead of enforcing concurrency limits at the run level, Dagster will ensure that the concurrency limit will be applied at the individual asset or op execution level. This means that if one run completes its materialization of a pool's asset, a materialization of another pool asset in a different run may begin even if the first run is still in progress.
-
-You can set the granularity of the concurrency limit enforcement to be at the op level instead of at the run level:
-
-```yaml
-concurrency:
-  pools:
-    granularity: op
-```

## Prevent runs from starting if another run is already occurring (advanced)

You can use a schedule or a sensor, together with Dagster's run metadata, to start a run only when no other runs are currently in progress.
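The gating logic such a sensor would implement can be sketched in plain Python (hypothetical helper names, not Dagster's sensor API):

```python
def decide_launch(in_progress_run_ids, requested_job):
    """Return a launch decision: skip if any run is already executing."""
    if in_progress_run_ids:
        return ("skip", f"{len(in_progress_run_ids)} run(s) already in progress")
    return ("launch", requested_job)


assert decide_launch(set(), "nightly_job") == ("launch", "nightly_job")
assert decide_launch({"run_1"}, "nightly_job")[0] == "skip"
```

In a real sensor, the set of in-progress run IDs would come from querying the instance for runs in non-terminal states.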
Original file line number Diff line number Diff line change
@@ -158,7 +158,7 @@ export const InstanceConcurrencyKeyInfo = ({concurrencyKey}: {concurrencyKey: st
<tbody>
<tr>
<td style={{verticalAlign: 'middle'}}>Granularity</td>
-<td>{granularity === 'op' ? 'Op' : 'Run'}</td>
+<td>{granularity === 'run' ? 'Run' : 'Op'}</td>
</tr>
<tr>
<td style={{verticalAlign: 'middle'}}>Limit</td>
@@ -187,7 +187,7 @@ export const InstanceConcurrencyKeyInfo = ({concurrencyKey}: {concurrencyKey: st
</tbody>
</MetadataTableWIP>
</Box>
-{data?.instance.poolConfig?.poolGranularity === 'op' ? (
+{data?.instance.poolConfig?.poolGranularity !== 'run' ? (
<>
<Box
padding={{vertical: 16, horizontal: 24}}
Original file line number Diff line number Diff line change
@@ -352,7 +352,7 @@ def get_steps_to_execute(
step_priority = 0

if not self._instance_concurrency_context.claim(
-concurrency_key, step.key, step_priority, is_legacy_tag=not step.pool
+concurrency_key, step.key, step_priority
):
continue

Original file line number Diff line number Diff line change
@@ -92,17 +92,13 @@ def claim(
concurrency_key: str,
step_key: str,
step_priority: int = 0,
-is_legacy_tag: bool = False,
) -> bool:
if not self._instance.event_log_storage.supports_global_concurrency_limits:
return True

-if self._pool_config.pool_granularity == PoolGranularity.RUN or (
-    self._pool_config.pool_granularity is None and not is_legacy_tag
-):
-    # short-circuit the claiming of the global op concurrency slot
-    # no need to reset pending claims here since the pool config doesn't change over the
-    # lifetime of the context
+if self._pool_config.pool_granularity == PoolGranularity.RUN:
+    # with pool granularity set to run, we don't need to enforce the global concurrency
+    # limits here, since we're already in a launched run.
return True

if step_key in self._pending_claims:
Original file line number Diff line number Diff line change
@@ -79,7 +79,7 @@ def __init__(
self._launched_pool_counts = defaultdict(int)
self._in_progress_pool_counts = defaultdict(int)
self._slot_count_offset = slot_count_offset
-self._pool_granularity = pool_granularity if pool_granularity else PoolGranularity.RUN
+self._pool_granularity = pool_granularity if pool_granularity else PoolGranularity.OP
self._in_progress_run_ids: set[str] = set(
[record.dagster_run.run_id for record in in_progress_run_records]
)
Original file line number Diff line number Diff line change
@@ -252,9 +252,7 @@ def __init__(self, interval: float):
def global_concurrency_keys(self) -> set[str]:
return {"foo"}

-def claim(
-    self, concurrency_key: str, step_key: str, priority: int = 0, is_legacy_tag: bool = False
-):
+def claim(self, concurrency_key: str, step_key: str, priority: int = 0):
self._pending_claims.add(step_key)
return False

Original file line number Diff line number Diff line change
@@ -425,9 +425,7 @@ def test_default_granularity(concurrency_instance_default_granularity):

with InstanceConcurrencyContext(concurrency_instance_default_granularity, run) as context:
assert context.claim("foo", "a")
-assert context.claim("foo", "b")
-assert context.claim("foo", "c", is_legacy_tag=True)
-assert not context.claim("foo", "d", is_legacy_tag=True)
+assert not context.claim("foo", "b")

foo_info = concurrency_instance_default_granularity.event_log_storage.get_concurrency_info(
"foo"

2 comments on commit 64362fc

@github-actions

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-2bcg2lxac-elementl.vercel.app

Built with commit 64362fc.
This pull request is being automatically deployed with vercel-action

@github-actions

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-2rxva3dwm-elementl.vercel.app

Built with commit 64362fc.
This pull request is being automatically deployed with vercel-action