Commit

Make list dags batchable
dpeng817 committed Jan 31, 2025
1 parent fd2da69 commit 1827578
Showing 3 changed files with 50 additions and 17 deletions.
2 changes: 2 additions & 0 deletions python_modules/dagster/dagster/_core/workspace/context.py
@@ -86,9 +86,11 @@

T = TypeVar("T")


def _get_webserver_grpc_server_heartbeat_ttl() -> int:
    return int(os.getenv("DAGSTER_WEBSERVER_GRPC_SERVER_HEARTBEAT_TTL", "45"))


WEBSERVER_GRPC_SERVER_HEARTBEAT_TTL = _get_webserver_grpc_server_heartbeat_ttl()


dagster_airlift/core/airflow_instance.py
@@ -23,6 +23,7 @@
 # This corresponds directly to the page_limit parameter on airflow's batch dag runs rest API.
 # Airflow dag run batch API: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag_runs_batch
 DEFAULT_BATCH_DAG_RUNS_LIMIT = 100
+DEFAULT_DAG_LIST_LIMIT = 100
 SLEEP_SECONDS = 1


@@ -61,11 +62,13 @@ def __init__(
         name: str,
         batch_task_instance_limit: int = DEFAULT_BATCH_TASK_RETRIEVAL_LIMIT,
         batch_dag_runs_limit: int = DEFAULT_BATCH_DAG_RUNS_LIMIT,
+        dag_list_limit: int = DEFAULT_DAG_LIST_LIMIT,
     ) -> None:
         self.auth_backend = auth_backend
         self.name = check_valid_name(name)
         self.batch_task_instance_limit = batch_task_instance_limit
         self.batch_dag_runs_limit = batch_dag_runs_limit
+        self.dag_list_limit = dag_list_limit
 
     @property
     def normalized_name(self) -> str:
@@ -75,24 +78,31 @@ def get_api_url(self) -> str:
         return f"{self.auth_backend.get_webserver_url()}/api/v1"
 
     def list_dags(self) -> list["DagInfo"]:
-        response = self.auth_backend.get_session().get(
-            f"{self.get_api_url()}/dags", params={"limit": 1000}
-        )
-        if response.status_code == 200:
-            dags = response.json()
-            webserver_url = self.auth_backend.get_webserver_url()
-            return [
-                DagInfo(
-                    webserver_url=webserver_url,
-                    dag_id=dag["dag_id"],
-                    metadata=dag,
-                )
-                for dag in dags["dags"]
-            ]
-        else:
-            raise DagsterError(
-                f"Failed to fetch DAGs. Status code: {response.status_code}, Message: {response.text}"
-            )
+        dag_responses = []
+        webserver_url = self.auth_backend.get_webserver_url()
+        while True:
+            prev_len = len(dag_responses)
+            response = self.auth_backend.get_session().get(
+                f"{self.get_api_url()}/dags",
+                params={"limit": self.dag_list_limit, "offset": len(dag_responses)},
+            )
+            if response.status_code == 200:
+                dags = response.json()
+                dag_responses.extend(
+                    DagInfo(
+                        webserver_url=webserver_url,
+                        dag_id=dag["dag_id"],
+                        metadata=dag,
+                    )
+                    for dag in dags["dags"]
+                )
+                if len(dag_responses) - prev_len == 0:
+                    break
+            else:
+                raise DagsterError(
+                    f"Failed to fetch DAGs. Status code: {response.status_code}, Message: {response.text}"
+                )
+        return dag_responses
 
     def list_variables(self) -> list[dict[str, Any]]:
         response = self.auth_backend.get_session().get(f"{self.get_api_url()}/variables")
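The core of the change is the loop above: instead of a single request with `limit: 1000`, `list_dags` now pages through `/dags` with `limit`/`offset` until a request comes back empty. For reference, here is a minimal standalone sketch of the same pattern against Airflow's stable REST API, using plain `requests`; the base URL, credentials, and the `fetch_all_dags` helper name are illustrative assumptions, not part of this commit:

    import requests

    BASE_URL = "http://localhost:8080/api/v1"  # assumed local Airflow webserver
    PAGE_LIMIT = 100  # mirrors DEFAULT_DAG_LIST_LIMIT


    def fetch_all_dags(session: requests.Session) -> list[dict]:
        """Page through /dags with limit/offset until an empty page comes back."""
        dags: list[dict] = []
        while True:
            prev_len = len(dags)
            response = session.get(
                f"{BASE_URL}/dags",
                params={"limit": PAGE_LIMIT, "offset": len(dags)},
            )
            response.raise_for_status()
            dags.extend(response.json()["dags"])
            if len(dags) == prev_len:
                # Empty page: every DAG has been retrieved.
                break
        return dags


    if __name__ == "__main__":
        session = requests.Session()
        session.auth = ("admin", "admin")  # assumed basic-auth credentials
        print(f"Retrieved {len(fetch_all_dags(session))} dags")

Stopping on the first empty page, rather than comparing against the `total_entries` field in the response, costs one extra request when the DAG count is an exact multiple of the page size, but it cannot under-fetch if the total shifts while paginating. The new test file below exercises the loop end to end.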
@@ -0,0 +1,21 @@
+from dagster_airlift.core.airflow_instance import AirflowInstance
+from dagster_airlift.core.basic_auth import AirflowBasicAuthBackend
+from kitchen_sink.airflow_instance import (
+    AIRFLOW_BASE_URL,
+    AIRFLOW_INSTANCE_NAME,
+    PASSWORD,
+    USERNAME,
+)
+
+
+def test_configure_dag_list_limit(airflow_instance: None) -> None:
+    """Test that batch instance logic correctly retrieves all dags when over batch limit."""
+    af_instance = AirflowInstance(
+        auth_backend=AirflowBasicAuthBackend(
+            webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
+        ),
+        name=AIRFLOW_INSTANCE_NAME,
+        # Set low list limit, force batched retrieval.
+        dag_list_limit=1,
+    )
+    assert len(af_instance.list_dags()) == 16
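With `dag_list_limit=1`, each page returns a single DAG, so listing the kitchen-sink instance's 16 DAGs takes 17 requests (the final one returning the empty page that ends the loop). Outside the test harness the knob is passed the same way wherever an `AirflowInstance` is constructed; a sketch, with the connection details as placeholder assumptions:

    from dagster_airlift.core.airflow_instance import AirflowInstance
    from dagster_airlift.core.basic_auth import AirflowBasicAuthBackend

    # Placeholder deployment details; substitute your own URL and credentials.
    af_instance = AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="my_airflow",
        # Page size used by list_dags(); defaults to DEFAULT_DAG_LIST_LIMIT (100).
        dag_list_limit=50,
    )
    dag_infos = af_instance.list_dags()

Note that Airflow's `maximum_page_limit` setting (100 by default) caps the page size the webserver will honor, so values above it only take effect with a matching server-side config change.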
