From 64362fc78bf38c556b52e53e837d16473d6306a7 Mon Sep 17 00:00:00 2001
From: prha <1040172+prha@users.noreply.github.com>
Date: Thu, 13 Feb 2025 11:15:07 -0800
Subject: [PATCH] change the defaults of the pool granularity back to `op`
(#27839)
## Summary & Motivation
Because switching the enforcement granularity in cloud is a host-cloud
process, this is a big change. This walks back some of the concurrency
defaults to be in line with current behavior
## How I Tested These Changes
BK
---
.../guides/operate/managing-concurrency.md | 33 +++++++++++--------
.../instance/InstanceConcurrencyKeyInfo.tsx | 4 +--
.../dagster/_core/execution/plan/active.py | 2 +-
.../plan/instance_concurrency_context.py | 10 ++----
.../_core/op_concurrency_limits_counter.py | 2 +-
.../core_tests/execution_tests/test_active.py | 4 +--
.../test_instance_concurrency_context.py | 4 +--
7 files changed, 28 insertions(+), 31 deletions(-)
diff --git a/docs/docs/guides/operate/managing-concurrency.md b/docs/docs/guides/operate/managing-concurrency.md
index 04577a00129ad..2d7c4dc45cb44 100644
--- a/docs/docs/guides/operate/managing-concurrency.md
+++ b/docs/docs/guides/operate/managing-concurrency.md
@@ -25,9 +25,9 @@ concurrency:
max_concurrent_runs: 15
```
-## Limit the number of runs that can be in progress for a set of ops
+## Limit the number of assets or ops actively executing across all runs
-You can assign assets and ops to concurrency pools which allow you to limit the number of in progress runs containing those assets or ops. You first assign your asset or op to a concurrency pool using the `pool` keyword argument.
+You can assign assets and ops to concurrency pools which allow you to limit the number of in progress op executions across all runs. You first assign your asset or op to a concurrency pool using the `pool` keyword argument.
@@ -47,6 +47,23 @@ To specify a limit for the pool "database" using the CLI, use:
dagster instance concurrency set database 1
```
+## Limit the number of runs that can be in progress for a set of ops
+
+You can also use concurrency pools to limit the number of in progress runs containing those assets or ops. You can follow the steps in the [Limit the number of assets or ops actively executing across all runs](#limit-the-number-of-assets-or-ops-actively-executing-across-all-runs) section to assign your assets and ops to pools and to configure the desired limit.
+
+Once you have assigned your assets and ops to your pool, you can change your deployment settings to set the pool enforcement granularity. To limit the total number of runs containing a specific op at any given time (instead of the total number of ops actively executing), you need to set the pool granularity to `run`.
+
+* In Dagster Core, add the following to your [dagster.yaml](/guides/deploy/dagster-yaml)
+* In Dagster+, add the following to your [deployment settings](/dagster-plus/deployment/management/deployments/deployment-settings-reference)
+
+```yaml
+concurrency:
+ pools:
+ granularity: 'run'
+```
+
+Without this granularity set, the default granularity is `op`. This means that for a pool `foo` with a limit of `1`, Dagster enforces that only one op is executing at a given time across all runs, but the number of runs in progress is unaffected by the pool limit.
+
### Setting a default limit for concurrency pools
* Dagster+: Edit the `concurrency` config in deployment settings via the [Dagster+ UI](/guides/operate/webserver) or the [`dagster-cloud` CLI](/dagster-plus/deployment/management/dagster-cloud-cli/).
@@ -88,18 +105,6 @@ concurrency:
limit: 10
```
-## [Advanced] Limit the number of assets or ops actively in execution across a large set of runs
-
-For deployments with complex jobs containing many ops, blocking entire runs for a small number of concurrency-limited ops may be too coarse-grained for your requirements. Instead of enforcing concurrency limits at the run level, Dagster will ensure that the concurrency limit will be applied at the individual asset or op execution level. This means that if one run completes its materialization of a pool's asset, a materialization of another pool asset in a different run may begin even if the first run is still in progress.
-
-You can set the granularity of the concurrency limit enforcement to be at the op level instead of at the run level:
-
-```yaml
-concurrency:
- pools:
- granularity: op
-```
-
## Prevent runs from starting if another run is already occurring (advanced)
You can use Dagster's rich metadata to use a schedule or a sensor to only start a run when there are no currently running jobs.
diff --git a/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceConcurrencyKeyInfo.tsx b/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceConcurrencyKeyInfo.tsx
index e296d48fbd497..327a527d73f7a 100644
--- a/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceConcurrencyKeyInfo.tsx
+++ b/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceConcurrencyKeyInfo.tsx
@@ -158,7 +158,7 @@ export const InstanceConcurrencyKeyInfo = ({concurrencyKey}: {concurrencyKey: st
Granularity |
- {granularity === 'op' ? 'Op' : 'Run'} |
+ {granularity === 'run' ? 'Run' : 'Op'} |
Limit |
@@ -187,7 +187,7 @@ export const InstanceConcurrencyKeyInfo = ({concurrencyKey}: {concurrencyKey: st
- {data?.instance.poolConfig?.poolGranularity === 'op' ? (
+ {data?.instance.poolConfig?.poolGranularity !== 'run' ? (
<>
bool:
if not self._instance.event_log_storage.supports_global_concurrency_limits:
return True
- if self._pool_config.pool_granularity == PoolGranularity.RUN or (
- self._pool_config.pool_granularity is None and not is_legacy_tag
- ):
- # short-circuit the claiming of the global op concurrency slot
- # no need to reset pending claims here since the pool config doesn't change over the
- # lifetime of the context
+ if self._pool_config.pool_granularity == PoolGranularity.RUN:
+ # with pool granularity set to run, we don't need to enforce the global concurrency
+ # limits here, since we're already in a launched run.
return True
if step_key in self._pending_claims:
diff --git a/python_modules/dagster/dagster/_core/op_concurrency_limits_counter.py b/python_modules/dagster/dagster/_core/op_concurrency_limits_counter.py
index 22513545fb8d8..8861a6a5df014 100644
--- a/python_modules/dagster/dagster/_core/op_concurrency_limits_counter.py
+++ b/python_modules/dagster/dagster/_core/op_concurrency_limits_counter.py
@@ -79,7 +79,7 @@ def __init__(
self._launched_pool_counts = defaultdict(int)
self._in_progress_pool_counts = defaultdict(int)
self._slot_count_offset = slot_count_offset
- self._pool_granularity = pool_granularity if pool_granularity else PoolGranularity.RUN
+ self._pool_granularity = pool_granularity if pool_granularity else PoolGranularity.OP
self._in_progress_run_ids: set[str] = set(
[record.dagster_run.run_id for record in in_progress_run_records]
)
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_active.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_active.py
index b82d5c1835067..184349462d151 100644
--- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_active.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_active.py
@@ -252,9 +252,7 @@ def __init__(self, interval: float):
def global_concurrency_keys(self) -> set[str]:
return {"foo"}
- def claim(
- self, concurrency_key: str, step_key: str, priority: int = 0, is_legacy_tag: bool = False
- ):
+ def claim(self, concurrency_key: str, step_key: str, priority: int = 0):
self._pending_claims.add(step_key)
return False
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py
index 7b1400e3f2293..92a79adc4bbc9 100644
--- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py
@@ -425,9 +425,7 @@ def test_default_granularity(concurrency_instance_default_granularity):
with InstanceConcurrencyContext(concurrency_instance_default_granularity, run) as context:
assert context.claim("foo", "a")
- assert context.claim("foo", "b")
- assert context.claim("foo", "c", is_legacy_tag=True)
- assert not context.claim("foo", "d", is_legacy_tag=True)
+ assert not context.claim("foo", "b")
foo_info = concurrency_instance_default_granularity.event_log_storage.get_concurrency_info(
"foo"