From 1827578bdabae8e94fb3086f90258b3e4bc7756f Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 27 Jan 2025 11:34:01 -0800 Subject: [PATCH] Make list dags batchable --- .../dagster/_core/workspace/context.py | 2 + .../dagster_airlift/core/airflow_instance.py | 44 ++++++++++++------- .../integration_tests/test_api.py | 21 +++++++++ 3 files changed, 50 insertions(+), 17 deletions(-) create mode 100644 python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink_tests/integration_tests/test_api.py diff --git a/python_modules/dagster/dagster/_core/workspace/context.py b/python_modules/dagster/dagster/_core/workspace/context.py index 503e659c80a45..cc8e006a7dc9a 100644 --- a/python_modules/dagster/dagster/_core/workspace/context.py +++ b/python_modules/dagster/dagster/_core/workspace/context.py @@ -86,9 +86,11 @@ T = TypeVar("T") + def _get_webserver_grpc_server_heartbeat_ttl() -> int: return int(os.getenv("DAGSTER_WEBSERVER_GRPC_SERVER_HEARTBEAT_TTL", "45")) + WEBSERVER_GRPC_SERVER_HEARTBEAT_TTL = _get_webserver_grpc_server_heartbeat_ttl() diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_instance.py b/python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_instance.py index b0f19a2344d28..9a35de3005065 100644 --- a/python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_instance.py +++ b/python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_instance.py @@ -23,6 +23,7 @@ # This corresponds directly to the page_limit parameter on airflow's batch dag runs rest API. # Airflow dag run batch API: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dag_runs_batch DEFAULT_BATCH_DAG_RUNS_LIMIT = 100 +DEFAULT_DAG_LIST_LIMIT = 100 SLEEP_SECONDS = 1 @@ -61,11 +62,13 @@ def __init__( name: str, batch_task_instance_limit: int = DEFAULT_BATCH_TASK_RETRIEVAL_LIMIT, batch_dag_runs_limit: int = DEFAULT_BATCH_DAG_RUNS_LIMIT, + dag_list_limit: int = DEFAULT_DAG_LIST_LIMIT, ) -> None: self.auth_backend = auth_backend self.name = check_valid_name(name) self.batch_task_instance_limit = batch_task_instance_limit self.batch_dag_runs_limit = batch_dag_runs_limit + self.dag_list_limit = dag_list_limit @property def normalized_name(self) -> str: @@ -75,24 +78,31 @@ def get_api_url(self) -> str: return f"{self.auth_backend.get_webserver_url()}/api/v1" def list_dags(self) -> list["DagInfo"]: - response = self.auth_backend.get_session().get( - f"{self.get_api_url()}/dags", params={"limit": 1000} - ) - if response.status_code == 200: - dags = response.json() - webserver_url = self.auth_backend.get_webserver_url() - return [ - DagInfo( - webserver_url=webserver_url, - dag_id=dag["dag_id"], - metadata=dag, - ) - for dag in dags["dags"] - ] - else: - raise DagsterError( - f"Failed to fetch DAGs. Status code: {response.status_code}, Message: {response.text}" + dag_responses = [] + webserver_url = self.auth_backend.get_webserver_url() + while True: + prev_len = len(dag_responses) + response = self.auth_backend.get_session().get( + f"{self.get_api_url()}/dags", + params={"limit": self.dag_list_limit, "offset": len(dag_responses)}, ) + if response.status_code == 200: + dags = response.json() + dag_responses.extend( + DagInfo( + webserver_url=webserver_url, + dag_id=dag["dag_id"], + metadata=dag, + ) + for dag in dags["dags"] + ) + if len(dag_responses) - prev_len == 0: + break + else: + raise DagsterError( + f"Failed to fetch DAGs. Status code: {response.status_code}, Message: {response.text}" + ) + return dag_responses def list_variables(self) -> list[dict[str, Any]]: response = self.auth_backend.get_session().get(f"{self.get_api_url()}/variables") diff --git a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink_tests/integration_tests/test_api.py b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink_tests/integration_tests/test_api.py new file mode 100644 index 0000000000000..d7bb8d2faf022 --- /dev/null +++ b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink_tests/integration_tests/test_api.py @@ -0,0 +1,21 @@ +from dagster_airlift.core.airflow_instance import AirflowInstance +from dagster_airlift.core.basic_auth import AirflowBasicAuthBackend +from kitchen_sink.airflow_instance import ( + AIRFLOW_BASE_URL, + AIRFLOW_INSTANCE_NAME, + PASSWORD, + USERNAME, +) + + +def test_configure_dag_list_limit(airflow_instance: None) -> None: + """Test that batch instance logic correctly retrieves all dags when over batch limit.""" + af_instance = AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD + ), + name=AIRFLOW_INSTANCE_NAME, + # Set low list limit, force batched retrieval. + dag_list_limit=1, + ) + assert len(af_instance.list_dags()) == 16